const { NativeEmbedder } = require("../../EmbeddingEngines/native");
const { v4: uuidv4 } = require("uuid");
const {
  writeResponseChunk,
  clientAbortedHandler,
  formatChatHistory,
} = require("../../helpers/chat/responses");
const fs = require("fs");
const path = require("path");
const { safeJsonParse } = require("../../http");
const {
  LLMPerformanceMonitor,
} = require("../../helpers/chat/LLMPerformanceMonitor");

const cacheFolder = path.resolve(
  process.env.STORAGE_DIR
    ? path.resolve(process.env.STORAGE_DIR, "models", "novita")
    : path.resolve(__dirname, `../../../storage/models/novita`)
);

class NovitaLLM {
  constructor(embedder = null, modelPreference = null) {
    if (!process.env.NOVITA_LLM_API_KEY)
      throw new Error("No Novita API key was set.");

    const { OpenAI: OpenAIApi } = require("openai");
    this.basePath = "https://api.novita.ai/v3/openai";
    this.openai = new OpenAIApi({
      baseURL: this.basePath,
      apiKey: process.env.NOVITA_LLM_API_KEY ?? null,
      defaultHeaders: {
        "HTTP-Referer": "https://anythingllm.com",
        "X-Novita-Source": "anythingllm",
      },
    });
    this.model =
      modelPreference ||
      process.env.NOVITA_LLM_MODEL_PREF ||
      "deepseek/deepseek-r1";
    this.limits = {
      history: this.promptWindowLimit() * 0.15,
      system: this.promptWindowLimit() * 0.15,
      user: this.promptWindowLimit() * 0.7,
    };

    this.embedder = embedder ?? new NativeEmbedder();
    this.defaultTemp = 0.7;
    this.timeout = this.#parseTimeout();

    if (!fs.existsSync(cacheFolder))
      fs.mkdirSync(cacheFolder, { recursive: true });
    this.cacheModelPath = path.resolve(cacheFolder, "models.json");
    this.cacheAtPath = path.resolve(cacheFolder, ".cached_at");
    this.log(`Loaded with model: ${this.model}`);
  }
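
  /**
   * Logs a message to the console, prefixed with this provider's class name.
   */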
  log(text, ...args) {
    console.log(`\x1b[36m[${this.constructor.name}]\x1b[0m ${text}`, ...args);
  }

  /**
   * Some Novita models never return a `finish_reason` and therefore leave the stream
   * open, which causes issues in subsequent messages. This timeout forces the stream
   * closed after the given number of milliseconds without new chunks. It is configurable
   * via the NOVITA_LLM_TIMEOUT_MS environment variable.
   * @returns {number} The timeout value in milliseconds (default and minimum: 500)
   */
  #parseTimeout() {
    if (isNaN(Number(process.env.NOVITA_LLM_TIMEOUT_MS))) return 500;
    const setValue = Number(process.env.NOVITA_LLM_TIMEOUT_MS);
    if (setValue < 500) return 500;
    return setValue;
  }

  // Checks whether the .cached_at file holds a timestamp that is more than one week
  // (in milliseconds) older than the current date. If it is, we re-fetch from the API
  // so that the model list stays up to date.
  #cacheIsStale() {
    const MAX_STALE = 6.048e8; // 1 week in ms
    if (!fs.existsSync(this.cacheAtPath)) return true;
    const now = Number(new Date());
    const timestampMs = Number(fs.readFileSync(this.cacheAtPath));
    return now - timestampMs > MAX_STALE;
  }

  // The Novita model API exposes a lot of models, so we cache the list locally.
  // If the cached JSON file is stale or does not exist, we fetch it from the API and store it.
  // This may slow down the first request, but we need the proper token context window
  // for each model, and since that is used in the constructor we can only get it reliably
  // from this cache. We used to maintain this list as a chore, but given there is an API
  // to fetch the info, that makes little sense.
  async #syncModels() {
    if (fs.existsSync(this.cacheModelPath) && !this.#cacheIsStale())
      return false;
    this.log("Model cache is not present or stale. Fetching from Novita API.");
    await fetchNovitaModels();
    return;
  }
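
  /**
   * Builds the "Context:" block appended to the system prompt from retrieved context snippets.
   * Returns an empty string when no context texts are provided.
   */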
  #appendContext(contextTexts = []) {
    if (!contextTexts || !contextTexts.length) return "";
    return (
      "\nContext:\n" +
      contextTexts
        .map((text, i) => {
          return `[CONTEXT ${i}]:\n${text}\n[END CONTEXT ${i}]\n\n`;
        })
        .join("")
    );
  }
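
  /**
   * Returns the locally cached map of available Novita models, keyed by model id.
   * Returns an empty object when the cache file does not exist.
   */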
  models() {
    if (!fs.existsSync(this.cacheModelPath)) return {};
    return safeJsonParse(
      fs.readFileSync(this.cacheModelPath, { encoding: "utf-8" }),
      {}
    );
  }

  streamingEnabled() {
    return "streamGetChatCompletion" in this;
  }
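
  /**
   * Looks up the context window (in tokens) for a given model from the local cache,
   * falling back to 4096 when the model is unknown.
   */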
  static promptWindowLimit(modelName) {
    const cacheModelPath = path.resolve(cacheFolder, "models.json");
    const availableModels = fs.existsSync(cacheModelPath)
      ? safeJsonParse(
          fs.readFileSync(cacheModelPath, { encoding: "utf-8" }),
          {}
        )
      : {};
    return availableModels[modelName]?.maxLength || 4096;
  }
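
  /**
   * Context window (in tokens) for the currently selected model, falling back to 4096.
   */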
  promptWindowLimit() {
    const availableModels = this.models();
    return availableModels[this.model]?.maxLength || 4096;
  }
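
  /**
   * Verifies that the given model id exists in the (possibly refreshed) local model cache
   * and can therefore be used for chat completions.
   */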
  async isValidChatCompletionModel(model = "") {
    await this.#syncModels();
    const availableModels = this.models();
    return availableModels.hasOwnProperty(model);
  }

  /**
   * Generates the appropriate content payload for a message + attachments.
   * @param {{userPrompt: string, attachments: import("../../helpers").Attachment[]}} props
   * @returns {string|object[]}
   */
  #generateContent({ userPrompt, attachments = [] }) {
    if (!attachments.length) {
      return userPrompt;
    }

    const content = [{ type: "text", text: userPrompt }];
    for (let attachment of attachments) {
      content.push({
        type: "image_url",
        image_url: {
          url: attachment.contentString,
          detail: "auto",
        },
      });
    }
    return content.flat();
  }
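
  /**
   * Assembles the full OpenAI-style message array: system prompt (with appended context),
   * formatted chat history, and the new user message with any attachments.
   */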
  constructPrompt({
    systemPrompt = "",
    contextTexts = [],
    chatHistory = [],
    userPrompt = "",
    attachments = [],
  }) {
    const prompt = {
      role: "system",
      content: `${systemPrompt}${this.#appendContext(contextTexts)}`,
    };
    return [
      prompt,
      ...formatChatHistory(chatHistory, this.#generateContent),
      {
        role: "user",
        content: this.#generateContent({ userPrompt, attachments }),
      },
    ];
  }
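
  /**
   * Performs a non-streaming chat completion and returns the text response along with
   * token usage metrics measured by LLMPerformanceMonitor.
   */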
  async getChatCompletion(messages = null, { temperature = 0.7 }) {
    if (!(await this.isValidChatCompletionModel(this.model)))
      throw new Error(
        `Novita chat: ${this.model} is not valid for chat completion!`
      );

    const result = await LLMPerformanceMonitor.measureAsyncFunction(
      this.openai.chat.completions
        .create({
          model: this.model,
          messages,
          temperature,
        })
        .catch((e) => {
          throw new Error(e.message);
        })
    );

    if (
      !result.output.hasOwnProperty("choices") ||
      result.output.choices.length === 0
    )
      return null;

    return {
      textResponse: result.output.choices[0].message.content,
      metrics: {
        prompt_tokens: result.output.usage.prompt_tokens || 0,
        completion_tokens: result.output.usage.completion_tokens || 0,
        total_tokens: result.output.usage.total_tokens || 0,
        outputTps: result.output.usage.completion_tokens / result.duration,
        duration: result.duration,
      },
    };
  }
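
  /**
   * Opens a streaming chat completion and wraps it in a monitored stream so token
   * throughput can be measured while chunks are forwarded to the client.
   */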
  async streamGetChatCompletion(messages = null, { temperature = 0.7 }) {
    if (!(await this.isValidChatCompletionModel(this.model)))
      throw new Error(
        `Novita chat: ${this.model} is not valid for chat completion!`
      );

    const measuredStreamRequest = await LLMPerformanceMonitor.measureStream(
      this.openai.chat.completions.create({
        model: this.model,
        stream: true,
        messages,
        temperature,
      }),
      messages
    );
    return measuredStreamRequest;
  }

  /**
   * Handles the default stream response for a chat.
   * @param {import("express").Response} response
   * @param {import('../../helpers/chat/LLMPerformanceMonitor').MonitoredStream} stream
   * @param {Object} responseProps
   * @returns {Promise<string>}
   */
  handleStream(response, stream, responseProps) {
    const timeoutThresholdMs = this.timeout;
    const { uuid = uuidv4(), sources = [] } = responseProps;

    return new Promise(async (resolve) => {
      let fullText = "";
      let lastChunkTime = null; // null until the first token is received.

      // Establish a listener to early-abort a streaming response
      // in case things go sideways or the user does not like the response.
      // We preserve the generated text but continue as if the chat completed
      // so that previously generated content is not lost.
      const handleAbort = () => {
        stream?.endMeasurement({
          completion_tokens: LLMPerformanceMonitor.countTokens(fullText),
        });
        clientAbortedHandler(resolve, fullText);
      };
      response.on("close", handleAbort);

      // NOTICE: Not all Novita models return a stop reason, which keeps the connection
      // open so the model never finalizes the stream the way the traditional OpenAI
      // response schema does. In case the response stream never reaches a formal close
      // state, we maintain an interval timer: if we go >= timeoutThresholdMs with no new
      // chunks, we kill the stream and assume the response is complete. Novita is quite
      // fast, so this threshold should permit most responses, but `timeoutThresholdMs`
      // can be adjusted if it proves too aggressive.
      const timeoutCheck = setInterval(() => {
        if (lastChunkTime === null) return;

        const now = Number(new Date());
        const diffMs = now - lastChunkTime;
        if (diffMs >= timeoutThresholdMs) {
          this.log(
            `Novita stream did not self-close and has been stale for >${timeoutThresholdMs}ms. Closing response stream.`
          );
          writeResponseChunk(response, {
            uuid,
            sources,
            type: "textResponseChunk",
            textResponse: "",
            close: true,
            error: false,
          });
          clearInterval(timeoutCheck);
          response.removeListener("close", handleAbort);
          stream?.endMeasurement({
            completion_tokens: LLMPerformanceMonitor.countTokens(fullText),
          });
          resolve(fullText);
        }
      }, 500);

      try {
        for await (const chunk of stream) {
          const message = chunk?.choices?.[0];
          const token = message?.delta?.content;
          lastChunkTime = Number(new Date());

          if (token) {
            fullText += token;
            writeResponseChunk(response, {
              uuid,
              sources: [],
              type: "textResponseChunk",
              textResponse: token,
              close: false,
              error: false,
            });
          }

          if (message.finish_reason !== null) {
            writeResponseChunk(response, {
              uuid,
              sources,
              type: "textResponseChunk",
              textResponse: "",
              close: true,
              error: false,
            });
            response.removeListener("close", handleAbort);
            stream?.endMeasurement({
              completion_tokens: LLMPerformanceMonitor.countTokens(fullText),
            });
            resolve(fullText);
          }
        }
      } catch (e) {
        writeResponseChunk(response, {
          uuid,
          sources,
          type: "abort",
          textResponse: null,
          close: true,
          error: e.message,
        });
        response.removeListener("close", handleAbort);
        stream?.endMeasurement({
          completion_tokens: LLMPerformanceMonitor.countTokens(fullText),
        });
        resolve(fullText);
      }
    });
  }

  // Simple wrappers around the dynamic embedder; they normalize the interface across all LLM implementations.
  async embedTextInput(textInput) {
    return await this.embedder.embedTextInput(textInput);
  }
  async embedChunks(textChunks = []) {
    return await this.embedder.embedChunks(textChunks);
  }
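
  /**
   * Compresses the constructed prompt and raw history so the message array fits within
   * this model's context window limits.
   */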
  async compressMessages(promptArgs = {}, rawHistory = []) {
    const { messageArrayCompressor } = require("../../helpers/chat");
    const messageArray = this.constructPrompt(promptArgs);
    return await messageArrayCompressor(this, messageArray, rawHistory);
  }
}
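
/**
 * Fetches the list of available models from the Novita API, normalizes each entry to
 * { id, name, organization, maxLength }, and caches the result (plus a timestamp) on disk.
 * Returns an empty object on failure.
 */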
async function fetchNovitaModels() {
  return await fetch(`https://api.novita.ai/v3/openai/models`, {
    method: "GET",
    headers: {
      "Content-Type": "application/json",
    },
  })
    .then((res) => res.json())
    .then(({ data = [] }) => {
      const models = {};
      data.forEach((model) => {
        models[model.id] = {
          id: model.id,
          name: model.title,
          organization:
            model.id.split("/")[0].charAt(0).toUpperCase() +
            model.id.split("/")[0].slice(1),
          maxLength: model.context_size,
        };
      });

      // Cache all response information
      if (!fs.existsSync(cacheFolder))
        fs.mkdirSync(cacheFolder, { recursive: true });
      fs.writeFileSync(
        path.resolve(cacheFolder, "models.json"),
        JSON.stringify(models),
        {
          encoding: "utf-8",
        }
      );
      fs.writeFileSync(
        path.resolve(cacheFolder, ".cached_at"),
        String(Number(new Date())),
        {
          encoding: "utf-8",
        }
      );
      return models;
    })
    .catch((e) => {
      console.error(e);
      return {};
    });
}

module.exports = {
  NovitaLLM,
  fetchNovitaModels,
};
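
/*
 * Example usage sketch (illustrative only; assumes NOVITA_LLM_API_KEY is set in the
 * environment, that this module is required from its own directory as "./index", and
 * that the model cache can be populated on first call):
 *
 *   const { NovitaLLM } = require("./index");
 *   const llm = new NovitaLLM(null, "deepseek/deepseek-r1");
 *   const messages = llm.constructPrompt({
 *     systemPrompt: "You are a helpful assistant.",
 *     userPrompt: "Hello!",
 *   });
 *   llm
 *     .getChatCompletion(messages, { temperature: llm.defaultTemp })
 *     .then(({ textResponse }) => console.log(textResponse));
 */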