Akka Streams Source and Sink for Cloud Pub/Sub
This repository contains an Akka Streams
Sink implementation that can be used with Google's Cloud Pub/Sub messaging middleware. The Cloud Pub/Sub sinks and sources provided in this repository have been in production at Qubit for several months and is used to process a Cloud Pub/Sub stream that averages over a billion messages a day in real time.
Currently only Scala 2.11 artifacts are published.
"com.qubit" % "akka-cloudpubsub_2.11" % "1.0.0"
<dependency> <groupId>com.qubit</groupId> <artifactId>akka-cloudpubsub_2.11</artifactId> <version>1.0.0</version> </dependency>
Reading from Cloud Pub/Sub (Source)
PubSubSource is backpressure aware and only reads from Pub/Sub when there's demand downstream. By default, messages are acked every 10 seconds or when the ack buffer is full (default size 100). The ack interval, buffer size and several other properties are configurable via attributes.
Creating a source reading from the
foo-sub subscription in the
foo-project with the default settings:
Source.fromGraph(new PubSubSource(PubSubSubscription("foo-project", "foo-sub")))
Overriding the ack interval to 30 seconds:
Source.fromGraph(new PubSubSource(PubSubSubscription("foo-project", "foo-sub")), 30.seconds)
Customise the ack interval, buffer size, pull timeout, max retries and retry jitter:
val attributes = Attributes(List( PubSubPullTimeoutAttribute(60.seconds), PubSubStageBufferSizeAttribute(200), PubSubStageMaxRetriesAttribute(100), PubSubStageRetryJitterAttribute(1, 5))) Source.fromGraph(new PubSubSource(PubSubSubscription("foo-project", "foo-sub"), 30.seconds)).withAttributes(attributes)
Writing to Cloud Pub/Sub (Sink)
PubSubSink flushes messages either when the buffer is full (size is 100 by default) or after the messages have been in the buffer for
maxDelay (30 seconds by default).
Creating a sink writing to
foo-topic topic in the
foo-project with the default settings:
Sink.fromGraph(new PubSubSink(PubSubTopic("foo-project", "foo-topic")))
Overriding the max delay to 10 seconds:
Sink.fromGraph(new PubSubSink(PubSubTopic("foo-project", "foo-topic")), 10.seconds)
Customise the max delay, buffer size, publish timeout, max retries and retry jitter:
val attributes = Attributes(List( PubSubStageBufferSizeAttribute(200), PubSubStageMaxRetriesAttribute(100), PubSubStageRetryJitterAttribute(1, 5), PubSubPublishTimeoutAttribute(60.seconds))) Sink.fromGraph(new PubSubSink(PubSubTopic("foo-project", "foo-topic"))).withAttributes(attributes)
Using the Cloud Pub/Sub Client
The sink and the source both make use of a custom client library implemented using the gRPC API definitions published by Google. This client can be used as a standalone Pub/Sub client even if you are not interested in using Akka streams.
In almost all cases, you should be using the
RetryingPubSubClient which supports customizable retry policies using the Atmos library. Custom retry policies and retry execution contexts need to be provided as implicit parameters to the client when it is created. See
com.qubit.pubsub.client.retry.RetryPolicyDefaults for the default values used for these parameters.
Creating the default client:
val client = RetryingPubSubClient(PubSubGrpcClient())
Overriding the Pub/Sub endpoint and transport security (for tests using the Pub/Sub emulator):
val config = PubSubApiConfig(apiHost = "localhost", apiPort = "8897", tlsEnabled = false) val client = RetryingPubSubClient(PubSubGrpcClient(config))
The integration tests require the Pub/Sub emulator to be running.
On a new terminal window:
mkdir -p /tmp/pubsub && gcloud beta emulators pubsub start --data-dir /tmp/pubsub
On another terminal window:
Change directory to source root and
eval $(gcloud beta emulators pubsub env-init --data-dir /tmp/pubsub) sbt test it:test
Releasing to Sonatype OSSRH
Ensure that credentials and PGP settings are correctly configured as documented at http://www.scala-sbt.org/release/docs/Using-Sonatype.html
sbt release sbt releaseSonatype
- Please make sure that the code is formatted using scalafmt using the provided formatting configuration
- Add tests to exercise the new additions/modifications
- Create an issue on Github before submitting your pull request