jczuchnowski / zio-pulsar   0.2

BSD 2-clause "Simplified" License GitHub

Apache Pulsar client for Scala with ZIO and ZIO Streams integration.

Scala versions: 3.x

ZIO Pulsar

CI Release Snapshot
CI Release Artifacts Snapshot Artifacts

Purely functional Scala wrapper over the official Pulsar client.

  • Type-safe (utilizes Scala type system to reduce runtime exceptions present in the official Java client)
  • Streaming-enabled (naturally integrates with ZIO Streams)
  • ZIO integrated (uses common ZIO primitives like ZIO effect and ZManaged to reduce the boilerplate and increase expressiveness)

Compatibility

ZIO Pulsar is a Scala 3 library, so it's compatible with Scala 3 applications as well as Scala 2.13.6+ (see forward compatibility for more information.

Getting started

Add the following dependency to your build.sbt file:

Scala 3

libraryDependencies += "com.github.jczuchnowski" %% "zio-pulsar" % zioPulsarVersion

Scala 2.13.6+ (sbt 1.5.x)

libraryDependencies += 
  ("com.github.jczuchnowski" %% "zio-pulsar" % zioPulsarVersion).cross(CrossVersion.for2_13Use3)

ZIO Pulsar also needs ZIO and ZIO Streams to be provided:

libraryDependencies ++= Seq(
  "dev.zio" %% "zio"         % zioVersion,
  "dev.zio" %% "zio-streams" % zioVersion
)

Simple example of consumer and producer:

import org.apache.pulsar.client.api.{ PulsarClientException, Schema }
import zio._
import zio.pulsar._

object Main extends App:

  val pulsarClient = PulsarClient.live("localhost", 6650)

  val topic = "my-topic"

  val app: ZManaged[PulsarClient, PulsarClientException, Unit] =
    for
      builder  <- ConsumerBuilder.make(Schema.STRING).toManaged_
      consumer <- builder
                    .topic(topic)
                    .subscription(
                      Subscription(
                        "my-subscription", 
                        SubscriptionType.Shared))
                    .build
      producer <- Producer.make(topic, Schema.STRING)
      _        <- producer.send("Hello!").toManaged_
      m        <- consumer.receive.toManaged_
    yield ()
    
  def run(args: List[String]): URIO[ZEnv, ExitCode] =
    app.provideCustomLayer(pulsarClient).useNow.exitCode

Running examples locally

To try the examples from the examples subproject you'll need a Pulsar instance running locally. You can set one up using docker:

docker run -it \
  -p 6650:6650 \
  -p 8080:8080 \
  --mount source=pulsardata,target=/pulsar/data \
  --mount source=pulsarconf,target=/pulsar/conf \
  --network pulsar \
  apachepulsar/pulsar:2.7.0 \
  bin/pulsar standalone