backupify / datto-flow   3.2.0

MIT License GitHub

Augment akka-stream to make streams that carry context easy to construct and manipulate.

Scala versions: 2.13

datto.flow Build Status

Datto-flow augments Akka-stream to make streams that carry context easy to construct and manipulate.

When constructing a stream, one often wants to do two things:

  1. Maintain a unchanged context about each item in the stream, which may be accessed by each processing step of the stream.
  2. Allow a processing step to fail when processing on a particular item, such that subsequent processing steps will not be executed.

Akka-stream does not directly provide tools for these tasks. Instead, the solution is to create streams of more complicated types. For example, instead of a akka flow of type Flow[A, B, akka.NotUsed], one could create a flow of type Flow[(A, Context), (Try[B], Context), akka.NotUsed], where Context is some context type associated with each item in the stream.

Manipulating such streams is tiresome. Datto-flow provides tools to make this easy. The FlowResult[A, Context] type represents the items in a stream, which have a success/failure state and a context. FlowBuilder provides a tool for constructing flows of type Flow[FlowResult[A, Context], FlowResult[B, Context]] (this type is abbreviated to ContextFlow[A, B, Context]).


The simplified definition of FlowResult is as follows:

sealed trait FlowResult[+T, Ctx] {
  val value: Try[T]
  val context: Ctx
  val metadata: Metadata = Metadata() 

So, a flow result contains either a value or a failure, a context that is unchanged over the course of the stream, as well as metadata, which represents additional information that may have been generated during the course of the stream.

FlowResult is also a monad: it supports map[B](f: A => B), flatMap[B](f: A => Try[B]) and mapAsync[B](f: A => Future[B]) operations. It also supports operations that expose the context, such as mapWithContext[B](f: (A, Context, Metadata) => B). Many other useful operations are available. In all cases, these operations are only applied to successful flow results. Failures are propogated unchanged.


FlowBuilder makes building flows of FlowResults easy. A standard use of FlowBuilder would be as follows:

val myFlow: ContextFlow[Int, Int, MyContext] = FlowBuilder[Int, MyContext]()
  .map(i => i + 2)
  .mapWithContextAsync((i, context, metadata) => i + context.baseSize)

FlowBuilder supports building flows using most operations provided by FlowResult. It also supports other useful operations, such as flatMapGrouped, which will process multiple items at once.


MergeFlow provides a way to apply branching logic to a flow. It works as follows: with each flow, you assign a predicate that determines the conditions under which that flow should be applied. Generally, these predicates should be mutually exclusive and cover all possible cases, but this is not enforced. Together, these form a list of flow-predicate pairs. From these, a new flow is constructed, in which (in order)

  • Each item in the flow is broadcast to N child flows
  • Each child flow is filtered according to the associated predicate.
  • The items for which the predicate is true are passed down the associated flow.
  • Items that are errors (and hence to which none of the predicates apply) are propogated in another error flow.
  • The items in each flow are merged back into a single flow.
              |            |
              |  Broadcast |
              |            |
                   / | \
                  /  |  \
                 /   |   \
                /    |    \
               /     |     \
              /      |      \
             /       |       \
   +----------+ +-------+ +-------------+
   |Predicate | | ...   | |   Errors    |
   |1 Applied | |       | | Propgated   |
   |          | |       | |             |
   +-----+----+ +---+---+ +------+------+
         |          |            |
         |          |            |
   +-----+----+ +---+---+        /
   | Flow 1   | | Flows |       /
   |          | |       |      /
   +-----\----+ +---+---+     /
          \         |        /
           \        |       /
            \       |      /
             \      |     /
              \     |    /
               \    |   /
                \   |  /
                 \  | /
                  \ |/
          |                |
          |     Merge      |
          |                |

For example, given two flows, one for handling positive integers and another for handling negative ones, we can create a new flow that will handle all integers according to the combined flow operation:

   val flow: ContextFlow[Int, Int, Ctx] = MergeFlow(
     (positiveIntFlow, _ >= 0),
     (negativeIntFlow, _ < 0)


Often, a few asynchronous operations are needed to properly construct a Source. This can lead to working with objects of type Future[Source[_]] instead of type Source[_]. Generator encapsulates this, making such objects easier to manipulate.

Some examples:

Creating a generator from data retrieved by a future:

def getDataFuture(): Future[LazyList[Int]]

val generator = Generator.future[Int, Unit] {
  getDataFuture().map(dataCollection => Source(dataCollection))
generator.runWith(Sink.seq) //returns a Future[Seq[Int]]

Performing a task prior to executing the stream:

def preRunHook(): Future[Unit]

val generator = Generator.future {
  preRunHook().map(_ => mySource)

The materialization of the underlying source is always of type Future[T]. In the above examples, it is of type Future[Unit]. For other materialization types, use the Generator.Mat functions:

val generator = Generator.Mat.future {
  getDataFuture().map(dataCollection => Source(dataCollection)).mapMaterializedValue(_ => 1)

generator.runWithMat(Sink.ignore)(Keep.left) // returns a Future[Int]

Another way to express the materialization in the last line would be:

Publishing this library

First, make sure you're set up to publish. See for more info:

1 . Add the file ~/.sbt/1.0/sonatype.sbt with contents:

credentials += Credentials("Sonatype Nexus Repository Manager",

And the file ~/.sbt/1.0/plugins/plugins.sbt with contents:

addSbtPlugin("com.jsuereth" % "sbt-pgp" % "1.0.0")
addSbtPlugin("org.xerial.sbt" % "sbt-sonatype" % "2.4")

Follow the steps on to generate a gpg key for yourself. Make sure you publish the key. Run the following in sbt to do so:

pgp-cmd gen-key
Please enter the name associated with the key: Desmond Yeung
Please enter the email associated with the key: [email protected]
Please enter the passphrase for the key: *************
Please re-enter the passphrase for the key: *************

Send your key: pgp-cmd send-key [email protected]

Now, you are ready to publish a version:

  1. Update the version in build.sbt, git commit, and create a tag using git tag -a
  2. Run + core/publishSigned in sbt console, and enter the PGP key. Make sure to include the + sign! This is for crossbuilding both scala 2.11 and 2.12.
  3. Run sonatypeRelease
  4. + testkit/publishSigned
  5. sonatypeRelease

Manually releasing on sonatype:

  1. Visit and log in.
  2. Select com.datto from the list of repositories, and click close. (Make sure to do for both flow and flow-testkit)
  3. Wait a while and hit refresh.
  4. Select com.datto from the list of repositories, and click release (make sure automatically drop is selected). (Make sure to do for both flow and flow-testkit)