
Event Sourcing in Kotlin

Event Sourcing has been around for quite a while, and many systems have implemented it. Plenty has been written about it by experts, so instead of explaining the concept and its design from scratch, this article focuses on the basics of implementing it in Kotlin and on how we introduced it into our application architecture while building the transaction microservices at Vayana Network. We recommend that you first spend time understanding Event Sourcing from the resources available in the blogosphere; we leave a few pointers as part of the article.

Let us start by looking at its definition.

What is Event Sourcing?

From Martin Fowler's blog

We can query an application's state to find out the current state of the world, and this answers many questions. However there are times when we don't just want to see where we are, we also want to know how we got there.

Event Sourcing ensures that all changes to application state are stored as a sequence of events. Not just can we query these events, we can also use the event log to reconstruct past states, and as a foundation to automatically adjust the state to cope with retroactive changes.
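To make the idea of reconstructing past states concrete, here is a minimal sketch. The LedgerEvent types and the balanceAfter helper are hypothetical names chosen for illustration; they are not part of the sample application that follows.

```kotlin
// Hypothetical toy events, used only to illustrate replaying an event log.
sealed class LedgerEvent
data class Deposited(val amount: Int) : LedgerEvent()
data class Withdrawn(val amount: Int) : LedgerEvent()

// Fold the first `count` events, oldest first, into a balance.
fun balanceAfter(log: List<LedgerEvent>, count: Int = log.size): Int =
    log.take(count).fold(0) { balance, event ->
        when (event) {
            is Deposited -> balance + event.amount
            is Withdrawn -> balance - event.amount
        }
    }

fun main() {
    val log = listOf(Deposited(100), Withdrawn(30), Deposited(5))
    println(balanceAfter(log))     // current state: 75
    println(balanceAfter(log, 2))  // state as of the second event: 70
}
```

Because the log is never rewritten, any earlier state can be recovered simply by replaying fewer events.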

In a microservice deployment architecture, Event Sourcing also helps with the problem of how to reliably and atomically update the database and send messages/events. We leave exploring that as an exercise for the reader. From https://microservices.io:

A service command typically needs to update the database and send messages/events. To avoid inconsistencies and bugs, it must atomically update the database and send messages.

However, it is not viable to use traditional distributed transaction (2PC) that spans the database and message broker to atomically update the database and publish messages/events.

The message broker might not support 2PC without which, sending a message in the middle of a transaction is not reliable. There is no guarantee, that the transaction will commit.

Similarly, if a service sends a message after committing the transaction there is no guarantee that it won't crash before sending the message.

Among the many articles written on this topic, we recommend reading CQRS & Event Sourcing in Java, which does a very good job of introducing the Event Sourcing and CQRS design patterns. CQRS naturally complements Event Sourcing, but it is not mandatory to implement both in your application.

If you are not already well versed with Event Sourcing as a concept, we recommend first reading the above article to form a good understanding before reading further.

Borrowing from the above article, here is a pictorial representation of the Event Sourcing design pattern.

 

Setting up the context - Sample Application

To be able to demonstrate the design capabilities we will try to focus on a simplified version of Customer Onboarding Application processing system. Customers submit an application with their respective details which are then reviewed and actioned by the backoffice team and the application is expected to be approved or rejected. When approved, the system can hand over the customer details to the downstream transaction system for further setup and enabling the customers to perform their business transactions.

For the purposes of this demonstration, the use cases being considered are

  • Customer creates an application. (User action results in an Event)
  • System validates the application data for sanity, applies certain business rules and records the application. These details are left out of the sample code presented below.
  • Backoffice team approves an application. (User action results in an Event)
  • Backoffice team rejects an application. (User action results in an Event)
Simplified Data Model for the customer application
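// Standard-library imports used by the snippets in this article
import java.time.Instant
import java.time.LocalDate
import java.time.ZoneId
import java.util.UUID
import java.util.concurrent.ConcurrentHashMap

//Application Data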
data class CustomerApplication(
  val customerName: String,
  val customerAddress: String,
  val customerPhone: String,
  val customerEmail: String,
  val appliedOn: LocalDate
)
enum class ApplicationStatus {
  New,
  BeingReviewed,
  SentBack,
  Approved,
  Rejected
}

A few more attributes like application status, approving/rejecting users, approval/rejection dates are left out on purpose to showcase the power of Event Sourcing.

Introducing the basic constructs

There are two obvious abstractions in Event Sourcing - event and state.

An event represents what has happened in the system.

// Kotlin 1.5+ spells the former `inline class` as a `@JvmInline value class`
@JvmInline
value class Guid(val uuidString: String = UUID.randomUUID().toString())
// -- Events
interface Event {
  val metadata: Metadata
}

data class Metadata(
  val correlationId: Guid,
  val sequence: Int = 0,
  val timestampMillis: Long = Instant.now().toEpochMilli()
) {
  val actionDate: LocalDate = LocalDate.ofInstant(
    Instant.ofEpochMilli(timestampMillis),
    ZoneId.of("Asia/Kolkata")
  )
}

A state represents the outcome of processing a set of events.

// -- State Models
interface State {
  val correlationId: Guid
}

To handle events and states we need three more constructs: a command, a command executor and a state manager.

A command acts as an interface to implement externally or internally triggered system behavior, e.g. submitting an application.

// -- Commands
interface Command {
  fun correlationId(): Guid
}

interface CommandContext

A command executor is responsible for looking at the current state of the system and the command data in question, and computing a set of events.

interface DomainError
data class SomeError(val msg: String) : DomainError

// -- Command Handlers
typealias CmdResult<TEvent> = Either<DomainError, List<TEvent>> //cmd result is either an error or list of events

typealias CmdHandler<TCommand, TEvent> = (TCommand) -> CmdResult<TEvent>  //cmd handler takes a cmd and gives back cmd result

// -- Given a state and command, process and get a list of events
typealias ExecuteCommandFn<TState, TCommand, TCommandCtx, TEvent> =
    (TState, TCommand, TCommandCtx) -> CmdResult<TEvent>

fun <TState: State, TCommand: Command,
    TCommandCtx: CommandContext, TEvent: Event> executeCommand(
  state: TState,
  cmd: TCommand,
  cmdCtx: TCommandCtx,
  block: ExecuteCommandFn<TState, TCommand, TCommandCtx, TEvent>
): CmdResult<TEvent> = block(state, cmd, cmdCtx)

A state manager is responsible for computing the current state of the system by playing a set of events starting at a particular state and giving back the end state after all the events are processed in the right order.

// -- Apply an event to a state and get the new state
typealias ApplyEventFn<TState, TEvent> = (TState, TEvent) -> TState

fun <TState : State, TEvent : Event> applyEvent(
  state: TState, event: TEvent,
  block: ApplyEventFn<TState, TEvent>
): TState = block(state, event)

fun <TState : State, TEvent : Event> applyEvents(
  state: TState, events: List<TEvent>,
  block: ApplyEventFn<TState, TEvent>
): TState = events.fold(state, block)
Note

The executeCommand and applyEvent functions above add further type safety: once they are bound to concrete types, only the specific state, command and event types of your domain can be passed through them.

Either is a functional construct that captures the result of an operation as either a Left (here, the error) or a Right (here, the list of events). We use it internally to streamline error handling and keep the code free of try...catch clutter.
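As a rough illustration of the idea, here is a hand-rolled Either with just enough surface to show how errors travel as values. It is a simplified stand-in written for this example rather than the library type used in the rest of the article; parseAge and the fold defined here are assumptions of the sketch.

```kotlin
// Simplified stand-in for Either, written only for this illustration.
sealed class Either<out L, out R> {
    data class Left<out L>(val value: L) : Either<L, Nothing>()
    data class Right<out R>(val value: R) : Either<Nothing, R>()

    // Collapse both branches into a single result.
    fun <T> fold(onLeft: (L) -> T, onRight: (R) -> T): T = when (this) {
        is Left -> onLeft(value)
        is Right -> onRight(value)
    }
}

// Hypothetical operation: failure is returned as a value instead of being thrown.
fun parseAge(input: String): Either<String, Int> {
    val age = input.toIntOrNull() ?: return Either.Left("not a number: $input")
    return if (age < 0) Either.Left("age cannot be negative: $age") else Either.Right(age)
}

fun main() {
    for (raw in listOf("42", "abc", "-1")) {
        println(parseAge(raw).fold({ "error: $it" }, { "age: $it" }))
    }
}
```

The caller is forced to handle both branches, which is what lets a command handler report a DomainError without throwing.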

Finally we need an aggregate that will deal with the commands, events and states in a uniform fashion. One can visualize that an instance of an aggregate will deal with a business transaction in your system. In our case this will be a customer application. It will become clear as we introduce an implementation of aggregate further down.

// -- Aggregates
interface Aggregate<TState : State, TCommand : Command, TCommandCtx : CommandContext, TEvent : Event> {
  val initialState: TState
  fun execute(st: TState, cmd: TCommand, cmdCtx: TCommandCtx): CmdResult<TEvent>
  fun apply(st: TState, ev: TEvent): TState
  fun apply(st: TState, evs: List<TEvent>): TState
}

Simulating in-memory Storage as a Replacement to the Database

To keep things simple we will use an in-memory data structure to deal with applications and application events in the system. In real life you will use a proper database and this is only for demonstration purposes.

Simulated In-Memory Applications storage

//Simulate the database

interface EventStream : Iterable<Event> {
  fun <TEvent: Event> append(newEvents: List<TEvent>): EventStream
}

class CustomerApplicationEvents(
  val events: List<Event> = emptyList()
): EventStream {
  override fun <TEvent: Event> append(newEvents: List<TEvent>): CustomerApplicationEvents =
    CustomerApplicationEvents(events+newEvents)

  override fun iterator(): Iterator<Event> = events.iterator()
}

interface EventStore {
  fun loadEventStream(correlationId: Guid): EventStream
  fun <TEvent: Event> store(correlationId: Guid, events: List<TEvent>): DomainError?
}

class CustomerApplications: EventStore {
  private val applications = ConcurrentHashMap<Guid, CustomerApplicationEvents>()
  override fun loadEventStream(correlationId: Guid): CustomerApplicationEvents =
    applications.getOrPut(correlationId) { CustomerApplicationEvents() }

  override fun <TEvent: Event> store(correlationId: Guid, events: List<TEvent>): DomainError? {
    // compute() makes the read-append-write step atomic for a given key
    applications.compute(correlationId) { _, existing ->
      (existing ?: CustomerApplicationEvents()).append(events)
    }
    return null
  }
}
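The following stripped-down sketch shows the append-only behavior that the store above relies on: appending never mutates an existing stream, it produces a new one. SimpleEvent, SimpleStream and SimpleStore are hypothetical, simplified stand-ins so that the snippet runs on its own, and keys are plain strings here rather than Guid values.

```kotlin
import java.util.concurrent.ConcurrentHashMap

// Hypothetical, simplified stand-ins for the article's Event / EventStream / EventStore.
data class SimpleEvent(val name: String)

class SimpleStream(val events: List<SimpleEvent> = emptyList()) {
    // Appending returns a new stream; the existing one is never mutated.
    fun append(newEvents: List<SimpleEvent>) = SimpleStream(events + newEvents)
}

class SimpleStore {
    private val streams = ConcurrentHashMap<String, SimpleStream>()

    fun load(id: String): SimpleStream = streams[id] ?: SimpleStream()

    fun store(id: String, newEvents: List<SimpleEvent>) {
        // compute() makes the read-append-write step atomic per key.
        streams.compute(id) { _, current -> (current ?: SimpleStream()).append(newEvents) }
    }
}

fun main() {
    val store = SimpleStore()
    val before = store.load("app-1")
    store.store("app-1", listOf(SimpleEvent("Submitted")))
    store.store("app-1", listOf(SimpleEvent("Approved")))
    println(before.events.size)                          // 0: the earlier stream is unchanged
    println(store.load("app-1").events.map { it.name })  // [Submitted, Approved]
}
```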

Implementing Your Event Sourcing Domain Models

With the design elements introduced, let us now implement the events, states and commands for our Customer Onboarding Application system. Revisit our use cases listed above to make sense of the models.

//Commands
sealed class CustomerApplicationCmd: Command {
  abstract val applicationId: Guid
  override fun correlationId() = applicationId
}
data class AcceptCustomerApplication(
  override val applicationId: Guid,
  val customerName: String,
  val customerAddress: String,
  val customerPhone: String,
  val customerEmail: String
): CustomerApplicationCmd()
data class UpdateCustomerApplication(
  override val applicationId: Guid,
  val customerName: String,
  val customerAddress: String,
  val customerPhone: String,
  val customerEmail: String
): CustomerApplicationCmd()
data class ApproveCustomerApplication(
  override val applicationId: Guid,
  val approvingUser: String
): CustomerApplicationCmd()
data class RejectCustomerApplication(
  override val applicationId: Guid,
  val rejectingUser: String
): CustomerApplicationCmd()

Notice how commands capture the data received in a request from the user interface or an API invocation.

//Events
sealed class CustomerApplicationEvent: Event {
  abstract override val metadata: Metadata
}
data class CustomerApplicationSubmitted(
  override val metadata: Metadata,
  val customerName: String,
  val customerAddress: String,
  val customerPhone: String,
  val customerEmail: String,
): CustomerApplicationEvent()
data class CustomerApplicationUpdated(
  override val metadata: Metadata,
  val customerName: String,
  val customerAddress: String,
  val customerPhone: String,
  val customerEmail: String
): CustomerApplicationEvent()
data class CustomerApplicationApproved(
  override val metadata: Metadata,
  val approvedBy: String
): CustomerApplicationEvent()
data class CustomerApplicationRejected(
  override val metadata: Metadata,
  val rejectedBy: String
): CustomerApplicationEvent()

All events have a standard set of attributes (identifier, type, event timestamp, internal sequence number). The data received in the command should be stored as extended event data, along with additional data such as derived attributes. One can be sure that events in the system represent validated requests. This will be one of the biggest mindset shifts as one tries to implement the event store in an RDBMS. Using a NoSQL database generally tends to simplify the design of your event store.
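One way to picture the standard attributes listed above is a single storage envelope that wraps every event. The sketch below is an illustration under stated assumptions, not the schema we use: StoredEvent, its field names, the envelope helper and the pre-serialized payload string are all hypothetical.

```kotlin
import java.time.Instant
import java.util.UUID

// Hypothetical storage envelope: the standard attributes are fixed fields,
// while the event-specific data travels as an opaque serialized payload.
data class StoredEvent(
    val eventId: String,        // identifier of this event
    val correlationId: String,  // which business transaction it belongs to
    val type: String,           // e.g. "CustomerApplicationSubmitted"
    val sequence: Int,          // position within the transaction's stream
    val timestampMillis: Long,  // when the event was recorded
    val payload: String         // serialized event data (format is an assumption)
)

// Wrap already-serialized payloads, numbering them after the last stored sequence.
fun envelope(
    correlationId: String,
    lastSequence: Int,
    typedPayloads: List<Pair<String, String>>
): List<StoredEvent> =
    typedPayloads.mapIndexed { index, (type, payload) ->
        StoredEvent(
            eventId = UUID.randomUUID().toString(),
            correlationId = correlationId,
            type = type,
            sequence = lastSequence + index + 1,
            timestampMillis = Instant.now().toEpochMilli(),
            payload = payload
        )
    }

fun main() {
    val rows = envelope(
        "app-1", lastSequence = 0,
        typedPayloads = listOf(
            "CustomerApplicationSubmitted" to """{"customerName":"John Doe"}""",
            "CustomerApplicationApproved" to """{"approvedBy":"Jane Doe"}"""
        )
    )
    rows.forEach { println("${it.sequence} ${it.type} ${it.payload}") }
}
```

With this shape, the fixed fields map to ordinary columns in an RDBMS and the payload to a text or JSON column, while a document store can hold the whole envelope as one record.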

Note

A command execution can result in multiple events being stored in the system database.

//States
sealed class CustomerApplicationState: State {
  abstract override val correlationId: Guid
  abstract val applicationStatus: ApplicationStatus
}
data class NewCustomerApplication(
  override val correlationId: Guid,
  override val applicationStatus: ApplicationStatus = ApplicationStatus.New
): CustomerApplicationState()
data class UnderProcessingCustomerApplication(
  override val correlationId: Guid,
  val applicationData: CustomerApplication,
  override val applicationStatus: ApplicationStatus = ApplicationStatus.BeingReviewed
): CustomerApplicationState()
data class CustomerApplicationBeingUpdated(
  override val correlationId: Guid,
  val applicationData: CustomerApplication,
  val sentBackOn: LocalDate,
  override val applicationStatus: ApplicationStatus = ApplicationStatus.SentBack
): CustomerApplicationState()
data class ApprovedCustomerApplication(
  override val correlationId: Guid,
  val applicationData: CustomerApplication,
  val approvedBy: String,
  val approvedOn: LocalDate,
  override val applicationStatus: ApplicationStatus = ApplicationStatus.Approved
): CustomerApplicationState()
data class RejectedCustomerApplication(
  override val correlationId: Guid,
  val applicationData: CustomerApplication,
  val rejectedBy: String,
  val rejectedOn: LocalDate,
  override val applicationStatus: ApplicationStatus = ApplicationStatus.Rejected
): CustomerApplicationState()

A command context is a construct that lets you encapsulate and pass around things such as a database connection pool, application config, etc. Here we only need a dummy implementation to satisfy the type requirements.

//Command Context
data class NoopCmdContext(
  val nothing: String = "dummy"
): CommandContext

Our Command Executor

An important aspect of commands is that a command is valid only when the system is in a particular state. This means the state transitions must be designed carefully, so that the system never ends up in an inconsistent state after a command is executed.

As a result, a command executor implementation looks like a state machine.

Here is the implementation for our Customer Onboarding Application system use cases.

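// Turns a command into the event(s) it produces; `st` and `ctx` are not needed for these simple cases.
// Note: Guid(cmd.applicationId.toString()) wraps the printed form "Guid(uuidString=...)" in a new Guid,
// which is why the output further down shows a nested Guid. Metadata(cmd.applicationId) would reuse the id as-is.
fun handleCustomerApplicationCmd(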
  st: CustomerApplicationState,
  cmd: CustomerApplicationCmd,
  ctx: CommandContext
): CmdResult<CustomerApplicationEvent> =
  when(cmd) {
    is AcceptCustomerApplication ->
      either.eager {
        listOf(
          CustomerApplicationSubmitted(
            Metadata(Guid(cmd.applicationId.toString())),
            cmd.customerName,
            cmd.customerAddress,
            cmd.customerPhone,
            cmd.customerEmail
          )
        )
      }
    is UpdateCustomerApplication ->
      either.eager {
        listOf(
          CustomerApplicationUpdated(
            Metadata(Guid(cmd.applicationId.toString())),
            cmd.customerName,
            cmd.customerAddress,
            cmd.customerPhone,
            cmd.customerEmail
          )
        )
      }
    is ApproveCustomerApplication ->
      either.eager {
        listOf(
          CustomerApplicationApproved(
            Metadata(Guid(cmd.applicationId.toString())),
            cmd.approvingUser
          )
        )
      }
    is RejectCustomerApplication ->
      either.eager {
        listOf(
          CustomerApplicationRejected(
            Metadata(Guid(cmd.applicationId.toString())),
            cmd.rejectingUser
          )
        )
      }
  }


fun executeCustomerApplicationCmd(
  st: CustomerApplicationState,
  cmd: CustomerApplicationCmd,
  ctx: CommandContext
): CmdResult<CustomerApplicationEvent> =
  when {
    st is NewCustomerApplication && cmd is AcceptCustomerApplication ->
      executeCommand(st, cmd, NoopCmdContext(), ::handleCustomerApplicationCmd)
    st is UnderProcessingCustomerApplication && cmd is ApproveCustomerApplication ->
      executeCommand(st, cmd, NoopCmdContext(), ::handleCustomerApplicationCmd)
    st is UnderProcessingCustomerApplication && cmd is RejectCustomerApplication ->
      executeCommand(st, cmd, NoopCmdContext(), ::handleCustomerApplicationCmd)
    else ->
      SomeError("action not allowed").left()
  }

Computing The System State

As mentioned earlier, Event Sourcing involves capturing a set of events in an ordered fashion, and the state of the system at any point in time is computed by playing a set of events starting from a particular state. For our Customer Onboarding Application, the implementation would look like this.

fun applyCustomerApplicationEvent(
  st: CustomerApplicationState,
  ev: CustomerApplicationEvent
): CustomerApplicationState =
  when {
    st is NewCustomerApplication && ev is CustomerApplicationSubmitted ->
      UnderProcessingCustomerApplication(
        ev.metadata.correlationId,
        CustomerApplication(
          ev.customerName,
          ev.customerAddress,
          ev.customerPhone,
          ev.customerEmail,
          ev.metadata.actionDate
        )
      )
    st is UnderProcessingCustomerApplication && ev is CustomerApplicationApproved ->
      ApprovedCustomerApplication(
        ev.metadata.correlationId,
        st.applicationData,
        ev.approvedBy,
        ev.metadata.actionDate
      )
    st is UnderProcessingCustomerApplication && ev is CustomerApplicationRejected ->
      RejectedCustomerApplication(
        ev.metadata.correlationId,
        st.applicationData,
        ev.rejectedBy,
        ev.metadata.actionDate
      )
    else -> st
  }

Our Customer Application Aggregate

Finally, to deal with command execution and event processing for a customer application instance in question, we will need to implement an aggregate.

class CustomerApplicationAggregate(
  private val es: CustomerApplications
): Aggregate<CustomerApplicationState, CustomerApplicationCmd, CommandContext, CustomerApplicationEvent> {
  override val initialState: CustomerApplicationState = NewCustomerApplication(Guid(""))
  override fun apply(st: CustomerApplicationState, ev: CustomerApplicationEvent): CustomerApplicationState =
    applyEvent(st, ev, ::applyCustomerApplicationEvent)

  override fun apply(st: CustomerApplicationState, evs: List<CustomerApplicationEvent>): CustomerApplicationState =
    applyEvents(st, evs, ::applyCustomerApplicationEvent)

  override fun execute(
    st: CustomerApplicationState,
    cmd: CustomerApplicationCmd,
    cmdCtx: CommandContext
  ): CmdResult<CustomerApplicationEvent> {
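    // The current state is always rebuilt from the stored events; the `st` argument is not consulted.
    val events = es.loadEventStream(cmd.correlationId()).events
      .filterIsInstance<CustomerApplicationEvent>()
    val currentState = apply(initialState, events)
    return executeCommand(
      currentState, cmd, NoopCmdContext(), ::executeCustomerApplicationCmd
    ).flatMap { newEvents ->
      // Surface a storage failure instead of silently dropping it
      es.store(cmd.correlationId(), newEvents)?.left() ?: newEvents.right()
    }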
  }
}

See It In Action

Here is the simulated execution of our Customer Onboarding Application system.

fun main() {
  val db = CustomerApplications()
  val cmdCtx = NoopCmdContext()
  val correlationId = Guid(UUID.randomUUID().toString())
  val agg = CustomerApplicationAggregate(db)

  //Simulate the system usage
  // Named arguments keep the phone number and email from being swapped
  val cmd1 = AcceptCustomerApplication(
    applicationId = correlationId,
    customerName = "John Doe",
    customerAddress = "Vadodara",
    customerPhone = "+91-9988776655",
    customerEmail = "john@doe.com"
  )
  agg.execute(NewCustomerApplication(Guid("")), cmd1, cmdCtx)

  print("System state after submitting the application -> ")
  //Finding the current state of an application
  val currentState1 = agg.apply(
    NewCustomerApplication(Guid("")),
    db.loadEventStream(correlationId).events as List<CustomerApplicationEvent>
  )
  println(currentState1)
  println()

  val cmd2 = ApproveCustomerApplication(correlationId, "Jane Doe")
  agg.execute(NewCustomerApplication(Guid("")), cmd2, cmdCtx)

  println("System state after approving the application -> ")
  //Finding the current state of an application
  val currentState = agg.apply(
    NewCustomerApplication(Guid("")),
    db.loadEventStream(correlationId).events as List<CustomerApplicationEvent>
  )
  println(currentState)
}

System state after submitting the application -> UnderProcessingCustomerApplication(correlationId=Guid(uuidString=Guid(uuidString=744ee026-4841-4e15-8509-3aadce497429)), applicationData=CustomerApplication(customerName=John Doe, customerAddress=Vadodara, customerPhone=+91-9988776655, customerEmail=john@doe.com, appliedOn=2022-01-08), applicationStatus=BeingReviewed)

System state after approving the application ->
ApprovedCustomerApplication(correlationId=Guid(uuidString=Guid(uuidString=744ee026-4841-4e15-8509-3aadce497429)), applicationData=CustomerApplication(customerName=John Doe, customerAddress=Vadodara, customerPhone=+91-9988776655, customerEmail=john@doe.com, appliedOn=2022-01-08), approvedBy=Jane Doe, approvedOn=2022-01-08, applicationStatus=Approved)

Why Did We Implement Event Sourcing?

When used appropriately, Event Sourcing brings some very strong benefits to your application architecture.

  • Because the event store is an append-only data structure, write operations become much faster.
  • It simplifies the storage and domain model design, at the cost of having to compute the state of the system every time.
  • Your system automatically gets an audit log.
  • It becomes easier to build a transaction timeline.
  • It lends itself to designing loosely coupled components, which is a boon in a microservices architecture.
  • Purging system data becomes much easier and less risky. You can apply a purging policy based on transaction age and on transactions having reached their end state.
  • When coupled with CQRS, the read-side storage is always computed for optimized read operations, so this copy can be purged freely since it can always be recomputed.
  • When designed well, it can reduce the complexity and size of the overall data backup implementation.
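The audit log and timeline points above follow from the fact that every event already records who did what and when. A minimal sketch, using hypothetical TimelineEvent records instead of the article's event classes and assuming UTC for display, might render a timeline like this:

```kotlin
import java.time.Instant
import java.time.ZoneOffset
import java.time.format.DateTimeFormatter

// Hypothetical flattened view of an event, just enough for a timeline.
data class TimelineEvent(val timestampMillis: Long, val actor: String, val action: String)

// Formatter pinned to UTC (an assumption of this sketch) so Instants can be printed.
val stamp: DateTimeFormatter =
    DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm").withZone(ZoneOffset.UTC)

// Sort by time and render one line per event.
fun timeline(events: List<TimelineEvent>): List<String> =
    events.sortedBy { it.timestampMillis }
        .map { "${stamp.format(Instant.ofEpochMilli(it.timestampMillis))} ${it.actor} ${it.action}" }

fun main() {
    val events = listOf(
        TimelineEvent(1_641_640_000_000, "Jane Doe", "approved the application"),
        TimelineEvent(1_641_600_000_000, "John Doe", "submitted the application")
    )
    // Prints the submission first, then the approval.
    timeline(events).forEach(::println)
}
```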

Where Did We Struggle When Adopting Event Sourcing?

While there are great benefits, they come at a cost that should be understood and paid upfront for the long-term gains.

  • A steeper learning curve because of the shift in mindset.
  • Computing the transaction state becomes a resource-intensive activity. Luckily, this can be solved by introducing a computed transaction state storage that is updated periodically. This fits naturally with the read-side storage in the CQRS pattern.
  • Any attempt to address the transaction state computation requirement will force you to introduce eventual consistency into your system. This is yet another shift in mindset.
  • Open questions around analytics remain: should you transmit the events into your data lake, and do you want to recompute the state in the data lake as new events arrive?
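The computed transaction state storage mentioned above is often called a snapshot. The sketch below shows one possible shape for it under stated assumptions: CounterEvent, Snapshot and currentState are hypothetical names, and the state is a plain running total to keep the example short.

```kotlin
// Hypothetical event and snapshot types for illustrating snapshot-plus-replay.
data class CounterEvent(val sequence: Int, val delta: Int)
data class Snapshot(val lastSequence: Int, val total: Int)

// Start from the snapshot and replay only the events recorded after it.
fun currentState(snapshot: Snapshot, log: List<CounterEvent>): Snapshot =
    log.filter { it.sequence > snapshot.lastSequence }
        .sortedBy { it.sequence }
        .fold(snapshot) { acc, ev -> Snapshot(ev.sequence, acc.total + ev.delta) }

fun main() {
    val log = listOf(CounterEvent(1, 10), CounterEvent(2, 5), CounterEvent(3, -3), CounterEvent(4, 8))
    val fromScratch = currentState(Snapshot(0, 0), log)
    val fromSnapshot = currentState(Snapshot(2, 15), log) // snapshot taken after event 2
    println(fromScratch)   // Snapshot(lastSequence=4, total=20)
    println(fromSnapshot)  // Snapshot(lastSequence=4, total=20)
}
```

Both paths arrive at the same state, but the second replays only two events. The trade-off is that a stored snapshot can lag behind the log, which is where eventual consistency enters.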

Closing Notes

The actual implementation in our microservices is somewhat more heavyweight because we have implemented CQRS with eventual consistency on the read side, publishing of events to message queues for inter-service communication, and so on. However, the core concepts and the initial definitions of Event, State, Command and Aggregate, along with the executeCommand and applyEvent/applyEvents functions, remain the same with minor changes, because our implementation is based on coroutines.

The code used in this article has been made available here.
