Typed, Functional Scala SQS Consumer and Composable Functional Effects
Do one thing and do it well micro birds library series
libraryDependencies += "us.oyanglul" %% "zhuyu" % ZhuyuVersion
You can quickly scaffold a zhuyu project by using jcouyang/zhuyu.g8
sbt new jcouyang/zhuyu.g8
Once you have the project, let us start from a simple example.
Before we dive into the example, there are a few terms (types) you need to know:
- Envelop[A]: model of a message in SQS
- Job[A, Deps]: job description for A, requires Deps
- Worker[A, Deps]: provided that Deps and Job[A, Deps] exist, the worker is able to work on the job.
Say we have a long workflow in which each step is safe to retry (idempotent) without notifying the user. Ideally, we could put all tasks from the workflow into SQS to guarantee that each of them eventually finishes.
Here is the workflow:
- Init Payment
- Debit Payment
- Notify User Payment Result
- Prepare Order
Init Payment and Debit Payment have to finish in sequence, but Notify User Payment Result and Prepare Order can be done at the same time.
sealed trait Event
case class InitPayment(id: Int) extends Event
case class DebitPayment(id: Int, cardnum: String) extends Event
case class NotifyPaymentResult(result: Int) extends Event
case class PrepareOrder(id: Int) extends Event
Let us start implementing the tasks, or shall we call them Jobs.
It is good to have a nice convention so the implementation will be much more predictable.
So we can simply prefix each event name with On, e.g. OnInitPayment.
import effects._
trait OnInitPayment {
  implicit val onInitPayment =
    new Job[InitPayment, HasSQS with HasDoobie] { // <- (1)
      override def distribute(message: Envelop[InitPayment]) =
        for {
          cardnum <- Doobie(sql"select cardnum from credit_card where id = ${message.content.id}".query[String].unique)
          _ <- message.spread[Event](DebitPayment(message.content.id, cardnum)) // <- (2)
        } yield ()
    }
}
- (1) create a Job to handle the InitPayment event, which requires the HasSQS and HasDoobie effects
- (2) spread the DebitPayment Event; the spread event will be distributed by Job[DebitPayment, ?]
Implicit not found: send Message from us.oyanglul.zhuyu.models.InitPayment to us.oyanglul.zhuyu.models.InitPayment will cause cycle loop, please check the order of us.oyanglul.zhuyu.models.InitPayment and us.oyanglul.zhuyu.models.InitPayment in us.oyanglul.zhuyu.models.Event
[error] _ <- spread[Event](DebitPayment(cardnum))
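📝 Note that spread is type-level safe from cyclic loops. For example, calling spread[Event](InitPayment) in Job[InitPayment, HasSQS] is rejected at compile time with the error above. To make this check possible, you have to define a HasOrder[Event] type class instance that declares the order of the events, such as InitPayment before DebitPayment.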
import shapeless.{:+:, CNil}

object Event {
  type EventOrder =
    InitPayment :+:
    DebitPayment :+:
    NotifyPaymentResult :+:
    PrepareOrder :+: CNil
  implicit val orderedEvent: HasOrder.Aux[Event, EventOrder] =
    new HasOrder[Event] {
      type Order = EventOrder
    }
}
Spreading an event in the wrong order will cause a compile error (thanks to shapeless, which lets us count at the type level).
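To make the idea of a compile-time order check concrete, here is a minimal sketch that uses only the Scala standard library. It is not how zhuyu implements the check (zhuyu derives the order from the shapeless coproduct); the `Before` evidence, the `spread` function, and the object name `SpreadOrderSketch` are hypothetical names introduced for illustration.

```scala
import scala.annotation.implicitNotFound

object SpreadOrderSketch {
  sealed trait Event
  final case class InitPayment(id: Int) extends Event
  final case class DebitPayment(id: Int, cardnum: String) extends Event

  // Hypothetical evidence that a job handling `From` may spread a `To`.
  @implicitNotFound("spreading ${To} from ${From} would cause a cycle")
  sealed trait Before[From, To]

  object Before {
    // Only the forward direction is declared, so the reverse has no evidence.
    implicit val initBeforeDebit: Before[InitPayment, DebitPayment] =
      new Before[InitPayment, DebitPayment] {}
  }

  // Hypothetical stand-in for spread: it only compiles when evidence exists.
  def spread[From, To](from: From, to: To)(implicit ev: Before[From, To]): To = to

  // Compiles: InitPayment is declared before DebitPayment.
  val ok: DebitPayment = spread(InitPayment(1), DebitPayment(1, "4242"))

  // Does not compile: there is no Before[DebitPayment, InitPayment].
  // spread(DebitPayment(1, "4242"), InitPayment(1))
}
```

In this simplified version every allowed pair has to be declared by hand, whereas a single ordered list of event types, as in EventOrder, lets the compiler work out all allowed pairs.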
Once you have implemented the new Job, register it in package.scala so the Worker knows where to look for jobs.
package object jobs
  extends OnInitPayment
  with OnDebitPayment
  with OnNotifyUser
  with OnPrepareOrder
Now we have 4 jobs ready for pick up, where are our workers?
import jobs._
object impl extends HasSQS with HasHttp4s with HasS3 with HasDoobie {...}
Worker.work[Event, HasSQS with HasHttp4s with HasS3 with HasDoobie].run(impl)
Every time our Worker runs:
- the Worker will start polling Event from SQS
- find out what Job the Event belongs to
- work on the Job by following the instructions from the Job.distribute method
Worker is type-level safe as well: for any Event for which the Worker cannot find the correct Job, a compile error will occur. Thus you never encounter a runtime error for an unprepared Job; every Event the Worker works on will definitely have a Job defined.
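The guarantee described above can be illustrated with plain implicits. The following is a simplified sketch, not zhuyu's Worker: the `Job` here has no dependencies and returns a String instead of an effect, and `work` lists the required jobs by hand, whereas the library derives them from the Event type. All names inside `WorkerSketch` are assumptions made for this example.

```scala
object WorkerSketch {
  sealed trait Event
  final case class InitPayment(id: Int) extends Event
  final case class PrepareOrder(id: Int) extends Event

  // Simplified, hypothetical Job: describes how to handle one message type.
  trait Job[A] {
    def distribute(message: A): String
  }

  implicit val onInitPayment: Job[InitPayment] = new Job[InitPayment] {
    def distribute(message: InitPayment): String = s"init payment ${message.id}"
  }

  implicit val onPrepareOrder: Job[PrepareOrder] = new Job[PrepareOrder] {
    def distribute(message: PrepareOrder): String = s"prepare order ${message.id}"
  }

  // Requires one Job per Event subtype. If either implicit above is removed,
  // every call to `work` stops compiling instead of failing at runtime.
  def work(event: Event)(implicit
      init: Job[InitPayment],
      prepare: Job[PrepareOrder]
  ): String =
    event match {
      case e: InitPayment  => init.distribute(e)
      case e: PrepareOrder => prepare.distribute(e)
    }

  val handled: String = work(InitPayment(7))
}
```

Because Event is sealed, the compiler also warns when a new subtype is added without a matching case in `work`.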
import effects._
trait OnDebitPayment {
  implicit val onDebitPayment =
    new Job[DebitPayment, HasSQS with HasHttp4s] {
      override def distribute(message: Envelop[DebitPayment]) =
        for {
          result <- Http4s(_.expect[Int](uri"http://payment-gateway.com/pay/${message.content.cardnum}"))
          _ <- message.spread[Event](NotifyPaymentResult(result))
          _ <- message.spread[Event](PrepareOrder(message.content.id))
        } yield ()
    }
}
The previous example uses one-way messaging only: the requester puts a message in the queue and never expects a response.
However, the request-response messaging pattern is also a common use case. For it, we can follow the approach described by AWS, in which the requester creates a temporary queue for receiving each response message.
import effects._
trait OnDebitPayment {
  implicit val onDebitPayment =
    new Job[DebitPayment, HasSQS with HasHttp4s] {
      override def distribute(message: Envelop[DebitPayment]) =
        for {
          result <- Http4s(_.expect[Int](uri"http://payment-gateway.com/pay/${message.content.cardnum}"))
+         _ <- message.respond(result)
          _ <- message.spread[Event](NotifyPaymentResult(result))
          _ <- message.spread[Event](PrepareOrder(message.content.id))
        } yield ()
    }
}
There are a few built-in effect implementations, and you can also easily create your own effect.
Just make sure your effect has type Kleisli[IO, HasSomething, A]
libraryDependencies += "us.oyanglul" %% "zhuyu-effect-http4s" % ZhuyuVersion
effects.Http4s(_.status(GET(uri"https://blog.oyanglul.us")))
// Kleisli[IO, HasHttp4s, Status]
libraryDependencies += "us.oyanglul" %% "zhuyu-effect-s3" % ZhuyuVersion
effects.S3(_.putObject("example-bucket", "filename", "content"))
// Kleisli[IO, HasS3, PutObjectResult]
libraryDependencies += "us.oyanglul" %% "zhuyu-effect-doobie" % ZhuyuVersion
effects.Doobie(sql"select 250".query[Int].unique)
// Kleisli[IO, HasDoobie, Int]
All these effects can be composed
val program = for {
  _ <- effects.Http4s(_.status(GET(uri"https://blog.oyanglul.us")))
  _ <- effects.S3(_.putObject("example-bucket", "filename", "content"))
  _ <- effects.Doobie(sql"select 250".query[Int].unique)
} yield ()
// Kleisli[IO, HasHttp4s with HasS3 with HasDoobie, Unit]
To run the effects, simply provide implementations:
object impl
  extends HasHttp4s
  with HasS3
  with HasDoobie {
  // ...implement whatever the compiler complains about
}
program.run(impl) // IO[Unit]
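To show why composing effects accumulates their requirements into one intersection type, here is a minimal sketch with a hand-rolled `Reader` standing in for Kleisli[IO, R, A]. It runs synchronously and has no IO, and `HasGreeting`, `HasName`, and `ReaderSketch` are hypothetical names that are not part of zhuyu.

```scala
object ReaderSketch {
  // Hand-rolled stand-in for Kleisli[IO, R, A]: a function from environment R to A.
  // R is contravariant, so a Reader needing less can be used where more is provided.
  final case class Reader[-R, +A](run: R => A) {
    def map[B](f: A => B): Reader[R, B] =
      Reader((r: R) => f(run(r)))

    // R1 <: R lets the next step demand more of the environment than this one.
    def flatMap[R1 <: R, B](f: A => Reader[R1, B]): Reader[R1, B] =
      Reader((r: R1) => f(run(r)).run(r))
  }

  // Hypothetical capabilities, playing the role of HasHttp4s, HasS3, HasDoobie.
  trait HasGreeting { def greeting: String }
  trait HasName { def name: String }

  val askGreeting: Reader[HasGreeting, String] =
    Reader((env: HasGreeting) => env.greeting)
  val askName: Reader[HasName, String] =
    Reader((env: HasName) => env.name)

  // Composing both effects requires both capabilities.
  val program: Reader[HasGreeting with HasName, String] =
    for {
      g <- askGreeting
      n <- askName
    } yield s"$g, $n"

  // A single object provides every capability the program asks for.
  object impl extends HasGreeting with HasName {
    val greeting: String = "hello"
    val name: String = "zhuyu"
  }

  val result: String = program.run(impl)
}
```

If `impl` did not extend one of the traits, `program.run(impl)` would be a type error, which is the same feedback the compiler gives when an effect implementation is missing.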
For more detail, look at the example Main.scala and jobs.