permutive / fs2-google-pubsub

Google Cloud Pub/Sub stream-based client built on top of cats-effect, fs2 and http4s.

fs2-google-pubsub

fs2-google-pubsub provides a mix of APIs, depending on the exact module. Consumers are provided as fs2 streams, while the producers are effect-based, utilising cats-effect.

Module overview

Public modules

  • fs2-google-pubsub-grpc - an implementation that utilises Google's own Java library
  • fs2-google-pubsub-http - an implementation that uses http4s and communicates via the REST API

Internal modules

  • fs2-google-pubsub - shared classes for all implementations

Dependencies

Add one (or more) of the following to your build.sbt (see Releases for the latest version):

libraryDependencies += "com.permutive" %% "fs2-google-pubsub-grpc" % Version

OR

libraryDependencies += "com.permutive" %% "fs2-google-pubsub-http" % Version

Note that the HTTP module also needs an explicit http4s client implementation. http4s provides several, including blaze, async-http-client, jetty and okhttp.

If async-http-client is desired, add the following to build.sbt:

libraryDependencies += "org.http4s" %% "http4s-async-http-client" % "0.20.0"
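
As a sketch of how the two pieces fit together for the HTTP module, the fragment below pairs it with the blaze client instead of async-http-client. The `pubsubVersion` value is a placeholder to be replaced with a version from Releases, and reusing http4s `0.20.0` for the blaze artifact is an assumption carried over from the line above.

```scala
// build.sbt (sketch): HTTP module plus one http4s client backend.
// Placeholder: substitute the latest version listed under Releases.
val pubsubVersion = "<version>"

libraryDependencies ++= Seq(
  "com.permutive" %% "fs2-google-pubsub-http" % pubsubVersion,
  // Assumed alternative backend; swap in whichever http4s client you prefer.
  "org.http4s"    %% "http4s-blaze-client"    % "0.20.0"
)
```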

Examples

Consumer (Google)

See PubsubGoogleConsumerConfig for more configuration options.

package com.permutive.pubsub.consumer.google

import cats.effect.{ExitCode, IO, IOApp}
import cats.syntax.all._
import com.permutive.pubsub.consumer.Model
import com.permutive.pubsub.consumer.decoder.MessageDecoder

object SimpleDriver extends IOApp {
  case class ValueHolder(value: String) extends AnyVal

  implicit val decoder: MessageDecoder[ValueHolder] = (bytes: Array[Byte]) => {
    Right(ValueHolder(new String(bytes)))
  }

  override def run(args: List[String]): IO[ExitCode] = {
    val stream = PubsubGoogleConsumer.subscribe[IO, ValueHolder](
      Model.ProjectId("test-project"),
      Model.Subscription("example-sub"),
      (msg, err, ack, _) => IO(println(s"Msg $msg got error $err")) >> ack,
      config = PubsubGoogleConsumerConfig(
        onFailedTerminate = _ => IO.unit
      )
    )

    stream
      .evalTap(t => t.ack >> IO(println(s"Got: ${t.value}")))
      .compile
      .drain
      .as(ExitCode.Success)
  }
}
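
The decoder in the example uses `new String(bytes)`, which depends on the platform default charset and silently replaces malformed input. A stricter decoder might look like the sketch below. It uses only the standard library; the `Decoder` trait is a hypothetical stand-in that assumes `MessageDecoder` is a single-method trait from bytes to `Either[Throwable, A]`, as the lambda in the example suggests.

```scala
import java.nio.ByteBuffer
import java.nio.charset.{CodingErrorAction, StandardCharsets}
import scala.util.Try

object Utf8Decoding {
  // Hypothetical stand-in for the library's MessageDecoder (assumed shape).
  trait Decoder[A] {
    def decode(bytes: Array[Byte]): Either[Throwable, A]
  }

  final case class ValueHolder(value: String)

  // Decodes strictly as UTF-8: malformed or unmappable input yields a Left
  // instead of being replaced with U+FFFD.
  val strictUtf8: Decoder[ValueHolder] = (bytes: Array[Byte]) =>
    Try {
      val decoder = StandardCharsets.UTF_8
        .newDecoder()
        .onMalformedInput(CodingErrorAction.REPORT)
        .onUnmappableCharacter(CodingErrorAction.REPORT)
      ValueHolder(decoder.decode(ByteBuffer.wrap(bytes)).toString)
    }.toEither
}
```

With a decoder like this, a message that is not valid UTF-8 would reach the error handler passed to `subscribe` rather than the main stream.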

Consumer (HTTP)

See PubsubHttpConsumerConfig for more configuration options.

package com.permutive.pubsub.consumer.http

import cats.effect._
import cats.syntax.all._
import com.permutive.pubsub.consumer.Model
import com.permutive.pubsub.consumer.decoder.MessageDecoder
import org.http4s.client.asynchttpclient.AsyncHttpClient
import fs2.Stream

import scala.util.Try

object Example extends IOApp {
  case class ValueHolder(value: String) extends AnyVal

  implicit val decoder: MessageDecoder[ValueHolder] = (bytes: Array[Byte]) => {
    Try(ValueHolder(new String(bytes))).toEither
  }

  override def run(args: List[String]): IO[ExitCode] = {
    val client = AsyncHttpClient.resource[IO]()

    val mkConsumer = PubsubHttpConsumer.subscribe[IO, ValueHolder](
      Model.ProjectId("test-project"),
      Model.Subscription("example-sub"),
      "/path/to/service/account",
      PubsubHttpConsumerConfig(
        host = "localhost",
        port = 8085,
        isEmulator = true,
      ),
      _,
      (msg, err, ack, _) => IO(println(s"Msg $msg got error $err")) >> ack,
    )

    Stream.resource(client)
      .flatMap(mkConsumer)
      .evalTap(t => t.ack >> IO(println(s"Got: ${t.value}")))
      .compile
      .drain
      .as(ExitCode.Success)
  }
}

Producer (Google)

See PubsubProducerConfig for more configuration options.

package com.permutive.pubsub.producer.google

import cats.effect.{ExitCode, IO, IOApp}
import cats.syntax.all._
import com.permutive.pubsub.producer.Model
import com.permutive.pubsub.producer.encoder.MessageEncoder

import scala.concurrent.duration._

object PubsubProducerExample extends IOApp {

  case class Value(v: Int) extends AnyVal

  implicit val encoder: MessageEncoder[Value] = new MessageEncoder[Value] {
    override def encode(a: Value): Either[Throwable, Array[Byte]] =
      Right(BigInt(a.v).toByteArray)
  }

  override def run(args: List[String]): IO[ExitCode] = {
    GooglePubsubProducer.of[IO, Value](
      Model.ProjectId("test-project"),
      Model.Topic("values"),
      config = PubsubProducerConfig[IO](
        batchSize = 100,
        delayThreshold = 100.millis,
        onFailedTerminate = e => IO(println(s"Got error $e"))
      )
    ).use { producer =>
      producer.produce(
        Value(10),
      )
    }.map(_ => ExitCode.Success)
  }
}
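
The encoder in the example writes `BigInt(a.v).toByteArray`, so a consumer has to apply the inverse when reading the topic. The sketch below shows that round trip with the standard library only; `IntWireFormat` and its method names are illustrative and not part of fs2-google-pubsub.

```scala
object IntWireFormat {
  // Same encoding as the producer example: minimal-length, big-endian,
  // two's-complement bytes.
  def encode(v: Int): Array[Byte] = BigInt(v).toByteArray

  // Inverse for the consumer side; returns a Left on empty input or on a
  // value that does not fit in an Int.
  def decode(bytes: Array[Byte]): Either[Throwable, Int] =
    scala.util.Try(BigInt(bytes)).toEither.flatMap { n =>
      if (n.isValidInt) Right(n.toInt)
      else Left(new IllegalArgumentException(s"$n does not fit in an Int"))
    }
}
```

Note that the payload length varies with the value: `encode(10)` is a single byte, while `encode(256)` is two bytes.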

Producer (HTTP)

See PubsubHttpProducerConfig for more configuration options.

package com.permutive.pubsub.producer.http

import cats.effect.{ExitCode, IO, IOApp}
import cats.syntax.all._
import com.github.plokhotnyuk.jsoniter_scala.core._
import com.github.plokhotnyuk.jsoniter_scala.macros._
import com.permutive.pubsub.producer.Model
import com.permutive.pubsub.producer.encoder.MessageEncoder
import org.http4s.client.asynchttpclient.AsyncHttpClient

import scala.concurrent.duration._
import scala.util.Try

object ExampleGoogle extends IOApp {

  final implicit val Codec: JsonValueCodec[ExampleObject] =
    JsonCodecMaker.make[ExampleObject](CodecMakerConfig())

  implicit val encoder: MessageEncoder[ExampleObject] = (a: ExampleObject) => {
    Try(writeToArray(a)).toEither
  }

  case class ExampleObject(
    projectId: String,
    url: String,
  )

  override def run(args: List[String]): IO[ExitCode] = {
    val mkProducer = HttpPubsubProducer.resource[IO, ExampleObject](
      projectId = Model.ProjectId("test-project"),
      topic = Model.Topic("example-topic"),
      googleServiceAccountPath = "/path/to/service/account",
      config = PubsubHttpProducerConfig(
        host = "pubsub.googleapis.com",
        port = 443,
        oauthTokenRefreshInterval = 30.minutes,
      ),
      _
    )

    val http = AsyncHttpClient.resource[IO]()
    http.flatMap(mkProducer).use { producer =>
      producer.produce(
        record = ExampleObject("70251cf8-5ffb-4c3f-8f2f-40b9bfe4147c", "example.com")
      )
    }.flatTap(output => IO(println(output))) >> IO.pure(ExitCode.Success)
  }
}

Producer (HTTP) automatic-batching

See PubsubHttpProducerConfig and BatchingHttpProducerConfig for more configuration options.

package com.permutive.pubsub.producer.http

import cats.effect.{ExitCode, IO, IOApp}
import cats.syntax.all._
import com.github.plokhotnyuk.jsoniter_scala.core._
import com.github.plokhotnyuk.jsoniter_scala.macros._
import com.permutive.pubsub.producer.Model
import com.permutive.pubsub.producer.encoder.MessageEncoder
import io.chrisdavenport.log4cats.Logger
import io.chrisdavenport.log4cats.slf4j.Slf4jLogger
import org.http4s.client.asynchttpclient.AsyncHttpClient

import scala.concurrent.duration._
import scala.util.Try

object ExampleBatching extends IOApp {

  private[this] final implicit val unsafeLogger: Logger[IO] = Slf4jLogger.unsafeCreate[IO]

  final implicit val Codec: JsonValueCodec[ExampleObject] =
    JsonCodecMaker.make[ExampleObject](CodecMakerConfig())

  implicit val encoder: MessageEncoder[ExampleObject] = (a: ExampleObject) => {
    Try(writeToArray(a)).toEither
  }

  case class ExampleObject(
    projectId: String,
    url: String,
  )

  override def run(args: List[String]): IO[ExitCode] = {
    val mkProducer = BatchingHttpPubsubProducer.resource[IO, ExampleObject](
      projectId = Model.ProjectId("test-project"),
      topic = Model.Topic("example-topic"),
      googleServiceAccountPath = "/path/to/service/account",
      config = PubsubHttpProducerConfig(
        host = "localhost",
        port = 8085,
        oauthTokenRefreshInterval = 30.minutes,
        isEmulator = true,
      ),
      
      batchingConfig = BatchingHttpProducerConfig(
        batchSize = 10,
        maxLatency = 100.millis,

        retryTimes = 0,
        retryInitialDelay = 0.millis,
        retryNextDelay = _ + 250.millis,
      ),
      _
    )

    val messageCallback: Either[Throwable, Unit] => IO[Unit] = {
      case Right(_) => Logger[IO].info("Async message was sent successfully!")
      case Left(e) => Logger[IO].warn(e)("Async message was sent unsuccessfully!")
    }

    val client = AsyncHttpClient.resource[IO]()

    client
      .flatMap(mkProducer)
      .use { producer =>
        val produceOne = producer.produce(
          record = ExampleObject("1f9774be-9d7c-4dd9-8d97-855b681938a9", "example.com"),
        )

        val produceOneAsync = producer.produceAsync(
          record = ExampleObject("a84a3318-adbd-4eac-af78-eacf33be91ef", "example.com"),
          callback = messageCallback
        )

        for {
          result1 <- produceOne
          result2 <- produceOne
          result3 <- produceOne
          _       <- result1
          _       <- Logger[IO].info("First message was sent!")
          _       <- result2
          _       <- Logger[IO].info("Second message was sent!")
          _       <- result3
          _       <- Logger[IO].info("Third message was sent!")
          _       <- produceOneAsync
          _       <- IO.never
        } yield ()
      }
      .as(ExitCode.Success)
  }
}
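
To get a feel for how `retryTimes`, `retryInitialDelay` and `retryNextDelay` might interact, the sketch below lists the delays a given configuration would produce. It assumes, without confirmation from the library, that the first retry waits `retryInitialDelay` and that each later retry waits `retryNextDelay` applied to the previous delay. `RetrySchedule` is a hypothetical helper, not part of fs2-google-pubsub.

```scala
import scala.concurrent.duration._

object RetrySchedule {
  // Assumed semantics (not confirmed by the library): the first retry waits
  // `initialDelay`, and each later retry waits `nextDelay(previous delay)`.
  def delays(
    retryTimes: Int,
    initialDelay: FiniteDuration,
    nextDelay: FiniteDuration => FiniteDuration
  ): List[FiniteDuration] =
    Iterator.iterate(initialDelay)(nextDelay).take(retryTimes).toList
}
```

Under that assumption, `RetrySchedule.delays(3, 0.millis, _ + 250.millis)` gives delays of 0 ms, 250 ms and 500 ms, while `retryTimes = 0`, as in the example above, gives an empty schedule, so no retries would be attempted.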

HTTP vs Google

Google pros and cons

Pros of using the Google library

  • The underlying library is well supported (in theory)
  • Uses gRPC and HTTP/2 (should be faster)
  • Handles authentication automatically

Cons of using the Google library

  • Uses gRPC (if you use multiple Google libraries with different gRPC versions, something will break)
  • Bloated
  • More dependencies
  • Less functional (in the functional-programming sense)
  • Doesn't work with the official Pub/Sub emulator (support is in the feature backlog)
  • The Google API can change at any point (this shouldn't be exposed to users of fs2-google-pubsub, but it slows development and updating)

HTTP pros and cons

Pros of using the HTTP variant

  • Fewer dependencies
  • Works with the Pub/Sub emulator
  • Fully functional (in the functional-programming sense)
  • Stable API
  • Theoretically lower memory usage, especially for the producer

Cons of using the HTTP variant

  • Authentication is handled manually, hence potentially less secure/reliable
  • Uses HTTP/1.1 by default (potentially slower), but can be configured to use HTTP/2 if an HTTP client backend that supports it is chosen

Licence

   Copyright 2018-2019 Permutive, Inc.

   Licensed under the Apache License, Version 2.0 (the "License");
   you may not use this file except in compliance with the License.
   You may obtain a copy of the License at

       http://www.apache.org/licenses/LICENSE-2.0

   Unless required by applicable law or agreed to in writing, software
   distributed under the License is distributed on an "AS IS" BASIS,
   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   See the License for the specific language governing permissions and
   limitations under the License.