Akka Streams Azure Eventhubs Source and Sink
Update your build.sbt dependencies with:
// https://mvnrepository.com/artifact/tech.navicore/akkaeventhubs
libraryDependencies += "tech.navicore" %% "akkaeventhubs" % "1.6.3"
add to application.conf
# Dispatcher settings under the "eventhubs" config root.
eventhubs {
dispatcher {
type = Dispatcher
executor = "thread-pool-executor"
thread-pool-executor {
# NOTE(review): these look like Akka's standard thread-pool sizing keys
# (pool size derived from the factor, bounded by min/max) — confirm against the Akka dispatcher docs.
core-pool-size-min = 4
core-pool-size-factor = 2.0
core-pool-size-max = 8
}
throughput = 10
# NOTE(review): -1 and "" presumably select an unbounded / default mailbox — confirm.
mailbox-capacity = -1
mailbox-type = ""
}
}
# Settings for the Eventhubs source; the usage examples load this section via getConfig("eventhubs-in").
eventhubs-in {
snapshotInterval = 100
# Persistence is off here; the README later switches this to true together with Actor Persistence.
persist = false
persistFreq = 1
offsetPersistenceId = "my_example_eventhubsOffset"
connection {
# HOCON substitutions: both values must be supplied from outside this file
# (presumably environment variables of the same name — confirm).
connStr = ${EVENTHUBS_1_CONNSTR}
partitions = ${EVENTHUBS_1_PARTITION_COUNT}
# NOTE(review): presumably the starting position when no offset has been stored — confirm.
defaultOffset = "LATEST"
consumerGroup = "$Default"
receiverTimeout = 120s
receiverBatchSize = 1
readersPerPartition = 1
}
# Same dispatcher settings as the "eventhubs.dispatcher" block.
dispatcher {
type = Dispatcher
executor = "thread-pool-executor"
thread-pool-executor {
core-pool-size-min = 4
core-pool-size-factor = 2.0
core-pool-size-max = 8
}
throughput = 10
mailbox-capacity = -1
mailbox-type = ""
}
}
Ack the item once it has been processed, for a single partition source:
// Load the "eventhubs-in" section of the configuration and read from partition 0 only.
val cfg: Config = ConfigFactory.load().getConfig("eventhubs-in")
val source1 = createPartitionSource(0, cfg)
source1.runForeach { case (payload, offset) =>
  // take(160) previews at most 160 characters and, unlike substring(0, 160),
  // does not throw when the payload is shorter than that.
  println(s"SINGLE SOURCE: ${payload.take(160)}")
  // Ack only after the item has been processed.
  offset.ack()
}
Ack the item once it has been processed, after merging all the partition sources:
// Sink shared by every partition: prints a preview of each payload, then acks its offset.
val consumer: Sink[(String, AckableOffset), Future[Done]] =
  Sink.foreach[(String, AckableOffset)] { case (payload, offset) =>
    // take(160) previews at most 160 characters and, unlike substring(0, 160),
    // does not throw when the payload is shorter than that.
    println(s"SUPER SOURCE: ${payload.take(160)}")
    // Ack only after the item has been processed.
    offset.ack()
  }
val toConsumer = createToConsumer(consumer)
val cfg: Config = ConfigFactory.load().getConfig("eventhubs-in")
// Create one source per configured partition and run each into the shared consumer.
for (pid <- 0 until EventHubConf(cfg).partitions) {
  val src: Source[(String, AckableOffset), NotUsed] =
    createPartitionSource(pid, cfg)
  src.runWith(toConsumer)
}
Change application.conf (set persist = true)
and configure Actor Persistence:
eventhubs-in {
persist = true
...
...
...
The sink requires a stream shape using a case class
/** Element type the sink expects from the stream.
  *
  * @param payload    message body as raw bytes
  * @param keyOpt     partition key; if not set, the sink uses a hash of the payload
  * @param props      optional string map added as properties to the Eventhubs metadata for this item
  * @param ackable    optional offset, committed when the payload is successfully sent
  * @param genericAck optional function, called when the payload is successfully sent
  */
case class EventhubsSinkData(
    payload: Array[Byte],
    keyOpt: Option[String] = None,
    props: Option[Map[String, String]] = None,
    ackable: Option[AckableOffset] = None,
    genericAck: Option[() => Unit] = None
)
payload is the message body, as raw bytes.
keyOpt is the partition key. If not set, the Sink will use a hash of the payload.
props is an optional string map that will add properties to the Eventhubs metadata for this item.
ackable is optional and will be committed when the payload is successfully sent.
genericAck is an optional function and will be called when the payload is successfully sent.
val outConfig: Config = ConfigFactory.load().getConfig("eventhubs-out")
...
...
...
// Adapt each (payload, offset) pair to the sink's element type, passing the offset
// along as `ackable`; keyOpt and props keep their None defaults.
val format = Flow[(String, AckableOffset)].map { case (payload, offset) =>
  EventhubsSinkData(
    // Charset constant instead of the "UTF8" name: no charset lookup by string at runtime.
    payload.getBytes(java.nio.charset.StandardCharsets.UTF_8),
    ackable = Some(offset)
  )
}
src.via(<SOME_PROCESSING_FLOW>).via(format).runWith(new EventhubsSink(EventHubConf(outConfig)))
sbt +publishLocalSigned
export GPG_TTY=$(tty)
sbt +publishSigned
sbt sonatypeReleaseAll