This connector replays historical data-at-rest into an existing code base that was designed for streaming. The initial use case is to replay Azure Event Hubs "capture" Avro data back into Event Hubs and Kafka, allowing me to back-test new streaming code.
- Azure Blobs with Avro
- Azure Blobs with newline-delimited text
- Azure Blobs with gzip'd newline-delimited text
- Other cloud storage implementations TBD
Binaries are available via Maven - check Maven Central for the latest version.
Update your build.sbt dependencies with:
// https://mvnrepository.com/artifact/tech.navicore/naviblob
libraryDependencies += "tech.navicore" %% "naviblob" % "1.3.7"
This example reads Avro data from Azure blobs, using avro4s to derive the Avro schema from a case class type parameter.
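For illustration, a record case class might look like the following - the field names here are hypothetical and must match the schema of your captured Avro data:

// hypothetical record type - avro4s derives the Avro schema from its fields
case class EhRecord(offset: String, sequenceNumber: Long, enqueuedTimeUtc: String, body: String)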
Create a config, a connector, and a source as in the example below - note that the `EhRecord` type parameter can be replaced with a case class that represents your Avro schema.
val consumer = ... // some Sink
...
...
...
// credentials and location
implicit val cfg: BlobConfig = BlobConfig(STORAGEACCOUNT, STORAGEKEY, CONTAINERNAME, STORAGEPATH)
// type parameter for avro deserialize - in this example: `EhRecord`
val connector: ActorRef = actorSystem.actorOf(AvroConnector.props[EhRecord])
val src = NaviBlob[EhRecord](connector)
...
...
...
src.runWith(consumer)
...
...
...
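Putting the pieces together, a minimal runnable sketch might look like the following. It assumes Akka 2.6+, where the implicit ActorSystem also provides the stream materializer; the naviblob import paths are omitted, and the config values are placeholders.

import akka.actor.{ActorRef, ActorSystem}
import akka.stream.scaladsl.Sink
// naviblob imports (BlobConfig, AvroConnector, NaviBlob) omitted - use the library's packages

object ReplayAvro extends App {

  implicit val actorSystem: ActorSystem = ActorSystem("replay")

  // placeholder credentials and location - substitute your own
  implicit val cfg: BlobConfig = BlobConfig("myaccount", "mykey", "mycontainer", "capture/path")

  val connector: ActorRef = actorSystem.actorOf(AvroConnector.props[EhRecord])
  val src = NaviBlob[EhRecord](connector)

  // print each deserialized record as it arrives
  src.runWith(Sink.foreach(println))
}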
For line-delimited text files, use the `TextBlobConnector` actor:
...
...
...
val connector: ActorRef = actorSystem.actorOf(TextBlobConnector.props)
val src = NaviBlob[String](connector)
...
...
...
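The source emits one String per line. As a sketch, a fold sink could count the non-empty lines (same setup as the Avro example above):

import akka.stream.scaladsl.Sink
import scala.concurrent.Future

// count non-empty lines as they stream past
val countNonEmpty = Sink.fold[Int, String](0)((n, line) => if (line.trim.nonEmpty) n + 1 else n)
val total: Future[Int] = src.runWith(countNonEmpty)
total.foreach(n => println(s"replayed $n lines"))(actorSystem.dispatcher)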
For gzip'd jsonl (newline-delimited JSON) text, use the `GzipTextBlobConnector` actor:
...
...
...
val connector: ActorRef = actorSystem.actorOf(GzipTextBlobConnector.props)
val src = NaviBlob[String](connector)
...
...
...
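Each element emitted by the source is one decompressed line of the jsonl file; parsing is left to the consumer. A sketch with a stand-in parser (swap in circe, play-json, or your JSON library of choice):

import akka.stream.scaladsl.Sink

// stand-in for a real JSON parser - replace with your library of choice
def parseJson(line: String): String = line

src
  .filter(_.trim.nonEmpty)       // skip blank lines
  .map(parseJson)
  .runWith(Sink.foreach(println))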
Publish a local build with:

sbt +publishLocalSigned

Publish a signed release to Sonatype with:

export GPG_TTY=$(tty)
sbt +publishSigned
sbt sonatypeReleaseAll