Marten Backend¶
The Deepstaging.EventStore.Marten package generates production-ready Marten-backed implementations of EventStore interfaces. Marten provides native event sourcing on PostgreSQL with document storage, projections, and ACID guarantees.
Installation¶
No code changes needed — the Marten generator discovers the same [EventStore], [EventSourcedAggregate], and [Projection] attributes and generates Marten-specific implementations alongside the core in-memory ones.
What Gets Generated¶
For each [EventSourcedAggregate]:
Marten{Aggregate}EventStore—IDocumentSession-backed store implementationMartenEventStream<T>— adapter from Marten'sIEventStream<T>to Deepstaging's interface
For each [Projection]:
Marten{Projection}Projection—IQuerySession-backed projection queries
For the [EventStore] container:
{Store}MartenRegistration— DI extension method with Marten configuration
DI Registration¶
The generated registration method:
- Calls
AddMarten()with the connection string - Sets
DatabaseSchemaNameto isolate tables in a dedicated PostgreSQL schema - Enables lightweight sessions (
.UseLightweightSessions()) - Configures
StreamIdentity.AsStringwhen any aggregate uses a string-backed[StreamId] - Configures snapshot projections for each
[Projection]type - Registers
Marten{Aggregate}EventStoreas scoped (overrides in-memory singletons) - Registers
Marten{Projection}Projectionas scoped - Accepts an optional
Action<StoreOptions>for custom Marten configuration
services.AddAppEventStoreMarten(connectionString, opts =>
{
// Custom Marten configuration
opts.AutoCreateSchemaObjects = AutoCreate.All;
});
Generated: registration method
public static class AppEventStoreMartenRegistration
{
public static IServiceCollection AddAppEventStoreMarten(
this IServiceCollection services,
string connectionString,
Action<StoreOptions>? configure = null)
{
services.AddMarten(opts =>
{
opts.Connection(connectionString);
opts.DatabaseSchemaName = "app_event_store";
opts.Projections.Snapshot<OrderSummary>(SnapshotLifecycle.Inline);
configure?.Invoke(opts);
}).UseLightweightSessions();
services.AddScoped<IOrderEventStore, MartenOrderEventStore>();
services.AddScoped<IOrderSummaryProjection, MartenOrderSummaryProjection>();
return services;
}
}
Combining with Core Registration¶
The Marten registration uses AddScoped which takes priority over the core's TryAddSingleton:
services.AddAppEventStore(); // Core: registers InMemory via TryAddSingleton
services.AddAppEventStoreMarten(connectionString); // Marten: registers scoped (overrides)
Order doesn't matter — scoped registrations from Marten are resolved in request scopes, while the singleton in-memory stores serve as fallback in contexts without Marten.
PostgreSQL Schema Isolation¶
The generator automatically sets DatabaseSchemaName so each event store gets its own PostgreSQL schema, derived from the store class name in snake_case:
| Store Class | Schema |
|---|---|
AppEventStore |
app_event_store |
OrderEventStore |
order_event_store |
Override with the Schema property:
Generated Store Implementation¶
The Marten store delegates to IDocumentSession:
internal sealed class MartenOrderEventStore : IOrderEventStore
{
private readonly IDocumentSession _session;
public async Task<IEventStream<Order>> FetchForWritingAsync(OrderId id, CancellationToken ct)
{
var stream = await _session.Events.FetchForWriting<Order>(id.Value, ct);
return new MartenEventStream<Order>(stream);
}
public async Task<Order?> AggregateAsync(OrderId id, CancellationToken ct) =>
await _session.Events.AggregateStreamAsync<Order>(id.Value, token: ct);
public Task StartStreamAsync(OrderId id, IReadOnlyList<object> events, CancellationToken ct)
{
_session.Events.StartStream<Order>(id.Value, events.ToArray());
return Task.CompletedTask;
}
public Task AppendAsync(OrderId id, IReadOnlyList<object> events, CancellationToken ct)
{
_session.Events.Append(id.Value, events.ToArray());
return Task.CompletedTask;
}
public Task CommitAsync(CancellationToken ct) =>
_session.SaveChangesAsync(ct);
// ... plus AggregateAsync(version), StartStreamAsync(auto-id),
// AppendOptimisticAsync, FetchStreamAsync
}
Key implementation details:
- TypedId → Guid: The store converts
OrderIdtoid.Value(the rawGuid) for Marten APIs - FetchForWriting: Uses Marten's
FetchForWriting<T>which provides optimistic concurrency via version tracking - CommitAsync: Delegates to
_session.SaveChangesAsync()— events and projections are persisted in a single transaction - AggregateStreamAsync: Marten handles the Create/Apply fold natively
Event Stream Adapter¶
The MartenEventStream<T> adapter bridges Marten's stream interface to Deepstaging's:
internal sealed class MartenEventStream<T> : IEventStream<T> where T : class
{
private readonly Marten.Events.IEventStream<T> _inner;
public T? Aggregate => _inner.Aggregate;
public long CurrentVersion => _inner.CurrentVersion ?? 0;
public void AppendOne(object @event) => _inner.AppendOne(@event);
public void AppendMany(IEnumerable<object> events) => _inner.AppendMany(events.ToArray());
}
Projection Implementation¶
For each [Projection], Deepstaging generates a query-only store backed by IQuerySession:
internal sealed class MartenOrderSummaryProjection : IOrderSummaryProjection
{
private readonly IQuerySession _session;
public async Task<OrderSummary?> GetAsync(OrderId id, CancellationToken ct) =>
await _session.LoadAsync<OrderSummary>(id.Value, ct);
public async Task<QueryResult<OrderSummary>> QueryAsync(
int page, int pageSize, CancellationToken ct)
{
var total = await _session.Query<OrderSummary>().CountAsync(ct);
var items = await _session.Query<OrderSummary>()
.Skip((page - 1) * pageSize)
.Take(pageSize)
.ToListAsync(ct);
return new QueryResult<OrderSummary>(items, total, page, pageSize);
}
}
IQuerySession vs IDocumentSession
Projection stores use IQuerySession (read-only), not IDocumentSession. Marten handles projection updates internally via its snapshot projection mechanism — the generated store only queries.
Snapshot Projections¶
Marten's snapshot projections handle the Create/Apply logic natively. The [Projection] class is both the read model and the projection definition:
[Projection]
public partial class OrderSummary
{
public Guid Id { get; set; } // Marten identity (set by Marten)
public OrderId OrderId { get; set; } // Deepstaging stream identity
public string CustomerName { get; set; } = "";
public int ItemCount { get; set; }
public static OrderSummary Create(OrderCreated e) =>
new() { OrderId = e.OrderId, CustomerName = e.CustomerName };
public void Apply(OrderItemAdded e) =>
ItemCount += e.Quantity;
}
Marten discovers Create and Apply methods automatically — no additional configuration needed.
Lifecycle Mapping¶
The Lifecycle property on [Projection] maps to Marten's SnapshotLifecycle:
| Deepstaging | Marten | Behavior |
|---|---|---|
Inline |
SnapshotLifecycle.Inline |
Projection updated within the same transaction as SaveChangesAsync |
Async |
SnapshotLifecycle.Async |
Projection updated by Marten's async daemon (background worker) |
Complete Example¶
using Deepstaging.EventStore;
using Deepstaging.Ids;
// Types
[TypedId] [StreamId] public readonly partial struct OrderId;
public sealed record OrderCreated(OrderId OrderId, string CustomerName) : IAggregateEvent;
public sealed record OrderItemAdded(string Sku, int Quantity) : IAggregateEvent;
[EventSourcedAggregate]
public partial record Order(OrderId Id, string CustomerName, int ItemCount)
{
public static Order Create(OrderCreated e) => new(e.OrderId, e.CustomerName, 0);
public Order Apply(OrderItemAdded e) => this with { ItemCount = ItemCount + e.Quantity };
}
[Projection]
public partial class OrderSummary
{
public OrderId Id { get; init; }
public string CustomerName { get; set; } = "";
public int ItemCount { get; set; }
public static OrderSummary Create(OrderCreated e) =>
new() { Id = e.OrderId, CustomerName = e.CustomerName };
public void Apply(OrderItemAdded e) => ItemCount += e.Quantity;
}
[EventStore]
public static partial class AppEventStore;
var builder = WebApplication.CreateBuilder(args);
// Register core (in-memory fallback)
builder.Services.AddAppEventStore();
// Register Marten (PostgreSQL production store)
builder.Services.AddAppEventStoreMarten(
builder.Configuration.GetConnectionString("Postgres")!);
var app = builder.Build();
app.MapGet("/orders/{id}", async (OrderId id, IOrderSummaryProjection projections) =>
await projections.GetAsync(id) is { } summary
? Results.Ok(summary)
: Results.NotFound());
app.Run();