gchudnov / metrics-akka-stream

Attach metrics to akka stream operations



Usage

Add the following dependency to your build.sbt:

libraryDependencies += "com.github.gchudnov" %% "metrics-akka-stream" % "0.11.0"

Optionally, add GitHub credentials and a package repository resolver to build.sbt:

credentials += Credentials(Path.userHome / ".sbt" / ".credentials")
resolvers += Resolver.url("GitHub Package Registry", url("https://maven.pkg.github.com/gchudnov/metrics-akka-stream"))

// cat ~/.sbt/.credentials
// realm=GitHub Package Registry
// host=maven.pkg.github.com
// user=USER
// password=TOKEN

To attach metrics to a stream:

  • Create an implicit instance of MetricRegistry.
  • (Optionally) create an implicit instance of DeviceRegistry. If none is provided, a default global registry is used.
  • Call .withReg(name: String, isReuse: Boolean = false) on Source, Flow or Sink nodes.
  • On each materialization, a new device is created with a unique id. The id is generated using the pattern {source|sink|flow}-{name}-{counter}.

The parameters of .withReg(name: String, isReuse: Boolean = false) are:

  • name - name of the device. Used to derive a unique identifier, the id of the device.
  • isReuse - specifies whether to reuse a single device across all materializations of the node (true) or to create a new device for each materialization (default: false). For example, if the groupBy operator splits a stream into N partitions and metrics are attached to a subflow, N devices are created: flow-{name}-{0..N-1}.
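
To make the id pattern concrete, here is a minimal sketch of how ids of the form {source|sink|flow}-{name}-{counter} could be allocated. DeviceIds is a hypothetical helper written for illustration; it is not part of metrics-akka-stream. The per-prefix counter and the way a reused device keeps its first id are assumptions, not the library's documented behaviour.

```scala
import java.util.concurrent.atomic.AtomicLong
import scala.collection.concurrent.TrieMap

// Hypothetical helper for illustration only; not the library's implementation.
final class DeviceIds {
  // Assumption: one counter per "{kind}-{name}" prefix, starting at 0.
  private val counters = TrieMap.empty[String, AtomicLong]
  // Assumption: a reused device keeps the first id allocated for its prefix.
  private val reused = TrieMap.empty[String, String]

  private def fresh(prefix: String): String = {
    val counter = counters.getOrElseUpdate(prefix, new AtomicLong(0L))
    s"$prefix-${counter.getAndIncrement()}"
  }

  /** Returns an id for the given kind ("source", "sink" or "flow") and name. */
  def next(kind: String, name: String, isReuse: Boolean = false): String = {
    val prefix = s"$kind-$name"
    if (isReuse) reused.getOrElseUpdate(prefix, fresh(prefix))
    else fresh(prefix)
  }
}

val ids = new DeviceIds
val first = ids.next("flow", "map")  // "flow-map-0"
val second = ids.next("flow", "map") // "flow-map-1"
```

With isReuse = true, repeated calls for the same kind and name return the same id in this sketch, which mirrors the idea of sharing one device across materializations.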

Example:

import akka.actor.ActorSystem
import akka.stream.scaladsl.{ Flow, Keep, Sink, Source }
import akka.Done
import com.github.gchudnov.metrics.RecordSyntax._
import com.github.gchudnov.metrics.DeviceRegistry
import com.codahale.metrics.MetricRegistry
import scala.concurrent.{ Await, Future }
import scala.concurrent.duration._

implicit val system: ActorSystem = ActorSystem("app-actor-system")

implicit val metricRegistry: MetricRegistry = new MetricRegistry()
implicit val deviceRegistry: DeviceRegistry = DeviceRegistry() // OPTIONAL. If not provided, a global DeviceRegistry is used: 'DeviceRegistry.instance()'

val doneF: Future[Done] = Source
  .repeat(()).withReg("repeat")
  .map(_ => 1).withReg("map")
  .take(10).withReg("take")
  .toMat(Sink.ignore.withReg("ignore"))(Keep.right)
  .run()

val done: Done = Await.result(doneF, 10.seconds) // block until the stream completes

Metrics are aggregated in the MetricRegistry and can be examined with a metrics reporter, e.g. JmxReporter, ConsoleReporter or CsvTableReporter.
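
As an illustration of the reporter approach, the sketch below wires a Dropwizard ConsoleReporter to a MetricRegistry. consoleReporter is a hypothetical helper name; the builder calls are standard Dropwizard Metrics API, and the reporting interval is an arbitrary choice. In a real application, pass the same registry that is implicitly in scope for .withReg.

```scala
import com.codahale.metrics.{ ConsoleReporter, MetricRegistry }
import java.io.PrintStream
import java.util.concurrent.TimeUnit

// Hypothetical helper: builds a console reporter for the given registry.
// `out` defaults to standard output and can be redirected if needed.
def consoleReporter(registry: MetricRegistry, out: PrintStream = System.out): ConsoleReporter =
  ConsoleReporter
    .forRegistry(registry)
    .outputTo(out)
    .convertRatesTo(TimeUnit.SECONDS)
    .convertDurationsTo(TimeUnit.MILLISECONDS)
    .build()

val registry = new MetricRegistry() // stand-in for the registry used with .withReg
val reporter = consoleReporter(registry)

reporter.start(5, TimeUnit.SECONDS) // print all metrics every 5 seconds
reporter.report()                   // or print once, on demand
reporter.stop()                     // stop the periodic reporting
```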

In addition, some of the metrics are exposed in the provided or global DeviceRegistry:

import com.github.gchudnov.metrics.DeviceRegistry

val deviceRegistry = DeviceRegistry.instance() // OR use the device registry constructed before
val devices = deviceRegistry.list()

devices.foreach { device =>
  println(device.name)
  println(device.C)
  println(device.B)
  println(device.T)
}

val device = deviceRegistry.get("some name")
println(device.C)
println(device.B)
println(device.T)

Metrics

The following metrics are captured:

  • T - observation period, [nanoseconds], [gauge]. Time passed between the first and the last event.
  • B - busy time, [nanoseconds], [counter]. Time that the given node (device) was busy. B <= T.
  • C - number of completions, [events], [counter]. Number of events observed at the given device.
  • AR - arrival rate, [meter].
  • DR - departure rate, [meter].
  • S - service time, [timer]. Time passed between the arrival of an event at the node and its departure.
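
The three basic quantities T, B and C can be combined into a few derived figures. The sketch below is not something the library provides: Observation is a hypothetical case class standing in for a device snapshot, and the ratios are the usual operational-analysis definitions (utilization B / T, throughput C / T, mean busy time per completion B / C), assumed here for illustration.

```scala
// Hypothetical snapshot of a device: T and B in nanoseconds, C in events.
final case class Observation(T: Long, B: Long, C: Long)

/** Fraction of the observation period the device was busy (0.0 to 1.0, since B <= T). */
def utilization(o: Observation): Double =
  if (o.T == 0L) 0.0 else o.B.toDouble / o.T

/** Completed events per second over the observation period. */
def throughputPerSecond(o: Observation): Double =
  if (o.T == 0L) 0.0 else o.C.toDouble / (o.T / 1e9)

/** Mean busy time per completed event, in nanoseconds. */
def meanBusyNanosPerEvent(o: Observation): Double =
  if (o.C == 0L) 0.0 else o.B.toDouble / o.C

// 2 s observed, 0.5 s busy, 1000 events completed.
val sample = Observation(T = 2000000000L, B = 500000000L, C = 1000L)
val u = utilization(sample)           // 0.25
val x = throughputPerSecond(sample)   // 500.0
val s = meanBusyNanosPerEvent(sample) // 500000.0
```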

Service Time S for a source, sink and flow is measured as:

  • S[source] - time between consecutive pull and push events: S[source] = Tpush - Tpull.
    -> [source] -> [device] ->
    
    t0: <- pull
    t1: -> push
    dt = t1 - t0
    
  • S[sink] - time between consecutive push and pull events: S[sink] = Tpull - Tpush. Instead of an initial pull event, the time of subscription (init) is used.
    -> [device] -> [sink]
    
    t0: -> push
    t1: <- pull|init
    dt = t1 - t0
    
  • S[flow] - time between push events on the left and on the right: S[flow] = Tright-push - Tleft-push.
    -> [lhs-device] -> [flow] -> [rhs-device]
    
    t0: -> left-push
    t1: -> right-push
    dt = t1 - t0
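
The three definitions above can be worked through on recorded timestamps. The following is a plain-Scala sketch with hypothetical types (Signal, Event) and function names; it only reproduces the dt = t1 - t0 arithmetic and is not how the library records events internally. The flow variant assumes the wrapped flow emits exactly one element per input, in order.

```scala
sealed trait Signal
case object Pull extends Signal
case object Push extends Signal

// A pull or push observed at a device, with its timestamp in nanoseconds.
final case class Event(signal: Signal, nanos: Long)

/** S[source]: for every pull directly followed by a push, Tpush - Tpull. */
def sourceServiceTimes(events: Seq[Event]): List[Long] =
  events.sliding(2).collect {
    case Seq(Event(Pull, t0), Event(Push, t1)) => t1 - t0
  }.toList

/** S[sink]: for every push directly followed by a pull, Tpull - Tpush. */
def sinkServiceTimes(events: Seq[Event]): List[Long] =
  events.sliding(2).collect {
    case Seq(Event(Push, t0), Event(Pull, t1)) => t1 - t0
  }.toList

/** S[flow]: Tright-push - Tleft-push, pairing the i-th left push with the i-th right push. */
def flowServiceTimes(leftPushes: Seq[Long], rightPushes: Seq[Long]): List[Long] =
  leftPushes.zip(rightPushes).map { case (t0, t1) => t1 - t0 }.toList

val observed = Seq(Event(Pull, 100L), Event(Push, 250L), Event(Pull, 300L), Event(Push, 420L))
val atSource = sourceServiceTimes(observed)                            // List(150, 120)
val atSink = sinkServiceTimes(observed)                                // List(50)
val atFlow = flowServiceTimes(Seq(10L, 20L, 30L), Seq(15L, 27L, 31L)) // List(5, 7, 1)
```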
    

API

  • [Source|Flow|Sink].withReg(name: String, isReuse: Boolean = false) - adds metrics to capture message propagation through source, flow and sink.

Implementation Details

                                                      New Source
                                         +--------------------------------+
                                         |                                |
      +----------+                       |  +----------+      +---------+ |
      |  Source  +------>                |  |  Source  +----->+  RFlow  +------->
      +----------+                       |  +----------+      +---------+ |
                                         |                                |
                                         +--------------------------------+

                                                      New Sink
                                         +--------------------------------+
                                         |                                |
              +--------+                 |  +---------+       +--------+  |
      ------->+  Sink  |            ------->+  LFlow  +------>+  Sink  |  |
              +--------+                 |  +---------+       +--------+  |
                                         |                                |
                                         +--------------------------------+

                                                      New Flow
                                         +------------------------------------------------+
                                         |                                                |
        +--------+                       |  +---------+       +--------+      +---------+ |
------->+  Flow  +------>           ------->+  LFlow  +------>+  Flow  +----->+  RFlow  +------->
        +--------+                       |  +---------+       +--------+      +---------+ |
                                         |                                                |
                                         +------------------------------------------------+

  • Source - to capture metrics of the given Source, a new source is created by instantiating a Flow (RFlow) and appending it to the right of the original Source.
  • Sink - to capture metrics of the given Sink, a new sink is created by instantiating a Flow (LFlow) and attaching it to the left of the original Sink.
  • Flow - to capture metrics of the given Flow, a new flow is created by connecting two new Flows (LFlow, RFlow) with the original Flow between them.
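
The wrapping idea for a Flow can be sketched with plain Akka Streams operators. This is an illustration under stated assumptions, not the library's code: timed is a hypothetical function, the LFlow and RFlow stand-ins are simple map stages sharing a queue of arrival timestamps, the inner flow is assumed to emit exactly one element per input in order, and the queue is shared by all materializations of the returned flow. A real implementation would more likely use custom graph stages with per-materialization state.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.scaladsl.{ Flow, Sink, Source }
import java.util.concurrent.ConcurrentLinkedQueue
import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical wrapper: [LFlow] -> [inner] -> [RFlow], reporting dt = Tright-push - Tleft-push.
def timed[A, B](inner: Flow[A, B, NotUsed])(record: Long => Unit): Flow[A, B, NotUsed] = {
  val arrivals = new ConcurrentLinkedQueue[java.lang.Long]()

  // LFlow stand-in: remember when each element entered the wrapped flow.
  val lflow = Flow[A].map { a =>
    arrivals.add(System.nanoTime())
    a
  }

  // RFlow stand-in: pair each departing element with the oldest recorded arrival.
  val rflow = Flow[B].map { b =>
    val t0 = arrivals.poll()
    if (t0 != null) record(System.nanoTime() - t0.longValue())
    b
  }

  lflow.via(inner).via(rflow)
}

implicit val system: ActorSystem = ActorSystem("sketch")

val samples = new ConcurrentLinkedQueue[java.lang.Long]()
val doubled = timed(Flow[Int].map(_ * 2))(dt => samples.add(dt))

val result = Await.result(Source(1 to 3).via(doubled).runWith(Sink.seq), 5.seconds)
system.terminate()
```

After the stream completes, samples holds one service-time measurement (in nanoseconds) per element that passed through the wrapped flow.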

Contact

Grigorii Chudnov

License

Distributed under the MIT License (MIT).