supersonicads / consul-akka-stream Edit

An akka-stream of changes to a Consul key from its key/value store.

Version Matrix

Build Status Maven Central

consul-akka-stream

consul-akka-stream is a small utility that can listen to a Consul key in its key/value store and provide an Akka stream that emits a value every time the key is updated.

Getting consul-akka-stream

Add these lines you to your SBT project:

libraryDependencies += "com.supersonic" %% "consul-akka-stream" % "1.1.2"

How To

To create a stream we first need a Consul client, we use the Consul Client for Java library:

val consul =
  Consul.builder()
    .withReadTimeoutMillis(0L)
    .withHostAndPort(HostAndPort.fromParts("localhost", 8500))
    .build()

With this in hand, we can initialize an Akka-streams source based on the client using the ConsulStream.consulKeySource function:

val source: Source[Map[String, Option[String]], CancellationToken] = 
  ConsulStream.consulKeySource(key = "foo/baz/bar", consul, blockingTime = 1.seconds)

This creates an Akka source that listens to the foo/baz/bar key in Consul's key/value store. On every change to the key (including recursive changes) a new map is produced. The keys in the map are the Consul keys and the values are the (optional) strings under them.

An example output can be something like:

Map(
  "foo/baz/bar/qux" -> Some("a"), 
  "foo/baz/bar/goo" -> Some("b"), 
  "foo/baz/bar/bla" -> None)

The Source materializes into a CancellationToken token that can be triggered when the user wants to stop polling Consul.

See the tests for further examples.

Integration Tests

The library provides a trait that facilitates testing the Consul stream (with ScalaTest) using the Embedded Consul library.

To obtain the trait, add the following to your SBT project:

libraryDependencies += "com.supersonic" %% "consul-akka-stream-integration-tests" % "1.1.2" % "it"

And mixin the ConsulIntegrationSpec trait into your test.