dnvriend / akka-stream-extensions

non-official extension library for akka-stream that contains operations like zipWithIndex, Source.fromTry and so on

GitHub

akka-stream-extensions

Build Status Download Codacy Badge License

Non-official extension library for akka-stream that contains operations like zipWithIndex, Source.fromTry and so on

Installation

Add the following to your build.sbt:

resolvers += Resolver.jcenterRepo

libraryDependencies += "com.github.dnvriend" %% "akka-stream-extensions" % "0.0.1"

Contribution policy

Contributions via GitHub pull requests are gladly accepted from their original author. Along with any pull requests, please state that the contribution is your original work and that you license the work to the project under the project's open source license. Whether or not you state this explicitly, by submitting any copyrighted material via pull request, email, or other means you agree to license the material under the project's open source license and warrant that you have the legal authority to do so.

License

This code is open source software licensed under the Apache 2.0 License.

akka.stream.scaladsl.extension.io.DigestCalculator

Given a stream of ByteString, it calculates a digest given a certain Algorithm.

akka.stream.scaladsl.extension.io.FileUtils

A stage that does file operations. Very handy for stream processing file operations.

akka.stream.scaladsl.extension.xml.Validation

Given a stream of ByteString, it validates an XML file given an XSD.

akka.stream.scaladsl.extension.xml.XMLEventSource

Given an inputstream or filename, it creates a Source[XMLEvent, NotUsed] that can be used to process an XML file. It can be used together with akka-stream's processing stages and the akka.persistence.query.extension.Journal to store the transformed messages in the journal to be consumed by other components. It can also be used with reactive-activemq's akka.stream.scaladsl.extension.activemq.ActiveMqProducer to send these messages to a VirtualTopic.

akka.stream.scaladsl.extension.xml.XMLParser

It should be easy to write XML parsers to process large XML files efficiently. Most often this means reading the XML sequentially, parsing a known XML fragment and converting it to DTOs using case classes. For such a use case the akka.stream.scaladsl.extension.xml.XMLParser should help you get you up and running fast!

For example, let's process the following XML:

<orders>
    <order id="1">
        <item name="Pizza" price="12.00">
            <pizza>
                <crust type="thin" size="14"/>
                <topping>cheese</topping>
                <topping>sausage</topping>
            </pizza>
        </item>
        <item name="Breadsticks" price="4.00"/>
        <tax type="federal">0.80</tax>
        <tax type="state">0.80</tax>
        <tax type="local">0.40</tax>
    </order>
</orders>

Imagine we are interested in only orders, and only the tax, lets write two parsers:

import scala.xml.pull._
import akka.stream.scaladsl._
import akka.stream.scaladsl.extension.xml.XMLParser
import akka.stream.scaladsl.extension.xml.XMLParser._
import akka.stream.scaladsl.extension.xml.XMLEventSource

case class Order(id: String)

val orderParser: Flow[XMLEvent, Order] = {
 var orderId: String = null
 XMLParser.flow {
  case EvElemStart(_, "order", meta, _) 
    orderId = getAttr(meta)("id"); emit()
  case EvElemEnd(_, "order") 
    emit(Order(orderId))
 }
}

case class Tax(taxType: String, value: String)

val tagParser: Flow[XMLEvent, Tax] = {
  var taxType: String = null
  var taxValue: String = null
  XMLParser.flow {
    case EvElemStart(_, "tax", meta, _) =>
      taxType = getAttr(meta)("type"); emit()
    case EvText(text) 
      taxValue = text; emit()
    case EvElemEnd(_, "tax") 
      emit(Tax(taxType, taxValue))
  }
}

XMLEventSource.fromFileName("orders.xml")
 .via(orderParser).runForeach(println)

XMLEventSource.fromFileName("orders.xml")
 .via(tagParser).runForeach(println)

For a more complex example, please take a look at akka.stream.scaladsl.extension.xml.PersonParser in the test package of this library.

  • v0.0.1 (2016-07-23)
    • Initial release

Have fun!