const { v4: uuidv4 } = require("uuid");
const { DocumentManager } = require("../DocumentManager");
const { WorkspaceChats } = require("../../models/workspaceChats");
const { getVectorDbClass, getLLMProvider } = require("../helpers");
const { writeResponseChunk } = require("../helpers/chat/responses");
const { chatPrompt, sourceIdentifier, recentChatHistory } = require("./index");
const {
  EphemeralAgentHandler,
  EphemeralEventListener,
} = require("../agents/ephemeral");
const { Telemetry } = require("../../models/telemetry");

/**
 * @typedef ResponseObject
 * @property {string} id - uuid of response
 * @property {string} type - Type of response
 * @property {string|null} textResponse - full text response
 * @property {object[]} sources
 * @property {boolean} close
 * @property {string|null} error
 * @property {object} metrics
 */
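
// Illustrative sketch only: the approximate shape of a ResponseObject returned by a
// successful, non-agent chatSync() call below. Successful completions also carry a
// `chatId`, and agent replies carry `thoughts`, in addition to the typedef fields above.
// The field values here are placeholders, not real fixtures.
//
// const exampleResponse = {
//   id: "<uuid generated per request>",
//   type: "textResponse",
//   textResponse: "Answer assembled from the workspace context...",
//   sources: [{ text: "...", /* plus the source document's metadata */ }],
//   close: true,
//   error: null,
//   metrics: { /* provider-dependent timing/token metrics */ },
// };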
/**
 * Handle synchronous chats with your workspace via the developer API endpoint
 * @param {{
 *  workspace: import("@prisma/client").workspaces,
 *  message: string,
 *  mode: "chat"|"query",
 *  user: import("@prisma/client").users|null,
 *  thread: import("@prisma/client").workspace_threads|null,
 *  sessionId: string|null,
 *  attachments: { name: string; mime: string; contentString: string }[],
 * }} parameters
 * @returns {Promise<ResponseObject>}
 */
async function chatSync({
  workspace,
  message = null,
  mode = "chat",
  user = null,
  thread = null,
  sessionId = null,
  attachments = [],
}) {
  const uuid = uuidv4();
  const chatMode = mode ?? "chat";

  if (EphemeralAgentHandler.isAgentInvocation({ message })) {
    await Telemetry.sendTelemetry("agent_chat_started");
    // Initialize the EphemeralAgentHandler to handle non-continuous
    // conversations with agents since this is over REST.
    const agentHandler = new EphemeralAgentHandler({
      uuid,
      workspace,
      prompt: message,
      userId: user?.id || null,
      threadId: thread?.id || null,
      sessionId,
    });

    // Establish an event listener that emulates websocket calls in Aibitat
    // so we can keep the same interface but use HTTP.
    const eventListener = new EphemeralEventListener();
    await agentHandler.init();
    await agentHandler.createAIbitat({ handler: eventListener });
    agentHandler.startAgentCluster();

    // The cluster has started and now we wait for the close event since this is
    // a synchronous call for an agent, so we return everything at once.
    // After this, we conclude the call as we normally do.
    return await eventListener
      .waitForClose()
      .then(async ({ thoughts, textResponse }) => {
        await WorkspaceChats.new({
          workspaceId: workspace.id,
          prompt: String(message),
          response: {
            text: textResponse,
            sources: [],
            type: chatMode,
            thoughts,
          },
          include: false,
          apiSessionId: sessionId,
        });
        return {
          id: uuid,
          type: "textResponse",
          sources: [],
          close: true,
          error: null,
          textResponse,
          thoughts,
        };
      });
  }
  const LLMConnector = getLLMProvider({
    provider: workspace?.chatProvider,
    model: workspace?.chatModel,
  });
  const VectorDb = getVectorDbClass();
  const messageLimit = workspace?.openAiHistory || 20;
  const hasVectorizedSpace = await VectorDb.hasNamespace(workspace.slug);
  const embeddingsCount = await VectorDb.namespaceCount(workspace.slug);

  // User is trying to query-mode chat a workspace that has no data in it - so
  // we should exit early as no information can be found under these conditions.
  if ((!hasVectorizedSpace || embeddingsCount === 0) && chatMode === "query") {
    const textResponse =
      workspace?.queryRefusalResponse ??
      "There is no relevant information in this workspace to answer your query.";
    await WorkspaceChats.new({
      workspaceId: workspace.id,
      prompt: String(message),
      response: {
        text: textResponse,
        sources: [],
        type: chatMode,
        metrics: {},
      },
      include: false,
      apiSessionId: sessionId,
    });
    return {
      id: uuid,
      type: "textResponse",
      sources: [],
      close: true,
      error: null,
      textResponse,
      metrics: {},
    };
  }

  // If we are here we know that we are in a workspace that is:
  // 1. Chatting in "chat" mode and may or may _not_ have embeddings
  // 2. Chatting in "query" mode and has at least 1 embedding
  let contextTexts = [];
  let sources = [];
  let pinnedDocIdentifiers = [];
  const { rawHistory, chatHistory } = await recentChatHistory({
    user,
    workspace,
    thread,
    messageLimit,
    apiSessionId: sessionId,
  });

  await new DocumentManager({
    workspace,
    maxTokens: LLMConnector.promptWindowLimit(),
  })
    .pinnedDocs()
    .then((pinnedDocs) => {
      pinnedDocs.forEach((doc) => {
        const { pageContent, ...metadata } = doc;
        pinnedDocIdentifiers.push(sourceIdentifier(doc));
        contextTexts.push(doc.pageContent);
        sources.push({
          text:
            pageContent.slice(0, 1_000) +
            "...continued on in source document...",
          ...metadata,
        });
      });
    });

  const vectorSearchResults =
    embeddingsCount !== 0
      ? await VectorDb.performSimilaritySearch({
          namespace: workspace.slug,
          input: message,
          LLMConnector,
          similarityThreshold: workspace?.similarityThreshold,
          topN: workspace?.topN,
          filterIdentifiers: pinnedDocIdentifiers,
          rerank: workspace?.vectorSearchMode === "rerank",
        })
      : {
          contextTexts: [],
          sources: [],
          message: null,
        };
  // Abort if the similarity search was run at all and it failed.
  if (!!vectorSearchResults.message) {
    return {
      id: uuid,
      type: "abort",
      textResponse: null,
      sources: [],
      close: true,
      error: vectorSearchResults.message,
      metrics: {},
    };
  }

  const { fillSourceWindow } = require("../helpers/chat");
  const filledSources = fillSourceWindow({
    nDocs: workspace?.topN || 4,
    searchResults: vectorSearchResults.sources,
    history: rawHistory,
    filterIdentifiers: pinnedDocIdentifiers,
  });

  // Why does contextTexts get all the info, but sources only get the current search?
  // This gives the LLM the ability to "comprehend" a contextual response without
  // populating the Citations under a response with documents the user "thinks" are irrelevant,
  // given how we backfill the context to keep chats with the LLM more correct in responses.
  // If a past citation was used to answer the question, that is visible in the history, so it logically makes sense
  // and does not appear to the user that a new response used information that is otherwise irrelevant for a given prompt.
  // TLDR; reduces GitHub issues for "LLM citing document that has no answer in it" while keeping answers highly accurate.
  contextTexts = [...contextTexts, ...filledSources.contextTexts];
  sources = [...sources, ...vectorSearchResults.sources];

  // If in query mode and no context chunks were found from search, backfill, or pins,
  // do not let the LLM hallucinate a response or fall back to general knowledge; exit early.
  if (chatMode === "query" && contextTexts.length === 0) {
    const textResponse =
      workspace?.queryRefusalResponse ??
      "There is no relevant information in this workspace to answer your query.";
    await WorkspaceChats.new({
      workspaceId: workspace.id,
      prompt: message,
      response: {
        text: textResponse,
        sources: [],
        type: chatMode,
        metrics: {},
      },
      threadId: thread?.id || null,
      include: false,
      apiSessionId: sessionId,
      user,
    });
    return {
      id: uuid,
      type: "textResponse",
      sources: [],
      close: true,
      error: null,
      textResponse,
      metrics: {},
    };
  }
  // Compress & assemble messages to ensure the prompt passes the token limit with room for response
  // and build system messages based on inputs and history.
  const messages = await LLMConnector.compressMessages(
    {
      systemPrompt: chatPrompt(workspace),
      userPrompt: message,
      contextTexts,
      chatHistory,
      attachments,
    },
    rawHistory
  );

  // Send the text completion.
  const { textResponse, metrics: performanceMetrics } =
    await LLMConnector.getChatCompletion(messages, {
      temperature: workspace?.openAiTemp ?? LLMConnector.defaultTemp,
    });

  if (!textResponse) {
    return {
      id: uuid,
      type: "abort",
      textResponse: null,
      sources: [],
      close: true,
      error: "No text completion could be completed with this input.",
      metrics: performanceMetrics,
    };
  }

  const { chat } = await WorkspaceChats.new({
    workspaceId: workspace.id,
    prompt: message,
    response: {
      text: textResponse,
      sources,
      type: chatMode,
      metrics: performanceMetrics,
    },
    threadId: thread?.id || null,
    apiSessionId: sessionId,
    user,
  });

  return {
    id: uuid,
    type: "textResponse",
    close: true,
    error: null,
    chatId: chat.id,
    textResponse,
    sources,
    metrics: performanceMetrics,
  };
}
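
/**
 * Illustrative usage sketch only (not part of this module): roughly how a
 * developer-API route could call chatSync() and return the ResponseObject as
 * JSON. The route path, the Workspace lookup helper, and the request body
 * fields are assumptions for the example, not code defined in this file.
 *
 *   app.post("/v1/workspace/:slug/chat", async (request, response) => {
 *     const workspace = await Workspace.get({ slug: request.params.slug }); // assumed helper
 *     const result = await chatSync({
 *       workspace,
 *       message: request.body.message,
 *       mode: request.body.mode ?? "chat",
 *       user: null,
 *       thread: null,
 *       sessionId: request.body.sessionId ?? null,
 *       attachments: request.body.attachments ?? [],
 *     });
 *     response.status(200).json(result);
 *   });
 */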
/**
 * Handle streamable HTTP chunks for chats with your workspace via the developer API endpoint
 * @param {{
 *  response: import("express").Response,
 *  workspace: import("@prisma/client").workspaces,
 *  message: string,
 *  mode: "chat"|"query",
 *  user: import("@prisma/client").users|null,
 *  thread: import("@prisma/client").workspace_threads|null,
 *  sessionId: string|null,
 *  attachments: { name: string; mime: string; contentString: string }[],
 * }} parameters
 * @returns {Promise<VoidFunction>}
 */
async function streamChat({
  response,
  workspace,
  message = null,
  mode = "chat",
  user = null,
  thread = null,
  sessionId = null,
  attachments = [],
}) {
  const uuid = uuidv4();
  const chatMode = mode ?? "chat";

  if (EphemeralAgentHandler.isAgentInvocation({ message })) {
    await Telemetry.sendTelemetry("agent_chat_started");
    // Initialize the EphemeralAgentHandler to handle non-continuous
    // conversations with agents since this is over REST.
    const agentHandler = new EphemeralAgentHandler({
      uuid,
      workspace,
      prompt: message,
      userId: user?.id || null,
      threadId: thread?.id || null,
      sessionId,
    });

    // Establish an event listener that emulates websocket calls in Aibitat
    // so we can keep the same interface but use HTTP.
    const eventListener = new EphemeralEventListener();
    await agentHandler.init();
    await agentHandler.createAIbitat({ handler: eventListener });
    agentHandler.startAgentCluster();

    // The cluster has started; now we wait for the close event and stream back
    // any results we get from agents as they come in.
    return eventListener
      .streamAgentEvents(response, uuid)
      .then(async ({ thoughts, textResponse }) => {
        console.log({ thoughts, textResponse });
        await WorkspaceChats.new({
          workspaceId: workspace.id,
          prompt: String(message),
          response: {
            text: textResponse,
            sources: [],
            type: chatMode,
            thoughts,
          },
          include: false,
          apiSessionId: sessionId,
        });
        writeResponseChunk(response, {
          uuid,
          type: "finalizeResponseStream",
          textResponse,
          thoughts,
          close: true,
          error: false,
        });
      });
  }
  const LLMConnector = getLLMProvider({
    provider: workspace?.chatProvider,
    model: workspace?.chatModel,
  });
  const VectorDb = getVectorDbClass();
  const messageLimit = workspace?.openAiHistory || 20;
  const hasVectorizedSpace = await VectorDb.hasNamespace(workspace.slug);
  const embeddingsCount = await VectorDb.namespaceCount(workspace.slug);

  // User is trying to query-mode chat a workspace that has no data in it - so
  // we should exit early as no information can be found under these conditions.
  if ((!hasVectorizedSpace || embeddingsCount === 0) && chatMode === "query") {
    const textResponse =
      workspace?.queryRefusalResponse ??
      "There is no relevant information in this workspace to answer your query.";
    writeResponseChunk(response, {
      id: uuid,
      type: "textResponse",
      textResponse,
      sources: [],
      attachments: [],
      close: true,
      error: null,
      metrics: {},
    });
    await WorkspaceChats.new({
      workspaceId: workspace.id,
      prompt: message,
      response: {
        text: textResponse,
        sources: [],
        type: chatMode,
        attachments: [],
        metrics: {},
      },
      threadId: thread?.id || null,
      apiSessionId: sessionId,
      include: false,
      user,
    });
    return;
  }

  // If we are here we know that we are in a workspace that is:
  // 1. Chatting in "chat" mode and may or may _not_ have embeddings
  // 2. Chatting in "query" mode and has at least 1 embedding
  let completeText;
  let metrics = {};
  let contextTexts = [];
  let sources = [];
  let pinnedDocIdentifiers = [];
  const { rawHistory, chatHistory } = await recentChatHistory({
    user,
    workspace,
    thread,
    messageLimit,
    apiSessionId: sessionId,
  });

  // Look for pinned documents to see if the user decided to use this feature. We still do a vector search,
  // since pinning is a supplemental tool, but it should be used with caution because it can easily blow up a context window.
  // We limit the appended context to 80% of the window's overall size, mostly because anything beyond that would
  // undergo prompt compression anyway to make it fit. If so much is pinned that the context exceeds what the model
  // can support, it would get compressed regardless, which defeats the point of pinning. Pinning is really best
  // suited for high-context models.
  await new DocumentManager({
    workspace,
    maxTokens: LLMConnector.promptWindowLimit(),
  })
    .pinnedDocs()
    .then((pinnedDocs) => {
      pinnedDocs.forEach((doc) => {
        const { pageContent, ...metadata } = doc;
        pinnedDocIdentifiers.push(sourceIdentifier(doc));
        contextTexts.push(doc.pageContent);
        sources.push({
          text:
            pageContent.slice(0, 1_000) +
            "...continued on in source document...",
          ...metadata,
        });
      });
    });

  const vectorSearchResults =
    embeddingsCount !== 0
      ? await VectorDb.performSimilaritySearch({
          namespace: workspace.slug,
          input: message,
          LLMConnector,
          similarityThreshold: workspace?.similarityThreshold,
          topN: workspace?.topN,
          filterIdentifiers: pinnedDocIdentifiers,
          rerank: workspace?.vectorSearchMode === "rerank",
        })
      : {
          contextTexts: [],
          sources: [],
          message: null,
        };
  // Abort if the similarity search was run at all and it failed.
  if (!!vectorSearchResults.message) {
    writeResponseChunk(response, {
      id: uuid,
      type: "abort",
      textResponse: null,
      sources: [],
      close: true,
      error: vectorSearchResults.message,
      metrics: {},
    });
    return;
  }

  const { fillSourceWindow } = require("../helpers/chat");
  const filledSources = fillSourceWindow({
    nDocs: workspace?.topN || 4,
    searchResults: vectorSearchResults.sources,
    history: rawHistory,
    filterIdentifiers: pinnedDocIdentifiers,
  });

  // Why does contextTexts get all the info, but sources only get the current search?
  // This gives the LLM the ability to "comprehend" a contextual response without
  // populating the Citations under a response with documents the user "thinks" are irrelevant,
  // given how we backfill the context to keep chats with the LLM more correct in responses.
  // If a past citation was used to answer the question, that is visible in the history, so it logically makes sense
  // and does not appear to the user that a new response used information that is otherwise irrelevant for a given prompt.
  // TLDR; reduces GitHub issues for "LLM citing document that has no answer in it" while keeping answers highly accurate.
  contextTexts = [...contextTexts, ...filledSources.contextTexts];
  sources = [...sources, ...vectorSearchResults.sources];

  // If in query mode and no context chunks were found from search, backfill, or pins,
  // do not let the LLM hallucinate a response or fall back to general knowledge; exit early.
  if (chatMode === "query" && contextTexts.length === 0) {
    const textResponse =
      workspace?.queryRefusalResponse ??
      "There is no relevant information in this workspace to answer your query.";
    writeResponseChunk(response, {
      id: uuid,
      type: "textResponse",
      textResponse,
      sources: [],
      close: true,
      error: null,
      metrics: {},
    });
    await WorkspaceChats.new({
      workspaceId: workspace.id,
      prompt: message,
      response: {
        text: textResponse,
        sources: [],
        type: chatMode,
        attachments: [],
        metrics: {},
      },
      threadId: thread?.id || null,
      apiSessionId: sessionId,
      include: false,
      user,
    });
    return;
  }
  // Compress & assemble messages to ensure the prompt passes the token limit with room for response
  // and build system messages based on inputs and history.
  const messages = await LLMConnector.compressMessages(
    {
      systemPrompt: chatPrompt(workspace),
      userPrompt: message,
      contextTexts,
      chatHistory,
      attachments,
    },
    rawHistory
  );

  // If streaming is not explicitly enabled for the connector,
  // we wait for the full response as usual and send a single chunk.
  if (LLMConnector.streamingEnabled() !== true) {
    console.log(
      `\x1b[31m[STREAMING DISABLED]\x1b[0m Streaming is not available for ${LLMConnector.constructor.name}. Will use regular chat method.`
    );
    const { textResponse, metrics: performanceMetrics } =
      await LLMConnector.getChatCompletion(messages, {
        temperature: workspace?.openAiTemp ?? LLMConnector.defaultTemp,
      });
    completeText = textResponse;
    metrics = performanceMetrics;
    writeResponseChunk(response, {
      uuid,
      sources,
      type: "textResponseChunk",
      textResponse: completeText,
      close: true,
      error: false,
      metrics,
    });
  } else {
    const stream = await LLMConnector.streamGetChatCompletion(messages, {
      temperature: workspace?.openAiTemp ?? LLMConnector.defaultTemp,
    });
    completeText = await LLMConnector.handleStream(response, stream, {
      uuid,
      sources,
    });
    metrics = stream.metrics;
  }

  if (completeText?.length > 0) {
    const { chat } = await WorkspaceChats.new({
      workspaceId: workspace.id,
      prompt: message,
      response: { text: completeText, sources, type: chatMode, metrics },
      threadId: thread?.id || null,
      apiSessionId: sessionId,
      user,
    });
    writeResponseChunk(response, {
      uuid,
      type: "finalizeResponseStream",
      close: true,
      error: false,
      chatId: chat.id,
      metrics,
    });
    return;
  }

  writeResponseChunk(response, {
    uuid,
    type: "finalizeResponseStream",
    close: true,
    error: false,
  });
  return;
}
module.exports.ApiChatHandler = {
  chatSync,
  streamChat,
};
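
/**
 * Illustrative usage sketch only: streamChat() writes chunks through
 * writeResponseChunk(), so the caller is expected to hand it a streamable
 * Express response. The route path, headers, and Workspace lookup below are
 * assumptions for the example, not behavior defined in this module.
 *
 *   app.post("/v1/workspace/:slug/stream-chat", async (request, response) => {
 *     response.setHeader("Cache-Control", "no-cache");
 *     response.setHeader("Content-Type", "text/event-stream");
 *     response.setHeader("Connection", "keep-alive");
 *     response.flushHeaders();
 *
 *     const workspace = await Workspace.get({ slug: request.params.slug }); // assumed helper
 *     await ApiChatHandler.streamChat({
 *       response,
 *       workspace,
 *       message: request.body.message,
 *       mode: request.body.mode ?? "chat",
 *       user: null,
 *       thread: null,
 *       sessionId: request.body.sessionId ?? null,
 *       attachments: request.body.attachments ?? [],
 *     });
 *     response.end();
 *   });
 */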