nigozi / fs2-nakadi   0.1.0-M2

MIT License GitHub

Nakadi client for Scala based on FS2

Scala versions: 2.12


fs2-nakadi is Nakadi client for Scala based on FS2.

Under the hood

  • http4s as the underlying http client
  • circe for JSON encoding/decoding
  • fs2 for streaming


Work is still in progress but the basic DSLs are defined.


libraryDependencies += "io.nigo" %% "fs2-nakadi" % "0.1.0-M2"


Event Types

There are three main categories of event type defined by Nakadi:

  • Business Event: An event that is part of, or drives a business process, such as a state transition in a customer order.

  • Data Change Event: An event that represents a change to a record or other item, or a new item. Change events are associated with a create, update, delete, or snapshot operation.

  • Undefined Event: A free form category suitable for events that are entirely custom to the producer.

fs2-nakadi provides a simple DSL for dealing with event types:

import cats.effect.IO
import fs2.nakadi.client._
import fs2.nakadi.model._
import fs2.nakadi.dsl._

// Define Nakadi setting
implicit val config: NakadiConfig[IO] = NakadiConfig[IO](new URI("<nakadi-uri>"))

// Define EventType
val business = EventType(
    name = EventTypeName("business-data"), 
    owningApplication = "fs2-nakadi", 
    category = Category.Business

// Create the EventType

// Find the EventType

Publish Events

You can define your own ADT of the events and simply publish them to the desired EventType using Events DSL:

import cats.effect.IO
import java.util.UUID
import java.time.ZonedDateTime
import fs2.nakadi.model._
import fs2.nakadi.model.Event.Business
import fs2.nakadi.dsl._
import fs2.Stream

// Define Event ADT
case class User(id: UUID, firstName: String, lastName: String, createdAt: ZonedDateTime)

object User {
  import io.circe.{Encoder, Decoder}
  import io.circe.derivation._ 
  implicit val userEncoder: Encoder[User] = deriveEncoder(renaming.snakeCase)
  implicit val userDecoder: Decoder[User] = deriveDecoder(renaming.snakeCase)

// Define Event
val user: User = User(UUID.randomUUID(), "john", "snow", 
val event: Event[User] = Business(
  data = user,
  metadata = Metadata()

val eventClient = EventClient[IO]

// Publish a list of Event
eventClient.publish[User](EventTypeName("user-data"), List(event))

// Publish a Stream

Consume Events

fs2-nakadi supports high-level event consumption using subscriptions

import cats.effect.IO
import fs2.nakadi.model._
import fs2.nakadi.dsl._
import fs2.Stream

val subClient = SubscriptionClient[IO]

// Create a subscription if doesn't exist
val sub = 
          owningApplication = "fs2-nakadi", 
          eventTypes = Some(List(EventTypeName("user-data")))

// Create event stream
    .flatMap(s => subClient.eventStream[User](, StreamConfig()))

You can also use managedEventStream which receives a callback and applies it to every event:

val callback: EventCallback[User] = match {
      case Some(ev) =>
        ev.foreach(e => println(s"Received Event: ${}"))
      case _ => true

    .flatMap { s =>
      subClient.managedEventStream[User](1)(, callback, StreamConfig())