@beignet/provider-event-bus-memory
Beignet is experimental alpha software. The 0.0.x package line is for early
evaluation, and APIs may change between releases while the framework settles.
In-memory EventBusPort adapter for Beignet applications.
Use it for tests, local development, and single-process apps. Distributed
systems should adapt a queue, stream, outbox, or message broker behind the same
EventBusPort interface.
This provider is not a durable delivery provider. It does not store attempts,
compute retry backoff, or dead-letter failed handlers. Use it when losing
in-flight events on process shutdown is acceptable, or put a durable outbox or
queue-backed event bus behind EventBusPort for production delivery.
Install
bun add @beignet/provider-event-bus-memory @beignet/core
Direct setup
import { defineEvent } from "@beignet/core/events";
import { createInMemoryEventBus } from "@beignet/provider-event-bus-memory";
import { z } from "zod";
// Define your domain events
const UserRegistered = defineEvent("user.registered", {
payload: z.object({
userId: z.string(),
email: z.string().email(),
}),
});
// Create the event bus
const eventBus = createInMemoryEventBus();
// Subscribe to events
const unsubscribe = eventBus.subscribe(UserRegistered, (payload) => {
console.log(`User registered: ${payload.email}`);
// Send welcome email, update analytics, etc.
});
// Publish events
await eventBus.publish(UserRegistered, {
userId: "123",
email: "user@example.com",
});
// Unsubscribe when done
unsubscribe();
Framework setup
import { createNextServer } from "@beignet/next";
import { createInMemoryEventBusProvider } from "@beignet/provider-event-bus-memory";
import { appPorts } from "@/infra/app-ports";
import { routes } from "@/server/routes";
export const server = await createNextServer({
ports: appPorts,
providers: [createInMemoryEventBusProvider()],
context: ({ ports }) => ({
ports,
}),
routes,
});
Use createInMemoryEventBus() directly when you want to manually assign an
event bus under ports.
The provider contributes ctx.ports.eventBus, the standard Beignet
EventBusPort. It has no provider-specific escape hatch.
This package is optional by provider metadata. beignet doctor --strict treats
an installed-but-unregistered memory event bus as an informational hint rather
than a required production provider.
Instrumentation
Pass a provider instrumentation target when creating the direct event bus to
record published events under the eventBus watcher:
const eventBus = createInMemoryEventBus({
instrumentation: ports,
});
The watcher records the name of each published event. Payloads are not
recorded.
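To make the "names only" rule concrete, here is a minimal sketch of a publish wrapper that records the event name and deliberately drops the payload. The `WatcherEntry` shape and `withNameOnlyRecording` helper are hypothetical and exist only for this illustration; they are not part of Beignet's instrumentation API.

```typescript
// Hypothetical shapes for illustration; not Beignet's instrumentation API.
interface WatcherEntry {
  watcher: "eventBus";
  eventName: string;
}

// Wraps a publish function so every call appends a name-only entry.
function withNameOnlyRecording<P>(
  publish: (eventName: string, payload: P) => void,
  entries: WatcherEntry[],
): (eventName: string, payload: P) => void {
  return (eventName: string, payload: P): void => {
    // The payload is intentionally left out of the recorded entry.
    entries.push({ watcher: "eventBus", eventName });
    publish(eventName, payload);
  };
}

const entries: WatcherEntry[] = [];
const delivered: string[] = [];

const recordedPublish = withNameOnlyRecording<{ email: string }>(
  (_eventName, payload) => {
    delivered.push(payload.email);
  },
  entries,
);

recordedPublish("user.registered", { email: "user@example.com" });
```

After the call, `entries` holds one record with the event name, while the email address appears only in `delivered`.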
Usage in use cases
import { defineEvent } from "@beignet/core/events";
import { z } from "zod";
const OrderPlaced = defineEvent("order.placed", {
payload: z.object({
orderId: z.string(),
total: z.number(),
}),
});
// Subscribe to events in your application setup
ctx.ports.eventBus.subscribe(OrderPlaced, async (payload) => {
// Send order confirmation email (customer is assumed to be loaded elsewhere,
// for example by looking up payload.orderId)
await ctx.ports.mailer.send({
to: customer.email,
subject: "Order Confirmation",
text: `Your order ${payload.orderId} has been placed!`,
});
});
const placeOrder = useCase
.command("orders.place")
.input(PlaceOrderInput)
.output(OrderOutput)
.emits([OrderPlaced])
.run(async ({ ctx, input, events }) => {
return ctx.ports.uow.transaction(async (tx) => {
const order = await tx.orders.create(input);
await events.record(tx.events, OrderPlaced, {
orderId: order.id,
total: order.total,
});
return order;
});
});
EventBus port API
publish<E>(event: E, payload: InferEventPayload<E>): Promise<void> | void
Publish a domain event with a typed payload.
await eventBus.publish(UserRegistered, {
userId: "123",
email: "user@example.com",
});
By default, the in-memory bus awaits handlers so local development and tests are
deterministic. Handler errors are rethrown unless onHandlerError is provided.
Use delivery: "fire-and-forget" when you intentionally want detached
in-process delivery. Fire-and-forget delivery reports handler errors through
onHandlerError when provided, but it still does not retry or dead-letter
failed work.
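The delivery rules above can be modelled with a small stand-in bus. This is a simplified sketch, not the package's implementation: `createSketchBus`, the `"await"` mode name, and the `onHandlerError` signature are assumptions made for illustration, and the sketch stops at the first failing handler when no `onHandlerError` is given, which the real bus may or may not do.

```typescript
// Simplified stand-in for illustration; NOT the package's source.
type Handler<P> = (payload: P) => void | Promise<void>;

interface SketchBusOptions {
  delivery?: "await" | "fire-and-forget"; // "await" is an assumed name for the default
  onHandlerError?: (error: unknown, eventName: string) => void; // assumed signature
}

function createSketchBus<P>(options: SketchBusOptions = {}) {
  const handlers = new Map<string, Handler<P>[]>();

  async function deliver(eventName: string, payload: P): Promise<void> {
    // Iterate over a copy so unsubscribing mid-delivery is safe.
    for (const handler of [...(handlers.get(eventName) ?? [])]) {
      try {
        await handler(payload);
      } catch (error) {
        if (!options.onHandlerError) throw error; // no reporter: rethrow to the publisher
        options.onHandlerError(error, eventName);
      }
    }
  }

  return {
    subscribe(eventName: string, handler: Handler<P>): () => void {
      const list = handlers.get(eventName) ?? [];
      list.push(handler);
      handlers.set(eventName, list);
      return () => {
        const index = list.indexOf(handler);
        if (index !== -1) list.splice(index, 1);
      };
    },
    publish(eventName: string, payload: P): Promise<void> {
      if (options.delivery === "fire-and-forget") {
        // Detached delivery: publish never rejects, and nothing is retried.
        void deliver(eventName, payload).catch(() => {});
        return Promise.resolve();
      }
      return deliver(eventName, payload);
    },
  };
}

async function demo(): Promise<string[]> {
  const log: string[] = [];
  const fail = (): void => {
    throw new Error("boom");
  };

  // Default mode without onHandlerError: publish rejects.
  const strict = createSketchBus<{ id: string }>();
  strict.subscribe("user.registered", fail);
  try {
    await strict.publish("user.registered", { id: "1" });
  } catch (error) {
    log.push(`rethrown: ${(error as Error).message}`);
  }

  // With onHandlerError: the error is reported and publish resolves.
  const reported = createSketchBus<{ id: string }>({
    onHandlerError: (error) => log.push(`reported: ${(error as Error).message}`),
  });
  reported.subscribe("user.registered", fail);
  await reported.publish("user.registered", { id: "2" });
  log.push("publish resolved");

  return log;
}

demo().then((log) => console.log(log));
```

In neither mode does the sketch store attempts or retry, which mirrors the limits described for the in-memory provider.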
subscribe<E>(event: E, handler: (payload) => void | Promise<void>): () => void
Subscribe to a domain event. Returns an unsubscribe function.
const unsubscribe = eventBus.subscribe(UserRegistered, (payload) => {
console.log(`New user: ${payload.email}`);
});
// Later, when you want to stop listening:
unsubscribe();
TypeScript support
Payload types are inferred from the event definition, so publish and subscribe
calls are type-checked:
import type { EventBusPort } from "@beignet/core/ports";
import { definePorts } from "@beignet/core/ports";
import { createInMemoryEventBus } from "@beignet/provider-event-bus-memory";
// Type-safe ports definition
const appPorts = definePorts({
eventBus: createInMemoryEventBus() as EventBusPort,
// ... other ports
});
type AppPorts = typeof appPorts;
Testing
The in-memory event bus works well in tests because delivery is awaited by
default:
import { describe, expect, it, mock } from "bun:test";
import { createInMemoryEventBus } from "@beignet/provider-event-bus-memory";
describe("User Registration", () => {
it("should publish UserRegistered event", async () => {
const eventBus = createInMemoryEventBus();
const handler = mock(() => {});
eventBus.subscribe(UserRegistered, handler);
// Perform registration; ctx.ports.eventBus must be the bus created above
// for the handler to be called
await registerUser(ctx, { email: "test@example.com" });
expect(handler).toHaveBeenCalledWith({
userId: expect.any(String),
email: "test@example.com",
});
});
});
Behavior
- Awaited by default: publish(...) waits for subscribed handlers unless delivery: "fire-and-forget" is configured
- In-process: Events are only delivered within the same process
- Memory-only: No persistence - events are lost if the process crashes
- Order: Handlers are called in the order they were subscribed
- Multiple handlers: Multiple handlers can subscribe to the same event
- Error handling: Handler errors reject publish(...) unless onHandlerError is configured
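The ordering and multiple-handler rules in the list above can be checked with a toy synchronous bus. `createToyBus` is a hypothetical helper written for this example; it only mirrors the listed behavior and is not how the provider is implemented.

```typescript
// Toy synchronous bus for illustration only; not the package's source.
type Listener = (payload: string) => void;

function createToyBus() {
  const listeners: Listener[] = [];
  return {
    subscribe(listener: Listener): () => void {
      listeners.push(listener); // subscription order is call order
      return () => {
        const index = listeners.indexOf(listener);
        if (index !== -1) listeners.splice(index, 1);
      };
    },
    publish(payload: string): void {
      // Copy the array so unsubscribing inside a handler does not skip a neighbour.
      for (const listener of [...listeners]) listener(payload);
    },
  };
}

const calls: string[] = [];
const toyBus = createToyBus();

const stopFirst = toyBus.subscribe((payload) => calls.push(`first:${payload}`));
toyBus.subscribe((payload) => calls.push(`second:${payload}`));

toyBus.publish("a"); // both handlers run, in the order they subscribed
stopFirst();
toyBus.publish("b"); // only the second handler is still subscribed
```

After these calls, `calls` contains `first:a`, `second:a`, and `second:b`, in that order.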
Local and tests
Use this provider for local development, deterministic use-case tests, and single-process demos. Prefer it over a durable broker in tests unless the test is specifically proving retry, delivery, or worker behavior.
Deployment notes
Use the memory event bus only when in-process, best-effort delivery is
acceptable. For multi-process or production workflows that must survive
crashes, publish through Beignet outbox/listener workflows or implement
EventBusPort over a durable broker.
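As a rough sketch of what "EventBusPort over a durable broker" could look like, the adapter below forwards publish and subscribe to a broker client. Every name here is hypothetical: `SketchEvent` and `SketchEventBusPort` only approximate the port shape shown in this README, `BrokerClient` stands in for a real queue or stream SDK, and `createLoopbackBroker` is an in-memory fake used so the example runs.

```typescript
// All names are hypothetical; they only approximate the shapes in this README.
interface SketchEvent<P> {
  name: string;
  parse(input: unknown): P;
}

type SketchHandler<P> = (payload: P) => void | Promise<void>;

interface SketchEventBusPort {
  publish<P>(event: SketchEvent<P>, payload: P): Promise<void> | void;
  subscribe<P>(event: SketchEvent<P>, handler: SketchHandler<P>): () => void;
}

// Stand-in for a real broker SDK (queue, stream, and so on).
interface BrokerClient {
  send(topic: string, body: string): Promise<void>;
  onMessage(topic: string, listener: (body: string) => void): () => void;
}

function createBrokerEventBus(broker: BrokerClient): SketchEventBusPort {
  return {
    publish<P>(event: SketchEvent<P>, payload: P): Promise<void> {
      // Serialize at the boundary; durability and retries belong to the broker.
      return broker.send(event.name, JSON.stringify(payload));
    },
    subscribe<P>(event: SketchEvent<P>, handler: SketchHandler<P>): () => void {
      return broker.onMessage(event.name, (body: string) => {
        // A real adapter would ack or nack based on the handler outcome.
        void handler(event.parse(JSON.parse(body)));
      });
    },
  };
}

// In-memory fake so the sketch can run without a real broker.
function createLoopbackBroker(): BrokerClient {
  const listeners = new Map<string, Array<(body: string) => void>>();
  return {
    async send(topic: string, body: string): Promise<void> {
      for (const listener of listeners.get(topic) ?? []) listener(body);
    },
    onMessage(topic: string, listener: (body: string) => void): () => void {
      const list = listeners.get(topic) ?? [];
      list.push(listener);
      listeners.set(topic, list);
      return () => {
        const index = list.indexOf(listener);
        if (index !== -1) list.splice(index, 1);
      };
    },
  };
}

const OrderPlacedSketch: SketchEvent<{ orderId: string }> = {
  name: "order.placed",
  parse: (input) => input as { orderId: string },
};

const received: string[] = [];
const brokerBus = createBrokerEventBus(createLoopbackBroker());

brokerBus.subscribe(OrderPlacedSketch, (payload) => {
  received.push(payload.orderId);
});
void brokerBus.publish(OrderPlacedSketch, { orderId: "o-1" });
```

Application code keeps calling publish and subscribe on the port, so swapping the in-memory provider for an adapter like this should not require changes in use cases.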
When to use
Good for:
- Single-process applications
- Development and testing
- Simple event-driven workflows
- Decoupling application components
Not suitable for:
- Distributed systems
- Event persistence requirements
- Guaranteed delivery needs
- Cross-service communication
For production distributed systems, implement EventBusPort with a proper message broker.
License
MIT