@prostojs/redisjm
Redis Job Manager for distributed job queues in Kubernetes-like multi-instance environments.
When running multiple instances of the same application, @prostojs/redisjm ensures that job runs are queued exactly once and picked up by only one instance. It uses Redis for queue management, distributed locking, heartbeat monitoring, and job lifecycle tracking.
Features
- Redis-backed job queue with atomic distributed locking (Redis Set + `SADD`)
- Ensures each job `runId` is scheduled only once at a time; running and stale jobs also block the queue
- Priority queue support (`queueFirst` for urgent jobs)
- Automatic heartbeat monitoring to detect stale/abandoned jobs
- Progress tracking (0-1) and custom attributes per job
- Event system (`start`, `finish`, `error`, `heartbeat`, `update`) built on hookable
- Built-in maintenance job for stale detection and log cleanup
- Polling-based job execution with `start` / awaitable `stop` (graceful drain)
- Failures visible by default: handler errors are logged (configurable / silenceable)
- Rolling-deploy resilient: a job whose handler isn't registered yet is re-queued for a sibling instance instead of dropped
- Target group isolation: multiple app groups can share the same Redis instance
- TypeScript with full generic type inference for job inputs and custom attributes
Installation
```bash
pnpm add @prostojs/redisjm
```
Quick Start
```ts
import Redis from 'ioredis'
import { RedisJM } from '@prostojs/redisjm'

const redis = new Redis()
const manager = new RedisJM(redis, 'my-app')

// Create a job
const emailJob = manager.createJob(
  { jobName: 'send-email' },
  async (inputs: { to: string; subject: string }, ctx) => {
    await ctx.setProgress(0.5)
    // ... send email logic
    await ctx.setProgress(1)
  }
)

// Queue a job run
const queued = await emailJob.queue('daily-digest-2024-01-15', {
  to: 'user@example.com',
  subject: 'Daily Digest',
})
console.log(queued) // true if queued, false if already locked

// Start processing the queue
manager.start(1000) // poll every 1 second
```
Redis Key Structure
Three Redis structures per target group:
| Key pattern | Redis type | Purpose |
|---|---|---|
| `redisjm:{tg}:queue` | List | Ordered queue of `jobId` strings (`RPUSH`/`LPUSH` + `LPOP`) |
| `redisjm:{tg}:locks` | Set | Locked `jobId`s (queued + running + stale). Atomic via `SADD` |
| `redisjm:{tg}:log` | Hash | `jobId` -> JSON record with full job state |
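To make the interplay of the three keys concrete, here is a minimal in-memory model of one target group. It is a sketch, not the library's code: an array, a `Set`, and a `Map` stand in for the Redis List, Set, and Hash, and the names `keysFor` and `InMemoryGroup` are hypothetical. It shows why a duplicate `queue()` is rejected while the lock is held and how `queueFirst` jumps the line.

```typescript
// Hypothetical in-memory model of the per-target-group structures.
// Array / Set / Map stand in for the Redis List / Set / Hash.

type Status = 'queued' | 'running' | 'finished' | 'error' | 'stale'

interface LogRecord {
  jobId: string
  status: Status
}

// Key names follow the documented `redisjm:{tg}:<suffix>` pattern.
function keysFor(targetGroup: string) {
  return {
    queue: `redisjm:${targetGroup}:queue`,
    locks: `redisjm:${targetGroup}:locks`,
    log: `redisjm:${targetGroup}:log`,
  }
}

class InMemoryGroup {
  readonly queue: string[] = [] // stands in for the List
  readonly locks = new Set<string>() // stands in for the Set
  readonly log = new Map<string, LogRecord>() // stands in for the Hash

  // Mirrors SADD semantics: false when the jobId is already locked.
  private tryLock(jobId: string): boolean {
    if (this.locks.has(jobId)) return false
    this.locks.add(jobId)
    return true
  }

  // queue(): take the lock, then append to the tail (RPUSH).
  enqueue(jobId: string): boolean {
    if (!this.tryLock(jobId)) return false
    this.queue.push(jobId)
    this.log.set(jobId, { jobId, status: 'queued' })
    return true
  }

  // queueFirst(): take the lock, then insert at the head (LPUSH).
  enqueueFirst(jobId: string): boolean {
    if (!this.tryLock(jobId)) return false
    this.queue.unshift(jobId)
    this.log.set(jobId, { jobId, status: 'queued' })
    return true
  }

  // LPOP from the head; the lock stays held while the job runs.
  pop(): string | undefined {
    const jobId = this.queue.shift()
    if (jobId !== undefined) this.log.set(jobId, { jobId, status: 'running' })
    return jobId
  }

  // On finish the lock is released, so the same jobId can be queued again.
  finish(jobId: string): void {
    this.locks.delete(jobId)
    this.log.set(jobId, { jobId, status: 'finished' })
  }
}
```

In this model, dedupe depends only on the locks set; the log entry is bookkeeping, which matches the "dedupe is on the lock" rule described under `queue()`.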
API Reference
RedisJM
The main job manager class. Uses Redis to manage job queues, locks, and the job log.
Constructor
```ts
new RedisJM(redis: Redis, targetGroup: string, options?: RedisJMOptions)
```
- `redis` -- An ioredis client instance
- `targetGroup` -- A string prefix for all Redis keys; only clients sharing the same target group share queues and locks
- `options` -- Optional configuration:
| Option | Default | Description |
|---|---|---|
| `heartbeatInterval` | `5000` | Milliseconds between heartbeat updates during job execution |
| `roundsToStale` | `2` | Number of missed heartbeat intervals before a job is considered stale |
| `keepFinishedInterval` | `0` | Milliseconds to keep finished/error/stale records in the log (`0` = remove immediately; see the caveat under Job Statuses) |
| `maintenanceInterval` | `heartbeatInterval * roundsToStale` | Milliseconds between automatic maintenance enqueues while `start()` is polling (`0` = disable auto-maintenance) |
| `unknownJobRequeueLimit` | `5` | Number of times a job whose name isn't registered on the popping instance is re-queued (lock held) for a sibling instance before being dropped as an error. `0` restores the legacy drop-on-first-pop behavior |
| `logger` | `console.error` | Sink `(message, error?) => void` for operational errors (handler throws, unknown/dropped jobs, poll-loop failures). Pass `false` to silence default logging |
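The defaults depend on each other: the auto-maintenance cadence and the stale threshold are both `heartbeatInterval * roundsToStale` unless overridden. The sketch below shows how the options table resolves. `resolveOptions` and the derived `staleAfterMs` field are hypothetical helpers for illustration, not the library's `getOptions()`.

```typescript
// Hypothetical resolution of the documented defaults.
type Logger = (message: string, error?: Error) => void

interface Options {
  heartbeatInterval?: number
  roundsToStale?: number
  keepFinishedInterval?: number
  maintenanceInterval?: number
  unknownJobRequeueLimit?: number
  logger?: Logger | false
}

interface Resolved {
  heartbeatInterval: number
  roundsToStale: number
  keepFinishedInterval: number
  maintenanceInterval: number
  unknownJobRequeueLimit: number
  logger: Logger | false
  staleAfterMs: number // derived: missed-heartbeat threshold
}

function resolveOptions(opts: Options = {}): Resolved {
  const heartbeatInterval = opts.heartbeatInterval ?? 5000
  const roundsToStale = opts.roundsToStale ?? 2
  const staleAfterMs = heartbeatInterval * roundsToStale
  return {
    heartbeatInterval,
    roundsToStale,
    keepFinishedInterval: opts.keepFinishedInterval ?? 0,
    // `??` rather than `||`, so an explicit 0 still disables auto-maintenance.
    maintenanceInterval: opts.maintenanceInterval ?? staleAfterMs,
    unknownJobRequeueLimit: opts.unknownJobRequeueLimit ?? 5,
    // `false` survives `??` (only null/undefined fall through to the default).
    logger:
      opts.logger ??
      ((message: string, error?: Error) =>
        error ? console.error(message, error) : console.error(message)),
    staleAfterMs,
  }
}
```

Using `??` throughout matters here: with `||`, passing `maintenanceInterval: 0` or `logger: false` would silently fall back to the defaults.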
Methods
createJob<TInputs, TAttrs>(metadata, fn): Job<TInputs, TAttrs>
Creates a new Job instance and registers it with this manager. Job names must be unique per manager.
```ts
const job = manager.createJob(
  { jobName: 'process-order' },
  async (inputs: { orderId: string }, ctx) => {
    await ctx.setProgress(0.5)
    await ctx.setAttrs({ step: 'processing' })
    // ...
  }
)
```
queue<TInputs>(job, runId, inputs): Promise<boolean>
Adds a job run to the end of the queue. Returns true if successfully queued, false if the jobId is already locked (queued, running, or stale).
```ts
const success = await manager.queue(job, 'order-123', { orderId: '123' })
```
Dedupe is on the lock, not the log record. `queue()` dedupes on `jobName#runId` in the locks set, which is released on finish/error (or when maintenance reclaims an orphan). Two consequences for integrators:
- Always check the boolean return. `false` means "already locked"; a caller that ignores it reports false success.
- A reused `runId` can be swallowed by a stale/orphaned lock after a hard kill until maintenance reclaims it (bounded by `maintenanceInterval`, or indefinitely if auto-maintenance is off). For idempotent manual triggers where you want a fresh run regardless, prefer a unique `runId` per trigger (e.g. append a timestamp). This does not apply to the maintenance job, which deliberately uses a stable empty `runId` to self-dedupe.
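Both recommendations can be captured in two small helpers. This is a sketch: `uniqueRunId`, `describeQueueResult`, and the tiny `tryLock` stand-in for the locks set are hypothetical names, and the orphaned lock is simulated rather than produced by a real hard kill.

```typescript
// Hypothetical helpers for manual triggers.

// Append a timestamp so each trigger gets its own jobId.
function uniqueRunId(base: string, now: number = Date.now()): string {
  return `${base}-${now}`
}

// Make the boolean from queue() explicit instead of silently ignoring `false`.
function describeQueueResult(queued: boolean, jobId: string): string {
  return queued
    ? `queued ${jobId}`
    : `not queued: ${jobId} is already locked (queued, running, or stale)`
}

// Stand-in for the locks set, holding an orphaned lock for the reused runId.
const locks = new Set<string>(['process-order#order-123'])
function tryLock(jobId: string): boolean {
  if (locks.has(jobId)) return false
  locks.add(jobId)
  return true
}

const reused = 'process-order#order-123'
const fresh = `process-order#${uniqueRunId('order-123', 1700000000000)}`

console.log(describeQueueResult(tryLock(reused), reused))
// -> not queued: process-order#order-123 is already locked (queued, running, or stale)
console.log(describeQueueResult(tryLock(fresh), fresh))
// -> queued process-order#order-123-1700000000000
```

The trade-off is deliberate: a unique `runId` gives up dedupe, so two near-simultaneous triggers will both run. Keep stable `runId`s wherever "at most one" is the goal.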
queueFirst<TInputs>(job, runId, inputs): Promise<boolean>
Same as queue but adds to the front of the queue (priority insert).
```ts
await manager.queueFirst(job, 'urgent-order', { orderId: '456' })
```
isQueued(jobId): Promise<boolean>
Checks if a jobId is currently locked (queued, running, or stale).
```ts
const locked = await manager.isQueued('process-order#order-123')
```
list(): Promise<JobLogRecord[]>
Returns all job log records (all statuses). A single corrupt/foreign hash field is skipped (and logged) rather than throwing.
```ts
const records = await manager.list()
```
get(jobId): Promise<JobLogRecord | undefined>
Fetches a single record by jobId ("jobName#runId"), or undefined if absent — an O(1) HGET instead of scanning list(). Handy for "did my trigger finish?" endpoints.
```ts
const record = await manager.get('process-order#order-123')
```
With the default `keepFinishedInterval: 0`, a successful run's record is deleted the instant it finishes, so `get()` returns `undefined`. Here `undefined` means "no record", not "never ran". Set `keepFinishedInterval > 0` to observe terminal states, or wire the `finish`/`error` hooks.
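For a "did my trigger finish?" endpoint, the ambiguity of `undefined` is worth encoding explicitly. A minimal sketch, assuming the caller passes the value of `await manager.get(jobId)` together with its configured `keepFinishedInterval`; `describeRun` and `RecordLike` are hypothetical and model only the record fields they read.

```typescript
// Hypothetical interpretation of a get() result.
type JobStatus = 'queued' | 'running' | 'finished' | 'error' | 'stale'

interface RecordLike {
  status: JobStatus
  progress: number
  error?: string
}

function describeRun(record: RecordLike | undefined, keepFinishedInterval: number): string {
  if (record === undefined) {
    // With keepFinishedInterval 0, terminal records are deleted immediately,
    // so a missing record cannot distinguish "done" from "never queued".
    return keepFinishedInterval > 0
      ? 'no record (never queued, unqueued, or expired from the log)'
      : 'no record (finished or failed and removed, or never queued)'
  }
  switch (record.status) {
    case 'queued':
      return 'waiting in queue'
    case 'running':
      return `running (${Math.round(record.progress * 100)}%)`
    case 'finished':
      return 'finished'
    case 'error':
      return `failed: ${record.error ?? 'unknown error'}`
    case 'stale':
      return 'stale (heartbeat expired)'
  }
}
```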
unqueue(jobId): Promise<void>
Removes a job from the queue, locks, and log entirely.
```ts
await manager.unqueue('process-order#order-123')
```
popAndExecute(): Promise<boolean>
Pops the next job from the queue, matches it to a registered Job instance by name, and executes it. Returns true if a job was popped (even if execution failed), false if the queue was empty.
If the job name is not registered on this instance, it is re-queued (lock held) up to unknownJobRequeueLimit times so a sibling instance that does register the handler — e.g. a freshly deployed pod — can claim it. Once the budget is exhausted the record is set to status "error" with error "Job name is unknown" and the lock is released. Handler failures, unknown-name drops, and missing-record drops are reported through the configured logger.
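The re-queue budget above can be modelled as a small pure decision. In this sketch `requeueCount` mirrors the internal `JobLogRecord` field, while `onUnknownJob` and its return shape are hypothetical. It replays an old instance repeatedly popping a job it doesn't know.

```typescript
// Hypothetical model of the unknown-job-name policy.
type UnknownJobAction =
  | { kind: 'requeue'; requeueCount: number } // lock stays held for a sibling
  | { kind: 'drop'; error: string } // record -> "error", lock released

function onUnknownJob(requeueCount: number | undefined, limit: number): UnknownJobAction {
  const count = requeueCount ?? 0
  if (count < limit) {
    return { kind: 'requeue', requeueCount: count + 1 }
  }
  return { kind: 'drop', error: 'Job name is unknown' }
}

// An instance without the handler keeps popping the same job (limit 5).
let count: number | undefined = undefined
let pops = 0
for (;;) {
  pops++
  const action = onUnknownJob(count, 5)
  if (action.kind === 'drop') break
  count = action.requeueCount
}
console.log(pops, count) // -> 6 5
```

With the default limit the job is re-queued five times and dropped on the sixth pop; with a limit of `0` the first pop drops it, matching the legacy behavior.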
start(interval): void
Starts a polling loop that calls popAndExecute(). When a job is executed, the next poll fires immediately. When the queue is empty, waits interval ms before the next poll. Throws TypeError if interval is not a positive number.
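The polling cadence and the graceful-drain contract of `stop()` can be sketched with plain timers. This is an illustration, not the library's implementation: `startPolling`, `nextDelay`, and `validateInterval` are hypothetical, `popAndExecute` is any async function with the documented boolean contract, and treating a rejected poll as an empty one is an assumption.

```typescript
// Hypothetical poll loop with the documented cadence.
type PopAndExecute = () => Promise<boolean>

function validateInterval(interval: number): void {
  if (typeof interval !== 'number' || !(interval > 0)) {
    throw new TypeError(`interval must be a positive number, got ${String(interval)}`)
  }
}

// Poll again right away after a job ran; otherwise wait `interval` ms.
function nextDelay(jobWasPopped: boolean, interval: number): number {
  return jobWasPopped ? 0 : interval
}

function startPolling(popAndExecute: PopAndExecute, interval: number): () => Promise<void> {
  validateInterval(interval)
  let stopped = false
  let timer: ReturnType<typeof setTimeout> | undefined
  let inFlight: Promise<void> = Promise.resolve()

  const tick = (): void => {
    if (stopped) return
    inFlight = popAndExecute()
      // Assumption: a failed poll is treated like an empty one.
      .catch(() => false)
      .then((popped) => {
        if (!stopped) timer = setTimeout(tick, nextDelay(popped, interval))
      })
  }
  tick()

  // The returned stop(): schedule no new polls and resolve only after the
  // in-flight popAndExecute() call has settled (graceful drain).
  return async () => {
    stopped = true
    if (timer !== undefined) clearTimeout(timer)
    await inFlight
  }
}
```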
Unless maintenanceInterval is 0, start() also auto-wires maintenance: it first proactively reclaims a maintenance lock orphaned by a hard-killed instance (which would otherwise deadlock — maintenance can't reclaim its own lock), then enqueues the built-in maintenance job once immediately and every maintenanceInterval ms thereafter. All instances enqueue concurrently — the lock ensures only one maintenance run executes at a time.
```ts
manager.start(1000) // poll every 1 second when idle
```
stop(): Promise<void>
Stops the polling loop and the auto-maintenance timer, and resolves once the in-flight job (if any) has settled — so a shutdown handler can await a graceful drain before exiting. For locks to release cleanly, prefer graceful shutdown (stop() on SIGTERM); a hard SIGKILL leaves orphaned locks that maintenance reclaims after the stale threshold.
performMaintenance(): Promise<MaintenanceResult>
Scans the job log and:
- Marks `running` jobs as `"stale"` if `now - lastHeartbeat > heartbeatInterval * roundsToStale`, and removes their lock
- Marks orphaned `queued` jobs as `"stale"` and removes their lock. A `queued` record that is no longer in the queue list was popped by an instance that died before the `start` event fired. Detection is two-pass to avoid racing the normal pop→start window: the first scan stamps `suspectedAt` on the record; a later scan reclaims it if it is still orphaned after the stale threshold.
- Removes `finished`/`error`/`stale` log records older than `keepFinishedInterval`
Returns { staleCount, cleanedCount }.
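The three rules can be expressed as a pure function over one log record, which makes the two-pass orphan detection easy to follow. This is a sketch: field names follow `JobLogRecord`, but `maintain`, its `Decision` type, the fallback to `startedAt` when no heartbeat exists yet, and measuring terminal-record age from `finishedAt` are all assumptions.

```typescript
// Hypothetical model of one maintenance decision for a single record.
type JobStatus = 'queued' | 'running' | 'finished' | 'error' | 'stale'

interface Rec {
  status: JobStatus
  startedAt?: number
  heartbeat?: number
  finishedAt?: number
  suspectedAt?: number
}

type Decision =
  | { action: 'keep' }
  | { action: 'mark-stale' } // set status "stale" and remove the lock
  | { action: 'suspect'; suspectedAt: number } // first pass for an orphaned queued record
  | { action: 'delete' } // remove an expired terminal record

interface Cfg {
  heartbeatInterval: number
  roundsToStale: number
  keepFinishedInterval: number
}

function maintain(rec: Rec, inQueueList: boolean, now: number, cfg: Cfg): Decision {
  const staleAfter = cfg.heartbeatInterval * cfg.roundsToStale
  switch (rec.status) {
    case 'running': {
      // Assumption: fall back to startedAt if no heartbeat was written yet.
      const last = rec.heartbeat ?? rec.startedAt
      return last !== undefined && now - last > staleAfter ? { action: 'mark-stale' } : { action: 'keep' }
    }
    case 'queued': {
      if (inQueueList) return { action: 'keep' } // still waiting normally
      // Pass 1: only suspect it; a pop -> start transition may be in flight.
      if (rec.suspectedAt === undefined) return { action: 'suspect', suspectedAt: now }
      // Pass 2: reclaim once it has stayed orphaned past the stale threshold.
      return now - rec.suspectedAt > staleAfter ? { action: 'mark-stale' } : { action: 'keep' }
    }
    default: {
      // finished / error / stale. Assumption: age is measured from finishedAt.
      return rec.finishedAt !== undefined && now - rec.finishedAt > cfg.keepFinishedInterval
        ? { action: 'delete' }
        : { action: 'keep' }
    }
  }
}
```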
registerJob(job): void
Registers a Job instance by name (must be unique). Hooks on all events to update Redis and re-dispatch.
unregisterJob(job): void
Unregisters a Job and removes all event hooks.
getTargetGroup(): string
Returns the target group identifier.
getOptions(): ResolvedRedisJMOptions
Returns a copy of the resolved options with defaults applied.
Job<TInputs, TAttrs>
Represents a named job with a function and event hooks. Extends Hookable.
- `TInputs` -- Type for job inputs (must be JSON-serializable)
- `TAttrs` -- Type for custom attributes, extends `Record<string, string | number | boolean | null | undefined>`
Constructor
```ts
new Job<TInputs, TAttrs>(metadata: JobMetadata, fn: JobFunction<TInputs, TAttrs>, manager?: RedisJM)
```
Job Function Signature
```ts
type JobFunction<TInputs, TAttrs> = (
  inputs: TInputs,
  ctx: {
    setProgress: (progress: number) => Promise<void> // 0-1
    setAttrs: (attrs: TAttrs) => Promise<void>
  }
) => void | Promise<void>
```
Methods
execute(inputs, options?): Promise<void>
Runs the job function with heartbeat timer and context callbacks. Options:
```ts
interface JobExecuteOptions {
  targetGroup?: string
  heartbeatInterval?: number // enables automatic heartbeat events
  runId?: string // explicit runId (otherwise derived from inputs)
}
```
queue(runId, inputs, manager?): Promise<boolean>
Convenience method -- delegates to RedisJM.queue.
getJobId(runId): string
Returns "jobName#runId".
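`isQueued()`, `get()`, and `unqueue()` all take this combined `jobId`, so code that only knows the parts needs to build or split it. A sketch of two hypothetical helpers, `makeJobId` and `parseJobId`; the rule that job names contain no `#` (so the first `#` separates the parts, while a `runId` may contain `#` or be empty) is an assumption made here, not a documented constraint.

```typescript
// Hypothetical helpers around the "jobName#runId" format.

function makeJobId(jobName: string, runId: string): string {
  // Assumption: rejecting '#' in names keeps parsing unambiguous.
  if (jobName.includes('#')) throw new Error(`jobName must not contain '#': ${jobName}`)
  return `${jobName}#${runId}`
}

function parseJobId(jobId: string): { jobName: string; runId: string } {
  const i = jobId.indexOf('#')
  if (i < 0) throw new Error(`not a jobId: ${jobId}`)
  // Everything after the first '#' is the runId (it may itself contain '#').
  return { jobName: jobId.slice(0, i), runId: jobId.slice(i + 1) }
}
```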
getMetadata(): JobMetadata / getName(): string
createMaintenanceJob(manager): Job<null, never>
Factory function that creates and registers a pre-defined maintenance job. When executed, it calls manager.performMaintenance() to detect stale jobs and clean up expired log records.
Note: `manager.start()` wires maintenance automatically (see the `maintenanceInterval` option). Manual wiring as shown below is only needed when auto-maintenance is disabled (`maintenanceInterval: 0`) or when maintenance must run on a separate schedule. `start()` reuses a maintenance job you registered yourself.
Maintenance is idempotent -- each run scans the full log regardless of prior state. Use an empty runId ('') so that at most one maintenance job is queued or running at any time. The lock is released on completion, allowing the next queue call to succeed. There is no need for time-based runIds.
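```ts
import { createMaintenanceJob } from '@prostojs/redisjm'

const maintenanceJob = createMaintenanceJob(manager)

// Periodically try to queue maintenance (all instances, only one succeeds).
// Catch rejections (e.g. Redis unavailable) so they don't become unhandled.
setInterval(() => {
  maintenanceJob.queue('', null).catch((err) => console.error('maintenance enqueue failed', err))
}, 30000)
```
Events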
Both Job and RedisJM emit events via hookable:
| Event | Payload | Description |
|---|---|---|
| `start` | `{ job, targetGroup, runId, inputs }` | Before job function runs |
| `finish` | `{ job, targetGroup, runId, inputs }` | After successful completion |
| `error` | `{ job, targetGroup, runId, inputs, error }` | On error (also rethrows) |
| `heartbeat` | `{ job, targetGroup, runId, inputs }` | Periodic heartbeat tick |
| `update` | `{ job, targetGroup, runId, inputs, progress?, attrs? }` | On `setProgress`/`setAttrs` |
RedisJM re-dispatches events only when targetGroup matches and updates the Redis log accordingly.
`setProgress` and `setAttrs` each emit a separate `update` event carrying only its own field; a single `update` payload never contains both `progress` and `attrs`.
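Because each `update` payload carries only one of the two fields, a listener that builds a live view has to merge payloads rather than overwrite its state. A sketch of such a reducer; `applyUpdate`, `RunView`, and the shallow merge of `attrs` are assumptions (whether the library merges or replaces stored attrs is not specified here).

```typescript
// Hypothetical reducer for `update` events.
type Attrs = Record<string, string | number | boolean | null | undefined>

interface UpdatePayload {
  progress?: number
  attrs?: Attrs
}

interface RunView {
  progress: number
  attrs: Attrs
}

function applyUpdate(views: Map<string, RunView>, jobId: string, p: UpdatePayload): void {
  const prev = views.get(jobId) ?? { progress: 0, attrs: {} }
  views.set(jobId, {
    // Keep the previous value for whichever field this event did not carry.
    progress: p.progress ?? prev.progress,
    // Assumption: shallow-merge attrs across successive setAttrs calls.
    attrs: p.attrs ? { ...prev.attrs, ...p.attrs } : prev.attrs,
  })
}
```

Wired to a manager this might look like `manager.hook('update', (p) => applyUpdate(views, p.job.getJobId(p.runId), p))`, assuming the payload shape from the events table.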
```ts
manager.hook('start', (payload) => {
  console.log(`Job ${payload.job.getName()} started`)
})

// Optional: custom handling. Job errors are ALSO logged by the default `logger`
// (set `logger: false` to suppress that if you handle errors yourself here).
manager.hook('error', (payload) => {
  console.error(`Job failed:`, payload.error)
})
```
Observability & error handling
- Failures are visible by default. A handler that throws is recorded in the log (`status: 'error'`), re-broadcast on the `error` event, and written to the `logger` (default `console.error`, including the stack). Unknown-name drops, missing-record drops, and poll-loop errors also go to the `logger`. Pass `logger: false` to silence it, or a custom function to redirect it.
- Don't blanket-`DEL` the Redis keys. The `:queue`, `:locks`, and `:log` keys are shared by every job in the target group; deleting one wipes all runs in the group, not just yours. Use `unqueue(jobId)` to remove a single run.
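Redirecting the `logger` is the usual way to get these failures into a log pipeline. A sketch of a sink matching the documented `(message, error?) => void` signature that emits one JSON line per event; `jsonLogger` and the field names in its output are hypothetical.

```typescript
// Hypothetical JSON-lines sink for the `logger` option.
type RedisJMLogger = (message: string, error?: Error) => void

function jsonLogger(
  write: (line: string) => void = (line) => process.stderr.write(line + '\n'),
): RedisJMLogger {
  return (message, error) => {
    write(
      JSON.stringify({
        level: 'error',
        source: 'redisjm',
        message,
        // Keep the stack: a single line is easier to ship than a multi-line dump.
        error: error ? { name: error.name, message: error.message, stack: error.stack } : undefined,
      }),
    )
  }
}
```

It would be passed as `new RedisJM(redis, 'my-app', { logger: jsonLogger() })`.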
Long-running handlers & heartbeats
Heartbeats are emitted from a timer on the single-threaded event loop. A long, tightly synchronous handler that never awaits starves that timer, so heartbeats stop landing and maintenance may mark the run stale and release its lock — which can let another instance pick up the same runId and run it concurrently (duplicate execution). Chunk long work and await between chunks (e.g. setProgress/setAttrs per chunk) so heartbeats can fire:
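```ts
manager.createJob({ jobName: 'backfill' }, async (inputs: { ids: string[] }, ctx) => {
  for (let i = 0; i < inputs.ids.length; i += 100) {
    // processChunk stands in for your own async work. Awaiting real I/O (or a
    // timer) yields to the event loop so the heartbeat timer can fire; a promise
    // that resolves without I/O only drains microtasks and does not.
    await processChunk(inputs.ids.slice(i, i + 100))
    await ctx.setProgress(Math.min(1, (i + 100) / inputs.ids.length))
  }
})
```
Job Statuses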
| Status | Description | Blocks queue? | In log? |
|---|---|---|---|
| `queued` | Waiting in queue | Yes | Yes |
| `running` | Currently executing with active heartbeat | Yes | Yes |
| `stale` | Heartbeat expired, detected by maintenance | Yes (until maintenance cleans it) | Kept for `keepFinishedInterval` |
| `finished` | Completed successfully | No | Kept for `keepFinishedInterval` |
| `error` | Failed with an error | No | Kept for `keepFinishedInterval` |
Default `keepFinishedInterval: 0` makes terminal states write-only. With the default, `finished`/`error`/`stale` records are deleted the instant they're set, so `list()`/`get()` only ever return `queued`/`running`; terminal outcomes are observable only via the `finish`/`error` hooks. Set `keepFinishedInterval > 0` (e.g. `60000`) to keep them queryable.
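A UI that shows run state (for example, disabling a "run now" button while a run still holds its lock) can encode the status table directly. A sketch with hypothetical helpers `blocksQueue` and `isQueryable`, following the table's columns as written.

```typescript
// Hypothetical helpers encoding the status table.
type JobStatus = 'queued' | 'running' | 'finished' | 'error' | 'stale'

const TERMINAL: ReadonlySet<JobStatus> = new Set<JobStatus>(['finished', 'error', 'stale'])

// Per the table: queued, running, and (until maintenance cleans it) stale block the queue.
function blocksQueue(status: JobStatus): boolean {
  return status === 'queued' || status === 'running' || status === 'stale'
}

// Terminal records stay queryable only while keepFinishedInterval > 0.
function isQueryable(status: JobStatus, keepFinishedInterval: number): boolean {
  return !TERMINAL.has(status) || keepFinishedInterval > 0
}
```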
Full Example: Distributed Job Processing
```ts
import Redis from 'ioredis'
import { RedisJM, createMaintenanceJob } from '@prostojs/redisjm'

// Fall back to a local Redis when REDIS_URL is unset (ioredis expects a string here)
const redis = new Redis(process.env.REDIS_URL ?? 'redis://localhost:6379')
const manager = new RedisJM(redis, 'my-service', {
  heartbeatInterval: 5000,
  roundsToStale: 2,
  keepFinishedInterval: 60000,
})

// Define jobs
const reportJob = manager.createJob(
  { jobName: 'daily-report' },
  async (inputs: { date: string }, ctx) => {
    await ctx.setAttrs({ step: 'fetching data' })
    await ctx.setProgress(0.3)
    // ... fetch data
    await ctx.setAttrs({ step: 'generating report' })
    await ctx.setProgress(0.7)
    // ... generate report
    await ctx.setProgress(1)
  }
)

// Maintenance job (auto-registered with manager; start() reuses it)
const maintenanceJob = createMaintenanceJob(manager)

// Start processing the queue (one job at a time per instance)
manager.start(1000)

// CRON handler (runs on every instance)
async function onCron() {
  const today = new Date().toISOString().slice(0, 10)
  await reportJob.queue(today, { date: today })
  // Schedule maintenance -- empty runId ensures at most one in the queue
  await maintenanceJob.queue('', null)
}

// Graceful shutdown: await the drain so the in-flight job finishes and its lock releases
process.on('SIGTERM', async () => {
  await manager.stop()
  await redis.quit()
})
```
Using a Job with Multiple Managers
A single Job instance can be attached to different RedisJM instances:
```ts
const managerA = new RedisJM(redis, 'group-a')
const managerB = new RedisJM(redis, 'group-b')

const job = managerA.createJob(
  { jobName: 'sync-data' },
  async (inputs: { source: string }, ctx) => { /* ... */ }
)

// Also register with second manager
managerB.registerJob(job)

// Queue on specific manager
await job.queue('run-1', { source: 'api' }, managerB)
```
Types
```ts
type JobAttrs = Record<string, string | number | boolean | null | undefined>

type JobStatus = 'queued' | 'running' | 'finished' | 'error' | 'stale'

interface JobMetadata {
  jobName: string
  description?: string
}

type RedisJMLogger = (message: string, error?: Error) => void

interface RedisJMOptions {
  heartbeatInterval?: number // default 5000
  roundsToStale?: number // default 2
  keepFinishedInterval?: number // default 0
  maintenanceInterval?: number // default heartbeatInterval * roundsToStale; 0 disables
  unknownJobRequeueLimit?: number // default 5; 0 = drop unknown names immediately
  logger?: RedisJMLogger | false // default console.error; false to silence
}

interface JobLogRecord<TInputs = unknown, TAttrs extends JobAttrs = JobAttrs> {
  jobId: string
  jobName: string
  runId: string
  inputs: TInputs
  targetGroup: string
  status: JobStatus
  startedAt?: number
  finishedAt?: number
  heartbeat?: number
  progress: number
  attrs?: TAttrs
  error?: string
  suspectedAt?: number // internal: orphaned-queued suspect timestamp (maintenance)
  requeueCount?: number // internal: times re-queued for an unknown job name
}

interface JobContext<TAttrs extends JobAttrs = JobAttrs> {
  setProgress: (progress: number) => Promise<void>
  setAttrs: (attrs: TAttrs) => Promise<void>
}

type JobFunction<TInputs, TAttrs extends JobAttrs> = (
  inputs: TInputs,
  ctx: JobContext<TAttrs>,
) => void | Promise<void>

interface MaintenanceResult {
  staleCount: number
  cleanedCount: number
}
```
License
MIT