Akka Eventhubs
Akka Streams Azure Eventhubs Source and Sink
USAGE
update your build.sbt
dependencies with:
// https://mvnrepository.com/artifact/tech.navicore/akkaeventhubs
libraryDependencies += "tech.navicore" %% "akkaeventhubs" % "1.6.3"
SOURCE
add to application.conf
eventhubs {
dispatcher {
type = Dispatcher
executor = "thread-pool-executor"
thread-pool-executor {
core-pool-size-min = 4
core-pool-size-factor = 2.0
core-pool-size-max = 8
}
throughput = 10
mailbox-capacity = -1
mailbox-type = ""
}
}
eventhubs-in {
snapshotInterval = 100
persist = false
persistFreq = 1
offsetPersistenceId = "my_example_eventhubsOffset"
connection {
connStr = ${EVENTHUBS_1_CONNSTR}
partitions = ${EVENTHUBS_1_PARTITION_COUNT}
defaultOffset = "LATEST"
consumerGroup = "$Default"
receiverTimeout = 120s
receiverBatchSize = 1
readersPerPartition = 1
}
dispatcher {
type = Dispatcher
executor = "thread-pool-executor"
thread-pool-executor {
core-pool-size-min = 4
core-pool-size-factor = 2.0
core-pool-size-max = 8
}
throughput = 10
mailbox-capacity = -1
mailbox-type = ""
}
}
ack the the item once processed for a partition source:
val cfg: Config = ConfigFactory.load().getConfig("eventhubs-in")
val source1 = createPartitionSource(0, cfg)
source1.runForeach(m => {
println(s"SINGLE SOURCE: ${m._1.substring(0, 160)}")
m._2.ack()
})
ack the the item once processed after merging all the partition sources:
val consumer: Sink[(String, AckableOffset), Future[Done]] =
Sink.foreach(m => {
println(s"SUPER SOURCE: ${m._1.substring(0, 160)}")
m._2.ack()
})
val toConsumer = createToConsumer(consumer)
val cfg: Config = ConfigFactory.load().getConfig("eventhubs-in")
for (pid <- 0 until EventHubConf(cfg).partitions) {
val src: Source[(String, AckableOffset), NotUsed] =
createPartitionSource(pid, cfg)
src.runWith(toConsumer)
}
With Persistence of Offsets
change applicagtion.conf
and configure Actor Persistence
eventhubs-in {
persist = true
...
...
...
SINK
The sink requires a stream shape using a case class
case class EventhubsSinkData(payload: Array[Byte],
keyOpt: Option[String] = None,
props: Option[Map[String, String]] = None,
ackable: Option[AckableOffset] = None,
genericAck: Option[() => Unit] = None)
payload
is what you think it is.keyOpt
is the partition key. If not set, the Sink will use a hash of the payload.props
is an optional string map that will add properties to the Eventhubs metadata for this item.ackable
is optional and will be committed when the payload is successfully sent.genericAck
is an optional function and will be called when the payload is successfully sent.
val outConfig: Config = ConfigFactory.load().getConfig("eventhubs-out")
...
...
...
val format = Flow[(String, AckableOffset)].map((x: (String, AckableOffset)) =>
EventhubsSinkData(x._1.getBytes("UTF8"), None, None, Some(x._2))
)
src.via(<SOME_PROCESSING_FLOW>).via(format).runWith(new EventhubsSink(EventHubConf(outConfig)))
OPS
publish local
sbt +publishLocalSigned
publish to nexus staging
export GPG_TTY=$(tty)
sbt +publishSigned
sbt sonatypeReleaseAll