owainlewis / akka-mqlight

A Scala/Akka interface for IBM MQLight. MQLight is a hosted implementation of AMQP 1.0.

Version Matrix

Akka MQLight

Download

A Scala library that makes it easy to integrate MQLight into your Scala/Akka projects.

Examples of common patterns can be found below.

Check build status on Jenkins

Starting MQLight locally

docker-compose up

Publisher

An actor that publishes messages to MQLight

import akka.actor.{ActorSystem, Props}
import io.forward.mqlight.SimpleMQLightPublisher
import io.forward.mqlight.model.{Credentials, Publish}

/**
 * Example that creates a publisher actor and sends a single message to MQLight.
 *
 * The body of a Scala `object` runs when the object is first referenced.
 */
object Publisher {

  // Actor system that hosts the publisher actor
  val system = ActorSystem("publisher")

  // Host, username and password used to connect to MQLight
  val authentication = Credentials("amqp://localhost", "admin", "password")

  val publisher = {
    val publisherProps = Props(new SimpleMQLightPublisher(authentication))
    system.actorOf(publisherProps)
  }

  // Publish a message to the topic /foo, attaching the current epoch time (milliseconds) as "timestamp"
  publisher ! Publish("/foo", "HELLO", Map("timestamp" -> System.currentTimeMillis().toString))
}

Subscriber

/**
 * Actor that subscribes to a single MQLight topic and handles the messages delivered to it.
 *
 * The subscription is requested only after a `Started` message has been received; the actor
 * then switches to the `started` behaviour and handles `OnMessage` notifications.
 *
 * @param credentials host, username and password used to create the MQLight client
 * @param topic       topic to subscribe to
 */
class SimpleMQLightSubscriber(credentials: Credentials, topic: String) extends MQLightActor {

  // `self` is handed to the client as the listener context.
  // NOTE(review): presumably the Actor* listeners relay callbacks to it as actor messages — confirm.
  val client = createClient(credentials.host, credentials.username, credentials.password, ActorClientListener, self)

  /** Initial behaviour: wait for `Started`, then subscribe and switch to [[started]]. */
  override def receive: Receive = {
    case Started =>
      client.subscribe(topic, ActorDestinationListener, ActorCompletionListener, self)
      context.become(started)
  }

  /** Behaviour once the subscription has been requested: handle incoming deliveries. */
  def started: Receive = {
    case OnMessage(delivery) => onMessage(delivery)
  }

  /**
   * Handles one delivery. Only string deliveries are processed; a type pattern is used
   * instead of a `getType` check followed by an unchecked `asInstanceOf` cast.
   */
  private def onMessage(delivery: Delivery): Unit =
    delivery match {
      case text: StringDelivery =>
        val payload = text.getData
        // DO WORK
      case _ => // any other delivery type is ignored
    }
}

Workers

A worker is something that listens to a topic and performs work. For example, we may want to subscribe to the topic `/sms` and dispatch an SMS message whenever we receive an AMQP message.

First, we need to create an actor that will do work when it gets a message from MQLight.

import akka.actor.Actor
import com.ibm.mqlight.api.{Delivery, StringDelivery}
import io.forward.messaging.internal.{ActorClientListener, ActorCompletionListener, ActorDestinationListener, MQConnection}
import io.forward.messaging.model._

/**
 * Actor that subscribes to an MQLight topic on request and processes the string
 * messages delivered to it.
 *
 * @param credentials host, username and password used to create the MQLight client
 */
class Worker(credentials: Credentials) extends Actor with MQConnection {

  // `self` is handed to the client as the listener context.
  // NOTE(review): presumably the Actor* listeners relay callbacks to it as actor messages — confirm.
  val client = createClient(credentials.host, credentials.username, credentials.password, ActorClientListener, self)

  /**
   * Tries to read `message` as an `SMSMessage` and dispatches it; a payload that
   * cannot be read as one is dropped.
   *
   * NOTE(review): `parseJson`, `asOpt`, `SMSMessage` and `dispatchSMS` are not defined in
   * this snippet — they are assumed to be supplied by the surrounding project.
   */
  def process(message: String) = {
    message.parseJson.asOpt[SMSMessage] match {
      case Some(smsMessage) => dispatchSMS(smsMessage)
      case None => ()
    }
  }

  /**
   * Handles `Subscribe(topic)` by subscribing the client to that topic, and
   * `OnMessage(delivery)` by processing the payload of string deliveries.
   */
  def receive: Receive = {
    case Subscribe(topic) =>
      client.subscribe(topic, ActorDestinationListener, ActorCompletionListener, self)

    case OnMessage(delivery) =>
      // A type pattern replaces the `getType` check plus unchecked `asInstanceOf` cast.
      delivery match {
        case text: StringDelivery => process(text.getData)
        case _ => // any other delivery type is ignored
      }
  }
}

Now we can connect our worker to MQLight and subscribe it to a topic.

import akka.actor.{ActorSystem, Props}
import io.forward.messaging.model._

/**
 * Example that starts a `Worker` actor and asks it to subscribe to the topic /foo.
 *
 * The body of a Scala `object` runs when the object is first referenced.
 */
object Subscriber {

  // Actor system that hosts the worker actor
  val system = ActorSystem("demo")

  // Host, username and password used to connect to MQLight
  val authentication = Credentials("amqp://localhost", "admin", "password")

  val subscriber = {
    val workerProps = Props(new Worker(authentication))
    system.actorOf(workerProps)
  }

  // Ask the worker to subscribe to the topic /foo
  subscriber ! Subscribe("/foo")
}