navicore / naviblob

An Akka Streams Source for Azure Storage Blobs with Avro support for Eventhubs capture blobs

Read Blob Storage into Akka Streams

I use this connector to replay historical data-at-rest into an existing code base that was designed for streaming. The initial use case is replaying Azure Eventhubs "capture" Avro data back into Eventhubs and Kafka, which lets me back-test new streaming code.

Current Storage Sources

  1. Azure Blobs with Avro
  2. Azure Blobs with newline-delimited text
  3. Azure Blobs with Gzip'd newline-delimited text
  4. Other cloud storage implementations TBD

INSTALL

Binaries are available via Maven; check there for the latest version.

Update your build.sbt dependencies with:

// https://mvnrepository.com/artifact/tech.navicore/naviblob
libraryDependencies += "tech.navicore" %% "naviblob" % "1.1.1"

USAGE

This example reads Avro data from Azure blobs. It uses avro4s to derive the Avro schema from a case class type parameter.

Create a config, a connector, and a source as in the example below. The EhRecord type parameter can be replaced with a case class that represents your own Avro schema.

    val consumer = ... // some Sink

    ...
    ...
    ...
    
    // credentials and location
    implicit val cfg: BlobConfig = BlobConfig(STORAGEACCOUNT, STORAGEKEY, CONTAINERNAME, STORAGEPATH)
    
    // type parameter for avro deserialize - in this example: `EhRecord`
    val connector: ActorRef = actorSystem.actorOf(AvroConnector.props[EhRecord])
    
    val src = NaviBlob[EhRecord](connector)

    ...
    ...
    ...

    src.runWith(consumer)

    ...
    ...
    ...
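
To make the type-parameter substitution concrete, here is a minimal sketch of what a custom record type could look like. The name `SensorReading` and all of its fields are hypothetical and only illustrate the shape; your case class has to mirror the fields of the Avro schema actually stored in your blobs.

```scala
// Hypothetical record type, not part of naviblob.
// Field names and types are illustrative and must match your own Avro schema.
final case class SensorReading(
  deviceId: String,    // assumed string field
  temperature: Double, // assumed double field
  timestamp: Long      // assumed epoch-millis field
)

object SensorReadingSketch {
  // A sample value, useful for checking the shape in a REPL or unit test.
  val sample: SensorReading = SensorReading("device-1", 21.5, 1546300800000L)
}
```

With such a class in scope, the pattern shown above would presumably become `AvroConnector.props[SensorReading]` and `NaviBlob[SensorReading](connector)`.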

For newline-delimited text files, use the TextBlobConnector connector actor:

    ...
    ...
    ...
    
    val connector: ActorRef = actorSystem.actorOf(TextBlobConnector.props)
    
    val src = NaviBlob[String](connector)
    
    ...
    ...
    ...
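
The examples leave the consumer as "some Sink". As a rough sketch of what one could look like for `String` elements, the following counts lines with a folding sink. It assumes Akka 2.6 or later (where an implicit `ActorSystem` supplies the materializer) and uses an in-memory `Source` as a stand-in for `NaviBlob[String](connector)`, so it does not touch Azure or naviblob at all.

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}

import scala.concurrent.Await
import scala.concurrent.duration._

object LineCountSketch extends App {
  // Assumption: Akka 2.6+, so the ActorSystem provides the stream materializer.
  implicit val actorSystem: ActorSystem = ActorSystem("line-count-sketch")

  // Stand-in for NaviBlob[String](connector); emits two in-memory lines.
  val src = Source(List("first line", "second line"))

  // A sink that counts elements and materializes the total as a Future[Int].
  val consumer = Sink.fold[Int, String](0)((count, _) => count + 1)

  // Block for the result only because this is a small demo program.
  val total: Int = Await.result(src.runWith(consumer), 5.seconds)
  println(s"lines consumed: $total")

  actorSystem.terminate()
}
```

In a real replay the sink would more likely write to Eventhubs or Kafka; the folding sink is used here only because it needs no external services.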

For Gzip'd newline-delimited text such as JSONL, use the GzipTextBlobConnector connector actor:

    ...
    ...
    ...
    
    val connector: ActorRef = actorSystem.actorOf(GzipTextBlobConnector.props)
    
    val src = NaviBlob[String](connector)
    
    ...
    ...
    ...
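
For local testing it can help to see what Gzip'd newline-delimited text looks like once decoded. The sketch below uses only the JDK and the Scala standard library to compress a small JSONL payload and read it back as lines. It illustrates the data format only and is not how GzipTextBlobConnector is implemented; the helper names `gzip` and `gunzipLines` are made up for this example.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
import java.nio.charset.StandardCharsets.UTF_8
import java.util.zip.{GZIPInputStream, GZIPOutputStream}

import scala.io.Source

object GzipLinesSketch {
  // Compress a UTF-8 string into gzip bytes (hypothetical helper).
  def gzip(text: String): Array[Byte] = {
    val buffer = new ByteArrayOutputStream()
    val out = new GZIPOutputStream(buffer)
    try out.write(text.getBytes(UTF_8)) finally out.close()
    buffer.toByteArray
  }

  // Decompress gzip bytes and split the text into lines (hypothetical helper).
  def gunzipLines(bytes: Array[Byte]): List[String] = {
    val in = new GZIPInputStream(new ByteArrayInputStream(bytes))
    try Source.fromInputStream(in, "UTF-8").getLines().toList finally in.close()
  }
}
```

Each decoded line corresponds to one `String` element of the kind a `NaviBlob[String]` source is typed to emit.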

OPS

publish local

sbt +publishLocalSigned

publish to nexus staging

export GPG_TTY=$(tty)
sbt +publishSigned
sbt sonatypeReleaseAll