You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 

146 lines
4.3 KiB

const { FLOW_TYPES } = require("./flowTypes");
const executeApiCall = require("./executors/api-call");
const executeWebsite = require("./executors/website");
const executeFile = require("./executors/file");
const executeCode = require("./executors/code");
const executeLLMInstruction = require("./executors/llm-instruction");
const executeWebScraping = require("./executors/web-scraping");
const { Telemetry } = require("../../models/telemetry");
class FlowExecutor {
constructor() {
this.variables = {};
this.introspect = () => {}; // Default no-op introspect
this.logger = console.info; // Default console.info
}
attachLogging(introspectFn, loggerFn) {
this.introspect = introspectFn || (() => {});
this.logger = loggerFn || console.info;
}
// Utility to replace variables in config
replaceVariables(config) {
const deepReplace = (obj) => {
if (typeof obj === "string") {
return obj.replace(/\${([^}]+)}/g, (match, varName) => {
return this.variables[varName] !== undefined
? this.variables[varName]
: match;
});
}
if (Array.isArray(obj)) {
return obj.map((item) => deepReplace(item));
}
if (obj && typeof obj === "object") {
const result = {};
for (const [key, value] of Object.entries(obj)) {
result[key] = deepReplace(value);
}
return result;
}
return obj;
};
return deepReplace(config);
}
// Main execution method
async executeStep(step) {
const config = this.replaceVariables(step.config);
let result;
// Create execution context with introspect
const context = {
introspect: this.introspect,
variables: this.variables,
logger: this.logger,
model: process.env.LLM_PROVIDER_MODEL || "gpt-4",
provider: process.env.LLM_PROVIDER || "openai",
};
switch (step.type) {
case FLOW_TYPES.START.type:
// For start blocks, we just initialize variables if they're not already set
if (config.variables) {
config.variables.forEach((v) => {
if (v.name && !this.variables[v.name]) {
this.variables[v.name] = v.value || "";
}
});
}
result = this.variables;
break;
case FLOW_TYPES.API_CALL.type:
result = await executeApiCall(config, context);
break;
case FLOW_TYPES.WEBSITE.type:
result = await executeWebsite(config, context);
break;
case FLOW_TYPES.FILE.type:
result = await executeFile(config, context);
break;
case FLOW_TYPES.CODE.type:
result = await executeCode(config, context);
break;
case FLOW_TYPES.LLM_INSTRUCTION.type:
result = await executeLLMInstruction(config, context);
break;
case FLOW_TYPES.WEB_SCRAPING.type:
result = await executeWebScraping(config, context);
break;
default:
throw new Error(`Unknown flow type: ${step.type}`);
}
// Store result in variable if specified
if (config.resultVariable || config.responseVariable) {
const varName = config.resultVariable || config.responseVariable;
this.variables[varName] = result;
}
return result;
}
// Execute entire flow
async executeFlow(
flow,
initialVariables = {},
introspectFn = null,
loggerFn = null
) {
await Telemetry.sendTelemetry("agent_flow_execution_started");
// Initialize variables with both initial values and any passed-in values
this.variables = {
...(
flow.config.steps.find((s) => s.type === "start")?.config?.variables ||
[]
).reduce((acc, v) => ({ ...acc, [v.name]: v.value }), {}),
...initialVariables, // This will override any default values with passed-in values
};
this.attachLogging(introspectFn, loggerFn);
const results = [];
for (const step of flow.config.steps) {
try {
const result = await this.executeStep(step);
results.push({ success: true, result });
} catch (error) {
results.push({ success: false, error: error.message });
break;
}
}
return {
success: results.every((r) => r.success),
results,
variables: this.variables,
};
}
}
module.exports = {
FlowExecutor,
FLOW_TYPES,
};