Version Matrix

ZIO Connector for AWS SQS

CircleCI

This library is a ZIO-powered client for AWS SQS. It is built on top of the AWS SDK for Java 2.0 via the automatically generated wrappers from zio-aws.

Add the dependency

To use zio-sqs, add the following line in your build.sbt file:

libraryDependencies += "dev.zio" %% "zio-sqs" % "0.4.1"

How to use

In order to use the connector, you need to provide your program with a configured SQS client as an Sqs ZLayer. You can use io.github.vigoo.zioaws.sqs.live to use default AWS SDK settings or use .customized (refer to the AWS SDK Documentation if you need help customizing it). See also the ZIO documentation on how to use layers.

Publish messages

Use Producer.make to instantiate an instance of Producer trait that can be used to publish objects of type T to the queue.

def make[R, T](
    queueUrl: String,
    serializer: Serializer[T],
    settings: ProducerSettings = ProducerSettings()
  ): ZManaged[R with Sqs with Clock, Throwable, Producer[T]]

where:

  • queueUrl: String - an SQS queue URL
  • serializer: Serializer[T] - a instance of zio.sqs.serialization.Serializer that can be used to convert an object of type T to a string.
      trait Serializer[T] {
        def apply(t: T): String
      }
    If a published message is already a string, Serializer.serializeString can be used.
  • settings: ProducerSettings - a set of settings used to configure the producer.
    • batchSize: Int - The size of the batch to use, [1-10] (default: 10).
    • duration: Duration - Time to wait for the batch to be full (have the specified batchSize) (default: 500 milliseconds).
    • parallelism: Int - The number of concurrent requests to make to SQS (default: 16).
    • retryDelay: Duration - Time to wait before retrying event republishing if it failed with a recoverable error (default: 250 milliseconds). The errors returned from SQS could either recoverable or not. An example of recoverable error -- when the server returned the code: ServiceUnavailable
    • retryMaxCount: Int - The number of retries to make for a posted event (default: 10).

Producer

Producer contains two set of methods:

  • methods that fail the resulting Task or Stream if SQS server returns an error for a published event.

    • def produce(e: ProducerEvent[T]): Task[ProducerEvent[T]] - Publishes a single event and fails the task. Fails the Task if the server returns an error.
    • def produceBatch(es: Iterable[ProducerEvent[T]]): Task[List[ProducerEvent[T]]] - Publishes a batch of events. Fails the Task if the server returns an error for any of the provided events.
    • def sendStream: Stream[Throwable, ProducerEvent[T]] => ZStream[Any, Throwable, ProducerEvent[T]] - Stream that takes the events and produces a stream with published events. Fails if the server returns an error for any of the published events.
    • def sendSink: ZSink[Any, Throwable, Nothing, Iterable[ProducerEvent[T]], Unit] - Sink that can be used to publish events. Fails if the server returns an error for any of the published events.
  • methods that do not fail the operation but return ErrorOrEvent[T] (defied as Either[ProducerError[T], ProducerEvent[T]]).

    • def sendStreamE: Stream[Throwable, ProducerEvent[T]] => ZStream[Any, Throwable, ErrorOrEvent[T]] - Stream that takes the events and produces a stream with the results. Doesn't fail if the server returns an error for any of the published events.
    • def produceBatchE(es: Iterable[ProducerEvent[T]]): Task[List[ErrorOrEvent[T]]] - Publishes a batch of events. Completes when all input events were processed (published to the server or failed with an error). Doesn't fail the Task if the server returns an error for any of the provided events.

Producer tries to accumulate messages in batches and send them to the server. If messages should be sent one by one and batching is not expected, set ProducerSettings.batchSize to 1.

ProducerEvent

ProducerEvent[T] is an event that is published to SQS and contains the following parameters that could be configured:

  • data: T - Object to publish to SQS. A serializer for this type should be provided when a Producer is instantiated.
  • attributes: Map[String, MessageAttributeValue] - A map of attributes to set.
  • groupId: Option[String] - Assigns a specific message group to the message.
  • deduplicationId: Option[String] - Token used for deduplication of sent messages.

If a plain string should be published without any additional attributes a ProducerEvent can be created directly:

val str: String = "message to publish"
val event: ProducerEvent = ProducerEvent(str)

ProducerError

ProducerError[T] represents an error details that were returned from the server.

  • senderFault: Boolean - Specifies whether the error happened due to the caller of the batch API action.
  • code: String - An error code representing why the action failed on this entry.
  • message: Option[String] - A message explaining why the action failed on this entry.
  • event: ProducerEvent[T] - An event that triggered this error on the server.

Publish Example

import io.github.vigoo.zioaws
import io.github.vigoo.zioaws.sqs.Sqs
import zio.clock.Clock
import zio.sqs._
import zio.sqs.producer._
import zio.sqs.serialization._
import zio.stream._
import zio.{ ExitCode, RIO, URIO, ZLayer }

object PublishExample extends zio.App {

  val client: ZLayer[Any, Throwable, Sqs] = zioaws.netty.default >>>
    zioaws.core.config.default >>>
    zioaws.sqs.live

  val events                                                = List("message1", "message2").map(ProducerEvent(_))
  val queueName                                             = "TestQueue"
  val program: RIO[Clock with Sqs, Either[Throwable, Unit]] = for {
    queueUrl    <- Utils.getQueueUrl(queueName)
    producer     = Producer.make(queueUrl, Serializer.serializeString)
    errOrResult <- producer.use(p => p.sendStream(ZStream(events: _*)).runDrain.either)
  } yield errOrResult

  override def run(args: List[String]): URIO[zio.ZEnv, ExitCode] =
    program.provideCustomLayer(client).exitCode
}

Consume messages

Use SqsStream.apply to get a stream of messages from a queue. It returns a ZIO Stream that you can consume with all the operators available.

def apply(
  queueUrl: String,
  settings: SqsStreamSettings = SqsStreamSettings()
): ZStream[Sqs, Throwable, Message]

SqsStreamSettings allows your to configure a number of things:

  • autoDelete: if true, messages will be automatically deleted from the queue when they're consumed by the stream, if false you have to delete them explicitly by calling SqsStream.deleteMessage (default true)
  • stopWhenQueueEmpty: if true the stream will close when there the queue is empty, if false the stream will go on forever (default false)
  • attributeNames: see the related page on AWS docs
  • maxNumberOfMessages: number of messages to query at once from SQS (default 1)
  • messageAttributeNames: see the related page on AWS docs
  • visibilityTimeout: see the related page on AWS docs (default Some(30). If set to None, the queue's value will be used.)
  • waitTimeSeconds: see the related page on AWS docs (default Some(20). If set to None, the queue's value will be used.)

Example:

import zio.sqs.{SqsStream, SqsStreamSettings}

SqsStream(
  queueUrl,
  SqsStreamSettings(stopWhenQueueEmpty = true, waitTimeSeconds = Some(3))
).foreach(msg => UIO(println(msg.body)))

Full example

import io.github.vigoo.zioaws
import io.github.vigoo.zioaws.core.config.CommonAwsConfig
import io.github.vigoo.zioaws.sqs.Sqs
import software.amazon.awssdk.auth.credentials.{ AwsBasicCredentials, StaticCredentialsProvider }
import software.amazon.awssdk.regions.Region
import zio.clock.Clock
import zio.sqs.producer.{ Producer, ProducerEvent }
import zio.sqs.serialization.Serializer
import zio.sqs.{ SqsStream, SqsStreamSettings, Utils }
import zio._

object TestApp extends zio.App {
  val queueName = "TestQueue"

  val client: ZLayer[Any, Throwable, Sqs] = zioaws.netty.default ++
    ZLayer.succeed(
      CommonAwsConfig(
        region = Some(Region.of("ap-northeast-2")),
        credentialsProvider = StaticCredentialsProvider.create(AwsBasicCredentials.create("key", "key")),
        endpointOverride = None,
        commonClientConfig = None
      )
    ) >>>
    zioaws.core.config.configured() >>>
    zioaws.sqs.live

  val program: RIO[Sqs with Clock, Unit] = for {
    _        <- Utils.createQueue(queueName)
    queueUrl <- Utils.getQueueUrl(queueName)
    producer  = Producer.make(queueUrl, Serializer.serializeString)
    _        <- producer.use { p =>
                  p.produce(ProducerEvent("hello"))
                }
    _        <- SqsStream(
                  queueUrl,
                  SqsStreamSettings(stopWhenQueueEmpty = true, waitTimeSeconds = Some(3))
                ).foreach(msg => UIO(println(msg.body)))
  } yield ()

  override def run(args: List[String]): URIO[zio.ZEnv, ExitCode] =
    program.provideCustomLayer(client).exitCode
}