const { v4: uuidv4 } = require("uuid");
const moment = require("moment");
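
/**
 * Resolves the in-flight stream promise with whatever text was generated
 * before the client aborted the connection.
 * @param {Function} resolve - The resolve function of the streaming promise
 * @param {string} fullText - The text accumulated before the abort
 */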
function clientAbortedHandler(resolve, fullText) {
  console.log(
    "\x1b[43m\x1b[34m[STREAM ABORTED]\x1b[0m Client requested to abort stream. Exiting LLM stream handler early."
  );
  resolve(fullText);
  return;
}

/**
 * Handles the default stream response for a chat.
 * @param {import("express").Response} response
 * @param {import('./LLMPerformanceMonitor').MonitoredStream} stream
 * @param {Object} responseProps - Response properties; may include a custom uuid and the sources list.
 * @returns {Promise<string>}
 */
function handleDefaultStreamResponseV2(response, stream, responseProps) {
  const { uuid = uuidv4(), sources = [] } = responseProps;

  // Why are we doing this?
  // OpenAI does enable usage metrics in the stream response, but:
  // 1. That parameter is not available in our current API version (TODO: update)
  // 2. The usage metrics are not available in _every_ provider that uses this function
  // 3. We need to track usage metrics for every provider that uses this function - not just OpenAI
  // Other keys are added by the LLMPerformanceMonitor.measureStream method
  let hasUsageMetrics = false;
  let usage = {
    // prompt_tokens can be present in this object if the provider supports it - otherwise it is
    // counted manually when the stream is created in the LLM provider's
    // `streamGetChatCompletion` via the `LLMPerformanceMonitor.measureStream` call.
    completion_tokens: 0,
  };

  return new Promise(async (resolve) => {
    let fullText = "";

    // Establish a listener to early-abort a streaming response
    // in case things go sideways or the user does not like the response.
    // We resolve with the generated text as if the chat completed
    // so previously generated content is not lost.
    const handleAbort = () => {
      stream?.endMeasurement(usage);
      clientAbortedHandler(resolve, fullText);
    };
    response.on("close", handleAbort);

    // Now handle the chunks from the streamed response and append to fullText.
    try {
      for await (const chunk of stream) {
        const message = chunk?.choices?.[0];
        const token = message?.delta?.content;

        // If we see usage metrics in the chunk, we can use them directly
        // instead of estimating them, but we only want to assign values if
        // the response object has the exact key:value pairs we expect.
        if (
          chunk.hasOwnProperty("usage") && // exists
          !!chunk.usage && // is not null
          Object.values(chunk.usage).length > 0 // has values
        ) {
          if (chunk.usage.hasOwnProperty("prompt_tokens")) {
            usage.prompt_tokens = Number(chunk.usage.prompt_tokens);
          }
          if (chunk.usage.hasOwnProperty("completion_tokens")) {
            hasUsageMetrics = true; // stop the estimating counter
            usage.completion_tokens = Number(chunk.usage.completion_tokens);
          }
        }

        if (token) {
          fullText += token;
          // If we never saw a usage metric, estimate completion_tokens by the number of completion chunks
          if (!hasUsageMetrics) usage.completion_tokens++;
          writeResponseChunk(response, {
            uuid,
            sources: [],
            type: "textResponseChunk",
            textResponse: token,
            close: false,
            error: false,
          });
        }

        // LocalAi returns '' and others return null on chunks - the last chunk is not "" or null.
        // Either way, the key `finish_reason` must be present to determine the ending chunk.
        if (
          message?.hasOwnProperty("finish_reason") && // Got a valid message object with finish_reason
          message.finish_reason !== "" &&
          message.finish_reason !== null
        ) {
          writeResponseChunk(response, {
            uuid,
            sources,
            type: "textResponseChunk",
            textResponse: "",
            close: true,
            error: false,
          });
          response.removeListener("close", handleAbort);
          stream?.endMeasurement(usage);
          resolve(fullText);
          break; // Break streaming when a valid finish_reason is first encountered
        }
      }
    } catch (e) {
      console.log(`\x1b[43m\x1b[34m[STREAMING ERROR]\x1b[0m ${e.message}`);
      writeResponseChunk(response, {
        uuid,
        type: "abort",
        textResponse: null,
        sources: [],
        close: true,
        error: e.message,
      });
      stream?.endMeasurement(usage);
      resolve(fullText); // Return what we currently have - if anything.
    }
  });
}
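
/**
 * Converts stored chat records into the paired user/assistant message format
 * used to render chat history in the UI.
 * @param {Object[]} history - The raw chat records to convert
 * @returns {Object[]} The flattened list of formatted messages
 */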
function convertToChatHistory(history = []) {
  const formattedHistory = [];
  for (const record of history) {
    const { prompt, response, createdAt, feedbackScore = null, id } = record;
    const data = JSON.parse(response);

    // In the event that a bad response was stored - we should skip its entire record
    // because it was likely an error, cannot be used in chats, and will fail to render on the UI.
    if (typeof prompt !== "string") {
      console.log(
        `[convertToChatHistory] ChatHistory #${record.id} prompt property is not a string - skipping record.`
      );
      continue;
    } else if (typeof data.text !== "string") {
      console.log(
        `[convertToChatHistory] ChatHistory #${record.id} response.text property is not a string - skipping record.`
      );
      continue;
    }

    formattedHistory.push([
      {
        role: "user",
        content: prompt,
        sentAt: moment(createdAt).unix(),
        attachments: data?.attachments ?? [],
        chatId: id,
      },
      {
        type: data?.type || "chat",
        role: "assistant",
        content: data.text,
        sources: data.sources || [],
        chatId: id,
        sentAt: moment(createdAt).unix(),
        feedbackScore,
        metrics: data?.metrics || {},
      },
    ]);
  }
  return formattedHistory.flat();
}

/**
 * Converts a chat history to a prompt history.
 * @param {Object[]} history - The chat history to convert
 * @returns {{role: string, content: string, attachments?: import("..").Attachment[]}[]}
 */
function convertToPromptHistory(history = []) {
  const formattedHistory = [];
  for (const record of history) {
    const { prompt, response } = record;
    const data = JSON.parse(response);

    // In the event that a bad response was stored - we should skip its entire record
    // because it was likely an error, cannot be used in chats, and will fail to render on the UI.
    if (typeof prompt !== "string") {
      console.log(
        `[convertToPromptHistory] ChatHistory #${record.id} prompt property is not a string - skipping record.`
      );
      continue;
    } else if (typeof data.text !== "string") {
      console.log(
        `[convertToPromptHistory] ChatHistory #${record.id} response.text property is not a string - skipping record.`
      );
      continue;
    }

    formattedHistory.push([
      {
        role: "user",
        content: prompt,
        // If there are attachments, add them as a property to the user message
        // so we can reuse them in chat history later if supported by the LLM.
        ...(data?.attachments?.length > 0
          ? { attachments: data?.attachments }
          : {}),
      },
      {
        role: "assistant",
        content: data.text,
      },
    ]);
  }
  return formattedHistory.flat();
}
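
/**
 * Writes a single server-sent-event (SSE) data chunk to the response.
 * @param {import("express").Response} response
 * @param {Object} data - The payload to serialize and send
 */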
function writeResponseChunk(response, data) {
  response.write(`data: ${JSON.stringify(data)}\n\n`);
  return;
}

/**
 * Formats the chat history to re-use attachments in the chat history
 * that might have existed in the conversation earlier.
 * @param {{role:string, content:string, attachments?: Object[]}[]} chatHistory
 * @param {function} formatterFunction - The LLM provider's function for formatting a chat history entry
 * @param {('asProperty'|'spread')} mode - "asProperty" or "spread". Determines how the content is formatted in the message object.
 * @returns {object[]}
 */
function formatChatHistory(
  chatHistory = [],
  formatterFunction,
  mode = "asProperty"
) {
  return chatHistory.map((historicalMessage) => {
    if (
      historicalMessage?.role !== "user" || // Only user messages can have attachments
      !historicalMessage?.attachments || // If there are no attachments, we can skip this
      !historicalMessage.attachments.length // If there is an array but it is empty, we can skip this
    )
      return historicalMessage;

    // Some providers, like Ollama, expect the formatted content to be spread directly into the message object.
    if (mode === "spread") {
      return {
        role: historicalMessage.role,
        ...formatterFunction({
          userPrompt: historicalMessage.content,
          attachments: historicalMessage.attachments,
        }),
      };
    }

    // Most providers expect the content to be a property of the message object, formatted like OpenAI models.
    return {
      role: historicalMessage.role,
      content: formatterFunction({
        userPrompt: historicalMessage.content,
        attachments: historicalMessage.attachments,
      }),
    };
  });
}
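
// Illustration of the two modes above (shapes only; the actual output of
// `formatterFunction` depends on the provider):
//   mode = "asProperty" -> { role, content: formatterFunction({ userPrompt, attachments }) }
//   mode = "spread"     -> { role, ...formatterFunction({ userPrompt, attachments }) }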

module.exports = {
  handleDefaultStreamResponseV2,
  convertToChatHistory,
  convertToPromptHistory,
  writeResponseChunk,
  clientAbortedHandler,
  formatChatHistory,
};
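
/*
 * Example usage (illustrative sketch only - the route path, `app`,
 * `llmProvider`, and `messages` below are assumptions, not part of this
 * module). It shows handleDefaultStreamResponseV2 wired into an SSE route:
 *
 *   app.post("/v1/chat/stream", async (request, response) => {
 *     response.setHeader("Content-Type", "text/event-stream");
 *     response.setHeader("Cache-Control", "no-cache");
 *     response.setHeader("Connection", "keep-alive");
 *     // `stream` is expected to be a MonitoredStream produced via
 *     // LLMPerformanceMonitor.measureStream (see handleDefaultStreamResponseV2).
 *     const stream = await llmProvider.streamGetChatCompletion(messages);
 *     const fullText = await handleDefaultStreamResponseV2(response, stream, {
 *       sources: [],
 *     });
 *     response.end();
 *     // fullText can then be persisted as the assistant's reply.
 *   });
 */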