AMQP protocol plugin for Gatling load testing framework. Supports RabbitMQ with publish, request-reply, and consume patterns with connection pooling.
- Compatibility
- Installation
- Quick Start
- Protocol Configuration
- Actions
- Message Properties
- Checks
- Silent Mode
- Publisher Confirms
- Examples
- Contributing
- License
| Plugin Version | Gatling | Scala | Java |
|---|---|---|---|
| 0.x.y-latest | 3.13.x | 2.13 | 17+ |
| 0.x.y | 3.11.x | 2.13 | 17+ |
Branch strategy:
maintargets Gatling 3.11.x,latest/gatlingtargets Gatling 3.13.x.
libraryDependencies += "org.galaxio" %% "gatling-amqp-plugin" % "<version>" % Testgatling("org.galaxio:gatling-amqp-plugin_2.13:<version>")<dependency>
<groupId>org.galaxio</groupId>
<artifactId>gatling-amqp-plugin_2.13</artifactId>
<version>${version}</version>
<scope>test</scope>
</dependency>You need a running RabbitMQ instance. The examples below use queueExchange("test-queue"), which publishes directly to the default exchange with routing key test-queue. The queue must exist before the test runs — otherwise messages are discarded by the broker (no error is reported to the publisher).
docker run -d --name gatling-rabbit \
-p 5672:5672 -p 15672:15672 \
rabbitmq:3-managementThe management UI is available at http://localhost:15672 (guest/guest).
Using the management CLI inside the container:
docker exec gatling-rabbit rabbitmqadmin declare queue name=test-queue durable=falseAlternatively, let the plugin declare the queue automatically — see Queue and Exchange Declarations below.
import org.galaxio.gatling.amqp.Predef._
import io.gatling.core.Predef._
class AmqpSimulation extends Simulation {
val amqpConf = amqp
.connectionFactory(
rabbitmq
.host("localhost")
.port(5672)
.username("guest")
.password("guest")
.vhost("/")
)
.usePersistentDeliveryMode
.matchByMessageId
val scn = scenario("AMQP Publish")
.exec(
amqp("publish").publish
.queueExchange("test-queue")
.textMessage("""{"msg": "hello"}""")
.contentType("application/json")
)
setUp(scn.inject(atOnceUsers(1))).protocols(amqpConf)
}import static org.galaxio.gatling.amqp.javaapi.AmqpDsl.*;
import static io.gatling.javaapi.core.CoreDsl.*;
public class AmqpSimulation extends Simulation {
var amqpConf = amqp()
.connectionFactory(
rabbitmq().host("localhost").port(5672).username("guest").password("guest").build()
)
.usePersistentDeliveryMode()
.matchByMessageId();
var scn = scenario("AMQP Publish")
.exec(
amqp("publish").publish()
.queueExchange("test-queue")
.textMessage("{\"msg\": \"hello\"}")
.contentType("application/json")
);
{ setUp(scn.injectOpen(atOnceUsers(1)).protocols(amqpConf)); }
}import org.galaxio.gatling.amqp.javaapi.AmqpDsl.*
import io.gatling.javaapi.core.CoreDsl.*
class AmqpSimulation : Simulation() {
val amqpConf = amqp()
.connectionFactory(
rabbitmq().host("localhost").port(5672).username("guest").password("guest").build()
)
.usePersistentDeliveryMode()
.matchByMessageId()
val scn = scenario("AMQP Publish")
.exec(
amqp("publish").publish()
.queueExchange("test-queue")
.textMessage("""{"msg": "hello"}""")
.contentType("application/json")
)
init { setUp(scn.injectOpen(atOnceUsers(1)).protocols(amqpConf)) }
}Purge stale messages, run the simulation, then check:
# Before the test — clear any leftover messages
docker exec gatling-rabbit rabbitmqadmin purge queue name=test-queue
# Run the simulation (step 3), then inspect:
docker exec gatling-rabbit rabbitmqadmin get queue=test-queue count=10You should see the JSON payload {"msg": "hello"} in the output. If the queue is empty, the message was routed to a non-existent queue (check spelling) or the queue was not declared before the test.
You can also verify via the management UI at http://localhost:15672/#/queues/%2F/test-queue — click "Get messages" to inspect the content.
A request-reply test publishes a message and waits for a response on a reply queue.
# Create the request and reply queues
docker exec gatling-rabbit rabbitmqadmin declare queue name=request-queue durable=false
docker exec gatling-rabbit rabbitmqadmin declare queue name=reply-queue durable=falseStart a simple echo consumer on your host (requires Python 3 and pip install pika):
# echo_consumer.py — reads from request-queue, replies to the replyTo queue
python3 -c "
import pika
conn = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
ch = conn.channel()
def on_msg(channel, method, props, body):
ch.basic_publish(exchange='', routing_key=props.reply_to,
properties=pika.BasicProperties(correlation_id=props.message_id),
body=body)
ch.basic_ack(delivery_tag=method.delivery_tag)
print(f'Replied to {props.reply_to}')
ch.basic_consume(queue='request-queue', on_message_callback=on_msg)
print('Waiting for messages on request-queue...')
ch.start_consuming()
" &Run the consumer in a separate terminal (or background with
&as shown). After the test you can stop it withkill %1.
Scala simulation:
import org.galaxio.gatling.amqp.Predef._
import io.gatling.core.Predef._
val amqpConf = amqp
.connectionFactory(
rabbitmq.host("localhost").port(5672).username("guest").password("guest").vhost("/")
)
.replyTimeout(60000)
.consumerThreadsCount(8)
.matchByMessageId
val scn = scenario("Request-Reply")
.exec(
amqp("rpc-call").requestReply
.queueExchange("request-queue")
.replyExchange("reply-queue")
.textMessage("""{"action": "ping"}""")
.contentType("application/json")
.check(bodyString.exists)
)
setUp(scn.inject(atOnceUsers(1))).protocols(amqpConf)If no consumer is running, the request will time out (default 60 s per replyTimeout) and Gatling reports a KO — this confirms the round-trip is wired correctly.
val cf = rabbitmq
.host("localhost")
.port(5672)
.username("guest")
.password("guest")
.vhost("/")val cf = rabbitmq
.host("localhost")
.port(5672)
.username("guest")
.password("guest")
.automaticRecovery(true)
.networkRecoveryInterval(5000)
.topologyRecovery(true)
.connectionTimeout(10000)
.requestedHeartbeat(60)
.requestedChannelMax(0)
.useSslProtocol()val amqpConf = amqp
.connectionFactory(publishCf) // single connection factory
.connectionFactory(publishCf, replyCf) // separate publish/reply connections
.usePersistentDeliveryMode // delivery mode 2
.useNonPersistentDeliveryMode // delivery mode 1 (default)
.matchByMessageId // match replies by messageId (default)
.matchByCorrelationId // match replies by correlationId
.matchByMessage(msg => msg.messageId) // custom match extraction
.replyTimeout(30000) // reply timeout in ms
.consumerThreadsCount(4) // consumer threads per pool
.channelPoolSize(32) // max pooled channels (default: 16)
.usePublisherConfirms // enable publisher confirms
.responseTransform(msg => msg) // transform reply messages before checksval amqpConf = amqp
.connectionFactory(cf)
.declare(queue("test-queue", durable = true, exclusive = false, autoDelete = false))
.declare(exchange("test-exchange", BuiltinExchangeType.TOPIC, durable = true))
.bindQueue(
queue("test-queue", durable = true),
exchange("test-exchange", BuiltinExchangeType.TOPIC),
"routing.key"
)// Queue exchange
amqp("publish").publish
.queueExchange("my-queue")
.textMessage("""{"data": "value"}""")
// Topic exchange
amqp("publish").publish
.topicExchange("my-exchange", "routing.key")
.textMessage("""{"data": "value"}""")
// Direct exchange
amqp("publish").publish
.directExchange("my-exchange", "routing.key")
.bytesMessage(myBytes)
// With EL expressions
amqp("publish-#{id}").publish
.queueExchange("queue-#{name}")
.textMessage("""{"id": "#{id}"}""")amqp("request-reply").requestReply
.topicExchange("request-exchange", "request.key")
.replyExchange("reply-queue")
.textMessage("""{"query": "data"}""")
.messageId("#{msgId}")
.check(jsonPath("$.result").is("ok"))The request destination supports queueExchange, directExchange, and topicExchange. The reply destination (.replyExchange(name)) specifies the queue where the plugin listens for replies. Reply routing is always queue-based.
Multi-broker setup (publish to one broker, consume reply from another):
val amqpConf = amqp
.connectionFactory(publisherCf, replyConsumerCf)
.matchByCorrelationId
.replyTimeout(10000)Polls the queue for a message up to the specified timeout (default 5000ms). If no message arrives within the timeout, the action reports failure.
amqp("consume").consume
.queue("my-queue")
.timeout(5000)
.check(bodyString.is("expected"))All message properties support Gatling Expression Language:
amqp("publish").publish
.queueExchange("my-queue")
.textMessage("hello")
.messageId("#{msgId}")
.correlationId("#{corrId}")
.contentType("application/json")
.contentEncoding("UTF-8")
.priority(5)
.replyTo("reply-queue")
.expiration("60000")
.amqpType("my.type")
.userId("guest")
.appId("my-app")
.header("X-Custom", "value")
.headers("X-One" -> "1", "X-Two" -> "2")Checks are supported on request-reply and consume actions:
import org.galaxio.gatling.amqp.Predef._
// JSON checks
.check(jsonPath("$.status").is("ok"))
.check(jsonPath("$.items[*].id").findAll.saveAs("ids"))
.check(jmesPath("status").is("ok"))
// XML checks
.check(xpath("//status").is("ok"))
// Body checks
.check(bodyString.is("expected"))
.check(bodyBytes.exists)
.check(substring("success").exists)
// Response code
.check(responseCode.notIn("500", "503"))
// Simple custom check
.check(simpleCheck(msg => msg.payload.nonEmpty))
// Conditional check
.check(checkIf("#{useCheck}")(jsonPath("$.data").exists))Mark requests as silent to suppress statistics logging:
amqp("setup-publish").publish
.queueExchange("setup-queue")
.textMessage("init")
.silent
amqp("setup-rpc").requestReply
.queueExchange("setup-queue")
.replyExchange("reply-queue")
.textMessage("init")
.silentEnable publisher confirms for guaranteed delivery:
val amqpConf = amqp
.connectionFactory(cf)
.usePublisherConfirmsNote: Kotlin examples use the Java API facade and are provided as reference. They are not compiled by CI (no Kotlin compiler plugin is configured in the build).
# Build
sbt compile
# Run unit tests
sbt test
# Run integration tests (requires RabbitMQ)
docker-compose up -d
sbt "Gatling / testOnly org.galaxio.gatling.amqp.examples.AmqpGatlingTest"
# Check formatting
sbt scalafmtCheckAll
# Format code
sbt scalafmtAllApache License 2.0. See LICENSE for details.