npm.io
0.12.26 • Published 2d ago

@lafken/queue

Licence
MIT
Version
0.12.26
Deps
3
Size
59 kB
Vulns
0
Weekly
0
Stars
3

@lafken/queue

Define and consume Amazon SQS queues using TypeScript decorators. @lafken/queue lets you declare Standard and FIFO queues, map message data to typed handler parameters, and send messages at runtime through a built-in service client.

Installation

npm install @lafken/queue

Getting Started

Define a queue class with @Queue, add consumer methods with @Standard or @Fifo, and register everything through QueueResolver:

import { createApp, createModule } from '@lafken/main';
import { QueueResolver } from '@lafken/queue/resolver';
import { Queue, Standard, Payload, Param, Event } from '@lafken/queue/main';

// 1. Define the message payload
@Payload()
export class OrderMessage {
  @Param({ source: 'attribute' })
  orderId: string;

  @Param({ source: 'body', parse: true })
  total: number;
}

// 2. Define the queue and its consumer
@Queue()
export class OrderQueue {
  @Standard({ batchSize: 5, visibilityTimeout: 60 })
  processOrder(@Event(OrderMessage) messages: OrderMessage[]) {
    for (const message of messages) {
      console.log(`Processing order ${message.orderId}`);
    }
  }
}

// 3. Register in a module
const orderModule = createModule({
  name: 'order',
  resources: [OrderQueue],
});

// 4. Add the resolver to the app
createApp({
  name: 'my-app',
  resolvers: [new QueueResolver()],
  modules: [orderModule],
});

Each @Standard or @Fifo method becomes an independent Lambda function with its own SQS queue and event source mapping. The handler always receives an array of mapped payload objects — one per SQS record in the batch.

Features

Queue Class

Use the @Queue decorator to group related queue consumers in a single class. A queue class can contain multiple @Standard and @Fifo handlers:

import { Queue, Standard, Fifo } from '@lafken/queue/main';

@Queue()
export class NotificationQueue {
  @Standard({ queueName: 'email-notifications' })
  sendEmail() { }

  @Fifo({ queueName: 'sms-notifications', contentBasedDeduplication: true })
  sendSms() { }
}
Standard Queues

Use the @Standard decorator to create a standard (non-FIFO) SQS queue consumer. Messages are delivered at least once with best-effort ordering:

@Standard({
  queueName: 'reports',
  batchSize: 10,
  visibilityTimeout: 120,
  maxConcurrency: 5,
})
generateReport(@Event(ReportMessage) messages: ReportMessage[]) {
  // Process up to 10 messages per invocation
  for (const message of messages) { ... }
}

If queueName is omitted, the method name is used as the queue name.

Standard Queue Options
Option Type Description
queueName string Queue name (defaults to the method name)
batchSize 1–10 Number of messages per Lambda invocation
visibilityTimeout number Seconds a message is hidden from other consumers while processing
deliveryDelay number Seconds to delay message delivery after it is sent
retentionPeriod number Seconds messages are retained before automatic deletion
maxMessageSizeBytes number Maximum message size in bytes
maxConcurrency number Maximum concurrent Lambda invocations for this queue
maxBatchingWindow number Seconds to wait gathering messages into a batch before invoking
lambda object Lambda-specific configuration (memory, timeout, etc.)
FIFO Queues

Use the @Fifo decorator for FIFO (First-In-First-Out) queues. FIFO queues guarantee exactly-once processing and strict message ordering. The queue name automatically gets a .fifo suffix:

@Fifo({
  queueName: 'payment-processing',
  contentBasedDeduplication: true,
  batchSize: 1,
})
processPayment(@Event(PaymentMessage) messages: PaymentMessage[]) {
  // Messages are processed in exact send order
  for (const message of messages) { ... }
}

FIFO queues support all standard queue options plus:

Option Type Description
contentBasedDeduplication boolean Use message content to generate deduplication IDs automatically

FIFO queues automatically enable ReportBatchItemFailures, allowing partial batch failure reporting.

Message Payload

Use @Payload on a class to define the structure of SQS messages. Decorate each property with @Param to specify where the value is extracted from in the SQS record:

import { Payload, Param } from '@lafken/queue/main';

@Payload()
export class TaskMessage {
  @Param({ source: 'attribute' })
  correlationId: string;

  @Param({ source: 'attribute', type: Number })
  priority: number;

  @Param({ source: 'body', parse: true })
  taskName: string;
}
@Param Options
Option Type Default Description
source 'attribute' | 'body' | 'record' 'attribute' Where to extract the value from the SQS record
parse boolean false JSON-parse the message body before extraction ('body' only)
type String | Number | ... inferred Data type of the extracted value
name string property name Override the source field name used for extraction
  • source: 'attribute' — reads from SQS message attributes (supports String and Number types).
  • source: 'body' — reads from the message body. Set parse: true to JSON-parse the body and extract a specific key by property name.
  • source: 'record' — reads a top-level SQS record field such as messageId, receiptHandle, awsRegion, etc. The name option accepts only valid SQSRecordField values and defaults to the property name.

Only one @Param with source: 'body' is allowed per payload class.

source: 'record' — SQS Record Fields

Use source: 'record' to bind SQS envelope metadata (e.g. the message identifier or region) directly to a payload property. The available fields are:

Field Description
messageId Unique identifier assigned by SQS to the message
receiptHandle Token used to delete or extend message visibility
md5OfBody MD5 digest of the message body
eventSource Always "aws:sqs"
eventSourceARN ARN of the source SQS queue
awsRegion AWS region where the queue resides
import { Payload, Param } from '@lafken/queue/main';

@Payload()
export class AuditMessage {
  // property name matches record field → no 'name' needed
  @Param({ source: 'record' })
  messageId: string;

  // alias: property 'region' extracts record.awsRegion
  @Param({ source: 'record', name: 'awsRegion' })
  region: string;

  @Param({ source: 'attribute' })
  userId: string;
}
@Field Decorator

Use @Field to include a property in the payload schema without specifying an extraction source:

import { Payload, Field } from '@lafken/queue/main';

@Payload()
export class SimpleMessage {
  @Field()
  action: string;

  @Field()
  timestamp: number;
}
Consuming Messages

Bind a typed payload to a handler method using the @Event parameter decorator. Pass the payload class so the framework can automatically extract and map fields from every SQS record in the batch. The handler always receives an array of mapped objects:

import { Queue, Standard, Event } from '@lafken/queue/main';

@Queue()
export class AlertQueue {
  @Standard({ queueName: 'alerts', batchSize: 5 })
  processAlert(@Event(AlertMessage) alerts: AlertMessage[]) {
    for (const alert of alerts) {
      console.log(`Alert from ${alert.source}: ${alert.message}`);
    }
  }
}
Sending Messages

Use QueueService at runtime to send messages to any SQS queue:

import { QueueService } from '@lafken/queue/service';

// Send a single message
await QueueService.sendMessage({
  url: 'https://sqs.us-east-1.amazonaws.com/123456789/orders',
  body: { orderId: 'ord-123', total: 49.99 },
  delay: 10,
});

// Send to a FIFO queue
await QueueService.sendMessage({
  url: 'https://sqs.us-east-1.amazonaws.com/123456789/payments.fifo',
  body: { paymentId: 'pay-456', amount: 100 },
  groupId: 'customer-789',
  deduplicationId: 'pay-456',
});
SendMessage Options
Option Type Description
url string Full SQS queue URL
body any Message body (automatically JSON-stringified)
attributes Record<string, string | number> SQS message attributes
delay number Delay in seconds before the message becomes visible
groupId string Message group ID (FIFO queues only)
deduplicationId string Deduplication ID (FIFO queues only)
Sending Batch Messages

Send multiple messages in a single API call:

import { QueueService } from '@lafken/queue/service';

await QueueService.sendBatchMessage({
  url: 'https://sqs.us-east-1.amazonaws.com/123456789/tasks',
  messages: [
    { body: { task: 'resize-image', fileId: 'f1' } },
    { body: { task: 'resize-image', fileId: 'f2' }, delay: 30 },
    { body: { task: 'generate-thumbnail', fileId: 'f3' } },
  ],
});

Keywords