0.2.5 • Published 9h ago

sqs-partial-batch-processor

Licence: Apache-2.0 | Version: 0.2.5 | Deps: 1 | Size: 37 kB | Vulns: 0 | Weekly downloads: 190

SQS Partial Batch Processor


A small TypeScript helper for AWS Lambda SQS triggers using partial batch responses (SQSBatchResponse.batchItemFailures). You supply per-record async logic; the library handles looping, per-record error boundaries, and the response shape.

This library intentionally does not parse message bodies, validate schemas, create AWS SDK clients, or make retry/business decisions for you.

Features

  • Implements the Lambda SQS partial batch response pattern (only failed messages are retried).
  • Per-record error boundary (failures are isolated to each record).
  • Aggregates failed messageIds into batchItemFailures.
  • Optional bounded concurrency (concurrency) and error hook (onRecordError).

Requirements

  • Node.js 20+ (runtime requirement)
  • Module system: published as CommonJS ("module": "CommonJS"). Usable from both CJS and ESM runtimes.
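  • The Lambda SQS event source mapping must have Report batch item failures (ReportBatchItemFailures) enabled. Without it, Lambda ignores batchItemFailures and treats any invocation that returns without throwing as a fully successful batch. See: AWS Lambda SQS error handling docs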

Installation

npm install sqs-partial-batch-processor
yarn add sqs-partial-batch-processor

Usage

import type { SQSEvent } from 'aws-lambda';
import { processPartialBatch } from 'sqs-partial-batch-processor';

export const handler = async (event: SQSEvent) =>
  processPartialBatch(event, async (record) => {
    // Your per-record logic here.
    // Throw to mark only this record's messageId as failed.
  });

Result-style callback (no throw for control flow):

import type { SQSEvent } from 'aws-lambda';
import { processPartialBatchWithResult } from 'sqs-partial-batch-processor';

export const handler = async (event: SQSEvent) =>
  processPartialBatchWithResult(event, async (record) => {
    if (record.body === '') {
      return { ok: false };
    }
    return { ok: true };
  });
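Because the library does not parse or validate message bodies, a result-style callback usually does that work itself. The sketch below shows one possible callback under stated assumptions: handleOrderRecord, the orderId field, SqsRecordLike and RecordResultLike are hypothetical, and the { ok: boolean } shape is inferred only from the example above.

```typescript
// Local stand-in for the SQS record type (assumption: only the fields used here).
interface SqsRecordLike {
  messageId: string;
  body: string;
}

// Assumed result shape, based on the `{ ok: true }` / `{ ok: false }` example.
type RecordResultLike = { ok: boolean };

// Hypothetical per-record callback: parse the body as JSON and report failure
// through the return value instead of throwing.
async function handleOrderRecord(record: SqsRecordLike): Promise<RecordResultLike> {
  let payload: unknown;
  try {
    payload = JSON.parse(record.body);
  } catch {
    // Malformed JSON (including an empty body) is reported as a failed record.
    return { ok: false };
  }
  // `orderId` is an illustrative field name, not something the library requires.
  const hasOrderId =
    typeof payload === 'object' &&
    payload !== null &&
    typeof (payload as { orderId?: unknown }).orderId === 'string';
  return { ok: hasOrderId };
}
```

A callback like this could be passed as the second argument to processPartialBatchWithResult. Whether a malformed message should be retried at all is a business decision; returning { ok: false } means it is redelivered until the queue's redrive policy, if any, takes over.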

Options

Both processPartialBatch and processPartialBatchWithResult accept an optional options object:

  • concurrency?: number (default: 1): maximum parallelism. Must be a finite integer >= 1 (1 = sequential, > 1 = bounded concurrency). If invalid, the function throws (RangeError for < 1, TypeError for non-integer / non-finite).
    • Tip: start with 1 and increase gradually while watching downstream limits (external API rate limits, DB connection pools, and Lambda reserved concurrency). SQS batches are typically small, so a large value rarely helps.
    • Note: when concurrency > 1, the order of batchItemFailures is not guaranteed. Tests should compare as a set, not by array order.
  • onRecordError?: (record, error) => void: called when a record is treated as failed (useful for structured logs / metrics).
    • Example (structured log / metrics hook):
import { processPartialBatch } from 'sqs-partial-batch-processor';
import type { SQSEvent } from 'aws-lambda';

export const handler = async (event: SQSEvent) =>
  processPartialBatch(
    event,
    async (record) => {
      // ...
    },
    {
      onRecordError: (record, error) => {
        console.log(JSON.stringify({
          level: 'error',
          msg: 'record failed',
          messageId: record.messageId,
          // Avoid logging sensitive contents from record.body.
          error: error instanceof Error ? { name: error.name, message: error.message } : { message: String(error) },
        }));
      },
    },
  );
  • mapMessageId?: (record) => string: customize the itemIdentifier (defaults to record.messageId).
    • Typical uses: align the identifier with an application-level id (e.g., an id stored in messageAttributes or the parsed payload), or normalize identifiers across FIFO/standard queues for easier correlation.
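To make the concurrency option more concrete, the following is a minimal sketch of one common way to bound parallelism: a fixed number of workers pulling items from a shared index. It is not the library's implementation. runWithConcurrency is a hypothetical helper, and the order of the validation checks (TypeError for non-integer input before RangeError for values below 1) is an assumption about an edge case the description above leaves open.

```typescript
// Hypothetical helper, NOT the library's code: run `worker` over `items`
// with at most `concurrency` calls in flight at any time.
async function runWithConcurrency<T>(
  items: readonly T[],
  concurrency: number,
  worker: (item: T) => Promise<void>,
): Promise<void> {
  // Number.isInteger is false for NaN, Infinity and fractional values.
  if (!Number.isInteger(concurrency)) {
    throw new TypeError('concurrency must be a finite integer');
  }
  if (concurrency < 1) {
    throw new RangeError('concurrency must be >= 1');
  }

  let next = 0; // Shared cursor; safe because JavaScript runs this single-threaded.
  const runWorker = async (): Promise<void> => {
    while (next < items.length) {
      const item = items[next++];
      // The worker is expected to contain its own error boundary; an
      // uncaught rejection here would reject the whole run.
      await worker(item);
    }
  };

  const workerCount = Math.min(concurrency, items.length);
  await Promise.all(Array.from({ length: workerCount }, () => runWorker()));
}
```

With concurrency set to 1 this degenerates to a sequential loop. With higher values, records finish in whatever order their promises settle, which is why failures collected by the workers should be compared as a set (for example by sorting both sides or building a Set) rather than by array position.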

License

This project is licensed under the Apache-2.0 License.
