This library allows to use nats as an akka streams source by wrapping the official java nats client. It currently supports subject subscriptions and queue groups subscriptions.
It's an akka-streams source connector for NATS and not STAN nor NATS streaming
The library using an in-memory message buffer (queue), in the case of a slow consumer message will get buffered until the size of the buffer is reached. If the size of the buffer is reached an exception will be thrown.
libraryDependencies += "com.mycoachsport" %% "akka-streams-nats" % "0.0.1"
<dependency>
<groupId>com.mycoachsport</groupId>
<artifactId>akka-streams-nats_2.12</artifactId>
<version>0.0.4</version>
</dependency>
<dependency>
<groupId>com.mycoachsport</groupId>
<artifactId>akka-streams-nats_2.13</artifactId>
<version>0.0.4</version>
</dependency>
val natsConnection =
Nats.connect("nats://localhost:4222")
// Create source with a subject
val natsSettings =
NatsSettings(natsConnection, SubjectSubscription("my.subject"))
NatsSource(natsSettings, 10)
.map { message =>
// Do some work
println(message)
}
.run()
// Create a queue group based source
val natsSettingsQueue =
NatsSettings(natsConnection, QueueGroupSubscription("my.subject", "my.queue.group"))
NatsSource(natsSettingsQueue, 10)
.map { message =>
// Do some work
println(message)
}
.run()
val natsConnection =
Nats.connect("nats://localhost:4222")
val jsm = natsConnection.jetStreamManagement()
val sc = StreamConfiguration
.builder()
.name("EVENTS")
.subjects("events.>")
.retentionPolicy(RetentionPolicy.WorkQueue)
.build()
jsm.addStream(sc)
val js = natsConnection.jetStream()
val c1 = ConsumerConfiguration
.builder()
.durable("processor")
.ackPolicy(AckPolicy.Explicit)
.build()
jsm.addOrUpdateConsumer("EVENTS", c1)
val streamContext = natsConnection.getStreamContext("EVENTS")
val consumerContext = streamContext.getConsumerContext("processor")
JetStreamSource(consumerContext)
.map { m =>
// do some work
m.ack()
}
.run()