You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

238 lines
7.2 KiB

11 months ago
  1. const fs = require("fs");
  2. const path = require("path");
  3. const { v4: uuidv4 } = require("uuid");
  4. const { FlowExecutor } = require("./executor");
  5. const { normalizePath } = require("../files");
  6. const { safeJsonParse } = require("../http");
  7. class AgentFlows {
  8. static flowsDir = process.env.STORAGE_DIR
  9. ? path.join(process.env.STORAGE_DIR, "plugins", "agent-flows")
  10. : path.join(process.cwd(), "storage", "plugins", "agent-flows");
  11. constructor() {}
  12. /**
  13. * Ensure flows directory exists
  14. * @returns {Boolean} True if directory exists, false otherwise
  15. */
  16. static createOrCheckFlowsDir() {
  17. try {
  18. if (fs.existsSync(AgentFlows.flowsDir)) return true;
  19. fs.mkdirSync(AgentFlows.flowsDir, { recursive: true });
  20. return true;
  21. } catch (error) {
  22. console.error("Failed to create flows directory:", error);
  23. return false;
  24. }
  25. }
  26. /**
  27. * Helper to get all flow files with their contents
  28. * @returns {Object} Map of flow UUID to flow config
  29. */
  30. static getAllFlows() {
  31. AgentFlows.createOrCheckFlowsDir();
  32. const files = fs.readdirSync(AgentFlows.flowsDir);
  33. const flows = {};
  34. for (const file of files) {
  35. if (!file.endsWith(".json")) continue;
  36. try {
  37. const filePath = path.join(AgentFlows.flowsDir, file);
  38. const content = fs.readFileSync(normalizePath(filePath), "utf8");
  39. const config = JSON.parse(content);
  40. const id = file.replace(".json", "");
  41. flows[id] = config;
  42. } catch (error) {
  43. console.error(`Error reading flow file ${file}:`, error);
  44. }
  45. }
  46. return flows;
  47. }
  48. /**
  49. * Load a flow configuration by UUID
  50. * @param {string} uuid - The UUID of the flow to load
  51. * @returns {Object|null} Flow configuration or null if not found
  52. */
  53. static loadFlow(uuid) {
  54. try {
  55. const flowJsonPath = normalizePath(
  56. path.join(AgentFlows.flowsDir, `${uuid}.json`)
  57. );
  58. if (!uuid || !fs.existsSync(flowJsonPath)) return null;
  59. const flow = safeJsonParse(fs.readFileSync(flowJsonPath, "utf8"), null);
  60. if (!flow) return null;
  61. return {
  62. name: flow.name,
  63. uuid,
  64. config: flow,
  65. };
  66. } catch (error) {
  67. console.error("Failed to load flow:", error);
  68. return null;
  69. }
  70. }
  71. /**
  72. * Save a flow configuration
  73. * @param {string} name - The name of the flow
  74. * @param {Object} config - The flow configuration
  75. * @param {string|null} uuid - Optional UUID for the flow
  76. * @returns {Object} Result of the save operation
  77. */
  78. static saveFlow(name, config, uuid = null) {
  79. try {
  80. AgentFlows.createOrCheckFlowsDir();
  81. if (!uuid) uuid = uuidv4();
  82. const normalizedUuid = normalizePath(`${uuid}.json`);
  83. const filePath = path.join(AgentFlows.flowsDir, normalizedUuid);
  84. fs.writeFileSync(filePath, JSON.stringify({ ...config, name }, null, 2));
  85. return { success: true, uuid };
  86. } catch (error) {
  87. console.error("Failed to save flow:", error);
  88. return { success: false, error: error.message };
  89. }
  90. }
  91. /**
  92. * List all available flows
  93. * @returns {Array} Array of flow summaries
  94. */
  95. static listFlows() {
  96. try {
  97. const flows = AgentFlows.getAllFlows();
  98. return Object.entries(flows).map(([uuid, flow]) => ({
  99. name: flow.name,
  100. uuid,
  101. description: flow.description,
  102. active: flow.active !== false,
  103. }));
  104. } catch (error) {
  105. console.error("Failed to list flows:", error);
  106. return [];
  107. }
  108. }
  109. /**
  110. * Delete a flow by UUID
  111. * @param {string} uuid - The UUID of the flow to delete
  112. * @returns {Object} Result of the delete operation
  113. */
  114. static deleteFlow(uuid) {
  115. try {
  116. const filePath = normalizePath(
  117. path.join(AgentFlows.flowsDir, `${uuid}.json`)
  118. );
  119. if (!fs.existsSync(filePath)) throw new Error(`Flow ${uuid} not found`);
  120. fs.rmSync(filePath);
  121. return { success: true };
  122. } catch (error) {
  123. console.error("Failed to delete flow:", error);
  124. return { success: false, error: error.message };
  125. }
  126. }
  127. /**
  128. * Execute a flow by UUID
  129. * @param {string} uuid - The UUID of the flow to execute
  130. * @param {Object} variables - Initial variables for the flow
  131. * @param {Function} introspectFn - Function to introspect the flow
  132. * @param {Function} loggerFn - Function to log the flow
  133. * @returns {Promise<Object>} Result of flow execution
  134. */
  135. static async executeFlow(
  136. uuid,
  137. variables = {},
  138. introspectFn = null,
  139. loggerFn = null
  140. ) {
  141. const flow = AgentFlows.loadFlow(uuid);
  142. if (!flow) throw new Error(`Flow ${uuid} not found`);
  143. const flowExecutor = new FlowExecutor();
  144. return await flowExecutor.executeFlow(
  145. flow,
  146. variables,
  147. introspectFn,
  148. loggerFn
  149. );
  150. }
  151. /**
  152. * Get all active flows as plugins that can be loaded into the agent
  153. * @returns {string[]} Array of flow names in @@flow_{uuid} format
  154. */
  155. static activeFlowPlugins() {
  156. const flows = AgentFlows.getAllFlows();
  157. return Object.entries(flows)
  158. .filter(([_, flow]) => flow.active !== false)
  159. .map(([uuid]) => `@@flow_${uuid}`);
  160. }
  161. /**
  162. * Load a flow plugin by its UUID
  163. * @param {string} uuid - The UUID of the flow to load
  164. * @returns {Object|null} Plugin configuration or null if not found
  165. */
  166. static loadFlowPlugin(uuid) {
  167. const flow = AgentFlows.loadFlow(uuid);
  168. if (!flow) return null;
  169. const startBlock = flow.config.steps?.find((s) => s.type === "start");
  170. const variables = startBlock?.config?.variables || [];
  171. return {
  172. name: `flow_${uuid}`,
  173. description: `Execute agent flow: ${flow.name}`,
  174. plugin: (_runtimeArgs = {}) => ({
  175. name: `flow_${uuid}`,
  176. description: flow.description || `Execute agent flow: ${flow.name}`,
  177. setup: (aibitat) => {
  178. aibitat.function({
  179. name: `flow_${uuid}`,
  180. description: flow.description || `Execute agent flow: ${flow.name}`,
  181. parameters: {
  182. type: "object",
  183. properties: variables.reduce((acc, v) => {
  184. if (v.name) {
  185. acc[v.name] = {
  186. type: "string",
  187. description:
  188. v.description || `Value for variable ${v.name}`,
  189. };
  190. }
  191. return acc;
  192. }, {}),
  193. },
  194. handler: async (args) => {
  195. aibitat.introspect(`Executing flow: ${flow.name}`);
  196. const result = await AgentFlows.executeFlow(
  197. uuid,
  198. args,
  199. aibitat.introspect,
  200. aibitat.handlerProps.log
  201. );
  202. if (!result.success) {
  203. aibitat.introspect(
  204. `Flow failed: ${result.results[0]?.error || "Unknown error"}`
  205. );
  206. return `Flow execution failed: ${result.results[0]?.error || "Unknown error"}`;
  207. }
  208. aibitat.introspect(`${flow.name} completed successfully`);
  209. return typeof result === "object"
  210. ? JSON.stringify(result)
  211. : String(result);
  212. },
  213. });
  214. },
  215. }),
  216. flowName: flow.name,
  217. };
  218. }
  219. }
  220. module.exports.AgentFlows = AgentFlows;