A lightweight, composable library for monitoring backpressure in FS2 streams.
FS2 Backpressure Sensor provides tools to monitor and measure backpressure in your functional streams. This library helps you understand where bottlenecks occur in your stream processing by tracking:
- Starvation: How long a stream waits for upstream data
- Backpressure: How long a stream waits for downstream processing
The sensor can be applied to any FS2 stream with minimal changes to your code, making it ideal for both debugging and monitoring production systems.
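To make the two measurements concrete, here is a minimal sketch of how starvation and backpressure time could be derived from a timeline of pull and emit events. The `Event` type and `waitTimes` function are hypothetical names introduced for illustration only. They are not part of this library's API, and the library's actual implementation may differ.

```scala
import scala.concurrent.duration._

// Hypothetical event model, not part of the library's API.
enum Event:
  case Pulled(at: FiniteDuration)  // downstream asked for the next element
  case Emitted(at: FiniteDuration) // upstream delivered an element

// Sums the time spent waiting in each direction over a timeline of events.
// Returns (starvation, backpressure).
def waitTimes(events: List[Event]): (FiniteDuration, FiniteDuration) =
  val zero: (FiniteDuration, FiniteDuration) = (Duration.Zero, Duration.Zero)
  events.zip(events.drop(1)).foldLeft(zero) {
    // Pull followed by emit: the sensor was waiting on upstream (starvation)
    case ((starved, pressured), (Event.Pulled(t0), Event.Emitted(t1))) =>
      (starved + (t1 - t0), pressured)
    // Emit followed by pull: the sensor was waiting on downstream (backpressure)
    case ((starved, pressured), (Event.Emitted(t0), Event.Pulled(t1))) =>
      (starved, pressured + (t1 - t0))
    // Any other pairing contributes nothing
    case (acc, _) => acc
  }

// A slow consumer: upstream answers within 10 ms, downstream takes 50 ms to pull again.
val timeline = List(
  Event.Pulled(0.millis),
  Event.Emitted(10.millis),
  Event.Pulled(60.millis),
  Event.Emitted(60.millis)
)
val result = waitTimes(timeline) // (10.millis, 50.millis)
```

In this timeline most of the waiting is attributed to downstream, which is the signature of a bottleneck after the measurement point.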
Add the dependency to your build.sbt:
libraryDependencies += "io.github.nivox" %% "fs2-backpressure-sensor" % "<version>"
The library offers two types of sensors:
- Plain sensor: measures backpressure at a specific point in the stream
- Bracket sensor: measures the backpressure contribution of a portion of the stream
A plain sensor measures backpressure at a single point in your stream:
import fs2.backpressuresensor._
import fs2.backpressuresensor.syntax._
import cats.effect._
import fs2._
import scala.concurrent.duration._
object PlainSensorExample extends IOApp.Simple:
  def run: IO[Unit] =
    // Create a reporter that logs backpressure stats every second
    val reporter = Reporter.interval[IO](1.second) { (starvation, backpressure) =>
      IO.println(s"Starvation: ${starvation.toMillis}ms, Backpressure: ${backpressure.toMillis}ms")
    }
    
    // Your source stream
    val source = Stream.iterate(0)(_ + 1)
      .covary[IO]  // fix the effect type so metered can resolve Temporal[IO]
      .metered(10.millis)
    
    // Apply the backpressure sensor to your stream
    source
      .backpressureSensor(reporter)  // <-- Add sensor here
      .evalMap(n => IO.sleep(50.millis) >> IO.println(s"Processing: $n"))
      .take(100)
      .compile
      .drain
A bracketed sensor can measure backpressure around a specific pipe transformation, helping you understand which part of your stream is causing bottlenecks:
import fs2.backpressuresensor._
import fs2.backpressuresensor.syntax._
import cats.effect._
import fs2._
import scala.concurrent.duration._
object BracketedSensorExample extends IOApp.Simple:
  def run: IO[Unit] =
    // Create a reporter that logs backpressure stats every second
    val reporter = Reporter.interval[IO](1.second) { (starvation, backpressure) =>
      IO.println(s"Starvation: ${starvation.toMillis}ms, Backpressure: ${backpressure.toMillis}ms")
    }
    
    // Define your transformation pipe
    val processingPipe: Pipe[IO, Int, String] = 
      _.evalMap(n => IO.sleep(50.millis) >> IO.pure(s"Processed: $n"))
    
    // Your source stream
    val source = Stream.iterate(0)(_ + 1)
      .covary[IO]  // fix the effect type so metered can resolve Temporal[IO]
      .metered(10.millis)
    
    // Apply the bracketed sensor around your processing pipe
    source
      .backpressureBracketSensor(reporter)(processingPipe)  // <-- Wrap the pipe with a sensor
      .evalTap(s => IO.println(s))
      .take(100)
      .compile
      .drain
You can create custom reporters by implementing the Reporter[F] trait:
trait Reporter[F[_]]:
  def reportStarvedFor(duration: FiniteDuration): F[Unit]
  def reportBackpressuredFor(duration: FiniteDuration): F[Unit]
This project was inspired by https://github.com/timbertson/backpressure-sensor.
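As an illustration of the custom reporter mechanism described in this README, the sketch below implements the two `Reporter[F]` methods with a reporter that accumulates totals in a Cats Effect `Ref` instead of logging them. The trait is repeated locally so that the example compiles on its own. `Totals` and `AccumulatingReporter` are hypothetical names and are not provided by the library.

```scala
import cats.effect.{IO, Ref}
import scala.concurrent.duration._

// Local copy of the Reporter trait from this README, repeated so the sketch is self-contained.
trait Reporter[F[_]]:
  def reportStarvedFor(duration: FiniteDuration): F[Unit]
  def reportBackpressuredFor(duration: FiniteDuration): F[Unit]

// Hypothetical helper: running totals of both wait times.
final case class Totals(starved: FiniteDuration, backpressured: FiniteDuration)

// Hypothetical reporter that adds every reported duration to a shared Ref.
final class AccumulatingReporter(totals: Ref[IO, Totals]) extends Reporter[IO]:
  def reportStarvedFor(duration: FiniteDuration): IO[Unit] =
    totals.update(t => t.copy(starved = t.starved + duration))

  def reportBackpressuredFor(duration: FiniteDuration): IO[Unit] =
    totals.update(t => t.copy(backpressured = t.backpressured + duration))

  // Reads the totals accumulated so far.
  def snapshot: IO[Totals] = totals.get

object AccumulatingReporter:
  // Allocates the Ref with zeroed totals and wraps it in a reporter.
  def create: IO[AccumulatingReporter] =
    Ref.of[IO, Totals](Totals(Duration.Zero, Duration.Zero))
      .map(new AccumulatingReporter(_))
```

A reporter of this shape could feed a metrics endpoint or a test assertion by reading `snapshot` periodically. How it is passed to a sensor depends on the library's actual signatures, which this sketch does not assume.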