Generated Code¶
The Event Queue generator produces runtime types that handle channel management, event dispatch, and lifecycle — all backed by plain .NET APIs.
Runtime Types¶
ChannelWorker<T>¶
A BackgroundService base class that continuously reads from a Channel<T> and dispatches events to handlers.
| Feature | Description |
|---|---|
| Concurrency control | Uses SemaphoreSlim when MaxConcurrency > 1 to limit parallel handler executions |
| Layered retry policy | Immediate retries (in-memory, exponential backoff) + delayed retries (transport-level abandon with backoff) before dead-lettering |
| Timeout enforcement | When TimeoutMilliseconds > 0, wraps each dispatch with CancellationTokenSource.CancelAfter |
| Error handling | Virtual OnError(T, Exception) method as an escape hatch, called after immediate retries are exhausted |
| Health reporting | Exposes queue depth, processing rate, and error count for health checks |
| Graceful shutdown | Tracks in-flight concurrent tasks via ConcurrentDictionary and awaits them with Task.WhenAll before stopping |
EventQueueChannel<T>¶
Typed wrapper around Channel<T> providing three enqueue strategies:
| Method | Description |
|---|---|
| Enqueue(T) | Fire-and-forget: writes to the channel and returns immediately |
| EnqueueWithAck(T) | Returns an EventAcknowledgement that completes when the handler finishes |
| EnqueueAndWait(T) | Awaits handler completion before returning |
EventAcknowledgement¶
A TaskCompletionSource wrapper for acknowledgement patterns. Use when a producer needs confirmation that an event was processed.
// Producer side
from ack in OrderEvents.EnqueueWithAck<AppRuntime>(new OrderPlaced("ord-1", "cust-1"))
from _ in liftEff(async () => await ack.WaitAsync())
select unit;
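The acknowledgement pattern can be sketched with a bare TaskCompletionSource. This is a minimal illustration, not the generated EventAcknowledgement type: the `ack` variable and the `ConsumeAsync` helper are hypothetical names, and the real wrapper may expose a different surface.

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical stand-in for EventAcknowledgement: a TaskCompletionSource that the
// consumer completes once the handler has run.
var ack = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);

// Consumer side (illustrative): run the handler, then signal the outcome.
async Task ConsumeAsync(Func<Task> handler)
{
    try
    {
        await handler();
        ack.TrySetResult();        // the producer's wait completes successfully
    }
    catch (Exception ex)
    {
        ack.TrySetException(ex);   // the producer's wait rethrows the handler failure
    }
}

var handled = false;
var consumer = ConsumeAsync(async () =>
{
    await Task.Delay(10);          // simulated handler work
    handled = true;
});

// Producer side: wait for confirmation that the event was processed.
await ack.Task;
await consumer;
Console.WriteLine($"handled = {handled}");
```

Creating the source with RunContinuationsAsynchronously keeps the producer's continuation from running inline on the consumer's thread, which matters when the consumer is a long-lived worker loop.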
Event Type Registry & Transport Injection
These features are only generated for [IntegrationEventQueue]. Plain [EventQueue] uses in-process channels exclusively.
Deferred Transport¶
When a transport is set via SetTransport() before the event queue channel is initialized, the transport is stored as pending and applied automatically when Initialize() runs. This makes SetTransport call-order independent:
// Generated pattern (simplified)
private static EventQueueChannel<object>? _channel;
private static IEventTransport<object>? _pendingTransport;

public static void SetTransport(IEventTransport<object> transport)
{
    if (_channel is not null)
        _channel.Transport = transport; // channel exists: set directly
    else
        _pendingTransport = transport;  // channel not ready: defer until Initialize()
}

internal static void Initialize(EventQueueChannel<object> channel)
{
    _channel = channel;
    if (_pendingTransport is not null)
    {
        _channel.Transport = _pendingTransport; // apply the deferred transport
        _pendingTransport = null;
    }
}
This is particularly important for subscriber event handlers ([EventQueueHandler]), where the handler's channel may not be initialized when SubscribeFromServiceBus<T>() calls SetTransport in the bootstrapper's configure callback.
Distributed Tracing with WithActivity¶
The generated ChannelWorker wraps each handler dispatch in a .WithActivity(...) call, producing System.Diagnostics.Activity spans for distributed tracing:
// Generated dispatch (simplified)
await OrderEventHandlers.Handle(e)
    .WithActivity("orders.Handle", ActivitySource, destination: "orders")
    .RunAsync(runtime);
Each queue class declares its own ActivitySource.
The activity name follows the pattern {queueName}.{methodName}, and the destination tag is set to the queue name. These spans integrate with OpenTelemetry and any ActivityListener for end-to-end event flow visibility.
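As a rough illustration of the naming convention, the sketch below uses System.Diagnostics directly rather than the generated WithActivity extension. The source name "Orders.EventQueue" and the tag key "destination" are assumptions made for the example; the generated code may use different values.

```csharp
using System;
using System.Diagnostics;

// Assumed name for illustration; the generated source name may differ.
var activitySource = new ActivitySource("Orders.EventQueue");

// Without a listener, StartActivity returns null. OpenTelemetry normally registers one.
using var listener = new ActivityListener
{
    ShouldListenTo = source => source.Name == "Orders.EventQueue",
    Sample = (ref ActivityCreationOptions<ActivityContext> options) => ActivitySamplingResult.AllData,
};
ActivitySource.AddActivityListener(listener);

string queueName = "orders";
string methodName = "Handle";

string? observedName;
object? observedDestination;
using (var activity = activitySource.StartActivity($"{queueName}.{methodName}"))
{
    // Tag key "destination" is an assumption based on the parameter name shown above.
    activity?.SetTag("destination", queueName);
    observedName = activity?.OperationName;
    observedDestination = activity?.GetTagItem("destination");
}
Console.WriteLine($"{observedName} -> {observedDestination}");
```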
Enqueue Strategies¶
Fire-and-Forget¶
// Producer doesn't wait for processing
OrderEvents.Enqueue<AppRuntime>(new OrderPlaced("ord-1", "cust-1"));
Acknowledged¶
// Producer gets a handle to track completion
from ack in OrderEvents.EnqueueWithAck<AppRuntime>(new OrderPlaced("ord-1", "cust-1"))
select ack; // caller can await ack.WaitAsync() later
Synchronous Wait¶
// Producer blocks until processing completes
OrderEvents.EnqueueAndWait<AppRuntime>(new OrderPlaced("ord-1", "cust-1"));
// guaranteed processed when this completes
Concurrency Control¶
When MaxConcurrency > 1, the generated ChannelWorker<T> uses a SemaphoreSlim to limit the number of events being processed in parallel. The semaphore is acquired before dispatching to a handler and released when the handler completes (or faults).
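The acquire-before-dispatch, release-on-completion flow might look roughly like the following. It is a simplified sketch built on System.Threading.Channels, not the generated ChannelWorker<T>; `HandleAsync`, `DispatchAsync`, and the `peak` counter are hypothetical and exist only to make the limit observable.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

const int maxConcurrency = 2;               // stands in for the MaxConcurrency attribute value
var channel = Channel.CreateUnbounded<int>();
using var gate = new SemaphoreSlim(maxConcurrency);

int running = 0, peak = 0;
var counterLock = new object();

async Task HandleAsync(int evt)
{
    lock (counterLock) { running++; peak = Math.Max(peak, running); }
    try { await Task.Delay(20); }           // simulated handler work
    finally { lock (counterLock) { running--; } }
}

async Task DispatchAsync(int evt)
{
    try { await HandleAsync(evt); }
    finally { gate.Release(); }             // released on success and on fault
}

for (var i = 0; i < 6; i++) channel.Writer.TryWrite(i);
channel.Writer.Complete();

var inFlight = new List<Task>();
await foreach (var next in channel.Reader.ReadAllAsync())
{
    await gate.WaitAsync();                 // acquire a slot before dispatching
    inFlight.Add(DispatchAsync(next));
}
await Task.WhenAll(inFlight);
Console.WriteLine($"peak concurrency = {peak}");
```

Releasing in a finally block is what guarantees that a faulting handler does not permanently consume a slot.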
Retry Policy¶
The generated worker implements a layered retry strategy controlled by the MaxRetries attribute property:
- Immediate retries — failed dispatches are retried in-memory with exponential backoff (200ms, 400ms, 800ms...)
- Delayed retries — after immediate retries are exhausted, the message is abandoned (re-enqueued) for transport-level retry with its own backoff
- Dead-letter — after all retries are exhausted, the message is moved to the dead-letter queue
Attempt 1 (initial)    → fail → immediate retry 1 (200ms)
                       → fail → immediate retry 2 (400ms)
                       → fail → immediate retry 3 (800ms)
                       → fail → abandon (transport backoff ~100ms)
Attempt 5 (redelivery) → fail → immediate retries 1-3...
                       → fail → abandon (transport backoff ~200ms)
Attempt 9 (redelivery) → fail → immediate retries 1-3...
                       → fail → dead-letter
The OnError(T, Exception) virtual method is called after immediate retries are exhausted. If overridden to return MessageAction.Retry or MessageAction.Skip, that action takes precedence over the default delayed retry / dead-letter path.
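The immediate-retry layer can be approximated as below. This is a sketch under stated assumptions: the delay is assumed to double from a 200 ms base (matching the 200/400/800 ms sequence above), and `ImmediateBackoff` and `TryWithImmediateRetriesAsync` are hypothetical helpers, not generated members. The delay is injected as a function so the example runs without actually sleeping.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Assumption: 200 ms base, doubling on each retry (200, 400, 800, ...).
TimeSpan ImmediateBackoff(int retry) =>
    TimeSpan.FromMilliseconds(200 * Math.Pow(2, retry - 1));

// Hypothetical helper: runs the handler, retrying up to maxRetries times in memory.
// Returns true on success and false once immediate retries are exhausted, which is
// the point where the worker would call OnError and then abandon or dead-letter.
async Task<bool> TryWithImmediateRetriesAsync(
    Func<Task> handler, int maxRetries, Func<TimeSpan, Task> delay)
{
    for (var attempt = 0; ; attempt++)
    {
        try
        {
            await handler();
            return true;
        }
        catch (Exception) when (attempt < maxRetries)
        {
            await delay(ImmediateBackoff(attempt + 1));
        }
        catch (Exception)
        {
            return false;
        }
    }
}

var calls = 0;
var waits = new List<double>();
var ok = await TryWithImmediateRetriesAsync(
    () => { calls++; throw new InvalidOperationException("always fails"); },
    maxRetries: 3,
    delay: d => { waits.Add(d.TotalMilliseconds); return Task.CompletedTask; });

Console.WriteLine($"ok={ok}, calls={calls}, waits={string.Join(",", waits)}");
```

With a handler that always fails and three retries, the handler runs four times in total: the initial attempt plus one per retry.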
Effects resilience vs queue retry
Queue retry (MaxRetries) protects message delivery. For protecting individual infrastructure calls within a handler (e.g., a flaky HTTP API), use Effects resilience attributes ([Retry], [Timeout], [CircuitBreaker]) on your capability interfaces. The two layers are complementary — see Resilience in Event Queue Handlers for how they compose.
Timeout Enforcement¶
When TimeoutMilliseconds > 0 is set on the attribute, each dispatch attempt runs under a linked CancellationTokenSource on which CancelAfter() is called. Timeouts produce a TimeoutException that enters the retry pipeline. Shutdown cancellation is distinguished from timeout cancellation: only timeouts trigger retries, while cancellation caused by host shutdown does not.
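One way to separate the two kinds of cancellation is to check the shutdown token when an OperationCanceledException surfaces. The sketch below assumes that approach; `DispatchWithTimeoutAsync` is a hypothetical helper and the generated worker may structure this differently.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper: applies a per-dispatch timeout on top of the shutdown token.
async Task DispatchWithTimeoutAsync(
    Func<CancellationToken, Task> handler, int timeoutMilliseconds, CancellationToken stopping)
{
    using var cts = CancellationTokenSource.CreateLinkedTokenSource(stopping);
    if (timeoutMilliseconds > 0) cts.CancelAfter(timeoutMilliseconds);
    try
    {
        await handler(cts.Token);
    }
    catch (OperationCanceledException) when (!stopping.IsCancellationRequested)
    {
        // Only the timeout fired: surface it as a TimeoutException so it can be retried.
        throw new TimeoutException($"Handler exceeded {timeoutMilliseconds} ms.");
    }
    // If the shutdown token was cancelled, the OperationCanceledException propagates unchanged.
}

using var shutdown = new CancellationTokenSource();
string outcome;
try
{
    // A handler that never finishes on its own, so the 50 ms timeout must fire.
    await DispatchWithTimeoutAsync(ct => Task.Delay(Timeout.Infinite, ct), 50, shutdown.Token);
    outcome = "completed";
}
catch (TimeoutException) { outcome = "timeout"; }
catch (OperationCanceledException) { outcome = "shutdown"; }
Console.WriteLine(outcome);
```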
Health Reporting¶
The generated worker exposes queue depth, processing rate, and error count. These can be wired into ASP.NET Core health checks or any monitoring system.
Graceful Shutdown¶
On cancellation, the worker stops reading new events and awaits all in-flight concurrent tasks via Task.WhenAll before stopping. In-flight tasks are tracked in a ConcurrentDictionary and removed on completion. The host's ShutdownTimeout acts as the overall deadline.
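The tracking scheme can be sketched as follows. It assumes each dispatch is keyed by a fresh Guid, which is an illustrative choice; the `Track` helper and the `completed` counter are hypothetical and not part of the generated worker.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

// In-flight work keyed by a hypothetical per-dispatch id.
var inFlight = new ConcurrentDictionary<Guid, Task>();
var completed = 0;

void Track(Func<Task> work)
{
    var id = Guid.NewGuid();
    var task = work();
    inFlight[id] = task;
    // Remove the entry when the task finishes, whatever its outcome.
    task.ContinueWith(t => inFlight.TryRemove(id, out _), TaskScheduler.Default);
}

for (var i = 0; i < 3; i++)
{
    Track(async () =>
    {
        await Task.Delay(30);                 // simulated handler still running at shutdown
        Interlocked.Increment(ref completed);
    });
}

// At shutdown the worker stops reading new events, then awaits a snapshot of
// whatever is still in flight before returning.
await Task.WhenAll(inFlight.Values);
Console.WriteLine($"completed = {completed}");
```

Because Task.WhenAll has no deadline of its own, the host's ShutdownTimeout remains the upper bound on how long this wait can last.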
Testing¶
Event queues expose test helpers for verifying event flow in tests:
// Read a single event from the channel (non-blocking)
var evt = channel.TryRead();
// Drain all pending events
var allEvents = channel.DrainAll();
These helpers are available on the EventQueueChannel<T> instance, typically resolved from the test DI container.
Request-Response Transport¶
The Deepstaging.RequestResponse namespace provides a bidirectional transport abstraction for request-response patterns, in contrast to IEventTransport<TEvent>, which is unidirectional and fire-and-forget.
IRequestResponseTransport<TRequest, TResponse>¶
| Method | Description |
|---|---|
| RequestAsync(TRequest, CancellationToken) | Sends a request and awaits the correlated response |
| ReceiveRequestsAsync(CancellationToken) | Returns an IAsyncEnumerable<PendingRequest<TRequest, TResponse>> of incoming requests |
| PendingCount | Number of pending requests, or -1 if counting is not supported |
| Complete() | Signals no more requests will be sent |
Correlation is handled internally by the transport — callers do not manage correlation IDs directly.
PendingRequest<TRequest, TResponse>¶
Represents a request awaiting a response. The receiver calls Reply(TResponse) to send the response or Fault(Exception) to signal failure.
| Member | Description |
|---|---|
| Request | The request payload |
| Context | Optional CorrelationContext propagated from the requester |
| Reply(TResponse) | Sends a response back to the requester |
| Fault(Exception) | Signals that the request could not be processed |
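To make the Reply/Fault contract concrete, here is a minimal in-process sketch that pairs each request with a TaskCompletionSource. It does not use the Deepstaging types: the tuple shape and the `RequestAsync` and `ServeAsync` helpers are hypothetical, and a real transport would also carry the correlation context.

```csharp
using System;
using System.Threading.Channels;
using System.Threading.Tasks;

// Each queued item pairs the request payload with the completion source for its
// response, so correlation never leaves the transport.
var requests = Channel.CreateUnbounded<(string Request, TaskCompletionSource<string> Response)>();

// Requester side: enqueue and await the correlated response.
Task<string> RequestAsync(string request)
{
    var tcs = new TaskCompletionSource<string>(TaskCreationOptions.RunContinuationsAsynchronously);
    requests.Writer.TryWrite((request, tcs));
    return tcs.Task;
}

// Receiver side: reply to non-empty requests, fault the rest.
async Task ServeAsync()
{
    await foreach (var (payload, response) in requests.Reader.ReadAllAsync())
    {
        if (payload.Length > 0)
            response.TrySetResult(payload.ToUpperInvariant());                 // plays the role of Reply(TResponse)
        else
            response.TrySetException(new ArgumentException("empty request")); // plays the role of Fault(Exception)
    }
}

var server = ServeAsync();
var reply = await RequestAsync("ping");

string? fault = null;
try { await RequestAsync(""); }
catch (ArgumentException ex) { fault = ex.Message; }

requests.Writer.Complete();   // analogous to Complete(): no more requests will be sent
await server;
Console.WriteLine($"{reply}, fault: {fault}");
```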
ChannelRequestResponseTransport<TRequest, TResponse>¶
The default in-process implementation backed by System.Threading.Channels.Channel<T>. Supports both bounded and unbounded modes:
// Unbounded
var unbounded = new ChannelRequestResponseTransport<MyRequest, MyResponse>();
// Bounded with capacity
var bounded = new ChannelRequestResponseTransport<MyRequest, MyResponse>(capacity: 100);
Both constructors accept optional singleReader (default true) and singleWriter (default false) parameters for channel optimization.
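For reference, these options correspond to settings on the underlying System.Threading.Channels API. The sketch below shows the plain BCL calls; how the transport constructors map onto them is an assumption, and the FullMode value shown is simply the BCL default rather than something the source specifies.

```csharp
using System;
using System.Threading.Channels;

// Unbounded channel with the defaults described above.
var unboundedChannel = Channel.CreateUnbounded<string>(new UnboundedChannelOptions
{
    SingleReader = true,    // lets the channel skip synchronization for concurrent readers
    SingleWriter = false,   // multiple producers may write concurrently
});

// Bounded channel with a small capacity so the limit is easy to observe.
var boundedChannel = Channel.CreateBounded<string>(new BoundedChannelOptions(capacity: 2)
{
    SingleReader = true,
    SingleWriter = false,
    FullMode = BoundedChannelFullMode.Wait,   // assumption: BCL default behavior when full
});

var first = boundedChannel.Writer.TryWrite("a");
var second = boundedChannel.Writer.TryWrite("b");
var third = boundedChannel.Writer.TryWrite("c");   // rejected: the channel is at capacity
var always = unboundedChannel.Writer.TryWrite("x"); // an unbounded channel accepts writes until completed

Console.WriteLine($"{first} {second} {third} {always}");
```

Setting SingleReader to true is only safe when exactly one consumer reads at a time, which fits a single worker loop per channel.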
External transports (Azure Service Bus sessions, AMQP reply queues, gRPC, etc.) implement IRequestResponseTransport<TRequest, TResponse> to provide cross-process or cross-service request-response semantics.
vs IEventTransport
Use IEventTransport<TEvent> for fire-and-forget event delivery (one-way). Use IRequestResponseTransport<TRequest, TResponse> when the caller needs a correlated response from the handler.
Direct Usage¶
The generated types can be injected and used directly without the effects layer: