A functional, streaming NATS client for Scala 3, built on FS2 and Cats Effect 3.
- Pure functional - Built entirely on Cats Effect 3 and FS2
- Streaming first - Native FS2 streams for message handling
- Headers support - Full NATS 2.2+ headers support (HPUB/HMSG)
- Backpressure - Configurable slow consumer policies
- Reconnection - Exponential backoff with full jitter
- TLS support - Secure connections with configurable TLS
- Type-safe - Leverages Scala 3 features for safety
Add to your build.sbt:

    libraryDependencies += "io.github.thatscalaguy" %% "fs2-nats" % "0.1.0"

Start a NATS server:

    docker run -p 4222:4222 nats:latest

Then connect, subscribe, and publish a message:

    import cats.effect.{IO, IOApp, ExitCode}
    import com.comcast.ip4s.{Host, Port}
    import fs2.Chunk
    import fs2.nats.client.{ClientConfig, NatsClient}

    object Main extends IOApp:
      override def run(args: List[String]): IO[ExitCode] =
        val config = ClientConfig(
          host = Host.fromString("localhost").get,
          port = Port.fromInt(4222).get
        )

        NatsClient.connect[IO](config).use { client =>
          for
            // Subscribe to a subject
            _ <- client.subscribe("hello.world").use { messages =>
              for
                // Publish a message
                _ <- client.publish(
                  "hello.world",
                  Chunk.array("Hello, NATS!".getBytes)
                )
                // Receive the message
                msg <- messages.take(1).compile.lastOrError
                _ <- IO.println(s"Received: ${msg.payloadAsString}")
              yield ()
            }
          yield ExitCode.Success
        }

Publish with headers:

    import fs2.nats.protocol.Headers
    val headers = Headers(
      "X-Request-Id" -> "abc123",
      "X-Timestamp" -> System.currentTimeMillis().toString
    )

    client.publish(
      "events.created",
      Chunk.array("""{"id": 1}""".getBytes),
      headers
    )

Wildcard subscriptions:

    // Subscribe to events.* (`*` matches exactly one token, e.g. events.created)
    client.subscribe("events.*").use { messages =>
      messages.evalMap { msg =>
        IO.println(s"${msg.subject}: ${msg.payloadAsString}")
      }.compile.drain
    }
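
The difference between `*` and `>` is easiest to see as a token-by-token match. The sketch below is a standalone illustration of the NATS wildcard rules, not part of fs2-nats: `subjectMatches` is a hypothetical helper (in practice the server does this matching), and it assumes well-formed subjects with no empty tokens.

```scala
// Hypothetical helper for illustration only; not an fs2-nats API.
// `*` matches exactly one token, `>` matches one or more trailing tokens.
def subjectMatches(pattern: String, subject: String): Boolean =
  @annotation.tailrec
  def loop(p: List[String], s: List[String]): Boolean = (p, s) match
    case (">" :: Nil, _ :: _)                 => true          // `>` needs at least one token left
    case ("*" :: pt, _ :: st)                 => loop(pt, st)  // `*` consumes exactly one token
    case (ph :: pt, sh :: st) if ph == sh     => loop(pt, st)  // literal tokens must be equal
    case (Nil, Nil)                           => true          // both fully consumed
    case _                                    => false

  loop(pattern.split('.').toList, subject.split('.').toList)
```

With this definition, `subjectMatches("events.*", "events.a.b")` is false, while `subjectMatches("events.>", "events.a.b")` is true.
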

    // Subscribe to events.> (`>` matches one or more trailing tokens)
    client.subscribe("events.>").use { messages =>
      // Handles events.a, events.a.b, events.a.b.c, etc.
      messages.compile.drain
    }

Queue groups:

    // Multiple subscribers in the same queue group share messages
    client.subscribe("work.queue", queueGroup = Some("workers")).use { messages =>
      messages.evalMap { msg =>
        processWork(msg) // processWork is your own effectful handler
      }.compile.drain
    }

Connection events:

    import fs2.nats.client.ClientEvent
    client.events.evalMap {
      case ClientEvent.Connected(info) =>
        IO.println(s"Connected to ${info.serverId}")
      case ClientEvent.Disconnected(reason, willReconnect) =>
        IO.println(s"Disconnected: $reason, reconnecting: $willReconnect")
      case ClientEvent.Reconnected(info, attempt) =>
        IO.println(s"Reconnected after $attempt attempts")
      case ClientEvent.SlowConsumer(sid, subject, dropped) =>
        IO.println(s"Slow consumer on $subject, dropped $dropped messages")
      case other =>
        IO.println(s"Event: $other")
    }.compile.drain

Configuration options:

    import scala.concurrent.duration._
    import com.comcast.ip4s.{Host, Port}
    import fs2.nats.client._

    val config = ClientConfig(
      host = Host.fromString("nats.example.com").get,
      port = Port.fromInt(4222).get,
      useTls = false,
      tlsParams = None,
      name = Some("my-app"),
      credentials = Some(NatsCredentials.UserPassword("user", "pass")),
      backoff = BackoffConfig(
        baseDelay = 100.millis,
        maxDelay = 30.seconds,
        factor = 2.0,
        maxRetries = None // unlimited
      ),
      queueCapacity = 10000,
      slowConsumerPolicy = SlowConsumerPolicy.Block,
      verbose = false,
      pedantic = false,
      echo = true
    )

When a subscription queue fills up:
- SlowConsumerPolicy.Block - Backpressure (default)
- SlowConsumerPolicy.DropNew - Drop incoming messages
- SlowConsumerPolicy.DropOldest - Drop oldest queued messages
- SlowConsumerPolicy.ErrorAndDrop - Emit event and drop
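
To make the policies above concrete, here is a minimal pure model of what happens when a message arrives at a full bounded queue. It is only a sketch: `Policy`, `Outcome`, and `offer` are hypothetical names, not the fs2-nats API, and the real client works with effectful queues. `ErrorAndDrop` is left out because this README does not say which message it drops.

```scala
// Hypothetical model for illustration; these names are not the fs2-nats API.
enum Policy:
  case Block, DropNew, DropOldest

enum Outcome[+A]:
  case Enqueued          // there was room, the message was appended
  case MustWait          // Block: the producer has to wait for free space
  case Dropped(msg: A)   // the message that was discarded

// Offer `msg` to a bounded queue and return the new queue plus what happened.
def offer[A](queue: Vector[A], capacity: Int, policy: Policy, msg: A): (Vector[A], Outcome[A]) =
  require(capacity > 0, "capacity must be positive")
  if queue.size < capacity then (queue :+ msg, Outcome.Enqueued)
  else
    policy match
      case Policy.Block      => (queue, Outcome.MustWait)
      case Policy.DropNew    => (queue, Outcome.Dropped(msg))
      case Policy.DropOldest => (queue.tail :+ msg, Outcome.Dropped(queue.head))
```

For a full queue `Vector(1, 2)` with capacity 2, `offer(Vector(1, 2), 2, Policy.DropOldest, 3)` returns `(Vector(2, 3), Outcome.Dropped(1))`, whereas `Policy.DropNew` leaves the queue unchanged and reports `Outcome.Dropped(3)`.
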

Reconnection backoff policies:

    import scala.concurrent.duration._
    import fs2.nats.client.Backoff

    // Exponential backoff with jitter (recommended)
    val policy = Backoff.exponentialWithJitter(
      base = 100.millis,
      max = 30.seconds,
      factor = 2.0,
      maxRetries = Some(10)
    )
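
For readers new to the strategy, "exponential backoff with full jitter" usually means: compute an exponentially growing ceiling, cap it at `max`, then wait a uniformly random time between zero and that ceiling. The sketch below shows that arithmetic with the same `base`, `max`, and `factor` parameters. `backoffCeiling` and `fullJitterDelay` are hypothetical helpers, and the library's actual implementation may differ in detail.

```scala
import scala.concurrent.duration.*
import scala.util.Random

// Hypothetical helpers for illustration; not the fs2-nats implementation.

// Upper bound for a 0-based attempt: min(max, base * factor^attempt).
def backoffCeiling(
    base: FiniteDuration,
    max: FiniteDuration,
    factor: Double,
    attempt: Int
): FiniteDuration =
  val raw = base.toMillis.toDouble * math.pow(factor, attempt.toDouble)
  math.min(raw, max.toMillis.toDouble).toLong.millis

// Full jitter: choose uniformly between 0 and the ceiling (inclusive).
def fullJitterDelay(
    base: FiniteDuration,
    max: FiniteDuration,
    factor: Double,
    attempt: Int,
    rng: Random
): FiniteDuration =
  val ceilingMs = backoffCeiling(base, max, factor, attempt).toMillis
  rng.nextLong(ceilingMs + 1).millis
```

With `base = 100.millis`, `max = 30.seconds`, and `factor = 2.0`, the ceiling is 100 ms on attempt 0, 800 ms on attempt 3, and stays at 30 s once the exponential term exceeds the cap. The randomization spreads out reconnect attempts from many clients that lost their connection at the same moment.
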

    // Fixed delay
    val fixed = Backoff.fixed(5.seconds, maxRetries = Some(5))

    // No delay (for testing)
    val immediate = Backoff.immediate(maxRetries = 3)

Package layout:

    fs2.nats
    ├── client/
    │   ├── NatsClient            # Main public API
    │   ├── ClientConfig          # Configuration
    │   ├── ConnectionManager     # Connection lifecycle & reconnection
    │   └── Backoff               # Retry policies
    ├── protocol/
    │   ├── ProtocolParser        # Incremental NATS protocol parser
    │   ├── NatsModel             # Protocol data types (Info, Connect, etc.)
    │   ├── Headers               # NATS/1.0 headers support
    │   └── NatsFrame             # Parsed frame ADT
    ├── transport/
    │   ├── Transport             # Transport abstraction
    │   ├── NatsSocket            # TCP transport
    │   └── TlsTransport          # TLS transport wrapper
    ├── subscriptions/
    │   ├── SubscriptionManager   # Message routing & slow consumer handling
    │   ├── SidAllocator          # Subscription ID allocation
    │   └── NatsMessage           # User-facing message type
    ├── publish/
    │   ├── Publisher             # Publish with max_payload validation
    │   └── SerializationUtils    # Protocol serialization
    └── errors/
        └── NatsError             # Error ADT
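
To give a feel for what the `Headers` and `SerializationUtils` entries above deal with, the sketch below builds an `HPUB` frame by hand, following the public NATS client protocol: a control line carrying the header-block length and the total length, then the `NATS/1.0` header block, then the payload. `hpubFrame` is a hypothetical helper written for this README, not the library's serializer, and it omits the optional reply-to subject.

```scala
import java.nio.charset.StandardCharsets.UTF_8

// Hypothetical helper for illustration; the fs2-nats serializer may differ.
def hpubFrame(
    subject: String,
    headers: List[(String, String)],
    payload: Array[Byte]
): Array[Byte] =
  // Header block: version line, one "Key: Value" line per header, then a blank line.
  val headerBlock =
    ("NATS/1.0\r\n" + headers.map((k, v) => s"$k: $v\r\n").mkString + "\r\n").getBytes(UTF_8)
  // Control line: header-block size, then header-block size plus payload size.
  val control =
    s"HPUB $subject ${headerBlock.length} ${headerBlock.length + payload.length}\r\n"
      .getBytes(UTF_8)
  control ++ headerBlock ++ payload ++ "\r\n".getBytes(UTF_8)
```

For subject `FOO`, a single header `Bar: Baz`, and the payload `Hello NATS!`, the control line is `HPUB FOO 22 33`: 22 bytes of header block plus 11 bytes of payload.
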

Run unit tests:

    sbt test

Run integration tests (requires NATS server):

    docker-compose up -d
    sbt integration/test
    docker-compose down

See the examples/ directory for complete examples:
- Basic.scala - Simple publish/subscribe
- RequestReplyExample - Request/reply pattern
- QueueGroupExample - Load-balanced workers

Run examples:

    sbt "runMain fs2.nats.examples.Basic"

License: Apache License 2.0