Gatling AMQP support
handy cli (use AmqpProtocol as console utility) [0.6 feature]
scala> import io.gatling.amqp.Predef._
scala> amqp.declare(queue("q3", durable = true)).run
implicit val amqpProtocol: AmqpProtocol = amqp
.host("localhost")
.port(5672)
// .vhost("/")
.auth("guest", "guest")
.poolSize(10)
val req = PublishRequest("q1", payload = "{foo:1}")
val scn = scenario("AMQP Publish").repeat(1000) {
exec(amqp("Publish").publish(req))
}
setUp(scn.inject(rampUsers(10) over (1 seconds))).protocols(amqpProtocol)
publish (with persistent)
- PublishRequest.persistent make request DeliveryMode(2)
- known issue: persistent reset request's properties
val req = PublishRequest("q1", payload = "{foo:1}").persistent
publish (with confirmation)
- set "confirmMode()" in protocol that invokes "channel.confirmSelect()"
implicit val amqpProtocol: AmqpProtocol = amqp
.host("localhost")
.port(5672)
.auth("guest", "guest")
.poolSize(10)
.confirmMode()
val req = PublishRequest("q1", payload = "{foo:1}")
val scn = scenario("AMQP Publish(ack)").repeat(1000) {
exec(amqp("Publish").publish(req))
}
setUp(scn.inject(rampUsers(10) over (1 seconds))).protocols(amqpProtocol)
implicit val amqpProtocol: AmqpProtocol = amqp
.host("localhost")
.port(5672)
.auth("guest", "guest")
.declare(queue("q1", durable = true, autoDelete = false))
declare exchange and binding
val x = exchange("color", "direct", autoDelete = false)
val q = queue("orange")
implicit val amqpProtocol: AmqpProtocol = amqp
.host("localhost")
.port(5672)
.auth("guest", "guest")
.declare(x)
.declare(q)
.bind(x, q, routingKey = "orange")
- full code: src/test/scala/io/gatling/amqp/PublishingSimulation.scala
implicit val amqpProtocol: AmqpProtocol = amqp
.host("amqp")
.port(5672)
.auth("guest", "guest")
val scn = scenario("AMQP Publish(ack)").exec {
amqp("Consume").consume("q1", autoAck = true)
}
setUp(scn.inject(atOnceUsers(1))).protocols(amqpProtocol)
- full code: src/test/scala/io/gatling/amqp/ConsumingSimulation.scala
% sbt
> testOnly io.gatling.amqp.PublishingSimulation
% sbt
> testOnly io.gatling.amqp.ConsumingSimulation
- try
sbt -J-Xmx8192m -J-XX:MaxPermSize=256m
for publishing massive messages
shell script to store gatling stdout logs and simulation sources
- cpu: Xeon X5687(3.60GHz)
- mem: 24GB, limit(10GiB), watermark(5GiB)
- version: 3.5.2
- total bytes =
servers * payalod * messages * users
- users = concurrency of AMQP connections
publish (persistent queue)
servers |
payload |
ack |
repeat |
users |
total |
sec |
qps |
spd |
1 |
1 KB |
o |
100,000 |
10 |
1 GB |
69 |
14326 |
14.3 MB/s |
1 |
10 KB |
o |
10,000 |
10 |
1 GB |
14 |
6778 |
67.8 MB/s |
1 |
100 KB |
o |
1,000 |
10 |
1 GB |
11 |
881 |
88.1 MB/s |
1 |
1 MB |
o |
100 |
10 |
1 GB |
10 |
97 |
97.8 MB/s |
1 |
10 MB |
o |
100 |
1 |
1 GB |
- |
- |
log error |
1 |
10 KB |
o |
1,000 |
100 |
1 GB |
17 |
5791 |
57.9 MB/s |
4 |
1 KB |
o |
100,000 |
10 |
4 GB |
298 |
13490 |
13.5 MB/s |
4 |
10 KB |
o |
10,000 |
10 |
4 GB |
56 |
7208 |
72.1 MB/s |
- log error: statsEngine stopped before working actors finished
publish (persistent queue, paging)
servers |
payload |
ack |
repeat |
users |
total |
sec |
qps |
spd |
1 |
10 KB |
o |
100,000 |
10 |
10 GB |
143 |
6983 |
69.8 MB/s |
1 |
10 KB |
o |
200,000 |
10 |
20 GB |
301 |
6632 |
66.3 MB/s |
consume (persistent queue)
payload |
message |
users |
total |
sec |
qps |
spd |
10 KB |
100 k |
1 |
1 GB |
12 |
8436 |
84.4 MB/s |
10 KB |
2 m |
1 |
20 GB |
179 |
11161 |
111.6 MB/s |
publish and consume (persistent queue)
payload |
p-ack |
repeat |
publisher |
total |
qps |
spd |
consumer |
ack |
qps |
10 KB |
o |
10,000 |
10 |
1 GB |
6779 |
67.8 MB/s |
1 |
auto |
6233 |
10 KB |
o |
200,000 |
10 |
20 GB |
7639 |
76.4 MB/s |
1 |
auto |
7622 |
- amqp-client-3.5.3
- gatling-sbt-2.1.6 (to implement easily)
- gatling-2.2.0-M3 (live with edge)
- declare exchanges, queues and bindings in action builder context (to test declaration costs)
- make AmqpProtocol immutable
- make Builder mutable
- mandatory
- consume action (manual ack)
released under the MIT License.