Colibri
A simple functional reactive library for ScalaJS. Colibri is an implementation of the Observable, Observer and Subject reactive concepts.
If you're new to these concepts, here is a nice introduction from rx.js: https://rxjs.dev/guide/overview. Another good resource is this set of visualizations for common reactive operators: https://rxmarbles.com/.
This library includes:
- A (minimal and performant) reactive library based on JavaScript native operations like setTimeout, setInterval, setImmediate, queueMicrotask
- Typeclasses to integrate with other streaming libraries
Usage
Reactive core library with typeclasses:
libraryDependencies += "com.github.cornerman" %%% "colibri" % "0.5.0"
import colibri._
Reactive variables with hot distinct observables (a bit like scala-rx):
libraryDependencies += "com.github.cornerman" %%% "colibri-reactive" % "0.5.0"
import colibri.reactive._
For jsdom-based operations in the browser (EventObservable, Storage):
libraryDependencies += "com.github.cornerman" %%% "colibri-jsdom" % "0.5.0"
import colibri.jsdom._
For scala.rx support (only Scala 2.x):
libraryDependencies += "com.github.cornerman" %%% "colibri-rx" % "0.5.0"
import colibri.ext.rx._
For airstream support:
libraryDependencies += "com.github.cornerman" %%% "colibri-airstream" % "0.5.0"
import colibri.ext.airstream._
For zio support:
libraryDependencies += "com.github.cornerman" %%% "colibri-zio" % "0.5.0"
import colibri.ext.zio._
For fs2 support (Source only):
libraryDependencies += "com.github.cornerman" %%% "colibri-fs2" % "0.5.0"
import colibri.ext.fs2._
Subject, Observable and Observer
The implementation follows the reactive design:
- An observable is a stream to which you can subscribe with an Observer.
- An observer is basically a callback which can receive a value or an error from an Observable.
- A Subject is both an observable and an observer, receiving values and errors from the outside and distributing them to all subscribing observers.
Observables in colibri are lazy: nothing starts until you call unsafeSubscribe on an Observable (or any unsafe* method).
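The following is a minimal sketch in plain Scala of the three concepts and of laziness. It does not use colibri at all: MiniObserver, MiniObservable, MiniSubject and MiniCancelable are hypothetical miniature types made up for illustration, and colibri's real definitions are richer. The point is only that map wraps the source without running anything, and that values flow once an observer subscribes.

```scala
import scala.collection.mutable.ArrayBuffer

object LazySketch {
  // Hypothetical miniature types for illustration only; not colibri's definitions.
  trait MiniCancelable { def unsafeCancel(): Unit }

  trait MiniObserver[A] {
    def unsafeOnNext(value: A): Unit
    def unsafeOnError(error: Throwable): Unit
  }

  trait MiniObservable[A] { self =>
    def unsafeSubscribe(observer: MiniObserver[A]): MiniCancelable

    // map only describes a new observable; f runs when values arrive after subscribing.
    def map[B](f: A => B): MiniObservable[B] = new MiniObservable[B] {
      def unsafeSubscribe(observer: MiniObserver[B]): MiniCancelable =
        self.unsafeSubscribe(new MiniObserver[A] {
          def unsafeOnNext(value: A): Unit = observer.unsafeOnNext(f(value))
          def unsafeOnError(error: Throwable): Unit = observer.unsafeOnError(error)
        })
    }
  }

  // A subject is both: it receives values and distributes them to its subscribers.
  final class MiniSubject[A] extends MiniObservable[A] with MiniObserver[A] {
    private var observers = List.empty[MiniObserver[A]]

    def unsafeSubscribe(observer: MiniObserver[A]): MiniCancelable = {
      observers = observers :+ observer
      new MiniCancelable {
        def unsafeCancel(): Unit = { observers = observers.filterNot(_ eq observer) }
      }
    }

    def unsafeOnNext(value: A): Unit = observers.foreach(_.unsafeOnNext(value))
    def unsafeOnError(error: Throwable): Unit = observers.foreach(_.unsafeOnError(error))
  }

  def demo(): List[String] = {
    val log = ArrayBuffer.empty[String]
    def record(line: String): Unit = { log += line; () }

    val subject = new MiniSubject[Int]
    val doubled = subject.map { i => record(s"mapped $i"); i * 2 }

    subject.unsafeOnNext(1) // nobody has subscribed yet, so the map function does not run

    val subscription = doubled.unsafeSubscribe(new MiniObserver[Int] {
      def unsafeOnNext(value: Int): Unit = record(s"received $value")
      def unsafeOnError(error: Throwable): Unit = record(s"failed ${error.getMessage}")
    })

    subject.unsafeOnNext(2)     // flows through map to the observer
    subscription.unsafeCancel() // detaches the observer again
    subject.unsafeOnNext(3)     // dropped, no subscribers left
    log.toList
  }

  def main(args: Array[String]): Unit = demo().foreach(println)
}
```

In this sketch only the value 2 reaches the observer, because it is the only one emitted while a subscription is active.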
We integrate with effect types by means of typeclasses (see below). We support cats.effect.IO, cats.effect.SyncIO, cats.Eval, cats.effect.Resource (out of the box) as well as zio.Task (with outwatch-zio).
Example Observables:
import colibri._
import scala.concurrent.duration._
import cats.effect.IO
// myCount, myLog, myResource and myObservable are placeholders for your own functions
val observable = Observable
  .interval(1.second)
  .mapEffect[IO](i => myCount(i))
  .distinctOnEquals
  .tapEffect[IO](c => myLog(c))
  .mapResource(x => myResource(x))
  .switchMap(x => myObservable(x))
  .debounceMillis(1000)
val observer = Observer.foreach[Int](println(_))
val subscription: Cancelable = observable.unsafeSubscribe(observer)
val subscriptionIO: IO[Cancelable] = observable.subscribeF[IO](observer)
Example Subjects:
import colibri._
import cats.effect.IO
val subject = Subject.publish[Int]() // or Subject.behavior(seed) or Subject.replayLast or Subject.replayAll
val subscription: Cancelable = subject.unsafeForeach(println(_))
subject.unsafeOnNext(1)
val myEffect: IO[Unit] = subject.onNextF[IO](2)
Memory management
Every subscription that is created inside of colibri methods is returned to the user. For example, unsafeSubscribe or subscribeF returns a Cancelable. That means the caller is responsible for cleaning up the subscription by calling Cancelable#unsafeCancel() or Cancelable#cancelF.
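As a small sketch of this responsibility, the snippet below subscribes to a publish subject, pushes one value, cancels, and pushes another. It uses only calls that appear elsewhere in this README (Subject.publish, unsafeForeach, unsafeOnNext, unsafeCancel); the object name CancelExample and the buffer are made up for illustration, and the sketch assumes the subject delivers values synchronously to its current subscribers.

```scala
import colibri._
import scala.collection.mutable.ArrayBuffer

object CancelExample {
  def run(): List[Int] = {
    val received = ArrayBuffer.empty[Int]
    val subject = Subject.publish[Int]()

    // The returned Cancelable is ours to clean up.
    val subscription: Cancelable = subject.unsafeForeach { value => received += value; () }

    subject.unsafeOnNext(1)     // delivered while the subscription is active
    subscription.unsafeCancel() // release the subscription
    subject.unsafeOnNext(2)     // expected not to reach the cancelled subscriber

    received.toList
  }

  def main(args: Array[String]): Unit = println(run())
}
```

If the Cancelable were dropped instead of cancelled, the subject would keep a reference to the callback for as long as the subject itself lives.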
If you are working with Outwatch, you can just use Observable without ever subscribing yourself. Then all memory management is handled for you automatically. No memory leaks.
Typeclasses
We have prepared typeclasses for integrating with other streaming libraries. The most important ones are Sink and Source. Source is a typeclass for Observables, Sink is a typeclass for Observers:
- Sink[G[_]] can send values and errors into G; it has an onNext and onError method.
- Source[H[_]] can unsafely subscribe to H with a Sink (returns a cancelable subscription)
- CanCancel[T] can cancel T to stop a subscription
- LiftSink[G[_]] can lift a Sink into type G
- LiftSource[H[_]] can lift a Source into type H
- SubscriptionOwner[T] can let type T own a subscription
In order to work with effects inside our Observable, we have defined the following two typeclasses similar to Effect in cats-effect 2:
- RunEffect[F[_]] can unsafely run an effect F[_] asynchronously, potentially starting synchronously until reaching an async boundary.
- RunSyncEffect[F[_]] can unsafely run an effect F[_] synchronously.
You can convert any Source into an Observable with Observable.lift(source). The same works for Sink and Observer with Observer.lift(sink).
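To illustrate the typeclass idea itself, here is a sketch in plain Scala that does not depend on colibri. MiniSource, MiniCancelable and ListStream are hypothetical names invented for this example, and the signature is deliberately simplified; colibri's actual Source typeclass may look different. It shows how generic code can subscribe to a foreign stream type once an instance is in scope.

```scala
import scala.collection.mutable.ArrayBuffer

object SourceSketch {
  // Hypothetical, simplified stand-ins; not colibri's real typeclasses.
  trait MiniCancelable { def unsafeCancel(): Unit }

  // A typeclass describing "things of shape H[A] that can be subscribed to".
  trait MiniSource[H[_]] {
    def unsafeSubscribe[A](source: H[A])(onNext: A => Unit): MiniCancelable
  }

  // A "foreign" stream type that knows nothing about the typeclass.
  final case class ListStream[A](values: List[A])

  // The instance teaches generic code how to subscribe to a ListStream.
  implicit val listStreamSource: MiniSource[ListStream] = new MiniSource[ListStream] {
    def unsafeSubscribe[A](source: ListStream[A])(onNext: A => Unit): MiniCancelable = {
      source.values.foreach(onNext)
      new MiniCancelable { def unsafeCancel(): Unit = () } // nothing left to cancel
    }
  }

  // Generic code needs only the instance, never the concrete stream type.
  def collect[H[_], A](source: H[A])(implicit H: MiniSource[H]): List[A] = {
    val buffer = ArrayBuffer.empty[A]
    H.unsafeSubscribe(source) { a => buffer += a; () }
    buffer.toList
  }

  def demo(): List[Int] = collect(ListStream(List(1, 2, 3)))

  def main(args: Array[String]): Unit = println(demo())
}
```

The same pattern is what lets a library accept streams from several other libraries through one entry point, by providing one instance per foreign type.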
Reactive variables
The module colibri-reactive exposes reactive variables. These are hot, distinct observables that always have a value. Reactive variables are meant for managing state, as opposed to managing events, which is a perfect fit for the lazy Observable in the core colibri library.
This module behaves very similarly to scala-rx, but is built on top of colibri Observables for seamless integration and powerful operators. It is not entirely glitch-free because invalid state can appear in operators like map or foreach, but you always have a consistent state in now() and it reduces the number of intermediate triggers or glitches. You can become completely glitch-free by converting back to an observable and using dropSyncGlitches, which will introduce an async boundary (micro-task).
Example:
import colibri.reactive._
import colibri.owner.unsafeImplicits._ // dangerous. This never cancels subscriptions. See below!
val variable = Var(1)
val variable2 = Var("Test")
val rx = Rx {
  s"${variable()} - ${variable2()}"
}
rx.foreach(println(_))
println(variable.now()) // 1
println(variable2.now()) // "Test"
println(rx.now()) // "1 - Test"
variable.set(2)
println(variable.now()) // 2
println(variable2.now()) // "Test"
println(rx.now()) // "2 - Test"
variable2.set("Foo")
println(variable.now()) // 2
println(variable2.now()) // "Foo"
println(rx.now()) // "2 - Foo"
If you want to work with reactive variables (hot observables), then someone needs to clean up the subscriptions. We call this concept an Owner. We use an unsafe owner in the above example. It actually never cleans up. It should only ever be used in your main method or for global state.
You can even work without ever using the unsafe owner or having to pass it implicitly. You can use Owned blocks instead. Inside an Owned block, you will have to return a type that has a SubscriptionOwner instance. Example:
import colibri._
import colibri.reactive._
import cats.effect.SyncIO
sealed trait Modifier
object Modifier {
  case class ReactiveModifier(rx: Rx[String]) extends Modifier
  case class SubscriptionModifier(subscription: () => Cancelable) extends Modifier
  case class CombineModifier(modifierA: Modifier, modifierB: Modifier) extends Modifier
  implicit object subscriptionOwner extends SubscriptionOwner[Modifier] {
    def own(owner: Modifier)(subscription: () => Cancelable): Modifier = CombineModifier(owner, SubscriptionModifier(subscription))
  }
}
val component: SyncIO[Modifier] = Owned {
  val variable = Var(1)
  val mapped = variable.map(_ + 1)
  val rx = Rx {
    s"Hallo: ${mapped()}"
  }
  ReactiveModifier(rx)
}
For example, Outwatch supports Owned:
import outwatch._
import outwatch.dsl._
import colibri.reactive._
import cats.effect.SyncIO
val component: SyncIO[VModifier] = Owned {
  val variable = Var(1)
  val mapped = variable.map(_ + 1)
  val rx = Rx {
    s"Hallo: ${mapped()}"
  }
  div(rx)
}
Memory management
Every subscription that is created inside of colibri-reactive methods is owned by an implicit Owner. For example, map or foreach take an implicit Owner. As long as the Owner is cancelled when it is not needed anymore, all subscriptions will be cleaned up. The exception is Owner.unsafeGlobal, which never cleans up and is meant for global state.
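The ownership idea can be sketched in plain Scala without colibri. MiniOwner, MiniCancelable and subscribeOwned below are hypothetical names for illustration only, and the real Owner in colibri-reactive is more involved. The sketch shows operations registering their subscriptions with an implicit owner, and a single cancel on the owner releasing all of them.

```scala
object OwnerSketch {
  // Hypothetical miniature types; not the colibri-reactive Owner.
  trait MiniCancelable { def unsafeCancel(): Unit }

  final class MiniOwner {
    private var owned = List.empty[MiniCancelable]

    // Remember a subscription so it can be released later.
    def own(subscription: MiniCancelable): Unit = { owned = subscription :: owned }

    // Release everything this owner has collected.
    def cancelAll(): Unit = {
      owned.foreach(_.unsafeCancel())
      owned = Nil
    }
  }

  // Stands in for an operator like map or foreach: it creates a subscription
  // and hands it to whichever owner is implicitly in scope.
  def subscribeOwned(name: String, onCancel: String => Unit)(implicit owner: MiniOwner): Unit =
    owner.own(new MiniCancelable { def unsafeCancel(): Unit = onCancel(name) })

  def demo(): List[String] = {
    var cancelled = List.empty[String]
    def record(name: String): Unit = { cancelled = cancelled :+ name }

    implicit val owner: MiniOwner = new MiniOwner
    subscribeOwned("map", record)
    subscribeOwned("foreach", record)

    owner.cancelAll() // one call cleans up both subscriptions
    cancelled
  }

  def main(args: Array[String]): Unit = println(demo())
}
```

An owner that never calls cancelAll corresponds to the global case described above: convenient for program-wide state, but it keeps every subscription alive.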
If you are working with Outwatch, you can just use Owned-blocks returning VModifier and everything is handled automatically for you. No memory leaks.
Information
Throughout the library, the type parameters for the Sink and Source typeclasses are named consistently to avoid naming ambiguity when working with F[_] in the same context:
F[_] : RunEffect
G[_] : Sink
H[_] : Source
Source Code: Source.scala, Sink.scala, RunEffect.scala
In general, we take a middle ground with pure functional programming. We focus on performance and ease of use. Internally, the code is mutable for performance reasons. Externally, we try to expose a typesafe, immutable, and mostly pure interface to the user. Some methods are impure, for example those for subscribing to observables, which can execute side effects. These impure methods are named unsafe*. There are normally pure alias methods that return an effect type for public use. The unsafe methods exist so they can be used internally, where we try to keep extra allocations to a minimum.
Types like Observable are conceptually very similar to IO; they can just return more than zero or one value. They are also lazy, and operations like map/flatMap/filter/... do not actually do anything. Only after you unsafely run or subscribe to an Observable does it actually start evaluating.