@autofleet/kafka
Internal wrapper for Apache Kafka using @platformatic/kafka, providing a production-ready interface for managing multiple Kafka producers and consumers with type-safe topic definitions. Both producers and consumers default to passwordless SASL OAUTHBEARER auth backed by Google Application Default Credentials (ADC), so no static username/password is required against Google Managed Kafka.
Table of Contents
- Installation
- Features
- Quick Start
- Typed Topics
- Initialization & Bootstrap
- Usage
- Configuration
- Advanced Features
- API Reference
- Best Practices
- Troubleshooting
- Testing
Installation
npm add @autofleet/kafka
Features
- Typed Topics with Zod - Define topics with Zod schemas for compile-time type safety and runtime validation
- Passwordless Auth - Producers and consumers default to SASL OAUTHBEARER/ADC, so no static credentials are needed against Google Managed Kafka
- Multi-Producer Management - Manage multiple named producers with different broker configurations
- Consumers with Handler Callbacks - Subscribe to topics with a typed handler, an auto-started background loop, and a .stop() handle (plus a raw stream() escape hatch)
- Self-Healing Consumers - Background connection/rejoin errors are always handled (never an uncaughtException), classified as transient or fatal, and recovered from automatically by reconnecting with backoff
- Type-Safe APIs - Full TypeScript support with inferred types for topics and payloads
- Built-in Mock Mode - Disable Kafka entirely with automatic mock implementations
- Centralized Health Checks - Single readiness check for all producers
- Automatic Connection Management - Handles connection lifecycle automatically
- Batch Publishing - Efficient batch message sending with type safety
- Message Partitioning - Control message distribution across partitions
- Custom Headers - Attach metadata to messages
- Graceful Shutdown - Automatic cleanup on process termination
- ESM & CommonJS Compatible - Works in both ESM and CommonJS projects
- Production Ready - Built with reliability and performance in mind
Quick Start
import { KafkaManager } from '@autofleet/kafka';
import { z } from 'zod';
import logger from './logger';
// Define typed topics with Zod schemas
const kafka = KafkaManager.create({
enabled: process.env.ENABLE_KAFKA === 'true', // Built-in mock mode
logger,
producers: {
main: {
brokers: ['kafka-main.svc.cluster.local:9092'],
clientId: 'my-service-main',
},
analytics: {
brokers: ['kafka-analytics.svc.cluster.local:9092'],
clientId: 'my-service-analytics',
},
},
topics: {
'order.created': {
schema: z.object({
orderId: z.string(),
customerId: z.string(),
amount: z.number(),
items: z.array(z.object({
sku: z.string(),
quantity: z.number(),
})),
}),
producer: 'main',
},
'user.signup': {
schema: z.object({
userId: z.string(),
email: z.string().email(),
timestamp: z.number(),
}),
producer: 'analytics',
},
},
});
// Publish with full type safety and runtime validation
// TypeScript infers payload type from Zod schema!
await kafka.publish('order.created', {
orderId: 'ORD-123',
customerId: 'CUST-456',
amount: 99.99,
items: [
{ sku: 'WIDGET-1', quantity: 2 },
{ sku: 'GADGET-2', quantity: 1 },
],
});
// ❌ Type error: TypeScript knows the exact shape expected
// await kafka.publish('order.created', {
// orderId: 123, // ❌ Error: Expected string
// // ❌ Error: Missing required fields
// });
// Batch publish with type safety
await kafka.publishBatch('order.created', {
messages: [
{ value: { orderId: 'ORD-1', customerId: 'CUST-1', amount: 50, items: [] } },
{ value: { orderId: 'ORD-2', customerId: 'CUST-2', amount: 75, items: [] } },
],
});
// Use in health checks
app.get('/health/ready', async (req, res) => {
const ready = await kafka.isReady();
res.status(ready ? 200 : 503).json({ ready });
});
Typed Topics
The typed topics feature provides compile-time type safety and runtime validation for Kafka messages using Zod schemas.
Benefits
- Type Safety: Payload types are automatically inferred from Zod schemas
- Runtime Validation: Messages are validated before publishing
- DX: Full autocomplete for topic names and payloads in your IDE
- Centralized: Topic definitions live alongside your Kafka configuration
- Producer Routing: Topics are automatically routed to the correct producer
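The following is a small, self-contained sketch showing how a topic registry can drive three things: the payload type, runtime validation, and producer routing. It is not the library's implementation. Schema, TopicDef, PayloadOf, and createPublisher are hypothetical names. The hand-written Schema interface stands in for a Zod schema, because Zod schemas also expose a parse() method that returns the typed value or throws.

```typescript
// Hypothetical stand-in for a Zod schema: parse() returns the typed value or throws.
interface Schema<T> {
  parse(input: unknown): T;
}

// Hypothetical topic definition: a schema plus the producer that owns the topic.
interface TopicDef<T, P extends string> {
  schema: Schema<T>;
  producer: P;
}

// Payload type inferred from the schema, analogous to z.infer<typeof schema>.
type PayloadOf<D> = D extends TopicDef<infer T, string> ? T : never;

interface SentRecord {
  producer: string;
  topic: string;
  value: string;
}

// Hypothetical publisher: validates, routes to the topic's producer, serializes.
// Records are collected in memory instead of being sent to Kafka.
function createPublisher<Defs extends Record<string, TopicDef<any, string>>>(topics: Defs) {
  const sent: SentRecord[] = [];
  function publish<K extends keyof Defs & string>(
    topic: K,
    payload: PayloadOf<Defs[K]>,
    options: { skipValidation?: boolean } = {},
  ): void {
    const def = topics[topic];
    const value = options.skipValidation ? payload : def.schema.parse(payload);
    sent.push({ producer: def.producer, topic, value: JSON.stringify(value) });
  }
  return { publish, sent };
}

// Hand-written validator standing in for z.object({ orderId: z.string(), amount: z.number() }).
const orderSchema: Schema<{ orderId: string; amount: number }> = {
  parse(input) {
    const { orderId, amount } = (input ?? {}) as { orderId?: unknown; amount?: unknown };
    if (typeof orderId !== 'string' || typeof amount !== 'number') {
      throw new Error('Invalid order payload');
    }
    return { orderId, amount };
  },
};

const { publish, sent } = createPublisher({
  'order.created': { schema: orderSchema, producer: 'main' },
});

publish('order.created', { orderId: 'ORD-1', amount: 10 }); // routed to 'main'
// publish('order.created', { orderId: 1, amount: 10 }); // compile-time error: orderId must be a string
```

The key idea is that publish is generic in the topic name K. The payload parameter's type is therefore looked up from the registry entry for K, which is what makes autocomplete and compile-time checks work.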
Basic Usage
import { z } from 'zod';
const kafka = KafkaManager.create({
logger,
producers: {
main: { brokers: ['localhost:9092'] },
analytics: { brokers: ['localhost:9093'] },
},
topics: {
'order.created': {
schema: z.object({
orderId: z.string().uuid(),
customerId: z.string().uuid(),
amount: z.number().positive(),
currency: z.enum(['USD', 'EUR', 'GBP']),
}),
producer: 'main', // Routes to 'main' producer
},
'user.activity': {
schema: z.object({
userId: z.string(),
action: z.string(),
metadata: z.record(z.unknown()).optional(),
}),
producer: 'analytics', // Routes to 'analytics' producer
},
},
});
// ✅ Fully typed - TypeScript knows the exact shape
await kafka.publish('order.created', {
orderId: '550e8400-e29b-41d4-a716-446655440000',
customerId: '6ba7b810-9dad-11d1-80b4-00c04fd430c8',
amount: 99.99,
currency: 'USD',
});
// ❌ Type error: Wrong type for 'amount'
// await kafka.publish('order.created', {
// orderId: '550e8400-e29b-41d4-a716-446655440000',
// customerId: '6ba7b810-9dad-11d1-80b4-00c04fd430c8',
// amount: '99.99', // Error: Expected number, got string
// currency: 'USD',
// });
// ❌ Runtime validation error: Invalid enum value
// await kafka.publish('order.created', {
// orderId: '550e8400-e29b-41d4-a716-446655440000',
// customerId: '6ba7b810-9dad-11d1-80b4-00c04fd430c8',
// amount: 99.99,
// currency: 'JPY', // Error: Invalid enum value (not in ['USD', 'EUR', 'GBP'])
// });
Complex Schemas
Support for nested objects, arrays, and advanced Zod features:
const kafka = KafkaManager.create({
logger,
producers: {
main: { brokers: ['localhost:9092'] },
},
topics: {
'order.created': {
schema: z.object({
orderId: z.string(),
customer: z.object({
id: z.string(),
email: z.string().email(),
name: z.string().min(1),
}),
items: z.array(z.object({
sku: z.string(),
quantity: z.number().positive(),
price: z.number().positive(),
})).min(1),
total: z.number().positive(),
metadata: z.record(z.unknown()).optional(),
}),
producer: 'main',
},
},
});
await kafka.publish('order.created', {
orderId: '123',
customer: {
id: '456',
email: 'test@example.com',
name: 'John Doe',
},
items: [
{ sku: 'ABC', quantity: 2, price: 10.00 },
{ sku: 'DEF', quantity: 1, price: 20.00 },
],
total: 40.00,
metadata: { source: 'web' },
});
Skipping Validation
For advanced use cases, you can skip runtime validation:
await kafka.publish('order.created', payload, {
skipValidation: true, // Skip Zod validation
});
// Also works for batch publishing
await kafka.publishBatch('order.created', {
messages: [{ value: payload1 }, { value: payload2 }],
skipValidation: true,
});
Initialization & Bootstrap
Lifecycle Overview
The @autofleet/kafka package follows a clear, predictable lifecycle:
┌─────────────┐     ┌──────────────┐     ┌──────────────┐     ┌────────────┐
│  create()   │────▶│ bootstrap()  │────▶│ publish/use  │────▶│disconnect()│
│   (sync)    │     │   (async)    │     │   (async)    │     │  (async)   │
└─────────────┘     └──────────────┘     └──────────────┘     └────────────┘
    Instant            Validates             Runtime              Cleanup
 construction        connectivity          operations
1. create() - Synchronous Construction
- Creates the KafkaManager instance
- Sets up configuration
- Does NOT connect to Kafka yet
- Returns immediately
2. bootstrap() - Async Initialization
- Explicitly connects all producers to Kafka
- Validates broker connectivity
- Fetches cluster metadata
- Fails fast with detailed diagnostics
- Recommended before serving traffic
3. Runtime Operations
- publish() and publishBatch() work normally
- Lazy initialization supported (producers connect on first use)
- Health checks available
4. disconnect() - Cleanup
- Gracefully closes all connections
- Automatic on SIGTERM/SIGINT
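The lifecycle above, including lazy initialization, can be modeled with one memoized connection promise per producer. This is an illustrative sketch, not the wrapper's internals. LazyProducer, bootstrapEagerly, and the injected connect/close functions are hypothetical.

```typescript
type ConnectFn = () => Promise<void>;

// Hypothetical producer handle: construction is synchronous and performs no I/O.
class LazyProducer {
  readonly published: string[] = [];
  private connecting: Promise<void> | null = null;
  private readonly connectFn: ConnectFn;
  private readonly closeFn: () => Promise<void>;

  constructor(connectFn: ConnectFn, closeFn: () => Promise<void>) {
    this.connectFn = connectFn;
    this.closeFn = closeFn;
  }

  // Shared by bootstrap and the first publish: the attempt is memoized,
  // and cleared on failure so that a later call can retry.
  connect(): Promise<void> {
    if (!this.connecting) {
      this.connecting = this.connectFn().catch((err: unknown) => {
        this.connecting = null;
        throw err;
      });
    }
    return this.connecting;
  }

  // Lazy initialization: publishing before any bootstrap connects first.
  async publish(topic: string, value: unknown): Promise<void> {
    await this.connect();
    this.published.push(`${topic}:${JSON.stringify(value)}`);
  }

  // Close only if a connection attempt was made.
  async disconnect(): Promise<void> {
    if (this.connecting) {
      this.connecting = null;
      await this.closeFn();
    }
  }
}

// Hypothetical bootstrap: an eager connect() of every producer before serving traffic.
async function bootstrapEagerly(producers: LazyProducer[]): Promise<void> {
  await Promise.all(producers.map((p) => p.connect()));
}
```

The code memoizes the promise rather than a boolean flag, for two reasons:
- Concurrent first publishes share a single connection attempt.
- Clearing the promise on failure lets a later publish retry after a failed bootstrap.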
Production Bootstrap Pattern
Recommended production service initialization:
import { KafkaManager } from '@autofleet/kafka';
import logger from './logger';
// 1. Synchronous construction
export const kafka = KafkaManager.create({
enabled: process.env.ENABLE_KAFKA === 'true',
logger,
bootstrapTimeoutMs: 15000, // 15s bootstrap timeout
healthCheckCacheMs: 2000, // Cache health for 2s
strictBootstrap: true, // Fail if ANY producer fails
producers: {
main: {
brokers: process.env.KAFKA_BROKERS?.split(',') || [],
clientId: 'my-service-main',
},
analytics: {
brokers: process.env.ANALYTICS_BROKERS?.split(',') || [],
clientId: 'my-service-analytics',
},
},
});
// 2. Bootstrap before serving traffic
async function startServer() {
logger.info('Bootstrapping Kafka...');
try {
const result = await kafka.bootstrap();
logger.info('Kafka bootstrap successful', {
duration: result.duration,
producers: Object.keys(result.results),
});
} catch (error) {
logger.error('Fatal: Kafka bootstrap failed', { error });
process.exit(1); // Fail fast in production
}
// 3. Start HTTP server only after Kafka is ready
app.listen(3000, () => {
logger.info('Server ready - Kafka connected');
});
}
startServer();
Development Bootstrap Pattern
For development/local environments with optional Kafka:
async function startServer() {
// Try to bootstrap, but don't fail if Kafka unavailable
try {
await kafka.bootstrap({
timeoutMs: 5000, // Shorter timeout for dev
strict: false, // Allow partial failures
});
logger.info('Kafka connected');
} catch (error) {
logger.warn('Kafka unavailable - using lazy initialization', { error });
// Server still starts, producers will retry on first publish
}
app.listen(3000, () => {
logger.info('Server ready');
});
}
Bootstrap Options
Strict Mode (Default)
Bootstrap fails if any producer fails to connect:
await kafka.bootstrap(); // strict defaults to strictBootstrap, which is true unless configured otherwise
// Throws if any producer fails
// Suited to production, where every cluster must be available
Non-Strict Mode
Bootstrap succeeds if at least one producer connects:
const result = await kafka.bootstrap({
strict: false,
});
if (!result.success) {
// Check which producers failed
const failed = Object.entries(result.results)
.filter(([, r]) => !r.success)
.map(([name]) => name);
logger.warn('Some producers failed to bootstrap', { failed });
}
// Continue - failed producers use lazy initialization
Selective Bootstrap
Bootstrap specific producers only:
// Only bootstrap critical producer
await kafka.bootstrap({
producers: ['main'],
timeoutMs: 10000,
});
// Analytics producer will use lazy initialization
Custom Timeouts
await kafka.bootstrap({
timeoutMs: 20000, // 20 second timeout
strict: true,
});
Usage
Setup with Multiple Producers
Create a kafka.ts file in your service:
import { KafkaManager } from '@autofleet/kafka';
import logger from './logger';
const ENABLE_KAFKA = process.env.ENABLE_KAFKA === 'true';
// No await needed - initialization is synchronous!
// Producers connect lazily on first publish
export const kafka = KafkaManager.create({
enabled: ENABLE_KAFKA,
logger,
producers: {
// Primary producer for main events
main: {
brokers: ['kafka-main.svc.cluster.local:9092'],
clientId: 'driver-ms-main',
autoCreateTopics: true,
},
// Secondary producer for analytics
analytics: {
brokers: ['kafka-analytics.svc.cluster.local:9092'],
clientId: 'driver-ms-analytics',
},
},
});
// Define your topics
export const TOPICS = {
DRIVER_CONSENT_V1: 'backend.driver.consent.v1',
DRIVER_ASSIGNED_V1: 'backend.driver.vehicle.assigned.v1',
DRIVER_UNASSIGNED_V1: 'backend.driver.vehicle.unassigned.v1',
} as const;
Publishing Messages
Use the Kafka manager directly throughout your service with full type safety:
import { kafka, TOPICS } from './kafka';
// Publish directly
// TypeScript validates producer names: 'main' | 'analytics'
await kafka.publish('main', TOPICS.DRIVER_CONSENT_V1, {
state: 'accepted',
driverId: '123',
fleetId: '456',
});
// With options - autocomplete works!
await kafka.publish(
'main', // TypeScript autocompletes 'main' | 'analytics'
TOPICS.DRIVER_ASSIGNED_V1,
{
driverId: '123',
vehicleId: '789',
},
{
key: `driver-123`, // Ensures ordering per driver
headers: {
'correlation-id': requestId,
},
}
);
// TypeScript will error if you use a non-existent producer
// await kafka.publish('wrong', TOPICS.DRIVER_CONSENT_V1, {}); // ❌ Error!
Batch Publishing
await kafka.publishBatch('main', {
topic: TOPICS.DRIVER_CONSENT_V1,
messages: [
{ value: { driverId: '123', state: 'accepted' } },
{ value: { driverId: '456', state: 'rejected' } },
{ value: { driverId: '789', state: 'pending' } },
],
});
Consuming Messages
Configure named consumers alongside producers. Each consumer requires a groupId, and — just like producers — defaults to passwordless OAUTHBEARER/ADC auth (no sasl block needed against Google Managed Kafka).
import { KafkaManager } from '@autofleet/kafka';
import { z } from 'zod';
import logger from './logger';
export const kafka = KafkaManager.create({
enabled: process.env.ENABLE_KAFKA === 'true',
logger,
producers: {
main: { brokers: ['kafka:9092'] },
},
consumers: {
// No `sasl` block -> passwordless OAUTHBEARER/ADC auth
reader: {
brokers: ['kafka:9092'],
groupId: 'my-service-readers',
},
},
topics: {
'order.created': {
schema: z.object({ orderId: z.string(), amount: z.number() }),
producer: 'main',
},
},
});
Handler callback + stop()
consume() auto-starts a background loop and returns a subscription handle. When a schema applies (the registered topic schema, or a per-call options.schema), each value is validated and the handler receives the parsed value (coercions/defaults/transforms applied), typed accordingly.
const subscription = await kafka.consume('reader', ['order.created'], async (msg) => {
// msg.value is parsed JSON and validated against the topic schema
console.log(msg.topic, msg.partition, msg.offset, msg.value);
});
// Later, stop consuming and close the stream
await subscription.stop();
The handler receives a KafkaMessage<T>:
interface KafkaMessage<T> {
topic: string;
partition: number;
offset: string; // stringified to avoid BigInt issues
key: string | null;
value: T; // JSON-parsed (and schema-validated when applicable)
rawValue: string; // original unparsed value
headers: Record<string, string>;
timestamp: number | null; // producer-side timestamp from the x-timestamp header
}
Manual commit on success
By default autocommit is true. In this default mode, offsets advance on the client's own timer regardless of whether your handler succeeded — so if the handler throws, the message is NOT redelivered and is effectively lost (at-most-once). Set autocommit: false for at-least-once delivery: the offset is committed only after your handler resolves successfully, so a thrown handler leaves the message uncommitted and it is redelivered.
const subscription = await kafka.consume('reader', ['order.created'], async (msg) => {
await processOrder(msg.value); // if this throws, the offset is NOT committed
}, { autocommit: false });
Inbound-only topics (schema override)
For topics you only consume (not registered in topics), pass a per-call schema — the handler value type is inferred as z.infer<schema>:
await kafka.consume('reader', ['external.event'], async (msg) => {
console.log(msg.value.externalId); // typed + validated
}, { schema: z.object({ externalId: z.string() }) });
Single-topic subscriptions get clean z.infer typing from the registry. Multi-topic subscriptions resolve to unknown (provide your own type parameter and skip auto-validation).
Validation behavior (invalid messages are skipped, not retried)
When a schema applies, validation happens in the consume loop. An invalid message is logged, committed (to advance past it), and skipped — it is never thrown or passed to the handler. This is deliberate: bad data never becomes valid on a retry, so throwing would only cause a redelivery loop. The distinction:
- Invalid payload (data error) → logged as Message failed schema validation - skipping, offset committed, handler not called.
- Handler throws (processing error, e.g. a downstream outage) → with autocommit: false the offset is not committed, so the message is redelivered; with the default autocommit: true the offset advances anyway and the message is lost (logged as ... possible loss).
Pass skipValidation: true to bypass validation entirely (the handler then receives the raw JSON-parsed value). There is intentionally no "throw on invalid" mode — use skipValidation + your own in-handler checks if you need custom handling (e.g. routing to a dead-letter topic).
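The delivery rules above can be modeled as a small synchronous loop over messages that have already been fetched. This is a simplified sketch, not the wrapper's consume loop:
- consumeBatch and Outcome are hypothetical.
- Offsets are plain numbers.
- Real autocommit advances offsets on a timer rather than per message.

When autocommit is false, the sketch stops at the first handler failure. A Kafka committed offset is a position, so committing any later offset would implicitly commit the failed message too. How the wrapper itself proceeds after such a failure is not described here.

```typescript
// Hypothetical in-memory message: numeric offset and a JSON string value.
interface RawMessage {
  offset: number;
  value: string;
}

type Outcome = 'handled' | 'skipped-invalid' | 'failed-redeliver' | 'failed-lost';

interface LoopResult {
  outcomes: Outcome[];
  committedOffset: number; // next offset the group would resume from
}

// Hypothetical model of one pass over fetched messages.
function consumeBatch(
  messages: RawMessage[],
  validate: (value: unknown) => boolean,
  handler: (value: unknown) => void,
  autocommit: boolean,
  startOffset = 0,
): LoopResult {
  const outcomes: Outcome[] = [];
  let committedOffset = startOffset;
  for (const msg of messages) {
    const value: unknown = JSON.parse(msg.value);
    if (!validate(value)) {
      // Bad data never becomes valid on retry: commit past it and move on.
      outcomes.push('skipped-invalid');
      committedOffset = msg.offset + 1;
      continue;
    }
    try {
      handler(value);
      outcomes.push('handled');
      committedOffset = msg.offset + 1;
    } catch {
      if (autocommit) {
        // The offset advances regardless of the failure: at-most-once.
        outcomes.push('failed-lost');
        committedOffset = msg.offset + 1;
      } else {
        // Leave the offset uncommitted so the message is redelivered: at-least-once.
        outcomes.push('failed-redeliver');
        break;
      }
    }
  }
  return { outcomes, committedOffset };
}
```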
Self-healing (auto-reconnect)
Consumers recover automatically from background failures. The underlying
@platformatic/kafka client is
an EventEmitter; a background failure (e.g. a heartbeat-triggered rejoin that
exhausts its retries on a transient broker disconnect) is emitted as an 'error'
event. The wrapper:
- Never lets it become an uncaughtException: it always handles the event.
- Classifies it by walking the error's cause chain:
  - A transient network blip is logged at warn. This is a socket closed mid-SASL-handshake, which platformatic mislabels as a fatal AuthenticationError: "Cannot find a suitable SASL mechanism." whose root cause is NetworkError: "Connection closed".
  - A genuine fatal error (e.g. bad credentials) is logged at error.
- Re-consumes: it recreates the client and rejoins the group with exponential backoff, so the consumer keeps running instead of dying silently.
Tune via the consumer's reconnect config (all optional):
consumers: {
reader: {
brokers: ['kafka:9092'],
groupId: 'my-group',
reconnect: {
maxAttempts: 0, // 0 = unlimited (default)
baseBackoffMs: 1000, // exponential base (default)
maxBackoffMs: 30000, // backoff cap (default)
},
},
},
getHealth() exposes reconnectAttempts and lastReconnectAt for observability.
Producers attach the same 'error' listener (so a background producer error can't
crash the process either) but have no restart loop — they reconnect lazily on the
next publish()/ping().
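The reconnect settings suggest a capped exponential backoff. This sketch assumes two things:
- The delay doubles on each attempt, starting from baseBackoffMs.
- The delay is capped at maxBackoffMs.

The wrapper's exact formula, and whether it adds jitter, is not documented here. backoffDelayMs, shouldRetry, and reconnectWithBackoff are hypothetical names. The sleep function is injected so the schedule can be tested without real timers.

```typescript
interface ReconnectConfig {
  maxAttempts: number; // 0 = unlimited
  baseBackoffMs: number;
  maxBackoffMs: number;
}

// Defaults mirror the documented reconnect defaults.
const DEFAULTS: ReconnectConfig = { maxAttempts: 0, baseBackoffMs: 1000, maxBackoffMs: 30000 };

// attempt is 1-based: the first retry waits baseBackoffMs.
function backoffDelayMs(attempt: number, cfg: ReconnectConfig = DEFAULTS): number {
  return Math.min(cfg.maxBackoffMs, cfg.baseBackoffMs * 2 ** (attempt - 1));
}

// maxAttempts = 0 means keep retrying forever.
function shouldRetry(attempt: number, cfg: ReconnectConfig = DEFAULTS): boolean {
  return cfg.maxAttempts === 0 || attempt <= cfg.maxAttempts;
}

// Hypothetical loop: wait, try to reconnect, and back off further on failure.
// Resolves with the attempt number that succeeded.
async function reconnectWithBackoff(
  reconnect: () => Promise<void>,
  sleep: (ms: number) => Promise<void>,
  cfg: ReconnectConfig = DEFAULTS,
): Promise<number> {
  for (let attempt = 1; shouldRetry(attempt, cfg); attempt++) {
    await sleep(backoffDelayMs(attempt, cfg));
    try {
      await reconnect();
      return attempt;
    } catch {
      // Failed attempt: loop again with a longer delay.
    }
  }
  throw new Error('Reconnect attempts exhausted');
}
```

With the defaults, the delays run 1s, 2s, 4s, 8s and 16s, then stay at 30s.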
Raw stream escape hatch
For full control, open the underlying @platformatic/kafka MessagesStream:
const stream = await kafka.stream('reader', ['order.created']);
for await (const message of stream) {
// raw platformatic message
}
await stream.close();
Mock mode
When enabled: false, consumers become mocks: consume() returns a no-op subscription whose handler is never invoked, and all methods log Mock mode - skipping ....
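Mock mode comes down to swapping in an object with the same interface that logs instead of performing I/O. Call sites then never need an if (enabled) branch. This is a minimal sketch. Logger, PublisherLike, createPublisherFor, and realPublish are hypothetical, and the log text only mirrors the documented producer message.

```typescript
// Hypothetical minimal logger and publisher interfaces.
interface Logger {
  debug(message: string): void;
}

interface PublisherLike {
  publish(topic: string, value: unknown): Promise<void>;
}

// Hypothetical factory: the same interface either way, so call sites never branch.
function createPublisherFor(
  name: string,
  enabled: boolean,
  logger: Logger,
  realPublish: (topic: string, payload: string) => Promise<void>,
): PublisherLike {
  if (!enabled) {
    return {
      async publish(topic: string): Promise<void> {
        logger.debug(`Kafka: [${name}] Mock mode - skipping publish to topic: ${topic}`);
      },
    };
  }
  return {
    async publish(topic: string, value: unknown): Promise<void> {
      await realPublish(topic, JSON.stringify(value));
    },
  };
}
```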
Mock Mode (Disable Kafka)
When enabled: false, all producers become mocks automatically:
const kafka = KafkaManager.create({
enabled: false, // or process.env.ENABLE_KAFKA !== 'true'
logger,
producers: {
main: {
brokers: ['kafka:9092'],
clientId: 'my-service',
},
},
});
// This will log a debug message but not actually publish
await kafka.publish('main', 'topic', { data: 'test' });
// Output: "Kafka: [main] Mock mode - skipping publish to topic: topic"
Health Checks & Readiness Probes
The package provides comprehensive health check APIs suitable for Kubernetes probes:
Readiness Probe
Checks if Kafka is ready to handle traffic (connects to brokers):
import { kafka } from './kafka';
app.get('/health/ready', async (req, res) => {
const ready = await kafka.isReady();
if (!ready) {
const health = kafka.getHealth();
return res.status(503).json({
ready: false,
kafka: health,
});
}
res.json({ ready: true });
});
Readiness check features:
- Revalidates connectivity if cache is stale
- Configurable cache duration (default: 1 second)
- Force revalidation with isReady({ force: true })
- Respects the healthCheckTimeoutMs configuration
Liveness Probe
Lightweight check for process health (does NOT connect to Kafka):
app.get('/health/live', (req, res) => {
const live = kafka.isLive();
res.status(live ? 200 : 503).json({ live });
});
Liveness check:
- Returns false if the manager is in a fatal state
- Returns false if graceful shutdown has started
- Does NOT perform Kafka operations (very fast)
- Perfect for Kubernetes liveness probes
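Liveness only inspects in-memory state, so it can be modeled as two flags: one set on a fatal error, and one set when graceful shutdown begins. This is a minimal sketch, and LifecycleState and its methods are hypothetical. In a real service, the shutdown handler would be registered with process.once('SIGTERM', ...) and process.once('SIGINT', ...). The manager automates this unless dontGracefulShutdown is set. The sketch leaves out signal wiring so it stays self-contained.

```typescript
// Hypothetical lifecycle flags behind an isLive()-style check.
class LifecycleState {
  private fatal = false;
  private shuttingDown = false;

  markFatal(): void {
    this.fatal = true;
  }

  // Liveness flips off before cleanup starts; repeated signals are ignored.
  async beginShutdown(disconnect: () => Promise<void>): Promise<void> {
    if (this.shuttingDown) return;
    this.shuttingDown = true;
    await disconnect();
  }

  // Pure in-memory check with no Kafka I/O, cheap enough for frequent probes.
  isLive(): boolean {
    return !this.fatal && !this.shuttingDown;
  }
}
```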
Detailed Health Snapshot
Get comprehensive health information for all producers:
app.get('/status/kafka', (req, res) => {
const health = kafka.getHealth();
res.json({
producers: health,
enabled: kafka.isEnabled,
live: kafka.isLive(),
});
});
// Example response:
{
"producers": {
"main": {
"name": "main",
"enabled": true,
"isConnected": true,
"lastPingAt": 1701234567890,
"lastPingSucceededAt": 1701234567890,
"lastError": null,
"clusterId": "prod-kafka-main-01",
"brokerCount": 3,
"brokers": ["kafka1:9092", "kafka2:9092", "kafka3:9092"]
},
"analytics": {
"name": "analytics",
"enabled": true,
"isConnected": false,
"lastPingAt": 1701234560000,
"lastPingSucceededAt": 1701234500000,
"lastError": {
"message": "Connection refused",
"timestamp": 1701234560000
},
"clusterId": null,
"brokerCount": 0,
"brokers": ["analytics-kafka:9092"]
}
},
"enabled": true,
"live": true
}
Connection Status
Get quick connection status summary:
const status = kafka.getConnectionStatus();
// Returns:
{
"main": {
"connected": true,
"lastSuccessAt": 1701234567890,
"lastError": null
},
"analytics": {
"connected": false,
"lastSuccessAt": 1701234500000,
"lastError": "Connection refused"
}
}
Kubernetes Example
apiVersion: v1
kind: Pod
metadata:
  name: my-service
spec:
  containers:
    - name: app
      image: my-service:latest
      livenessProbe:
        httpGet:
          path: /health/live
          port: 3000
        initialDelaySeconds: 10
        periodSeconds: 10
      readinessProbe:
        httpGet:
          path: /health/ready
          port: 3000
        initialDelaySeconds: 15
        periodSeconds: 5
        timeoutSeconds: 3
Configuration
KafkaManagerOptions
interface KafkaManagerOptions {
// Enable/disable Kafka - when false, returns mock implementations
// @default true
enabled?: boolean;
// Custom logger instance
logger?: LoggerInstanceManager;
// Skip automatic graceful shutdown
// @default false
dontGracefulShutdown?: boolean;
// Named producers configuration
producers?: Record<string, ProducerConfig>;
// Named consumers configuration
consumers?: Record<string, ConsumerConfig>;
// Health & Bootstrap Options
// Global timeout for health checks in ms
// Used by isReady() and ping operations
// @default 5000 (5 seconds)
healthCheckTimeoutMs?: number;
// How long to cache health check results in ms
// Set to 0 to always revalidate
// @default 1000 (1 second)
healthCheckCacheMs?: number;
// Default timeout for bootstrap operations in ms
// Can be overridden per bootstrap() call
// @default 30000 (30 seconds)
bootstrapTimeoutMs?: number;
// Default strict mode for bootstrap
// If true, all producers must succeed
// If false, at least one producer must succeed
// @default true
strictBootstrap?: boolean;
}
ProducerConfig
interface ProducerConfig {
// Array of Kafka broker addresses (required)
brokers: string[];
// Client ID for this producer (optional)
clientId?: string;
// SASL authentication options (optional)
sasl?: {
mechanism: 'plain' | 'scram-sha-256' | 'scram-sha-512';
username: string;
password: string;
};
// Automatically create topics if they don't exist (default: false)
autoCreateTopics?: boolean;
}
If sasl is omitted, the producer defaults to passwordless OAUTHBEARER/ADC auth.
ConsumerConfig
| Option | Type | Required | Default | Description |
|---|---|---|---|---|
| brokers | string[] | Yes | none | Kafka broker addresses |
| groupId | string | Yes | none | Consumer group ID |
| clientId | string | No | autofleet-kafka-consumer-<name> | Client ID for this consumer |
| sasl | ConnectionOptions['sasl'] | No | passwordless OAUTHBEARER/ADC | SASL authentication options |
| tls | ConnectionOptions['tls'] | No | {} | TLS configuration options |
| oauthBearerConfig | OauthBearerProviderOptions | No | { logger } | OAUTHBEARER token provider options (ADC) |
| sessionTimeout | number | No | platformatic default | Session timeout in ms |
| heartbeatInterval | number | No | platformatic default | Heartbeat interval in ms |
| reconnect | { maxAttempts?, baseBackoffMs?, maxBackoffMs? } | No | 0 (unlimited), 1000, 30000 | Auto-reconnect settings (see Self-healing) |
ConsumeOptions
| Option | Type | Default | Description |
|---|---|---|---|
| autocommit | boolean | true | When false, offsets are committed only after the handler resolves successfully |
| sessionTimeout | number | consumer config | Override session timeout for this subscription |
| heartbeatInterval | number | consumer config | Override heartbeat interval for this subscription |
| skipValidation | boolean | false | Skip schema validation entirely (handler receives the raw JSON-parsed value) |
| schema | z.ZodType | registered topic schema | Schema to validate against (overrides the registered topic schema). On failure the message is logged, committed, and skipped; on success the handler receives the parsed value |
Consumers are intentionally excluded from isReady() so a consumer-group rebalance never flips a pod to not-ready. Use getConsumerHealth() / pingConsumer() for consumer-specific status.
Advanced Features
Message Keys for Partitioning
Use message keys to control partitioning and maintain order:
// All messages for the same user go to the same partition
await kafka.publish('main', 'user-events', event, {
key: `user-${userId}`,
});
// All messages for the same order go to the same partition
await kafka.publish('main', 'order-events', event, {
key: `order-${orderId}`,
});
Why use keys?
- Ensures messages with the same key go to the same partition, so they stay in order relative to each other
- Enables partition-level parallelism
- Supports stateful stream processing
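The same key always lands on the same partition because the producer hashes the key and takes the result modulo the partition count. The Java client's default partitioner uses murmur2 for this. The sketch below uses FNV-1a only for brevity, so its partition numbers will not match any real client, and partitionForKey is a hypothetical name. What it does show is the property that matters: placement is deterministic and in range for every key.

```typescript
// Hypothetical 32-bit FNV-1a hash over the key's UTF-16 code units (illustration
// only; real clients hash the key bytes with their own algorithm).
function fnv1a(key: string): number {
  let hash = 0x811c9dc5;
  for (let i = 0; i < key.length; i++) {
    hash ^= key.charCodeAt(i);
    hash = Math.imul(hash, 0x01000193) >>> 0;
  }
  return hash >>> 0;
}

// Same key -> same hash -> same partition, so messages sharing a key keep their order.
function partitionForKey(key: string, numPartitions: number): number {
  if (!Number.isInteger(numPartitions) || numPartitions <= 0) {
    throw new RangeError('numPartitions must be a positive integer');
  }
  return fnv1a(key) % numPartitions;
}
```

The mapping depends on numPartitions, so adding partitions to a topic remaps existing keys. Per-key ordering therefore holds only while the partition count is stable.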
Custom Headers
Headers are useful for metadata and message routing:
await kafka.publish('main', 'events', data, {
headers: {
// Correlation ID for distributed tracing
'correlation-id': correlationId,
// Source service
'source': 'user-service',
// Message schema version
'schema-version': '2.0',
// Custom application headers
'tenant-id': 'tenant-123',
},
});
Partition Control
Direct partition control for advanced use cases:
// Send to specific partition
await kafka.publish('main', 'events', data, {
partition: 0, // Always send to partition 0
});
// Group customers into 10 key buckets (assumes a numeric customerId);
// note that at most 10 partitions will receive these messages
const partitionKey = `${customerId % 10}`;
await kafka.publish('main', 'events', data, {
key: partitionKey,
});
API Reference
Core Lifecycle Methods
KafkaManager.create(options): KafkaManager<ProducerNames>
Creates a new KafkaManager instance with multiple named producers. Producer names are type-safe - TypeScript will autocomplete and validate them based on your configuration.
Parameters:
- options - Configuration options (see Configuration)
Returns: A KafkaManager<ProducerNames> instance where ProducerNames are the keys from your producers config
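The type-safe producer names come from inferring a generic parameter from the producers object and using its keys as the name type. This sketch shows only that technique and does not reproduce the library's actual typings. createManagerSketch and ProducerConfigSketch are hypothetical.

```typescript
// Hypothetical, simplified producer config.
interface ProducerConfigSketch {
  brokers: string[];
  clientId?: string;
}

// P is inferred from the object literal, so `keyof P & string` is the union of
// configured producer names (here 'main' | 'analytics').
function createManagerSketch<P extends Record<string, ProducerConfigSketch>>(options: {
  producers: P;
}) {
  const producerNames = Object.keys(options.producers) as (keyof P & string)[];
  return {
    producerNames,
    // Only configured names type-check; the runtime guard covers untyped callers.
    brokersFor(name: keyof P & string): string[] {
      const config = options.producers[name];
      if (!config) throw new Error(`Unknown producer: ${name}`);
      return config.brokers;
    },
  };
}

const manager = createManagerSketch({
  producers: {
    main: { brokers: ['kafka:9092'] },
    analytics: { brokers: ['kafka-analytics:9092'] },
  },
});

manager.brokersFor('main'); // OK
// manager.brokersFor('wrong'); // compile-time error: not a configured producer
```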
Example:
const kafka = KafkaManager.create({
enabled: true,
logger,
producers: {
main: {
brokers: ['kafka:9092'],
clientId: 'my-service',
},
analytics: {
brokers: ['kafka-analytics:9092'],
clientId: 'my-service-analytics',
},
},
});
// Type: KafkaManager<'main' | 'analytics'>
// TypeScript knows the valid producer names!
// Producers connect automatically on first publish
await kafka.publish('main', 'my-topic', { data: 'test' }); // ✅ Valid
await kafka.publish('analytics', 'metrics', { value: 1 }); // ✅ Valid
// await kafka.publish('wrong', 'topic', {}); // ❌ TypeScript error!
bootstrap(options?): Promise<BootstrapResult>
Explicitly bootstrap all (or specific) producers. Connects to brokers, validates connectivity, and returns detailed results. Recommended before serving traffic in production.
Parameters:
- options.timeoutMs (optional) - Maximum time to wait for all producers (default: bootstrapTimeoutMs config)
- options.strict (optional) - Fail if ANY producer fails (default: strictBootstrap config)
- options.producers (optional) - Array of specific producer names to bootstrap (default: all)
Returns: BootstrapResult with success status, duration, and per-producer results
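One way to picture how timeoutMs, strict, and producers combine into a BootstrapResult:
1. Connect the selected producers concurrently.
2. Bound each attempt by the timeout.
3. Apply the strict or non-strict rule.

This sketch rests on several assumptions:
- The result fields mirror the documented success, duration, and results.
- bootstrapSketch and the per-producer error field are hypothetical.
- Non-strict mode throws only when every selected producer fails. This rule is inferred from the "at least one producer must succeed" wording.

```typescript
// Hypothetical result shapes; field names mirror the documented BootstrapResult.
interface ProducerBootstrapResult {
  success: boolean;
  error?: string;
}

interface BootstrapResultSketch {
  success: boolean; // true only if every selected producer connected
  duration: number; // milliseconds
  results: Record<string, ProducerBootstrapResult>;
}

interface BootstrapOptionsSketch {
  timeoutMs?: number;
  strict?: boolean;
  producers?: string[];
}

// Reject if the promise does not settle within ms.
function withTimeout<T>(promise: Promise<T>, ms: number): Promise<T> {
  return new Promise<T>((resolve, reject) => {
    const timer = setTimeout(() => reject(new Error(`Timed out after ${ms}ms`)), ms);
    promise.then(
      (value) => { clearTimeout(timer); resolve(value); },
      (error) => { clearTimeout(timer); reject(error); },
    );
  });
}

// Attempts run concurrently with the same timeout, so the whole bootstrap
// is bounded by roughly timeoutMs.
async function bootstrapSketch(
  connectors: Record<string, () => Promise<void>>,
  opts: BootstrapOptionsSketch = {},
): Promise<BootstrapResultSketch> {
  const { timeoutMs = 30000, strict = true } = opts;
  const names = opts.producers ?? Object.keys(connectors);
  const start = Date.now();
  const settled = await Promise.allSettled(
    names.map((name) => {
      const connect = connectors[name];
      if (!connect) return Promise.reject(new Error(`Unknown producer: ${name}`));
      return withTimeout(connect(), timeoutMs);
    }),
  );
  const results: Record<string, ProducerBootstrapResult> = {};
  settled.forEach((s, i) => {
    results[names[i]] =
      s.status === 'fulfilled' ? { success: true } : { success: false, error: String(s.reason) };
  });
  const connected = settled.filter((s) => s.status === 'fulfilled').length;
  const result: BootstrapResultSketch = {
    success: connected === names.length,
    duration: Date.now() - start,
    results,
  };
  // Strict: any failure is fatal. Non-strict: fatal only if nothing connected.
  if (strict && !result.success) {
    throw new Error(`Bootstrap failed: ${connected}/${names.length} producers connected`);
  }
  if (!strict && connected === 0) {
    throw new Error('Bootstrap failed: no producer connected');
  }
  return result;
}
```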
Examples:
// Bootstrap all producers with defaults
await kafka.bootstrap();
// Non-strict mode - continue if some fail
const result = await kafka.bootstrap({
timeoutMs: 10000,
strict: false,
});
if (!result.success) {
console.error('Some producers failed:', result.results);
}
// Bootstrap specific producers only
await kafka.bootstrap({ producers: ['main'] });
Health & Monitoring Methods
getHealth(): Record<ProducerNames, ProducerHealth>
Get detailed health snapshot for all producers. Returns comprehensive metadata including connection state, timestamps, errors, and cluster info.
Returns: Object mapping producer names to ProducerHealth objects
Example:
const health = kafka.getHealth();
console.log(health.main.clusterId); // 'prod-kafka-01'
console.log(health.main.lastPingSucceededAt);// 1701234567890
console.log(health.main.lastError); // null or error object
getProducerHealth(name): ProducerHealth
Get detailed health for a specific producer.
Parameters:
- name - Producer name (type-safe)
Returns: ProducerHealth object
Example:
const health = kafka.getProducerHealth('main');
console.log('Cluster:', health.clusterId);
console.log('Brokers:', health.brokerCount);
isReady(options?): Promise<boolean>
Check if Kafka is ready to handle traffic. Revalidates connectivity if cache is stale. Use for Kubernetes readiness probes.
Parameters:
- options.timeout (optional) - Override the default health check timeout
- options.force (optional) - Force revalidation, ignoring the cache
Returns: true if all producers are connected, false otherwise
Examples:
// Use cached result if fresh
const ready = await kafka.isReady();
// Force immediate revalidation
const readyNow = await kafka.isReady({ force: true });
// Custom timeout
const readyWithTimeout = await kafka.isReady({ timeout: 10000 });
isLive(): boolean
Lightweight liveness check - does NOT perform Kafka operations. Only checks internal state for fatal errors. Perfect for Kubernetes liveness probes.
Returns: false if manager is in fatal state or graceful shutdown has started
Example:
app.get('/health/live', (req, res) => {
res.status(kafka.isLive() ? 200 : 503).json({ live: kafka.isLive() });
});
getConnectionStatus(): Record<ProducerNames, ConnectionStatus>
Get connection status for all producers with timestamps and errors.
Returns: Object mapping producer names to connection status
Example:
const status = kafka.getConnectionStatus();
// {
// main: {
// connected: true,
// lastSuccessAt: 1701234567890,
// lastError: null
// }
// }
Publishing Methods
publish<T>(producerName, topic, value, options?): Promise<RecordMetadata[]>
Publishes a single message to a topic using a named producer. Producer name is type-safe - must match one of the configured producers.
Parameters:
- producerName - Name of the producer to use (type-safe: only accepts configured producer names)
- topic - Topic name
- value - Message value (will be JSON stringified)
- options (optional) - Publish options
Returns: Array of record metadata (partition, offset, etc.)
Example:
// TypeScript autocompletes and validates producer names
const metadata = await kafka.publish('main', 'events', { id: 1, data: 'test' }, {
key: 'key-1',
headers: { type: 'create' },
});
// Type error if using non-existent producer
// await kafka.publish('nonexistent', 'topic', {}); // ❌ Error!
publishBatch(producerName, options): Promise<RecordMetadata[]>
Publishes multiple messages in a batch using a named producer. Producer name is type-safe.
Parameters:
- producerName - Name of the producer to use (type-safe: only accepts configured producer names)
- options.topic - Topic name
- options.messages - Array of messages with values, keys, headers, etc.
Returns: Array of record metadata for each message
Example:
// TypeScript validates 'main' is a configured producer
const metadata = await kafka.publishBatch('main', {
topic: 'events',
messages: [
{ value: { id: 1 }, key: 'key-1' },
{ value: { id: 2 }, key: 'key-2' },
],
});
isReady(): Promise<boolean>
Check if all producers are connected and ready. Useful for health checks.
Example:
const ready = await kafka.isReady();
if (!ready) {
throw new Error('Kafka not ready');
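}
getConnectionStatus(): Record<ProducerNames, ConnectionStatus>
Get connection status for all producers. The return type is keyed by your configured producer names. Each entry carries the connected, lastSuccessAt, and lastError fields shown under Health & Monitoring Methods.
Example:
const status = kafka.getConnectionStatus();
// Type: { main: ConnectionStatus; analytics: ConnectionStatus }
console.log(status.main.connected); // true or false
ping(): Promise<void>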
Ping all producers to verify connectivity.
Example:
await kafka.ping(); // Throws if any producer fails to connect
pingProducer(name): Promise<void>
Ping a specific producer to verify connectivity. Producer name is type-safe.
Example:
await kafka.pingProducer('main'); // ✅ Valid
// await kafka.pingProducer('invalid'); // ❌ Type error!
disconnect(): Promise<void>
Disconnects all producers and cleans up resources.
Example:
await kafka.disconnect();
disconnectProducer(name): Promise<void>
Disconnects a specific producer and cleans up resources. Producer name is type-safe.
Example:
await kafka.disconnectProducer('main'); // ✅ Valid
// await kafka.disconnectProducer('invalid'); // ❌ Type error!
Best Practices
1. Use Centralized Kafka Configuration
Define your Kafka instance and topics in one place:
// src/kafka.ts
import { KafkaManager } from '@autofleet/kafka';
import logger from './logger';
export const kafka = KafkaManager.create({
enabled: process.env.ENABLE_KAFKA === 'true',
logger,
producers: {
main: {
brokers: process.env.KAFKA_BROKERS?.split(',') || ['localhost:9092'],
clientId: 'my-service',
autoCreateTopics: process.env.NODE_ENV === 'development',
},
},
});
export const TOPICS = {
USER_EVENTS: 'user-events',
ORDER_EVENTS: 'order-events',
} as const;
2. Use Message Keys for Ordering
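The ordering guarantee comes from partitioning. Records with the same key are assigned to the same partition, and Kafka preserves order only within a partition. The sketch illustrates that mapping using 32-bit FNV-1a as a stand-in hash. Kafka's Java client hashes keys with murmur2 by default, and the hash @platformatic/kafka uses is not confirmed here, so these partition numbers will not match a real cluster.

```typescript
// FNV-1a (32-bit) over UTF-8 bytes; a stand-in, not Kafka's murmur2.
function fnv1a(input: string): number {
  const bytes = new TextEncoder().encode(input);
  let hash = 0x811c9dc5;
  for (let i = 0; i < bytes.length; i++) {
    hash ^= bytes[i];
    hash = Math.imul(hash, 0x01000193);
  }
  return hash >>> 0; // force unsigned 32-bit
}

// Same key -> same hash -> same partition, which is what keeps per-key order.
function partitionFor(key: string, partitionCount: number): number {
  return fnv1a(key) % partitionCount;
}

const p1 = partitionFor('user-42', 6);
const p2 = partitionFor('user-42', 6);
```

Because the mapping is taken modulo the partition count, adding partitions to a topic remaps existing keys. Per-key ordering is therefore not guaranteed across that change.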
// Ensure all events for a user are processed in order
await kafka.publish('main', TOPICS.USER_EVENTS, event, {
key: `user-${userId}`,
});
3. Add Metadata with Headers
await kafka.publish('main', 'events', data, {
headers: {
'correlation-id': requestId,
'source-service': 'user-service',
'event-type': eventType,
},
});
4. Use Batch Publishing for High Throughput
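Publishing an array as one batch is faster than awaiting each publish. However, a very large array can exceed broker size limits such as message.max.bytes. Splitting the events into fixed-size chunks and publishing one batch per chunk keeps each request bounded. This is a sketch: chunk is a hypothetical helper, and the chunk size of 500 is an arbitrary placeholder to tune against your own message sizes.

```typescript
// Splits an array into consecutive chunks of at most `size` items.
function chunk<T>(items: readonly T[], size: number): T[][] {
  if (!Number.isInteger(size) || size <= 0) {
    throw new RangeError(`chunk size must be a positive integer, got ${size}`);
  }
  const chunks: T[][] = [];
  for (let i = 0; i < items.length; i += size) {
    chunks.push(items.slice(i, i + size));
  }
  return chunks;
}

// Hypothetical usage with the wrapper:
// for (const group of chunk(events, 500)) {
//   await kafka.publishBatch('main', {
//     topic: 'events',
//     messages: group.map((e) => ({ value: e })),
//   });
// }
```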
// Instead of multiple publish calls
for (const event of events) {
await kafka.publish('main', 'events', event); // ❌ Slow
}
// Use batch publishing
await kafka.publishBatch('main', {
topic: 'events',
messages: events.map(e => ({ value: e })), // ✅ Fast
});
5. Integrate with Health Checks
app.get('/health/ready', async (req, res) => {
const ready = await kafka.isReady();
res.status(ready ? 200 : 503).json({ ready });
});
6. Handle Errors Gracefully
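For transient failures, the retry logic mentioned in the example's catch block can be a small exponential-backoff wrapper. This is a sketch, not something the package provides. withRetry and its options are hypothetical.

```typescript
interface RetryOptions {
  retries: number;     // attempts after the first one
  baseDelayMs: number; // delay before the first retry; doubles each time
  maxDelayMs?: number; // optional cap on a single delay
}

const sleep = (ms: number) => new Promise<void>((resolve) => setTimeout(resolve, ms));

// Runs `fn`, retrying failures with exponential backoff; rethrows the last error.
async function withRetry<T>(fn: () => Promise<T>, opts: RetryOptions): Promise<T> {
  let lastError: unknown;
  for (let attempt = 0; attempt <= opts.retries; attempt++) {
    try {
      return await fn();
    } catch (error) {
      lastError = error;
      if (attempt === opts.retries) break;
      const delay = Math.min(opts.baseDelayMs * 2 ** attempt, opts.maxDelayMs ?? Infinity);
      await sleep(delay);
    }
  }
  throw lastError;
}

// Hypothetical usage with the wrapper:
// await withRetry(() => kafka.publish('main', 'events', data), { retries: 3, baseDelayMs: 200 });
```

If a failed attempt actually reached the broker, retrying can write the record twice unless the producer is idempotent. Consumers of retried topics should tolerate duplicates.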
try {
await kafka.publish('main', 'events', data);
} catch (error) {
logger.error('Failed to publish to Kafka', { error, data });
// Consider: retry logic, fallback queue, monitoring alerts
}
7. Use Multiple Producers for Different Clusters
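With more than one cluster, mapping each topic to its producer in one place prevents publishing a topic to the wrong cluster. This sketch uses a hypothetical TOPIC_ROUTES table. The as const keeps the producer names as literal types, so a misspelled topic fails to compile.

```typescript
// Hypothetical routing table: topic -> producer (cluster) that owns it.
const TOPIC_ROUTES = {
  orders: 'main',
  metrics: 'analytics',
} as const;

type RoutedTopic = keyof typeof TOPIC_ROUTES;
type RoutedProducer = (typeof TOPIC_ROUTES)[RoutedTopic]; // 'main' | 'analytics'

// Looks up the producer for a topic; the return type is the literal union.
function producerFor(topic: RoutedTopic): RoutedProducer {
  return TOPIC_ROUTES[topic];
}

// Hypothetical usage with the wrapper:
// await kafka.publish(producerFor('orders'), 'orders', orderData);
```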
const kafka = KafkaManager.create({
enabled: true,
logger,
producers: {
// Production events
main: {
brokers: ['kafka-prod.svc.cluster.local:9092'],
clientId: 'my-service-main',
},
// Analytics (separate cluster)
analytics: {
brokers: ['kafka-analytics.svc.cluster.local:9092'],
clientId: 'my-service-analytics',
},
},
});
// Critical events go to main cluster
await kafka.publish('main', 'orders', orderData);
// Analytics go to analytics cluster
await kafka.publish('analytics', 'metrics', metricsData);
Troubleshooting
Common Issues and Solutions
Bootstrap Fails with Connection Timeout
Problem:
[main] Failed to connect to Kafka brokers: Connection timeout after 5000ms
Possible causes:
1. Brokers are unreachable: kafka:9092
2. Network connectivity issues
3. Firewall blocking ports
Solutions:
Verify brokers are running:
kubectl get pods -l app=kafka
Test connectivity:
nc -zv kafka-broker 9092
Check network policies:
- Ensure service can reach Kafka namespace
- Verify firewall rules allow traffic on port 9092
Increase timeout:
await kafka.bootstrap({ timeoutMs: 30000 }); // 30 seconds
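When nc is not available in the container, you can run the same TCP-level check from Node with the built-in net module. It only confirms that a TCP connection opens. It does not exercise TLS, SASL, or the Kafka protocol. canConnect is a hypothetical helper.

```typescript
import * as net from 'node:net';

// Resolves true if a TCP connection to host:port opens within timeoutMs.
function canConnect(host: string, port: number, timeoutMs = 5000): Promise<boolean> {
  return new Promise((resolve) => {
    const socket = net.createConnection({ host, port });
    const finish = (ok: boolean) => {
      socket.destroy();
      resolve(ok);
    };
    // The idle timer also covers the connect phase.
    socket.setTimeout(timeoutMs);
    socket.once('connect', () => finish(true));
    socket.once('timeout', () => finish(false));
    socket.once('error', () => finish(false));
  });
}

// Example: canConnect('kafka-broker', 9092).then((ok) => console.log(ok ? 'reachable' : 'unreachable'));
```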
SASL Authentication Failed
Problem:
[main] Failed to connect to Kafka brokers: SASL authentication failed
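Solutions:
Check ADC (default auth): producers and consumers default to SASL OAUTHBEARER backed by Google Application Default Credentials, so confirm the process can obtain ADC (locally, for example, via gcloud auth application-default login).
Verify credentials (when configuring static SASL):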
producers: {
  main: {
    brokers: ['kafka:9092'],
    sasl: {
      mechanism: 'scram-sha-256',
      username: process.env.KAFKA_USERNAME, // Check this
      password: process.env.KAFKA_PASSWORD, // And this
    },
  },
}
Check mechanism:
- Verify broker supports the SASL mechanism
- Common mechanisms: 'plain', 'scram-sha-256', 'scram-sha-512'
Inspect secrets:
kubectl get secret kafka-credentials -o yaml
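With static credentials, a common cause of this error is an unset variable. process.env.KAFKA_USERNAME has type string | undefined, and an undefined value may only surface when the client tries to authenticate. Failing fast at startup makes the cause explicit. requireEnv is a hypothetical helper.

```typescript
// Reads a required environment variable or throws with a clear message.
function requireEnv(
  name: string,
  env: Record<string, string | undefined> = process.env,
): string {
  const value = env[name];
  if (value === undefined || value.trim() === '') {
    throw new Error(`Missing required environment variable ${name}`);
  }
  return value;
}

// Hypothetical usage in the producer config:
// sasl: {
//   mechanism: 'scram-sha-256',
//   username: requireEnv('KAFKA_USERNAME'),
//   password: requireEnv('KAFKA_PASSWORD'),
// },
```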
Producer Not Found Error
Problem:
Producer 'analytics' not found. Available producers: main
Solution: You're trying to use a producer that wasn't configured:
// Add the missing producer
const kafka = KafkaManager.create({
producers: {
main: { brokers: ['kafka:9092'] },
analytics: { brokers: ['kafka-analytics:9092'] }, // Add this
},
});
Readiness Check Always Fails
Problem: Health checks keep failing even though Kafka is up.
Solutions:
Check timeout is sufficient:
const kafka = KafkaManager.create({
  healthCheckTimeoutMs: 5000, // Increase if needed
  producers: { /* ... */ },
});
Force revalidation:
const ready = await kafka.isReady({ force: true });
Inspect detailed health:
const health = kafka.getHealth();
console.log(health); // Check lastError for details
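A readiness check with a timeout is effectively a race between the check and a timer. If the broker round trip takes longer than the timeout, the probe reports not ready even though Kafka is up. The sketch shows that race. withTimeout is hypothetical and is not necessarily how healthCheckTimeoutMs is implemented.

```typescript
// Rejects if `promise` does not settle within `ms`; clears the timer either way.
function withTimeout<T>(promise: Promise<T>, ms: number, label = 'operation'): Promise<T> {
  let timer: ReturnType<typeof setTimeout> | undefined = undefined;
  const timeout = new Promise<never>((_, reject) => {
    timer = setTimeout(() => reject(new Error(`${label} timed out after ${ms}ms`)), ms);
  });
  return Promise.race([promise, timeout]).finally(() => clearTimeout(timer));
}
```

If the probe regularly loses this race under normal load, raising healthCheckTimeoutMs is the matching fix.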
Bootstrap Succeeds but Publish Fails
Problem: Bootstrap passes, but publish throws "topic auto-create disabled".
Solution: Enable topic auto-creation or create topics manually:
producers: {
main: {
brokers: ['kafka:9092'],
autoCreateTopics: true, // Enable for dev/test
},
}
// Or create topics manually:
// kafka-topics --create --topic my-topic --bootstrap-server kafka:9092
Graceful Shutdown Not Working
Problem: Service doesn't close Kafka connections on shutdown.
Solutions:
Ensure graceful shutdown is enabled:
const kafka = KafkaManager.create({
  dontGracefulShutdown: false, // Default
  producers: { /* ... */ },
});
Manual disconnect if needed:
process.on('SIGTERM', async () => {
  await kafka.disconnect();
  process.exit(0);
});
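The SIGTERM handler above runs on every signal delivery, so a second SIGTERM, or a SIGINT arriving during shutdown, would start a second disconnect. The sketch shares one shutdown promise across all callers. onceAsync is a hypothetical helper, and the commented wiring is an assumption about how you might use it.

```typescript
// Wraps an async function so concurrent or repeated calls share one run.
function onceAsync(fn: () => Promise<void>): () => Promise<void> {
  let running: Promise<void> | undefined;
  return () => {
    if (!running) running = fn();
    return running;
  };
}

// Hypothetical wiring with the wrapper:
// const shutdown = onceAsync(async () => {
//   await kafka.disconnect();
// });
// for (const signal of ['SIGTERM', 'SIGINT'] as const) {
//   process.on(signal, () => {
//     shutdown().then(() => process.exit(0), () => process.exit(1));
//   });
// }
```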
Debugging Tips
Enable Debug Logging
import { KafkaManager } from '@autofleet/kafka';
import logger from './logger';
const kafka = KafkaManager.create({
logger, // Ensure logger has debug level enabled
producers: { /* ... */ },
});
// Check health frequently
setInterval(() => {
const health = kafka.getHealth();
logger.debug('Kafka health', { health });
}, 10000);
Inspect Cluster Metadata
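Raw timestamps are hard to scan in logs. Converting lastPingSucceededAt into an age makes stale connections obvious. The sketch assumes the health fields read in the example (clusterId, brokerCount, lastPingSucceededAt as epoch milliseconds, lastError). The ProducerHealth interface and describeHealth are assumptions, not the exported type or API.

```typescript
// Assumed shape, based on the fields read in the example; the real type may differ.
interface ProducerHealth {
  clusterId?: string;
  brokerCount?: number;
  lastPingSucceededAt?: number; // assumed: epoch milliseconds
  lastError?: unknown;
}

// One-line summary suitable for a debug log.
function describeHealth(name: string, health: ProducerHealth, now: number = Date.now()): string {
  const age =
    health.lastPingSucceededAt === undefined
      ? 'never'
      : `${Math.round((now - health.lastPingSucceededAt) / 1000)}s ago`;
  const error = health.lastError instanceof Error ? health.lastError.message : health.lastError;
  return (
    `[${name}] cluster=${health.clusterId ?? 'unknown'} brokers=${health.brokerCount ?? '?'} lastPing=${age}` +
    (error ? ` lastError=${String(error)}` : '')
  );
}
```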
const health = kafka.getProducerHealth('main');
console.log('Cluster ID:', health.clusterId);
console.log('Brokers:', health.brokerCount);
console.log('Last success:', new Date(health.lastPingSucceededAt));
console.log('Last error:', health.lastError);
Test Connectivity Manually
try {
await kafka.pingProducer('main');
console.log('✓ Connection successful');
} catch (error) {
  // The catch variable is `unknown` under strict TypeScript, so narrow it first
  console.error('✗ Connection failed:', (error as Error).message);
}
Testing
Run the test suite:
pnpm test
Run tests with coverage:
pnpm run coverage
Example Test
import { describe, it, expect, beforeEach } from 'vitest';
import { KafkaManager } from '@autofleet/kafka';
describe('KafkaService', () => {
let kafka: KafkaManager;
beforeEach(() => {
kafka = KafkaManager.create({
enabled: false, // Use mock mode for tests
producers: {
main: {
brokers: ['localhost:9092'],
clientId: 'test-client',
},
},
});
});
it('should publish event successfully in mock mode', async () => {
const result = await kafka.publish('main', 'test-topic', {
userId: '123',
action: 'test',
});
expect(result).toBeDefined();
expect(result[0]).toHaveProperty('topic', 'test-topic');
});
it('should report ready status', async () => {
const ready = await kafka.isReady();
expect(ready).toBe(true);
});
});
Environment Variables
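The configuration examples use process.env.KAFKA_BROKERS?.split(',') || ['localhost:9092'], which has a caveat. If the variable is set but empty, ''.split(',') returns [''], which is truthy, so the fallback never applies. Spaces after commas also survive into the broker addresses. The sketch below is a stricter parser. parseBrokers is hypothetical.

```typescript
// Parses a comma-separated broker list, trimming entries and dropping empty ones.
function parseBrokers(raw: string | undefined, fallback: string[] = ['localhost:9092']): string[] {
  const brokers = (raw ?? '')
    .split(',')
    .map((broker) => broker.trim())
    .filter((broker) => broker.length > 0);
  return brokers.length > 0 ? brokers : fallback;
}

// Hypothetical usage in the producer config:
// brokers: parseBrokers(process.env.KAFKA_BROKERS),
```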
Common environment variable patterns:
# Enable/disable Kafka
ENABLE_KAFKA=true
# Kafka brokers (comma-separated)
KAFKA_BROKERS=kafka1:9092,kafka2:9092,kafka3:9092
# Client configuration
KAFKA_CLIENT_ID=my-service
Usage:
const kafka = KafkaManager.create({
enabled: process.env.ENABLE_KAFKA === 'true',
producers: {
main: {
brokers: process.env.KAFKA_BROKERS?.split(',') || ['localhost:9092'],
clientId: process.env.KAFKA_CLIENT_ID || 'default-client',
},
},
});
License
Proprietary - Autofleet
Support
For issues or questions, please contact the platform team or create an issue in the repository.