Apr 22nd, 2020 - written by Kimserey.
One of the concepts that has gained a lot of traction over the last five years is Event Sourcing. Event Sourcing changes the way data is handled, how it is stored, and how our view of the current state is constructed. In this post, we will explain what Event Sourcing is and see how it can be implemented using Equinox, a .NET implementation of an event store.
When we deal with data, the quickest approach is to store it with a one-to-one mapping between the object model and the database schema. For example, if we are creating a todo list, we would hold the todos in an object containing a title, content, date and status. That object would have an exact representation in a database table.
If we need to update the todo item, we would go ahead and update the todo in the database.
With event sourcing, we look at events rather than data. Instead of looking at the concrete todo state in the database, we look at the events that occurred and produced the latest state.
In this example, we will have two events, Todo added and Todo updated.
While a todo item can only be added once, it can be updated multiple times, so the state of a specific todo is constructed from one Todo added event and potentially multiple Todo updated events.
Each todo state is reconstructed from a stream of events, which is also referred to as an aggregate in Domain Driven Design (DDD) terms.
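To make the idea concrete, here is a minimal, dependency-free sketch of rebuilding a single todo from its stream. The type and function names are made up for illustration, and the model is simpler than the one we build with Equinox later in the post.

```fsharp
// Simplified sketch: one stream holds the events of a single todo.
type TodoData = { title: string; completed: bool }

type Event =
    | Added of TodoData
    | Updated of TodoData

// Apply one event to the previous state; None means "no todo yet".
let evolve (state: TodoData option) (event: Event) : TodoData option =
    match event with
    | Added data -> Some data
    | Updated data -> Some data

// The stream: one Added event followed by any number of Updated events.
let stream =
    [ Added { title = "Write post"; completed = false }
      Updated { title = "Write blog post"; completed = false }
      Updated { title = "Write blog post"; completed = true } ]

// Replaying the events in order yields the latest state.
let current = List.fold evolve None stream

printfn "%A" current
```

Nothing here stores the current todo itself. The list of events is the only source of truth, and the current value is derived from it on demand.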
An operation on a stream involves the following steps:
This is where Equinox comes into play. Equinox is a set of libraries that abstracts the event sourcing mechanism, and specifically provides an abstraction over streams.
In the following example, we will look closely at how we can use Equinox to build an event-sourced todo application.
Equinox can be installed from NuGet. In this example, we will use the MemoryStore to demonstrate what an Equinox implementation looks like. We start by installing the following:
Building a stream implementation follows four steps:
Since events are the centre of the application, we start by creating our two events, Added and Updated. In the following example, I use F# because discriminated unions make it easy to model the different events.
module Events =

    type TodoData = { id: int; title: string; completed: bool }

    type Event =
        | Added of TodoData
        | Updated of TodoData
        interface TypeShape.UnionContract.IUnionContract

    let codec = FsCodec.NewtonsoftJson.Codec.Create<Event>()
We define a TodoData type which contains the data carried by the events. We also implement the TypeShape interface IUnionContract, as we want to make our discriminated union compatible with the default serializer provided by Equinox, FsCodec.
We also define a codec value, which will be required later.
Once we have defined our events, we can define our folding logic.
module Fold =

    type State = { items : Events.TodoData list; nextId : int }

    let initial = { items = []; nextId = 0 }

    let evolve s e =
        match e with
        | Events.Added item ->
            { s with
                items = item :: s.items
                nextId = s.nextId + 1 }
        | Events.Updated value ->
            let items =
                s.items |> List.map (function { id = id } when id = value.id -> value | item -> item)
            { s with items = items }

    let fold : State -> Events.Event seq -> State =
        Seq.fold evolve
We start by defining a State. The state is an in-memory object which we construct by applying events one by one, starting from an initial state (or from snapshotted events).
Events are applied in the evolve function, which has the following signature:
    evolve: State -> Event -> State
It takes the previous state plus an event and returns the next state.
Note that the state is separate from the Events.TodoData type: it is never stored anywhere and is only reconstructed from events.
This is why we can introduce a nextId field, which serves as a counter for assigning IDs to new todos. nextId is not business data, so it is not stored in any event, but it is a necessary piece of logic.
For Events.Added, we simply add the new todo to the current list and bump nextId.
For Events.Updated, we find the todo with that particular ID and replace it.
Lastly, we create a fold function which takes the state plus a sequence of events rather than a single event. This function follows the signature needed by Equinox.
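As a quick illustration, the folding logic can be exercised on its own, without Equinox. The snippet below is a self-contained sketch that repeats the types without the serialization concerns; the sample history is made up.

```fsharp
type TodoData = { id: int; title: string; completed: bool }

type Event =
    | Added of TodoData
    | Updated of TodoData

type State = { items: TodoData list; nextId: int }

let initial = { items = []; nextId = 0 }

// Same logic as evolve above, written with an explicit if/else.
let evolve (s: State) (e: Event) : State =
    match e with
    | Added item -> { s with items = item :: s.items; nextId = s.nextId + 1 }
    | Updated value ->
        { s with items = s.items |> List.map (fun item -> if item.id = value.id then value else item) }

let fold : State -> Event seq -> State = Seq.fold evolve

// A made-up history: two todos added, then the first one completed.
let history =
    [ Added { id = 0; title = "Buy milk"; completed = false }
      Added { id = 1; title = "Walk dog"; completed = false }
      Updated { id = 0; title = "Buy milk"; completed = true } ]

let state = fold initial (List.toSeq history)

printfn "%A" state
```

Because new items are prepended, the most recently added todo comes first in items, and nextId ends up equal to the number of Added events seen so far.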
Once we have defined the folding, we are able to rebuild the latest state. We can then create a Command type which represents the actions available in our system.
type Command =
    | Add of Events.TodoData
    | Update of Events.TodoData

let interpret c (state : Fold.State) =
    match c with
    | Add value ->
        [Events.Added { value with id = state.nextId }]
    | Update value ->
        match state.items |> List.tryFind (function { id = id } -> id = value.id) with
        | Some current when current <> value -> [Events.Updated value]
        | _ -> []
As with the Events, we create a discriminated union for the Command (following the command pattern is not a requirement, but it often fits well with event sourcing).
Note, however, that unlike events, commands are not stored and hence do not need to be serialized or deserialized, so we are free to use any type representation.
Here we define Add and Update. Our Events are in the past tense while our Commands are in the imperative mood. Events have already occurred and have already been saved as truth, while Commands represent a desire or intent that is yet to be accepted into the aggregate's timeline.
In the interpret function, we take the current state and the command, and generate events from that point. We don't act on the state directly; we generate events which will be folded onto the state later on.
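To see what interpret decides in a few situations, here is a small self-contained sketch. It repeats the types without the Equinox dependencies, and the sample state and commands are made up.

```fsharp
type TodoData = { id: int; title: string; completed: bool }

type Event =
    | Added of TodoData
    | Updated of TodoData

type State = { items: TodoData list; nextId: int }

type Command =
    | Add of TodoData
    | Update of TodoData

// Decide which events (if any) a command produces against the current state.
let interpret command (state: State) : Event list =
    match command with
    | Add value -> [ Added { value with id = state.nextId } ]
    | Update value ->
        match state.items |> List.tryFind (fun item -> item.id = value.id) with
        | Some current when current <> value -> [ Updated value ]
        | _ -> []

let existing = { id = 0; title = "Buy milk"; completed = false }
let state = { items = [ existing ]; nextId = 1 }

// The id supplied by the caller is ignored; nextId from the state is used.
let added = interpret (Add { id = 99; title = "Walk dog"; completed = false }) state
// Updating with identical data produces no event.
let unchanged = interpret (Update existing) state
// A real change produces an Updated event.
let changed = interpret (Update { existing with completed = true }) state
// Updating an unknown id produces no event either.
let unknown = interpret (Update { id = 42; title = "Ghost"; completed = false }) state
```

Returning an empty list for a no-op update keeps redundant events out of the stream.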
And that concludes our definition of a stream. The last part is to use the Events, Fold, Command and interpret to build a Service which will contain the functionality:
type Service (resolve : string -> Equinox.Stream<Events.Event, Fold.State>) =

    let handle clientId command =
        let stream = resolve clientId
        stream.Transact(fun state ->
            let events = interpret command state
            let newState = Fold.fold state events
            newState.items, events)

    member __.List(clientId) : Async<Events.TodoData seq> =
        let stream = resolve clientId
        stream.Query (fun s -> s.items |> Seq.ofList)

    member __.Create(clientId, template: Events.TodoData) : Async<Events.TodoData> = async {
        let! newState = handle clientId (Command.Add template)
        return List.head newState }

    member __.Patch(clientId, item: Events.TodoData) : Async<Events.TodoData> = async {
        let! newState = handle clientId (Command.Update item)
        return List.find (fun x -> x.id = item.id) newState }
The Service exposes List, Create and Patch, which respectively list all todos, create a new todo and update an existing todo.
The resolve argument is a function which takes a stream ID and returns the instance of the stream.
Given the stream, we can either use #.Query to query the latest state of the stream, or #.Transact to perform an action on the stream.
#.Transact manages accepting changes into the aggregate. It obtains the current state (using initial, fold, events, snapshots and/or caches, etc.), presents it to a function you supply, and takes care of saving the changes that function decides are appropriate given that state and the input Command. There is also an overload of #.Transact that allows the function passed as an argument to yield a response.
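Conceptually, the load, decide and append cycle can be sketched with a tiny in-memory stream. This is not how Equinox is implemented: InMemoryStream is a hypothetical type written for illustration, it is synchronous, and it ignores snapshots, caching and concurrency.

```fsharp
// Hypothetical in-memory stream; NOT Equinox's actual implementation.
type InMemoryStream<'event, 'state>(initial: 'state, fold: 'state -> 'event seq -> 'state) =
    let mutable stored : 'event list = []

    // Rebuild the current state by folding every stored event.
    member __.Load() : 'state = fold initial (List.toSeq stored)

    // Query: project the current state without writing anything.
    member this.Query(projection: 'state -> 'view) : 'view =
        projection (this.Load())

    // Transact: present the current state to `decide`, append the events
    // it returns, and hand back its result.
    member this.Transact(decide: 'state -> 'result * 'event list) : 'result =
        let result, events = decide (this.Load())
        stored <- stored @ events
        result

// A made-up counter aggregate to exercise the sketch.
type CounterEvent = Incremented of int

let counter =
    InMemoryStream<CounterEvent, int>(0, Seq.fold (fun total (Incremented n) -> total + n))

let afterFirst =
    counter.Transact(fun state ->
        let events = [ Incremented 5 ]
        state + 5, events)

let afterSecond =
    counter.Transact(fun state ->
        let events = [ Incremented 2 ]
        state + 2, events)

let queried = counter.Query(id)
```

The function passed to Transact returns a pair of a result and the events to append, the same shape used by handle in the Service above.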
Lastly we can register the service as a singleton:
services.AddSingleton<TodoBackend.Service>(fun sc ->
    // The Events for each Stream are held in here (NOTE: Volatile: not saved when Host is stopped)
    let memoryStore = VolatileStore()

    let resolver =
        Equinox.MemoryStore.Resolver(memoryStore, Todo.Events.codec, Todo.Fold.fold, Todo.Fold.initial)

    let streamName id =
        FsCodec.StreamName.create "Todos" id

    Todo.Service(fun id -> Equinox.Stream(Serilog.Log.Logger, resolver.Resolve (streamName id), maxAttempts = 3)))
A stream is created using a resolver, and a resolver is provided by a concrete implementation of an Equinox store. In this example we use the MemoryStore, so we create a resolver with the VolatileStore.
We construct the stream name using the FsCodec.StreamName.create facility (see the FsCodec documentation for more about StreamName).
We can now use the service in a regular ASP.NET Core API controller:
[<CLIMutable>]
type TodoDto = { id: int; title: string; completed: bool }

[<ApiController>]
[<Route("[controller]")>]
type TodoController (service: Service) =
    inherit ControllerBase()

    let clientId = Guid.Empty.ToString("N")

    [<HttpGet>]
    member __.Get() = async {
        let! xs = service.List(clientId)
        return xs
    }

    [<HttpPost>]
    member __.Post([<FromBody>]value : TodoDto) : Async<TodoDto> = async {
        let! _ = service.Create(clientId, { id = 0; title = value.title; completed = false })
        return value
    }

    [<HttpPatch "{id}">]
    member __.Patch(id, [<FromBody>]value : TodoDto) : Async<TodoDto> = async {
        let! _ = service.Patch(clientId, { id = id; title = value.title; completed = value.completed })
        return value
    }
The events for a given aggregate are stored in a stream. Each #.Query or #.Transact references a StreamName, which in our example takes a clientId and maps it to the individual stream that holds that client's todo list, e.g. "Todos-Customer1234". And that's it, we now have an event-sourced todo application with two events, Added and Updated.
Today we looked into Equinox, a .NET Event Sourcing platform. We started with a brief introduction to event sourcing and then moved on to implementing a todo application using Equinox and its memory store.
We took a step-by-step approach: defining a model via events, defining the logic for folding events to rebuild the state, interpreting commands to generate events, and lastly binding everything together with the memory store.
I hope you liked this post and I'll see you in the next one!