Google Cloud Pub/Sub stream-based client built on top of cats-effect, fs2 and http4s.
fs2-google-pubsub provides a mix of APIs, depending on the exact module. Consumers are provided as fs2 streams,
while the producers are effect-based, utilising cats-effect.
fs2-google-pubsub-grpc- an implementation that utilises Google's own Java libraryfs2-google-pubsub-http- an implementation that useshttp4sand communicates via the REST API
fs2-google-pubsub- shared classes for all implementations
Add one (or more) of the following to your build.sbt, see Releases for latest version:
libraryDependencies += "com.permutive" %% "fs2-google-pubsub-grpc" % Version
OR
libraryDependencies += "com.permutive" %% "fs2-google-pubsub-http" % Version
Also note you need to add an explicit HTTP client implementation. http4s provides different implementations
for the clients, including blaze, async-http-client, jetty, okhttp and others.
If async-http-client is desired, add the following to build.sbt:
libraryDependencies += "org.http4s" %% "http4s-async-http-client" % "0.20.0"
See PubsubGoogleConsumerConfig for more configuration options.
package com.permutive.pubsub.consumer.google
import cats.effect.{ExitCode, IO, IOApp}
import cats.syntax.all._
import com.permutive.pubsub.consumer.Model
import com.permutive.pubsub.consumer.decoder.MessageDecoder
object SimpleDriver extends IOApp {
  case class ValueHolder(value: String) extends AnyVal
  implicit val decoder: MessageDecoder[ValueHolder] = (bytes: Array[Byte]) => {
    Right(ValueHolder(new String(bytes)))
  }
  override def run(args: List[String]): IO[ExitCode] = {
    val stream = PubsubGoogleConsumer.subscribe[IO, ValueHolder](
      Model.ProjectId("test-project"),
      Model.Subscription("example-sub"),
      (msg, err, ack, _) => IO(println(s"Msg $msg got error $err")) >> ack,
      config = PubsubGoogleConsumerConfig(
        onFailedTerminate = _ => IO.unit
      )
    )
    stream
      .evalTap(t => t.ack >> IO(println(s"Got: ${t.value}")))
      .compile
      .drain
      .as(ExitCode.Success)
  }
}See PubsubHttpConsumerConfig for more configuration options.
package com.permutive.pubsub.consumer.http
import cats.effect._
import cats.syntax.all._
import com.permutive.pubsub.consumer.Model
import com.permutive.pubsub.consumer.decoder.MessageDecoder
import org.http4s.client.asynchttpclient.AsyncHttpClient
import fs2.Stream
import scala.util.Try
object Example extends IOApp {
  case class ValueHolder(value: String) extends AnyVal
  implicit val decoder: MessageDecoder[ValueHolder] = (bytes: Array[Byte]) => {
    Try(ValueHolder(new String(bytes))).toEither
  }
  override def run(args: List[String]): IO[ExitCode] = {
    val client = AsyncHttpClient.resource[IO]()
    val mkConsumer = PubsubHttpConsumer.subscribe[IO, ValueHolder](
      Model.ProjectId("test-project"),
      Model.Subscription("example-sub"),
      "/path/to/service/account",
      PubsubHttpConsumerConfig(
        host = "localhost",
        port = 8085,
        isEmulator = true,
      ),
      _,
      (msg, err, ack, _) => IO(println(s"Msg $msg got error $err")) >> ack,
    )
    Stream.resource(client)
      .flatMap(mkConsumer)
      .evalTap(t => t.ack >> IO(println(s"Got: ${t.value}")))
      .as(ExitCode.Success)
      .compile
      .lastOrError
  }
}
See PubsubProducerConfig for more configuration.
package com.permutive.pubsub.producer.google
import cats.effect.{ExitCode, IO, IOApp}
import cats.syntax.all._
import com.permutive.pubsub.producer.Model
import com.permutive.pubsub.producer.encoder.MessageEncoder
import scala.concurrent.duration._
object PubsubProducerExample extends IOApp {
  case class Value(v: Int) extends AnyVal
  implicit val encoder: MessageEncoder[Value] = new MessageEncoder[Value] {
    override def encode(a: Value): Either[Throwable, Array[Byte]] =
      Right(BigInt(a.v).toByteArray)
  }
  override def run(args: List[String]): IO[ExitCode] = {
    GooglePubsubProducer.of[IO, Value](
      Model.ProjectId("test-project"),
      Model.Topic("values"),
      config = PubsubProducerConfig[IO](
        batchSize = 100,
        delayThreshold = 100.millis,
        onFailedTerminate = e => IO(println(s"Got error $e")) >> IO.unit
      )
    ).use { producer =>
      producer.produce(
        Value(10),
      )
    }.map(_ => ExitCode.Success)
  }
}
See PubsubHttpProducerConfig for more configuration options.
package com.permutive.pubsub.producer.http
import cats.effect.{ExitCode, IO, IOApp}
import cats.syntax.all._
import com.github.plokhotnyuk.jsoniter_scala.core._
import com.github.plokhotnyuk.jsoniter_scala.macros._
import com.permutive.pubsub.producer.Model
import com.permutive.pubsub.producer.encoder.MessageEncoder
import org.http4s.client.asynchttpclient.AsyncHttpClient
import scala.concurrent.duration._
import scala.util.Try
object ExampleGoogle extends IOApp {
  final implicit val Codec: JsonValueCodec[ExampleObject] =
    JsonCodecMaker.make[ExampleObject](CodecMakerConfig)
  implicit val encoder: MessageEncoder[ExampleObject] = (a: ExampleObject) => {
    Try(writeToArray(a)).toEither
  }
  case class ExampleObject(
    projectId: String,
    url: String,
  )
  override def run(args: List[String]): IO[ExitCode] = {
    val mkProducer = HttpPubsubProducer.resource[IO, ExampleObject](
      projectId = Model.ProjectId("test-project"),
      topic = Model.Topic("example-topic"),
      googleServiceAccountPath = "/path/to/service/account",
      config = PubsubHttpProducerConfig(
        host = "pubsub.googleapis.com",
        port = 443,
        oauthTokenRefreshInterval = 30.minutes,
      ),
      _
    )
    val http = AsyncHttpClient.resource[IO]()
    http.flatMap(mkProducer).use { producer =>
      producer.produce(
        record = ExampleObject("70251cf8-5ffb-4c3f-8f2f-40b9bfe4147c", "example.com")
      )
    }.flatTap(output => IO(println(output))) >> IO.pure(ExitCode.Success)
  }
}See PubsubHttpProducerConfig and BatchingHttpPublisherConfig for more configuration options.
package com.permutive.pubsub.producer.http
import cats.effect.{ExitCode, IO, IOApp}
import cats.syntax.all._
import com.github.plokhotnyuk.jsoniter_scala.core._
import com.github.plokhotnyuk.jsoniter_scala.macros._
import com.permutive.pubsub.producer.Model
import com.permutive.pubsub.producer.encoder.MessageEncoder
import org.typelevel.log4cats.Logger
import org.typelevel.log4cats.slf4j.Slf4jLogger
import org.http4s.client.asynchttpclient.AsyncHttpClient
import scala.concurrent.duration._
import scala.util.Try
object ExampleBatching extends IOApp {
  private[this] final implicit val unsafeLogger: Logger[IO] = Slf4jLogger.unsafeCreate[IO]
  final implicit val Codec: JsonValueCodec[ExampleObject] =
    JsonCodecMaker.make[ExampleObject](CodecMakerConfig)
  implicit val encoder: MessageEncoder[ExampleObject] = (a: ExampleObject) => {
    Try(writeToArray(a)).toEither
  }
  case class ExampleObject(
    projectId: String,
    url: String,
  )
  override def run(args: List[String]): IO[ExitCode] = {
    val mkProducer = BatchingHttpPubsubProducer.resource[IO, ExampleObject](
      projectId = Model.ProjectId("test-project"),
      topic = Model.Topic("example-topic"),
      googleServiceAccountPath = "/path/to/service/account",
      config = PubsubHttpProducerConfig(
        host = "localhost",
        port = 8085,
        oauthTokenRefreshInterval = 30.minutes,
        isEmulator = true,
      ),
      batchingConfig = BatchingHttpProducerConfig(
        batchSize = 10,
        maxLatency = 100.millis,
        retryTimes = 0,
        retryInitialDelay = 0.millis,
        retryNextDelay = _ + 250.millis,
      ),
      _
    )
    val messageCallback: Either[Throwable, Unit] => IO[Unit] = {
      case Right(_) => Logger[IO].info("Async message was sent successfully!")
      case Left(e) => Logger[IO].warn(e)("Async message was sent unsuccessfully!")
    }
    client
      .flatMap(mkProducer)
      .use { producer =>
        val produceOne = producer.produce(
          record = ExampleObject("1f9774be-9d7c-4dd9-8d97-855b681938a9", "example.com"),
        )
        val produceOneAsync = producer.produceAsync(
          record = ExampleObject("a84a3318-adbd-4eac-af78-eacf33be91ef", "example.com"),
          callback = messageCallback
        )
        for {
          result1 <- produceOne
          result2 <- produceOne
          result3 <- produceOne
          _       <- result1
          _       <- Logger[IO].info("First message was sent!")
          _       <- result2
          _       <- Logger[IO].info("Second message was sent!")
          _       <- result3
          _       <- Logger[IO].info("Third message was sent!")
          _       <- produceOneAsync
          _       <- IO.never
        } yield ()
      }
      .as(ExitCode.Success)
  }
}Pros of using the Google library
- Underlying library well supported (theoretically)
 - Uses gRPC and HTTP/2 (should be faster)
 - Automatically handles authentication
 
Cons of using Google Library
- Uses gRPC (if you uses multiple Google libraries with different gRPC versions, something will break)
 - Bloated
 - More dependencies
 - Less functional
 - Doesn't work with the official PubSub emulator (is in feature backlog)
 - Google API can change at any point (shouldn't be exposed to users of 
fs2-google-pubsub, but slows development/updating) 
Pros of using HTTP variant
- Less dependencies
 - Works with the PubSub emulator
 - Fully functional
 - Stable API
 - Theoretically less memory usage, especially for producer
 
Cons of using HTTP variant
- Authentication is handled manually, hence potentially less secure/reliable
 - By default uses old HTTP 1.1 (potentially slower), but can be configured to use HTTP/2 if supported HTTP client backend is chosen
 
   Copyright 2018-2019 Permutive, Inc.
   Licensed under the Apache License, Version 2.0 (the "License");
   you may not use this file except in compliance with the License.
   You may obtain a copy of the License at
       http://www.apache.org/licenses/LICENSE-2.0
   Unless required by applicable law or agreed to in writing, software
   distributed under the License is distributed on an "AS IS" BASIS,
   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   See the License for the specific language governing permissions and
   limitations under the License.