Akka Persistence Postgres

The Akka Persistence Postgres plugin allows you to use PostgreSQL 11 and Amazon Aurora databases as a backend for Akka Persistence and Akka Persistence Query.

It was originally created as a fork of the Akka Persistence JDBC plugin, focused on PostgreSQL features such as partitions, arrays, BRIN indexes and others.

The main goal is to keep index size and memory consumption on a moderate level while being able to cope with an increasing data volume.

Use cases

This plugin supports different schema variants for different use cases: from small and simple apps, through those with a small, finite number of persistent actors that each have a huge and still-growing journal, to services with an ever-increasing number of unique persistent actors.

You can read more about DAOs and schema variants in the official documentation.

Adding Akka Persistence Postgres to your project

To use akka-persistence-postgres in your SBT project, add the following to your build.sbt:

libraryDependencies += "com.swissborg" %% "akka-persistence-postgres" % "0.5.0-M1"

For a Maven project, add:

<dependency>
    <groupId>com.swissborg</groupId>
    <artifactId>akka-persistence-postgres_2.12</artifactId>
    <version>0.5.0-M1</version>
</dependency>

to your pom.xml.

Enabling Akka Persistence Postgres in your project

To use this plugin instead of the default one, add the following to application.conf:

akka.persistence {
  journal.plugin = "postgres-journal"
  snapshot-store.plugin = "postgres-snapshot-store"
}

and for persistence query:

PersistenceQuery(system).readJournalFor[PostgresReadJournal](PostgresReadJournal.Identifier)

Documentation

Key features when compared to the original Akka Persistence JDBC plugin

BRIN index on the ordering column

This plugin has been redesigned to handle very large journals. The original plugin (akka-persistence-jdbc) uses B-Tree indexes on three columns: ordering, persistence_id and sequence_number. They are great for query performance and for guarding the uniqueness of column data, but they require a relatively large amount of memory.

Wherever it makes sense, we decided to use more lightweight BRIN indexes.
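To illustrate why a BRIN index stays small, here is a toy model in plain Scala. It is only a sketch of the idea (one min/max summary per block of rows), not how PostgreSQL implements BRIN internally; `BrinSketch`, `RangeSummary` and `rowsPerBlock` are illustrative names.

```scala
object BrinSketch {
  // One summary entry per block of rows: only the min and max value are kept.
  final case class RangeSummary(block: Int, min: Long, max: Long)

  // Builds the summaries for values stored in insertion order.
  def summarize(values: Vector[Long], rowsPerBlock: Int): Vector[RangeSummary] =
    values.grouped(rowsPerBlock).zipWithIndex.map { case (rows, i) =>
      RangeSummary(i, rows.min, rows.max)
    }.toVector

  // Blocks that may contain the value; only these have to be scanned.
  def candidateBlocks(index: Vector[RangeSummary], value: Long): Vector[Int] =
    index.collect { case RangeSummary(b, min, max) if min <= value && value <= max => b }
}
```

For a monotonically growing column such as ordering, the block ranges do not overlap, so a lookup touches a single block while the index holds one entry per block instead of one per row.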

Tags as an array of int

Akka-persistence-jdbc stores all tags in a single column as a String, joined by an arbitrary separator (a comma by default).

This solution is quite portable, but not perfect. Queries rely on the LIKE '%tag_name%' condition, so additional work is needed to filter out tags that don't fully match the input tag_name. For example, with the tags healthy, unhealthy and neutral, a query for all events tagged with healthy will also return events tagged with unhealthy.
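The false positive can be reproduced without a database. The following is a minimal sketch in plain Scala, assuming comma-separated tags; `TagMatching`, `likeMatch` and `exactMatch` are illustrative names and not part of either plugin.

```scala
object TagMatching {
  // Tags stored in one string column, comma-separated.
  val separator = ","

  // Mimics LIKE '%tag%': a plain substring test on the raw column value.
  def likeMatch(tagsColumn: String, tag: String): Boolean =
    tagsColumn.contains(tag)

  // The extra filtering step: split the column and compare whole tags.
  def exactMatch(tagsColumn: String, tag: String): Boolean =
    tagsColumn.split(separator).contains(tag)
}
```

Here `likeMatch("unhealthy,neutral", "healthy")` is true even though the event was never tagged with healthy, while `exactMatch` rejects it.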

Postgres allows table columns to be defined as variable-length arrays. By mapping event tag names to unique numeric identifiers, we can leverage the intarray extension, which in some circumstances can improve query performance and reduce query costs by up to 10x.
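The mapping idea can be sketched in memory as follows. This is an illustration only: `TagDictionary`, `Event` and `taggedWith` are hypothetical names, and the real plugin keeps the mapping and the arrays in the database rather than in Scala collections.

```scala
object TagIds {
  // Hypothetical in-memory stand-in for a tag-name -> id lookup table.
  final class TagDictionary {
    private var ids = Map.empty[String, Int]

    // Returns the existing id for a tag name, or assigns the next free one.
    def idFor(name: String): Int =
      ids.getOrElse(name, {
        val next = ids.size + 1
        ids += (name -> next)
        next
      })
  }

  // An event row carrying its tags as an array of ints.
  final case class Event(payload: String, tags: Array[Int])

  // Whole-value comparison on ints: "healthy" can never match "unhealthy".
  def taggedWith(events: Seq[Event], tagId: Int): Seq[Event] =
    events.filter(_.tags.contains(tagId))
}
```

Because the comparison is on whole integers, no post-filtering of partial matches is needed.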

Support for partitioned tables

When you have big volumes of data that keep growing, appending events to the journal becomes more expensive, because the indexes grow together with the tables.

Postgres allows you to split your data between smaller tables (logical partitions) and attach new partitions on demand. Partitioning also applies to indexes, so instead of one huge B-Tree you can have a number of capped tables with smaller indexes.

You can read more on how Akka Persistence Postgres leverages partitioning in the Supported journal schema variants section below.

Minor PostgreSQL optimizations

Besides the aforementioned major changes, we made some minor optimizations, like changing the column ordering for more efficient space utilization.
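The effect of column ordering can be estimated with a small calculation. The sketch below assumes PostgreSQL's rule that a fixed-width column starts at a multiple of its type's alignment (8 bytes for bigint, 1 byte for boolean) and ignores the tuple header and variable-length columns; `ColumnPadding` and the column names in the usage note are illustrative, not the plugin's actual schema.

```scala
object ColumnPadding {
  // Fixed-width column: size in bytes, which here also equals its alignment.
  final case class Col(name: String, size: Int)

  // Bytes used by the column data when each column starts at a multiple of its size.
  def dataSize(cols: Seq[Col]): Int =
    cols.foldLeft(0) { (offset, col) =>
      val aligned = (offset + col.size - 1) / col.size * col.size
      aligned + col.size
    }
}
```

With two 1-byte and two 8-byte columns, the interleaved order (1, 8, 1, 8) occupies 32 bytes under this model, while the order (8, 8, 1, 1) occupies 18.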

Supported journal schema variants

Currently, the plugin supports two variants of the journal table schema:

  1. flat journal - a single table, similar to what the JDBC plugin provides. All events are appended to the table. The schema can be found here. This is the default schema.
  2. journal with nested partitions by persistenceId and sequenceNumber - this variant allows you to shard your events by persistenceId. Additionally, each shard is split by sequenceNumber range to cap the indexes. You can find the schema here. This variant is aimed at services that have a finite and/or small number of unique persistence aggregates, each with a big journal.

Using partitioned journal

To start using the partitioned journal, you have to create a partitioned table (here is the schema) and set the Journal DAO FQCN:

postgres-journal.dao = "akka.persistence.postgres.journal.dao.NestedPartitionsJournalDao"

The size of the nested partitions (sequence_number’s range) can be changed by setting postgres-journal.tables.journal.partitions.size. By default partition size is set to 10000000 (10M).

Partitions are automatically created by the plugin in advance. NestedPartitionsJournalDao keeps track of created partitions, and once a sequence_number falls outside the range of all known partitions, a new one is created.
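The bookkeeping described above can be sketched as follows. This is not the plugin's code: it assumes that partition N covers sequence numbers from N * size (inclusive) to (N + 1) * size (exclusive), and `PartitionTracker` and `missingPartitions` are hypothetical names.

```scala
object PartitionTracker {
  // Assumed mapping: partition N covers sequence numbers [N * size, (N + 1) * size).
  def partitionNumber(sequenceNr: Long, partitionSize: Long): Long =
    sequenceNr / partitionSize

  // Given the partitions already known for a persistence id, returns the
  // partition numbers that must be created before the batch can be written.
  def missingPartitions(known: Set[Long], sequenceNrs: Seq[Long], partitionSize: Long): Seq[Long] =
    sequenceNrs.map(partitionNumber(_, partitionSize)).distinct.filterNot(known).sorted
}
```

With the default size of 10000000, a batch that crosses from sequence number 9999999 to 10000000 would, under this assumption, require partition 1 to exist in addition to partition 0.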

Partitions follow the prefix_sanitizedPersistenceId_partitionNumber naming pattern. The prefix can be configured by changing the postgres-journal.tables.journal.partitions.prefix value. By default it’s set to j. sanitizedPersistenceId is the PersistenceId with all non-word characters replaced by _. partitionNumber is the ordinal number of the partition for a given persistence id.

Example partition names: j_myActor_0, j_myActor_1, j_worker_0 etc.
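The naming pattern can be reproduced with a few lines of Scala. This is a sketch under the assumption that "non-word characters" means the regex class `\W` (anything outside letters a-z, A-Z, digits and underscore in Java's default regex mode); `PartitionNames` and its methods are illustrative names.

```scala
object PartitionNames {
  // Replaces every non-word character (anything outside [A-Za-z0-9_]) with '_'.
  def sanitize(persistenceId: String): String =
    persistenceId.replaceAll("\\W", "_")

  // Joins prefix, sanitized persistence id and partition number with underscores.
  def partitionName(prefix: String, persistenceId: String, partitionNumber: Long): String =
    s"${prefix}_${sanitize(persistenceId)}_$partitionNumber"
}
```

For example, `PartitionNames.partitionName("j", "myActor", 0L)` yields `j_myActor_0`.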

Keep in mind that the default maximum length for a table name in Postgres is 63 bytes, so you should avoid any non-ascii characters in your persistenceIds and keep the prefix reasonably short.
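A quick way to check a candidate table name against that limit is to measure it in UTF-8 bytes rather than characters. The helper below is a sketch, and `TableNameLimit` is a hypothetical name.

```scala
import java.nio.charset.StandardCharsets

object TableNameLimit {
  // Default PostgreSQL limit for a table name, in bytes.
  val MaxBytes = 63

  // Length is measured in UTF-8 bytes, not characters.
  def byteLength(name: String): Int =
    name.getBytes(StandardCharsets.UTF_8).length

  // True when the name fits within the default limit.
  def fits(name: String): Boolean = byteLength(name) <= MaxBytes
}
```

A non-ASCII character usually takes two or more bytes in UTF-8, which is why such names reach the limit sooner than their character count suggests.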

⚠️ Once any of the partitioning settings under the postgres-journal.tables.journal.partitions branch is set, you should never change it. Otherwise you might end up with PostgresExceptions caused by table name or range conflicts.

Migration

Migration from akka-persistence-jdbc 4.0.0

It is possible to migrate existing journals from Akka Persistence JDBC 4.0.0. Since we decided to extract metadata from the serialized payload and store it in a separate column, it is not possible to migrate an existing journal and snapshot store using plain SQL scripts.

How migration works

Each journal event and snapshot has to be read and deserialized, its metadata and tags extracted, and the result stored in the new table.
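The per-row transformation can be pictured with the following sketch. Every type and field here (`OldEvent`, `NewEvent`, `writerUuid`, the `tagId` lookup) is hypothetical and chosen for illustration; the actual migration classes work against the real table schemas and serializers.

```scala
object MigrationSketch {
  // Hypothetical shape of a deserialized row from the old journal.
  final case class OldEvent(persistenceId: String, sequenceNr: Long,
                            payload: String, writerUuid: String, tags: Option[String])

  // Hypothetical shape of a row in the new journal: metadata and tags get their own columns.
  final case class NewEvent(persistenceId: String, sequenceNr: Long,
                            payload: String, metadata: Map[String, String], tags: List[Int])

  // Extracts metadata, resolves comma-separated tag names to ids, keeps the payload as-is.
  def migrate(old: OldEvent, tagId: String => Int): NewEvent =
    NewEvent(
      old.persistenceId,
      old.sequenceNr,
      old.payload,
      metadata = Map("writerUuid" -> old.writerUuid),
      tags = old.tags.toList.flatMap(_.split(",").toList).filter(_.nonEmpty).map(tagId)
    )
}
```

The point of the sketch is that the transformation needs application-level deserialization, which is why it cannot be expressed as a plain SQL script.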

We provide an optional artifact, akka-persistence-postgres-migration, that brings the classes needed to automate the above process into your project.

Important: Our util classes neither drop nor update any old data. The original tables will still be there, but renamed with an old_ prefix. It's up to you when to drop them.

How to use plugin-provided migrations

Add akka-persistence-postgres-migration to your project

Add the following to your build.sbt:

libraryDependencies += "com.swissborg" %% "akka-persistence-postgres-migration" % "0.5.0-M1"

For a Maven project, add:

<dependency>
    <groupId>com.swissborg</groupId>
    <artifactId>akka-persistence-postgres-migration_2.12</artifactId>
    <version>0.5.0-M1</version>
</dependency>

to your pom.xml.

Create and run migrations:

import akka.persistence.postgres.migration.journal.Jdbc4JournalMigration
import akka.persistence.postgres.migration.snapshot.Jdbc4SnapshotStoreMigration

// `config` is assumed to be your application's loaded configuration.
// The journal migration runs first, then the snapshot store migration.
for {
  _ <- new Jdbc4JournalMigration(config).run()
  _ <- new Jdbc4SnapshotStoreMigration(config).run()
} yield ()

Very important note: The migration has to be finished before your application starts any persistent actors!
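One way to enforce this ordering is to block application startup until both migrations have completed. The sketch below uses plain Scala futures as stand-ins: `StartupOrder`, `migrateThenStart` and the `journal` and `snapshots` functions are hypothetical names, and blocking with `Await` is just one possible design choice.

```scala
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object StartupOrder {
  // Runs both migrations in sequence, blocks until they finish, then starts the actors.
  def migrateThenStart[A](journal: () => Future[Unit],
                          snapshots: () => Future[Unit],
                          timeout: FiniteDuration)(startActors: => A)(implicit ec: ExecutionContext): A = {
    val migrations = for {
      _ <- journal()
      _ <- snapshots()
    } yield ()
    Await.result(migrations, timeout) // throws if a migration fails or times out
    startActors
  }
}
```

Because `startActors` is a by-name parameter, nothing in that block is evaluated until the migrations have succeeded.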

It's your choice whether you want to trigger migration manually or (recommended) leverage a database version control system of your choice (e.g. Flyway).

Examples

An example Flyway-based migration can be found in the demo app: https://github.com/mkubala/demo-akka-persistence-postgres/blob/master/src/main/scala/com/github/mkubala/FlywayMigrationExample.scala

Migration from akka-persistence-postgres 0.4.0 to 0.5.0

New indices need to be created on each partition. To avoid locking production databases for too long, this should be done in 2 steps:

  1. manually create indices CONCURRENTLY,
  2. deploy new release with migration scripts.

Manually create indices CONCURRENTLY

Execute the DDL statements produced by the sample migration script. Adapt the top-level variables to match your journal configuration before executing them.

Deploy new release with migration scripts

See the sample Flyway migration script and adapt the top-level variables to match your journal configuration.

Contributing

We are always looking for contributions and new ideas, so if you’d like to join the project, check out the open issues or post your own suggestions!

Sponsors

Development and maintenance of akka-persistence-postgres is sponsored by:

SoftwareMill

SoftwareMill is a software development and consulting company. We help clients scale their business through software. Our areas of expertise include backends, distributed systems, blockchain, machine learning and data analytics.

SwissBorg

SwissBorg makes managing your crypto investment easy and helps control your wealth.