monix / monix-circe

Streaming JSON parsing for circe with Monix Observable

Quick Start

To use monix-circe in an existing sbt project with Scala 2.12 or later, add the following dependency to your build.sbt:

libraryDependencies ++= Seq(
  "io.monix" %% "monix-circe" % "0.0.3"
)

monix-circe provides different operators for parsing a stream of JSON, depending on whether your Observable carries:

  • a \n-separated stream of JSON values, also called a value stream:
{ "repo": "monix-circe", "stars": 14 }
{ "repo": "monix-config", "stars": 5 }
  • or a JSON array:
[
  { "repo": "monix-circe", "stars": 14 },
  { "repo": "monix-config", "stars": 5 }
]

The appropriate operator also depends on the element type of your input stream (String or Byte).

The following table lists every available operator by input element type and JSON structure:

               String              Byte
Value stream   stringStreamParser  byteStreamParser
Array          stringArrayParser   byteArrayParser
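
As a rough illustration of the value-stream row, the sketch below feeds two newline-terminated JSON objects through stringStreamParser and collects the result. The input strings and the name ValueStreamSketch are made up for illustration, and the snippet assumes Monix 3.x is on the classpath (for Observable.fromIterable, toListL and runSyncUnsafe). Treat it as a sketch under those assumptions rather than reference usage.

```scala
import io.circe.Json
import monix.circe._
import monix.execution.Scheduler.Implicits.global
import monix.reactive.Observable

object ValueStreamSketch {
  // Hypothetical input: one JSON value per chunk, each terminated by "\n".
  val lines: List[String] = List(
    """{ "repo": "monix-circe", "stars": 14 }""" + "\n",
    """{ "repo": "monix-config", "stars": 5 }""" + "\n"
  )

  // Collects every parsed value into a List.
  // runSyncUnsafe blocks the calling thread, which is acceptable for a demo only.
  def parsed: List[Json] =
    Observable
      .fromIterable(lines)
      .liftByOperator(stringStreamParser)
      .toListL
      .runSyncUnsafe()
}
```

The byte-based operators in the table would be used the same way on an Observable of bytes instead of strings.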

As an example, suppose we have a stream of strings representing a JSON array. We pick the stringArrayParser operator, which converts a stream of String into a stream of Json, circe's representation of a JSON value:

import io.circe.Json
import monix.circe._
import monix.reactive.Observable

val stringStream: Observable[String] = ???
val parsedStream: Observable[Json] = stringStream.liftByOperator(stringArrayParser)
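
To make the snippet above concrete, here is a minimal end-to-end sketch with an actual input in place of ???. The chunk contents and the name ArrayParsingSketch are hypothetical. The array is deliberately split across two chunks, on the assumption that the operator buffers incomplete input between chunks the way the incremental parser in circe-fs2 does. Monix 3.x is assumed for toListL and runSyncUnsafe.

```scala
import io.circe.Json
import monix.circe._
import monix.execution.Scheduler.Implicits.global
import monix.reactive.Observable

object ArrayParsingSketch {
  // Hypothetical input: a single JSON array delivered in two chunks,
  // with the chunk boundary falling between the two array elements.
  val chunks: List[String] = List(
    """[{ "repo": "monix-circe", "stars": 14 },""",
    """{ "repo": "monix-config", "stars": 5 }]"""
  )

  // One Json value is expected per array element.
  // runSyncUnsafe blocks the calling thread, which is acceptable for a demo only.
  def parsed: List[Json] =
    Observable
      .fromIterable(chunks)
      .liftByOperator(stringArrayParser)
      .toListL
      .runSyncUnsafe()
}
```

In application code you would normally keep working with the Observable[Json] instead of blocking on a list.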


monix-circe also comes with a decoder function which, given a Decoder[A], turns an Observable[Json] into an Observable[A].

For example, using Circe's fully automatic derivation:

import io.circe.generic.auto._ // automatic derivation; requires the circe-generic module

case class Foo(a: Int, b: String)
val parsedStream: Observable[Json] = ???
val decodedStream: Observable[Foo] = parsedStream.liftByOperator(decoder[Foo])
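
Parsing and decoding can also be chained. The sketch below is one possible way to do so, assuming Monix 3.x and the circe-generic module are on the classpath. The Repo case class, the input string and the name DecodingSketch are invented for illustration and mirror the sample JSON shown earlier.

```scala
import io.circe.generic.auto._
import monix.circe._
import monix.execution.Scheduler.Implicits.global
import monix.reactive.Observable

object DecodingSketch {
  // Hypothetical model matching the sample JSON objects.
  final case class Repo(repo: String, stars: Int)

  // Hypothetical input: a whole JSON array delivered as a single chunk.
  val input: Observable[String] = Observable.now(
    """[{ "repo": "monix-circe", "stars": 14 }, { "repo": "monix-config", "stars": 5 }]"""
  )

  // Parse each array element into Json, then decode each Json into a Repo.
  // runSyncUnsafe blocks the calling thread, which is acceptable for a demo only.
  def decoded: List[Repo] =
    input
      .liftByOperator(stringArrayParser)
      .liftByOperator(decoder[Repo])
      .toListL
      .runSyncUnsafe()
}
```

How a value that fails to decode is reported is not covered by this sketch; check the library source before relying on any particular error behaviour.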


Heavily inspired by and based on circe-fs2 and circe-iteratee.


All code in this repository is licensed under the Apache License, Version 2.0. See LICENSE.txt.