A purely functional Scala 3 toolkit for building distributed systems.
Parapet lets you describe a distributed system as a composable program - a value - and then interpret that program into a running system with scheduling, messaging, and supervision.
Each process is an isolated unit with a mailbox. Its handler is an ordinary value in a typed DSL, executed by the runtime against any compatible effect type.
```scala
import io.parapet.{Event, ProcessRef}
import io.parapet.core.Process
import io.parapet.core.Events.{Failure, Start}

final case class Ping(n: Int) extends Event
final case class Pong(n: Int) extends Event

class Echo[F[_]](peer: ProcessRef) extends Process[F]:
  import dsl.*
  override def handle: Receive =
    case Start             => Ping(0) ~> peer            // kick things off
    case Ping(n) if n < 3  => reply(Pong(n + 1))         // reply to sender
    case Pong(n)           => eval(println(s"got $n")) ++
                              (Ping(n) ~> peer)          // sequential DSL
    case Failure(_, cause) => eval(println(cause))       // failures are events
```

Parapet is a library of primitives for building distributed systems. It gives you the substrate - processes, mailboxes, a typed DSL, a scheduler, pluggable transports, a wire-codec type-class - and trusts you to compose it into whatever topology your problem demands.
The toolkit is still filling out. Higher-level building blocks that production systems depend on - reliable channels, failure detectors, broadcast, gossip, CRDTs, sharding, observability - are being added as standalone modules.
Typical use cases:
- Consensus and coordination protocols - Raft, Paxos, leader election, membership, failure detectors.
- Replication, broadcast, and streaming systems built on top of your own wire protocol or transport.
- Deterministic testing of distributed algorithms - every handler is a value, so you can step, replay, and assert on full execution traces under a test interpreter before the code ever hits the network.
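That last point is worth making concrete. Below is a toy sketch of the idea - a handler as plain data plus a trace-collecting test interpreter. All names here (`Op`, `Program`, `trace`) are invented for illustration; parapet's real `DslF` algebra is richer.

```scala
// Toy program algebra - NOT parapet's DslF, just the shape of the idea.
enum Op:
  case Send(event: String, to: String)
  case Eval(thunk: () => Unit)

final case class Program(ops: List[Op]):
  def ++(other: Program): Program = Program(ops ++ other.ops)

def send(event: String, to: String): Program = Program(List(Op.Send(event, to)))
def eval(effect: => Unit): Program = Program(List(Op.Eval(() => effect)))

// A "test interpreter": it never touches a network or thread pool - it just
// records what the program would do, so tests can assert on the trace.
def trace(p: Program): List[String] = p.ops.map {
  case Op.Send(e, to) => s"send($e -> $to)"
  case Op.Eval(_)     => "eval"
}

val handler = send("Ping", "peer") ++ eval(println("side effect")) ++ send("Pong", "peer")
```

Here `trace(handler)` returns `List("send(Ping -> peer)", "eval", "send(Pong -> peer)")` without ever executing the `println` - the handler is inspected, not run.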
- **Programs are values.** A handler is an ordinary value (`DslF[F, Unit]`) you can store, compose with `++`/`par`/`race`, inspect, and test deterministically without spinning up a real distributed system. Internally: a `Free` algebra over a small set of operations.
- **Pure actor semantics without an actor system.** Each process owns one mailbox, sees messages sequentially, and never shares mutable state. No `Props`, no `ActorRef` casting - just a `ProcessRef` and the `~>` operator.
- **Effect-system agnostic.** Handlers are generic over `F[_]`. Pick `ParIO` for drop-in use, or plug in your own effect type by providing `Effect[F]` and `Parallel[F]` instances.
- **Composable handlers.** Combine partial behaviours with `and`/`or`, swap them at runtime with `switch`, compose supervised children with `register`.
- **Batteries for distribution.** Pluggable transports (`net`), protobuf `WireCodec` (`protocol`), a Raft module, and an integration test-kit.
- **Predictable runtime.** Bounded per-process mailboxes, a configurable scheduler, first-class failure events delivered to senders, overridable dead-letter and event-log hooks.
| Module | Purpose |
|---|---|
| `core` | Process model, DSL, scheduler, `ParIO` effect type, `ParApp`/`ParIOApp` |
| `protocol` | `WireCodec` type-class and protobuf message definitions |
| `net` | TCP / UDP transports and adapter processes |
| `raft` | Raft consensus (election, replication, commit) as a `Process` |
| `demo-coloring` | Distributed graph-coloring demo with a 3D web UI |
| `intg-tests` | Cross-module integration test suite |
- Getting started
- Defining a process
- Running an application
- DSL cheatsheet
- Channel - request/response
- Error handling
- Configuration
- Contributing
- License
`parapet-core` is the only dependency you need to define and run processes.
```scala
libraryDependencies += "io.parapet" %% "parapet-core" % version
```

Optional modules:

```scala
libraryDependencies += "io.parapet" %% "parapet-protocol" % version
libraryDependencies += "io.parapet" %% "parapet-net" % version
libraryDependencies += "io.parapet" %% "parapet-raft" % version
```

Processes are defined generically over the effect type `F[_]`, then specialized at the application boundary. A process declares its API as events and reacts to them inside `handle`.
```scala
import io.parapet.{Event, ProcessRef}
import io.parapet.core.Process

class Printer[F[_]] extends Process[F]:
  import Printer.*
  import dsl.*
  override def handle: Receive =
    case Print(data) => eval(println(data))

object Printer:
  final case class Print(data: Any) extends Event
```

A second process talks to the printer using the `~>` (send) operator:
```scala
import io.parapet.ProcessRef
import io.parapet.core.Process
import io.parapet.core.Events.Start

class Greeter[F[_]](printer: ProcessRef) extends Process[F]:
  import dsl.*
  override def handle: Receive =
    case Start => Printer.Print("hello world") ~> printer
```

Notes:

- `Start` and `Stop` are lifecycle events delivered by the runtime - `Start` once on registration, `Stop` (or `Kill`) on shutdown.
- `ProcessRef` is the address of a process; prefer it over passing `Process` instances around so that wiring stays late-binding.
Specialize your application against `ParIO`, parapet's bundled effect type, by extending `ParIOApp`:
```scala
import io.parapet.ParIOApp
import io.parapet.core.Process
import io.parapet.effect.ParIO

object HelloApp extends ParIOApp:
  def processes(args: Array[String]): ParIO[Seq[Process[ParIO]]] =
    ParIO.delay {
      val printer = new Printer[ParIO]
      val greeter = new Greeter[ParIO](printer.ref)
      Seq(printer, greeter)
    }
```

Run it like any other JVM application and you'll see `hello world` on stdout. Because `Printer` and `Greeter` are generic in `F`, the same code can target a different effect runtime by providing your own `ParApp[F]` subclass with `Effect[F]` / `Parallel[F]` instances.
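The effect-polymorphism claim can be illustrated without parapet at all: one generic function written against a tiny capability type class runs unchanged under any instance. `Delay`, `Id`, and `greet` below are toy names invented for this sketch, not parapet's actual `Effect[F]` API.

```scala
// Toy capability type class - a stand-in for a real Effect[F].
trait Delay[F[_]]:
  def delay[A](a: => A): F[A]

// Instance 1: the identity "effect" - runs immediately.
type Id[A] = A
given Delay[Id] with
  def delay[A](a: => A): Id[A] = a

// Instance 2: Option as the effect.
given Delay[Option] with
  def delay[A](a: => A): Option[A] = Some(a)

// Generic "program": compiled once, interpreted by whichever instance is in scope.
def greet[F[_]](using D: Delay[F]): F[String] = D.delay("hello world")
```

`greet[Id]` evaluates to `"hello world"` while `greet[Option]` evaluates to `Some("hello world")` - the same definition, two runtimes.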
`dsl.*` brings the program combinators into scope inside a process. Most real programs only need a handful:
| Combinator | Description |
|---|---|
| `event ~> ref` | Send an event to a process. |
| `reply(event)` | Send an event back to the current sender. |
| `eval { ... }` | Run a side-effecting computation in `F`. |
| `p1 ++ p2` | Sequential composition. |
| `par(p1, p2)` | Run two programs concurrently. |
| `race(p1, p2)` | Run both, keep the first to finish. |
The full set of combinators:
| Combinator | Description |
|---|---|
| `unit` | The empty program - useful as a fold seed. |
| `event ~> ref` | Send `event` to a process. |
| `forward(event, ref)` | Send while preserving the original sender. |
| `reply(event)` | Send `event` back to the sender of the current message. |
| `withSender(s => program)` | Run a program parameterised by the current sender ref. |
| `flow { ... }` | Suspend program construction (use for recursion). |
| `eval { sideEffect }` | Suspend a side-effecting computation in `F`. |
| `evalWith(value)(f)` | Evaluate `value` and feed it into a program builder. |
| `suspend(fa)` | Lift a value of `F[A]` into the program. |
| `delay(d, program)` | Defer every step in `program` by duration `d`. |
| `par(program)` | Run independent steps of `program` in parallel. |
| `fork(program)` | Fire-and-forget concurrent execution. |
| `race(p1, p2)` | Run both, cancel the loser. |
| `register(parent, child)` | Register a child process; child receives `Stop` first. |
| `switch(receive)` | Replace the current process behaviour. |
Combine programs with `++` (sequential composition) - e.g. `eval(println("a")) ++ eval(println("b"))`.
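One subtlety from the table above: `flow` exists because recursive programs must be built lazily - an eagerly recursive definition would try to construct an infinite value before the interpreter ever runs. A toy sketch of the mechanism (invented names, not parapet's encoding):

```scala
// Toy encoding: the continuation is a thunk, so an infinite program is a
// finite value - construction stops at the first suspended step.
sealed trait Prog
final case class Emit(label: String, next: () => Prog) extends Prog
case object Halt extends Prog

// An infinite program; with an eager `next: Prog` field this definition
// could never finish constructing itself.
def ticks: Prog = Emit("tick", () => ticks)

// The interpreter forces one suspended step at a time, up to a limit.
def runN(p: Prog, n: Int, acc: List[String] = Nil): List[String] = p match
  case Halt                 => acc.reverse
  case Emit(_, _) if n == 0 => acc.reverse
  case Emit(label, next)    => runN(next(), n - 1, label :: acc)
```

`runN(ticks, 3)` yields three `"tick"`s and stops, even though `ticks` describes a program that never terminates on its own.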
`Channel` turns the asynchronous mailbox model into a strict one-call-at-a-time request/reply dialog. Send through it, get a `Try[Event]` back.
```scala
import io.parapet.{Event, ProcessRef}
import io.parapet.core.{Channel, Process}
import io.parapet.core.Events.Start
import io.parapet.effect.Effect

final case class Request(data: String) extends Event
final case class Response(data: String) extends Event

def server[F[_]]: Process[F] = Process[F](_ => {
  case Request(data) => reply(Response(s"echo: $data"))
})

class Client[F[_]: Effect](backend: ProcessRef) extends Process[F]:
  import dsl.*
  private lazy val ch = Channel[F]()

  override def handle: Receive =
    case Start =>
      register(ref, ch) ++
        ch.send(Request("PING"), backend, {
          case scala.util.Success(Response(d)) => eval(println(d))
          case scala.util.Failure(err)         => eval(println(s"failed: ${err.getMessage}"))
        })
```

Failures are first-class events. Every problem is reported via a `Failure` event delivered to the original sender (or, if the sender has no handler for it, the configured `DeadLetterProcess`).
```scala
import io.parapet.ProcessRef
import io.parapet.core.Envelope
import io.parapet.core.Events.{Failure, Start}
import io.parapet.core.exceptions.EventHandlingException
import io.parapet.core.Process

val faulty = Process.builder[F](_ => {
  case Request(_) => eval(throw new RuntimeException("server is down"))
}).ref(ProcessRef("server")).build

val client = Process.builder[F](_ => {
  case Start => Request("PING") ~> faulty
  case Failure(Envelope(self, event, receiver), EventHandlingException(msg, cause)) =>
    eval(println(s"$receiver failed: $msg (cause: ${cause.getMessage})"))
}).ref(ProcessRef("client")).build
```

Common failure causes:

- `EventHandlingException` - receiver's handler threw.
- `EventDeliveryException` - receiver's mailbox is full.
- `EventMatchException` - receiver's handler is not defined for the event.
- `UnknownProcessException` - receiver isn't registered.
Override `ParApp#deadLetter` to install a custom `DeadLetterProcess` (e.g. one that persists or alerts on dropped events).
Override `config` on your `ParApp` to tune the scheduler:

```scala
import io.parapet.core.Parapet.ParConfig

object MyApp extends ParIOApp:
  override val config: ParConfig = ParConfig.default
    .withSchedulerConfig(_.copy(numberOfWorkers = 8, queueSize = 1 << 16))
  // ...
```

`SchedulerConfig` knobs:

- `numberOfWorkers` - worker thread count (defaults to available processors).
- `queueSize` - bound on the shared task queue.
- `processQueueSize` - per-process mailbox bound; `-1` means unbounded.
When a mailbox overflows, events are routed to `EventLog` (a no-op stub by default - override `eventLog` to persist).
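A minimal sketch of that overflow policy (illustrative only - invented names, not parapet's actual mailbox or `EventLog` implementation): a bounded queue that hands rejected events to a callback instead of blocking the sender.

```scala
import scala.collection.mutable

// Toy bounded mailbox: over-capacity events go to an overflow callback,
// the analogue of routing dropped events to an event log.
final class BoundedMailbox[A](capacity: Int, onOverflow: A => Unit):
  private val queue = mutable.Queue.empty[A]

  def offer(a: A): Boolean =
    if queue.size < capacity then
      queue.enqueue(a)
      true
    else
      onOverflow(a)
      false

  def poll(): Option[A] =
    if queue.isEmpty then None else Some(queue.dequeue())

val overflowed = mutable.ListBuffer.empty[String]
val mb = BoundedMailbox[String](2, overflowed += _)
List("e1", "e2", "e3").foreach(mb.offer)   // "e3" exceeds the capacity of 2
```

After this runs, `overflowed` contains only `"e3"`, and the mailbox still delivers `"e1"` then `"e2"` in order - senders are never blocked, and no accepted event is lost.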
- **vs actor frameworks (Akka, etc.)** - similar process/mailbox model, but handlers are programs (values) in a typed DSL. They are composable, inspectable, and re-interpretable rather than directly executed behaviours.
- **vs effect systems (Cats Effect, ZIO)** - builds on an effect runtime and adds a structured model of distributed processes: mailboxes, message passing, supervision.
- **Same program, multiple interpreters.** A handler runs unchanged in production and under a deterministic test interpreter - no test-mode fork of your code, no separate simulation framework.
The project is in active development. Issues, ideas, and pull requests are welcome.
Copyright 2019 The Parapet Project Developers
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0