Marten Backend

The Deepstaging.EventStore.Marten package generates production-ready Marten-backed implementations of EventStore interfaces. Marten provides native event sourcing on PostgreSQL with document storage, projections, and ACID guarantees.

Installation

dotnet add package Deepstaging.EventStore.Marten --prerelease

No code changes needed — the Marten generator discovers the same [EventStore], [EventSourcedAggregate], and [Projection] attributes and generates Marten-specific implementations alongside the core in-memory ones.

What Gets Generated

For each [EventSourcedAggregate]:

  • Marten{Aggregate}EventStore: IDocumentSession-backed store implementation
  • MartenEventStream<T>: adapter from Marten's IEventStream<T> to Deepstaging's interface

For each [Projection]:

  • Marten{Projection}Projection: IQuerySession-backed projection queries

For the [EventStore] container:

  • {Store}MartenRegistration — DI extension method with Marten configuration

DI Registration

services.AddAppEventStoreMarten(connectionString);

The generated registration method:

  1. Calls AddMarten() with the connection string
  2. Sets DatabaseSchemaName to isolate tables in a dedicated PostgreSQL schema
  3. Enables lightweight sessions (.UseLightweightSessions())
  4. Configures StreamIdentity.AsString when any aggregate uses a string-backed [StreamId]
  5. Configures snapshot projections for each [Projection] type
  6. Registers Marten{Aggregate}EventStore as scoped (overrides in-memory singletons)
  7. Registers Marten{Projection}Projection as scoped
  8. Accepts an optional Action<StoreOptions> for custom Marten configuration

services.AddAppEventStoreMarten(connectionString, opts =>
{
    // Custom Marten configuration
    opts.AutoCreateSchemaObjects = AutoCreate.All;
});

Generated: registration method

public static class AppEventStoreMartenRegistration
{
    public static IServiceCollection AddAppEventStoreMarten(
        this IServiceCollection services,
        string connectionString,
        Action<StoreOptions>? configure = null)
    {
        services.AddMarten(opts =>
        {
            opts.Connection(connectionString);
            opts.DatabaseSchemaName = "app_event_store";
            opts.Projections.Snapshot<OrderSummary>(SnapshotLifecycle.Inline);
            configure?.Invoke(opts);
        }).UseLightweightSessions();

        services.AddScoped<IOrderEventStore, MartenOrderEventStore>();
        services.AddScoped<IOrderSummaryProjection, MartenOrderSummaryProjection>();
        return services;
    }
}

Combining with Core Registration

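The Marten registration uses AddScoped, which takes priority over the core's TryAddSingleton:

services.AddAppEventStore();                       // Core: registers InMemory via TryAddSingleton
services.AddAppEventStoreMarten(connectionString);  // Marten: registers scoped (overrides)

Order doesn't matter. If the core registration runs first, the later AddScoped registration wins, because the container resolves the last registration for a service type. If the Marten registration runs first, TryAddSingleton sees that the service type is already registered and adds nothing. The singleton in-memory stores are used only when AddAppEventStoreMarten is not called, for example in tests that run without PostgreSQL.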
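
The ordering behavior can be checked in isolation with a minimal sketch against Microsoft.Extensions.DependencyInjection. The types IDemoStore, InMemoryDemoStore, and MartenDemoStore are hypothetical stand-ins for the generated store interface and its two implementations; they are not part of Deepstaging.

```csharp
using System;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.DependencyInjection.Extensions;

// Hypothetical stand-ins for a generated store interface and its implementations.
public interface IDemoStore { }
public sealed class InMemoryDemoStore : IDemoStore { }
public sealed class MartenDemoStore : IDemoStore { }

public static class RegistrationOrderDemo
{
    // Registers both implementations in the requested order and returns the
    // implementation type that a scope resolves for IDemoStore.
    public static Type Resolve(bool coreFirst)
    {
        var services = new ServiceCollection();
        if (coreFirst)
        {
            // Nothing is registered yet, so TryAddSingleton adds the in-memory store.
            services.TryAddSingleton<IDemoStore, InMemoryDemoStore>();
            // Added last, so this registration is the one that gets resolved.
            services.AddScoped<IDemoStore, MartenDemoStore>();
        }
        else
        {
            services.AddScoped<IDemoStore, MartenDemoStore>();
            // No-op: IDemoStore already has a registration.
            services.TryAddSingleton<IDemoStore, InMemoryDemoStore>();
        }

        using var provider = services.BuildServiceProvider();
        using var scope = provider.CreateScope();
        return scope.ServiceProvider.GetRequiredService<IDemoStore>().GetType();
    }
}
```

Both RegistrationOrderDemo.Resolve(true) and RegistrationOrderDemo.Resolve(false) return typeof(MartenDemoStore).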

PostgreSQL Schema Isolation

The generator automatically sets DatabaseSchemaName so each event store gets its own PostgreSQL schema, derived from the store class name in snake_case:

| Store Class | Schema |
|---|---|
| AppEventStore | app_event_store |
| OrderEventStore | order_event_store |

Override with the Schema property:

[EventStore(Schema = "orders")]
public static partial class AppEventStore;
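
To illustrate the default naming, here is a minimal sketch of a PascalCase to snake_case conversion that reproduces the two mappings listed above. SchemaNames.ToSnakeCase is a hypothetical helper; the generator's actual algorithm is not shown on this page and may treat acronyms or digits differently.

```csharp
using System.Text;

public static class SchemaNames
{
    // Hypothetical helper: inserts '_' before every uppercase letter except the
    // first character, then lowercases everything.
    public static string ToSnakeCase(string name)
    {
        var sb = new StringBuilder(name.Length + 4);
        for (var i = 0; i < name.Length; i++)
        {
            var c = name[i];
            if (char.IsUpper(c) && i > 0)
            {
                sb.Append('_');
            }
            sb.Append(char.ToLowerInvariant(c));
        }
        return sb.ToString();
    }
}
```

With this sketch, SchemaNames.ToSnakeCase("AppEventStore") yields "app_event_store" and SchemaNames.ToSnakeCase("OrderEventStore") yields "order_event_store".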

Generated Store Implementation

The Marten store delegates to IDocumentSession:

internal sealed class MartenOrderEventStore : IOrderEventStore
{
    private readonly IDocumentSession _session;

    public async Task<IEventStream<Order>> FetchForWritingAsync(OrderId id, CancellationToken ct)
    {
        var stream = await _session.Events.FetchForWriting<Order>(id.Value, ct);
        return new MartenEventStream<Order>(stream);
    }

    public async Task<Order?> AggregateAsync(OrderId id, CancellationToken ct) =>
        await _session.Events.AggregateStreamAsync<Order>(id.Value, token: ct);

    public Task StartStreamAsync(OrderId id, IReadOnlyList<object> events, CancellationToken ct)
    {
        _session.Events.StartStream<Order>(id.Value, events.ToArray());
        return Task.CompletedTask;
    }

    public Task AppendAsync(OrderId id, IReadOnlyList<object> events, CancellationToken ct)
    {
        _session.Events.Append(id.Value, events.ToArray());
        return Task.CompletedTask;
    }

    public Task CommitAsync(CancellationToken ct) =>
        _session.SaveChangesAsync(ct);

    // ... plus AggregateAsync(version), StartStreamAsync(auto-id),
    //     AppendOptimisticAsync, FetchStreamAsync
}

Key implementation details:

  • TypedId → Guid: The store converts OrderId to id.Value (the raw Guid) for Marten APIs
  • FetchForWriting: Uses Marten's FetchForWriting<T> which provides optimistic concurrency via version tracking
  • CommitAsync: Delegates to _session.SaveChangesAsync() — events and projections are persisted in a single transaction
  • AggregateStreamAsync: Marten handles the Create/Apply fold natively
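
As a usage sketch, a command handler might drive the store like this. AddItemHandler is a hypothetical class; IOrderEventStore, OrderId, and OrderItemAdded are the types shown on this page, so the snippet compiles only in a project where they are generated. It assumes the interface exposes FetchForWritingAsync and CommitAsync with the signatures shown in the generated store above.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical handler; the store is resolved from DI per request (scoped).
public sealed class AddItemHandler
{
    private readonly IOrderEventStore _store;

    public AddItemHandler(IOrderEventStore store) => _store = store;

    public async Task HandleAsync(OrderId id, string sku, int quantity, CancellationToken ct)
    {
        // Loads the current aggregate state and the stream version used for
        // the optimistic concurrency check.
        var stream = await _store.FetchForWritingAsync(id, ct);
        if (stream.Aggregate is null)
        {
            throw new InvalidOperationException("Order stream does not exist.");
        }

        // Queues the event on the session; nothing is written yet.
        stream.AppendOne(new OrderItemAdded(sku, quantity));

        // Persists the events and any inline projections in one transaction.
        await _store.CommitAsync(ct);
    }
}
```

Nothing reaches PostgreSQL until CommitAsync runs, so a handler can append several events and still commit them atomically.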

Event Stream Adapter

The MartenEventStream<T> adapter bridges Marten's stream interface to Deepstaging's:

internal sealed class MartenEventStream<T> : IEventStream<T> where T : class
{
    private readonly Marten.Events.IEventStream<T> _inner;

    public T? Aggregate => _inner.Aggregate;
    public long CurrentVersion => _inner.CurrentVersion ?? 0;

    public void AppendOne(object @event) => _inner.AppendOne(@event);
    public void AppendMany(IEnumerable<object> events) => _inner.AppendMany(events.ToArray());
}

Projection Implementation

For each [Projection], Deepstaging generates a query-only store backed by IQuerySession:

internal sealed class MartenOrderSummaryProjection : IOrderSummaryProjection
{
    private readonly IQuerySession _session;

    public async Task<OrderSummary?> GetAsync(OrderId id, CancellationToken ct) =>
        await _session.LoadAsync<OrderSummary>(id.Value, ct);

    public async Task<QueryResult<OrderSummary>> QueryAsync(
        int page, int pageSize, CancellationToken ct)
    {
        var total = await _session.Query<OrderSummary>().CountAsync(ct);
        var items = await _session.Query<OrderSummary>()
            .Skip((page - 1) * pageSize)
            .Take(pageSize)
            .ToListAsync(ct);
        return new QueryResult<OrderSummary>(items, total, page, pageSize);
    }
}

IQuerySession vs IDocumentSession

Projection stores use IQuerySession (read-only), not IDocumentSession. Marten handles projection updates internally via its snapshot projection mechanism — the generated store only queries.
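
A paged read endpoint over the generated projection could look like the sketch below. OrderSummaryEndpoints, the /orders route, and the default and maximum page sizes are assumptions for illustration; IOrderSummaryProjection is the generated interface shown above. Because the generated QueryAsync skips (page - 1) * pageSize rows, pages are 1-based, so the sketch clamps the inputs before querying.

```csharp
using System;
using System.Threading;
using Microsoft.AspNetCore.Builder;
using Microsoft.AspNetCore.Http;
using Microsoft.AspNetCore.Routing;

public static class OrderSummaryEndpoints
{
    // Hypothetical extension method that maps a paged list endpoint.
    public static IEndpointRouteBuilder MapOrderSummaries(this IEndpointRouteBuilder app)
    {
        app.MapGet("/orders", async (
            int? page,
            int? pageSize,
            IOrderSummaryProjection projections,
            CancellationToken ct) =>
        {
            // Pages are 1-based; page 0 or a negative page would produce a negative Skip.
            var safePage = Math.Max(page ?? 1, 1);
            // Assumed bounds: default 20 rows, at most 100 per request.
            var safeSize = Math.Clamp(pageSize ?? 20, 1, 100);

            var result = await projections.QueryAsync(safePage, safeSize, ct);
            return Results.Ok(result);
        });

        return app;
    }
}
```

Calling app.MapOrderSummaries() in Program.cs would then serve requests such as GET /orders?page=2&pageSize=50.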

Snapshot Projections

Marten's snapshot projections handle the Create/Apply logic natively. The [Projection] class is both the read model and the projection definition:

[Projection]
public partial class OrderSummary
{
    public Guid Id { get; set; }                // Marten identity (set by Marten)
    public OrderId OrderId { get; set; }        // Deepstaging stream identity
    public string CustomerName { get; set; } = "";
    public int ItemCount { get; set; }

    public static OrderSummary Create(OrderCreated e) =>
        new() { OrderId = e.OrderId, CustomerName = e.CustomerName };

    public void Apply(OrderItemAdded e) =>
        ItemCount += e.Quantity;
}

Marten discovers Create and Apply methods automatically — no additional configuration needed.

Lifecycle Mapping

The Lifecycle property on [Projection] maps to Marten's SnapshotLifecycle:

| Deepstaging | Marten | Behavior |
|---|---|---|
| Inline | SnapshotLifecycle.Inline | Projection updated within the same transaction as SaveChangesAsync |
| Async | SnapshotLifecycle.Async | Projection updated by Marten's async daemon (background worker) |

Complete Example

using Deepstaging.EventStore;
using Deepstaging.Ids;

// Types
[TypedId] [StreamId] public readonly partial struct OrderId;

public sealed record OrderCreated(OrderId OrderId, string CustomerName) : IAggregateEvent;
public sealed record OrderItemAdded(string Sku, int Quantity) : IAggregateEvent;

[EventSourcedAggregate]
public partial record Order(OrderId Id, string CustomerName, int ItemCount)
{
    public static Order Create(OrderCreated e) => new(e.OrderId, e.CustomerName, 0);
    public Order Apply(OrderItemAdded e) => this with { ItemCount = ItemCount + e.Quantity };
}

[Projection]
public partial class OrderSummary
{
    public OrderId Id { get; init; }
    public string CustomerName { get; set; } = "";
    public int ItemCount { get; set; }

    public static OrderSummary Create(OrderCreated e) =>
        new() { Id = e.OrderId, CustomerName = e.CustomerName };
    public void Apply(OrderItemAdded e) => ItemCount += e.Quantity;
}

[EventStore]
public static partial class AppEventStore;

Program.cs

var builder = WebApplication.CreateBuilder(args);

// Register core (in-memory fallback)
builder.Services.AddAppEventStore();

// Register Marten (PostgreSQL production store)
builder.Services.AddAppEventStoreMarten(
    builder.Configuration.GetConnectionString("Postgres")!);

var app = builder.Build();

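// The CancellationToken is bound from the request and passed on, matching GetAsync(OrderId, CancellationToken).
app.MapGet("/orders/{id}", async (OrderId id, IOrderSummaryProjection projections, CancellationToken ct) =>
    await projections.GetAsync(id, ct) is { } summary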
        ? Results.Ok(summary)
        : Results.NotFound());

app.Run();