Workflows are long-running processes that handle complex, multi-step operations or scheduled tasks. Unlike conversations, workflows can run independently on a schedule or be triggered by events.
While workflows in the ADK are similar in concept to Workflows in Botpress Studio, they behave differently and shouldn’t be treated as equivalent.
Creating a workflow
Create a workflow in src/workflows/:
import { Workflow } from "@botpress/runtime";

export default new Workflow({
  name: "my-workflow",
  description: "A workflow that processes data",
  handler: async ({}) => {
    // Workflow logic
  },
});
Scheduled workflows
Workflows can run on a schedule using cron syntax:
import { Workflow } from "@botpress/runtime";
import { WebsiteKB } from "../knowledge/docs";

export default new Workflow({
  name: "periodic-indexing",
  description: "Indexes knowledge base every 6 hours",
  schedule: "0 */6 * * *", // Every 6 hours
  handler: async ({}) => {
    await WebsiteKB.refresh();
  },
});
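To read the schedule above: a cron expression has five fields (minute, hour, day of month, month, day of week), and `*/6` in the hour field matches every sixth hour starting from 0. The following is a minimal sketch, not part of the ADK, that expands a single field to show when the example schedule fires. `expandCronField` is a hypothetical helper that only understands `*`, `*/n`, and a plain number.

```typescript
// Hypothetical helper, not an ADK API: expands one cron field into the
// values it matches. Supports only "*", "*/n", and a single number.
function expandCronField(field: string, min: number, max: number): number[] {
  const values: number[] = [];

  if (field === "*") {
    for (let v = min; v <= max; v++) values.push(v);
    return values;
  }

  const stepMatch = /^\*\/(\d+)$/.exec(field);
  if (stepMatch) {
    const interval = Number(stepMatch[1]);
    if (interval < 1) throw new Error(`Invalid interval in cron field: ${field}`);
    for (let v = min; v <= max; v += interval) values.push(v);
    return values;
  }

  if (/^\d+$/.test(field)) {
    const n = Number(field);
    if (n < min || n > max) throw new Error(`Value out of range: ${field}`);
    return [n];
  }

  throw new Error(`Unsupported cron field: ${field}`);
}

// Minute and hour fields of "0 */6 * * *".
const minutes = expandCronField("0", 0, 59); // [0]
const hours = expandCronField("*/6", 0, 23); // [0, 6, 12, 18]
```

Under these assumptions the schedule fires four times a day, on the hour, at hours 0, 6, 12, and 18.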
Steps
By default, workflows time out after 2 minutes. To run a workflow for longer, use the step function, which breaks the workflow into a series of steps that each take only a few seconds to run:
import { Workflow } from "@botpress/runtime";

// fetchDataFromAPI, processData, and saveResults stand in for your own functions.
export default new Workflow({
  name: "data-processing",
  handler: async ({ step }) => {
    // Step 1: Fetch data
    const data = await step("fetch-data", async () => {
      return await fetchDataFromAPI();
    });

    // Step 2: Process data
    const processed = await step("process-data", async () => {
      return processData(data);
    });

    // Step 3: Store results
    await step("store-results", async () => {
      await saveResults(processed);
    });
  },
});
Steps are persisted: if a workflow is interrupted, it can resume from the last completed step. This makes complex, long-running workflows more resilient to errors.
You can nest steps inside other steps. A parent step completes once all of its sub-steps have completed.
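To make the resume behavior concrete, here is a simplified model of step checkpointing. It is a conceptual sketch only, not the ADK's implementation: `createStepRunner` and the in-memory `Map` are hypothetical stand-ins for the runtime's persistence, and the steps are synchronous for brevity.

```typescript
// Conceptual model only: the real runtime persists step results for you.
// `createStepRunner` and the in-memory Map are hypothetical stand-ins.
type StepStore = Map<string, unknown>;

function createStepRunner(store: StepStore) {
  return function step<T>(name: string, run: () => T): T {
    if (store.has(name)) {
      // Completed in an earlier run: reuse the saved result, skip the work.
      return store.get(name) as T;
    }
    const result = run();
    store.set(name, result); // Checkpoint only after the step succeeds.
    return result;
  };
}

const store: StepStore = new Map();
const executed: string[] = []; // Records which step bodies actually ran.
let failOnce = true;

function runWorkflow(): number {
  const step = createStepRunner(store);

  const data = step("fetch-data", () => {
    executed.push("fetch-data");
    return [1, 2, 3];
  });

  return step("process-data", () => {
    executed.push("process-data");
    if (failOnce) {
      failOnce = false;
      throw new Error("interrupted");
    }
    return data.reduce((sum, n) => sum + n, 0);
  });
}

let total: number | undefined;
try {
  total = runWorkflow(); // First run is interrupted inside "process-data".
} catch {
  total = runWorkflow(); // Second run reuses "fetch-data" and resumes.
}
```

In this model, `executed` ends up as `fetch-data, process-data, process-data`: the fetch ran once, and only the interrupted step was repeated.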
Reference
The step object also provides many additional methods for workflow control:
Step reference: learn about all available step methods.
Handling failing steps
If a step (or a step method) fails, it returns a rejected promise. Left unhandled, this fails not only the current step but the entire workflow.
To avoid this, make sure you catch errors gracefully:
try {
  await step.request("orderId", "Please provide the order ID.");
} catch (err) {
  console.log(err);
}
If the method you’re using has an options.maxAttempts field, it throws an error once the maximum number of attempts has been used up. You can track retries with the attempt parameter and catch the final error:
try {
  await step(
    "data-processing",
    async ({ attempt }) => {
      console.log(`Trying step: attempt #${attempt}`);
      // Your code here
    },
    { maxAttempts: 10 }
  );
} catch (err) {
  console.log(err);
}
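The retry behavior described above can be pictured as a loop around the step body. This is a hypothetical, synchronous model rather than the ADK's implementation: `runWithRetries` is an illustrative name, and numbering attempts from 1 is an assumption.

```typescript
// Hypothetical model of retry behavior; not the ADK's implementation.
// Assumption: attempts are numbered starting at 1.
function runWithRetries<T>(
  run: (ctx: { attempt: number }) => T,
  maxAttempts: number
): T {
  if (maxAttempts < 1) throw new RangeError("maxAttempts must be at least 1");

  let lastError: unknown;
  for (let attempt = 1; attempt <= maxAttempts; attempt++) {
    try {
      return run({ attempt }); // Success: stop retrying.
    } catch (err) {
      lastError = err; // Remember the failure and try again.
    }
  }
  // Every attempt failed: surface the last error to the caller.
  throw lastError;
}

// Usage: a body that fails twice, then succeeds on the third attempt.
const attemptsSeen: number[] = [];
const retryResult = runWithRetries(({ attempt }) => {
  attemptsSeen.push(attempt);
  if (attempt < 3) throw new Error(`attempt ${attempt} failed`);
  return "done";
}, 10);
```

The caller only sees an error when all attempts fail, which is why the surrounding try/catch in the example above is still needed.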
Output
Workflows can return an output that matches the output schema:
import { Workflow, z } from "@botpress/runtime";

// fetchOrder stands in for your own data-access function.
export default new Workflow({
  name: "calculate-totals",
  input: z.object({ orderId: z.string() }),
  output: z.object({ total: z.number() }),
  handler: async ({ input, step }) => {
    const order = await step("fetch-order", async () => {
      return await fetchOrder(input.orderId);
    });
    return { total: order.total };
  },
});
Reference
Workflow props
name
Unique name for the workflow. Used to identify and reference the workflow.
description
Optional description of what the workflow does.
input
Optional Zod schema defining the input parameters for the workflow. Defaults to an empty object if not provided.
output
Optional Zod schema defining the output/return type of the workflow. Defaults to an empty object if not provided.
state
Optional Zod schema defining the workflow state structure. Defaults to an empty object if not provided.
requests
Optional object mapping request names to their Zod schemas. Each key is a request name and each value is a Zod schema for the request data.
handler
(props: object) => Promise<any> | any
required
Handler function that implements the workflow logic. See handler parameters below for details.
schedule
Optional cron expression for scheduled workflows (e.g., “0 */6 * * *” for every 6 hours).
timeout
'{number}s' | '{number}m' | '{number}h'
Optional timeout for workflow execution (e.g., “5m”, “1h”). Defaults to “5m”.
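The timeout format above is a number followed by a unit suffix. As an illustration of how such a value maps to a duration, here is a small sketch. `parseTimeout` is a hypothetical helper, not an ADK API, and it assumes whole numbers only.

```typescript
// Hypothetical helper, not an ADK API: converts "30s", "5m", or "1h"
// into milliseconds. Assumption: only whole numbers are accepted.
const UNIT_MS = { s: 1_000, m: 60_000, h: 3_600_000 } as const;

function parseTimeout(value: string): number {
  const match = /^(\d+)([smh])$/.exec(value);
  if (!match) throw new Error(`Invalid timeout: ${value}`);

  const amount = Number(match[1]);
  const unit = match[2] as keyof typeof UNIT_MS;
  return amount * UNIT_MS[unit];
}

const documentedDefaultMs = parseTimeout("5m"); // 300000
const oneHourMs = parseTimeout("1h"); // 3600000
```

Values such as `"5 minutes"` or `"5"` do not match the documented pattern, so this sketch rejects them.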
Handler parameters
The handler function receives workflow context and provides step management:
input
The validated input object matching the workflow’s input schema. Typed based on your input schema.
state
Workflow state object that persists across workflow executions and steps. Typed based on your state schema.
step
Step function for creating checkpoints. Also provides methods like listen, fail, progress, abort, sleep, sleepUntil, waitForWorkflow, executeWorkflow, map, forEach, batch, and request.
Botpress client for making API calls (tables, events, etc.).
An abort signal that indicates when the workflow should stop execution.
execute
(props: object) => Promise<any>
required
Function to execute autonomous AI logic. Used to run the agent with instructions, tools, knowledge bases, etc.
instructions
string | (() => string) | (() => Promise<string>)
required
Instructions for the AI agent. Can be a string or a function that returns a string.
Optional array of tools the agent can use.
Optional array of objects the agent can interact with.
Optional record of exit handlers.
Optional abort signal to cancel execution.
Optional hooks for customizing behavior (onBeforeTool, onAfterTool, onTrace, etc.).
temperature
number | (() => number) | (() => Promise<number>)
Optional temperature for the AI model. Defaults to 0.7.
model
string | string[] | (() => string | string[]) | (() => Promise<string | string[]>)
Optional model name(s) to use. Can be a string, array of strings, or a function that returns either.
Optional array of knowledge bases to use.
Optional maximum number of iterations (loops). Defaults to 10.
Methods
Workflows can be started and managed programmatically:
workflow.start()
Start a new workflow instance:
import ProcessingWorkflow from "../workflows/processing";

const instance = await ProcessingWorkflow.start({ orderId: "12345" });
console.log("Started workflow:", instance.id);
workflow.getOrCreate()
Get an existing workflow or create a new one with deduplication:
const instance = await ProcessingWorkflow.getOrCreate({
  key: "order-12345", // Unique key for deduplication
  input: { orderId: "12345" },
  statuses: ["pending", "in_progress"], // Only match workflows with these statuses
});
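One way to picture the deduplication is a lookup by key that is filtered by status. The sketch below is a conceptual model only: `InstanceRegistry` is a hypothetical class, the `completed` and `failed` status names are assumptions, and so is the rule that a non-matching status leads to a new instance.

```typescript
// Conceptual model only: `InstanceRegistry` is a hypothetical stand-in for
// however the platform tracks workflow instances.
// Assumption: "completed" and "failed" are illustrative status names.
type Status = "pending" | "in_progress" | "completed" | "failed";

interface Instance<I> {
  id: string;
  key: string;
  input: I;
  status: Status;
}

class InstanceRegistry<I> {
  private instances: Instance<I>[] = [];
  private nextId = 1;

  getOrCreate(opts: { key: string; input: I; statuses: Status[] }): Instance<I> {
    // Reuse an instance only if both the key and the status match.
    const existing = this.instances.find(
      (i) => i.key === opts.key && opts.statuses.includes(i.status)
    );
    if (existing) return existing;

    const created: Instance<I> = {
      id: `wf_${this.nextId++}`,
      key: opts.key,
      input: opts.input,
      status: "pending",
    };
    this.instances.push(created);
    return created;
  }
}

const registry = new InstanceRegistry<{ orderId: string }>();
const request = {
  key: "order-12345",
  input: { orderId: "12345" },
  statuses: ["pending", "in_progress"] as Status[],
};

const first = registry.getOrCreate(request); // Creates a new instance.
const second = registry.getOrCreate(request); // Same key, matching status: reused.
first.status = "completed";
const third = registry.getOrCreate(request); // Status no longer matches: new instance.
```

In this model, `first` and `second` are the same instance, while `third` is a fresh one because the earlier instance is no longer in a matching status.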
workflow.asTool()
Convert a workflow into a tool for use with execute():
import { Conversation } from "@botpress/runtime";
import ProcessingWorkflow from "../workflows/processing";

export default new Conversation({
  channel: "*",
  handler: async ({ execute }) => {
    await execute({
      instructions: "You are a helpful assistant.",
      tools: [ProcessingWorkflow.asTool()],
    });
  },
});
workflow.provide()
Provide data in response to a workflow data request (used with step.request()):
import { Conversation } from "@botpress/runtime";
import OrderWorkflow from "../workflows/order";

export default new Conversation({
  channel: "*",
  handler: async ({ type, request }) => {
    // Respond only when the incoming event is a workflow data request
    if (type === "workflow_request") {
      await OrderWorkflow.provide(request, { orderId: "12345" });
    }
  },
});
Helper functions
isWorkflowDataRequest()
Check if an event is a workflow data request:
import { isWorkflowDataRequest } from "@botpress/runtime";

if (isWorkflowDataRequest(event)) {
  // Handle the workflow data request
}
isWorkflowCallback()
Check if an event is a workflow callback:
import { isWorkflowCallback } from "@botpress/runtime";

if (isWorkflowCallback(event)) {
  // Handle the workflow callback
}