systemfw / upperbound

A purely functional rate limiter

upperbound

upperbound is a purely functional rate limiter. It allows you to submit jobs concurrently, which will then be started at a rate no higher than what you specify.

Installation

To get upperbound, add the following line to your build.sbt:

libraryDependencies += "org.systemfw" %% "upperbound" % "version"

You can find the latest version in the releases tab.
upperbound depends on fs2, cats, cats-effect and cats-collections.

Note:

For the time being, binary compatibility is not guaranteed. This is not a problem for usage in applications (which is where you would mostly use a rate limiter anyway), but it is risky if upperbound is used in libraries. Binary compatibility will be guaranteed in the future.

Design principles

upperbound is an interval based limiter, which means jobs are started at a constant rate. This strategy prevents spikes in throughput, and makes it a very good fit for client side limiting, e.g. calling a rate limited API.
upperbound is completely pure, which allows for ease of reasoning and composability. On a practical level, this means that some familiarity with cats, cats-effect and fs2 is required.
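
As a rough illustration of what "interval based" means, the sketch below works out the spacing between job starts for a given rate. It assumes the interval is simply the period divided by the number of jobs, which may not be exactly how the library computes it. Rate here is a local stand-in with the same shape as the one shown later in this README, and intervalOf is a hypothetical helper, not part of upperbound.

```scala
import scala.concurrent.duration._

object IntervalSketch {
  // Local stand-in with the same shape as the library's Rate (assumption).
  final case class Rate(n: Int, t: FiniteDuration)

  // Hypothetical helper: space n job starts evenly across the period t.
  def intervalOf(rate: Rate): FiniteDuration = rate.t / rate.n.toLong

  // 100 jobs per minute: at most one job start every 600 milliseconds.
  val interval: FiniteDuration = intervalOf(Rate(100, 1.minute))

  // Earliest start offsets of the first four jobs under that spacing.
  val starts: List[FiniteDuration] = List.tabulate(4)(i => interval * i.toLong)
}
```

Under this assumption, a burst of jobs submitted at once is spread out evenly rather than being released in one spike at the start of each minute.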

Usage

Limiter

The main entity of the library is a Limiter, which is defined as:

trait Limiter[F[_]] {
  def submit[A](job: F[A], priority: Int = 0): F[Unit]

  def interval: SignallingRef[F, FiniteDuration]

  def initial: FiniteDuration

  def pending: F[Int]
}

The submit method takes an F[A], which can represent any program, and returns an F[Unit] that represents the action of submitting it to the limiter, with the given priority. The semantics of submit are fire-and-forget: the returned F[Unit] immediately returns, without waiting for the input F[A] to complete its execution.
Limiter.submit is designed to be called concurrently: every concurrent call submits a job to the Limiter, and the jobs are then started (in order of priority) at a rate which is no higher than the maximum rate you specify on construction. A higher number indicates higher priority, and FIFO order is used when multiple throttled jobs have the same priority.
interval is an fs2.concurrent.SignallingRef that allows you to sample, change or react to changes to the current interval between two tasks. Finally, initial and pending return the initial interval specified on creation, and the number of jobs that are queued up waiting to start, respectively.

The Limiter algebra is the basic building block of the library; additional functionality is expressed as combinators over it.
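
A minimal sketch of the methods above in use, assuming a Limiter[F] has already been obtained somehow (construction is covered in the next section). The log helper and the demo function are hypothetical names used only for illustration.

```scala
import cats.effect.Sync
import cats.implicits._
import scala.concurrent.duration._
import upperbound._

object SubmitSketch {
  // Hypothetical job: any F[A] will do.
  def log[F[_]: Sync](msg: String): F[Unit] = Sync[F].delay(println(msg))

  def demo[F[_]: Sync](limiter: Limiter[F]): F[Int] =
    for {
      // Fire-and-forget submission with the default priority of 0.
      _ <- limiter.submit(log("routine"))
      // A higher number means this job is started before queued priority-0 jobs.
      _ <- limiter.submit(log("urgent"), priority = 10)
      // Change the current interval between two jobs.
      _ <- limiter.interval.set(2.seconds)
      // Number of jobs still queued up waiting to start.
      queued <- limiter.pending
    } yield queued
}
```

Since interval is a SignallingRef, the usual Ref operations such as get and set apply, and changes can also be observed as a stream.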

Program construction

To create a Limiter, use the start method:

case class Rate(n: Int, t: FiniteDuration)

object Limiter {
  def start[F[_]: Concurrent: Timer](maxRate: Rate, n: Int = Int.MaxValue): Resource[F, Limiter[F]]
}

start creates a new Limiter and starts processing the jobs submitted to it, which are started at a rate no higher than maxRate. import upperbound.syntax.rate._ exposes the every syntax for creating Rates:

import upperbound.syntax.rate._
import scala.concurrent.duration._

Limiter.start[F](100 every 1.minute)

Additionally, n enforces a bound on the maximum number of jobs allowed to queue up while waiting for execution. Once this number is reached, calling submit will fail the resulting F[Unit] with a LimitReachedException, so that you can in turn signal for backpressure downstream. Processing restarts as soon as the number of jobs waiting goes below n again.
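
One way to react to a full queue is to recover from the exception at the call site. The sketch below is only an illustration: trySubmit is a hypothetical helper, and it assumes LimitReachedException is visible through import upperbound._.

```scala
import cats.ApplicativeError
import cats.implicits._
import upperbound._

object BoundedSubmit {
  // Hypothetical helper: yields true if the job was queued,
  // false if the limiter rejected it because n jobs were already waiting.
  // Assumes LimitReachedException is exported from the upperbound package.
  def trySubmit[F[_], A](limiter: Limiter[F], job: F[A])(
      implicit F: ApplicativeError[F, Throwable]
  ): F[Boolean] =
    limiter.submit(job).as(true).recover {
      case _: LimitReachedException => false
    }
}
```

A caller can use the resulting Boolean to slow down or pause whatever is producing jobs, which is one simple form of signalling backpressure downstream.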

The reason start returns a cats.effect.Resource is so that processing can be stopped gracefully when the Limiter's lifetime is over. To assemble your program, all the places that need limiting at the same rate should take a Limiter as an argument, which is then created at the end of a region of sharing (typically main) and injected via Limiter.start(...).use or Stream.resource(Limiter.start(...)).flatMap. If this sentence didn't make sense to you, it's recommended to watch this talk.
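
The assembly described above might look roughly like the following sketch, which targets the cats-effect version implied by the Concurrent and Timer constraints on start. callApi and program are hypothetical names; the important part is that they take the Limiter as an argument and that it is created once, in run.

```scala
import cats.effect.{ExitCode, IO, IOApp}
import cats.implicits._
import scala.concurrent.duration._
import upperbound._
import upperbound.syntax.rate._

object Main extends IOApp {
  // Hypothetical component that needs limiting: takes the Limiter as an argument.
  def callApi(limiter: Limiter[IO], i: Int): IO[Unit] =
    limiter.submit(IO(println(s"request $i")))

  // Everything that shares the same rate receives the same Limiter.
  def program(limiter: Limiter[IO]): IO[Unit] =
    (1 to 10).toList.traverse_(callApi(limiter, _))

  // The Limiter is created at the end of the region of sharing and injected via use.
  def run(args: List[String]): IO[ExitCode] =
    Limiter
      .start[IO](10 every 1.second)
      // submit is fire-and-forget, so keep the Resource open long enough
      // for the queued jobs to start before the Limiter is shut down.
      .use(limiter => program(limiter) >> IO.sleep(2.seconds))
      .as(ExitCode.Success)
}
```

The sleep is only there to keep the example short; a real program would usually stay inside use for its whole lifetime, or wait for results explicitly.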

Note:

It's up to you whether you want to pass the Limiter algebra implicitly (as an F[_]: Limiter bound) or explicitly.
My position is that it's ok to pass algebras implicitly as long as the instance is made implicit at call site, as close as possible to where it's actually injected. This avoids any problems related to mixing things up, and is essentially equivalent to having an instance of your algebra for a newtype over Kleisli.

Reasonable people might disagree however, and I myself pass algebras around both ways, in different codebases.
upperbound is slightly skewed towards the F[_]: Limiter style: internal combinators are expressed that way, and Limiter has a summoner method to allow Limiter[F].submit.

Await

As mentioned above, submit has fire-and-forget semantics. When this is not sufficient, you can use await:

object Limiter {
 def await[F[_]: Concurrent: Limiter, A](job: F[A], priority: Int = 0): F[A]
}

await looks very similar to submit, except its semantics are blocking: the returned F[A] only completes when job has finished its execution. Note, however, that the blocking is only semantic: no actual threads are blocked by the implementation.
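
For example, await makes it possible to collect the results of rate-limited calls. In this sketch, fetchAll and its fetch parameter are hypothetical; only Limiter.await comes from the signature shown above.

```scala
import cats.effect.Concurrent
import cats.implicits._
import upperbound._

object AwaitSketch {
  // Hypothetical: fetch stands for whatever rate-limited call you need to make.
  // Each call goes through the implicit Limiter, and its result is returned
  // once the job has finished.
  def fetchAll[F[_]: Concurrent: Limiter, A](ids: List[Int])(fetch: Int => F[A]): F[List[A]] =
    ids.traverse(id => Limiter.await(fetch(id)))
}
```

Note that traverse runs the awaits one after another, so each job is only submitted once the previous one has completed. Running them concurrently is a separate design choice that this sketch does not make.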

Backpressure

Limiter[F].interval offers flexible control over the rate, which can be used as a mechanism for applying backpressure based on the result of a specific job (e.g. a REST call that got rejected upstream).
Although this can be implemented entirely in user land, upperbound provides some backpressure helpers and combinators out of the box.

class BackPressure[F[_]: Limiter, A](job: F[A]) {
  def withBackoff(
    backOff: FiniteDuration => FiniteDuration,
    ack: BackPressure.Ack[A]
  ): F[A]
}
object BackPressure {
  case class Ack[-A](slowDown: Either[Throwable, A] => Boolean)
}

withBackoff enriches an F[A] with a Limiter constraint with the ability to apply backpressure to the Limiter:
Every time a job signals backpressure is needed through ack, the Limiter will adjust its current rate by applying backOff to it. This means the rate will be adjusted by calling backOff repeatedly whenever multiple consecutive jobs signal for backpressure, and reset to its original value when a job signals backpressure is no longer needed.
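
To make the repeated application concrete, here is the arithmetic for a hypothetical starting interval of 600 milliseconds and a backOff of _ + 1.second. This only models the interval values described above using plain durations; it does not call the library.

```scala
import scala.concurrent.duration._

object BackoffSketch {
  // Hypothetical starting interval, e.g. 100 jobs per minute.
  val initial: FiniteDuration = 600.millis

  // Linear backoff: add one second per backpressure signal.
  val backOff: FiniteDuration => FiniteDuration = _ + 1.second

  // Interval after 0, 1, 2 and 3 consecutive backpressure signals:
  // 600 ms, 1600 ms, 2600 ms, 3600 ms.
  val intervals: List[FiniteDuration] =
    Iterator.iterate(initial)(backOff).take(4).toList

  // Alternative (illustrative): exponential backoff capped at 10 seconds.
  val capped: FiniteDuration => FiniteDuration = d => (d * 2L).min(10.seconds)
}
```

As soon as a job signals that backpressure is no longer needed, the interval goes back to the original 600 milliseconds rather than stepping down gradually.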

Note that since jobs submitted to the Limiter are processed asynchronously, rate changes will not propagate instantly when the interval between jobs is shorter than the job completion time. However, the rate will eventually converge to its most up-to-date value.

BackPressure.Ack[A] is a wrapper over Either[Throwable, A] => Boolean, and it's used to assert that backpressure is needed based on a specific result (or error) of the submitted job. You can write your own Acks, but the library provides some for you, including:

  • BackPressure.onAllErrors: signal backpressure every time a job fails with any error.
  • BackPressure.onError[E <: Throwable]: signal backpressure if a job fails with a specific type of exception. Meant to be used with explicit type application, e.g. BackPressure.onError[MyException].
  • BackPressure.onResult(cond: A => Boolean): signal backpressure when the result of a job satisfies the given condition.
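
Writing your own Ack amounts to wrapping a function, as in the following sketch. Response is a hypothetical result type and onTooManyRequests a hypothetical name; the example assumes BackPressure is visible through import upperbound._, as in the syntax example in this section.

```scala
import upperbound._

object CustomAck {
  // Hypothetical response type, for illustration only.
  final case class Response(status: Int)

  // Signal backpressure on HTTP 429 responses only;
  // errors and all other results leave the rate alone.
  val onTooManyRequests: BackPressure.Ack[Response] =
    BackPressure.Ack[Response] {
      case Right(response) => response.status == 429
      case Left(_)         => false
    }
}
```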

Note that withBackoff only transforms the input job: you still need to actually submit or await it yourself. This is done to allow further combinators to operate on a job as a chain of F[A] => F[A] functions before actually submitting to the Limiter.
It's also available as syntax:

import scala.concurrent.duration._
import upperbound._
import upperbound.syntax.backpressure._


def prog[F[_]: Limiter, A](fa: F[A]): F[Unit] =
  Limiter[F].submit {
    fa.withBackoff(_ + 1.second, BackPressure.onAllErrors)
  }

Finally, please be aware that the backpressure support doesn't interfere with your own error handling, nor does any error handling (e.g. retrying) for you.

Test limiter

If you need to satisfy a Limiter constraint in test code, where you don't actually care about rate limiting, you can use Limiter.noOp, which gives you a stub Limiter with no actual rate limiting and a synchronous submit method.
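
The practical consequence of a synchronous submit is that test assertions can run right after submitting. The sketch below takes the stub as a parameter and leaves out how it is obtained, since the exact signature of Limiter.noOp is not shown here; jobRanBeforeSubmitReturned is a hypothetical name.

```scala
import cats.effect.IO
import cats.effect.concurrent.Ref
import upperbound._

object NoOpSketch {
  // `stub` is assumed to come from Limiter.noOp.
  def jobRanBeforeSubmitReturned(stub: Limiter[IO]): IO[Boolean] =
    for {
      counter <- Ref.of[IO, Int](0)
      // With the stub, the job has already run by the time submit returns.
      _    <- stub.submit(counter.update(_ + 1))
      seen <- counter.get
    } yield seen == 1 // relies on the stub's synchronous submit
}
```

With a real Limiter the same check would be racy, because submit returns before the job has started.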