const { Document } = require('../models/documents.js');
const { DocumentSyncQueue } = require('../models/documentSyncQueue.js');
const { CollectorApi } = require('../utils/collectorApi');
const { fileData } = require("../utils/files");
const { log, conclude, updateSourceDocument } = require('./helpers/index.js');
const { getVectorDbClass } = require('../utils/helpers/index.js');
const { DocumentSyncRun } = require('../models/documentSyncRun.js');
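
// Background worker: finds watched documents whose sync window has lapsed,
// re-fetches their content via the collector API, and refreshes the document
// cache and vectors in every workspace that references them.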
(async () => {
  try {
    const queuesToProcess = await DocumentSyncQueue.staleDocumentQueues();
    if (queuesToProcess.length === 0) {
      log('No outstanding documents to sync. Exiting.');
      return;
    }

    const collector = new CollectorApi();
    if (!(await collector.online())) {
      log('Could not reach collector API. Exiting.');
      return;
    }

    log(`${queuesToProcess.length} watched documents have been found to be stale and will be updated now.`);

    for (const queue of queuesToProcess) {
      let newContent = null;
      const document = queue.workspaceDoc;
      const workspace = document.workspace;
      const { metadata, type, source } = Document.parseDocumentTypeAndSource(document);

      if (!metadata || !DocumentSyncQueue.validFileTypes.includes(type)) {
        // Document is either broken, invalid, or not supported so drop it from future queues.
        log(`Document ${document.filename} has no metadata, is broken, or invalid and has been removed from all future runs.`);
        await DocumentSyncQueue.unwatch(document);
        continue;
      }
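
      // Web links and YouTube videos are re-scraped directly from their original source URL.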
      if (type === 'link' || type === 'youtube') {
        const response = await collector.forwardExtensionRequest({
          endpoint: "/ext/resync-source-document",
          method: "POST",
          body: JSON.stringify({
            type,
            options: { link: source },
          }),
        });
        newContent = response?.content;
      }
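
      // Connector-backed sources (Confluence, GitHub, GitLab) are re-pulled via their stored chunkSource reference.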
      if (type === 'confluence' || type === 'github' || type === 'gitlab') {
        const response = await collector.forwardExtensionRequest({
          endpoint: "/ext/resync-source-document",
          method: "POST",
          body: JSON.stringify({
            type,
            options: { chunkSource: metadata.chunkSource },
          }),
        });
        newContent = response?.content;
      }
      if (!newContent) {
        // Check if the last "x" runs were all failures (not exits!). If so - remove the job entirely since it is broken.
        const failedRunCount = (
          await DocumentSyncRun.where({ queueId: queue.id }, DocumentSyncQueue.maxRepeatFailures, { createdAt: 'desc' })
        ).filter((run) => run.status === DocumentSyncRun.statuses.failed).length;
        if (failedRunCount >= DocumentSyncQueue.maxRepeatFailures) {
          log(`Document ${document.filename} has failed to refresh ${failedRunCount} times continuously and will now be removed from the watched document set.`);
          await DocumentSyncQueue.unwatch(document);
          continue;
        }

        log(`Failed to get a new content response from collector for source ${source}. Skipping, but will retry next worker interval. Attempt ${failedRunCount === 0 ? 1 : failedRunCount}/${DocumentSyncQueue.maxRepeatFailures}`);
        await DocumentSyncQueue.saveRun(queue.id, DocumentSyncRun.statuses.failed, { filename: document.filename, workspacesModified: [], reason: 'No content found.' });
        continue;
      }
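
      // Compare the freshly fetched content against the cached copy on disk; if nothing changed, just reschedule.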
      const currentDocumentData = await fileData(document.docpath);
      if (currentDocumentData.pageContent === newContent) {
        const nextSync = DocumentSyncQueue.calcNextSync(queue);
        log(`Source ${source} is unchanged and will be skipped. Next sync will be ${nextSync.toLocaleString()}.`);
        await DocumentSyncQueue._update(queue.id, {
          lastSyncedAt: new Date().toISOString(),
          nextSyncAt: nextSync.toISOString(),
        });
        await DocumentSyncQueue.saveRun(queue.id, DocumentSyncRun.statuses.exited, { filename: document.filename, workspacesModified: [], reason: 'Content unchanged.' });
        continue;
      }

      // Update the defined document and workspace vectorDB with the latest information.
      // It will skip cache and create a new vectorCache file.
      const vectorDatabase = getVectorDbClass();
      await vectorDatabase.deleteDocumentFromNamespace(workspace.slug, document.docId);
      await vectorDatabase.addDocumentToNamespace(
        workspace.slug,
        { ...currentDocumentData, pageContent: newContent, docId: document.docId },
        document.docpath,
        true
      );
      updateSourceDocument(document.docpath, {
        ...currentDocumentData,
        pageContent: newContent,
        docId: document.docId,
        published: new Date().toLocaleString(),
        // TODO: Update word count and token_estimate?
      });
      log(`Workspace "${workspace.name}" vectors of ${source} updated. Document and vector cache updated.`);

      // Now we can bloom the results to all matching documents in all other workspaces.
      const workspacesModified = [workspace.slug];
      const moreReferences = await Document.where(
        { id: { not: document.id }, filename: document.filename },
        null,
        null,
        { workspace: true }
      );

      if (moreReferences.length !== 0) {
        log(`${source} is referenced in ${moreReferences.length} other workspaces. Updating those workspaces as well...`);
        for (const additionalDocumentRef of moreReferences) {
          const additionalWorkspace = additionalDocumentRef.workspace;
          workspacesModified.push(additionalWorkspace.slug);
          await vectorDatabase.deleteDocumentFromNamespace(additionalWorkspace.slug, additionalDocumentRef.docId);
          await vectorDatabase.addDocumentToNamespace(
            additionalWorkspace.slug,
            { ...currentDocumentData, pageContent: newContent, docId: additionalDocumentRef.docId },
            additionalDocumentRef.docpath
          );
          log(`Workspace "${additionalWorkspace.name}" vectors for ${source} were also updated with the new content from cache.`);
        }
      }

      const nextRefresh = DocumentSyncQueue.calcNextSync(queue);
      log(`${source} has been refreshed in all workspaces it is currently referenced in. Next refresh will be ${nextRefresh.toLocaleString()}.`);
      await DocumentSyncQueue._update(queue.id, {
        lastSyncedAt: new Date().toISOString(),
        nextSyncAt: nextRefresh.toISOString(),
      });
      await DocumentSyncQueue.saveRun(queue.id, DocumentSyncRun.statuses.success, { filename: document.filename, workspacesModified });
    }
  } catch (e) {
    console.error(e);
    log(`errored with ${e.message}`);
  } finally {
    conclude();
  }
})();