Event Sourcing with MartenDb

Event Sourcing with MartenDb
by Brad Jolicoeur
10/09/2021

I have been digging into MartenDb and found this sample of the Event Sourcing features included in MartenDb. This sample is a simplistic simulation of banking and is a good straight forward example of the MartenDb features. Since the example is 5 years old, I found it needed some updating and created my own version of the example.

In this article, I step through the Event Sourcing example and describe what is happening in each phase as I understand it. Explaining it will help me better understand what is happening and hopefully it will also help readers understand how Event Sourcing works in MartenDb.

Running the Example

If you would like to run my example and follow along, you can find it here.

Since MartenDb uses PostgreSQL to store event streams and documents, you will need access to PostgreSQL to run the example. The easiest way to accomplish that is to use Docker and the docker-compose.yml file at the root of the repo.

Getting Started

The first step is to configure MartenDb with the database connection string, defining event types and defining projections.

var store = DocumentStore.For(_ =>
{
    _.Connection("host=localhost;database=marten_test;password=not_magical_scary;username=banking_user");

    _.AutoCreateSchemaObjects = AutoCreate.All; // Creates the schema automatically in the database

    // Defines the event types
    _.Events.AddEventTypes(new[] {
                    typeof(AccountCreated),
                    typeof(AccountCredited),
                    typeof(AccountDebited)
                });

    // Defines the projections
    _.Events.InlineProjections.AggregateStreamsWith<Account>();
});

As you can see we have three different events defined for the event stream.

  • AccountCreated
  • AccountCredited
  • AccountDebited

The projection in this case is an Inline Projection. This means that as events are stored the projection will be updated and persisted as a document. This document can be queried at any time as a normal document in MartenDb. This is convenient to have the projection persisted, but in scenarios where there is lots of concurrency with events, this may cause locking issues. The alternative is to use Aggregates which calculate a result by running the event stream through them every time they are requested.

Creating Some Accounts

After we have MartenDb configured, we can start walking through the banking use case. The first thing we need to do in the use case is to create some accounts. We will use the names of the original authors of this example and create an account for each.

var khalid = new AccountCreated
{
    Owner = "Khalid Abuhakmeh",
    AccountId = Guid.NewGuid(),
    StartingBalance = 1000m
};

var bill = new AccountCreated
{
    Owner = "Bill Boga",
    AccountId = Guid.NewGuid()
};

The AccountCreated event is defined as a simple class as seen below.

public class AccountCreated {
    public AccountCreated() {
        CreatedAt = DateTime.UtcNow;
    }
    public string Owner { get;set; }
    public Guid AccountId { get;set; }        
    public DateTimeOffset CreatedAt { get;set; }
    public decimal StartingBalance { get;set; } = 0;

    public override string ToString() {
        return $"{CreatedAt} Created account for {Owner} with starting balance of {StartingBalance.ToString("C")}";
    }        
}

Once we have the account created events initialized we need to append them to the stream and save changes to persist them to the database.

// Create Accounts
using (var session = store.OpenSession())
{
    // create banking accounts
    session.Events.Append(khalid.AccountId, khalid);
    session.Events.Append(bill.AccountId, bill);

    session.SaveChanges();
}

This will also establish the Account Projection as a document in the background.

When the session.SaveChanges()' is called, the Apply(AccountCreated created) method on the Account Aggregate will be invoked and set the starting values on the Aggregate.

public class Account
{
    public Guid Id { get; set; }
    public string Owner { get; set; }
    public decimal Balance { get; set; }

    public DateTimeOffset CreatedAt { get; set; }

    public DateTimeOffset UpdatedAt { get; set; }

    public void Apply(AccountCreated created)
    {
        Id = created.AccountId;
        Owner = created.Owner;
        Balance = created.StartingBalance;
        CreatedAt = UpdatedAt = created.CreatedAt;

        Console.ForegroundColor = ConsoleColor.DarkMagenta;
        Console.WriteLine($"Account created for {Owner} with Balance of {Balance.ToString("C")}");
    }
    
    ...

First Transaction

Now that we have the accounts created we can start doing some transactions. In the first transaction Khalid pays Bill for work he has done.

// First Transaction
using (var session = store.OpenSession())
{
    // load khalid's account
    var account = session.Load<Account>(khalid.AccountId);
    // let's be generous
    var amount = 100m;
    var give = new AccountDebited
    {
        Amount = amount,
        To = bill.AccountId,
        From = khalid.AccountId,
        Description = "Bill helped me out with some code."
    };

    //check the Account Projection to verify sufficent funds available
    if (account.HasSufficientFunds(give))
    {
        session.Events.Append(give.From, give);
        session.Events.Append(give.To, give.ToCredit());
    }
    // commit these changes
    session.SaveChanges();
}

Notice that in the beginning of the session we pull Khalid's account Projection from the database ad use it to verify that there are sufficient funds before appending the debut and credit events to the streams.

At the end we call session.SaveChanges(); to persist the events and update the Account Projections. Below we see the methods on the Account Projection that are invoked when debit and credit events are processed.

It is important to note that account.Balance += Amount; will keep an accurate balance due to the fact that MartenDb is ACID compliant. If we tried to do this with MongoDb or other document store that was eventually consistent, this would not yield accurate results 100% of the time.

public class Account
{
    ...

    public void Apply(AccountDebited debit)
    {
        debit.Apply(this);
        Console.ForegroundColor = ConsoleColor.Red;            
        Console.WriteLine($"Debiting {Owner} ({debit.Amount.ToString("C")}): {debit.Description}");
    }

    public void Apply(AccountCredited credit)
    {
        credit.Apply(this);
        Console.ForegroundColor = ConsoleColor.Green;
        Console.WriteLine($"Crediting {Owner} {credit.Amount.ToString("C")}: {credit.Description}");
    }

    ...
}

Second Transaction

For the second transaction we have an overdraft situation. In this case there are not sufficient funds and an InvalidOperationAttempted event is appended instead of the debit and credit events.

// Second Transaction
using (var session = store.OpenSession())
{
    // load bill's account
    var account = session.Load<Account>(bill.AccountId);
    // let's try to over spend
    var amount = 1000m;
    var spend = new AccountDebited
    {
        Amount = amount,
        From = bill.AccountId,
        To = khalid.AccountId,
        Description = "Trying to buy that Ferrari"
    };

    if (account.HasSufficientFunds(spend))
    {
        // should not get here
        session.Events.Append(spend.From, spend);
        session.Events.Append(spend.To, spend.ToCredit());
    }
    else
    {
        session.Events.Append(account.Id, new InvalidOperationAttempted
        {
            Description = "Overdraft"
        });
    }
    // commit these changes
    session.SaveChanges();
}

I think this scenario is interesting given that we persist the non-happy path event. In a typical CRUD based model we would probably throw an exception and not persist the fact that an invalid operation took place. Having the InvalidOperationEvent persisted gives us a full picture of what happened. If the account holder wants to know why the transaction did not go through, it doesn't require an engineer to dig through the exception logs to figure it out.

Query Account Balances

Since we have been keeping track of the balances in the Account Projections and we have an ACID data store, reporting on the current balances is as simple as retrieving the Account Projections and writing the balances.

using (var session = store.LightweightSession())
{
    Console.WriteLine();
    Console.ForegroundColor = ConsoleColor.DarkYellow;
    Console.WriteLine("----- Final Balance ------");

    var accounts = session.LoadMany<Account>(khalid.AccountId, bill.AccountId);

    foreach (var account in accounts)
    {
        Console.WriteLine(account);
    }
}

Query Account Activity

Listing the account activity is a simple prospect as well. Since we have the events persisted in the stream we can simply fetch the streams from the database and list the events.

using (var session = store.LightweightSession())
{
    foreach (var account in new[] { khalid, bill })
    {
        Console.WriteLine();
        Console.WriteLine($"Transaction ledger for {account.Owner}");
        var stream = session.Events.FetchStream(account.AccountId);
        foreach (var item in stream)
        {
            Console.WriteLine(item.Data);
        }
        Console.WriteLine();
    }
}

Summary

In walking through this example, I am definitely seeing the power in Event Sourcing. This is especially true with how MartenDb makes it simple to create Projections that take advantage of ACID compliance.

While this example provides a clear vision as to how to implement Event Sourcing with MartenDb, it is arguably not a real world use case. Most use cases are messy when compared to double entry accounting and are usually not communicated well. I suspect the messy bits are the source of the tragic tales told by anyone who I've encountered that has attempted Event Sourcing.

I will be on the hunt for a good use case to apply MartenDb Event Sourcing. I want to understand if the tragic tales are due to some hidden flaw in Event Sourcing or it was implemented by folks who didn't truly understand it with tools that were not up to the task.