@trebired/tasks
Durable background task host for Bun and Node.js applications, with built-in progress state, step replay, live subscription bootstrap, stale detection, pluggable execution backends, and built-in durable Postgres and SQLite stores.
@trebired/tasks is the generic Trebired package for hosts that need real background work outside the request path without rebuilding the whole task observability layer around it later.
It owns:
- durable task records
- claiming, leasing, heartbeats, retries, and stale recovery
- normalized progress state
- ordered task steps
- snapshot and replay reads
- aggregate task state
- normalized lifecycle events
- live subscribe-plus-bootstrap flows
- optional transport and client-side helpers
It stays intentionally generic.
It does not know about products, deployments, repositories, publications, agents, server panels, or app-specific UI wording.
In plain terms:
- it is a durable task infrastructure and observability layer you embed into your host
- it is not a hosted queue service
- it is not tied to Redis
- it is not built around worker threads or Piscina
- it does not force Socket.IO, React, or a specific web framework into the core
Install
Runtime support: Bun 1+ and Node.js 18+.
npm install @trebired/tasksFor the Postgres driver:
npm install pgQuick Start
Define a task handler in a normal module:
import { defineTaskHandler } from "@trebired/tasks";
export default defineTaskHandler<{ reportId: string }, { outputPath: string }>({
async run(input, context) {
await context.setProgress({
percent: 10,
label: "loading inputs",
});
await context.appendStep({
level: "info",
message: `Preparing report ${input.reportId}`,
percent: 10,
});
await new Promise((resolve) => setTimeout(resolve, 500));
await context.setProgress({
percent: 100,
label: "done",
meta: {
reportId: input.reportId,
},
});
return {
outputPath: `/tmp/reports/${input.reportId}.json`,
};
},
});Create a lightweight local host with the generic SQLite store factory:
import {
createInProcessTaskExecutor,
createTaskStore,
preparePostgresTaskStoreSchema,
prepareTaskStoreSchema,
createTaskHost,
} from "@trebired/tasks";
import { join } from "node:path";
const path = join(process.cwd(), ".data", "tasks.sqlite");
await prepareTaskStoreSchema({
driver: "sqlite",
sqlite: {
path,
},
});
const tasks = createTaskHost({
store: createTaskStore({
driver: "sqlite",
sqlite: {
path,
},
}),
executor: createInProcessTaskExecutor(),
handlers: [
{
kind: "report.generate",
entrypoint: {
module: new URL("./handlers/report_task.ts", import.meta.url),
},
concurrency: {
limit: 2,
},
retry: {
maxAttempts: 3,
},
},
],
});
await tasks.start();
const queued = await tasks.enqueue("report.generate", {
reportId: "rpt_42",
}, {
dedupeKey: "report:rpt_42",
concurrencyKey: "report:rpt_42",
});
console.log(queued.task.id, queued.disposition);Read the current snapshot and recent steps:
const snapshot = await tasks.readSnapshot(queued.task.id, {
includeSteps: 20,
});
console.log(snapshot?.state, snapshot?.progress.percent, snapshot?.steps?.length);Lifecycle And Ownership Model
The host still owns:
- what each task kind actually does
- which permissions gate enqueue or subscribe access
- how task state is rendered in UI
- whether lifecycle data is mirrored into logs, metrics, or app-specific diagnostics
The package owns:
- task state transitions
- durable progress state
- step persistence and replay
- dedupe and supersedence mechanics
- live lifecycle normalization
- snapshot bootstrap for current state
- stale/watchdog state
- retention helpers
That split is deliberate. Apps should not need to rebuild generic task observability every time they add progress panels or live dashboards.
Task Model
The persisted task record still has the core durable states:
queuedclaimedrunningsucceededfailedcancelled
On top of that, the package now exposes a first-class lifecycle/progress state model:
queuedclaimedrunningretryingsucceededfailedcancelledstale
retrying and stale are package-owned lifecycle states that sit above the lower-level persisted status.
That means a host can build UI against snapshot.state instead of reverse-engineering retry and stale semantics from multiple raw fields.
Progress Model
Every snapshot includes a normalized progress contract:
type TaskProgressState = {
state: TaskLifecycleState;
percent: number | null;
label: string | null;
meta: Record<string, unknown> | null;
startedAt: string | null;
updatedAt: string | null;
finishedAt: string | null;
retryScheduledAt: string | null;
staleAt: string | null;
staleReason: string | null;
lastHeartbeatAt: string | null;
};Handlers update the task-facing parts of that model through:
await context.setProgress({
percent: 35,
label: "transcoding",
meta: {
frame: 1820,
},
});The engine owns the lifecycle-facing parts such as:
statestartedAtfinishedAtretryScheduledAtstaleAtlastHeartbeatAt
Steps Model
Task steps are first-class persisted records, not just an app convention:
type TaskStepRecord = {
id: string;
taskId: string;
attempt: number;
kind: TaskStepKind;
level: TaskStepLevel;
message: string;
meta: Record<string, unknown> | null;
percent: number | null;
createdAt: string;
};Handlers append them through:
await context.appendStep({
level: "info",
kind: "checkpoint",
message: "Source downloaded",
percent: 20,
});These steps are normalized the same way whether they came from a child-process executor, a future custom executor, or host-owned testing infrastructure.
Snapshots, Replay, And Aggregate Reads
The host now exposes package-owned read APIs for UI and dashboard usage:
tasks.readSnapshot(taskId, { includeSteps? })tasks.listSnapshots(query?)tasks.readAggregate(query?)tasks.bootstrap(query?)
That means an app can ask for:
- the current task snapshot for a modal or detail panel
- recent steps for replay
- current aggregate counts for a dashboard
- a bootstrap payload for subscribe-plus-live flows
Example:
const bootstrap = await tasks.bootstrap({
channels: [
taskChannel.scope("workspace:42"),
],
recentSteps: 25,
});
console.log(bootstrap.snapshots.length, bootstrap.aggregate?.byState.running);Channel And Scope Model
@trebired/tasks now has a package-owned channel model for grouping tasks and subscriptions.
Built-in helpers:
import { taskChannel } from "@trebired/tasks";
taskChannel.task("task_42");
taskChannel.kind("report.generate");
taskChannel.topic("imports");
taskChannel.resource("repo:42");
taskChannel.correlation("request:abc");
taskChannel.dedupe("report:rpt_42");
taskChannel.concurrency("report:rpt_42");
taskChannel.supersede("scan:repo_7");
taskChannel.scope("workspace:42");Tasks can also carry extra host-defined channels directly:
await tasks.enqueue("report.generate", input, {
channels: [
taskChannel.scope("workspace:42"),
taskChannel.scope("sidebar"),
],
});This keeps subscription routing generic and typed without forcing every app to invent its own free-form key naming scheme.
Subscription queries can also filter directly by package-owned keys:
const bootstrap = await tasks.bootstrap({
dedupeKey: "report:rpt_42",
concurrencyKey: "report:rpt_42",
supersedeKey: "scan:repo_7",
});Live Subscription Model
The package now ships a generic live hub for real-time UI flows:
import { createTaskLiveHub } from "@trebired/tasks";
const hub = createTaskLiveHub(tasks);Main methods:
hub.bootstrap(query?)hub.subscribe(query, listener)
The live hub delegates bootstrap reads back to the host, so polling, direct reads, and live subscribe-plus-bootstrap all share one canonical snapshot path.
The flow is intentionally explicit:
- client or transport subscribes with a query
- the listener immediately receives a bootstrap payload
- the listener then receives normalized live updates
The messages are transport-neutral:
type TaskLiveMessage =
| {
type: "bootstrap";
query: TaskSubscriptionQuery;
snapshots: TaskSnapshot[];
steps: Record<string, TaskStepRecord[]>;
aggregate: TaskAggregateSnapshot | null;
timestamp: string;
}
| {
type: "event";
event: TaskLifecycleEvent;
timestamp: string;
};Normalized Lifecycle Events
Apps no longer need to translate low-level host events into UI-friendly lifecycle updates.
The host exposes:
tasks.onEvent(listener)for lower-level host eventstasks.onLifecycleEvent(listener)for normalized lifecycle events
Lifecycle event names:
enqueuedclaimedstartedprogressstepretriedsucceededfailedcancelledstalelease_lost
Each normalized event carries the current snapshot when one exists, plus the step record for step events.
Generic Event Entries And Adapters
When a consumer wants timeline rows, log entries, websocket payloads, or diagnostics records, the package now exposes presentation-friendly event entry helpers too:
import {
createTaskLifecycleEventAdapter,
normalizeTaskHostEventEntry,
} from "@trebired/tasks";The normalized entry shape stays generic:
type TaskEventEntry = {
type: TaskEventEntryType;
level: TaskStepLevel;
message: string;
percent: number | null;
timestamp: string;
metadata: Record<string, unknown> | null;
runnerId: string | null;
taskId: string | null;
kind: string | null;
state: TaskLifecycleState | null;
channels: string[];
stepId: string | null;
};Use the direct normalizers when you already have an event object:
const entry = normalizeTaskHostEventEntry(event);
console.log(entry.type, entry.message, entry.percent);Use the adapters when you want to forward normalized entries straight into another sink:
tasks.onLifecycleEvent(createTaskLifecycleEventAdapter((entry) => {
publishToTimeline(entry);
}));This keeps event parsing package-owned while still letting each host choose its own logging, websocket, notification, or storage layer.
Bootstrap Plus Replay Flow
The preferred real-time flow is:
- build a query by task id, kind, or channels
- call
hub.subscribe(query, listener) - use the bootstrap snapshot immediately
- apply later live events on top
Example:
const unsubscribe = await hub.subscribe({
channels: [
taskChannel.scope("workspace:42"),
],
recentSteps: 50,
}, (message) => {
if (message.type === "bootstrap") {
console.log(message.snapshots.length);
return;
}
console.log(message.event.event, message.event.snapshot?.progress.percent);
});That bootstrap-first model is package-owned specifically so apps do not need to invent one-off “give me current state, then also subscribe” protocols every time.
Tiny Client Helper
For framework-agnostic client-side state, use:
import { createTaskLiveTracker } from "@trebired/tasks";
const tracker = createTaskLiveTracker();
await hub.subscribe(query, (message) => {
tracker.apply(message);
});
const state = tracker.getState();
console.log(state.snapshots[0]?.progress.percent);The tracker owns:
- bootstrap application
- later event application
- snapshot replacement
- step accumulation
- aggregate refresh from current snapshots
It stays intentionally small. It does not assume React, Vue, Svelte, or a browser runtime.
Optional Socket.IO Bridge
The core live model does not hardcode Socket.IO.
If a host already uses Socket.IO-style transport, attachTaskLiveSocketBridge() can wire the live hub into a socket server without pulling Socket.IO into the rest of the package model:
import {
attachTaskLiveSocketBridge,
createTaskLiveHub,
} from "@trebired/tasks";
const hub = createTaskLiveHub(tasks);
attachTaskLiveSocketBridge(io, {
hub,
subscribeEvent: "tasks:subscribe",
publishEvent: "tasks:live",
});The bridge expects a Socket.IO-like shape through duck typing. The package does not require a direct runtime dependency on socket.io.
Dedupe And Supersedence
Deduplication is now surfaced explicitly through the enqueue result:
const result = await tasks.enqueue("import.users", payload, {
dedupeKey: "import:users:2026-06-21",
});
console.log(result.disposition);Possible enqueue dispositions:
created: a new task record was createdreused: an existing open task was reusedsuperseded: a new task was created and older matching active tasks were replaced
Use supersedence when the newest task should replace older open work:
const result = await tasks.enqueue("scan.repository", payload, {
supersedeKey: "repo:42",
supersedeExisting: true,
});
console.log(result.supersededTaskIds);This is useful for “already in progress” and “newer request replaced older request” UX without each app inventing its own semantics.
Stale And Watchdog Behavior
The package now owns stale/watchdog mechanics too.
Runner options:
const tasks = createTaskHost({
store,
runner: {
watchdogMs: 60_000,
watchdogScanIntervalMs: 5_000,
},
});What that means:
- if a claimed or running task stops reporting heartbeat or progress for too long, it can become
stale - stale state is reflected in snapshots through
progress.staleAtandprogress.staleReason - normalized lifecycle can emit
stale - expired leases can still be requeued separately through the durable recovery path
In practice this lets UIs show:
- “still running”
- “retry scheduled”
- “stale, runner may be gone”
without app-owned timeout heuristics.
Persistence Policy Helpers
The store now exposes a package-owned retention interface:
await tasks.compact({
successTtlMs: 7 * 24 * 60 * 60 * 1000,
failedTtlMs: 30 * 24 * 60 * 60 * 1000,
stepLimitPerTask: 200,
keepLatestSuccessesPerKind: 20,
keepLatestFailuresPerKind: 20,
});Runner-managed automatic compaction is also supported through:
const tasks = createTaskHost({
store,
runner: {
retentionPolicy: {
successTtlMs: 7 * 24 * 60 * 60 * 1000,
stepLimitPerTask: 200,
},
retentionScanIntervalMs: 60_000,
},
});This covers generic retention concerns such as:
- step history limits
- TTL-based cleanup
- keeping only recent successes or failures per kind
Storage Adapter Model
The core engine depends on a TaskStore contract, but the public happy path is now backend-agnostic.
Use the generic store factory:
import {
createTaskStore,
prepareTaskStoreSchema,
} from "@trebired/tasks";SQLite for local durable hosts:
const path = "/var/lib/tasks/tasks.sqlite";
await prepareTaskStoreSchema({
driver: "sqlite",
sqlite: {
path,
},
});
const store = createTaskStore({
driver: "sqlite",
sqlite: {
path,
},
});Postgres for multi-process or externally managed database hosts:
await prepareTaskStoreSchema({
driver: "postgres",
postgres: {
client: pool,
schema: "public",
tablePrefix: "tb_",
},
});
const store = createTaskStore({
driver: "postgres",
postgres: {
client: pool,
},
});Use the package-owned schema preparation helper during startup:
import { preparePostgresTaskStoreSchema } from "@trebired/tasks";
await preparePostgresTaskStoreSchema({
client: pool,
schema: "public",
tablePrefix: "tb_",
});That path is:
- idempotent
- safe to call repeatedly on boot
- responsible for fresh schema creation and additive package-owned upgrades
If the host wants the raw SQL for inspection or external migrations, the package still exposes:
import { createPostgresTaskStoreSchema } from "@trebired/tasks";
const sql = createPostgresTaskStoreSchema({
schema: "public",
tablePrefix: "tb_",
});The same generic shape works for schema text generation too:
const sqliteSql = createTaskStoreSchema({
driver: "sqlite",
});Then create the adapter:
import { createPostgresTaskStore } from "@trebired/tasks";
const store = createPostgresTaskStore({
client: pool,
});The explicit backend adapters remain available too:
createPostgresTaskStore(...)preparePostgresTaskStoreSchema(...)createPostgresTaskStoreSchema(...)createSqliteTaskStore(...)prepareSqliteTaskStoreSchema(...)createSqliteTaskStoreSchema(...)
The Postgres adapter targets pg-style pools with:
query(sql, params?)connect()- pooled client
release()
That keeps claim and lease behavior transaction-owned instead of pretending a stateless query function is enough for a durable task engine.
Executor Model
The core engine still does not assume worker threads.
Execution is abstracted behind TaskExecutor.
The package now ships two first-class generic executors:
createChildProcessTaskExecutor()createInProcessTaskExecutor()
Use the in-process executor when the host wants task execution in the current runtime:
import {
createInProcessTaskExecutor,
createTaskHost,
} from "@trebired/tasks";
const tasks = createTaskHost({
store,
handlers,
executor: createInProcessTaskExecutor(),
});The in-process executor owns:
- handler module loading and export resolution
- cooperative cancellation through
AbortSignal - progress and step forwarding
- normalized failure shaping
The default still remains createChildProcessTaskExecutor().
Why child process first:
- it works cleanly across Bun and Node hosts
- it keeps the package boundary runtime-agnostic
- it isolates heavy work and crashes better than in-process execution
- it avoids making Node-specific worker-thread behavior the default mental model
Why Piscina is still only a future optional adapter:
- Piscina is useful for Node-specific worker-thread workloads
- Bun and Node do not share the same worker-thread assumptions
- making Piscina the core would leak a runtime-specific execution choice into the package’s main API
In other words, child process is the conservative generic default. Piscina can fit later as an adapter without redefining the package.
Why in-process is still not the default:
- cancellation is cooperative, so the handler must respect
AbortSignal - heavy synchronous work can still block the host event loop
- process isolation is still the safer default for generic package-owned execution
Core API
Main host entrypoint:
import { createTaskHost } from "@trebired/tasks";
const tasks = createTaskHost({
store,
handlers,
});Main methods:
tasks.start()tasks.stop()tasks.registerHandler(handler)tasks.enqueue(kind, input, options?)tasks.getTask(taskId)tasks.listTasks(query?)tasks.listTaskSteps(taskId, query?)tasks.readSnapshot(taskId, options?)tasks.listSnapshots(query?)tasks.readAggregate(query?)tasks.bootstrap(query?)tasks.cancel(taskId, reason?)tasks.compact(policy?)tasks.onEvent(listener)tasks.onLifecycleEvent(listener)tasks.getState()
Progress Bar Example
For a progress bar, the snapshot model is enough:
const snapshot = await tasks.readSnapshot(taskId);
const percent = snapshot?.progress.percent ?? 0;
const label = snapshot?.progress.label || snapshot?.state || "queued";With live updates:
await hub.subscribe({
taskIds: [taskId],
}, (message) => {
tracker.apply(message);
const current = tracker.getState().snapshots[0];
renderProgressBar(current?.progress.percent || 0, current?.progress.label || current?.state || "queued");
});Live Modal Or Task Panel Example
For a current-task panel, the usual flow is:
await hub.subscribe({
channels: [
taskChannel.scope("workspace:42"),
],
recentSteps: 100,
}, (message) => {
tracker.apply(message);
const state = tracker.getState();
renderTaskPanel({
tasks: state.snapshots,
steps: state.steps,
aggregate: state.aggregate,
});
});That gives the panel:
- current task states
- ordered recent steps per task
- aggregate counts for summary badges
without app-owned event reconstruction.
Current API
The first public slice is still deliberate rather than huge:
import {
attachTaskLiveSocketBridge,
createChildProcessTaskExecutor,
createInProcessTaskExecutor,
createPostgresTaskStore,
preparePostgresTaskStoreSchema,
createPostgresTaskStoreSchema,
createSqliteTaskStore,
prepareSqliteTaskStoreSchema,
createSqliteTaskStoreSchema,
createTaskHostEventAdapter,
createTaskHost,
createTaskLifecycleEventAdapter,
createTaskLiveHub,
createTaskLiveTracker,
createTaskStore,
createTaskStoreSchema,
defineTaskHandler,
normalizeTaskHostEventEntry,
normalizeTaskLifecycleEventEntry,
prepareTaskStoreSchema,
taskChannel,
} from "@trebired/tasks";Important exported types include:
TaskRecordTaskSnapshotTaskProgressStateTaskStepRecordTaskLifecycleEventTaskEventEntryTaskSubscriptionQueryTaskSubscriptionBootstrapTaskStoreTaskStoreFactoryOptionsTaskExecutorTaskRetentionPolicy
Examples
Durable Postgres + child-process execution:
Durable SQLite + in-process execution:
Live bootstrap + tracker flow:
Notes And Limitations
- The generic public store factory currently ships with Postgres and SQLite drivers.
- The default executor is child-process based. Worker-thread and Piscina adapters can be added later behind the same
TaskExecutorcontract. - The Socket.IO bridge is intentionally thin and optional. The core live contract remains transport-agnostic.
- Node child-process handlers should usually point at runnable JavaScript modules unless the host already provides a loader for TypeScript modules. Bun can run
.tstask entrypoints directly. - The SQLite driver uses Bun's builtin SQLite support when available, falls back to Node's
node:sqlitewhen available, and otherwise usesbetter-sqlite3. - The retention helpers are generic package-owned policies, not a replacement for app-specific archival decisions.