Akka streams nats
This library allows to use nats as an akka streams source by wrapping the official java nats client. It currently supports subject subscriptions and queue groups subscriptions.
It's an akka-streams source connector for NATS and not STAN nor NATS streaming
The library using an in-memory message buffer (queue), in the case of a slow consumer message will get buffered until the size of the buffer is reached. If the size of the buffer is reached an exception will be thrown.
SBT
libraryDependencies += "com.mycoachsport" %% "akka-streams-nats" % "0.0.1"
maven
Scala 2.12.x
<dependency>
<groupId>com.mycoachsport</groupId>
<artifactId>akka-streams-nats_2.12</artifactId>
<version>0.0.1</version>
</dependency>
Scala 2.13.x
<dependency>
<groupId>com.mycoachsport</groupId>
<artifactId>akka-streams-nats_2.13</artifactId>
<version>0.0.1</version>
</dependency>
Samples
val natsConnection =
Nats.connect("nats://localhost:4222")
// Create source with a subject
val natsSettings =
NatsSettings(natsConnection, SubjectSubscription("my.subject"))
NatsSource(natsSettings, 10)
.map { message =>
// Do some work
println(message)
}
.run()
// Create a queue group based source
val natsSettingsQueue =
NatsSettings(natsConnection, QueueGroupSubscription("my.subject", "my.queue.group"))
NatsSource(natsSettingsQueue, 10)
.map { message =>
// Do some work
println(message)
}
.run()