@spfn/workflow
Lightweight workflow engine - Pipeline orchestration based on @spfn/core Jobs
Overview
@spfn/workflow is a workflow engine that defines and executes complex business processes by chaining multiple Jobs together.
┌─────────┐ data ┌─────────┐ data ┌─────────┐
│ Job A │ ────────▶ │ Job B │ ────────▶ │ Job C │
└─────────┘ └─────────┘ └─────────┘
│ │ │
└─────────────────────┴─────────────────────┘
Workflow
Installation
pnpm add @spfn/workflowPrerequisites
@spfn/coreserver with database enabled (PostgreSQL)- Workflow tables (
spfn_workflow.executions,spfn_workflow.step_executions) are created automatically via Drizzle migration
Core Concepts
| Concept | Description |
|---|---|
| Job | Independent unit of work (defined in @spfn/core) |
| Workflow | Pipeline that chains Jobs together |
| Step | Individual execution unit within a Workflow |
| Engine | Workflow execution and state management |
Quick Start
1. Define Jobs (@spfn/core)
import { job } from '@spfn/core/job';
import { Type } from '@sinclair/typebox';
const createRepo = job('create-repo')
.input(Type.Object({
tenantId: Type.String(),
}))
.output(Type.Object({
repoId: Type.String(),
repoUrl: Type.String(),
}))
.compensate(async (input, output) => {
// Compensation logic executed during rollback
await gitea.deleteRepo(output.repoId);
})
.handler(async (input) => {
const repo = await gitea.create(input.tenantId);
return { repoId: repo.id, repoUrl: repo.url };
});2. Define Workflow
import { workflow } from '@spfn/workflow';
export const provisionTenant = workflow('provision-tenant')
.input(Type.Object({
tenantId: Type.String(),
plan: Type.String(),
}))
.resumable(true) // Resume from failure point
.rollback(true) // Enable rollback on failure
// Sequential execution
.pipe(createPodIdentity, (ctx) => ({
tenantId: ctx.input.tenantId,
plan: ctx.input.plan,
}))
// Parallel execution
.parallel({
appRepo: [createAppRepo, (ctx) => ({
tenantId: ctx.input.tenantId,
})],
gitopsRepo: [createGitopsRepo, (ctx) => ({
tenantId: ctx.input.tenantId,
})],
})
// Reference previous results (type-inferred)
.pipe(notifyComplete, (ctx) => ({
tenantId: ctx.input.tenantId,
appRepoUrl: ctx.results.appRepo.repoUrl,
gitopsRepoUrl: ctx.results.gitopsRepo.repoUrl,
}))
.build();3. Define Workflow Router
// workflow.router.ts
import { defineWorkflowRouter } from '@spfn/workflow';
export const workflowRouter = defineWorkflowRouter([
provisionTenant,
deprovisionTenant,
]);4. Register in Server Config
// server.config.ts
import { defineServerConfig } from '@spfn/core/server';
import { workflowRouter } from './workflow.router';
export default defineServerConfig()
.routes(appRouter)
.workflows(workflowRouter)
.build();Server starts → DB initialized → Workflow engine auto-initialized. After this, workflowRouter.engine is ready to use.
5. Execute Workflow
Use workflowRouter.engine in route handlers or services after the server has started.
// In a route handler or service
import { workflowRouter } from './workflow.router';
// Execute (async)
const execution = await workflowRouter.engine.start('provision-tenant', {
tenantId: 'abc',
plan: 'pro',
});
// Check status
const status = await workflowRouter.engine.get(execution.id);
// Get step output
const output = await workflowRouter.engine.getStepOutput(execution.id, 'appRepo');API Reference
Workflow Builder
workflow(name)
Creates a new workflow builder.
const wf = workflow('my-workflow');.input(schema)
Defines the input schema (TypeBox).
.input(Type.Object({
tenantId: Type.String(),
plan: Type.Union([Type.Literal('free'), Type.Literal('pro')]),
})).pipe(job, mapper)
Adds a sequential execution step. The mapper receives a WorkflowContext with:
ctx.input— Original workflow inputctx.results— Results from previous steps (type-inferred)ctx.execution— Execution metadata (id,workflowName,startedAt)
.pipe(createRepo, (ctx) => ({
tenantId: ctx.input.tenantId,
})).parallel(steps)
Adds parallel execution steps.
.parallel({
appRepo: [createAppRepo, (ctx) => ({ tenantId: ctx.input.tenantId })],
gitopsRepo: [createGitopsRepo, (ctx) => ({ tenantId: ctx.input.tenantId })],
}).resumable(enabled)
Sets whether the workflow can resume from failure point.
true: Skip completed steps and restart from failure pointfalse(default): Restart from beginning
.rollback(enabled)
Sets whether rollback is enabled on failure.
true(default): Executecompensatefunctions of completed steps in reverse orderfalse: No rollback
.notify(config)
Adds notification configuration.
.notify({
on: ['failed', 'completed'],
when: (event) => event.input.plan === 'pro', // Conditional (optional)
providers: [consoleProvider, slackProvider],
}).build()
Completes the workflow definition.
Workflow Router
defineWorkflowRouter(workflows)
Defines a workflow router for server registration. The engine is lazily initialized when the server starts.
import { defineWorkflowRouter } from '@spfn/workflow';
const router = defineWorkflowRouter([provisionTenant, deprovisionTenant]);router.engine
Access the workflow engine instance. Throws if the server has not been started yet.
const execution = await router.engine.start('provision-tenant', {
tenantId: 'abc',
plan: 'pro',
});router.isInitialized
Check if the workflow engine has been initialized.
isWorkflowRouter(value)
Type guard to check if a value is a WorkflowRouter.
import { isWorkflowRouter } from '@spfn/workflow';
if (isWorkflowRouter(value))
{
value.engine.start(...);
}Workflow Engine
createWorkflowEngine(options)
Creates a workflow engine directly (low-level API). Prefer defineWorkflowRouter for typical usage.
const engine = createWorkflowEngine({
workflows: [provisionTenant],
db: database,
storage: s3Storage, // Optional
largeOutputThreshold: 1024 * 1024, // 1MB (optional)
logger: customLogger, // Optional (defaults to console)
validateInput: true, // Optional (default: true)
});| Option | Type | Default | Description |
|---|---|---|---|
workflows |
WorkflowDef[] |
(required) | Workflow definitions |
db |
unknown |
(required) | Drizzle database instance |
storage |
OutputStorage |
- | External storage for large outputs |
largeOutputThreshold |
number |
1048576 (1MB) |
Byte threshold for external storage |
logger |
WorkflowLogger |
defaultLogger |
Custom logger |
validateInput |
boolean |
true |
Validate input against schema |
engine.start(name, input)
Executes a workflow asynchronously.
const execution = await engine.start('provision-tenant', {
tenantId: 'abc',
plan: 'pro',
});
// { id: 'exec-123', workflowName: 'provision-tenant', status: 'pending' }engine.get(executionId)
Gets execution status.
const status = await engine.get('exec-123');
// { id, workflowName, status, input, steps: [...], createdAt, ... }engine.getStepOutput(executionId, stepName)
Gets the output of a specific step.
const output = await engine.getStepOutput('exec-123', 'appRepo');
// { repoId: 'repo-abc', repoUrl: 'https://...' }engine.list(options?)
Lists executions.
const list = await engine.list({
workflowName: 'provision-tenant',
status: 'failed',
limit: 10,
offset: 0,
});engine.retry(executionId)
Retries a failed workflow.
resumable: true: Resume from failure pointresumable: false: Restart from beginning
await engine.retry('exec-123');engine.cancel(executionId, options?)
Cancels a running workflow.
await engine.cancel('exec-123');
await engine.cancel('exec-123', { rollback: true }); // With rollbackengine.subscribe(executionId, callback)
Subscribes to execution events.
const unsubscribe = engine.subscribe('exec-123', (event) => {
console.log(event.type, event.stepName);
});
// Unsubscribe
unsubscribe();Notification Providers
Only consoleProvider is provided by default. For email, SMS, Slack, etc., implement your own provider using @spfn/notification.
consoleProvider
Logs events to console (built-in).
import { consoleProvider } from '@spfn/workflow';
.notify({
on: ['failed'],
providers: [consoleProvider],
})formatEventAsText(event)
Helper function to format workflow events as plain text. Useful when implementing custom providers.
import { formatEventAsText } from '@spfn/workflow';
const text = formatEventAsText(event);
// Output:
// Workflow: provision-tenant
// Event: failed
// Execution ID: exec-123
// Timestamp: 2024-01-01T00:00:00.000Z
// Error: Connection timeoutCustom Providers with @spfn/notification
Implement custom notification providers using @spfn/notification for email, SMS, Slack, etc.
Email Provider
import { formatEventAsText } from '@spfn/workflow';
import { sendEmail } from '@spfn/notification/server';
import type { NotificationProvider } from '@spfn/workflow';
const emailProvider: NotificationProvider = {
name: 'email',
async notify(event)
{
await sendEmail({
to: 'admin@example.com',
subject: `[Workflow] ${event.workflowName}: ${event.type}`,
text: formatEventAsText(event),
});
},
};
// Use in workflow
workflow('provision-tenant')
.pipe(...)
.notify({
on: ['failed', 'completed'],
providers: [emailProvider],
})
.build();SMS Provider
import { sendSMS } from '@spfn/notification/server';
import type { NotificationProvider } from '@spfn/workflow';
const smsProvider: NotificationProvider = {
name: 'sms',
async notify(event)
{
const message = `[${event.workflowName}] ${event.type}` +
(event.error ? `: ${event.error}` : '');
await sendSMS({
to: '+821012345678',
message,
});
},
};Slack Provider
import type { NotificationProvider } from '@spfn/workflow';
const slackProvider: NotificationProvider = {
name: 'slack',
async notify(event)
{
await fetch(process.env.SLACK_WEBHOOK_URL!, {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({
text: `[${event.workflowName}] ${event.type}`,
attachments: [{
color: event.type === 'failed' ? 'danger' : 'good',
fields: [
{ title: 'Execution ID', value: event.executionId, short: true },
{ title: 'Event', value: event.type, short: true },
],
}],
}),
});
},
};Combined Notifications
.notify() accepts a single configuration. To handle multiple event types with different providers, include all providers in one call and use the when condition for filtering:
import { consoleProvider } from '@spfn/workflow';
workflow('provision-tenant')
.pipe(...)
.notify({
on: ['started', 'completed', 'failed'],
providers: [consoleProvider, emailProvider, slackProvider],
})
.build();Events
Events emitted during workflow execution:
| Event | Description |
|---|---|
started |
Workflow execution started |
step.started |
Step execution started |
step.completed |
Step execution completed |
step.failed |
Step execution failed |
completed |
Entire workflow completed |
failed |
Workflow failed |
cancelled |
Workflow cancelled |
interface WorkflowEvent {
type: 'started' | 'step.started' | 'step.completed' | ...;
workflowName: string;
executionId: string;
stepName?: string;
stepIndex?: number;
input?: unknown;
output?: unknown;
error?: string;
timestamp: Date;
}Status Flow
Workflow Status
pending → running → completed
→ failed → compensating → compensated
→ cancelled
| Status | Description |
|---|---|
pending |
Waiting for execution |
running |
Currently executing |
completed |
Successfully completed |
failed |
Failed |
compensating |
Rollback in progress |
compensated |
Rollback completed |
cancelled |
Cancelled |
Step Status
| Status | Description |
|---|---|
pending |
Waiting for execution |
running |
Currently executing |
completed |
Completed |
failed |
Failed |
skipped |
Skipped |
compensated |
Compensation completed |
Rollback Strategy
Execution: Step1 ✓ → Step2 ✓ → Step3 ✗
Rollback: Step2.compensate() → Step1.compensate()
(Step3 doesn't need compensation as it failed)
(Continue even if compensation fails)
- Rollback only executes for Jobs with
compensatefunction defined - Continues with other compensations even if one fails
- Executes in reverse order (from last completed step)
Database Entities
All tables are created in the spfn_workflow schema.
spfn_workflow.executions
{
id: string;
workflowName: string;
status: WorkflowStatus;
input: unknown; // jsonb
currentStep: number;
error?: string;
createdAt: Date;
updatedAt: Date;
completedAt?: Date;
}spfn_workflow.step_executions
{
id: string;
executionId: string; // FK (cascade delete)
stepName: string;
stepIndex: number;
status: WorkflowStepStatus;
output?: unknown; // jsonb or { $ref: url }
error?: string;
startedAt?: Date;
completedAt?: Date;
createdAt: Date;
updatedAt: Date;
}Large Output Handling
When step output exceeds threshold (default 1MB), it's stored in external storage.
// Storage interface
interface OutputStorage {
upload(data: unknown): Promise<string>; // Returns URL
download(url: string): Promise<unknown>;
}
// Engine configuration
const engine = createWorkflowEngine({
workflows: [...],
db: database,
storage: {
async upload(data) {
const key = `outputs/${crypto.randomUUID()}.json`;
await s3.putObject({ Key: key, Body: JSON.stringify(data) });
return `s3://${bucket}/${key}`;
},
async download(url) {
const key = url.replace(`s3://${bucket}/`, '');
const { Body } = await s3.getObject({ Key: key });
return JSON.parse(await Body.transformToString());
},
},
largeOutputThreshold: 1024 * 1024, // 1MB
});Project Structure
src/
├── index.ts # Public exports
├── builder/ # Workflow definition API
│ ├── workflow-builder.ts # Builder implementation
│ └── types.ts # Type definitions
├── engine/ # Execution engine
│ ├── workflow-engine.ts # Engine implementation
│ └── types.ts # Type definitions
├── entities/ # DB entities
│ ├── schema.ts # PostgreSQL schema definition
│ ├── workflow-execution.entity.ts
│ └── workflow-step-execution.entity.ts
├── notification/ # Notification system
│ ├── providers.ts # Built-in providers
│ └── types.ts # Type definitions
├── config/ # Configuration utilities
│ ├── workflow-router.ts # defineWorkflowRouter, isWorkflowRouter
│ └── types.ts # WorkflowRouter, WorkflowRouterConfig
└── types/ # Common types
└── status.ts # Status types
Design Principles
| Principle | Description |
|---|---|
| Step Independence | Steps (Jobs) are unaware of other steps |
| Step Reusability | A single step can participate in multiple workflows |
| Composition | New workflows = combination of existing steps |
| Explicit Mapping | Data transfer between steps is explicitly defined |
| Type Safety | Only registered workflows can be executed, input/output types are inferred |
License
MIT