Event Sourcing in Practice: What I Learned Building Financial Event Pipelines

Event sourcing is powerful but expensive to get wrong. Here's what actually works, with Go code, drawn from building event pipelines at a fintech startup.

At the fintech startup we had a problem that kept getting worse. Our financial content pipeline processed thousands of news items per hour, each one triggering enrichment, scoring, and routing decisions. The product team wanted to know why a specific story was scored a certain way three weeks ago. Customer support wanted to replay a user’s notification history. Compliance wanted an audit trail of every decision the system made.

We were storing final state in Postgres. Overwriting rows on every update. The answer to “why did this happen” was always “I don’t know, we only have what it looks like now.”

That’s when I built our first event-sourced aggregate. It wasn’t because event sourcing was trendy. It was because we literally couldn’t answer basic business questions with a CRUD model.

What Event Sourcing Actually Is

You stop overwriting rows. Instead, you append immutable events that describe what happened. Current state is derived by replaying those events in order.

type Event struct {
    ID          string    `json:"id"`
    AggregateID string    `json:"aggregate_id"`
    Type        string    `json:"type"`
    Version     int       `json:"version"`
    OccurredAt  time.Time `json:"occurred_at"`
    Data        json.RawMessage `json:"data"`
}

An event is a fact. StoryScored, NotificationSent, UserPreferenceUpdated. Past tense. Immutable. Each one carries enough data to explain the change without needing to look anything else up.

When It’s Worth the Pain

I’ve seen teams adopt event sourcing because it sounded cool and regret it within six months. The pattern earns its complexity only when:

  • You have regulatory or audit requirements that demand full history
  • Downstream systems need reliable, ordered change feeds
  • The business genuinely needs temporal queries (“what did we know on Tuesday?”)
  • Undo, replay, or simulation is a real product requirement

At the fintech startup, we hit three out of four. Financial content has compliance requirements. Multiple consumers needed the same change feed. And the product team needed point-in-time replay.

If you’re building a CRUD app where nobody will ever ask “what happened before this update” – don’t use event sourcing. You’ll hate yourself.

The Aggregate: Your Consistency Boundary

The aggregate is where commands turn into events. It validates business rules, emits events, and applies them to produce new state. Here is a simplified version of what our content aggregate looked like:

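type ContentAggregate struct {
    ID      string
    Status  string
    Score   float64
    Version int
    changes []Event // new events not yet persisted
}

// SetScore handles the scoring command. It can't be named Score: Go rejects
// a method that shares its name with a struct field.
func (a *ContentAggregate) SetScore(score float64, source string) error {
    if a.Status == "archived" {
        return fmt.Errorf("cannot score archived content %s", a.ID)
    }

    a.record(Event{
        AggregateID: a.ID,
        Type:        "ContentScored",
        OccurredAt:  time.Now().UTC(),
        Data: mustMarshal(ContentScoredData{
            Score:  score,
            Source: source,
        }),
    })
    return nil
}

// record applies a new event and queues it for the event store.
func (a *ContentAggregate) record(e Event) {
    a.apply(e)
    a.changes = append(a.changes, e)
}

// apply only mutates state, so replaying stored events on load doesn't
// queue them as new changes.
func (a *ContentAggregate) apply(e Event) {
    switch e.Type {
    case "ContentScored":
        var data ContentScoredData
        // A payload that fails to decode leaves the score unchanged;
        // real code should surface this error instead of swallowing it.
        if err := json.Unmarshal(e.Data, &data); err == nil {
            a.Score = data.Score
        }
    case "ContentArchived":
        a.Status = "archived"
    }
    a.Version++
}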

Key thing: the aggregate doesn’t talk to databases. It doesn’t call other aggregates. It validates, emits, and applies. That’s it. Keep it pure.

The Event Store

The store is append-only. It enforces ordering per aggregate stream and supports optimistic concurrency so two writers can’t silently conflict.

type EventStore interface {
    Append(ctx context.Context, aggregateID string, expectedVersion int, events []Event) error
    Load(ctx context.Context, aggregateID string) ([]Event, error)
    LoadFrom(ctx context.Context, aggregateID string, fromVersion int) ([]Event, error)
}

We used Postgres for this. A single table with aggregate_id, version, event_type, data, and occurred_at. A unique constraint on (aggregate_id, version) gives you optimistic concurrency for free.

func (s *PostgresStore) Append(ctx context.Context, aggID string, expectedVersion int, events []Event) error {
    tx, err := s.db.BeginTx(ctx, nil)
    if err != nil {
        return err
    }
    defer tx.Rollback()

    for i, e := range events {
        version := expectedVersion + i + 1
        _, err := tx.ExecContext(ctx,
            `INSERT INTO events (aggregate_id, version, event_type, data, occurred_at)
             VALUES ($1, $2, $3, $4, $5)`,
            aggID, version, e.Type, e.Data, e.OccurredAt,
        )
        if err != nil {
            return fmt.Errorf("version conflict or write error: %w", err)
        }
    }
    return tx.Commit()
}

Nothing fancy. Postgres handles the concurrency. The unique constraint on (aggregate_id, version) rejects conflicting writes. Simple and reliable.

Projections: Where Reads Live

Your event store is optimized for writes. Reads need their own models. Projections consume the event stream and build denormalized views optimized for specific queries.

type ScoreProjection struct {
    db *sql.DB
}

func (p *ScoreProjection) Handle(ctx context.Context, e Event) error {
    if e.Type != "ContentScored" {
        return nil
    }

    var data ContentScoredData
    if err := json.Unmarshal(e.Data, &data); err != nil {
        return err
    }

    _, err := p.db.ExecContext(ctx,
        `INSERT INTO content_scores (content_id, score, source, scored_at)
         VALUES ($1, $2, $3, $4)
         ON CONFLICT (content_id) DO UPDATE
         SET score = $2, source = $3, scored_at = $4`,
        e.AggregateID, data.Score, data.Source, e.OccurredAt,
    )
    return err
}

That ON CONFLICT clause makes the handler idempotent: processing the same event twice leaves the row unchanged, so you can replay the entire event stream in order and get the same result. This matters more than you think – you will replay projections. A lot.
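Here is the upsert idea reduced to an in-memory sketch, with a map standing in for the content_scores table. `ScoredEvent`, `ScoreRow`, and `ScoreView` are hypothetical types for this example only.

```go
package main

// ScoredEvent is a hypothetical, already-decoded ContentScored event.
type ScoredEvent struct {
	ContentID string
	Score     float64
	Source    string
}

// ScoreRow is one row of the read model.
type ScoreRow struct {
	Score  float64
	Source string
}

// ScoreView stands in for the content_scores table, keyed by content ID.
type ScoreView map[string]ScoreRow

// Handle overwrites the row for the event's content ID, mirroring
// INSERT ... ON CONFLICT (content_id) DO UPDATE.
func (v ScoreView) Handle(e ScoredEvent) {
	v[e.ContentID] = ScoreRow{Score: e.Score, Source: e.Source}
}

// Replay feeds every event, in order, through the handler.
func (v ScoreView) Replay(events []ScoredEvent) {
	for _, e := range events {
		v.Handle(e)
	}
}
```

Running Replay twice over the same stream leaves the view exactly as it was after the first pass. An append-style handler (one row per event, no key) would double its rows instead.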

CQRS: The Necessary Split

Commands go through aggregates and produce events. Queries hit projections. Reads lag behind writes. This is the deal you make with event sourcing.

If your product can’t tolerate a read being 100-500ms behind the write, event sourcing will be a constant fight. At the fintech startup our users were fine with slight lag on score updates. The compliance team was fine with eventual consistency on audit views. Know your tolerance before you commit.
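The lag has a simple mechanical source: a projection tracks how far into the log it has read, and anything past that checkpoint isn't visible to queries yet. A toy sketch, assuming a slice of event type names as the log (`Projector`, `CatchUp`, and `Lag` are invented for this example):

```go
package main

// Projector is a hypothetical catch-up consumer over an append-only log.
type Projector struct {
	checkpoint int            // number of log entries already processed
	counts     map[string]int // toy read model: events seen per type
}

func NewProjector() *Projector {
	return &Projector{counts: make(map[string]int)}
}

// CatchUp processes at most batch unprocessed entries and advances the
// checkpoint past them.
func (p *Projector) CatchUp(stream []string, batch int) {
	if batch <= 0 || p.checkpoint >= len(stream) {
		return
	}
	end := p.checkpoint + batch
	if end > len(stream) {
		end = len(stream)
	}
	for _, eventType := range stream[p.checkpoint:end] {
		p.counts[eventType]++
	}
	p.checkpoint = end
}

// Lag reports how many events the read model is behind the log.
func (p *Projector) Lag(stream []string) int {
	return len(stream) - p.checkpoint
}
```

A number like Lag is worth exporting as a metric, since it tells you directly whether reads are staying within the tolerance you agreed on.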

Snapshots: When Replay Gets Slow

Our content aggregates started small. Then some high-traffic items accumulated thousands of events. Loading 4,000 events to rebuild state for a single aggregate isn’t fast.

Snapshots fix this. Store a serialized state at a known version, then only replay events after that version.

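func (s *PostgresStore) LoadWithSnapshot(ctx context.Context, aggID string) (*ContentAggregate, error) {
    snap, err := s.loadSnapshot(ctx, aggID)
    if err != nil {
        return nil, err
    }

    agg := ContentAggregate{ID: aggID}
    fromVersion := 0
    if snap != nil {
        if err := json.Unmarshal(snap.Data, &agg); err != nil {
            return nil, fmt.Errorf("decode snapshot for %s: %w", aggID, err)
        }
        fromVersion = snap.Version
    }

    // LoadFrom must return only events with version > fromVersion;
    // otherwise the last event captured by the snapshot is applied twice.
    events, err := s.LoadFrom(ctx, aggID, fromVersion)
    if err != nil {
        return nil, err
    }

    for _, e := range events {
        agg.apply(e)
    }
    return &agg, nil
}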

We snapshotted every 100 events. Brought load times from 200ms back down to under 10ms for hot aggregates.
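The "every 100 events" rule fits in one function. This is a sketch of one way to express it. The interval comes from the text, while `shouldSnapshot` and the idea of comparing against the last snapshot's version are assumptions.

```go
package main

// snapshotInterval is the interval quoted in the text: one snapshot per
// 100 events.
const snapshotInterval = 100

// shouldSnapshot reports whether at least snapshotInterval events have been
// applied since the last snapshot. Pass 0 for lastSnapshotVersion when the
// aggregate has never been snapshotted.
func shouldSnapshot(currentVersion, lastSnapshotVersion int) bool {
	return currentVersion-lastSnapshotVersion >= snapshotInterval
}
```

Checked after each successful append, this caps the replay on load at 99 events no matter how long the stream gets.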

Event Versioning: The Thing Nobody Plans For

Your event schemas will change. Count on it. We added fields, renamed things, changed structures. The key insight: never mutate old events. Write upcasters that transform old event shapes into the current shape at read time.

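func upcast(e Event) Event {
    if e.Type != "ContentScored" {
        return e
    }
    // Early ContentScored payloads used "relevance" instead of "score".
    // Detect them by payload shape: Event.Version is the position in the
    // aggregate's stream, not a schema version, so it can't tell old from new.
    var old map[string]interface{}
    if err := json.Unmarshal(e.Data, &old); err != nil {
        return e // leave undecodable payloads untouched
    }
    rel, ok := old["relevance"]
    if !ok {
        return e
    }
    old["score"] = rel
    delete(old, "relevance")
    if data, err := json.Marshal(old); err == nil {
        e.Data = data
    }
    return e
}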

This keeps your entire history valid. No migrations. No rewriting the log. Old events just get translated on the fly.

What I Got Wrong

A few things I would do differently:

Too many event types early on. We created fine-grained events like ContentTitleUpdated, ContentBodyUpdated, ContentTagAdded. Should have started with ContentUpdated carrying a changeset. Granularity is easy to add later. Impossible to remove.

Projections coupled to business logic. Our first projections had “if score > 0.7 then mark as relevant” logic baked in. That belongs in the aggregate or a separate service. Projections should be dumb transformations.

Not testing replay from day one. We built the replay mechanism but didn’t exercise it regularly. When we finally needed a full rebuild six months in, it took 4 hours and surfaced three bugs in our upcasters. Test replay weekly. Automate it.
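An automated replay check can be as simple as rebuilding the projection from scratch and diffing it against the live one. A minimal sketch, assuming both views fit in memory as maps of content ID to score (`diffViews` is a made-up helper):

```go
package main

import "sort"

// diffViews compares a live read model against one rebuilt from a full
// replay. It returns the sorted IDs whose values differ or that exist on
// only one side. An empty result means the replay reproduced the live view.
func diffViews(live, rebuilt map[string]float64) []string {
	var drift []string
	for id, v := range live {
		if r, ok := rebuilt[id]; !ok || r != v {
			drift = append(drift, id)
		}
	}
	for id := range rebuilt {
		if _, ok := live[id]; !ok {
			drift = append(drift, id)
		}
	}
	sort.Strings(drift)
	return drift
}
```

Run on a schedule, a non-empty diff points at a broken upcaster or a non-deterministic handler long before anyone needs an emergency rebuild.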

When to Walk Away

Event sourcing isn’t a default architecture. It’s a tool for specific problems. If your domain is simple CRUD, if nobody needs history, if your team is small and moving fast on data shape – use a regular database. Update rows. Be happy.

The overhead is real: separate read models, eventual consistency, event versioning, replay infrastructure, snapshot management. That’s a lot of machinery. It pays off when the audit trail and temporal capabilities are genuine business requirements. Otherwise it’s complexity for complexity’s sake.

At the fintech startup it was the right call. The compliance team could finally trace every decision. The product team could debug scoring issues by replaying exact event sequences. Customer support could see exactly what happened and when.

But I’ve also talked teams out of it. More often than I’ve recommended it, honestly.