Reactive-streams REST client for API implemented with Akka-Http


This is a Reactive-streams REST client for API implemented with Akka-Http

If you are looking for an alternative for Scala - see Dispatch-based client.

User Guide


To include it in your project add

"com.featurefm" %% "kastomer" % "0.0.2"

Only Scala 2.11 is currently supported.

Code API provide 3 end-points: identify, track and delete. This library gives you 3 corresponding flows.

 trait Flows {
   def health:   Source[HealthInfo, Unit]
   def identify: Flow[User,  (Try[Int], User),  Any]
   def track:    Flow[Event, (Try[Int], Event), Any]
   def delete:   Flow[String, Try[Int],         Any]

The main implementation is Kastomer class, you can create an instance with Kastomer(), providing that you have an implicit ActorSystem in scope. To create one:

 implicit val system = ActorSystem("My-App")

Checking connection

To run the client you need to provide CUSTOMERIO_SITEID and CUSTOMERIO_APIKEY environment variables.

Use health.runWith(Sink.head) to test connection, it will produce Future[HealthInfo] and if you provided good credentials you will get HealthInfo(HealthState.GOOD, "nice credentials")

Before you start - general notes about the API

Try[Int] represents the HTTP response status after calling API, and it should be Success(200) if all went well. If Failure is returned it probably signals either connection problem or bug in the code. Non-200 return code will indicate problem on side or invalid credentials.

The flow can be used to send a single request to, or it can be used to stream requests from some source to, for example results of a database query, a file read, or just an in-memory list of elements. When streaming multiple elements, it may be useful to correlate the responses (especially failures) with the element which processing produced them, for example to report or to retry the request. For that purpose the library provides a tuple in response to track and identify, first element of the tuple is the response, and the second is the element which processing produced the result.

If you are new to akka-streams: to build any stream you need an implicit Materializer value in scope, it s usually created like this:

 implicit val materializer = ActorMaterializer()

that in turn assumes an implicit ActorSystem.

Identifying user

We provide a simple User class:

case class User(id: String, email: String, data: Any)

The data field should be of a type serializable to json, e.g. a case class or a Map (of serializable values). See JSON section below for more details.

In a general case to identify request you can use code like:

val source: Source[User, _] = ??? //some source, e.g. Source.single(user)
val sink = Sink.head[(Try[Int], User)] //you can of course use any other sink of your choosing
val f: Future[(Try[Int], User)] = source.via(identify).runWith()

But for a single request we recommend using a convenience identifySingle method:

val f: Future[Int] = Source.single(user).via(identifySingle).runWith(Sink.head)

Tracking events

We provide a simple Event class:

case class Event(id: String, name: String, data: Any)

The data field should be of a type serializable to json, e.g. a case class or a Map (of serializable values). See JSON section below for more details.

In a general case to track a request you can use code like:

val source: Source[Event, _] = ??? //some source, e.g. Source.single(user)
val sink: Sink[(Try[Int], Event), _] = ??? //some sink of your choosing
val x = source.via(track).runWith(sink)

As with identifying users, in case of a single request, you can use convenience trackSingle method:

val f: Future[Int] = Source.single(event).via(trackSingle).runWith(Sink.head)

Delete user

The delete flow takes user id as parameter. It can be used in the same way as identify and track above, but it does not return additional information with failure, as there's not much that can be done in such case. It has deleteSingle shortcut method, like the other flows.

Configuring HTTP

Under the hood, this library uses Akka-Http and Akka-Streams. Specifically it is based on Host-Level Client Api. There are several parameters that can be configured for the connection pool, see reference.conf file section for their list.


The library uses json4s with jackson via the akka-http-json to serialize the requests. In addition to default serializers, Joda-Time and UUID are supported.

Metrics and Health

The library registers Health and Metrics extensions that enable Akka integration with Dropwizard (formerly known as CodaHale) Metrics.

The library automatically registers a timer with each type of flows. If you wish to access the timers in your code, you can get them from Kastomer instance Timer property that returns

import nl.grons.metrics.scala.{Timer => ScalaTimer}

trait Timers {
  def health:   ScalaTimer
  def identify: ScalaTimer
  def track:    ScalaTimer
  def delete:   ScalaTimer

For example: K.Timer.track.count or K.Timer.track.mean

To register itself as a HealthCheck, your application needs to call Health().addCheck passing the Kastomer instance.

Implementation class provides most of the heavy lifting here. This class is part of our ( micro-service infrastructure that provides both server and client implementation based on Akka-Http. We use the client to connect to many services, both internal and external, not just


Since Akka-Streams provide integration with Reactive-streams, this library also provides Processors that correspond to identify, track and delete flows.

Advanced Example

Akka-Http Site client will retry idempotent requests (according to configuration). Since identify request is a PUT, it is considered idempotent. But track is a POST, therefore if we want to retry it, we need to do it ourselves.

import scala.util._
import scala.concurrent.Future
import java.util.concurrent.atomic.AtomicLong

val K = Kastomer()
val track: Flow[Event, (Try[Int], Event), _] = K.track
val failed: Flow[(Try[Int],Event), Event, _] =
  Flow[(Try[Int],Event)].filter(_._1.filter(_ == 200).isFailure).map(_._2)

val successes = new AtomicLong()
val countSuccess =
    Sink.foreach[(Try[Int], Event)] {
      case (Success(200), _)  => successes.incrementAndGet()
      case _ => //do nothing
val report: Sink[(Try[Int], Event), Future[List[Event]]] =
    Sink.fold[List[Event], (Try[Int], Event)](List[Event]()) {
      case (res, (Success(200), event))  =>
      case (res, (Success(code), event)) =>
        res :+ event
      case (res, (Failure(e),    event)) =>
        res :+ event
val source: Source[Event, _ ] = ???

(source via track alsoTo countSuccess
      via failed via track //retry failed