Ox

Ideas, suggestions, problems, questions CI Maven Central

Safe direct-style streaming, concurrency and resiliency for Scala on the JVM. Requires JDK 21+ & Scala 3. Ox covers the following areas:

  • streaming: push-based backpressured streaming designed for direct-style, with a rich set of stream transformations, flexible stream source & sink definitions and reactive streams integration
  • error management: retries, timeouts, a safe approach to error propagation, safe resource management
  • concurrency: high-level concurrency operators, developer-friendly structured concurrency, safe low-level primitives, communication between concurrently running computations
  • scheduling & timers
  • resiliency: circuit breakers, bulkheads, rate limiters, backpressure

Ox enables writing simple, expression-oriented code in functional style. The syntax overhead is kept to a minimum, preserving developer-friendly stack traces, and without compromising performance.

To use Ox, add the following dependency, using either sbt:

"com.softwaremill.ox" %% "core" % "1.0.1"

Or scala-cli:

//> using dep "com.softwaremill.ox::core:1.0.1"

Documentation is available at https://ox.softwaremill.com, ScalaDocs can be browsed at https://javadoc.io.

Tour of ox

Run two computations in parallel:

def computation1: Int = { sleep(2.seconds); 1 }
def computation2: String = { sleep(1.second); "2" }
val result1: (Int, String) = par(computation1, computation2)
// (1, "2")

Timeout a computation:

def computation3: Int = { sleep(2.seconds); 1 }
val result2: Either[TimeoutException, Int] = timeout(1.second)(computation3).catching[TimeoutException]
// `timeout` only completes once the loosing branch is interrupted & done

Race two computations:

def computation4: Int = { sleep(2.seconds); 1 }
def computation5: Int = { sleep(1.second); 2 }
val result3: Int = raceSuccess(computation4, computation5)
// as before, the loosing branch is interrupted & awaited before returning a result

Structured concurrency & supervision:

// equivalent of par
supervised {
  val f1 = fork { sleep(2.seconds); 1 }
  val f2 = fork { sleep(1.second); 2 }
  (f1.join(), f2.join())
}

Error handling within a structured concurrency scope:

supervised {
  forkUser:
    sleep(1.second)
    println("Hello!")

  forkUser:
    sleep(500.millis)
    throw new RuntimeException("boom!")
}
// on exception, all other forks are interrupted ("let it crash")
// the scope ends & re-throws only when all forks complete (no "leftovers")

Retry a computation:

def computationR: Int = ???
retry(Schedule.exponentialBackoff(100.millis).maxRetries(4)
  .jitter().maxInterval(5.minutes))(computationR)

Repeat a computation:

def computationR: Int = ???
repeat(Schedule.fixedInterval(100.millis))(computationR)

Rate limit computations:

supervised:
  val rateLimiter = RateLimiter.fixedWindowWithStartTime(2, 1.second)
  rateLimiter.runBlocking({ /* ... */ })

Allocate a resource in a scope:

supervised {
  val writer = useCloseableInScope(new java.io.PrintWriter("test.txt"))
  // ... use writer ...
} // writer is closed when the scope ends (successfully or with an error)

Create an app which shuts down cleanly when interrupted with SIGINT/SIGTERM:

object MyApp extends OxApp:
  def run(args: Vector[String])(using Ox): ExitCode =
    // ... your app's code ...
    // might use fork {} to create top-level background threads
    ExitCode.Success

Simple type-safe actors:

class Stateful { def increment(delta: Int): Int = ??? }

supervised:
  val ref = Actor.create(new Stateful)
  // ref can be shared across forks, but only within the concurrency scope
  ref.ask(_.increment(5))    

Create a simple flow & transform using a functional API:

Flow.iterate(0)(_ + 1) // natural numbers
  .filter(_ % 2 == 0)
  .map(_ + 1)
  .intersperse(5)
  // compute the running total
  .mapStateful(0) { (state, value) =>
    val newState = state + value
    (newState, newState)
  }
  .take(10)
  .runForeach(n => println(n.toString))

Create flows which perform I/O and manage concurrency:

def sendHttpRequest(entry: String): Unit = ???
Flow
  .fromInputStream(this.getClass().getResourceAsStream("/list.txt"))
  .linesUtf8
  .mapPar(4)(sendHttpRequest)
  .runDrain()

Merge two flows, properly handling the failure of either branches:

val f1 = Flow.tick(123.millis, "left")
val f2 = Flow.tick(312.millis, "right")
f1.merge(f2).take(100).runForeach(println)

Integrate flow with other components using an imperative API:

def readNextBatch(): List[String] = ???
Flow.usingEmit { emit =>
  forever:
    readNextBatch().foreach(emit.apply)
}

Use completable high-performance channels for inter-fork communication within concurrency scopes:

val c = Channel.buffered[String](8)
c.send("Hello,")
c.send("World")
c.done()

Select from Go-like channels:

val c = Channel.rendezvous[Int]
val d = Channel.rendezvous[Int]
select(c.sendClause(10), d.receiveClause)

Unwrap eithers and combine errors in a union type:

val v1: Either[Int, String] = ???
val v2: Either[Long, String] = ???

val result: Either[Int | Long, String] = either:
  v1.ok() ++ v2.ok()

Pipe & tap values to functions to use the dot-syntax:

def compute: Int = ???
def computeMore(v: Int): Long = ???
compute
  .pipe(2 * _)
  .tap(println)
  .pipe(computeMore)  

More in the docs!.

Other projects

The wider goal of direct-style Scala is enabling teams to deliver working software quickly and with confidence. Our other projects, including sttp client and Tapir, also include integrations directly tailored towards direct style.

Moreover, also check out the gears project, an experimental multi-platform library covering direct-style Scala.

Contributing

All suggestions welcome :)

To compile and test, run:

sbt compile
sbt test

See the list of issues and pick one! Or report your own.

If you are having doubts on the why or how something works, don't hesitate to ask a question on discourse or via github. This probably means that the documentation, ScalaDocs or code is unclear and can be improved for the benefit of all.

In order to develop the documentation, you can use the doc/watch.sh script, which runs Sphinx using Python. Use doc/requirements.txt to set up your Python environment with pip. Alternatively, if you're a Nix user, run nix develop in doc/ to start a shell with an environment allowing to run watch.sh. Moreover, you can use the compileDocumentation sbt task to verify, that all code snippets compile properly.

When you have a PR ready, take a look at our "How to prepare a good PR" guide. Thanks! :)

Project sponsor

We offer commercial development services. Contact us to learn more about us!

Copyright

Copyright (C) 2023-2025 SoftwareMill https://softwaremill.com.