Add to build.sbt
libraryDependencies += "net.s_mach" %% "concurrent" % "2.0.0"
Note: s_mach.concurrent is cross-compiled for Scala 2.11/JDK6 and 2.12/JDK8
You love Scala Futures and functional concurrent coding! But you…
Wish there were methods for controlling how multiple Futures execute (such as retrying failures, throttling or controlling parallelism)
Hate all the boilerplate needed to create multiple futures that run in parallel
Didn’t realize Future.sequence only returns the first exception (and throws away the rest! you didn’t want those anyways right?)
Didn’t realize Future.sequence doesn’t fail immediately (a quick failure on an hour long Future.sequence might not return the failure for an hour!)
Wish Future had common idioms such as fold, flatten, etc
Are annoyed that you have to use Java’s ScheduledExecutorService for delayed or periodically repeating tasks (or even worse - an ActorSystem)
Express parallel Future operations in far fewer lines of code:
val read: String => Future[Int] = ???
// Turn this parallel future boilerplate:
val f1 = read("1")
val f2 = read("2")
val f3 = read("3")
val future : Future[(Int,Int,Int)] =
  for {
    i1 <- f1
    i2 <- f2
    i3 <- f3
  } yield (i1,i2,i3)
// Into this:
val future : Future[(Int,Int,Int)] =
async.par.run(read("1"), read("2"), read("3"))
Create async tasks and configure them to:
Retry errors
Control the number of simultaneous operations
Limit the number of operations per second
val read : String => Future[Int] = ???
val read2 : Int => Future[Double] = ???
val read3 : Double => Future[String] = ???
// Tuple-based async task (heterogeneous)
val future : Future[(Int,Double,String)] =
async // start a new async task
.par(2) // run 2 ops at a time
.progress(1.second)(progress => println(progress)) // print progress once a second
.retry { // retry some errors with a random delay
case _:TimeoutException =>
case _ => false.future
) // sequence results and fail-immediately on exception (unlike Future.sequence)
// If the task fails, get the full list of exceptions (unlike Future.sequence)
// Collection-based async task (homogeneous)
val future : Future[List[Int]] =
.async // start a new async task
.throttle(3.second) // perform operations no faster than 1 every 3 seconds
.retry { // retry some errors with a random delay
case _:TimeoutException =>
case _ => false.future
.map(read) // sequence results and fail-immediately on exception (unlike Future.sequence)
// If the task fails, get the full list of exceptions (unlike Future.sequence)
Save your async task config for later reuse:
val myAsyncConfig =
.par(2) // run 2 ops at a time
.progress(1.second)(progress => println(progress)) // print progress once a second
.retry { // retry some errors with a random delay
case _:TimeoutException =>
case _ => false.future
// Reuse config on tuple-based async task
// Reuse config on collection-based async task
Convenience and utility methods:
A.future - stop typing Future.successful(A)
Future.get - Await.result without the flagellation
Future.flatten - flatten Future[Future[A]] into Future[A]
Future.toTry - expand Future[A] to Future[Try[A]] that always succeeds
Future.fold - fold Future[A] (which is either Success[A] or Failure[A]) to Future[B] that always succeeds
Collection[Future[A]].sequence - idiomatic version of Future.sequence(A) and less typing
Future.onTimeout - do something if a Future doesn’t complete within a given amount of time
Future.happensBefore - start a Future after another Future completes
Future.sideEffect - compose a Future and a side effect (that starts after the Future completes) into a new Future. The new Future completes successfully only after both the original Future and the side effect successfully complete. If either the original Future or the side effect fail, the composed Future fails.
Future.delayed or ScheduledExecutionContext.schedule - start a Future after a delay
ScheduledExecutionContext.scheduleCancellable - start a Future after a delay and optionally cancel the future if it hasn’t started yet. If cancelled, instead of throwing an exception the future completes successfully with a value you specify!
ScheduledExecutionContext.scheduleAtFixedRate - create a repeating task that can be paused or cancelled
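To make the idea behind the delayed-start helpers concrete, here is a minimal sketch that uses only the Scala and Java standard libraries. It is not the library's implementation: `DelayedSketch`, `delayed` and `demo` are hypothetical names chosen for illustration, and the sketch assumes a plain `ScheduledExecutorService` is an acceptable stand-in for `ScheduledExecutionContext`.

```scala
import java.util.concurrent.{Executors, ScheduledExecutorService, TimeUnit}
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.duration._
import scala.util.Try

object DelayedSketch {
  // Hypothetical helper (not the s_mach.concurrent API): returns a future that
  // completes with the value of `body`, evaluated once `delay` has elapsed.
  def delayed[A](scheduler: ScheduledExecutorService, delay: FiniteDuration)(body: => A): Future[A] = {
    val promise = Promise[A]()
    scheduler.schedule(
      new Runnable {
        // Try captures an exception thrown by `body` as a failed result
        def run(): Unit = { promise.complete(Try(body)); () }
      },
      delay.toMillis,
      TimeUnit.MILLISECONDS
    )
    promise.future
  }

  // Usage: schedule a computation 100 ms in the future and wait for it
  def demo(): Int = {
    val scheduler = Executors.newSingleThreadScheduledExecutor()
    try Await.result(delayed(scheduler, 100.millis)(21 * 2), 5.seconds)
    finally scheduler.shutdown()
  }
}
```

The promise decouples the scheduler thread from the caller: nothing blocks while the delay elapses, and the caller composes on the returned future as usual.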
s_mach.concurrent is an open-source Scala library that provides serial and parallel execution flow control primitives for working with asynchronous tasks. An asynchronous task consists of two or more calls to functions that return a future of the result (A => Future[B]) rather than the result itself (A => B). s_mach.concurrent also provides utility and convenience code for working with scala.concurrent.Future.
Adds concurrent flow control primitives async and async.par for performing fixed size heterogeneous (tuple) and variable size homogeneous (collection) asynchronous tasks. These primitives:
Allow enabling optional progress reporting, failure retry and/or throttle control for asynchronous tasks
Ensure proper sequencing of returned futures, e.g. given f: Int => Future[String]:
List(1,2,3).async.map(f) returns Future[List[String]]
async.par.run(f(1),f(2),f(3)) returns Future[(String,String,String)]
Ensure fail-immediate sequencing of future results (see the 'Under the hood: Merge' section for details)
Ensure all exceptions generated during asynchronous task processing can be retrieved (Future.sequence returns only the first)
collection.async and collection.async.par support collection operations such as map, flatMap and foreach on asynchronous functions, i.e. A => Future[B]
async.par.run(future1, future2, ...) supports running a fixed-size heterogeneous asynchronous task (of up to 22 futures) in parallel
Adds ScheduledExecutionContext, a Scala interface wrapper for java.util.concurrent.ScheduledExecutorService that provides for scheduling delayed and periodic tasks
Adds non-blocking concurrent control primitives such as Barrier, Latch, Lock, Semaphore and AtomicFSM
Adds future utility methods such as Future.onTimeout, Future.sideEffect and Future.happensBefore
Provides convenience methods for writing more readable, concise and DRY code such as Future.get, Future.toTry and Future.fold
s_mach.concurrent uses semantic versioning (http://semver.org/). s_mach.concurrent does not use the package private modifier. Instead, all code files outside of the s_mach.concurrent.impl package form the public interface and are governed by the rules of semantic versioning. Code files inside the s_mach.concurrent.impl package may be used by downstream applications and libraries. However, no guarantees are made as to the stability or interface of code in the s_mach.concurrent.impl package between versions.
All code examples assume the following imports:
import scala.util._
import scala.concurrent._
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import java.net.SocketTimeoutException // matched by the retry examples
import s_mach.concurrent._
import s_mach.concurrent.util._
implicit val scheduledExecutionContext = ScheduledExecutionContext(2)
case class Item(id: String, value: Int, relatedItemId: String)
def read(id: String) : Future[Item] = Future { Thread.sleep(1000); println(id); Item(id,id.toInt,(id.toInt+1).toString) }
def readFail(id: String) : Future[Item] = Future { Thread.sleep(1000); println(id); throw new RuntimeException(id.toString) }
def longRead(id: String) : Future[Item] = Future { Thread.sleep(2000); println(id); Item(id,id.toInt,(id.toInt+1).toString) }
def write(id: String, item: Item) : Future[Boolean] = Future { Thread.sleep(1000); println(id); true }
def writeFail(id: String, item: Item) : Future[Boolean] = Future { Thread.sleep(1000); println(id); throw new RuntimeException(id.toString) }
A common task when working with futures is transforming or traversing a collection, serially or in parallel, by calling a function that returns a future. With only a few levels of nesting, the standard idioms for accomplishing this produce code that is difficult to read. In the following example, ten identifiers are grouped into batches of two. Batches are processed serially, while the identifiers within each batch are processed in parallel.
val oomItemIdBatch = (1 to 10).toList.map(_.toString).grouped(2).toList
val future = { // necessary for pasting into repl
  for {
    oomItem <- {
      oomItemIdBatch
        // Serially perform read of each batch
        .foldLeft(Future.successful(List[Item]())) { (facc, idBatch) =>
          for {
            acc <- facc
            // Parallel read batch
            oomItem <- Future.sequence(idBatch.map(read))
          } yield acc ::: oomItem
        }
    }
    _ = println("Computing...")
    oomNewItemBatch = oomItem.map(item => item.copy(value = item.value + 1)).grouped(2).toList
    oomResult <- {
      oomNewItemBatch
        // Serially perform write of each batch
        .foldLeft(Future.successful(List[Boolean]())) { (facc, itemBatch) =>
          for {
            acc <- facc
            // Parallel write batch
            oomResult <- Future.sequence(itemBatch.map(item => write(item.id, item)))
          } yield acc ::: oomResult
        }
    }
  } yield oomResult.forall(_ == true)
}
The same code, rewritten using async and async.par:
val oomItemIdBatch = (1 to 10).toList.map(_.toString).grouped(2).toList
val future = { // necessary for pasting into repl
  for {
    oomItem <- {
      oomItemIdBatch.async.flatMap(_.async.par.map(read))
    }
    _ = println("Computing...")
    oomNewItemBatch = oomItem.map(item => item.copy(value = item.value + 1)).grouped(10).toVector
    oomResult <- {
      oomNewItemBatch.async.flatMap(_.async.par.map(item => write(item.id, item)))
    }
  } yield oomResult.forall(_ == true)
}
async.par allows specifying the maximum number of simultaneous workers used during an asynchronous task. In the following example, batches are processed in parallel with at most two workers, while each identifier within a batch is processed with at most four workers.
val oomItemIdBatch = (1 to 10).toList.map(_.toString).grouped(2).toList
val future = { // necessary for pasting into repl
  for {
    oomItem <- {
      oomItemIdBatch.async.par(2).flatMap(_.async.par(4).map(read))
    }
    _ = println("Computing...")
    oomNewItemBatch = oomItem.map(item => item.copy(value = item.value + 1)).grouped(10).toVector
    oomResult <- {
      oomNewItemBatch.async.par(2).flatMap(_.async.par(4).map(item => write(item.id, item)))
    }
  } yield oomResult.forall(_ == true)
}
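The worker limit described above can be approximated with the standard library alone. The sketch below is one possible approach and not how s_mach.concurrent implements async.par: `BoundedParSketch`, `boundedTraverse` and `demo` are hypothetical names. It starts a fixed number of "workers", each of which claims the next unprocessed index only after its previous future has completed.

```scala
import java.util.concurrent.atomic.AtomicInteger
import scala.collection.concurrent.TrieMap
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object BoundedParSketch {
  // Hypothetical helper (not the s_mach.concurrent API): applies `f` to every
  // item with at most `maxPar` futures in flight, preserving input order.
  def boundedTraverse[A, B](items: Vector[A], maxPar: Int)(f: A => Future[B]): Future[Vector[B]] = {
    val nextIndex = new AtomicInteger(0)
    val results = TrieMap.empty[Int, B]

    // A worker claims the next unclaimed index, runs `f`, stores the result,
    // then loops. It stops once every index has been claimed.
    def worker(): Future[Unit] = {
      val i = nextIndex.getAndIncrement()
      if (i >= items.size) Future.successful(())
      else f(items(i)).flatMap { b => results.put(i, b); worker() }
    }

    // Starting exactly maxPar workers caps the number of simultaneous calls to f
    Future.sequence(Vector.fill(maxPar)(worker()))
      .map(_ => items.indices.map(i => results(i)).toVector)
  }

  // Usage: five items, at most two in flight at a time
  def demo(): Vector[Int] =
    Await.result(boundedTraverse(Vector(1, 2, 3, 4, 5), maxPar = 2)(i => Future(i * 10)), 5.seconds)
}
```

Note that this sketch keeps the plain Future.sequence failure behavior: if one worker fails, the others keep running and the failure may surface late.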
async and async.par can optionally be configured to report progress, retry failures and/or throttle iteration for asynchronous tasks. In the following example, progress is reported as each batch completes, and batches may not complete faster than one every three seconds. For each identifier whose read fails, the first three TimeoutExceptions or SocketTimeoutExceptions are retried. All other exceptions cause the entire task to fail.
val oomItemIdBatch = (1 to 10).toList.map(_.toString).grouped(2).toList
val future = { // necessary for pasting into repl
for {
oomItem <- {
.progress(1.second)(progress => println(progress))
.flatMap { batch =>
// Retry at most first 3 timeout and socket exceptions after delaying 100 milliseconds
.retry {
case (_: TimeoutException) :: tail if tail.size < 3 =>
case (_: SocketTimeoutException) :: tail if tail.size < 3 =>
case _ => false.future
_ = println("Computing...")
oomNewItemBatch = oomItem.map(item => item.copy(value = item.value + 1)).grouped(10).toVector
oomResult <- {
oomNewItemBatch.async.par(2).flatMap(_.async.par(4).map(item => write(item.id, item)))
} yield oomResult.forall(_ == true)
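The retry policy described in this section (retry only timeouts, and only the first three) can be illustrated with a standard-library sketch. This is an assumption-laden approximation, not the library's retry mechanism: `RetrySketch`, `retrying`, `retryTimeouts` and `demo` are hypothetical names, the policy here returns a plain Boolean rather than a Future[Boolean], and no delay is applied between attempts.

```scala
import java.util.concurrent.TimeoutException
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object RetrySketch {
  // Hypothetical helper (not the s_mach.concurrent API): re-runs `op` for as
  // long as `shouldRetry` accepts the failures seen so far (most recent first).
  def retrying[A](shouldRetry: List[Throwable] => Boolean)(op: () => Future[A]): Future[A] = {
    def attempt(failures: List[Throwable]): Future[A] =
      op().recoverWith {
        // When the guard is false the original failure is propagated unchanged
        case t if shouldRetry(t :: failures) => attempt(t :: failures)
      }
    attempt(Nil)
  }

  // Policy: retry a TimeoutException while fewer than 3 earlier failures exist
  val retryTimeouts: List[Throwable] => Boolean = {
    case (_: TimeoutException) :: tail => tail.size < 3
    case _ => false
  }

  // Usage: an operation that times out twice and then succeeds.
  // Returns the result together with the number of attempts made.
  def demo(): (Int, Int) = {
    val calls = new AtomicInteger(0)
    val flaky = () => Future {
      if (calls.incrementAndGet() < 3) throw new TimeoutException("slow") else 42
    }
    (Await.result(retrying(retryTimeouts)(flaky), 5.seconds), calls.get)
  }
}
```

Passing the accumulated failure list to the policy mirrors the `case (_: TimeoutException) :: tail if tail.size < 3` shape used in the examples, so the policy can decide based on history rather than only on the latest exception.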
When first using Future with a for-comprehension, it is natural to assume the following will produce parallel operation:
for {
  i1 <- read("1")
  i2 <- read("2")
  i3 <- read("3")
} yield (i1,i2,i3)
Sadly, this code will compile and run just fine, but it will not execute in parallel. To correctly implement parallel operation, the following standard pattern is used:
val f1 = read("1")
val f2 = read("2")
val f3 = read("3")
val future = { // necessary for pasting into repl
  for {
    i1 <- f1
    i2 <- f2
    i3 <- f3
  } yield (i1,i2,i3)
}
For parallel operation, all of the futures must be started before the for-comprehension. The for-comprehension is a monadic workflow, which captures commands that must take place in a specific sequential order. The pattern above is necessary because Scala lacks an applicative workflow, which captures commands that may be run in any order. s_mach.concurrent adds the async.par.run workflow, an applicative workflow specifically for fixed-size heterogeneous asynchronous tasks. This workflow can express the pattern above more concisely.
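The difference between the monadic and applicative styles can be seen with the standard library's `Future.zip`, which combines futures that have already been started. The sketch below is only an illustration of that distinction, not a substitute for async.par.run: `ZipSketch`, `readAll`, `readSerially` and `demo` are hypothetical names, and `read` here is a simplified stand-in that returns an Int.

```scala
import scala.concurrent.{Await, Future, blocking}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object ZipSketch {
  // Simplified stand-in for the document's read function
  def read(id: String): Future[Int] = Future { blocking { Thread.sleep(200) }; id.toInt }

  // Applicative style: all three calls to read are evaluated (and therefore
  // started) while the expression is built; zip only combines the results.
  def readAll(): Future[(Int, Int, Int)] =
    read("1").zip(read("2")).zip(read("3")).map { case ((a, b), c) => (a, b, c) }

  // Monadic style: each read starts only after the previous one has completed
  def readSerially(): Future[(Int, Int, Int)] =
    for {
      a <- read("1")
      b <- read("2")
      c <- read("3")
    } yield (a, b, c)

  // Usage: both styles yield the same tuple; only the scheduling differs
  def demo(): ((Int, Int, Int), (Int, Int, Int)) =
    (Await.result(readAll(), 5.seconds), Await.result(readSerially(), 5.seconds))
}
```

Nested zips become unwieldy as the number of futures grows, which is the kind of boilerplate a dedicated tuple-based workflow avoids.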
In the example below, all futures are started at the same time by async.par.run which returns a Future[(Int,Int,Int)] that completes once all supplied futures complete. After this returned future completes, the tuple value results can be extracted using normal Scala idioms.
for {
  (i1,i2,i3) <- async.par.run(read("1"), read("2"), read("3"))
} yield (i1,i2,i3)
Additionally, all of the configuration options available for collection.async.par are valid for async.par.run. In the example below, the number of workers is limited to two, progress is reported once a second and certain failures are retried.
for {
(i1,i2,i3) <-
.progress(1.second)(progress => println(progress))
.retry {
case (_: TimeoutException) :: tail if tail.size < 3 =>
case (_: SocketTimeoutException) :: tail if tail.size < 3 =>
case _ => false.future
} yield (i1,i2,i3)
The async and async.par primitives utilize the merge and flatMerge sequencing functions to ensure that execution ends immediately once a failure occurs. This is in contrast to Future.sequence which may not always fail immediately when a failure occurs.
The merge function performs the same function as Future.sequence (it calls Future.sequence internally) but it ensures that the returned future completes immediately after an exception occurs in any of the futures. Because Future.sequence waits on all futures in left to right order before completing, an exception thrown at the beginning of the computation by a future at the far right will not be detected until after all other futures have completed. For long running computations, this can mean a significant amount of wasted time waiting on futures to complete whose results will be discarded.
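One way to obtain fail-immediate behavior on top of Future.sequence is to race it against the first failure using a Promise. The following is a minimal sketch of that idea under stated assumptions, not the library's merge implementation: `FailFastSketch`, `failFast` and `demo` are hypothetical names.

```scala
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.util.Try

object FailFastSketch {
  // Hypothetical helper (not the library's merge): like Future.sequence, but
  // the returned future fails as soon as any input future fails.
  def failFast[A](futures: Vector[Future[A]]): Future[Vector[A]] = {
    val promise = Promise[Vector[A]]()
    // Whichever failure arrives first completes the promise; later ones are ignored here
    futures.foreach(_.failed.foreach(t => promise.tryFailure(t)))
    // If nothing fails, the ordinary sequence supplies the successful result
    Future.sequence(futures).foreach(v => promise.trySuccess(v))
    promise.future
  }

  // Usage: the first future never completes, yet the failure of the second
  // is reported without waiting for it.
  def demo(): Try[Vector[Int]] = {
    val never = Promise[Int]().future
    val bad = Future[Int] { throw new RuntimeException("boom") }
    Try(Await.result(failFast(Vector(never, bad)), 2.seconds))
  }
}
```

In `demo`, the result is a Failure carrying the RuntimeException rather than a timeout, because the failure hook does not depend on the position of the failing future.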
Additionally, while the Scala parallel collections correctly handle multiple parallel exceptions, Future.sequence returns only the first exception encountered and discards the rest. The merge and flatMerge methods fix these problems by throwing AsyncParThrowable, which provides access to both the first exception thrown and a future of all exceptions thrown during the computation.
scala> val t = Future.sequence(Vector(longRead("1"),readFail("2"),readFail("3"),read("4"))).getTry
t: scala.util.Try[scala.collection.immutable.Vector[Item]] = Failure(java.lang.RuntimeException: 2)
scala> val t = Vector(longRead("1"),readFail("2"),readFail("3"),read("4")).merge.getTry
t: scala.util.Try[scala.collection.immutable.Vector[Item]] = Failure(AsyncParThrowable(java.lang.RuntimeException: 2))
scala> 4
scala> val allFailures = t.failed.get.asInstanceOf[AsyncParThrowable].allFailure.get
allFailures: Vector[Throwable] = Vector(java.lang.RuntimeException: 2, java.lang.RuntimeException: 3)
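For comparison, collecting every failure with only the standard library requires converting each future to a Future[Try[A]] before sequencing. The sketch below shows that pattern; it is not how AsyncParThrowable is implemented, and `AllFailuresSketch`, `allFailures` and `demo` are hypothetical names.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.util.{Failure, Success, Try}

object AllFailuresSketch {
  // Hypothetical helper: waits for every future to complete and returns all
  // of the failures, in input order.
  def allFailures[A](futures: Vector[Future[A]]): Future[Vector[Throwable]] = {
    // Turn each future into one that always succeeds with a Try
    val asTries: Vector[Future[Try[A]]] = futures.map { f =>
      f.map(a => Success(a): Try[A]).recover { case t => Failure[A](t) }
    }
    Future.sequence(asTries).map(_.collect { case Failure(t) => t })
  }

  // Usage: two of the three futures fail, and both failures are reported
  def demo(): Vector[String] = {
    val futures = Vector(
      Future(1),
      Future[Int](throw new RuntimeException("2")),
      Future[Int](throw new RuntimeException("3"))
    )
    Await.result(allFailures(futures), 5.seconds).map(_.getMessage)
  }
}
```

Unlike the transcript above, this sketch must wait for all futures to finish before reporting anything, which is the trade-off that a separate "first failure" plus "future of all failures" design avoids.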