@autofleet/kafka

Licence: Proprietary · Version: 0.8.0

Internal wrapper for Apache Kafka using @platformatic/kafka, providing a production-ready interface for managing multiple Kafka producers and consumers with type-safe topic definitions. Both producers and consumers default to passwordless SASL OAUTHBEARER auth backed by Google Application Default Credentials (ADC), so no static username/password is required against Google Managed Kafka.

Installation

npm add @autofleet/kafka

Features

  • Typed Topics with Zod - Define topics with Zod schemas for compile-time type safety and runtime validation
  • Passwordless Auth - Producers and consumers default to SASL OAUTHBEARER/ADC — no static credentials against Google Managed Kafka
  • Multi-Producer Management - Manage multiple named producers with different broker configurations
  • Consumers with Handler Callbacks - Subscribe to topics with a typed handler, auto-started background loop, and a .stop() handle (plus a raw stream() escape hatch)
  • Self-Healing Consumers - Background connection/rejoin errors are handled (never an uncaughtException), classified (transient vs fatal), and recovered from automatically by reconnecting with backoff
  • Type-Safe APIs - Full TypeScript support with inferred types for topics and payloads
  • Built-in Mock Mode - Disable Kafka entirely with automatic mock implementations
  • Centralized Health Checks - Single readiness check for all producers
  • Automatic Connection Management - Producers connect lazily on first publish, or eagerly via bootstrap()
  • Batch Publishing - Efficient batch message sending with type safety
  • Message Partitioning - Control message distribution across partitions
  • Custom Headers - Attach metadata to messages
  • Graceful Shutdown - Connections are closed automatically on SIGTERM/SIGINT (opt out with dontGracefulShutdown)
  • ESM & CommonJS Compatible - Works in both ESM and CommonJS projects
  • Production Ready - Built with reliability and performance in mind

Quick Start

import { KafkaManager } from '@autofleet/kafka';
import { z } from 'zod';
import logger from './logger';

// Define typed topics with Zod schemas
const kafka = KafkaManager.create({
  enabled: process.env.ENABLE_KAFKA === 'true', // Built-in mock mode
  logger,
  producers: {
    main: {
      brokers: ['kafka-main.svc.cluster.local:9092'],
      clientId: 'my-service-main',
    },
    analytics: {
      brokers: ['kafka-analytics.svc.cluster.local:9092'],
      clientId: 'my-service-analytics',
    },
  },
  topics: {
    'order.created': {
      schema: z.object({
        orderId: z.string(),
        customerId: z.string(),
        amount: z.number(),
        items: z.array(z.object({
          sku: z.string(),
          quantity: z.number(),
        })),
      }),
      producer: 'main',
    },
    'user.signup': {
      schema: z.object({
        userId: z.string(),
        email: z.string().email(),
        timestamp: z.number(),
      }),
      producer: 'analytics',
    },
  },
});

// Publish with full type safety and runtime validation
// TypeScript infers payload type from Zod schema!
await kafka.publish('order.created', {
  orderId: 'ORD-123',
  customerId: 'CUST-456',
  amount: 99.99,
  items: [
    { sku: 'WIDGET-1', quantity: 2 },
    { sku: 'GADGET-2', quantity: 1 },
  ],
});

// ❌ Type error: TypeScript knows the exact shape expected
// await kafka.publish('order.created', {
//   orderId: 123, // ❌ Error: Expected string
//   // ❌ Error: Missing required fields
// });

// Batch publish with type safety
await kafka.publishBatch('order.created', {
  messages: [
    { value: { orderId: 'ORD-1', customerId: 'CUST-1', amount: 50, items: [] } },
    { value: { orderId: 'ORD-2', customerId: 'CUST-2', amount: 75, items: [] } },
  ],
});

// Use in health checks (assumes an Express app)
app.get('/health/ready', async (req, res) => {
  const ready = await kafka.isReady();
  res.status(ready ? 200 : 503).json({ ready });
});

Typed Topics

The typed topics feature provides compile-time type safety and runtime validation for Kafka messages using Zod schemas.

Benefits
  • Type Safety: Payload types are automatically inferred from Zod schemas
  • Runtime Validation: Messages are validated before publishing
  • DX: Full autocomplete for topic names and payloads in your IDE
  • Centralized: Topic definitions live alongside your Kafka configuration
  • Producer Routing: Each topic is published through the producer named in its producer field
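The inference and routing listed above can be approximated without Zod, which helps when reasoning about what the typed publish API does. This is a hypothetical sketch, not this package's implementation: Schema<T> stands in for a Zod schema (only a parse method is assumed), and createTopicPublisher and orderCreated are illustrative names. The payload type of each call is derived from the schema registered for that topic, the value is validated at runtime unless skipValidation is set, and the message is routed to the topic's configured producer.

```typescript
// Minimal stand-in for a Zod schema (assumption): parse() returns a typed value or throws.
interface Schema<T> {
  parse(input: unknown): T;
}

// Extract the payload type from a schema, analogous to z.infer.
type Infer<S> = S extends Schema<infer T> ? T : never;

interface TopicDef {
  schema: Schema<unknown>;
  producer: string;
}

// Hypothetical helper: the payload parameter's type depends on the topic key.
function createTopicPublisher<T extends Record<string, TopicDef>>(
  topics: T,
  send: (producer: string, topic: string, value: unknown) => void,
) {
  return function publish<K extends keyof T & string>(
    topic: K,
    payload: Infer<T[K]['schema']>,
    options: { skipValidation?: boolean } = {},
  ): void {
    const def = topics[topic];
    // Validate at runtime unless explicitly skipped, then route to the topic's producer.
    const value = options.skipValidation ? payload : def.schema.parse(payload);
    send(def.producer, topic, value);
  };
}

// Hand-written schema for the example; a real project would use z.object(...).
const orderCreated: Schema<{ orderId: string; amount: number }> = {
  parse(input) {
    const { orderId, amount } = (input ?? {}) as { orderId?: unknown; amount?: unknown };
    if (typeof orderId !== 'string' || typeof amount !== 'number' || amount <= 0) {
      throw new Error('order.created: invalid payload');
    }
    return { orderId, amount };
  },
};

const sent: string[] = [];
const publish = createTopicPublisher(
  { 'order.created': { schema: orderCreated, producer: 'main' } },
  (producer, topic, value) => {
    sent.push(`${producer} ${topic} ${JSON.stringify(value)}`);
  },
);

publish('order.created', { orderId: 'ORD-1', amount: 10 });
// publish('order.created', { orderId: 1, amount: 10 }); // compile-time error: orderId must be a string
```

Because the payload parameter is typed from the topic key, a wrong field type fails at compile time, while constraints TypeScript cannot express (such as amount > 0) are still caught by parse at runtime.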
Basic Usage

import { KafkaManager } from '@autofleet/kafka';
import { z } from 'zod';
import logger from './logger';

const kafka = KafkaManager.create({
  logger,
  producers: {
    main: { brokers: ['localhost:9092'] },
    analytics: { brokers: ['localhost:9093'] },
  },
  topics: {
    'order.created': {
      schema: z.object({
        orderId: z.string().uuid(),
        customerId: z.string().uuid(),
        amount: z.number().positive(),
        currency: z.enum(['USD', 'EUR', 'GBP']),
      }),
      producer: 'main', // Routes to 'main' producer
    },
    'user.activity': {
      schema: z.object({
        userId: z.string(),
        action: z.string(),
        metadata: z.record(z.unknown()).optional(),
      }),
      producer: 'analytics', // Routes to 'analytics' producer
    },
  },
});

// ✅ Fully typed - TypeScript knows the exact shape
await kafka.publish('order.created', {
  orderId: '550e8400-e29b-41d4-a716-446655440000',
  customerId: '6ba7b810-9dad-11d1-80b4-00c04fd430c8',
  amount: 99.99,
  currency: 'USD',
});

// ❌ Type error: Wrong type for 'amount'
// await kafka.publish('order.created', {
//   orderId: '550e8400-e29b-41d4-a716-446655440000',
//   customerId: '6ba7b810-9dad-11d1-80b4-00c04fd430c8',
//   amount: '99.99', // Error: Expected number, got string
//   currency: 'USD',
// });

// ❌ Runtime validation error: TypeScript accepts any string, but Zod rejects a non-UUID
// await kafka.publish('order.created', {
//   orderId: 'not-a-uuid', // Error: Invalid UUID
//   customerId: '6ba7b810-9dad-11d1-80b4-00c04fd430c8',
//   amount: 99.99,
//   currency: 'USD',
// });
Complex Schemas

Support for nested objects, arrays, and advanced Zod features:

const kafka = KafkaManager.create({
  logger,
  producers: {
    main: { brokers: ['localhost:9092'] },
  },
  topics: {
    'order.created': {
      schema: z.object({
        orderId: z.string(),
        customer: z.object({
          id: z.string(),
          email: z.string().email(),
          name: z.string().min(1),
        }),
        items: z.array(z.object({
          sku: z.string(),
          quantity: z.number().positive(),
          price: z.number().positive(),
        })).min(1),
        total: z.number().positive(),
        metadata: z.record(z.unknown()).optional(),
      }),
      producer: 'main',
    },
  },
});

await kafka.publish('order.created', {
  orderId: '123',
  customer: {
    id: '456',
    email: 'test@example.com',
    name: 'John Doe',
  },
  items: [
    { sku: 'ABC', quantity: 2, price: 10.00 },
    { sku: 'DEF', quantity: 1, price: 20.00 },
  ],
  total: 40.00,
  metadata: { source: 'web' },
});
Skipping Validation

For advanced use cases, you can skip runtime validation:

await kafka.publish('order.created', payload, {
  skipValidation: true, // Skip Zod validation
});

// Also works for batch publishing
await kafka.publishBatch('order.created', {
  messages: [{ value: payload1 }, { value: payload2 }],
  skipValidation: true,
});

Initialization & Bootstrap

Lifecycle Overview

The @autofleet/kafka package follows a clear, predictable lifecycle:

┌─────────────┐     ┌──────────────┐     ┌──────────────┐     ┌────────────┐
│   create()  │────▶│ bootstrap()  │────▶│ publish/use  │────▶│disconnect()│
│ (sync)      │     │ (async)      │     │ (async)      │     │ (async)    │
└─────────────┘     └──────────────┘     └──────────────┘     └────────────┘
    Instant              Validates            Runtime           Cleanup
  construction         connectivity           operations

1. create() - Synchronous Construction

  • Creates the KafkaManager instance
  • Sets up configuration
  • Does NOT connect to Kafka yet
  • Returns immediately

2. bootstrap() - Async Initialization

  • Explicitly connects all producers to Kafka
  • Validates broker connectivity
  • Fetches cluster metadata
  • Fails fast with detailed diagnostics
  • Recommended before serving traffic

3. Runtime Operations

  • publish(), publishBatch() work normally
  • Lazy initialization supported (producers connect on first use)
  • Health checks available

4. disconnect() - Cleanup

  • Gracefully closes all connections
  • Automatic on SIGTERM/SIGINT (unless dontGracefulShutdown: true)
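The lifecycle above, including lazy initialization, can be pictured as a small state machine. The in-memory model below is illustrative only: ProducerLifecycle and its connect() stand-in are hypothetical, no broker I/O happens, and rejecting a publish after disconnect() is an assumption, since this README does not say what the package does in that case.

```typescript
type LifecycleState = 'created' | 'connected' | 'closed';

// Illustrative model only: no real broker I/O happens here.
class ProducerLifecycle {
  state: LifecycleState = 'created';
  connectCalls = 0;

  // Stand-in for opening a broker connection.
  private connect(): void {
    this.connectCalls += 1;
    this.state = 'connected';
  }

  // Eager connection, as bootstrap() does before serving traffic.
  bootstrap(): void {
    if (this.state === 'created') this.connect();
  }

  // Lazy initialization: the first publish connects if bootstrap() was skipped.
  publish(topic: string, value: unknown): string {
    if (this.state === 'closed') throw new Error('producer already disconnected'); // assumption
    if (this.state === 'created') this.connect();
    return `${topic} <- ${JSON.stringify(value)}`;
  }

  // Idempotent: safe to call explicitly and again from a shutdown handler.
  disconnect(): void {
    this.state = 'closed';
  }
}

const lazy = new ProducerLifecycle();
lazy.publish('order.created', { orderId: 'ORD-1' }); // connects on first use
lazy.publish('order.created', { orderId: 'ORD-2' }); // reuses the connection
```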
Production Bootstrap Pattern

Recommended production service initialization:

import express from 'express';
import { KafkaManager } from '@autofleet/kafka';
import logger from './logger';

const app = express(); // the examples below assume an Express app

// 1. Synchronous construction
export const kafka = KafkaManager.create({
  enabled: process.env.ENABLE_KAFKA === 'true',
  logger,
  bootstrapTimeoutMs: 15000,  // 15s bootstrap timeout
  healthCheckCacheMs: 2000,   // Cache health for 2s
  strictBootstrap: true,      // Fail if ANY producer fails
  producers: {
    main: {
      brokers: process.env.KAFKA_BROKERS?.split(',') || [],
      clientId: 'my-service-main',
    },
    analytics: {
      brokers: process.env.ANALYTICS_BROKERS?.split(',') || [],
      clientId: 'my-service-analytics',
    },
  },
});

// 2. Bootstrap before serving traffic
async function startServer() {
  logger.info('Bootstrapping Kafka...');

  try {
    const result = await kafka.bootstrap();

    logger.info('Kafka bootstrap successful', {
      duration: result.duration,
      producers: Object.keys(result.results),
    });
  } catch (error) {
    logger.error('Fatal: Kafka bootstrap failed', { error });
    process.exit(1); // Fail fast in production
  }

  // 3. Start HTTP server only after Kafka is ready
  app.listen(3000, () => {
    logger.info('Server ready - Kafka connected');
  });
}

startServer();
Development Bootstrap Pattern

For development/local environments with optional Kafka:

async function startServer() {
  // Try to bootstrap, but don't fail if Kafka unavailable
  try {
    await kafka.bootstrap({
      timeoutMs: 5000,  // Shorter timeout for dev
      strict: false,    // Allow partial failures
    });
    logger.info('Kafka connected');
  } catch (error) {
    logger.warn('Kafka unavailable - using lazy initialization', { error });
    // Server still starts, producers will retry on first publish
  }

  app.listen(3000, () => {
    logger.info('Server ready');
  });
}
Bootstrap Options
Strict Mode (Default)

Bootstrap fails if any producer fails to connect:

await kafka.bootstrap(); // strict: true by default

// Throws if any producer fails
// Perfect for production where all clusters must be available
Non-Strict Mode

Bootstrap succeeds if at least one producer connects:

const result = await kafka.bootstrap({
  strict: false,
});

// Check which producers failed (result.success is false if any did)
const failed = Object.entries(result.results)
  .filter(([, r]) => !r.success)
  .map(([name]) => name);

if (failed.length > 0) {
  logger.warn('Some producers failed to bootstrap', { failed, results: result.results });
}
// Continue - failed producers use lazy initialization
Selective Bootstrap

Bootstrap specific producers only:

// Only bootstrap critical producer
await kafka.bootstrap({
  producers: ['main'],
  timeoutMs: 10000,
});

// Analytics producer will use lazy initialization
Custom Timeouts
await kafka.bootstrap({
  timeoutMs: 20000, // 20 second timeout
  strict: true,
});
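The strict and non-strict rules, plus the bootstrap timeout, can be modeled as a pure decision over per-producer outcomes. This is a hypothetical sketch: ProducerOutcome, evaluateBootstrap, and withTimeout are illustrative names, and treating success as true only when every producer connected is an assumption inferred from the non-strict example above. It also applies the timeout to each connection attempt, which approximates an overall deadline when all attempts start together.

```typescript
interface ProducerOutcome {
  success: boolean;
  error?: string;
}

interface BootstrapSummary {
  success: boolean; // true only when every producer connected (assumption)
  results: Record<string, ProducerOutcome>;
}

// Strict: any failure throws. Non-strict: throws only when no producer connected.
function evaluateBootstrap(results: Record<string, ProducerOutcome>, strict: boolean): BootstrapSummary {
  const names = Object.keys(results);
  const failed = names.filter((name) => !results[name].success);
  if (strict && failed.length > 0) {
    throw new Error(`Kafka bootstrap failed for: ${failed.join(', ')}`);
  }
  if (!strict && names.length > 0 && failed.length === names.length) {
    throw new Error('Kafka bootstrap failed: no producer connected');
  }
  return { success: failed.length === 0, results };
}

// Turn one connection attempt into an outcome, failing it if it does not settle in time.
function withTimeout(attempt: Promise<void>, timeoutMs: number): Promise<ProducerOutcome> {
  let timer: ReturnType<typeof setTimeout> | undefined;
  const timedOut = new Promise<ProducerOutcome>((resolve) => {
    timer = setTimeout(() => resolve({ success: false, error: `timed out after ${timeoutMs} ms` }), timeoutMs);
  });
  const settled = attempt.then(
    (): ProducerOutcome => ({ success: true }),
    (err: unknown): ProducerOutcome => ({ success: false, error: String(err) }),
  );
  return Promise.race([settled, timedOut]).then((outcome) => {
    clearTimeout(timer); // no dangling timer once the race is decided
    return outcome;
  });
}
```

In a fuller sketch, each producer's connect promise would be wrapped with withTimeout, the outcomes gathered with Promise.all, and the resulting map passed to evaluateBootstrap.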

Usage

Setup with Multiple Producers

Create a kafka.ts file in your service:

import { KafkaManager } from '@autofleet/kafka';
import logger from './logger';

const ENABLE_KAFKA = process.env.ENABLE_KAFKA === 'true';

// No await needed - initialization is synchronous!
// Producers connect lazily on first publish
export const kafka = KafkaManager.create({
  enabled: ENABLE_KAFKA,
  logger,
  producers: {
    // Primary producer for main events
    main: {
      brokers: ['kafka-main.svc.cluster.local:9092'],
      clientId: 'driver-ms-main',
      autoCreateTopics: true,
    },
    // Secondary producer for analytics
    analytics: {
      brokers: ['kafka-analytics.svc.cluster.local:9092'],
      clientId: 'driver-ms-analytics',
    },
  },
});

// Define your topics
export const TOPICS = {
  DRIVER_CONSENT_V1: 'backend.driver.consent.v1',
  DRIVER_ASSIGNED_V1: 'backend.driver.vehicle.assigned.v1',
  DRIVER_UNASSIGNED_V1: 'backend.driver.vehicle.unassigned.v1',
} as const;
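Because TOPICS is declared as const, its values can also be exposed as a string-literal union, so a misspelled topic name fails at compile time and untrusted strings can be checked at runtime. TopicName, KNOWN_TOPICS, and assertTopic are hypothetical additions to your own kafka.ts, not exports of this package; TOPICS is repeated so the snippet is self-contained.

```typescript
// Repeated from kafka.ts so this snippet is self-contained.
const TOPICS = {
  DRIVER_CONSENT_V1: 'backend.driver.consent.v1',
  DRIVER_ASSIGNED_V1: 'backend.driver.vehicle.assigned.v1',
  DRIVER_UNASSIGNED_V1: 'backend.driver.vehicle.unassigned.v1',
} as const;

// 'backend.driver.consent.v1' | 'backend.driver.vehicle.assigned.v1' | ...
type TopicName = (typeof TOPICS)[keyof typeof TOPICS];

const KNOWN_TOPICS = new Set<string>(Object.values(TOPICS));

// Narrow an untrusted string (for example, from an environment variable) to a known topic.
function assertTopic(name: string): TopicName {
  if (!KNOWN_TOPICS.has(name)) {
    throw new Error(`Unknown Kafka topic: ${name}`);
  }
  return name as TopicName;
}

const consentTopic: TopicName = TOPICS.DRIVER_CONSENT_V1; // OK
// const typo: TopicName = 'backend.driver.consent.v2'; // compile-time error
```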
Publishing Messages

Use the Kafka manager directly throughout your service. Producer names are type-checked against your configuration:

import { kafka, TOPICS } from './kafka';

// Publish directly
// TypeScript validates producer names: 'main' | 'analytics'
await kafka.publish('main', TOPICS.DRIVER_CONSENT_V1, {
  state: 'accepted',
  driverId: '123',
  fleetId: '456',
});

// With options - autocomplete works!
await kafka.publish(
  'main', // TypeScript autocompletes 'main' | 'analytics'
  TOPICS.DRIVER_ASSIGNED_V1,
  {
    driverId: '123',
    vehicleId: '789',
  },
  {
    key: `driver-123`, // Ensures ordering per driver
    headers: {
      'correlation-id': requestId,
    },
  }
);

// TypeScript will error if you use a non-existent producer
// await kafka.publish('wrong', TOPICS.DRIVER_CONSENT_V1, {}); // ❌ Error!
Batch Publishing
await kafka.publishBatch('main', {
  topic: TOPICS.DRIVER_CONSENT_V1,
  messages: [
    { value: { driverId: '123', state: 'accepted' } },
    { value: { driverId: '456', state: 'rejected' } },
    { value: { driverId: '789', state: 'pending' } },
  ],
});
Consuming Messages

Configure named consumers alongside producers. Each consumer requires a groupId, and — just like producers — defaults to passwordless OAUTHBEARER/ADC auth (no sasl block needed against Google Managed Kafka).

import { KafkaManager } from '@autofleet/kafka';
import { z } from 'zod';
import logger from './logger';

export const kafka = KafkaManager.create({
  enabled: process.env.ENABLE_KAFKA === 'true',
  logger,
  producers: {
    main: { brokers: ['kafka:9092'] },
  },
  consumers: {
    // No `sasl` block -> passwordless OAUTHBEARER/ADC auth
    reader: {
      brokers: ['kafka:9092'],
      groupId: 'my-service-readers',
    },
  },
  topics: {
    'order.created': {
      schema: z.object({ orderId: z.string(), amount: z.number() }),
      producer: 'main',
    },
  },
});
Handler callback + stop()

consume() auto-starts a background loop and returns a subscription handle. When a schema applies (the registered topic schema, or a per-call options.schema), each value is validated and the handler receives the parsed value (coercions/defaults/transforms applied), typed accordingly.

const subscription = await kafka.consume('reader', ['order.created'], async (msg) => {
  // msg.value is parsed JSON and validated against the topic schema
  console.log(msg.topic, msg.partition, msg.offset, msg.value);
});

// Later, stop consuming and close the stream
await subscription.stop();

The handler receives a KafkaMessage<T>:

interface KafkaMessage<T> {
  topic: string;
  partition: number;
  offset: string;       // stringified to avoid BigInt issues
  key: string | null;
  value: T;             // JSON-parsed (and schema-validated when applicable)
  rawValue: string;     // original unparsed value
  headers: Record<string, string>;
  timestamp: number | null; // producer-side timestamp from the x-timestamp header
}
Manual commit on success

By default autocommit is true. In this default mode, offsets advance on the client's own timer regardless of whether your handler succeeded — so if the handler throws, the message is NOT redelivered and is effectively lost (at-most-once). Set autocommit: false for at-least-once delivery: the offset is committed only after your handler resolves successfully, so a thrown handler leaves the message uncommitted and it is redelivered.

const subscription = await kafka.consume('reader', ['order.created'], async (msg) => {
  await processOrder(msg.value); // if this throws, the offset is NOT committed
}, { autocommit: false });
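The difference between the two commit modes shows up clearly in a simplified single-partition model. This is a hypothetical simulation, not the package's consume loop: real autocommit runs on the client's timer, approximated here by committing every delivered offset whether or not the handler succeeds, and redelivery is modeled as a fresh pass that resumes from the last committed offset.

```typescript
// Simplified single-partition model (assumption): the return value is the committed
// position, i.e. the next offset a restarted consumer would read.
function consumePass(
  log: string[],
  committed: number,
  handler: (value: string) => void,
  autocommit: boolean,
): number {
  let position = committed;
  for (let offset = committed; offset < log.length; offset++) {
    try {
      handler(log[offset]);
    } catch {
      // Manual commit: stop so the failed offset stays uncommitted and is redelivered.
      if (!autocommit) return position;
      // Autocommit: the offset advances anyway, so this message is effectively lost.
    }
    position = offset + 1;
  }
  return position;
}

// Handler that fails the first time it sees 'b' (a simulated downstream outage).
function flakyHandler(seen: string[]) {
  let failed = false;
  return (value: string): void => {
    seen.push(value);
    if (value === 'b' && !failed) {
      failed = true;
      throw new Error('downstream outage');
    }
  };
}

const log = ['a', 'b', 'c'];

// At-least-once (autocommit: false): 'b' is redelivered on the next pass.
const seenManual: string[] = [];
const manual = flakyHandler(seenManual);
const afterFirstPass = consumePass(log, 0, manual, false);               // 1
const afterSecondPass = consumePass(log, afterFirstPass, manual, false); // 3

// At-most-once (autocommit: true): one pass commits everything; 'b' never succeeds.
const seenAuto: string[] = [];
const afterAutoPass = consumePass(log, 0, flakyHandler(seenAuto), true); // 3
```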
Inbound-only topics (schema override)

For topics you only consume (not registered in topics), pass a per-call schema — the handler value type is inferred as z.infer<schema>:

await kafka.consume('reader', ['external.event'], async (msg) => {
  console.log(msg.value.externalId); // typed + validated
}, { schema: z.object({ externalId: z.string() }) });

Single-topic subscriptions get clean z.infer typing from the registry. Multi-topic subscriptions resolve to unknown (provide your own type parameter and skip auto-validation).

Validation behavior (invalid messages are skipped, not retried)

When a schema applies, validation happens in the consume loop. An invalid message is logged, committed (to advance past it), and skipped — it is never thrown or passed to the handler. This is deliberate: bad data never becomes valid on a retry, so throwing would only cause a redelivery loop. The distinction:

  • Invalid payload (data error) → logged as Message failed schema validation - skipping, offset committed, handler not called.
  • Handler throws (processing error, e.g. a downstream outage) → with autocommit: false the offset is not committed so the message is redelivered; with the default autocommit: true the offset advances anyway and the message is lost (logged as ... possible loss).

Pass skipValidation: true to bypass validation entirely (the handler then receives the raw JSON-parsed value). There is intentionally no "throw on invalid" mode — use skipValidation + your own in-handler checks if you need custom handling (e.g. routing to a dead-letter topic).
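The per-message decision described above (decode, validate, then either skip-and-commit or hand off) can be sketched as one function. handleRecord, the Validate type (a stand-in for Zod's safeParse), and the commit/log hooks are hypothetical; the sketch assumes rawValue is well-formed JSON, because handling of unparseable values is not described here.

```typescript
// Result of validating one decoded value (a stand-in for Zod's safeParse result).
type Validate<T> = (value: unknown) => { ok: true; value: T } | { ok: false; error: string };

interface LoopHooks {
  commit: (offset: string) => void;
  log: (message: string) => void;
}

// Invalid data is logged, committed and skipped; valid data goes to the handler.
async function handleRecord<T>(
  offset: string,
  rawValue: string,
  validate: Validate<T> | null, // null models skipValidation: true
  handler: (value: T) => Promise<void>,
  hooks: LoopHooks,
): Promise<'handled' | 'skipped'> {
  const decoded: unknown = JSON.parse(rawValue);
  if (validate === null) {
    await handler(decoded as T); // raw JSON-parsed value, unchecked
    return 'handled';
  }
  const result = validate(decoded);
  if (!result.ok) {
    hooks.log(`Message failed schema validation - skipping (offset ${offset}): ${result.error}`);
    hooks.commit(offset); // retrying bad data cannot succeed, so move past it
    return 'skipped';
  }
  await handler(result.value); // a throw here propagates, as a processing error
  return 'handled';
}

const validateOrder: Validate<{ orderId: string }> = (value) => {
  const orderId = (value as { orderId?: unknown } | null)?.orderId;
  return typeof orderId === 'string'
    ? { ok: true, value: { orderId } }
    : { ok: false, error: 'orderId must be a string' };
};

const commits: string[] = [];
const logs: string[] = [];
const handled: string[] = [];
const hooks: LoopHooks = {
  commit: (offset) => { commits.push(offset); },
  log: (message) => { logs.push(message); },
};
const onOrder = async (order: { orderId: string }): Promise<void> => {
  handled.push(order.orderId);
};

void handleRecord('41', '{"orderId":42}', validateOrder, onOrder, hooks);      // skipped
void handleRecord('42', '{"orderId":"ORD-1"}', validateOrder, onOrder, hooks); // handled
```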

Self-healing (auto-reconnect)

Consumers recover automatically from background failures. The underlying @platformatic/kafka client is an EventEmitter; a background failure (e.g. a heartbeat-triggered rejoin that exhausts its retries on a transient broker disconnect) is emitted as an 'error' event. The wrapper:

  • Never lets it become an uncaughtException — it always handles the event.
  • Classifies it by walking the error's cause chain. A transient network blip is logged at warn: for example, a socket closed mid-SASL-handshake, which platformatic mislabels as a fatal AuthenticationError ("Cannot find a suitable SASL mechanism.") whose root cause is a NetworkError ("Connection closed"). A genuine fatal error (e.g. bad credentials) is logged at error.
  • Re-consumes: it recreates the client and rejoins the group with exponential backoff, so the consumer keeps running instead of dying silently.
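The classification step can be sketched as a bounded walk over error.cause links. This illustrates the idea rather than reproducing the package's code: it assumes errors carry the standard cause property (ES2022) and treats any link whose name is NetworkError as transient; matching on the error names quoted above is an assumption about how the underlying errors are identified.

```typescript
type Severity = 'transient' | 'fatal';

// Walk error.cause links (bounded, to guard against cycles). Any link named
// NetworkError marks the whole failure as transient.
function classifyError(error: unknown, maxDepth = 10): Severity {
  let current: unknown = error;
  for (let depth = 0; depth < maxDepth && current instanceof Error; depth++) {
    if (current.name === 'NetworkError') return 'transient';
    current = (current as Error & { cause?: unknown }).cause;
  }
  return 'fatal';
}

// Build an Error with a custom name and optional cause (example data only).
function namedError(name: string, message: string, cause?: unknown): Error {
  const err: Error & { cause?: unknown } = new Error(message);
  err.name = name;
  if (cause !== undefined) err.cause = cause;
  return err;
}

// A socket closed mid-SASL-handshake, surfaced as an AuthenticationError.
const blip = namedError(
  'AuthenticationError',
  'Cannot find a suitable SASL mechanism.',
  namedError('NetworkError', 'Connection closed'),
);
// Hypothetical bad-credentials failure with no network cause.
const badCredentials = namedError('AuthenticationError', 'Authentication failed');

classifyError(blip);           // 'transient' (logged at warn)
classifyError(badCredentials); // 'fatal' (logged at error)
```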

Tune via the consumer's reconnect config (all optional):

consumers: {
  reader: {
    brokers: ['kafka:9092'],
    groupId: 'my-group',
    reconnect: {
      maxAttempts: 0,      // 0 = unlimited (default)
      baseBackoffMs: 1000, // exponential base (default)
      maxBackoffMs: 30000, // backoff cap (default)
    },
  },
},
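One common way to turn these settings into delays is capped exponential backoff. The sketch below assumes delay = min(maxBackoffMs, baseBackoffMs × 2^attempt) with zero-based attempts and no jitter; the package's exact formula, including whether it adds jitter, is not documented here, and reconnectDelayMs is a hypothetical name.

```typescript
interface ReconnectConfig {
  maxAttempts: number;   // 0 = unlimited
  baseBackoffMs: number; // exponential base
  maxBackoffMs: number;  // backoff cap
}

// Defaults from the reconnect block above.
const RECONNECT_DEFAULTS: ReconnectConfig = { maxAttempts: 0, baseBackoffMs: 1000, maxBackoffMs: 30000 };

// Delay before zero-based reconnect attempt `attempt`, or null once attempts are exhausted.
function reconnectDelayMs(attempt: number, config: ReconnectConfig = RECONNECT_DEFAULTS): number | null {
  if (config.maxAttempts > 0 && attempt >= config.maxAttempts) return null;
  return Math.min(config.maxBackoffMs, config.baseBackoffMs * 2 ** attempt);
}

// With the defaults: 1000, 2000, 4000, 8000, 16000, 30000, 30000
const schedule = [0, 1, 2, 3, 4, 5, 6].map((attempt) => reconnectDelayMs(attempt));
```

The cap keeps a long broker outage from pushing retries minutes apart, while maxAttempts: 0 keeps the consumer retrying indefinitely.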

getHealth() exposes reconnectAttempts and lastReconnectAt for observability. Producers attach the same 'error' listener (so a background producer error can't crash the process either) but have no restart loop — they reconnect lazily on the next publish()/ping().

Raw stream escape hatch

For full control, open the underlying @platformatic/kafka MessagesStream:

const stream = await kafka.stream('reader', ['order.created']);
for await (const message of stream) {
  // raw platformatic message
}
await stream.close();
Mock mode

When enabled: false, consumers become mocks: consume() returns a no-op subscription whose handler is never invoked, and all methods log Mock mode - skipping ....

Mock Mode (Disable Kafka)

When enabled: false, all producers become mocks automatically:

const kafka = KafkaManager.create({
  enabled: false, // or process.env.ENABLE_KAFKA !== 'true'
  logger,
  producers: {
    main: {
      brokers: ['kafka:9092'],
      clientId: 'my-service',
    },
  },
});

// This will log a debug message but not actually publish
await kafka.publish('main', 'topic', { data: 'test' });
// Output: "Kafka: [main] Mock mode - skipping publish to topic: topic"
Health Checks & Readiness Probes

The package provides comprehensive health check APIs suitable for Kubernetes probes:

Readiness Probe

Checks if Kafka is ready to handle traffic (connects to brokers):

import { kafka } from './kafka';

app.get('/health/ready', async (req, res) => {
  const ready = await kafka.isReady();

  if (!ready) {
    const health = kafka.getHealth();
    return res.status(503).json({
      ready: false,
      kafka: health,
    });
  }

  res.json({ ready: true });
});

Readiness check features:

  • Revalidates connectivity if cache is stale
  • Configurable cache duration (default: 1 second)
  • Force revalidation with isReady({ force: true })
  • Respects healthCheckTimeoutMs configuration
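A readiness check with a result cache, a force flag, and a timeout can be sketched as a small class. CachedReadiness and its injected check and now functions are hypothetical; treating a check that rejects or exceeds timeoutMs as not ready matches the probe semantics above but is an assumption about the package's internals. Caching the in-flight promise, rather than only the final boolean, also means concurrent probes share a single check.

```typescript
class CachedReadiness {
  private checkedAt = Number.NEGATIVE_INFINITY;
  private result: Promise<boolean> | null = null;

  constructor(
    private readonly check: () => Promise<boolean>, // e.g. ping every producer
    private readonly cacheMs = 1000,
    private readonly timeoutMs = 5000,
    private readonly now: () => number = Date.now,
  ) {}

  isReady(options: { force?: boolean } = {}): Promise<boolean> {
    const t = this.now();
    if (!options.force && this.result !== null && t - this.checkedAt < this.cacheMs) {
      return this.result; // fresh enough: reuse the cached (possibly in-flight) result
    }
    this.checkedAt = t;
    this.result = this.runWithTimeout();
    return this.result;
  }

  private runWithTimeout(): Promise<boolean> {
    let timer: ReturnType<typeof setTimeout> | undefined;
    const timeout = new Promise<boolean>((resolve) => {
      timer = setTimeout(() => resolve(false), this.timeoutMs);
    });
    const attempt = this.check().catch(() => false); // a failed check means "not ready"
    return Promise.race([attempt, timeout]).then((ready) => {
      clearTimeout(timer);
      return ready;
    });
  }
}

let clock = 0;
let calls = 0;
const probe = new CachedReadiness(async () => { calls += 1; return true; }, 1000, 5000, () => clock);
void probe.isReady();                // runs the check
void probe.isReady();                // served from cache
clock = 1500;
void probe.isReady();                // cache is stale: runs the check again
void probe.isReady({ force: true }); // force: runs the check regardless of cache
```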
Liveness Probe

Lightweight check for process health (does NOT connect to Kafka):

app.get('/health/live', (req, res) => {
  const live = kafka.isLive();
  res.status(live ? 200 : 503).json({ live });
});

Liveness check:

  • Returns false if manager is in a fatal state
  • Returns false if graceful shutdown has started
  • Does NOT perform Kafka operations (very fast)
  • Perfect for Kubernetes liveness probes
Detailed Health Snapshot

Get comprehensive health information for all producers:

app.get('/status/kafka', (req, res) => {
  const health = kafka.getHealth();

  res.json({
    producers: health,
    enabled: kafka.isEnabled,
    live: kafka.isLive(),
  });
});

// Example response:
{
  "producers": {
    "main": {
      "name": "main",
      "enabled": true,
      "isConnected": true,
      "lastPingAt": 1701234567890,
      "lastPingSucceededAt": 1701234567890,
      "lastError": null,
      "clusterId": "prod-kafka-main-01",
      "brokerCount": 3,
      "brokers": ["kafka1:9092", "kafka2:9092", "kafka3:9092"]
    },
    "analytics": {
      "name": "analytics",
      "enabled": true,
      "isConnected": false,
      "lastPingAt": 1701234560000,
      "lastPingSucceededAt": 1701234500000,
      "lastError": {
        "message": "Connection refused",
        "timestamp": 1701234560000
      },
      "clusterId": null,
      "brokerCount": 0,
      "brokers": ["analytics-kafka:9092"]
    }
  },
  "enabled": true,
  "live": true
}
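For a status page or an alert, the snapshot can be reduced to a short summary. The ProducerHealth interface below only mirrors the fields in the example response (the package's exported type may differ), and summarizeHealth is a hypothetical helper.

```typescript
// Mirrors the fields shown in the example response above.
interface ProducerHealth {
  name: string;
  enabled: boolean;
  isConnected: boolean;
  lastPingAt: number | null;
  lastPingSucceededAt: number | null;
  lastError: { message: string; timestamp: number } | null;
  clusterId: string | null;
  brokerCount: number;
  brokers: string[];
}

interface HealthSummary {
  connected: string[];
  failing: { name: string; error: string; secondsSinceSuccess: number | null }[];
}

function summarizeHealth(health: Record<string, ProducerHealth>, now = Date.now()): HealthSummary {
  const summary: HealthSummary = { connected: [], failing: [] };
  for (const p of Object.values(health)) {
    if (p.isConnected) {
      summary.connected.push(p.name);
      continue;
    }
    summary.failing.push({
      name: p.name,
      error: p.lastError?.message ?? 'unknown',
      secondsSinceSuccess:
        p.lastPingSucceededAt === null ? null : Math.round((now - p.lastPingSucceededAt) / 1000),
    });
  }
  return summary;
}

const snapshot: Record<string, ProducerHealth> = {
  main: {
    name: 'main', enabled: true, isConnected: true,
    lastPingAt: 1701234567890, lastPingSucceededAt: 1701234567890, lastError: null,
    clusterId: 'prod-kafka-main-01', brokerCount: 3,
    brokers: ['kafka1:9092', 'kafka2:9092', 'kafka3:9092'],
  },
  analytics: {
    name: 'analytics', enabled: true, isConnected: false,
    lastPingAt: 1701234560000, lastPingSucceededAt: 1701234500000,
    lastError: { message: 'Connection refused', timestamp: 1701234560000 },
    clusterId: null, brokerCount: 0, brokers: ['analytics-kafka:9092'],
  },
};

// { connected: ['main'], failing: [{ name: 'analytics', error: 'Connection refused', secondsSinceSuccess: 68 }] }
const summary = summarizeHealth(snapshot, 1701234567890);
```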
Connection Status

Get quick connection status summary:

const status = kafka.getConnectionStatus();

// Returns:
{
  "main": {
    "connected": true,
    "lastSuccessAt": 1701234567890,
    "lastError": null
  },
  "analytics": {
    "connected": false,
    "lastSuccessAt": 1701234500000,
    "lastError": "Connection refused"
  }
}
Kubernetes Example
apiVersion: v1
kind: Pod
metadata:
  name: my-service
spec:
  containers:
  - name: app
    image: my-service:latest
    livenessProbe:
      httpGet:
        path: /health/live
        port: 3000
      initialDelaySeconds: 10
      periodSeconds: 10
    readinessProbe:
      httpGet:
        path: /health/ready
        port: 3000
      initialDelaySeconds: 15
      periodSeconds: 5
      timeoutSeconds: 3

Configuration

KafkaManagerOptions
interface KafkaManagerOptions {
  // Enable/disable Kafka - when false, returns mock implementations
  // @default true
  enabled?: boolean;

  // Custom logger instance
  logger?: LoggerInstanceManager;

  // Skip automatic graceful shutdown
  // @default false
  dontGracefulShutdown?: boolean;

  // Named producers configuration
  producers?: Record<string, ProducerConfig>;

  // Named consumers configuration
  consumers?: Record<string, ConsumerConfig>;

  // Health & Bootstrap Options

  // Global timeout for health checks in ms
  // Used by isReady() and ping operations
  // @default 5000 (5 seconds)
  healthCheckTimeoutMs?: number;

  // How long to cache health check results in ms
  // Set to 0 to always revalidate
  // @default 1000 (1 second)
  healthCheckCacheMs?: number;

  // Default timeout for bootstrap operations in ms
  // Can be overridden per bootstrap() call
  // @default 30000 (30 seconds)
  bootstrapTimeoutMs?: number;

  // Default strict mode for bootstrap
  // If true, all producers must succeed
  // If false, at least one producer must succeed
  // @default true
  strictBootstrap?: boolean;
}
ProducerConfig
interface ProducerConfig {
  // Array of Kafka broker addresses (required)
  brokers: string[];

  // Client ID for this producer (optional)
  clientId?: string;

  // SASL authentication options (optional)
  sasl?: {
    mechanism: 'plain' | 'scram-sha-256' | 'scram-sha-512';
    username: string;
    password: string;
  };

  // Automatically create topics if they don't exist (default: false)
  autoCreateTopics?: boolean;
}

If sasl is omitted, the producer defaults to passwordless OAUTHBEARER/ADC auth.

ConsumerConfig
| Option | Type | Required | Default | Description |
|---|---|---|---|---|
| brokers | string[] | yes | | Kafka broker addresses |
| groupId | string | yes | | Consumer group ID |
| clientId | string | no | autofleet-kafka-consumer-<name> | Client ID for this consumer |
| sasl | ConnectionOptions['sasl'] | no | passwordless OAUTHBEARER/ADC | SASL authentication options |
| tls | ConnectionOptions['tls'] | no | {} | TLS configuration options |
| oauthBearerConfig | OauthBearerProviderOptions | no | { logger } | OAUTHBEARER token provider options (ADC) |
| sessionTimeout | number | no | platformatic default | Session timeout in ms |
| heartbeatInterval | number | no | platformatic default | Heartbeat interval in ms |
ConsumeOptions
| Option | Type | Default | Description |
|---|---|---|---|
| autocommit | boolean | true | When false, offsets are committed only after the handler resolves successfully |
| sessionTimeout | number | consumer config | Override session timeout for this subscription |
| heartbeatInterval | number | consumer config | Override heartbeat interval for this subscription |
| skipValidation | boolean | false | Skip schema validation entirely (handler receives the raw JSON-parsed value) |
| schema | z.ZodType | registered topic schema | Schema to validate against (overrides the registered topic schema). On failure the message is logged, committed, and skipped; on success the handler receives the parsed value |

Consumers are intentionally excluded from isReady() so a consumer-group rebalance never flips a pod to not-ready. Use getConsumerHealth() / pingConsumer() for consumer-specific status.

Advanced Features

Message Keys for Partitioning

Use message keys to control partitioning and maintain order:

// All messages for the same user go to the same partition
await kafka.publish('main', 'user-events', event, {
  key: `user-${userId}`,
});

// All messages for the same order go to the same partition
await kafka.publish('main', 'order-events', event, {
  key: `order-${orderId}`,
});

Why use keys?

  • Ensures messages with the same key land on the same partition, so they are consumed in order
  • Enables partition-level parallelism
  • Supports stateful stream processing
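Key-based placement works because the producer's partitioner hashes the key and takes the result modulo the partition count. The sketch below reproduces the murmur2-based scheme of the Kafka Java client's default partitioner; whether @platformatic/kafka uses the same hash is an assumption, so treat concrete partition numbers as illustrative. What carries over is the property that equal keys always map to the same partition for a fixed partition count.

```typescript
// murmur2 as used by Kafka's Java client (Utils.murmur2); 32-bit math via Math.imul.
function murmur2(data: Uint8Array): number {
  const m = 0x5bd1e995;
  const r = 24;
  const length = data.length;
  let h = 0x9747b28c ^ length;
  const blocks = length >>> 2;
  for (let i = 0; i < blocks; i++) {
    const i4 = i * 4;
    // Little-endian 4-byte block
    let k = data[i4] | (data[i4 + 1] << 8) | (data[i4 + 2] << 16) | (data[i4 + 3] << 24);
    k = Math.imul(k, m);
    k ^= k >>> r;
    k = Math.imul(k, m);
    h = Math.imul(h, m);
    h ^= k;
  }
  const tail = length & ~3;
  switch (length & 3) {
    case 3:
      h ^= data[tail + 2] << 16;
    // falls through
    case 2:
      h ^= data[tail + 1] << 8;
    // falls through
    case 1:
      h ^= data[tail];
      h = Math.imul(h, m);
  }
  h ^= h >>> 13;
  h = Math.imul(h, m);
  h ^= h >>> 15;
  return h;
}

// Keyed record placement: toPositive(murmur2(key)) % numPartitions.
function partitionForKey(key: string, numPartitions: number): number {
  const hash = murmur2(new TextEncoder().encode(key));
  return (hash & 0x7fffffff) % numPartitions;
}

const userPartition = partitionForKey('user-42', 12);
const samePartition = partitionForKey('user-42', 12); // always equal to userPartition
```

Because the result depends on numPartitions, adding partitions to a topic remaps existing keys, so per-key ordering only holds while the partition count stays the same.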
Custom Headers

Headers are useful for metadata and message routing:

await kafka.publish('main', 'events', data, {
  headers: {
    // Correlation ID for distributed tracing
    'correlation-id': correlationId,

    // Source service
    'source': 'user-service',

    // Message schema version
    'schema-version': '2.0',

    // Custom application headers
    'tenant-id': 'tenant-123',
  },
});
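On the consuming side, headers arrive as a Record<string, string> (see KafkaMessage under Consuming Messages), which makes it straightforward to carry a correlation ID from an inbound message to the messages you publish in response. propagateHeaders and the PROPAGATED list are hypothetical; the header names simply follow the example above.

```typescript
type HeaderMap = Record<string, string>;

// Inbound headers worth forwarding to downstream messages (illustrative choice).
const PROPAGATED = ['correlation-id', 'tenant-id'];

// Copy selected inbound headers; outbound-specific headers win on conflict.
function propagateHeaders(inbound: HeaderMap, outbound: HeaderMap): HeaderMap {
  const forwarded: HeaderMap = {};
  for (const name of PROPAGATED) {
    const value = inbound[name];
    if (value !== undefined) forwarded[name] = value;
  }
  return { ...forwarded, ...outbound };
}

const inboundHeaders: HeaderMap = {
  'correlation-id': 'req-abc',
  'tenant-id': 'tenant-123',
  source: 'order-service',
};

// Forwards correlation-id and tenant-id, but this service sets its own source.
const outboundHeaders = propagateHeaders(inboundHeaders, {
  source: 'user-service',
  'schema-version': '2.0',
});
```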
Partition Control

Direct partition control for advanced use cases:

// Send to specific partition
await kafka.publish('main', 'events', data, {
  partition: 0, // Always send to partition 0
});

// Route by key: every message for a customer lands on the same partition,
// while different customers spread across partitions
const partitionKey = `customer-${customerId}`;
await kafka.publish('main', 'events', data, {
  key: partitionKey,
});

API Reference

Core Lifecycle Methods
KafkaManager.create(options): KafkaManager<ProducerNames>

Creates a new KafkaManager instance with multiple named producers. Producer names are type-safe - TypeScript will autocomplete and validate them based on your configuration.

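Parameters:

  • options - KafkaManagerOptions (see Configuration); the keys of options.producers become the type-safe producer names
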
Returns: A KafkaManager<ProducerNames> instance where ProducerNames are the keys from your producers config

Example:

const kafka = KafkaManager.create({
  enabled: true,
  logger,
  producers: {
    main: {
      brokers: ['kafka:9092'],
      clientId: 'my-service',
    },
    analytics: {
      brokers: ['kafka-analytics:9092'],
      clientId: 'my-service-analytics',
    },
  },
});

// Type: KafkaManager<'main' | 'analytics'>
// TypeScript knows the valid producer names!

// Producers connect automatically on first publish
await kafka.publish('main', 'my-topic', { data: 'test' }); // ✅ Valid
await kafka.publish('analytics', 'metrics', { value: 1 }); // ✅ Valid
// await kafka.publish('wrong', 'topic', {}); // ❌ TypeScript error!
bootstrap(options?): Promise<BootstrapResult>

Explicitly bootstrap all (or specific) producers. Connects to brokers, validates connectivity, and returns detailed results. Recommended before serving traffic in production.

Parameters:

  • options.timeoutMs (optional) - Maximum time to wait for all producers (default: bootstrapTimeoutMs config)
  • options.strict (optional) - Fail if ANY producer fails (default: strictBootstrap config)
  • options.producers (optional) - Array of specific producer names to bootstrap (default: all)

Returns: BootstrapResult with success status, duration, and per-producer results

Examples:

// Bootstrap all producers with defaults
await kafka.bootstrap();

// Non-strict mode - continue if some fail
const result = await kafka.bootstrap({
  timeoutMs: 10000,
  strict: false,
});
if (!result.success) {
  console.error('Some producers failed:', result.results);
}

// Bootstrap specific producers only
await kafka.bootstrap({ producers: ['main'] });
Health & Monitoring Methods
getHealth(): Record<ProducerNames, ProducerHealth>

Get detailed health snapshot for all producers. Returns comprehensive metadata including connection state, timestamps, errors, and cluster info.

Returns: Object mapping producer names to ProducerHealth objects

Example:

const health = kafka.getHealth();
console.log(health.main.clusterId);          // 'prod-kafka-01'
console.log(health.main.lastPingSucceededAt);// 1701234567890
console.log(health.main.lastError);          // null or error object
getProducerHealth(name): ProducerHealth

Get detailed health for a specific producer.

Parameters:

  • name - Producer name (type-safe)

Returns: ProducerHealth object

Example:

const health = kafka.getProducerHealth('main');
console.log('Cluster:', health.clusterId);
console.log('Brokers:', health.brokerCount);
isReady(options?): Promise<boolean>

Check if Kafka is ready to handle traffic. Revalidates connectivity if cache is stale. Use for Kubernetes readiness probes.

Parameters:

  • options.timeout (optional) - Override default health check timeout
  • options.force (optional) - Force revalidation, ignore cache

Returns: true if all producers are connected, false otherwise

Examples:

// Use cached result if fresh
const ready = await kafka.isReady();

// Force immediate revalidation
const freshReady = await kafka.isReady({ force: true });

// Custom timeout
const readyWithTimeout = await kafka.isReady({ timeout: 10000 });
isLive(): boolean

Lightweight liveness check - does NOT perform Kafka operations. Only checks internal state for fatal errors. Perfect for Kubernetes liveness probes.

Returns: false if manager is in fatal state or graceful shutdown has started

Example:

app.get('/health/live', (req, res) => {
  const live = kafka.isLive();
  res.status(live ? 200 : 503).json({ live });
});
getConnectionStatus(): Record<ProducerNames, ConnectionStatus>

Get connection status for all producers with timestamps and errors.

Returns: Object mapping producer names to connection status

Example:

const status = kafka.getConnectionStatus();
// {
//   main: {
//     connected: true,
//     lastSuccessAt: 1701234567890,
//     lastError: null
//   }
// }
Publishing Methods
publish<T>(producerName, topic, value, options?): Promise<RecordMetadata[]>

Publishes a single message to a topic using a named producer. Producer name is type-safe - must match one of the configured producers.

Parameters:

  • producerName - Name of the producer to use (type-safe: only accepts configured producer names)
  • topic - Topic name
  • value - Message value (will be JSON stringified)
  • options (optional) - Publish options such as key and headers (see the example below)

Returns: Array of record metadata (partition, offset, etc.)

Example:

// TypeScript autocompletes and validates producer names
const metadata = await kafka.publish('main', 'events', { id: 1, data: 'test' }, {
  key: 'key-1',
  headers: { type: 'create' },
});

// Type error if using non-existent producer
// await kafka.publish('nonexistent', 'topic', {}); // ❌ Error!
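Because the value is JSON stringified, the usual `JSON.stringify` rules decide what actually reaches consumers. The sketch below only illustrates those rules; `toWireValue` is a hypothetical helper, and encoding the JSON as UTF-8 bytes is an assumption about what the library does after stringifying.

```typescript
import { Buffer } from "node:buffer";

// Illustrates what "value will be JSON stringified" implies for common payload types.
// The UTF-8 encoding step is an assumption about the library, not documented behaviour.
function toWireValue(value: unknown): Buffer {
  const json = JSON.stringify(value); // Dates become ISO strings, undefined fields are dropped
  if (json === undefined) {
    // JSON.stringify returns undefined for a top-level undefined, function, or symbol.
    throw new TypeError("value is not JSON-serializable");
  }
  return Buffer.from(json, "utf8");
}
```

In practice: `Date` values arrive as ISO strings, fields set to `undefined` disappear, `Map` and `Set` serialize as `{}`, and `BigInt` throws, so convert such values explicitly before publishing.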
publishBatch(producerName, options): Promise<RecordMetadata[]>

Publishes multiple messages in a batch using a named producer. Producer name is type-safe.

Parameters:

  • producerName - Name of the producer to use (type-safe: only accepts configured producer names)
  • options.topic - Topic name
  • options.messages - Array of messages with values, keys, headers, etc.

Returns: Array of record metadata for each message

Example:

// TypeScript validates 'main' is a configured producer
const metadata = await kafka.publishBatch('main', {
  topic: 'events',
  messages: [
    { value: { id: 1 }, key: 'key-1' },
    { value: { id: 2 }, key: 'key-2' },
  ],
});
isReady(): Promise<boolean>

Check if all producers are connected and ready. Useful for health checks.

Example:

const ready = await kafka.isReady();
if (!ready) {
  throw new Error('Kafka not ready');
}
getConnectionStatus(): Record<ProducerNames, boolean>

Get connection status for all producers. Return type is type-safe based on your configuration.

Example:

const status = kafka.getConnectionStatus();
// Type: { main: boolean; analytics: boolean }
// { main: true, analytics: false }
ping(): Promise<void>

Ping all producers to verify connectivity.

Example:

await kafka.ping(); // Throws if any producer fails to connect
pingProducer(name): Promise<void>

Ping a specific producer to verify connectivity. Producer name is type-safe.

Example:

await kafka.pingProducer('main'); // ✅ Valid
// await kafka.pingProducer('invalid'); // ❌ Type error!
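ping() throws if any producer fails; when diagnosing a multi-cluster setup it can help to ping each producer individually and collect every failure. A sketch, where `pingEach` is a hypothetical helper and `ping` stands in for `kafka.pingProducer`:

```typescript
// Hypothetical per-producer ping; in real code this would be (name) => kafka.pingProducer(name).
type PingFn<Name extends string> = (name: Name) => Promise<void>;

// Pings all producers concurrently and reports every failure instead of stopping at the first.
async function pingEach<Name extends string>(
  names: readonly Name[],
  ping: PingFn<Name>,
): Promise<Partial<Record<Name, string>>> {
  const outcomes = await Promise.all(
    names.map(async (name) => {
      try {
        await ping(name);
        return [name, null] as const;
      } catch (error) {
        return [name, error instanceof Error ? error.message : String(error)] as const;
      }
    }),
  );
  const failures: Partial<Record<Name, string>> = {};
  for (const [name, message] of outcomes) {
    if (message !== null) failures[name] = message;
  }
  return failures;
}
```

An empty result object means every producer answered.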
disconnect(): Promise<void>

Disconnects all producers and cleans up resources.

Example:

await kafka.disconnect();
disconnectProducer(name): Promise<void>

Disconnects a specific producer and cleans up resources. Producer name is type-safe.

Example:

await kafka.disconnectProducer('main'); // ✅ Valid
// await kafka.disconnectProducer('invalid'); // ❌ Type error!

Best Practices

1. Use Centralized Kafka Configuration

Define your Kafka instance and topics in one place:

// src/kafka.ts
import { KafkaManager } from '@autofleet/kafka';
import logger from './logger';

export const kafka = KafkaManager.create({
  enabled: process.env.ENABLE_KAFKA === 'true',
  logger,
  producers: {
    main: {
      brokers: process.env.KAFKA_BROKERS?.split(',') || ['localhost:9092'],
      clientId: 'my-service',
      autoCreateTopics: process.env.NODE_ENV === 'development',
    },
  },
});

export const TOPICS = {
  USER_EVENTS: 'user-events',
  ORDER_EVENTS: 'order-events',
} as const;
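Because TOPICS is declared `as const`, its values can be lifted into a union type so helpers only accept known topics. A sketch; `TopicName` and `isTopicName` are hypothetical helpers, not exports of `@autofleet/kafka` (whose own typed topics are Zod-based, per the feature list).

```typescript
const TOPICS = {
  USER_EVENTS: "user-events",
  ORDER_EVENTS: "order-events",
} as const;

// Union of the literal topic names: "user-events" | "order-events".
type TopicName = (typeof TOPICS)[keyof typeof TOPICS];

// Runtime guard for topic names that arrive as plain strings (config, CLI args, etc.).
function isTopicName(value: string): value is TopicName {
  return (Object.values(TOPICS) as readonly string[]).includes(value);
}

// Compile-time check: only known topics are accepted here.
function describeTopic(topic: TopicName): string {
  return `topic ${topic}`;
}
```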
2. Use Message Keys for Ordering
// Ensure all events for a user are processed in order
await kafka.publish('main', TOPICS.USER_EVENTS, event, {
  key: `user-${userId}`,
});
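Why keys preserve order: keyed records are hashed to a fixed partition, and Kafka orders records only within a partition. The sketch below reproduces the Java client's default keyed partitioning (murmur2 of the key bytes, masked to a positive int, modulo the partition count). Whether `@platformatic/kafka`, and therefore this wrapper, hashes keys identically is an assumption, as is UTF-8 encoding of string keys.

```typescript
// Port of the Java client's murmur2 hash (seed 0x9747b28c), using 32-bit integer math.
function murmur2(data: Uint8Array): number {
  const m = 0x5bd1e995;
  const r = 24;
  const length = data.length;
  let h = (0x9747b28c ^ length) | 0;
  const blocks = length >> 2; // number of whole 4-byte blocks
  for (let i = 0; i < blocks; i++) {
    const i4 = i * 4;
    let k =
      (data[i4] & 0xff) | ((data[i4 + 1] & 0xff) << 8) | ((data[i4 + 2] & 0xff) << 16) | ((data[i4 + 3] & 0xff) << 24);
    k = Math.imul(k, m);
    k ^= k >>> r;
    k = Math.imul(k, m);
    h = Math.imul(h, m);
    h ^= k;
  }
  // Mix in the remaining 1 to 3 tail bytes.
  const tail = length & ~3;
  const rem = length & 3;
  if (rem === 3) h ^= (data[tail + 2] & 0xff) << 16;
  if (rem >= 2) h ^= (data[tail + 1] & 0xff) << 8;
  if (rem >= 1) {
    h ^= data[tail] & 0xff;
    h = Math.imul(h, m);
  }
  h ^= h >>> 13;
  h = Math.imul(h, m);
  h ^= h >>> 15;
  return h | 0;
}

// Same key -> same partition, as long as the partition count does not change.
function partitionForKey(key: string, numPartitions: number): number {
  const bytes = new TextEncoder().encode(key); // assumption: keys are sent as UTF-8 bytes
  return (murmur2(bytes) & 0x7fffffff) % numPartitions;
}
```

A consequence of hash-modulo partitioning: adding partitions to a topic remaps keys, so per-key ordering is only guaranteed while the partition count stays fixed.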
3. Add Metadata with Headers
await kafka.publish('main', 'events', data, {
  headers: {
    'correlation-id': requestId,
    'source-service': 'user-service',
    'event-type': eventType,
  },
});
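To keep header names consistent across call sites, they can be built in one place. A sketch; `buildHeaders` is a hypothetical helper, the header names simply follow the example above (the library does not require any of them), and string header values are assumed.

```typescript
import { randomUUID } from "node:crypto";

// Builds the header set shown above, generating a correlation id when the caller has none.
function buildHeaders(opts: { service: string; eventType: string; correlationId?: string }): Record<string, string> {
  return {
    "correlation-id": opts.correlationId ?? randomUUID(),
    "source-service": opts.service,
    "event-type": opts.eventType,
  };
}
```

Usage: `await kafka.publish('main', 'events', data, { headers: buildHeaders({ service: 'user-service', eventType, correlationId: requestId }) });`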
4. Use Batch Publishing for High Throughput
// Instead of multiple publish calls
for (const event of events) {
  await kafka.publish('main', 'events', event); // ❌ Slow
}

// Use batch publishing
await kafka.publishBatch('main', {
  topic: 'events',
  messages: events.map(e => ({ value: e })), // ✅ Fast
});
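For very large event lists, one batch may exceed broker or client size limits (for example the broker's `message.max.bytes`). A sketch of splitting events into bounded batches first; `chunk` and the batch size of 500 are hypothetical and should be tuned to your payload sizes.

```typescript
// Splits items into consecutive batches of at most maxBatch elements.
function chunk<T>(items: readonly T[], maxBatch: number): T[][] {
  if (!Number.isInteger(maxBatch) || maxBatch < 1) {
    throw new RangeError("maxBatch must be a positive integer");
  }
  const batches: T[][] = [];
  for (let i = 0; i < items.length; i += maxBatch) {
    batches.push(items.slice(i, i + maxBatch));
  }
  return batches;
}

// Usage (hypothetical batch size):
// for (const batch of chunk(events, 500)) {
//   await kafka.publishBatch('main', { topic: 'events', messages: batch.map(e => ({ value: e })) });
// }
```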
5. Integrate with Health Checks
app.get('/health/ready', async (req, res) => {
  const ready = await kafka.isReady();
  res.status(ready ? 200 : 503).json({ ready });
});
6. Handle Errors Gracefully
try {
  await kafka.publish('main', 'events', data);
} catch (error) {
  logger.error('Failed to publish to Kafka', { error, data });
  // Consider: retry logic, fallback queue, monitoring alerts
}
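For the "retry logic" option above, a sketch of an application-level retry with exponential backoff and full jitter. `retryWithBackoff` and its defaults are hypothetical; whether the underlying client already retries internally is not documented here.

```typescript
// Retries fn up to `attempts` times, waiting a random delay in [0, min(max, base * 2^attempt)).
async function retryWithBackoff<T>(
  fn: () => Promise<T>,
  { attempts = 3, baseDelayMs = 100, maxDelayMs = 5_000 } = {},
): Promise<T> {
  let lastError: unknown;
  for (let attempt = 0; attempt < attempts; attempt++) {
    try {
      return await fn();
    } catch (error) {
      lastError = error;
      if (attempt === attempts - 1) break; // no sleep after the final failure
      const cap = Math.min(maxDelayMs, baseDelayMs * 2 ** attempt);
      const delay = Math.random() * cap; // full jitter spreads retries from many pods
      await new Promise((resolve) => setTimeout(resolve, delay));
    }
  }
  throw lastError;
}

// Usage: await retryWithBackoff(() => kafka.publish('main', 'events', data));
```

Retrying a publish is at-least-once: if the first attempt reached the broker but the acknowledgement was lost, the message is written twice, so consumers should tolerate duplicates.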
7. Use Multiple Producers for Different Clusters
const kafka = KafkaManager.create({
  enabled: true,
  logger,
  producers: {
    // Production events
    main: {
      brokers: ['kafka-prod.svc.cluster.local:9092'],
      clientId: 'my-service-main',
    },
    // Analytics (separate cluster)
    analytics: {
      brokers: ['kafka-analytics.svc.cluster.local:9092'],
      clientId: 'my-service-analytics',
    },
  },
});

// Critical events go to main cluster
await kafka.publish('main', 'orders', orderData);

// Analytics go to analytics cluster
await kafka.publish('analytics', 'metrics', metricsData);
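When several topics are split across clusters, a single routing table keeps the topic-to-cluster decision in one place. A sketch; `TOPIC_ROUTES` and `producerFor` are hypothetical, and the producer names match the configuration above.

```typescript
// Hypothetical routing table: which configured producer (cluster) owns each topic.
const TOPIC_ROUTES = {
  orders: "main",
  metrics: "analytics",
} as const;

type RoutedTopic = keyof typeof TOPIC_ROUTES;

// Returns the producer name for a topic; unknown topics fail to compile.
function producerFor(topic: RoutedTopic): (typeof TOPIC_ROUTES)[RoutedTopic] {
  return TOPIC_ROUTES[topic];
}

// Usage: await kafka.publish(producerFor('orders'), 'orders', orderData);
```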

Troubleshooting

Common Issues and Solutions
Bootstrap Fails with Connection Timeout

Problem:

[main] Failed to connect to Kafka brokers: Connection timeout after 5000ms

Possible causes:
  1. Brokers are unreachable: kafka:9092
  2. Network connectivity issues
  3. Firewall blocking ports

Solutions:

  1. Verify brokers are running:

    kubectl get pods -l app=kafka
  2. Test connectivity:

    nc -zv kafka-broker 9092
  3. Check network policies:

    • Ensure service can reach Kafka namespace
    • Verify firewall rules allow traffic on port 9092
  4. Increase timeout:

    await kafka.bootstrap({ timeoutMs: 30000 }); // 30 seconds
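If the container image has no netcat, the same TCP reachability test can run from Node itself. A sketch using the built-in `node:net` module; `checkTcp` is a hypothetical helper. It only proves that a TCP connection opens, not that TLS or SASL will succeed or that the port belongs to a Kafka broker.

```typescript
import * as net from "node:net";

// Resolves true if a TCP connection to host:port opens within timeoutMs, false otherwise.
function checkTcp(host: string, port: number, timeoutMs = 5_000): Promise<boolean> {
  return new Promise((resolve) => {
    const socket = net.connect({ host, port });
    const finish = (ok: boolean) => {
      socket.destroy(); // close the probe connection either way
      resolve(ok);
    };
    socket.setTimeout(timeoutMs, () => finish(false));
    socket.once("connect", () => finish(true));
    socket.once("error", () => finish(false)); // e.g. ECONNREFUSED, ENOTFOUND
  });
}

// Usage: console.log(await checkTcp('kafka-broker', 9092));
```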
SASL Authentication Failed

Problem:

[main] Failed to connect to Kafka brokers: SASL authentication failed

Solutions:

  1. Verify credentials:

    producers: {
      main: {
        brokers: ['kafka:9092'],
        sasl: {
          mechanism: 'scram-sha-256',
          username: process.env.KAFKA_USERNAME,  // Check this
          password: process.env.KAFKA_PASSWORD,  // And this
        },
      },
    }
  2. Check mechanism:

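    • Verify the broker supports the configured SASL mechanism
    • Common mechanisms: 'plain', 'scram-sha-256', 'scram-sha-512'
    • By default this library authenticates with OAUTHBEARER backed by Google Application Default Credentials; in that mode, check the service's ADC identity rather than a username/password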
  3. Inspect secrets:

    kubectl get secret kafka-credentials -o yaml
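When static SCRAM credentials are configured as above, an unset variable silently becomes `undefined` and only surfaces later as a generic SASL failure. A sketch of failing fast at startup instead; `requireEnv` is a hypothetical helper and does not apply to the default passwordless OAUTHBEARER mode.

```typescript
// Returns the variable's value, or throws if it is missing or blank.
function requireEnv(name: string, env: Record<string, string | undefined> = process.env): string {
  const value = env[name];
  if (value === undefined || value.trim() === "") {
    throw new Error(`Missing required environment variable ${name}`);
  }
  return value; // returned untrimmed: leading/trailing spaces may be part of a secret
}

// Usage:
// sasl: {
//   mechanism: 'scram-sha-256',
//   username: requireEnv('KAFKA_USERNAME'),
//   password: requireEnv('KAFKA_PASSWORD'),
// },
```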
Producer Not Found Error

Problem:

Producer 'analytics' not found. Available producers: main

Solution: The code references a producer that was never configured. Add it to the producers map:

// Add the missing producer
const kafka = KafkaManager.create({
  producers: {
    main: { brokers: ['kafka:9092'] },
    analytics: { brokers: ['kafka-analytics:9092'] }, // Add this
  },
});
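With literal names, TypeScript already rejects unknown producers at compile time (see the pingProducer example), so this error usually means the name came from a runtime string such as an environment variable. A sketch of validating such strings where they enter the program; `producers` mirrors the config above and `toProducerName` is a hypothetical helper.

```typescript
// Hypothetical producer config mirroring the example above.
const producers = {
  main: { brokers: ["kafka:9092"] },
  analytics: { brokers: ["kafka-analytics:9092"] },
};

type ProducerName = keyof typeof producers;

// Narrows an untyped string to a configured producer name, or throws with the available names.
function toProducerName(value: string): ProducerName {
  if (Object.prototype.hasOwnProperty.call(producers, value)) {
    return value as ProducerName;
  }
  throw new Error(`Producer '${value}' not configured. Available producers: ${Object.keys(producers).join(", ")}`);
}

// Usage: const producer = toProducerName(process.env.EVENTS_PRODUCER ?? 'main');
```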
Readiness Check Always Fails

Problem: Health checks keep failing even though Kafka is up.

Solutions:

  1. Check timeout is sufficient:

    const kafka = KafkaManager.create({
      healthCheckTimeoutMs: 5000, // Increase if needed
    });
  2. Force revalidation:

    const ready = await kafka.isReady({ force: true });
  3. Inspect detailed health:

    const health = kafka.getHealth();
    console.log(health); // Check lastError for details
Bootstrap Succeeds but Publish Fails

Problem: Bootstrap passes, but publish throws "topic auto-create disabled".

Solution: Enable topic auto-creation or create topics manually:

producers: {
  main: {
    brokers: ['kafka:9092'],
    autoCreateTopics: true, // Enable for dev/test
  },
}

// Or create topics manually:
// kafka-topics --create --topic my-topic --bootstrap-server kafka:9092
Graceful Shutdown Not Working

Problem: Service doesn't close Kafka connections on shutdown.

Solutions:

  1. Ensure graceful shutdown is enabled:

    const kafka = KafkaManager.create({
      dontGracefulShutdown: false, // Default
      producers: { /* ... */ },
    });
  2. Manual disconnect if needed:

    process.on('SIGTERM', async () => {
      await kafka.disconnect();
      process.exit(0);
    });
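A manual handler should also guard against a disconnect that never settles, and against a second signal arriving mid-shutdown. A sketch with hypothetical helpers `withTimeout` and `installShutdown`; how the built-in graceful shutdown interacts with extra signal handlers is not documented here, so this is most relevant with dontGracefulShutdown: true.

```typescript
// Rejects if the promise does not settle within ms.
function withTimeout<T>(promise: Promise<T>, ms: number, label: string): Promise<T> {
  let timer: ReturnType<typeof setTimeout> | undefined;
  const deadline = new Promise<never>((_, reject) => {
    timer = setTimeout(() => reject(new Error(`${label} timed out after ${ms}ms`)), ms);
  });
  return Promise.race([promise, deadline]).finally(() => clearTimeout(timer));
}

// One handler for SIGTERM and SIGINT; the flag turns repeated signals into no-ops.
function installShutdown(disconnect: () => Promise<void>, timeoutMs = 10_000): void {
  let shuttingDown = false;
  const handler = async (signal: NodeJS.Signals) => {
    if (shuttingDown) return;
    shuttingDown = true;
    try {
      await withTimeout(disconnect(), timeoutMs, "Kafka disconnect");
      process.exit(0);
    } catch (error) {
      console.error(`Shutdown after ${signal} failed`, error);
      process.exit(1);
    }
  };
  process.on("SIGTERM", handler);
  process.on("SIGINT", handler);
}

// Usage: installShutdown(() => kafka.disconnect());
```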
Debugging Tips
Enable Debug Logging
import { KafkaManager } from '@autofleet/kafka';
import logger from './logger';

const kafka = KafkaManager.create({
  logger, // Ensure logger has debug level enabled
  producers: { /* ... */ },
});

// Check health frequently
setInterval(() => {
  const health = kafka.getHealth();
  logger.debug('Kafka health', { health });
}, 10000);
Inspect Cluster Metadata
const health = kafka.getProducerHealth('main');
console.log('Cluster ID:', health.clusterId);
console.log('Brokers:', health.brokerCount);
console.log('Last success:', new Date(health.lastPingSucceededAt));
console.log('Last error:', health.lastError);
Test Connectivity Manually
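try {
  await kafka.pingProducer('main');
  console.log('✓ Connection successful');
} catch (error) {
  // `error` is `unknown` under strict TypeScript, so narrow before reading .message
  console.error('✗ Connection failed:', error instanceof Error ? error.message : error);
}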

Testing

Run the test suite:

pnpm test

Run tests with coverage:

pnpm run coverage
Example Test
import { describe, it, expect, beforeEach } from 'vitest';
import { KafkaManager } from '@autofleet/kafka';

describe('KafkaService', () => {
  let kafka: KafkaManager;

  beforeEach(() => {
    kafka = KafkaManager.create({
      enabled: false, // Use mock mode for tests
      producers: {
        main: {
          brokers: ['localhost:9092'],
          clientId: 'test-client',
        },
      },
    });
  });

  it('should publish event successfully in mock mode', async () => {
    const result = await kafka.publish('main', 'test-topic', {
      userId: '123',
      action: 'test',
    });

    expect(result).toBeDefined();
    expect(result[0]).toHaveProperty('topic', 'test-topic');
  });

  it('should report ready status', async () => {
    const ready = await kafka.isReady();
    expect(ready).toBe(true);
  });
});
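Mock mode confirms that publishing doesn't throw, but not what was published. One option is to have application code depend on a narrow interface and pass a recording fake in tests. A sketch; `EventPublisher`, `recordLogin`, and `createFakePublisher` are hypothetical, and it assumes the real manager's publish is structurally compatible with the interface.

```typescript
// Narrow interface that application code depends on instead of the whole manager.
interface EventPublisher {
  publish(producer: "main", topic: string, value: unknown, options?: { key?: string }): Promise<unknown>;
}

// Example service code: publishes a login event keyed by user id.
async function recordLogin(publisher: EventPublisher, userId: string): Promise<void> {
  await publisher.publish("main", "user-events", { type: "login", userId }, { key: `user-${userId}` });
}

// Hand-rolled fake that records every call for later assertions.
function createFakePublisher() {
  const calls: Array<{ topic: string; value: unknown; key?: string }> = [];
  const publisher: EventPublisher = {
    async publish(_producer, topic, value, options) {
      calls.push({ topic, value, key: options?.key });
      return [];
    },
  };
  return { publisher, calls };
}
```

In a vitest suite: `const { publisher, calls } = createFakePublisher(); await recordLogin(publisher, '123'); expect(calls[0].key).toBe('user-123');`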

Environment Variables

Common environment variable patterns:

# Enable/disable Kafka
ENABLE_KAFKA=true

# Kafka brokers (comma-separated)
KAFKA_BROKERS=kafka1:9092,kafka2:9092,kafka3:9092

# Client configuration
KAFKA_CLIENT_ID=my-service

Usage:

const kafka = KafkaManager.create({
  enabled: process.env.ENABLE_KAFKA === 'true',
  producers: {
    main: {
      brokers: process.env.KAFKA_BROKERS?.split(',') || ['localhost:9092'],
      clientId: process.env.KAFKA_CLIENT_ID || 'default-client',
    },
  },
});
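The `?.split(',') || [...]` pattern above has two edge cases: an empty KAFKA_BROKERS yields `['']` (a truthy array, so the fallback is skipped), and spaces after commas stay in the broker addresses. A sketch of more tolerant parsing; `parseBrokers` and `parseFlag` are hypothetical helpers, and `parseFlag` is slightly more lenient than the exact `=== 'true'` comparison.

```typescript
// Trims entries, drops empty ones (e.g. from a trailing comma), falls back when nothing is left.
function parseBrokers(raw: string | undefined, fallback: string[] = ["localhost:9092"]): string[] {
  const brokers = (raw ?? "")
    .split(",")
    .map((b) => b.trim())
    .filter((b) => b.length > 0);
  return brokers.length > 0 ? brokers : fallback;
}

// "true" in any letter case, with surrounding spaces ignored, enables the flag; anything else disables it.
function parseFlag(raw: string | undefined): boolean {
  return raw?.trim().toLowerCase() === "true";
}

// Usage:
// enabled: parseFlag(process.env.ENABLE_KAFKA),
// brokers: parseBrokers(process.env.KAFKA_BROKERS),
```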

License

Proprietary - Autofleet

Support

For issues or questions, please contact the platform team or create an issue in the repository.

Keywords