nachinius / stompconnectorforakkastreams

A STOMP PROTOCOL connector for akka streams

GitHub

Latest version Build Status License codecov

STOMP Protocol Connector

The Stomp Protocol connector provides Akka Streams sources and sinks to connect to STOMP servers.

Reported issues

Tagged issues at Github

Usage

Connecting to a STOMP server

All the STOMP connectors are configured using a (com.nachinius.akka.stream.stomp.ConnectionProvider).

There are some types of (com.nachinius.akka.stream.stomp.ConnectionProvider):

  • LocalConnectionProvider which connects to the default localhost. It creates a new connection for each stage.
  • DetailsConnectionProvider which supports more fine-grained configuration. It creates a new connection for each stage.

Sinking messages into a STOMP server

Create the ConnectorSettings

 val host = "localhost"
           val port = 61613
           val topic = "AnyTopic"
           val settings =
 ConnectorSettings(connectionProvider = DetailsConnectionProvider(host, port), destination = Some(topic)))

StompClientSink is a collection of factory methods that facilitates creation of sinks.

Create a sink, that accepts and forwards SendingFrame to the STOMP server.

  val sinkToStomp: Sink[SendingFrame, Future[Done]] = StompClientSink(settings)

Last step is to materialize and run the sink we created.

  val input = Vector("one", "two")
  val source = Source(input).map(SendingFrame.from)
  val sinkDone = source.runWith(sinkToStomp)

Receiving messages from STOMP server using a StompClientSource

Create the [ConnectorSettings] that specifies the STOMP server to connect, and the STOMP destination that you want receive messages from

  val host = "localhost"
  val port = 61667
  val destination = "/topic/topic2"
  val settings = ConnectorSettings(DetailsConnectionProvider(host, port, None), Some(destination))ector-settings }

Create a source, that generates [SendingFrame]

  val source: Source[SendingFrame, Future[Done]] = StompClientSource(settings)

Last step is to materialize and run the source we created.

  val sink = Sink.head[SendingFrame]
  val (futConnected: Future[Done], futHead: Future[SendingFrame]) = source.toMat(sink)(Keep.both).run()

Running an example code

The code in this guide is part of runnable tests of this project. You are welcome to edit the code and run it in sbt.

Test code does not require an STOMP server running in the background, since it creates one per test using Vertx Stomp library.

Scala : sbt > test