permutive-engineering / fs2-pubsub   1.1.0

Apache License 2.0 GitHub

Google Cloud Pub/Sub stream-based client built on top of cats-effect, fs2 and http4s.

Scala versions: 3.x 2.13 2.12

Google Cloud Pub/Sub stream-based client built on top of cats-effect, fs2 and http4s.


Installation

Add the following line to your build.sbt file:

libraryDependencies += "com.permutive" %% "fs2-pubsub" % "1.1.0"

The library is published for Scala versions: 2.12, 2.13 and 3.

Usage

To start using the library, you'll need an http4s Client with permission to call Pub/Sub APIs in GCP. You can create one using gcp-auth:

import org.http4s.ember.client.EmberClientBuilder
import cats.effect.IO
import cats.syntax.all._
import com.permutive.gcp.auth.TokenProvider

val client = EmberClientBuilder
  .default[IO]
  .withHttp2
  .build
  .mproduct(client => TokenProvider.userAccount(client).toResource)
  .map { case (client, tokenProvider) => tokenProvider.clientMiddleware(client) }

Publishing messages to a Pub/Sub topic

To publish messages to Pub/Sub, you can use the PubsubPublisher class:

import fs2.pubsub._

val publisher: PubSubPublisher[IO, String] = PubSubPublisher
    .http[IO, String]
    .projectId(ProjectId("my-project"))
    .topic(Topic("my-topic"))
    .defaultUri
    .httpClient(client)
    .noRetry

Then you can use any of the PubSubPublisher methods to send messages to Pub/Sub.

// Producing a single message

publisher.publishOne("message")
// Producing multiple messages

val records = List(
   PubSubRecord.Publisher("message1"),
   PubSubRecord.Publisher("message2"),
   PubSubRecord.Publisher("message3")
)

publisher.publishMany(records)
// Producing a message with attributes

publisher.publishOne("message", "key" -> "value")
// Producing a message using the record type

val record = PubSubRecord.Publisher("message").withAttribute("key", "value")

publisher.publishOne(record)

Configuring the publisher

There are several configuration options available for the publisher:

  • projectId: The GCP project ID.
  • topic: The Pub/Sub topic name.
  • uri: The URI of the Pub/Sub API. By default, it uses the Google Cloud Pub/Sub API.
  • httpClient: The http4s Client to use for making requests to the Pub/Sub API.
  • retry: The retry policy to use when sending messages to Pub/Sub. By default, it retries up to 3 times with exponential backoff.

These configurations can either by provided by using a configuration object (PubSubPublisher.Config) or by using the builder pattern.

Using gRPC (only available on 2.13 or 3.x)

You can use PubSubPublisher.grpc to create a publisher that uses gRPC to connect to Pub/Sub.

This type of publisher is only available on Scala 2.13 or 3.x.

Publishing messages asynchronously (in batches)

In order to publish messages asynchronously, you can use the PubSubPublisher.Async. You can create an instance of this class from a regular PubSubPublisher by using the batching method:

import cats.effect.Resource
import scala.concurrent.duration._

val asyncPublisher: Resource[IO, PubSubPublisher.Async[IO, String]] = 
   publisher
    .batching
    .batchSize(10)
    .maxLatency(1.second)

Then you can use any of the PubSubPublisher.Async methods to send messages to Pub/Sub. These methods are the same ones you'll find in the regular PubSubPublisher, with the difference that they return a F[Unit] instead of a F[MessageId] and that they expect a PubSubRecord.Publisher.WithCallback instead of a regular PubSubRecord.Publisher.

In order to construct such class you can either use the PubSubRecord.Publisher.WithCallback constructor or use the withCallback method on a regular PubSubRecord.Publisher:

val recordWithCallback = PubSubRecord.Publisher("message").withCallback { _ =>
  IO(println("Message sent!"))
}

Subscribing to a Pub/Sub subscription

To subscribe to a Pub/Sub subscription, you can use the PubSubSubscriber class:

import fs2.Stream

val subscriber: Stream[IO, Option[String]] = PubSubSubscriber
    .http[IO]
    .projectId(ProjectId("my-project"))
    .subscription(Subscription("my-subscription"))
    .defaultUri
    .httpClient(client)
    .noRetry
    .noErrorHandling
    .withDefaults
    .decodeTo[String]
    .subscribeAndAck

Configuring the subscriber

There are several configuration options available for the subscriber:

  • projectId: The GCP project ID.
  • subscription: The Pub/Sub subscription name.
  • uri: The URI of the Pub/Sub API. By default, it uses the Google Cloud Pub/Sub API.
  • httpClient: The http4s Client to use for making requests to the Pub/Sub API.
  • retry: The retry policy to use when receiving messages from Pub/Sub. By default, it retries up to 3 times with exponential backoff.
  • errorHandling: The error handling policy to use when performing operations such as decoding messages or acknowledging them.
  • batchSize: The maximum number of messages to acknowledge at once.
  • maxLatency: The maximum time to wait for a batch of messages before acknowledging them.
  • maxMessages: The maximum number of messages to receive in a single batch.
  • readConcurrency: The number of concurrent reads from the subscription.

These configurations can either by provided by using a configuration object (PubSubSubscriber.Config) or by using the builder pattern.

Using gRPC (only available on 2.13 or 3.x)

You can use PubSubSubscriber.grpc to create a subscriber that uses gRPC to connect to Pub/Sub.

This type of subscriber is only available on Scala 2.13 or 3.x.

Creating a raw subscriber

There are two types of subscribers available in the library: raw and decoded.

The raw subscriber returns the raw message received from Pub/Sub, while the decoded subscriber decodes the message to a specific type.

The former is useful when you want to handle the message yourself, while the latter is useful when you want to work with a specific type. You can create a raw subscriber by using the raw method instead of decodeTo.

Pureconfig integration

The library provides a way to load the configuration from a ConfigSource using pureconfig.

You just need to add the following line to your build.sbt file:

libraryDependencies += "com.permutive" %% "fs2-pubsub-pureconfig" % "1.1.0"

And then add the following import when you want to use the pureconfig integration:

import pureconfig.ConfigSource

import fs2.pubsub.PubSubPublisher
import fs2.pubsub.pureconfig._

val config = ConfigSource.default.loadOrThrow[PubSubPublisher.Config]

PubSubPublisher
    .http[IO, String]
    .fromConfig(config)
    .httpClient(client)
    .noRetry

Migrating from fs2-google-pubsub

The most important thing you need to take into account while migrating is that the library no longer creates an authenticated Client for you. You need to provide one yourself using permutive-engineering/gcp-auth.

You can use the following table to find the equivalent classes and methods in fs2-pubsub:

fs2-google-pubsub fs2-pubsub
com.permutive.pubsub.consumer.ConsumerRecord fs2.pubsub.PubSubRecord.Publisher
com.permutive.pubsub.consumer.ConsumerRecord fs2.pubsub.PubSubRecord.Publisher
com.permutive.pubsub.consumer.decoder.MessageDecoder fs2.pubsub.MessageDecoder
com.permutive.pubsub.consumer.grpc.PubsubGoogleConsumer fs2.pubsub.PubSubSubscriber.grpc
com.permutive.pubsub.consumer.grpc.PubsubGoogleConsumerConfig fs2.pubsub.PubSubSubscriber.Config
com.permutive.pubsub.consumer.http.PubsubHttpConsumer fs2.pubsub.PubSubSubscriber.http
com.permutive.pubsub.consumer.http.PubsubHttpConsumerConfig fs2.pubsub.PubSubSubscriber.Config
com.permutive.pubsub.consumer.http.PubsubMessage fs2.pubsub.PubSubRecord.Subscriber
com.permutive.pubsub.consumer.Model.ProjectId fs2.pubsub.ProjectId
com.permutive.pubsub.consumer.Model.Subscription fs2.pubsub.Subscription
com.permutive.pubsub.http.crypto.* permutive-engineering/gcp-auth
com.permutive.pubsub.http.oauth.* permutive-engineering/gcp-auth
com.permutive.pubsub.http.util.RefreshableEffect permutive-engineering/refreshable
com.permutive.pubsub.producer.AsyncPubsubProducer fs2.pubsub.PubSubPublisher.Async
com.permutive.pubsub.producer.encoder.MessageEncoder fs2.pubsub.MessageEncoder
com.permutive.pubsub.producer.grpc.GooglePubsubProducer fs2.pubsub.PubSubPublisher.grpc
com.permutive.pubsub.producer.grpc.PubsubProducerConfig fs2.pubsub.PubSubPublisher.Config
com.permutive.pubsub.producer.http.BatchingHttpProducerConfig fs2.pubsub.PubSubPublisher.Async.Config
com.permutive.pubsub.producer.http.BatchingHttpPubsubProducer fs2.pubsub.PubSubPublisher.Async.http
com.permutive.pubsub.producer.http.HttpPubsubProducer fs2.pubsub.PubSubPublisher.http
com.permutive.pubsub.producer.http.PubsubHttpProducerConfig fs2.pubsub.PubSubPublisher.Config
com.permutive.pubsub.producer.Model.AsyncRecord fs2.pubsub.PubSubRecord.Subscriber.WithCallback
com.permutive.pubsub.producer.Model.MessageId fs2.pubsub.MessageId
com.permutive.pubsub.producer.Model.ProjectId fs2.pubsub.ProjectId
com.permutive.pubsub.producer.Model.SimpleRecord fs2.pubsub.PubSubRecord.Subscriber
com.permutive.pubsub.producer.Model.Topic fs2.pubsub.Topic
com.permutive.pubsub.producer.PubsubProducer fs2.pubsub.PubSubPublisher

Contributors to this project

CremboC bastewart TimWSpence travisbrown alejandrohdezma ChristianJohnston97 janstenpickle
CremboC bastewart TimWSpence travisbrown alejandrohdezma ChristianJohnston97 janstenpickle
chrisjl154 marcelocarlos desbo kythyra mcgizzle istreeter Joe8Bit
chrisjl154 marcelocarlos desbo kythyra mcgizzle istreeter Joe8Bit
arunas-cesonis
arunas-cesonis