longevityframework / unblocking

Maps Blocking Scala Iterators to Multiple Streaming Libraries

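This project is for library writers who want to provide a streaming API, but do not want to limit themselves to a single Scala streaming library. With unblocking, you express your result one time, and get streaming enumerators for the following Scala streaming libraries:

  • Akka Streams (akka.stream.scaladsl.Source)
  • iteratee.io (io.iteratee.Enumerator)
  • FS2 (fs2.Stream)
  • Play iteratees (play.api.libs.iteratee.Enumerator)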

These streaming libraries all have their own terminologies for output streams - sources, enumerators, and streams. We will use the general term "enumerator" here.

How It Works

To produce an enumerator in each of these streaming libraries, you need to provide unblocking with a scala.collection.Iterator, with two caveats:

  • The iterator must provide a close method (i.e., implement java.io.Closeable), so that you can coordinate with the streaming library to release any underlying resources when the enumerator is closed.
  • Instead of providing a closeable iterator, you provide a no-arg function that produces closeable iterators, so that the enumerators produced by the streaming libraries are reusable.

The actual types work out like this:

type CloseableIterator[+A] = Iterator[A] with java.io.Closeable

type IteratorGen[+A] = () => CloseableIterator[A]
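
To make the two caveats concrete, here is a minimal sketch of building an IteratorGen by hand. It repeats the two aliases so that it compiles on its own. FakeCursor, CursorIterator, and rowsGen are hypothetical names invented for this example, standing in for whatever resource your library actually wraps (a database cursor, a file handle, and so on).

```scala
import java.io.Closeable

object IteratorGenExample {
  // Mirrors the aliases shown above; repeated here so the sketch is self-contained.
  type CloseableIterator[+A] = Iterator[A] with Closeable
  type IteratorGen[+A] = () => CloseableIterator[A]

  // Hypothetical stand-in for a real resource such as a database cursor.
  final class FakeCursor(rows: Vector[String]) {
    private var position = 0
    var closed = false
    def hasMore: Boolean = !closed && position < rows.size
    def fetch(): String = { val row = rows(position); position += 1; row }
    def release(): Unit = { closed = true }
  }

  // Adapts the cursor to an Iterator that also implements Closeable.
  final class CursorIterator(cursor: FakeCursor) extends Iterator[String] with Closeable {
    def hasNext: Boolean = cursor.hasMore
    def next(): String =
      if (hasNext) cursor.fetch()
      else throw new NoSuchElementException("cursor exhausted")
    def close(): Unit = cursor.release()
  }

  // Every call opens a fresh cursor, which is what makes the enumerators reusable.
  def rowsGen(rows: Vector[String]): IteratorGen[String] =
    () => new CursorIterator(new FakeCursor(rows))
}
```

Because the generator is a function, calling it twice gives two independent iterators, each with its own resource to close.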

You can now call the following unblocking methods to acquire your streams:

def toAkkaSource[A](iteratorGen: IteratorGen[A]): akka.stream.scaladsl.Source[A, akka.NotUsed]

def toCatsEnumerator[F[_], E](iteratorGen: IteratorGen[E])(implicit F: cats.Monad[F]): io.iteratee.Enumerator[F, E]

def toFS2Stream[A](iteratorGen: IteratorGen[A]): fs2.Stream[fs2.Task, A]

def toPlayEnumerator[A](iteratorGen: IteratorGen[A])(implicit context: scala.concurrent.ExecutionContext): play.api.libs.iteratee.Enumerator[A]

def toFutureSeq[A](iteratorGen: IteratorGen[A])(implicit context: scala.concurrent.ExecutionContext): scala.concurrent.Future[Seq[A]]
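
To give a feel for what one of these converters involves, here is a rough sketch of how something with the shape of toFutureSeq could be written using only the standard library. This is an illustration under my own assumptions, not the code that ships with unblocking; the name toFutureSeqSketch is made up to keep the two apart.

```scala
import java.io.Closeable
import scala.concurrent.{ExecutionContext, Future, blocking}

object FutureSeqSketch {
  // Mirrors the aliases shown earlier; repeated here so the sketch is self-contained.
  type CloseableIterator[+A] = Iterator[A] with Closeable
  type IteratorGen[+A] = () => CloseableIterator[A]

  // Hypothetical re-implementation for illustration only.
  def toFutureSeqSketch[A](iteratorGen: IteratorGen[A])(
      implicit context: ExecutionContext): Future[Seq[A]] =
    Future {
      // The iterator may block, so hint that to the execution context.
      blocking {
        val iterator = iteratorGen()
        // Drain everything into memory, and always release the resource.
        try iterator.toVector
        finally iterator.close()
      }
    }
}
```

The try/finally is the important part: the iterator is closed whether draining succeeds or throws.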

None of these converters is particularly difficult to write. You could easily write them yourself, directly in your library codebase. But there are advantages to providing them as a bundle here:

  • Placing the converters in a single location makes it easy for library writers to target multiple streaming libraries in a single pass.
  • The converters are all tested here, outside the context of the result you are actually trying to stream. The tests are good. And even if they were not, anyone (i.e., you) could contribute some better tests to the project.
  • If anyone wants to support another streaming library, they could easily add another converter here, for the benefit of many.
  • It's kind of neat to see all these formulas for building the different enumerators side by side.
  • We can alleviate some of the pain of keeping dependency versions up to date. I generally update all my library versions every longevity minor release, and those occur every month or so, so you can be reasonably sure that the library versions here will stay pretty well up to date.

I know, none of these reasons is particularly compelling.

Usage

Right now, we have artifacts for Scala 2.11 and 2.12. We don't provide artifacts for 2.10 yet, because there is not a full suite of 2.10 artifacts for the four streaming libraries.

libraryDependencies += "org.longevityframework" %% "unblocking" % "0.0.0"

Versioning Policy

We will bump the minor version any time any of the streaming libraries we depend on bumps a minor version.

Future Directions

It would be cool to have methods that did the reverse translations, e.g., from fs2.Stream[fs2.Task,A] back to IteratorGen[A]. Then we could have conversion methods across all the supported streaming libraries.

I would like to expand the test suite to test individual versions of these libraries. This way, instead of just supporting the latest version of each streaming library, we could support the latest version, plus all previous versions that are compatible with the latest. Then we could provide a matrix of compatible library versions - including across Scala versions - so that libraries could provide version ranges for their dependencies.

You'll notice that a couple of the implementations could benefit from a chunking approach.
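
I haven't settled on what chunking would look like here. As one possible reading, assuming it means handing elements to the streaming library in batches rather than one at a time, a wrapper along these lines would do it with the standard library's Iterator.grouped. The chunked helper is hypothetical and is not part of unblocking.

```scala
import java.io.Closeable

object ChunkingSketch {
  // Mirrors the aliases shown earlier; repeated here so the sketch is self-contained.
  type CloseableIterator[+A] = Iterator[A] with Closeable
  type IteratorGen[+A] = () => CloseableIterator[A]

  // Hypothetical helper: yields batches of up to chunkSize elements.
  // chunkSize must be at least 1, or Iterator.grouped will reject it.
  def chunked[A](iteratorGen: IteratorGen[A], chunkSize: Int): IteratorGen[Seq[A]] =
    () => {
      val underlying = iteratorGen()
      val groups = underlying.grouped(chunkSize)
      new Iterator[Seq[A]] with Closeable {
        def hasNext: Boolean = groups.hasNext
        def next(): Seq[A] = groups.next()
        // Closing the chunked iterator releases the underlying resource.
        def close(): Unit = underlying.close()
      }
    }
}
```

The last chunk can be shorter than chunkSize when the element count is not an exact multiple.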

License

It's Apache 2. I don't really have any reason to pick a license other than this seemingly de facto open source license. If you have some good reasons why this project should be released under a different license, please let me know.

Please Contribute!

One of the main reasons I made this a separate library instead of part of longevity is so that we all could benefit from it. If you have any ideas - want to improve the test suite, want to add another streaming library, want to throw in some basic benchmarking support - please contribute!

I want to be clear here: I am far from an expert with any of these streaming libraries! So if you are familiar with them, and you see something that looks wonky, please fix it! If you can't be bothered to fix it, at the very least, tell me about it. I want to be better with these libraries than I am!

THANK YOU!!!