Akka Streams Azure Eventhubs Source and Sink
Update your build.sbt dependencies with:
// https://mvnrepository.com/artifact/tech.navicore/akkaeventhubs
libraryDependencies += "tech.navicore" %% "akkaeventhubs" % "1.6.3"
Add to application.conf:
eventhubs {
  dispatcher {
    type = Dispatcher
    executor = "thread-pool-executor"
    thread-pool-executor {
      core-pool-size-min = 4
      core-pool-size-factor = 2.0
      core-pool-size-max = 8
    }
    throughput = 10
    mailbox-capacity = -1
    mailbox-type = ""
  }
}
eventhubs-in {
  snapshotInterval = 100
  persist = false
  persistFreq = 1
  offsetPersistenceId = "my_example_eventhubsOffset"
  connection {
    connStr = ${EVENTHUBS_1_CONNSTR}
    partitions = ${EVENTHUBS_1_PARTITION_COUNT}
    defaultOffset = "LATEST"
    consumerGroup = "$Default"
    receiverTimeout = 120s
    receiverBatchSize = 1
    readersPerPartition = 1
  }
  dispatcher {
    type = Dispatcher
    executor = "thread-pool-executor"
    thread-pool-executor {
      core-pool-size-min = 4
      core-pool-size-factor = 2.0
      core-pool-size-max = 8
    }
    throughput = 10
    mailbox-capacity = -1
    mailbox-type = ""
  }
}
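The configuration above can be read back with the Typesafe Config API, which may help when checking that values such as the partition count and receiver timeout parse as expected. The following is a minimal sketch, not the library's own loading code: the `ConfigSketch` object and the literal placeholder values are assumptions made for illustration, standing in for the `${EVENTHUBS_1_CONNSTR}` and `${EVENTHUBS_1_PARTITION_COUNT}` substitutions so that the example does not depend on the environment.

```scala
import com.typesafe.config.{Config, ConfigFactory}

object ConfigSketch {
  // Inline HOCON with literal placeholder values (assumptions for this sketch).
  // In the real application.conf these come from ${EVENTHUBS_1_CONNSTR} etc.
  val hocon: String =
    """
      |eventhubs-in {
      |  persist = false
      |  connection {
      |    connStr = "placeholder-connection-string"
      |    partitions = 2
      |    defaultOffset = "LATEST"
      |    consumerGroup = "$Default"
      |    receiverTimeout = 120s
      |  }
      |}
      |""".stripMargin

  // Same narrowing step as ConfigFactory.load().getConfig("eventhubs-in").
  val cfg: Config = ConfigFactory.parseString(hocon).getConfig("eventhubs-in")

  def main(args: Array[String]): Unit = {
    println(cfg.getBoolean("persist"))                                  // false
    println(cfg.getInt("connection.partitions"))                        // 2
    println(cfg.getString("connection.consumerGroup"))                  // $Default
    println(cfg.getDuration("connection.receiverTimeout").getSeconds)   // 120
  }
}
```

With the real file, `ConfigFactory.load()` resolves `${...}` substitutions against environment variables, so both variables need to be exported before the application starts; an unset variable surfaces as an unresolved-substitution error at load time.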
Ack the item once processed for a partition source:
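val cfg: Config = ConfigFactory.load().getConfig("eventhubs-in")
// Source for partition 0 only
val source1 = createPartitionSource(0, cfg)
source1.runForeach(m => {
  // take(160) truncates safely; substring(0, 160) throws on shorter payloads
  println(s"SINGLE SOURCE: ${m._1.take(160)}")
  // ack only after the item has been processed
  m._2.ack()
})
Ack the item once processed after merging all the partition sources: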
val consumer: Sink[(String, AckableOffset), Future[Done]] =
  Sink.foreach(m => {
    // take(160) truncates safely; substring(0, 160) throws on shorter payloads
    println(s"SUPER SOURCE: ${m._1.take(160)}")
    m._2.ack()
  })
val toConsumer = createToConsumer(consumer)
val cfg: Config = ConfigFactory.load().getConfig("eventhubs-in")
// Run one source per partition into the shared consumer
for (pid <- 0 until EventHubConf(cfg).partitions) {
  val src: Source[(String, AckableOffset), NotUsed] =
    createPartitionSource(pid, cfg)
  src.runWith(toConsumer)
}
Change application.conf and configure Actor Persistence:
eventhubs-in {
persist = true
...
...
...
The sink requires stream elements to be wrapped in the following case class:
case class EventhubsSinkData(payload: Array[Byte],
                             keyOpt: Option[String] = None,
                             props: Option[Map[String, String]] = None,
                             ackable: Option[AckableOffset] = None,
                             genericAck: Option[() => Unit] = None)
payload is the event body, as raw bytes.
keyOpt is the partition key. If not set, the Sink will use a hash of the payload.
props is an optional string map that will add properties to the Eventhubs metadata for this item.
ackable is optional and will be committed when the payload is successfully sent.
genericAck is an optional function and will be called when the payload is successfully sent.
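To make the field descriptions above concrete, here is a minimal self-contained sketch. It redeclares the case class locally and substitutes a hypothetical `AckableOffset` trait for the library's type so that it compiles without the library on the classpath. The helpers `partitionKeyFor` and `onSent` are illustrative only: using `java.util.Arrays.hashCode` is an assumption about what "a hash of the payload" could look like, not the sink's actual hashing, and `onSent` merely mimics the described behavior of acknowledging after a successful send.

```scala
import java.nio.charset.StandardCharsets.UTF_8

object SinkDataSketch {
  // Hypothetical stand-in for the library's AckableOffset, so the sketch compiles alone.
  trait AckableOffset { def ack(): Unit }

  // Same shape as the case class shown above.
  case class EventhubsSinkData(payload: Array[Byte],
                               keyOpt: Option[String] = None,
                               props: Option[Map[String, String]] = None,
                               ackable: Option[AckableOffset] = None,
                               genericAck: Option[() => Unit] = None)

  // Illustrative only: explicit key if present, otherwise a hash of the payload bytes.
  def partitionKeyFor(d: EventhubsSinkData): String =
    d.keyOpt.getOrElse(java.util.Arrays.hashCode(d.payload).toString)

  // Illustrative only: run both optional acknowledgements after a successful send.
  def onSent(d: EventhubsSinkData): Unit = {
    d.ackable.foreach(_.ack())
    d.genericAck.foreach(f => f())
  }

  def main(args: Array[String]): Unit = {
    var acked = 0
    val keyed = EventhubsSinkData(
      payload = "hello".getBytes(UTF_8),
      keyOpt = Some("device-42"),
      props = Some(Map("source" -> "sketch")),
      genericAck = Some(() => acked += 1))
    // Only the payload is required; every other field defaults to None.
    val unkeyed = EventhubsSinkData("hello".getBytes(UTF_8))

    println(partitionKeyFor(keyed))   // device-42
    println(partitionKeyFor(unkeyed)) // a key derived from the payload bytes
    onSent(keyed)
    println(acked)                    // 1
  }
}
```

Because every field except payload has a default, a caller that only needs to forward bytes can construct the element with a single argument and add a key, properties, or acknowledgement hooks later as requirements grow.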
val outConfig: Config = ConfigFactory.load().getConfig("eventhubs-out")
...
...
...
val format = Flow[(String, AckableOffset)].map((x: (String, AckableOffset)) =>
  EventhubsSinkData(x._1.getBytes("UTF8"), None, None, Some(x._2))
)
src.via(<SOME_PROCESSING_FLOW>).via(format).runWith(new EventhubsSink(EventHubConf(outConfig)))
sbt +publishLocalSigned
export GPG_TTY=$(tty)
sbt +publishSigned
sbt sonatypeReleaseAll