@beignet/provider-event-bus-memory

Beignet is experimental alpha software. The 0.0.x package line is for early evaluation, and APIs may change between releases while the framework settles.

In-memory EventBusPort adapter for Beignet applications.

Use it for tests, local development, and single-process apps. Distributed systems should adapt a queue, stream, outbox, or message broker behind the same EventBusPort interface.

This provider is not a durable delivery provider. It does not store attempts, compute retry backoff, or dead-letter failed handlers. Use it when losing in-flight events on process shutdown is acceptable, or put a durable outbox or queue-backed event bus behind EventBusPort for production delivery.

Install

bun add @beignet/provider-event-bus-memory @beignet/core

Direct setup

import { defineEvent } from "@beignet/core/events";
import { createInMemoryEventBus } from "@beignet/provider-event-bus-memory";
import { z } from "zod";

// Define your domain events
const UserRegistered = defineEvent("user.registered", {
  payload: z.object({
    userId: z.string(),
    email: z.string().email(),
  }),
});

// Create the event bus
const eventBus = createInMemoryEventBus();

// Subscribe to events
const unsubscribe = eventBus.subscribe(UserRegistered, (payload) => {
  console.log(`User registered: ${payload.email}`);
  // Send welcome email, update analytics, etc.
});

// Publish events
await eventBus.publish(UserRegistered, {
  userId: "123",
  email: "user@example.com",
});

// Unsubscribe when done
unsubscribe();

Framework setup

import { createNextServer } from "@beignet/next";
import { createInMemoryEventBusProvider } from "@beignet/provider-event-bus-memory";
import { appPorts } from "@/infra/app-ports";
import { routes } from "@/server/routes";

export const server = await createNextServer({
  ports: appPorts,
  providers: [createInMemoryEventBusProvider()],
  context: ({ ports }) => ({
    ports,
  }),
  routes,
});

Call createInMemoryEventBus() directly when you want to assign the event bus to your ports manually instead of registering the provider.

The provider contributes ctx.ports.eventBus, the standard Beignet EventBusPort. It has no provider-specific escape hatch.

The provider metadata marks this package as optional. As a result, beignet doctor --strict treats an installed-but-unregistered memory event bus as an informational hint rather than as a required production provider.

Instrumentation

Pass a provider instrumentation target when creating the direct event bus to record published events under the eventBus watcher:

const eventBus = createInMemoryEventBus({
  instrumentation: ports,
});

Only event names are recorded; payloads are not.

Usage in use cases

import { defineEvent } from "@beignet/core/events";
import { z } from "zod";

const OrderPlaced = defineEvent("order.placed", {
  payload: z.object({
    orderId: z.string(),
    total: z.number(),
  }),
});

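// Subscribe to events in your application setup.
// `ctx` is the application context. `customer` is assumed to be loaded
// elsewhere, because the payload carries only orderId and total.
ctx.ports.eventBus.subscribe(OrderPlaced, async (payload) => {
  // Send order confirmation email
  await ctx.ports.mailer.send({
    to: customer.email,
    subject: "Order Confirmation",
    text: `Your order ${payload.orderId} has been placed!`,
  });
});

// `useCase`, `PlaceOrderInput`, and `OrderOutput` are assumed to be defined elsewhere.
const placeOrder = useCase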
  .command("orders.place")
  .input(PlaceOrderInput)
  .output(OrderOutput)
  .emits([OrderPlaced])
  .run(async ({ ctx, input, events }) => {
    return ctx.ports.uow.transaction(async (tx) => {
      const order = await tx.orders.create(input);

      await events.record(tx.events, OrderPlaced, {
        orderId: order.id,
        total: order.total,
      });

      return order;
    });
  });

EventBus port API

publish<E>(event: E, payload: InferEventPayload<E>): Promise<void> | void

Publish a domain event with a typed payload.

await eventBus.publish(UserRegistered, {
  userId: "123",
  email: "user@example.com",
});

By default, the in-memory bus awaits handlers so local development and tests are deterministic. Handler errors are rethrown unless onHandlerError is provided. Use delivery: "fire-and-forget" when you intentionally want detached in-process delivery. Fire-and-forget delivery reports handler errors through onHandlerError when provided, but it still does not retry or dead-letter failed work.
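
The delivery rules above can be sketched with a small stand-in bus. This is not the package's source: createSketchBus is a hypothetical name, "await" is an assumed label for the default mode, the onHandlerError signature is assumed to be (error: unknown) => void, and running awaited handlers one after another is also an assumption.

```typescript
type Handler<P> = (payload: P) => void | Promise<void>;

interface SketchBusOptions {
  delivery?: "await" | "fire-and-forget"; // "await" is an assumed name for the default
  onHandlerError?: (error: unknown) => void; // assumed signature
}

function createSketchBus<P>(options: SketchBusOptions = {}) {
  const handlers: Handler<P>[] = [];

  // Run one handler; report a failure to onHandlerError, or rethrow without it.
  const run = async (handler: Handler<P>, payload: P): Promise<void> => {
    try {
      await handler(payload);
    } catch (error) {
      if (options.onHandlerError) options.onHandlerError(error);
      else throw error;
    }
  };

  return {
    subscribe(handler: Handler<P>): () => void {
      handlers.push(handler);
      return () => {
        const index = handlers.indexOf(handler);
        if (index !== -1) handlers.splice(index, 1);
      };
    },
    async publish(payload: P): Promise<void> {
      if (options.delivery === "fire-and-forget") {
        // Detached delivery: start each handler and return without waiting.
        for (const handler of [...handlers]) {
          void run(handler, payload).catch(() => {
            // Without onHandlerError the failure is dropped (a choice of this sketch).
          });
        }
        return;
      }
      // Awaited delivery: publish resolves only after every handler has finished.
      for (const handler of [...handlers]) {
        await run(handler, payload);
      }
    },
  };
}
```

In neither mode does the sketch retry a failed handler or park the event anywhere, which mirrors the limits described above.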

subscribe<E>(event: E, handler: (payload) => void | Promise<void>): () => void

Subscribe to a domain event. Returns an unsubscribe function.

const unsubscribe = eventBus.subscribe(UserRegistered, (payload) => {
  console.log(`New user: ${payload.email}`);
});

// Later, when you want to stop listening:
unsubscribe();

TypeScript support

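Payload types are inferred from each event definition, so publish and subscribe calls are checked at compile time. The bus can also be typed as the standard EventBusPort when you define your ports: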

import { definePorts, type EventBusPort } from "@beignet/core/ports";
import { createInMemoryEventBus } from "@beignet/provider-event-bus-memory";

// Type-safe ports definition
const appPorts = definePorts({
  eventBus: createInMemoryEventBus() as EventBusPort,
  // ... other ports
});

type AppPorts = typeof appPorts;
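
To illustrate how a payload type can follow from an event definition, here is a minimal self-contained sketch. It does not use the Beignet types: SketchEventDef, InferSketchPayload, defineSketchEvent, and publishSketch are hypothetical stand-ins, and the real defineEvent and InferEventPayload may be shaped differently.

```typescript
// Hypothetical event definition shape; the real defineEvent type may differ.
interface SketchEventDef<Name extends string, Payload> {
  name: Name;
  parse(input: unknown): Payload;
}

// Extracts the payload type from an event definition.
type InferSketchPayload<E> = E extends SketchEventDef<string, infer P> ? P : never;

function defineSketchEvent<Name extends string, Payload>(
  name: Name,
  parse: (input: unknown) => Payload,
): SketchEventDef<Name, Payload> {
  return { name, parse };
}

// The payload parameter type is derived from the event argument.
function publishSketch<E extends SketchEventDef<string, unknown>>(
  event: E,
  payload: InferSketchPayload<E>,
): string {
  return `${event.name}:${JSON.stringify(event.parse(payload))}`;
}

const UserRegisteredSketch = defineSketchEvent("user.registered", (input) => {
  const value = input as { userId?: unknown };
  if (typeof value.userId !== "string") throw new Error("userId must be a string");
  return { userId: value.userId };
});

publishSketch(UserRegisteredSketch, { userId: "123" }); // compiles
// publishSketch(UserRegisteredSketch, { userId: 123 }); // rejected by the compiler
```

The conditional type is what ties the second argument to the first: changing the event definition changes what the compiler accepts at every call site.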

Testing

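The in-memory event bus works well in tests: publish(...) awaits handlers by default, so assertions can run as soon as the call under test returns.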

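import { describe, expect, it, mock } from "bun:test";
import { createInMemoryEventBus } from "@beignet/provider-event-bus-memory";
// UserRegistered and registerUser come from your application code.

describe("User Registration", () => {
  it("should publish UserRegistered event", async () => {
    const eventBus = createInMemoryEventBus();
    const handler = mock(() => {});

    eventBus.subscribe(UserRegistered, handler);

    // Build the context around this bus so the use case publishes to it.
    // Add any other ports your use case needs.
    const ctx = { ports: { eventBus } };
    await registerUser(ctx, { email: "test@example.com" });

    expect(handler).toHaveBeenCalledWith({
      userId: expect.any(String),
      email: "test@example.com",
    });
  });
});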

Behavior

  • Awaited by default: publish(...) waits for subscribed handlers unless delivery: "fire-and-forget" is configured
  • In-process: Events are only delivered within the same process
  • Memory-only: No persistence - events are lost if the process crashes
  • Order: Handlers are called in the order they were subscribed
  • Multiple handlers: Multiple handlers can subscribe to the same event
  • Error handling: Handler errors reject publish(...) unless onHandlerError is configured

Local and tests

Use this provider for local development, deterministic use-case tests, and single-process demos. Prefer it over a durable broker in tests unless the test is specifically proving retry, delivery, or worker behavior.
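
For deterministic use-case tests, a small capture helper can replace hand-written mock handlers. The sketch below is an assumption-laden illustration: captureEvents and SubscribableBus are hypothetical names that are not part of this package, and the types are simplified compared with the real EventBusPort, whose subscribe method is generic per call.

```typescript
// Structural view of the one method this helper needs from an event bus.
interface SubscribableBus<E, P> {
  subscribe(event: E, handler: (payload: P) => void | Promise<void>): () => void;
}

// Hypothetical test helper: collects every payload delivered for one event.
function captureEvents<E, P>(bus: SubscribableBus<E, P>, event: E) {
  const payloads: P[] = [];
  const stop = bus.subscribe(event, (payload) => {
    payloads.push(payload);
  });
  // `stop` unsubscribes so later tests do not see stale handlers.
  return { payloads, stop };
}
```

A test would call captureEvents before running the use case, assert on payloads afterwards, and call stop() during cleanup.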

Deployment notes

Use the memory event bus only when in-process, best-effort delivery is acceptable. For multi-process or production workflows that must survive crashes, publish through Beignet outbox/listener workflows or implement EventBusPort over a durable broker.
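
To make concrete what durable delivery adds on top of the memory bus, here is a rough sketch of an outbox drain loop that stores attempts, computes retry backoff, and dead-letters after repeated failures. Everything in it is hypothetical (OutboxRow, drainOutbox, the limits, and the in-memory array standing in for a database table); it is not the Beignet outbox implementation.

```typescript
// Hypothetical outbox row; a real outbox would live in a database table.
interface OutboxRow {
  id: number;
  eventName: string;
  payload: unknown;
  attempts: number;
  status: "pending" | "delivered" | "dead";
  nextAttemptAt: number; // epoch milliseconds
}

const MAX_ATTEMPTS = 5; // assumed limit before dead-lettering
const BASE_DELAY_MS = 1000; // assumed base delay

// Exponential backoff: 1s after the first failure, then 2s, 4s, and so on.
function backoffDelay(attempts: number): number {
  return BASE_DELAY_MS * 2 ** (attempts - 1);
}

async function drainOutbox(
  rows: OutboxRow[],
  deliver: (row: OutboxRow) => Promise<void>,
  now: number = Date.now(),
): Promise<void> {
  for (const row of rows) {
    // Skip rows that are finished or not yet due for another attempt.
    if (row.status !== "pending" || row.nextAttemptAt > now) continue;
    try {
      await deliver(row);
      row.status = "delivered";
    } catch {
      row.attempts += 1;
      if (row.attempts >= MAX_ATTEMPTS) {
        row.status = "dead"; // dead-letter: stop retrying, keep the row for inspection
      } else {
        row.nextAttemptAt = now + backoffDelay(row.attempts);
      }
    }
  }
}
```

Because the rows outlive the process, a crash between attempts loses nothing, which is the property the memory event bus cannot offer.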

When to use

Good for:

  • Single-process applications
  • Development and testing
  • Simple event-driven workflows
  • Decoupling application components

Not suitable for:

  • Distributed systems
  • Event persistence requirements
  • Guaranteed delivery needs
  • Cross-service communication

For production distributed systems, implement EventBusPort with a proper message broker.
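
As a rough idea of what such an adapter could look like, the sketch below wraps a broker client behind publish and subscribe methods. BrokerEvent, BrokerClient, and createBrokerEventBus are hypothetical names: the broker interface is invented for illustration, and the event shape is a simplified stand-in for whatever defineEvent returns, so the result is not guaranteed to satisfy the real EventBusPort type.

```typescript
// Hypothetical event shape: a name plus a validator for payloads.
interface BrokerEvent<P> {
  name: string;
  parse(input: unknown): P;
}

// Hypothetical broker client; substitute your queue or stream SDK.
interface BrokerClient {
  send(topic: string, body: string): Promise<void>;
  onMessage(topic: string, listener: (body: string) => Promise<void>): () => void;
}

function createBrokerEventBus(broker: BrokerClient) {
  return {
    // Validate, serialize, and hand off to the broker, which owns durability.
    async publish<P>(event: BrokerEvent<P>, payload: P): Promise<void> {
      await broker.send(event.name, JSON.stringify(event.parse(payload)));
    },
    // Deserialize and validate each message before calling the handler.
    subscribe<P>(
      event: BrokerEvent<P>,
      handler: (payload: P) => void | Promise<void>,
    ): () => void {
      return broker.onMessage(event.name, async (body) => {
        await handler(event.parse(JSON.parse(body)));
      });
    },
  };
}
```

Application code keeps calling publish and subscribe as before; retries, persistence, and cross-process delivery become the broker's responsibility.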

License

MIT
