avast / rabbitmq-scala-client

Scala wrapper over standard RabbitMQ Java client library

This client is a Scala wrapper over the standard RabbitMQ Java client. The goal of this library is to simplify basic use cases - to provide an FP-oriented API for programmers and to shield them from the underlying client.

The library is configurable both by case classes (core module) and by HOCON/Lightbend Config (pureconfig module).

The library uses the concept of a connection and derived producers and consumers. Note that the connection shields you from the underlying concepts of AMQP connection and channels - it handles channels automatically according to best practices. Each producer and consumer can be closed separately, while closing the connection closes all derived channels and thus all producers and consumers.

Dependency

SBT: "com.avast.clients.rabbitmq" %% "rabbitmq-client-core" % "x.x.x"

Gradle: compile 'com.avast.clients.rabbitmq:rabbitmq-client-core_$scalaVersion:x.x.x'

Modules

  1. api - Contains only basic traits for the consumer etc.
  2. core - Main module. The client, configurable by case classes.
  3. pureconfig - Module for configuration from Config.
  4. extras - Module with some extra features.
  5. extras-circe - Allows publishing and consuming JSON events, using the circe library.
  6. extras-cactus - Allows publishing and consuming Protobuf events, using the cactus library which provides mapping between Java generated classes and Scala classes.
  7. extras-protobuf - Allows publishing and consuming events defined as Google Protocol Buffers messages (as both JSON and Protobuf), represented as standard Java classes.
  8. extras-scalapb - Allows publishing and consuming events defined as Google Protocol Buffers messages (as both JSON and Protobuf), generated to Scala using ScalaPB.

Migration

There is a migration guide between versions 5 and 6.0.x.
There is a migration guide between versions 6.0.x and 6.1.x.
There is a migration guide between versions 6.1.x and 7.0.x.
There is a migration guide between versions 6.1.x and 8.0.x.

Please note that since 8.x, configuration from Typesafe/Lightbend Config has been moved to the pureconfig module.

Usage

The API is finally tagless (read more e.g. here) and uses cats.effect.Resource, which is a convenient way to manage resources in your app. In addition, there is support for streaming with fs2.Stream.

The API uses conversions for both consumer and producer, meaning you don't have to work directly with Bytes (though you still can if you want to) - you touch only your business model class, which is then (de)serialized using the provided converter.

The library uses two types of executors - one for blocking (IO) operations and one for callbacks. You have to provide both of them:

  1. Blocking executor as ExecutorService
  2. Callback executor as scala.concurrent.ExecutionContext
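
For example, a minimal sketch (the executor types and sizes are up to your application):

import java.util.concurrent.{ExecutorService, Executors}
import scala.concurrent.ExecutionContext

// executor for blocking (IO) operations of the underlying client
val blockingExecutor: ExecutorService = Executors.newCachedThreadPool()

// executor on which your callbacks are executed
implicit val callbackExecutor: ExecutionContext = ExecutionContext.global

With Monix (as in the example below), the implicit Scheduler can serve as the callback ExecutionContext, since Scheduler extends it.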

The default way is to configure the client with manually provided case classes; see the pureconfig module for configuration from HOCON (Lightbend Config).

This is a somewhat minimal setup, using Monix Task:

import java.util.concurrent.ExecutorService

import cats.effect.Resource
import com.avast.bytes.Bytes
import com.avast.clients.rabbitmq._
import com.avast.clients.rabbitmq.api._
import com.avast.metrics.scalaapi.Monitor
import javax.net.ssl.SSLContext
import monix.eval._
import monix.execution.Scheduler

implicit val sch: Scheduler = ???
val monitor: Monitor = ???

val blockingExecutor: ExecutorService = ???

val sslContext = SSLContext.getDefault

val connectionConfig = RabbitMQConnectionConfig(
    hosts = List("localhost:5432"),
    name = "MyProductionConnection",
    virtualHost = "/",
    credentials = CredentialsConfig(username = "vogon", password = "jeltz")
  )

val consumerConfig = ConsumerConfig(
    name = "MyConsumer",
    queueName = "QueueWithMyEvents",
    bindings = List(
      AutoBindQueueConfig(exchange = AutoBindExchangeConfig(name = "OtherAppExchange"), routingKeys = List("TheEvent"))
    )
  )

val producerConfig = ProducerConfig(
    name = "MyProducer",
    exchange = "MyGreatApp"
  )

// see https://typelevel.org/cats-effect/tutorial/tutorial.html#acquiring-and-releasing-resources

val rabbitMQProducer: Resource[Task, RabbitMQProducer[Task, Bytes]] = {
    for {
      connection <- RabbitMQConnection.make[Task](connectionConfig, blockingExecutor, Some(sslContext))
      /*
      Here you have created the connection; it's shared by all producers/consumers derived from it (for this one RabbitMQ server) - they
      will share a single TCP connection but have separate channels.
      If you expect a very high load, you can use a separate connection for each producer/consumer, however it's usually not needed.
      */

      consumer <- connection.newConsumer[Bytes](consumerConfig, monitor) {
        case delivery: Delivery.Ok[Bytes] =>
          Task.now(DeliveryResult.Ack)

        case _: Delivery.MalformedContent =>
          Task.now(DeliveryResult.Reject)
      }

      producer <- connection.newProducer[Bytes](producerConfig, monitor)
    } yield {
      producer
    }
  }
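
The Resource is then used as usual; for example, a sketch of sending a message (assuming the producer's basic send(routingKey, body) method, which returns the effect):

val program: Task[Unit] = rabbitMQProducer.use { producer =>
  producer.send("TheEvent", Bytes.copyFromUtf8("hello")) // Task[Unit]
}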

Streaming support

It seems quite natural to process a RabbitMQ queue with a streaming app. The StreamingRabbitMQConsumer provides you with an fs2.Stream through which you can easily process incoming messages in a streaming way.

Notice: Using this functionality requires you to know some basics of the FS2 library. Please see its official guide first if you're not familiar with it.

// skipping imports and common things, they are the same as in general example above

val consumerConfig = StreamingConsumerConfig( // notice: StreamingConsumerConfig vs. ConsumerConfig
    name = "MyConsumer",
    queueName = "QueueWithMyEvents",
    bindings = List(
      AutoBindQueueConfig(exchange = AutoBindExchangeConfig(name = "OtherAppExchange"), routingKeys = List("TheEvent"))
    )
  )

val processMyStream: fs2.Pipe[Task, StreamedDelivery[Task, Bytes], StreamedResult] = { in =>
    in.evalMap(delivery => delivery.handle(DeliveryResult.Ack)) // TODO you probably want to do some real stuff here
  }

val deliveryStream: Resource[Task, fs2.Stream[Task, StreamedResult]] = for {
    connection <- RabbitMQConnection.make[Task](connectionConfig, blockingExecutor, Some(sslContext))
    streamingConsumer <- connection.newStreamingConsumer[Bytes](consumerConfig, monitor)
  } yield {
    val stream: fs2.Stream[Task, StreamedResult] = streamingConsumer.deliveryStream.through(processMyStream)
    
    // create resilient (self-restarting) stream; see more information below
    lazy val resilientStream: fs2.Stream[Task, StreamedResult] = stream.handleErrorWith { e =>
      // TODO log the error - something is going wrong!
      resilientStream
    }

    resilientStream
  }

Resilient stream

While you should never let the stream fail (handle all your possible errors; see the Error handling section in the official docs for how a stream can fail), it's important to be able to recover the stream if that happens anyway. You can do that by simply requesting a new stream from the client:

val stream = streamingConsumer.deliveryStream // get stream from client
  .through(processMyStream) // "run" the stream through your processing logic

// failureCounter is e.g. a cats.effect.concurrent.Ref[Task, Int] holding the number of restart attempts left
lazy val resilientStream: fs2.Stream[Task, StreamedResult] = stream.handleErrorWith { err =>
  // handle the error in stream: recover by calling itself
  // TODO don't forget to add some logging/metrics here!
  fs2.Stream.eval(failureCounter.modify(a => (a - 1, a - 1))).flatMap { attemptsRest =>
    if (attemptsRest < 0) fs2.Stream.raiseError[Task](err) else resilientStream
  }
}

resilientStream

or use a prepared extension method:

import com.avast.clients.rabbitmq._

streamingConsumer.deliveryStream // get stream from client
  .through(processMyStream) // "run" the stream through your processing logic
  .makeResilient(maxErrors = 3) { err =>
    Task.delay {
      // TODO don't forget to add some logging/metrics here!
      ()
    }
  }

Please refer to the official guide for a deeper understanding of how the recovery of an fs2.Stream works.

Providing converters for producer/consumer

Both the producer and the consumer require a type argument when being created from the connection:

  1. connection.newConsumer[MyClass] which requires implicit DeliveryConverter[MyClass]
  2. connection.newProducer[MyClass] which requires implicit ProductConverter[MyClass]

There are multiple options for where to get the converter (the same applies to DeliveryConverter and ProductConverter):

  1. Implement your own implicit converter for the type.
  2. Use modules extras-circe and extras-cactus which provide support for JSON and GPB conversion (see the sketch below).
  3. Use the identity converter by specifying the Bytes type argument. No further action is needed in that case.
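
For example, with the extras-circe module a JSON converter for a case class can be derived from a circe Decoder (see also the caveat below):

import io.circe.generic.auto._ // derives implicit io.circe.Decoder[Event]
import com.avast.clients.rabbitmq.DeliveryConverter
import com.avast.clients.rabbitmq.extras.format.JsonDeliveryConverter

case class Event(name: String)

implicit val deliveryConverter: DeliveryConverter[Event] = JsonDeliveryConverter.derive[Event]()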

Caveats

  1. null instead of converter instance
    You may run into this problem:
    scala> import io.circe.generic.auto._
    import io.circe.generic.auto._
    
    scala> import com.avast.clients.rabbitmq.extras.format.JsonDeliveryConverter
    import com.avast.clients.rabbitmq.extras.format.JsonDeliveryConverter
    
    scala> import com.avast.clients.rabbitmq.DeliveryConverter
    import com.avast.clients.rabbitmq.DeliveryConverter
    
    scala> case class Event(name: String)
    defined class Event
    
    scala> implicit val deliveryConverter: JsonDeliveryConverter[Event] = JsonDeliveryConverter.derive[Event]()
    deliveryConverter: com.avast.clients.rabbitmq.extras.format.JsonDeliveryConverter[Event] = null
    
    scala> implicit val deliveryConverter: DeliveryConverter[Event] = JsonDeliveryConverter.derive[Event]()
    deliveryConverter: com.avast.clients.rabbitmq.DeliveryConverter[Event] = com.avast.clients.rabbitmq.extras.format.JsonDeliveryConverter$$anon$1@5b977aaa
    
    scala> implicit val deliveryConverter = JsonDeliveryConverter.derive[Event]()
    deliveryConverter: com.avast.clients.rabbitmq.extras.format.JsonDeliveryConverter[Event] = com.avast.clients.rabbitmq.extras.format.JsonDeliveryConverter$$anon$1@4b024fb2
    Notice that the results of the three calls differ even though they are supposed to be the same (and non-null)! A very similar issue is discussed on StackOverflow and the solution is similar too:
    1. Remove the explicit type completely (not recommended)
    2. Make the explicit type more general (DeliveryConverter instead of JsonDeliveryConverter in this case)

Notes

Extras

There is a module with some optional functionality called extras.

Network recovery

The library offers configurable network recovery, backed by the RabbitMQ client's own recovery functionality (available in 5+).
You can either disable the recovery or select and configure one of the following types:

  1. Linear
    The client will wait initialDelay before the first recovery attempt and, if it fails, will retry every period until it succeeds.
  2. Exponential
    The client will wait initialDelay before the first recovery attempt and, if it fails, will keep retrying until it succeeds, prolonging the delay between attempts exponentially (based on period, factor and the attempt number), up to maxLength.
    Example:
    For initialDelay = 3s, period = 2s, factor = 2.0, maxLength = 1 minute, the produced delays will be 3, 2, 4, 8, 16, 32, 60 seconds (and will never go higher).

Do not set too short custom recovery delay intervals (less than 2 seconds) as it is not recommended by the official RabbitMQ API Guide.
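
For illustration, a HOCON sketch of the exponential type (the networkRecovery block and its keys are assumptions based on the parameters above; check the reference configuration of your version):

rabbitConnection {
  // ...

  networkRecovery {
    enabled = true

    handler {
      type = "exponential" // or "linear"

      initialDelay = 3s
      period = 2s
      factor = 2.0
      maxLength = 1m
    }
  }
}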

DeliveryResult

The consumer's readAction returns a DeliveryResult wrapped in your effect type (e.g. Task). The DeliveryResult has 4 possible values (with descriptions of their usual use-cases):

  1. Ack - the message was processed; it will be removed from the queue
  2. Reject - the message is corrupted or for some other reason we don't want to see it again; it will be removed from the queue
  3. Retry - the message couldn't be processed at this moment (unreachable 3rd party service?); it will be requeued (inserted at the top of the queue)
  4. Republish - the message may be corrupted but we're not sure; it will be re-published to the bottom of the queue (as a new message; the original one will be removed). It's usually wise to prevent infinite republishing of the message - see the Poisoned message handler. A sketch of a read action using these results follows below.
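
A sketch of a read action choosing among these results (processEvent and the error cases are illustrative only):

import scala.util.control.NonFatal

def processEvent(body: Bytes): Task[Unit] = ??? // your business logic

def readAction(delivery: Delivery[Bytes]): Task[DeliveryResult] = delivery match {
  case d: Delivery.Ok[Bytes] =>
    processEvent(d.body)
      .map(_ => DeliveryResult.Ack: DeliveryResult) // processed; remove it from the queue
      .onErrorRecover {
        case _: java.net.ConnectException => DeliveryResult.Retry // 3rd party unreachable; requeue
        case NonFatal(_) => DeliveryResult.Republish() // not sure what's wrong; republish to the bottom
      }

  case _: Delivery.MalformedContent =>
    Task.now(DeliveryResult.Reject) // corrupted; remove it from the queue
}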

Difference between Retry and Republish

When using Retry, the message can effectively starve other messages in the queue until it can be processed itself; Republish, on the other hand, inserts the message into the original queue as a new message and lets the consumer handle other messages (if they can be processed) in the meantime.

Republishing

Republishing is handled at the application level by publishing a new message (with the original content, headers, messageId, etc.) to the original queue and acknowledging the old one. This can be done via:

  1. Default exchange Every virtual host in RabbitMQ has a default exchange which has implicit bindings to all queues and can easily be used for publishing to basically any queue. This is very handy for functionality such as republishing, however it's also quite dangerous and you may not have permissions to use it. In case you do have them, use this option rather than the custom exchange.
    This is the default option (in other words, the client will use the default exchange unless you tell it not to).
  2. Custom exchange In case you're unable to use the default exchange, you have to create your own exchange to replace the functionality. The RabbitMQ client will create it for you together with all the necessary bindings; all you have to do is configure the name of the exchange, e.g.
       rabbitConnection {
         hosts = ["localhost:5672"]
         virtualHost = "/"
       
         ...
       
         republishStrategy {
           type = CustomExchange
       
           exchangeName = "ExchangeForRepublishing"
    
           exchangeDeclare = true // default
           exchangeAutoBind = true // default
         }
         
         ...
       }
    
    The exchange is created as direct, durable and without the auto-delete flag.

Bind/declare arguments

There is an option to specify bind/declare arguments for queues/exchanges, as described in the RabbitMQ docs.
Example of configuration with HOCON:

  producer {
    name = "Testing" // this is used for logging etc.

    exchange = "myclient"

    // should the producer declare the exchange it wants to send to?
    declare {
      enabled = true // disabled by default

      type = "direct" // fanout, topic
      
      arguments = { "x-max-length" : 10000 }
    }
  }

Additional declarations and bindings

Sometimes it's necessary to declare an additional queue or exchange which is not directly related to the consumers or producers in your application (e.g. a dead-letter queue).
The library makes this possible. Here is an example of such a configuration with HOCON:

val rabbitConnection: ConfigRabbitMQConnection[F] = ???

rabbitConnection.bindExchange("backupExchangeBinding") // : F[Unit]

where the "backupExchangeBinding" is link to the configuration (use relative path to the declarations block in configuration):

  declarations {  
    backupExchangeBinding {
      sourceExchangeName = "mainExchange"
      destExchangeName = "backupExchange"
      routingKeys = ["myMessage"]
      arguments {}
    }
  }

Equivalent code using the case-class configuration:

val rabbitConnection: RabbitMQConnection[F] = ???

rabbitConnection.bindExchange(
  BindExchangeConfig(
    sourceExchangeName = "mainExchange",
    destExchangeName = "backupExchange",
    routingKeys = List("myMessage")
  )
) // : F[Unit]

Pull consumer

Sometimes your use-case just doesn't fit the normal consumer scenario. In that case you can use the pull consumer, which gives you much more control over the received messages: you pull a new message from the queue and acknowledge (reject, ...) it at some point in the future.

The pull consumer uses PullResult as its return type:

  • Ok - contains a DeliveryWithHandle instance
  • EmptyQueue - no message was available in the queue

Additionally, you can call the .toOption method on the PullResult.

A simplified example, using configuration from HOCON:

import cats.effect.Resource
import com.avast.bytes.Bytes
import com.avast.clients.rabbitmq._
import com.avast.clients.rabbitmq.pureconfig._
import com.avast.clients.rabbitmq.api._
import monix.eval.Task
import monix.execution.Scheduler

implicit val sch: Scheduler = ???

val consumer: Resource[Task, RabbitMQPullConsumer[Task, Bytes]] = {
  for {
    connection <- RabbitMQConnection.fromConfig[Task](???, ???)
    consumer <- connection.newPullConsumer[Bytes](??? : String, ???)
  } yield {
    consumer
  }
}

val program: Task[Unit] = consumer.use { consumer =>
  Task
    .sequence { (1 to 100).map(_ => consumer.pull()) } // receive "up to" 100 deliveries
    .flatMap { ds =>
      // do your stuff!

      Task.unit
    }
}
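
Each delivery then has to be handled explicitly; a sketch of a single pull (assuming PullResult.Ok wraps the DeliveryWithHandle mentioned above):

val handleOne: Task[Unit] = consumer.use { consumer =>
  consumer.pull().flatMap {
    case PullResult.Ok(deliveryWithHandle) =>
      // the delivery itself is available via deliveryWithHandle.delivery
      deliveryWithHandle.handle(DeliveryResult.Ack)

    case PullResult.EmptyQueue =>
      Task.unit // nothing to do right now
  }
}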

MultiFormatConsumer

Quite often you receive a single type of message but want to support multiple encoding formats (Protobuf, JSON, ...). This is where the MultiFormatConsumer can be used.

Modules extras-circe and extras-cactus provide support for JSON and GPB conversion. They are both used in the example below.

The MultiFormatConsumer is Scala only.

Usage example, based on a proto file which defines the NewFileSourceAdded message:

import com.avast.bytes.Bytes
import com.avast.cactus.bytes._ // Cactus support for Bytes, see https://github.com/avast/cactus#bytes
import com.avast.clients.rabbitmq.test.ExampleEvents.{NewFileSourceAdded => NewFileSourceAddedGpb}
import com.avast.clients.rabbitmq._
import com.avast.clients.rabbitmq.extras.format._
import io.circe.Decoder
import io.circe.generic.auto._ // to auto derive `io.circe.Decoder[A]` with https://circe.github.io/circe/codec.html#fully-automatic-derivation
import scala.concurrent.Future
import scala.jdk.CollectionConverters._ // <-- for Scala 2.12 use scala.collection.JavaConverters

private implicit val d: Decoder[Bytes] = Decoder.decodeString.map(???)

case class FileSource(fileId: Bytes, source: String)

case class NewFileSourceAdded(fileSources: Seq[FileSource])

val consumer = MultiFormatConsumer.forType[Future, NewFileSourceAdded](
  JsonDeliveryConverter.derive(), // requires implicit `io.circe.Decoder[NewFileSourceAdded]`
  GpbDeliveryConverter[NewFileSourceAddedGpb].derive() // requires implicit `com.avast.cactus.Converter[NewFileSourceAddedGpb, NewFileSourceAdded]`
)(_ => ???)

(see unit test for full example)

Implementing own DeliveryConverter

The CheckedDeliveryConverter usually reacts to the Content-Type (as in the example below) but that's not required - it could e.g. analyze the payload (or its first bytes) too.

import com.avast.bytes.Bytes
import com.avast.clients.rabbitmq.CheckedDeliveryConverter
import com.avast.clients.rabbitmq.api.{ConversionException, Delivery}

val StringDeliveryConverter: CheckedDeliveryConverter[String] = new CheckedDeliveryConverter[String] {
  override def canConvert(d: Delivery[Bytes]): Boolean = d.properties.contentType.contains("text/plain")
  override def convert(b: Bytes): Either[ConversionException, String] = Right(b.toStringUtf8)
}
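
A converter like this can then be plugged into the MultiFormatConsumer shown above, e.g. (reusing the names from the examples in this section):

import scala.concurrent.Future

val consumer = MultiFormatConsumer.forType[Future, String](
  StringDeliveryConverter // its canConvert decides whether the converter is applied to a delivery
)(_ => ???)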