zainab-ali / fs2-reactive-streams

A reactive streams implementation for fs2




To use

Add the following to your build.sbt:

libraryDependencies += "com.github.zainab-ali" %% "fs2-reactive-streams" % "0.5.1"

This release depends on fs2 0.10.1. See the version compatibility table below for other pairings.


import cats._, cats.effect._, fs2._
// import cats._
// import cats.effect._
// import fs2._

import fs2.interop.reactivestreams._
// import fs2.interop.reactivestreams._

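import scala.concurrent.ExecutionContext.Implicits.global
// import scala.concurrent.ExecutionContext.Implicits.global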

val upstream = Stream(1, 2, 3).covary[IO]
// upstream: fs2.Stream[cats.effect.IO,Int] = Stream(..)

val publisher = upstream.toUnicastPublisher
// publisher: fs2.interop.reactivestreams.StreamUnicastPublisher[cats.effect.IO,Int] = fs2.interop.reactivestreams.StreamUnicastPublisher@1655d153

val downstream = publisher.toStream[IO]
// downstream: fs2.Stream[cats.effect.IO,Int] = Stream(..)

downstream.compile.toVector.unsafeRunSync()
// res1: Vector[Int] = Vector(1, 2, 3)


The reactive streams initiative is complicated, mutable and unsafe; it is not something you would choose over fs2. But there are times when we need to use fs2 in conjunction with a different streaming library, and this is where reactive streams shines.

Any reactive streams system can interop with any other reactive streams system by exposing an org.reactivestreams.Publisher or an org.reactivestreams.Subscriber.

This library provides reactive streams compliant publishers and subscribers to ease interop with other streaming libraries.
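To make the publisher/subscriber handshake concrete, here is a minimal sketch of the demand-driven protocol. It assumes JDK 9 or later and uses `java.util.concurrent.Flow`, the JDK's mirror of the `org.reactivestreams` interfaces, so it runs without extra dependencies. `ListPublisher`, `CollectingSubscriber` and `FlowHandshakeDemo` are hypothetical names for illustration only. This is not how fs2-reactive-streams is implemented, and it skips parts of the specification such as rejecting non-positive demand and bounding recursion.

```scala
import java.util.concurrent.Flow
import scala.collection.mutable.ArrayBuffer

// Hypothetical publisher: emits the items of a list, but only as many as requested.
final class ListPublisher[A](items: List[A]) extends Flow.Publisher[A] {
  override def subscribe(subscriber: Flow.Subscriber[_ >: A]): Unit = {
    val subscription = new Flow.Subscription {
      private val remainingItems = items.iterator
      private var finished = false

      // The subscriber signals demand here; we never emit more than `n` per call.
      override def request(n: Long): Unit = {
        var demand = n
        while (!finished && demand > 0 && remainingItems.hasNext) {
          demand -= 1
          subscriber.onNext(remainingItems.next())
        }
        if (!finished && !remainingItems.hasNext) {
          finished = true
          subscriber.onComplete()
        }
      }

      override def cancel(): Unit = { finished = true }
    }
    subscriber.onSubscribe(subscription)
  }
}

// Hypothetical subscriber: collects elements, requesting `batchSize` at a time.
final class CollectingSubscriber[A](batchSize: Long) extends Flow.Subscriber[A] {
  val received: ArrayBuffer[A] = ArrayBuffer.empty[A]
  var completed: Boolean = false
  private var subscription: Flow.Subscription = null
  private var outstanding: Long = 0L

  override def onSubscribe(s: Flow.Subscription): Unit = {
    subscription = s
    outstanding = batchSize
    s.request(batchSize)
  }

  override def onNext(item: A): Unit = {
    received += item
    outstanding -= 1
    // Ask for the next batch only once the current one has been consumed.
    if (outstanding == 0) {
      outstanding = batchSize
      subscription.request(batchSize)
    }
  }

  override def onError(error: Throwable): Unit = throw error
  override def onComplete(): Unit = { completed = true }
}

object FlowHandshakeDemo {
  // Everything here is synchronous, so the subscriber has finished when subscribe returns.
  def run(): Vector[Int] = {
    val subscriber = new CollectingSubscriber[Int](batchSize = 2)
    new ListPublisher(List(1, 2, 3, 4, 5)).subscribe(subscriber)
    subscriber.received.toVector
  }
}
```

Even this toy version needs mutable state on both sides of the subscription, which is the kind of bookkeeping the conversions in this library are meant to hide.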


To convert a Stream into a downstream unicast org.reactivestreams.Publisher:

val stream = Stream(1, 2, 3).covary[IO]
stream.toUnicastPublisher

To convert an upstream org.reactivestreams.Publisher into a Stream:

val publisher: org.reactivestreams.Publisher[Int] = Stream(1, 2, 3).covary[IO].toUnicastPublisher
publisher.toStream[IO]

A unicast publisher must have a single subscriber only.

Example: Akka streams

Import the Akka streams dsl:

import akka._
import akka.stream._
import akka.stream.scaladsl._
import akka.actor.ActorSystem

implicit val system = ActorSystem("akka-streams-example")
implicit val materializer = ActorMaterializer()

To convert from a Source to a Stream:

val source = Source(1 to 5)
// source: akka.stream.scaladsl.Source[Int,akka.NotUsed] = Source(SourceShape(StatefulMapConcat.out(593477360)))

val publisher = source.runWith(Sink.asPublisher[Int](fanout = false))
// publisher: org.reactivestreams.Publisher[Int] = VirtualProcessor(state = Publisher[StatefulMapConcat.out(593477360)])

val stream = publisher.toStream[IO]
// stream: fs2.Stream[cats.effect.IO,Int] = Stream(..)

stream.compile.toVector.unsafeRunSync()
// res5: Vector[Int] = Vector(1, 2, 3, 4, 5)

To convert from a Stream to a Source:

val stream = Stream.emits((1 to 5).toSeq).covary[IO]
// stream: fs2.Stream[cats.effect.IO,Int] = Stream(..)

val source = Source.fromPublisher(stream.toUnicastPublisher)
// source: akka.stream.scaladsl.Source[Int,akka.NotUsed] = Source(SourceShape(PublisherSource.out(672726900)))

IO.fromFuture(IO(source.runWith(Sink.seq[Int]))).unsafeRunSync()
// res6: scala.collection.immutable.Seq[Int] = Vector(1, 2, 3, 4, 5)

Version Compatibility

Patch releases (e.g. 0.2.7 to 0.2.8) are binary compatible. Releases marked as broken in the table below should be avoided; please check the CHANGELOG for details.

fs2          fs2-reactive-streams   status
0.10.1       0.5.1                  current
0.10.0       0.5.0                  current
0.10.0       0.4.0                  broken
0.10.0-RC2   0.3.0                  broken
0.10.0-M11   0.2.8                  broken
0.10.0-M10   0.2.7                  broken
0.10.0-M9    0.2.6                  broken
0.10.0-M8    0.2.5                  broken
0.10.0-M7    0.2.4                  broken
0.10.0-M6    0.2.3                  broken
0.10.0-M5    0.2.2                  broken
0.10.0-M5    0.2.1                  broken
0.10.0-M2    0.2.0                  broken
0.9.4        0.1.1                  current
0.9.4        0.1.0                  broken
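
One way to apply the table is to pin both libraries explicitly in build.sbt so that they stay on a matching row. The fragment below is a sketch that assumes the 0.10.1 / 0.5.1 row and the usual `co.fs2` coordinates for fs2-core; if you move to another row, change both versions together.

```scala
// build.sbt (sketch): versions taken from the "current" 0.10.1 / 0.5.1 row
libraryDependencies ++= Seq(
  "co.fs2"                %% "fs2-core"             % "0.10.1",
  "com.github.zainab-ali" %% "fs2-reactive-streams" % "0.5.1"
)
```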


The following people have taken their time and effort to improve fs2-reactive-streams.

Thank you for your help!


fs2-reactive-streams is licensed under the Apache License 2.0.


Many thanks go to Ross Baker, who took the first step in making a reactive streams implementation in http4s. Without this, fs2-reactive-streams would have been much harder to write.