Realtime

The Supabase Realtime module provides WebSocket-based subscriptions for Postgres Changes, Broadcast, and Presence using the Phoenix Channels protocol.

Overview

services.AddDeepstaging(options => options
    .AddSupabaseRealtime());

This registers SupabaseRealtimeClient as a background service that maintains a WebSocket connection with automatic reconnection and heartbeat.

Usage

Inject SupabaseRealtimeClient and create channel subscriptions:

public class OrderNotifier(SupabaseRealtimeClient realtime, ILogger<OrderNotifier> logger)
{
    public async Task StartListening()
    {
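        // Attach both handlers to the "public:orders" channel before subscribing.
        var channel = realtime.Channel("public:orders")
            // Typed row changes; the filter below limits delivery to INSERTs on "orders".
            .OnPostgresChanges<Order>(async change =>
            {
                logger.LogInformation("Order {Type}: {Id}",
                    change.Type, change.NewRecord?.Id);

                // NotifyNewOrder is an application-defined method (not shown here).
                if (change.Type == PostgresChangeType.Insert)
                    await NotifyNewOrder(change.NewRecord!);
            }, table: "orders", @event: "INSERT")
            // Broadcast payloads arrive as JSON and are read property by property.
            .OnBroadcast("order-status", async payload =>
            {
                var status = payload.GetProperty("status").GetString();
                logger.LogInformation("Status update: {Status}", status);
            });

        // Nothing is delivered until the channel is joined on the server.
        await channel.SubscribeAsync();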
    }
}

Postgres Changes

Subscribe to INSERT, UPDATE, and DELETE events on your database tables. Changes are delivered as typed PostgresChange<T> records:

channel.OnPostgresChanges<Article>(async change =>
{
    switch (change.Type)
    {
        case PostgresChangeType.Insert:
            await IndexArticle(change.NewRecord!);
            break;
        case PostgresChangeType.Update:
            await ReindexArticle(change.NewRecord!);
            break;
        case PostgresChangeType.Delete:
            await RemoveFromIndex(change.OldRecord!.Id);
            break;
    }
}, schema: "public", table: "articles");

PostgresChange<T>

| Property | Type | Description |
| --- | --- | --- |
| Type | PostgresChangeType | Insert, Update, or Delete |
| Table | string | Table name |
| Schema | string | Schema name (e.g., public) |
| NewRecord | T? | New row state (INSERT, UPDATE) |
| OldRecord | T? | Previous row state (UPDATE, DELETE; requires replica identity) |

Replica identity for DELETE

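By default, Postgres only sends the primary key for deleted rows, so only the key columns of OldRecord are populated. To receive the full old record, set the table's replica identity to FULL: ALTER TABLE articles REPLICA IDENTITY FULL; The same setting controls whether OldRecord is complete on UPDATE.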

Filtering

Filter by schema, table, and event type:

// All changes on public.orders
channel.OnPostgresChanges<Order>(handler, schema: "public", table: "orders");

// Only inserts
channel.OnPostgresChanges<Order>(handler, table: "orders", @event: "INSERT");

// All tables in a schema
channel.OnPostgresChanges<JsonElement>(handler, schema: "audit");
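
For context, Supabase Realtime receives filters like these as a postgres_changes list inside the channel join payload. The sketch below builds that JSON with System.Text.Json. The helper name BuildJoinPayload is hypothetical, and how SupabaseRealtimeClient assembles the message internally is an assumption, not something this page documents.

```csharp
using System;
using System.Collections.Generic;
using System.Text.Json;

// Hypothetical helper (not part of the Deepstaging API): serializes one
// Postgres Changes filter into the "config.postgres_changes" shape of a
// Supabase Realtime join payload. A null table is omitted, which corresponds
// to "all tables in the schema"; "*" stands for all event types.
static string BuildJoinPayload(string schema = "public", string? table = null, string @event = "*")
{
    var filter = new Dictionary<string, string>
    {
        ["event"] = @event,
        ["schema"] = schema,
    };
    if (table is not null)
        filter["table"] = table;

    var payload = new
    {
        config = new { postgres_changes = new[] { filter } }
    };
    return JsonSerializer.Serialize(payload);
}

// Mirrors the "Only inserts" filter above.
Console.WriteLine(BuildJoinPayload(table: "orders", @event: "INSERT"));
```

Seeing the wire shape can help when debugging a subscription that joins successfully but never receives changes, since a mismatched schema or table name in the filter is a common cause.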

Broadcast

Send and receive ephemeral messages between connected clients. Broadcast messages are not persisted — they're delivered to all subscribers currently connected to the channel.

// Subscribe
channel.OnBroadcast("cursor-move", async payload =>
{
    var x = payload.GetProperty("x").GetInt32();
    var y = payload.GetProperty("y").GetInt32();
    await UpdateCursor(x, y);
});

// Send
await channel.BroadcastAsync("cursor-move", new { x = 100, y = 200 });

Use cases: live cursors, typing indicators, presence notifications, collaborative editing signals.
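
Because broadcast payloads come from other clients, the subscribe handler above would throw if x or y were missing or not an integer. A minimal defensive sketch, assuming the payload is a System.Text.Json JsonElement as in the handler signature documented for [OnBroadcast]; the helper name TryReadCursor is hypothetical.

```csharp
using System;
using System.Text.Json;

// Hypothetical helper: reads "x" and "y" without throwing when the payload is
// not an object, a property is missing, or a value is not a 32-bit integer.
static bool TryReadCursor(JsonElement payload, out int x, out int y)
{
    x = 0;
    y = 0;
    // TryGetProperty and TryGetInt32 throw on the wrong ValueKind,
    // so the kind is checked before each call.
    return payload.ValueKind == JsonValueKind.Object
        && payload.TryGetProperty("x", out var xProp)
        && xProp.ValueKind == JsonValueKind.Number
        && xProp.TryGetInt32(out x)
        && payload.TryGetProperty("y", out var yProp)
        && yProp.ValueKind == JsonValueKind.Number
        && yProp.TryGetInt32(out y);
}

// Stand-in for a payload received from the channel.
var sample = JsonSerializer.Deserialize<JsonElement>("{\"x\":100,\"y\":200}");
if (TryReadCursor(sample, out var cx, out var cy))
    Console.WriteLine($"cursor at {cx},{cy}");
```

Inside an OnBroadcast handler, a false result could simply be logged and ignored so that one malformed message does not interrupt the stream.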

Configuration

appsettings.json
{
  "Deepstaging": {
    "Supabase": {
      "Realtime": {
        "HeartbeatIntervalSeconds": 30,
        "ReconnectDelaySeconds": 1,
        "MaxReconnectDelaySeconds": 30
      }
    }
  }
}

| Property | Default | Description |
| --- | --- | --- |
| HeartbeatIntervalSeconds | 30 | Interval between Phoenix heartbeat messages |
| ReconnectDelaySeconds | 1 | Initial delay before reconnecting after disconnection |
| MaxReconnectDelaySeconds | 30 | Maximum reconnect delay (exponential backoff) |
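
To make the interaction between ReconnectDelaySeconds and MaxReconnectDelaySeconds concrete, here is a sketch of a capped exponential backoff schedule. It assumes the delay doubles after each failed attempt, which this page does not state; the helper name ReconnectDelay is hypothetical and the client's actual growth factor may differ.

```csharp
using System;

// Hypothetical helper: delay before reconnect attempt `attempt` (0-based),
// ASSUMING the delay doubles on every attempt and is capped at maxSeconds.
static TimeSpan ReconnectDelay(int attempt, int initialSeconds = 1, int maxSeconds = 30)
{
    // Clamp the exponent so the shift cannot overflow during long outages.
    var exponent = Math.Clamp(attempt, 0, 20);
    var seconds = Math.Min((long)initialSeconds << exponent, maxSeconds);
    return TimeSpan.FromSeconds(seconds);
}

// Print the first few delays for the default settings.
for (var attempt = 0; attempt < 7; attempt++)
    Console.WriteLine($"attempt {attempt}: {ReconnectDelay(attempt).TotalSeconds}s");
```

Under that doubling assumption, the defaults give delays of 1, 2, 4, 8, 16, 30, and 30 seconds: the cap is reached on the sixth attempt and every later attempt waits the maximum.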

Connection Lifecycle

The client runs as a BackgroundService:

  1. Connects to wss://{project}.supabase.co/realtime/v1/websocket
  2. Sends heartbeats at the configured interval (HeartbeatIntervalSeconds, 30 seconds by default) to keep the connection alive
  3. Dispatches messages to the appropriate channel handlers
  4. Reconnects with exponential backoff on disconnection
  5. Re-joins channels that were subscribed before the disconnect

Channels survive reconnection — your handlers are re-registered automatically.
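
For reference, the heartbeat in step 2 is an ordinary Phoenix Channels message sent on the reserved phoenix topic. The sketch below builds one such frame in the JSON object form of the protocol; the helper name BuildHeartbeat is hypothetical, and whether SupabaseRealtimeClient uses this exact serialization is an assumption.

```csharp
using System;
using System.Text.Json;

// Hypothetical helper: builds a Phoenix heartbeat frame. The "phoenix" topic and
// "heartbeat" event are defined by Phoenix Channels; messageRef is a
// client-chosen counter that the server echoes back in its reply.
static string BuildHeartbeat(int messageRef) =>
    JsonSerializer.Serialize(new
    {
        topic = "phoenix",
        @event = "heartbeat",
        payload = new { },
        @ref = messageRef.ToString()
    });

Console.WriteLine(BuildHeartbeat(1));
```

A client typically treats a heartbeat that receives no reply before the next one is due as a dead connection, which is what would trigger the reconnect in step 4.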

Channel Management

// Create or retrieve a channel (idempotent)
var channel = realtime.Channel("my-room");

// Subscribe (joins the channel on the server)
await channel.SubscribeAsync();

// Check connection
var connected = realtime.IsConnected;

// Unsubscribe (leaves the channel)
await channel.UnsubscribeAsync();

Channel names are automatically prefixed with realtime: unless they already start with it, so Channel("my-room") and Channel("realtime:my-room") refer to the same topic.
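
The prefixing rule can be pictured as a small idempotent normalization step. This is a sketch of the rule as described, not the library's code; the helper name ToTopic is hypothetical.

```csharp
using System;

// Hypothetical helper: adds the "realtime:" prefix only when it is missing,
// so applying it twice gives the same result as applying it once.
static string ToTopic(string channelName) =>
    channelName.StartsWith("realtime:", StringComparison.Ordinal)
        ? channelName
        : "realtime:" + channelName;

Console.WriteLine(ToTopic("my-room"));
Console.WriteLine(ToTopic("realtime:my-room"));
```

Both calls produce realtime:my-room. A name such as public:orders becomes realtime:public:orders, because only the realtime: prefix is checked.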

Declarative Subscriptions

The Deepstaging.Supabase.Generators package provides a source generator that turns decorated classes into fully wired BackgroundService subscriptions — no manual channel setup required.

dotnet add package Deepstaging.Supabase.Generators

[SupabaseRealtime]

Mark a static partial class with [SupabaseRealtime<TRuntime>("channel")] and decorate handler methods with [OnPostgresChange] or [OnBroadcast]:

using Deepstaging.Supabase.Realtime;

[SupabaseRealtime<AppRuntime>("public:orders")]
public static partial class OrderRealtime
{
    [OnPostgresChange(Table = "orders", Event = PostgresChangeFilter.Insert)]
    public static Eff<AppRuntime, Unit> OnNewOrder(PostgresChange<Order> change) =>
        AppEffects.Notify.SendAsync<AppRuntime>(
            new OrderCreated(change.NewRecord!.Id));

    [OnPostgresChange(Table = "orders", Event = PostgresChangeFilter.Delete)]
    public static Eff<AppRuntime, Unit> OnOrderDeleted(PostgresChange<Order> change) =>
        AppEffects.Search.RemoveAsync<AppRuntime>(change.OldRecord!.Id);

    [OnBroadcast(Event = "order-status")]
    public static Eff<AppRuntime, Unit> OnStatusUpdate(JsonElement payload) =>
        AppEffects.Audit.WriteAsync<AppRuntime>(
            new AuditEntry("order-status", payload.ToString()));
}

The generator produces:

  • OrderRealtimeService — a nested BackgroundService that subscribes to the channel and dispatches events to your handler methods
  • AddOrderRealtime() — an IServiceCollection extension method to register the service

Register the generated service at startup:

builder.Services.AddOrderRealtime();

Each handler runs in its own DI scope, resolves the runtime, and executes the effect. Errors are caught and logged without crashing the subscription.
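
The error-isolation part of that behavior can be illustrated with a small wrapper. This is a simplified sketch, not the generated code: it leaves out the DI scope and runtime resolution, uses a plain Func<Task> in place of an effect, and the name DispatchSafelyAsync is hypothetical.

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical helper: runs one handler invocation and reports failures to a
// logging callback instead of letting the exception escape to the caller.
static async Task<bool> DispatchSafelyAsync(Func<Task> handler, Action<Exception> logError)
{
    try
    {
        await handler();
        return true;
    }
    catch (Exception ex)
    {
        // The failure is recorded and swallowed so the next message is still processed.
        logError(ex);
        return false;
    }
}

// A failing handler is reported, and execution continues afterwards.
var handled = await DispatchSafelyAsync(
    () => Task.FromException(new InvalidOperationException("boom")),
    ex => Console.WriteLine($"handler failed: {ex.Message}"));
Console.WriteLine($"handled successfully: {handled}");
```

The design choice is that a faulty handler costs one message rather than the whole subscription; the trade-off is that failures are visible only in the logs, so they are worth monitoring.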

[OnPostgresChange]

| Property | Type | Default | Description |
| --- | --- | --- | --- |
| Table | string | (required) | Table to watch |
| Schema | string | "public" | Schema to watch |
| Event | PostgresChangeFilter | All | All, Insert, Update, or Delete |

The handler method must accept PostgresChange<T> and return Eff<TRuntime, Unit>.

[OnBroadcast]

| Property | Type | Default | Description |
| --- | --- | --- | --- |
| Event | string | (required) | Broadcast event name to subscribe to |

The handler method must accept JsonElement and return Eff<TRuntime, Unit>.

ISupabaseRealtime Effects

To send broadcasts as effects from your own pipelines, reference SupabaseRealtimeModule from an effects module:

[EffectsModule(typeof(SupabaseRealtimeModule))]
public partial class AppModule;

This is registered automatically when you call AddSupabaseRealtime(). Use it to broadcast messages from within effect pipelines:

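// `unit` is assumed to come from LanguageExt (using static LanguageExt.Prelude;).
public static Eff<AppRuntime, Unit> NotifyPriceChange(string productId, decimal newPrice) =>
    // Arguments: channel name, broadcast event name, payload object.
    from _ in AppEffects.SupabaseRealtime.BroadcastAsync<AppRuntime>(
        "public:products", "price-update", new { productId, newPrice })
    select unit;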