galax-io / gatling-amqp-plugin   1.0.4

Apache License 2.0 GitHub

Gatling plugin for AMQP/RabbitMQ — publish, request-reply, and consume patterns with connection pooling

Scala versions: 2.13

Gatling AMQP Plugin

CI Maven Central codecov License Scala Steward badge

AMQP protocol plugin for Gatling load testing framework. Supports RabbitMQ with publish, request-reply, and consume patterns with connection pooling.

Table of Contents

Compatibility

Plugin Version Gatling Scala Java
0.x.y-latest 3.13.x 2.13 17+
0.x.y 3.11.x 2.13 17+

Branch strategy: main targets Gatling 3.11.x, latest/gatling targets Gatling 3.13.x.

Installation

Scala (sbt)

libraryDependencies += "org.galaxio" %% "gatling-amqp-plugin" % "<version>" % Test

Java / Kotlin (Gradle Kotlin DSL)

gatling("org.galaxio:gatling-amqp-plugin_2.13:<version>")

Maven

<dependency>
  <groupId>org.galaxio</groupId>
  <artifactId>gatling-amqp-plugin_2.13</artifactId>
  <version>${version}</version>
  <scope>test</scope>
</dependency>

Quick Start

Prerequisites

You need a running RabbitMQ instance. The examples below use queueExchange("test-queue"), which publishes directly to the default exchange with routing key test-queue. The queue must exist before the test runs — otherwise messages are discarded by the broker (no error is reported to the publisher).

1. Start RabbitMQ

docker run -d --name gatling-rabbit \
  -p 5672:5672 -p 15672:15672 \
  rabbitmq:3-management

The management UI is available at http://localhost:15672 (guest/guest).

2. Create the required queue

Using the management CLI inside the container:

docker exec gatling-rabbit rabbitmqadmin declare queue name=test-queue durable=false

Alternatively, let the plugin declare the queue automatically — see Queue and Exchange Declarations below.

3. Run the simulation

Scala

import org.galaxio.gatling.amqp.Predef._
import io.gatling.core.Predef._

class AmqpSimulation extends Simulation {
  val amqpConf = amqp
    .connectionFactory(
      rabbitmq
        .host("localhost")
        .port(5672)
        .username("guest")
        .password("guest")
        .vhost("/")
    )
    .usePersistentDeliveryMode
    .matchByMessageId

  val scn = scenario("AMQP Publish")
    .exec(
      amqp("publish").publish
        .queueExchange("test-queue")
        .textMessage("""{"msg": "hello"}""")
        .contentType("application/json")
    )

  setUp(scn.inject(atOnceUsers(1))).protocols(amqpConf)
}

Java

import static org.galaxio.gatling.amqp.javaapi.AmqpDsl.*;
import static io.gatling.javaapi.core.CoreDsl.*;

public class AmqpSimulation extends Simulation {
  var amqpConf = amqp()
    .connectionFactory(
      rabbitmq().host("localhost").port(5672).username("guest").password("guest").build()
    )
    .usePersistentDeliveryMode()
    .matchByMessageId();

  var scn = scenario("AMQP Publish")
    .exec(
      amqp("publish").publish()
        .queueExchange("test-queue")
        .textMessage("{\"msg\": \"hello\"}")
        .contentType("application/json")
    );

  { setUp(scn.injectOpen(atOnceUsers(1)).protocols(amqpConf)); }
}

Kotlin

import org.galaxio.gatling.amqp.javaapi.AmqpDsl.*
import io.gatling.javaapi.core.CoreDsl.*

class AmqpSimulation : Simulation() {
  val amqpConf = amqp()
    .connectionFactory(
      rabbitmq().host("localhost").port(5672).username("guest").password("guest").build()
    )
    .usePersistentDeliveryMode()
    .matchByMessageId()

  val scn = scenario("AMQP Publish")
    .exec(
      amqp("publish").publish()
        .queueExchange("test-queue")
        .textMessage("""{"msg": "hello"}""")
        .contentType("application/json")
    )

  init { setUp(scn.injectOpen(atOnceUsers(1)).protocols(amqpConf)) }
}

4. Verify it worked

Purge stale messages, run the simulation, then check:

# Before the test — clear any leftover messages
docker exec gatling-rabbit rabbitmqadmin purge queue name=test-queue

# Run the simulation (step 3), then inspect:
docker exec gatling-rabbit rabbitmqadmin get queue=test-queue count=10

You should see the JSON payload {"msg": "hello"} in the output. If the queue is empty, the message was routed to a non-existent queue (check spelling) or the queue was not declared before the test.

You can also verify via the management UI at http://localhost:15672/#/queues/%2F/test-queue — click "Get messages" to inspect the content.

5. Request-Reply walkthrough

A request-reply test publishes a message and waits for a response on a reply queue.

# Create the request and reply queues
docker exec gatling-rabbit rabbitmqadmin declare queue name=request-queue durable=false
docker exec gatling-rabbit rabbitmqadmin declare queue name=reply-queue durable=false

Start a simple echo consumer on your host (requires Python 3 and pip install pika):

# echo_consumer.py — reads from request-queue, replies to the replyTo queue
python3 -c "
import pika
conn = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
ch = conn.channel()
def on_msg(channel, method, props, body):
    ch.basic_publish(exchange='', routing_key=props.reply_to,
                     properties=pika.BasicProperties(correlation_id=props.message_id),
                     body=body)
    ch.basic_ack(delivery_tag=method.delivery_tag)
    print(f'Replied to {props.reply_to}')
ch.basic_consume(queue='request-queue', on_message_callback=on_msg)
print('Waiting for messages on request-queue...')
ch.start_consuming()
" &

Run the consumer in a separate terminal (or background with & as shown). After the test you can stop it with kill %1.

Scala simulation:

import org.galaxio.gatling.amqp.Predef._
import io.gatling.core.Predef._

val amqpConf = amqp
  .connectionFactory(
    rabbitmq.host("localhost").port(5672).username("guest").password("guest").vhost("/")
  )
  .replyTimeout(60000)
  .consumerThreadsCount(8)
  .matchByMessageId

val scn = scenario("Request-Reply")
  .exec(
    amqp("rpc-call").requestReply
      .queueExchange("request-queue")
      .replyExchange("reply-queue")
      .textMessage("""{"action": "ping"}""")
      .contentType("application/json")
      .check(bodyString.exists)
  )

setUp(scn.inject(atOnceUsers(1))).protocols(amqpConf)

If no consumer is running, the request will time out (default 60 s per replyTimeout) and Gatling reports a KO — this confirms the round-trip is wired correctly.

Protocol Configuration

Connection Factory

val cf = rabbitmq
  .host("localhost")
  .port(5672)
  .username("guest")
  .password("guest")
  .vhost("/")

Advanced Connection Settings

val cf = rabbitmq
  .host("localhost")
  .port(5672)
  .username("guest")
  .password("guest")
  .automaticRecovery(true)
  .networkRecoveryInterval(5000)
  .topologyRecovery(true)
  .connectionTimeout(10000)
  .requestedHeartbeat(60)
  .requestedChannelMax(0)
  .useSslProtocol()

Protocol Options

val amqpConf = amqp
  .connectionFactory(publishCf)                  // single connection factory
  .connectionFactory(publishCf, replyCf)         // separate publish/reply connections

  .usePersistentDeliveryMode                     // delivery mode 2
  .useNonPersistentDeliveryMode                  // delivery mode 1 (default)

  .matchByMessageId                              // match replies by messageId (default)
  .matchByCorrelationId                          // match replies by correlationId
  .matchByMessage(msg => msg.messageId)          // custom match extraction

  .replyTimeout(30000)                           // reply timeout in ms
  .consumerThreadsCount(4)                       // consumer threads per pool
  .channelPoolSize(32)                           // max pooled channels (default: 16)
  .usePublisherConfirms                          // enable publisher confirms

  .responseTransform(msg => msg)                 // transform reply messages before checks

Queue and Exchange Declarations

val amqpConf = amqp
  .connectionFactory(cf)
  .declare(queue("test-queue", durable = true, exclusive = false, autoDelete = false))
  .declare(exchange("test-exchange", BuiltinExchangeType.TOPIC, durable = true))
  .bindQueue(
    queue("test-queue", durable = true),
    exchange("test-exchange", BuiltinExchangeType.TOPIC),
    "routing.key"
  )

Actions

Publish

// Queue exchange
amqp("publish").publish
  .queueExchange("my-queue")
  .textMessage("""{"data": "value"}""")

// Topic exchange
amqp("publish").publish
  .topicExchange("my-exchange", "routing.key")
  .textMessage("""{"data": "value"}""")

// Direct exchange
amqp("publish").publish
  .directExchange("my-exchange", "routing.key")
  .bytesMessage(myBytes)

// With EL expressions
amqp("publish-#{id}").publish
  .queueExchange("queue-#{name}")
  .textMessage("""{"id": "#{id}"}""")

Request-Reply

amqp("request-reply").requestReply
  .topicExchange("request-exchange", "request.key")
  .replyExchange("reply-queue")
  .textMessage("""{"query": "data"}""")
  .messageId("#{msgId}")
  .check(jsonPath("$.result").is("ok"))

The request destination supports queueExchange, directExchange, and topicExchange. The reply destination (.replyExchange(name)) specifies the queue where the plugin listens for replies. Reply routing is always queue-based.

Multi-broker setup (publish to one broker, consume reply from another):

val amqpConf = amqp
  .connectionFactory(publisherCf, replyConsumerCf)
  .matchByCorrelationId
  .replyTimeout(10000)

Consume

Polls the queue for a message up to the specified timeout (default 5000ms). If no message arrives within the timeout, the action reports failure.

amqp("consume").consume
  .queue("my-queue")
  .timeout(5000)
  .check(bodyString.is("expected"))

Message Properties

All message properties support Gatling Expression Language:

amqp("publish").publish
  .queueExchange("my-queue")
  .textMessage("hello")
  .messageId("#{msgId}")
  .correlationId("#{corrId}")
  .contentType("application/json")
  .contentEncoding("UTF-8")
  .priority(5)
  .replyTo("reply-queue")
  .expiration("60000")
  .amqpType("my.type")
  .userId("guest")
  .appId("my-app")
  .header("X-Custom", "value")
  .headers("X-One" -> "1", "X-Two" -> "2")

Checks

Checks are supported on request-reply and consume actions:

import org.galaxio.gatling.amqp.Predef._

// JSON checks
.check(jsonPath("$.status").is("ok"))
.check(jsonPath("$.items[*].id").findAll.saveAs("ids"))
.check(jmesPath("status").is("ok"))

// XML checks
.check(xpath("//status").is("ok"))

// Body checks
.check(bodyString.is("expected"))
.check(bodyBytes.exists)
.check(substring("success").exists)

// Response code
.check(responseCode.notIn("500", "503"))

// Simple custom check
.check(simpleCheck(msg => msg.payload.nonEmpty))

// Conditional check
.check(checkIf("#{useCheck}")(jsonPath("$.data").exists))

Silent Mode

Mark requests as silent to suppress statistics logging:

amqp("setup-publish").publish
  .queueExchange("setup-queue")
  .textMessage("init")
  .silent

amqp("setup-rpc").requestReply
  .queueExchange("setup-queue")
  .replyExchange("reply-queue")
  .textMessage("init")
  .silent

Publisher Confirms

Enable publisher confirms for guaranteed delivery:

val amqpConf = amqp
  .connectionFactory(cf)
  .usePublisherConfirms

Examples

Scala

Java

Kotlin

Note: Kotlin examples use the Java API facade and are provided as reference. They are not compiled by CI (no Kotlin compiler plugin is configured in the build).

Contributing

# Build
sbt compile

# Run unit tests
sbt test

# Run integration tests (requires RabbitMQ)
docker-compose up -d
sbt "Gatling / testOnly org.galaxio.gatling.amqp.examples.AmqpGatlingTest"

# Check formatting
sbt scalafmtCheckAll

# Format code
sbt scalafmtAll

License

Apache License 2.0. See LICENSE for details.