Logo

Basic Workflow Patterns

Learn common patterns and techniques for building effective workflows

This guide explores common patterns you can use to build more complex workflows with workflows.

Fan-out (Parallelism)

One of the most powerful features of workflows is the ability to run tasks in parallel:

import { createWorkflow, workflowEvent, getContext } from "@llama-flow/core";
 
// Define events
const startEvent = workflowEvent<string>();
const processItemEvent = workflowEvent<number>();
const resultEvent = workflowEvent<string>();
const completeEvent = workflowEvent<string[]>();
 
// Create workflow
const workflow = createWorkflow();
 
// Define a variable accessible within the handler scope to signal completion
let itemsToProcess = 10; // Total number of items
let itemsProcessed = 0;
 
// Process start event: fan out to multiple processItemEvent events
workflow.handle([startEvent], async (start) => {
  const { sendEvent, stream } = getContext();
  itemsProcessed = 0; // Reset counter for this execution context
 
  // Emit multiple events to be processed in parallel
  for (let i = 0; i < itemsToProcess; i++) {
    sendEvent(processItemEvent.with(i));
  }
 
  // Use an async IIFE to collect results and emit completeEvent
  try {
    const results = await stream
      .filter(resultEvent)
      .until(() => itemsProcessed === itemsToProcess)
      .toArray();
    // Send the final aggregated result
    sendEvent(completeEvent.with(results.map((event) => event.data)));
  } catch (err) {
    console.error("Error processing items:", err);
    // Handle error if needed
  }
 
  // Note: This handler finishes *before* the collection completes.
  // Returning nothing or a specific "processing started" event might be appropriate.
});
 
// Process each item
workflow.handle([processItemEvent], async (event) => {
  // Simulate async work
  await new Promise((resolve) => setTimeout(resolve, Math.random() * 100));
  const processedValue = `Processed: ${event.data}`;
 
  // Crucially, update the shared counter *after* processing
  itemsProcessed++;
 
  return resultEvent.with(processedValue);
});
 
// Example E2E Run Usage
async function runFanOut() {
  console.log("Running fan-out");
  const { stream, sendEvent } = workflow.createContext();
  sendEvent(startEvent.with("Start fan-out"));
 
  for await (const event of stream) {
    if (processItemEvent.include(event)) {
      console.log(`Processing item: ${event.data}`);
    } else if (resultEvent.include(event)) {
      console.log(`Result received: ${event.data}`);
    } else if (completeEvent.include(event)) {
      console.log("Final aggregated results:", event.data);
      break; // Stop processing the stream
    }
  }
}
 
runFanOut();

This pattern allows you to:

  1. Emit multiple events to be processed in parallel
  2. Collect results as they come in
  3. Complete once all parallel tasks are finished

Conditional Branching

You can implement conditional logic in your workflows:

import { createWorkflow, workflowEvent } from "@llama-flow/core";
 
const inputEvent = workflowEvent<number>();
const evenNumberEvent = workflowEvent<string>();
const oddNumberEvent = workflowEvent<string>();
const resultEvent = workflowEvent<string>();
 
const workflow = createWorkflow();
 
// Branch based on whether the number is even or odd
workflow.handle([inputEvent], (event) => {
  if (event.data % 2 === 0) {
    return evenNumberEvent.with(`${event.data} is even`);
  } else {
    return oddNumberEvent.with(`${event.data} is odd`);
  }
});
 
// Handle even numbers
workflow.handle([evenNumberEvent], (event) => {
  return resultEvent.with(`Even result: ${event.data}`);
});
 
// Handle odd numbers
workflow.handle([oddNumberEvent], (event) => {
  return resultEvent.with(`Odd result: ${event.data}`);
});
 
// Example E2E Run Usage
async function run(input_number: number) {
  // Create a workflow context and send the initial event
  const { stream, sendEvent } = workflow.createContext();
  sendEvent(inputEvent.with(input_number));
 
  // Collect all events until we get a stopEvent
  const allEvents = await stream.until(resultEvent).toArray();
 
  // The last event will be the stopEvent that was requested
  const finalEvent = allEvents[allEvents.length - 1];
  if (resultEvent.include(finalEvent)) {
    console.log(`Result: ${finalEvent.data}`);
  }
}
 
run(42);
run(43);

Using Middleware

LlamaIndex workflows provide middleware that can enhance your workflows:

withState Middleware

The withState middleware adds a persistent state to your workflow context:

import { createStatefulMiddleware } from "@llama-flow/core/middleware/state";
 
// Define your state type
type CounterState = {
  count: number;
  history: number[];
};
 
// Create a workflow with state middleware
const { withState } = createStatefulMiddleware<CounterState>(() => ({
  count: 0,
  history: [],
}));
 
const workflow = withState(createWorkflow());
 
// Use the state in your handlers
workflow.handle([startEvent], () => {
  const { state } = getContext();
  state.count += 1;
  state.history.push(state.count);
  return incrementEvent.with(state.count);
});
 
workflow.handle([incrementEvent], (increment) => {
  const { state } = getContext();
  console.log("Current count:", state.count);
  console.log("History:", state.history);
  return resultEvent.with(state.count);
});
 
// Run the workflow
async function runWithState() {
  const { sendEvent, state } = workflow.createContext();
 
  // Send start event multiple times to see state update
  sendEvent(startEvent.with());
  sendEvent(startEvent.with());
  sendEvent(startEvent.with());
 
  // The state persists across events
  console.log("Final count:", state.count);
  console.log("Final history:", state.history);
}
 
runWithState();

You can also create a state with input:

const { withState } = createStatefulMiddleware(
  (input: { initialCount: number }) => ({
    count: input.initialCount,
    history: [input.initialCount],
  }),
);
 
const workflow = withState(createWorkflow());
const { state } = workflow.createContext({ initialCount: 10 });

withValidation Middleware

The withValidation middleware adds compile-time and runtime validation to your workflows:

import { createWorkflow, workflowEvent } from "@llama-flow/core";
import { withValidation } from "@llama-flow/core/middleware/validation";
 
const startEvent = workflowEvent<string, "start">();
const processEvent = workflowEvent<number, "process">();
const resultEvent = workflowEvent<string, "result">();
const disallowedEvent = workflowEvent<void, "disallowed">();
 
// Create a workflow with validation middleware
// Define allowed event paths
const workflow = withValidation(createWorkflow(), [
  [[startEvent], [processEvent]], // startEvent can only lead to processEvent
  [[processEvent], [resultEvent]], // processEvent can only lead to resultEvent
]);
 
// This will pass validation
workflow.strictHandle([startEvent], (sendEvent, start) => {
  sendEvent(processEvent.with(123)); // ✅ This is allowed
});
 
// This would fail at compile time and runtime
workflow.strictHandle([startEvent], (sendEvent, start) => {
  // sendEvent(disallowedEvent.with("disallowed")); // ❌ This would cause an error
  // sendEvent(resultEvent.with("result")); // ❌ This would also cause an error
});

Error Handling

LlamaIndex workflows provide built-in mechanisms for handling errors:

import { createWorkflow, workflowEvent, getContext } from "@llama-flow/core";
 
const startEvent = workflowEvent<string>();
const processEvent = workflowEvent<number>();
const errorEvent = workflowEvent<Error>();
const resultEvent = workflowEvent<string>();
 
const workflow = createWorkflow();
 
workflow.handle([startEvent], (start) => {
  try {
    const num = Number.parseInt(start.data, 10);
    if (isNaN(num)) {
      throw new Error("Invalid number");
    }
    return processEvent.with(num);
  } catch (err) {
    return errorEvent.with(err instanceof Error ? err : new Error(String(err)));
  }
});
 
workflow.handle([processEvent], (event) => {
  return resultEvent.with(`Result: ${event.data * 2}`);
});
 
workflow.handle([errorEvent], (event) => {
  return resultEvent.with(`Error: ${event.data.message}`);
});

You can also use the signal in getContext() to handle errors:

workflow.handle([processEvent], () => {
  const { signal } = getContext();
 
  signal.onabort = () => {
    console.error("Process aborted:", signal.reason);
    // Clean up resources
  };
 
  // Your processing logic here
});

Connecting with Server Endpoints

Workflow can be used as middleware in server frameworks like Express, Hono, or Fastify:

import { Hono } from "hono";
import { serve } from "@hono/node-server";
import { createHonoHandler } from "@llama-flow/core/interrupter/hono";
import { createWorkflow, workflowEvent } from "@llama-flow/core";
 
// Define events
const queryEvent = workflowEvent<string>();
const responseEvent = workflowEvent<string>();
 
// Create workflow
const workflow = createWorkflow();
 
workflow.handle([queryEvent], (event) => {
  const response = `Processed: ${event.data}`;
  return responseEvent.with(response);
});
 
// Create Hono app
const app = new Hono();
 
// Set up workflow endpoint
app.post(
  "/workflow",
  createHonoHandler(
    workflow,
    async (ctx) => queryEvent.with(await ctx.req.text()),
    responseEvent,
  ),
);
 
// Start server
serve(app, ({ port }) => {
  console.log(`Server started at http://localhost:${port}`);
});

Next Steps

Now that you've learned about basic workflow patterns, explore more advanced topics:

On this page