Add these lines you to your SBT project:
libraryDependencies += "com.supersonic" %% "consul-akka-stream" % "1.1.2"
To create a stream we first need a Consul client, we use the Consul Client for Java library:
val consul = Consul.builder() .withReadTimeoutMillis(0L) .withHostAndPort(HostAndPort.fromParts("localhost", 8500)) .build()
With this in hand, we can initialize an Akka-streams source based on the client using the
val source: Source[Map[String, Option[String]], CancellationToken] = ConsulStream.consulKeySource(key = "foo/baz/bar", consul, blockingTime = 1.seconds)
This creates an Akka source that listens to the
foo/baz/bar key in Consul's key/value store. On every change to the key (including recursive changes) a new map is produced. The keys in the map are the Consul keys and the values are the (optional) strings under them.
An example output can be something like:
Map( "foo/baz/bar/qux" -> Some("a"), "foo/baz/bar/goo" -> Some("b"), "foo/baz/bar/bla" -> None)
The Source materializes into a
CancellationToken token that can be triggered when the user wants to stop polling Consul.
See the tests for further examples.
The library provides a trait that facilitates testing the Consul stream (with ScalaTest) using the Embedded Consul library.
To obtain the trait, add the following to your SBT project:
libraryDependencies += "com.supersonic" %% "consul-akka-stream-integration-tests" % "1.1.2" % "it"
And mixin the
ConsulIntegrationSpec trait into your test.