gchudnov / metrics-akka-stream

Attach metrics to akka stream operations

GitHub

metrics-akka-stream

Attach metrics to akka stream operations.


Usage

Add the following dependency to your build.sbt:

libraryDependencies += "com.github.gchudnov" %% "metrics-akka-stream" % "0.10.0"

Optionally, add github credentials and package repository resolver to build.sbt:

credentials += Credentials(Path.userHome / ".sbt" / ".credentials")
resolvers += Resolver.url("GitHub Package Registry", url("https://maven.pkg.github.com/gchudnov/metrics-akka-stream"))

// cat ~/.sbt/.credentials
// realm=GitHub Package Registry
// host=maven.pkg.github.com
// user=USER
// password=TOKEN
  • create an implicit instance of MetricRegistry.
  • (optionally) create an implicit instance of DeviceRegistry. If not provided, a default global registry will be used.
  • call .withReg(name: String) on Source, Flow or Sink nodes.
import akka.actor.ActorSystem
import akka.stream.scaladsl.{ Flow, Keep, Sink, Source }
import akka.Done
import com.github.gchudnov.metrics.RecordSyntax._
import com.github.gchudnov.metrics.DeviceRegistry
import com.codahale.metrics.MetricRegistry
import scala.concurrent.{ Await, Future }
import scala.concurrent.duration._

implicit val system: ActorSystem = ActorSystem("app-actor-system")

implicit val metricRegistry: MetricRegistry = new MetricRegistry()
implicit val deviceRegistry: DeviceRegistry = DeviceRegistry() // OPTIONAL. If not provided, a global DeviceRegistry is used: 'DeviceRegistry.instance()'

val doneF: Future[Done] = Source
  .repeat(()).withReg("repeat")
  .map(_ => 1).withReg("map")
  .take(10).withReg("take")
  .toMat(Sink.ignore.withReg("ignore"))(Keep.right)
  .run()

val done = Await.result(doneF, 10.second)

Metrics will be aggregated inside of the MetricRegistry and could be examined by the metric reporter, e.g. JmxReporter, ConsoleReporter or CsvTableReporter.

In addition, some of the metrics are exposed in the provided or global DeviceRegistry:

import com.github.gchudnov.metrics.DeviceRegistry

val deviceRegistry = DeviceRegistry.instance() // OR use the device registry constructed before
val devices = deviceRegistry.list()

devices.foreach { device =>
  println(device.name)
  println(device.C)
  println(device.B)
  println(device.T)
}

val device = deviceRegistry.get("some name")
println(device.C)
println(device.B)
println(device.T)

Metrics

The following metrics are captured:

  • T - observation period, [nanoseconds], [gauge]. Time passed between the first and last event.
  • B - busy time, [nanoseconds], [counter]. Time, that the given node (device) was budy. B <= T.
  • C - number of completions, [events], [counter]. Number of events, observed at the given device.
  • AR - arrival rate, [meter].
  • DR - departure rate, [meter].
  • S - service time, [timer]. metric for the time passed between the arrival of an event at the node and its departure.

API

  • [Source|Flow|Sink].wihtReg(name: String) - adds metrics to capture message propagation through source, flow and sink.

Implementation Details

                                                      New Source
                                         +--------------------------------+
                                         |                                |
      +----------+                       |  +----------+      +---------+ |
      |  Source  +------>                |  |  Source  +----->+  RFlow  +------->
      +----------+                       |  +----------+      +---------+ |
                                         |                                |
                                         +--------------------------------+

                                                      New Sink
                                         +--------------------------------+
                                         |                                |
              +--------+                 |  +---------+       +--------+  |
      ------->+  Sink  |            ------->+  LFlow  +------>+  Sink  |  |
              +--------+                 |  +---------+       +--------+  |
                                         |                                |
                                         +--------------------------------+

                                                      New Flow
                                         +------------------------------------------------+
                                         |                                                |
        +--------+                       |  +---------+       +--------+      +---------+ |
------->+  Flow  +------>           ------->+  LFlow  +------>+  Flow  +----->+  RFlow  +------->
        +--------+                       |  +---------+       +--------+      +---------+ |
                                         |                                                |
                                         +------------------------------------------------+
  • Source - to capture metrics of the given Source, a new source is created by instantiating a Flow (RFlow) and appending it to the right of original Source.
  • Sink - to capture metrics of the given Sink, a new sink is created by instantiating a Flow (LFlow) and appending it to the left of the original Sink.
  • Flow - to capture metrics of the given Flow, a new flow is created by connecting two new Flows (LFlow, RFlow) with the original Flow between them.

Contact

Grigorii Chudnov

License

Distributed under the The MIT License (MIT).