Colibri

A simple functional reactive library for Scala.js. Colibri is an implementation of the Observable, Observer and Subject reactive concepts.

If you're new to these concepts, here is a nice introduction from RxJS: https://rxjs.dev/guide/overview. Another good resource is this set of visualizations for common reactive operators: https://rxmarbles.com/.

This library includes:

  • A (minimal and performant) reactive library based on JavaScript native operations like setTimeout, setInterval, setImmediate, queueMicrotask
  • Typeclasses to integrate with other streaming libraries

Usage

Reactive core library with typeclasses:

libraryDependencies += "com.github.cornerman" %%% "colibri" % "0.5.0"
import colibri._

Reactive variables with hot distinct observables (a bit like scala-rx):

libraryDependencies += "com.github.cornerman" %%% "colibri-reactive" % "0.5.0"
import colibri.reactive._

For jsdom-based operations in the browser (EventObservable, Storage):

libraryDependencies += "com.github.cornerman" %%% "colibri-jsdom" % "0.5.0"
import colibri.jsdom._

For scala.rx support (only Scala 2.x):

libraryDependencies += "com.github.cornerman" %%% "colibri-rx" % "0.5.0"
import colibri.ext.rx._

For airstream support:

libraryDependencies += "com.github.cornerman" %%% "colibri-airstream" % "0.5.0"
import colibri.ext.airstream._

For zio support:

libraryDependencies += "com.github.cornerman" %%% "colibri-zio" % "0.5.0"
import colibri.ext.zio._

For fs2 support (Source only):

libraryDependencies += "com.github.cornerman" %%% "colibri-fs2" % "0.5.0"
import colibri.ext.fs2._

Subject, Observable and Observer

The implementation follows the reactive design:

  • An observable is a stream to which you can subscribe with an Observer.
  • An observer is basically a callback which can receive a value or an error from an Observable.
  • A Subject is both an observable and an observer, receiving values and errors from the outside and distributing them to all subscribing observers.

Observables in colibri are lazy: nothing starts until you call unsafeSubscribe on an Observable (or any other unsafe* method).
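To make the three concepts and the laziness concrete, here is a minimal sketch in plain Scala without any dependencies. It is a toy model and not colibri's implementation: the names MiniObserver, MiniObservable, MiniSubject and MiniCancelable are made up for this illustration.

```scala
// Toy model of Observer, Observable and Subject. NOT colibri's implementation;
// all Mini* names are hypothetical and exist only for this illustration.
object ReactiveSketch {
  // An observer is a pair of callbacks for values and errors.
  trait MiniObserver[-A] {
    def onNext(value: A): Unit
    def onError(error: Throwable): Unit
  }

  // Cancelling a subscription stops further delivery.
  trait MiniCancelable { def cancel(): Unit }

  // An observable only describes work; nothing runs before subscribe is called.
  trait MiniObservable[+A] { self =>
    def subscribe(observer: MiniObserver[A]): MiniCancelable

    // map builds a new description and subscribes to nothing by itself.
    def map[B](f: A => B): MiniObservable[B] = new MiniObservable[B] {
      def subscribe(observer: MiniObserver[B]): MiniCancelable =
        self.subscribe(new MiniObserver[A] {
          def onNext(value: A): Unit = observer.onNext(f(value))
          def onError(error: Throwable): Unit = observer.onError(error)
        })
    }
  }

  // A subject is both: values pushed in are forwarded to all current subscribers.
  final class MiniSubject[A] extends MiniObservable[A] with MiniObserver[A] {
    private var observers = Vector.empty[MiniObserver[A]]

    def subscribe(observer: MiniObserver[A]): MiniCancelable = {
      observers = observers :+ observer
      new MiniCancelable {
        def cancel(): Unit = observers = observers.filterNot(_ eq observer)
      }
    }

    def onNext(value: A): Unit = observers.foreach(_.onNext(value))
    def onError(error: Throwable): Unit = observers.foreach(_.onError(error))
  }

  def demo(): List[Int] = {
    val received = scala.collection.mutable.ListBuffer.empty[Int]
    val subject = new MiniSubject[Int]
    val doubled = subject.map(_ * 2) // only a description, nothing runs yet

    subject.onNext(1) // no subscriber yet: the value is dropped

    val subscription = doubled.subscribe(new MiniObserver[Int] {
      def onNext(value: Int): Unit = { received += value; () }
      def onError(error: Throwable): Unit = ()
    })

    subject.onNext(2) // delivered to the subscriber as 4
    subscription.cancel()
    subject.onNext(3) // cancelled: not delivered
    received.toList
  }
}
```

In this sketch, ReactiveSketch.demo() collects only the value pushed between subscribing and cancelling.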

We integrate with effect types by means of typeclasses (see below). Colibri supports cats.effect.IO, cats.effect.SyncIO, cats.Eval and cats.effect.Resource out of the box, as well as zio.Task (with colibri-zio).

Example Observables:

import colibri._
import scala.concurrent.duration._
import cats.effect.IO

// myCount, myLog, myResource and myObservable stand for user-defined functions.
val observable = Observable
  .interval(1.second)
  .mapEffect[IO](i => myCount(i))
  .distinctOnEquals
  .tapEffect[IO](c => myLog(c))
  .mapResource(x => myResource(x))
  .switchMap(x => myObservable(x))
  .debounceMillis(1000)

val observer = Observer.foreach[Int](println(_))

val subscription: Cancelable = observable.unsafeSubscribe(observer)

val subscriptionIO: IO[Cancelable] = observable.subscribeF[IO](observer)

Example Subjects:

import colibri._
import cats.effect.IO

val subject = Subject.publish[Int]() // or Subject.behavior(seed) or Subject.replayLast or Subject.replayAll

val subscription: Cancelable = subject.unsafeForeach(println(_))

subject.unsafeOnNext(1)

val myEffect: IO[Unit] = subject.onNextF[IO](2)

Memory management

Every subscription that is created inside of colibri methods is returned to the user. For example, unsafeSubscribe returns a Cancelable and subscribeF returns an F[Cancelable]. The caller is therefore responsible for cleaning up the subscription by calling Cancelable#unsafeCancel() or Cancelable#cancelF.

If you are working with Outwatch, you can just use Observable without ever subscribing yourself. Then all memory management is handled for you automatically. No memory leaks.

Typeclasses

We have prepared typeclasses for integrating with other streaming libraries. The most important ones are Sink and Source. Source is a typeclass for Observables, Sink is a typeclass for Observers:

  • Sink[G[_]] can send values and errors into G; it has an onNext and an onError method.
  • Source[H[_]] can unsafely subscribe to H with a Sink (returns a cancelable subscription)
  • CanCancel[T] can cancel T to stop a subscription
  • LiftSink[G[_]] can lift a Sink into type G
  • LiftSource[H[_]] can lift a Source into type H
  • SubscriptionOwner[T] can let type T own a subscription

In order to work with effects inside our Observable, we have defined the following two typeclasses similar to Effect in cats-effect 2:

  • RunEffect[F[_]] can unsafely run an effect F[_] asynchronously, potentially starting synchronously until reaching an async boundary.
  • RunSyncEffect[F[_]] can unsafely run an effect F[_] synchronously.

You can convert any Source into an Observable with Observable.lift(source). The same for Sink and Observer with Observer.lift(sink).
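The typeclass approach can be illustrated with a small dependency-free sketch. It assumes a simplified stand-in, MiniSink, which is not colibri's actual Sink definition (the real method names and signatures may differ). Buffer plays the role of a type from another library, and emitAll is generic code that works for any G with an instance.

```scala
// Simplified illustration of a Sink-like typeclass. MiniSink, Buffer and emitAll
// are hypothetical names for this sketch, not part of colibri.
object TypeclassSketch {
  import scala.collection.mutable.ListBuffer

  // The typeclass: describes how to push values and errors into some G[_].
  trait MiniSink[G[_]] {
    def onNext[A](sink: G[A])(value: A): Unit
    def onError[A](sink: G[A])(error: Throwable): Unit
  }

  // A "foreign" type that knows nothing about the typeclass.
  final class Buffer[A] {
    val values: ListBuffer[A] = ListBuffer.empty[A]
    val errors: ListBuffer[Throwable] = ListBuffer.empty[Throwable]
  }

  // The instance teaches generic code how to talk to Buffer.
  implicit val bufferSink: MiniSink[Buffer] = new MiniSink[Buffer] {
    def onNext[A](sink: Buffer[A])(value: A): Unit = { sink.values += value; () }
    def onError[A](sink: Buffer[A])(error: Throwable): Unit = { sink.errors += error; () }
  }

  // Generic code: works with any G that has a MiniSink instance in scope.
  def emitAll[G[_], A](sink: G[A], items: List[A])(implicit ev: MiniSink[G]): Unit =
    items.foreach(item => ev.onNext(sink)(item))

  def demo(): List[Int] = {
    val buffer = new Buffer[Int]
    emitAll(buffer, List(1, 2, 3))
    buffer.values.toList
  }
}
```

The type parameter is called G here to match the naming convention used for Sink throughout the library.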

Reactive variables

The module colibri-reactive exposes reactive variables. These are hot, distinct observables that always have a value. Reactive variables are meant for managing state, as opposed to managing events, which is a perfect fit for the lazy Observable in the core colibri library.
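As a rough mental model of "hot, distinct, and always has a value", consider the following toy variable in plain Scala. MiniVar is a hypothetical name and this is not how colibri-reactive is implemented; subscription cleanup is deliberately left out to keep the sketch short.

```scala
// Toy model of a reactive variable. MiniVar is hypothetical, not colibri's Var.
object VarSketch {
  final class MiniVar[A](initial: A) {
    private var current: A = initial
    private var listeners = Vector.empty[A => Unit]

    // Always has a value: the current state can be read at any time.
    def now(): A = current

    // Distinct: setting a value equal to the current one does not notify anyone.
    def set(value: A): Unit =
      if (value != current) {
        current = value
        listeners.foreach(listener => listener(value))
      }

    // Hot: the variable exists independently of listeners; a new listener
    // immediately sees the current state and then every later change.
    def foreach(f: A => Unit): Unit = {
      listeners = listeners :+ f
      f(current)
    }
  }

  def demo(): List[Int] = {
    val seen = scala.collection.mutable.ListBuffer.empty[Int]
    val variable = new MiniVar(1)
    variable.foreach { value => seen += value; () }
    variable.set(1) // equal to the current value: ignored
    variable.set(2)
    seen.toList
  }
}
```

In this sketch the listener sees the initial value and the one real change, but not the redundant set(1).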

This module behaves very similarly to scala-rx, but it is built on top of colibri Observables for seamless integration and powerful operators. It is not entirely glitch-free, because invalid state can appear in operators like map or foreach, but now() always returns a consistent state and the number of intermediate triggers (glitches) is reduced. You can become completely glitch-free by converting back to an Observable and using dropSyncGlitches, which introduces an async boundary (micro-task).

Example:

import colibri.reactive._

import colibri.owner.unsafeImplicits._ // dangerous. This never cancels subscriptions. See below!

val variable = Var(1)
val variable2 = Var("Test")

val rx = Rx {
  s"${variable()} - ${variable2()}"
}

rx.foreach(println(_))

println(variable.now()) // 1
println(variable2.now()) // "Test"
println(rx.now()) // "1 - Test"

variable.set(2)

println(variable.now()) // 2
println(variable2.now()) // "Test"
println(rx.now()) // "2 - Test"

variable2.set("Foo")

println(variable.now()) // 2
println(variable2.now()) // "Foo"
println(rx.now()) // "2 - Foo"

If you want to work with reactive variables (hot observables), then someone needs to clean up the subscriptions. We call this concept an Owner. The example above uses an unsafe owner, which never cleans up. It should only ever be used in your main method or for global state.

You can also work without ever using the unsafe owner or passing an owner implicitly by using Owned blocks instead. Inside an Owned block, you have to return a type that has a SubscriptionOwner instance. Example:

import colibri._
import colibri.reactive._
import cats.effect.SyncIO

sealed trait Modifier
object Modifier {
  case class ReactiveModifier(rx: Rx[String]) extends Modifier
  case class SubscriptionModifier(subscription: () => Cancelable) extends Modifier
  case class CombineModifier(modifierA: Modifier, modifierB: Modifier) extends Modifier

  implicit object subscriptionOwner extends SubscriptionOwner[Modifier] {
    def own(owner: Modifier)(subscription: () => Cancelable): Modifier = CombineModifier(owner, SubscriptionModifier(subscription))
  }
}

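val component: SyncIO[Modifier] = Owned {
  val variable = Var(1)
  val mapped = variable.map(_ + 1) // derived from the variable defined above

  val rx = Rx {
    s"Hello: ${mapped()}" // the s interpolator is required for ${...}
  }

  Modifier.ReactiveModifier(rx)
}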

For example, Outwatch supports Owned:

import outwatch._
import outwatch.dsl._
import colibri.reactive._
import cats.effect.SyncIO

val component: SyncIO[VModifier] = Owned {
  val variable = Var(1)
  val mapped = variable.map(_ + 1) // derived from the variable defined above

  val rx = Rx {
    s"Hello: ${mapped()}" // the s interpolator is required for ${...}
  }

  div(rx)
}

Memory management

Every subscription that is created inside of colibri-reactive methods is owned by an implicit Owner. For example map or foreach take an implicit Owner. As long as the Owner is cancelled when it is not needed anymore, all subscriptions will be cleaned up. The exception is the Owner.unsafeGlobal that never cleans up and is meant for global state.
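The ownership idea can be sketched in plain Scala as follows. This is an assumption-laden toy, not colibri's Owner: MiniOwner, MiniCancelable and track are hypothetical names. It shows an operation that requires an implicit owner, and an owner that cancels everything it collected in one go.

```scala
// Toy model of the Owner concept. All names are hypothetical, not colibri's API.
object OwnerSketch {
  trait MiniCancelable { def cancel(): Unit }

  // Collects subscriptions and cancels them together.
  final class MiniOwner {
    private var owned = List.empty[MiniCancelable]

    def own(subscription: MiniCancelable): Unit = owned = subscription :: owned

    def cancelAll(): Unit = {
      owned.foreach(_.cancel())
      owned = Nil
    }
  }

  // An operation that creates a subscription asks for an implicit owner,
  // so the subscription cannot be forgotten by the caller.
  def track(name: String, log: StringBuilder)(implicit owner: MiniOwner): Unit = {
    log.append(s"start $name;")
    owner.own(new MiniCancelable {
      def cancel(): Unit = { log.append(s"stop $name;"); () }
    })
  }

  def demo(): String = {
    val log = new StringBuilder
    implicit val owner: MiniOwner = new MiniOwner
    track("a", log)
    track("b", log)
    owner.cancelAll() // cancels in reverse order of registration
    log.toString
  }
}
```

An owner that is never cancelled, like the global one, would simply never run the stop steps in this model.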

If you are working with Outwatch, you can just use Owned-blocks returning VModifier and everything is handled automatically for you. No memory leaks.

Information

Throughout the library, the type parameters for the Sink and Source typeclasses are named consistently to avoid naming ambiguity when working with F[_] in the same context:

  • F[_] : RunEffect
  • G[_] : Sink
  • H[_] : Source

Source Code: Source.scala, Sink.scala, RunEffect.scala

In general, we take a middle ground with pure functional programming. We focus on performance and ease of use. Internally, the code is mutable for performance reasons. Externally, we try to expose a typesafe, immutable, and mostly pure interface to the user. Some methods are impure, for example those for subscribing to observables, which can execute side effects. These impure methods are named unsafe*. For public use, there are normally pure alias methods that return an effect type. The unsafe methods exist so they can be used internally, where we try to keep extra allocations to a minimum.

Types like Observable are conceptually very similar to IO; they can just emit any number of values instead of a single one. They are also lazy, and operations like map/flatMap/filter/... do not actually do anything on their own. Only after you unsafely run or subscribe to an Observable does it actually start evaluating.

Implementation for rx

Implementation for airstream

Implementation for zio

Implementation for fs2