naoh87 / lettucef   0.1.8

GitHub

Scala FP wrapper for Lettuce with Cats Effect

Scala versions: 3.x 2.13

LettuceF

Scala FP Redis client wrapper for Lettuce with cats-effect 3

Motivation

Lettuce is incredible performance Java Redis client, but some api is not compatible with scala mind.

This library hide the matters when you use Lettuce.

Getting Started

Core

Add to build.sbt

libraryDependencies += "dev.naoh" %% "lettucef-core" % "0.1.3"

Basic usage

Redis connection and commands are thread safe.

import dev.naoh.lettucef.api.LettuceF

def run: IO[Unit] = {
  for {
    client <- LettuceF.cluster[IO](RedisClusterClient.create("redis://127.0.0.1:7000"))
    sync <- client.connect(StringCodec.UTF8).map(_.sync())
  } yield for {
    _ <- sync.set("key", "value")
    v <- sync.get("key")
  } yield println(v) // Some(value)
}.use(identity)

Lua

def run: IO[Unit] = {
    for {
      client <- LettuceF.client[IO](RedisClient.create())
      cmd <- client.connect(StringCodec.UTF8, RedisURI.create("redis://127.0.0.1")).map(_.sync())
    } yield for {
      _ <- cmd.eval("""return 1;""", Nil, Nil).flatTap(IO.println)
      // RedisInteger(1)
      _ <- cmd.eval("""return redis.call('set',KEYS[1],ARGV[1])""", "foo" :: Nil, "bar" :: Nil).flatTap(IO.println)
      // RedisBulk(OK)
      _ <- cmd.eval("""return {1, {err="ERR"}, "B"};""", Nil, Nil).flatTap(IO.println)
      // RedisArray(RedisInteger(1), RedisError(ERR), RedisBulk(B))
    } yield ()
}.use(identity)

Pipelining

import dev.naoh.lettucef.api.LettuceF

def run: IO[Unit] = {
  for {
    client <- LettuceF.cluster[IO](RedisClusterClient.create("redis://127.0.0.1:7000"))
    conn <- client.connect(StringCodec.UTF8)
    conn2 <- client.connect(StringCodec.UTF8)
    sync = conn.sync()
    async = conn.async()
  } yield for {
    start <- IO(System.currentTimeMillis())
    elapsed = (any: Any) =>
      IO.println("%4d ms > %s".format(System.currentTimeMillis() - start, any))
    _ <- async.set("Ix", "0")
    _ <- async.incr("Ix").replicateA_(100000)
    aget <- async.get("Ix")
    _ <- async.incr("Ix")
    _ <- conn2.sync().get("Ix").flatTap(elapsed)
    //  679 ms > Some(6426)   Executions run out of order between different connections
    _ <- sync.get("Ix").flatTap(elapsed)
    // 3498 ms > Some(100001) Executions run in order on the same connection
    _ <- aget.flatTap(elapsed)
    // 3499 ms > Some(100000)
  } yield ()
}.use(identity)

PubSub

This api is also just wrap lettuce api. It's bothering to control right consistency.

I recommend to use stream extension if you want to just receive messages from subscribe / psubscribe.

import dev.naoh.lettucef.api.LettuceF

def run: IO[Unit] = {
  for {
    client <- LettuceF.cluster[IO](RedisClusterClient.create("redis://127.0.0.1:7000"))
    pub <- client.connect(StringCodec.UTF8).map(_.sync())
    sub <- client.connectPubSub(StringCodec.UTF8)
    _ <- sub.setListener(RedisPubSubF.makeListener(println))
  } yield for {
    _ <- sub.subscribe("Topic")
    _ <- IO.sleep(100.milli)
    _ <- List.range(0, 3).map(i => pub.publish("Topic", i.toString)).sequence
    _ <- IO.sleep(100.milli)
    _ <- sub.unsubscribe("Topic")
    _ <- IO.sleep(100.milli)
  } yield ()
}.use(identity)
// Subscribed(Topic,1)
// Message(Topic,0)
// Message(Topic,1)
// Message(Topic,2)
// Unsubscribed(Topic,0)

Stream Extension

libraryDependencies += "dev.naoh" %% "lettucef-streams" % version

PubSub

import dev.naoh.lettucef.api.LettuceF
import dev.naoh.lettucef.api.streams._

def run: IO[Unit] = {
  val N = 3
  for {
    client <- LettuceF.cluster[IO](RedisClusterClient.create("redis://127.0.0.1:7000"))
    pub <- client.connect(StringCodec.UTF8).map(_.sync())
    sub <- client.connectPubSub(StringCodec.UTF8).stream()
    _ <- sub.setListener(RedisPubSubF.makeListener(printSubscription))
    _ <- sub.subscribe("A").evalMap(m => pub.publish("B", m.message)).compile.drain.background
    _ <- sub.subscribe("B").evalMap(m => pub.publish("C", m.message)).compile.drain.background
    _ <- sub.subscribeAwait("A", "B", "C").flatMap(_.debug().take(N * 3).compile.drain.uncancelable.background)
  } yield for {
    _ <- List.range(0, N).map(i => pub.publish("A", i.toHexString)).sequence
  } yield ()
}.use(identity)

val printSubscription: PushedMessage[String, String] => Unit = {
  case m: Subscribed[_] => println(s"> $m")
  case m: Unsubscribed[_] => println(s"> $m")
  case _ => ()
}
// > Subscribed(A,1)
// > Subscribed(C,2)
// > Subscribed(B,3)
// Message(A,0)
// Message(A,1)
// Message(A,2)
// Message(B,0)
// Message(C,0)
// Message(B,1)
// Message(B,2)
// Message(C,1)
// Message(C,2)
// > Unsubscribed(C,2)
// > Unsubscribed(B,1)
// > Unsubscribed(A,0)

Scan

import dev.naoh.lettucef.api.LettuceF
import dev.naoh.lettucef.api.streams._

def run: IO[Unit] = {
  for {
    client <- LettuceF.cluster[IO](RedisClusterClient.create("redis://127.0.0.1:7000"))
    conn <- client.connect(StringCodec.UTF8)
  } yield for {
    _ <- conn.sync().del("Set")
    _ <- List.range(0, 100).map(_.toHexString).grouped(10)
      .map(args => conn.sync().sadd("Set", args: _*))
      .toList.sequence
    ret <- conn.stream()
      .sscan("Set", ScanArgs.Builder.limit(20))
      .chunks.map(_.size)
      .compile.toList
  } yield println(ret)
  // List(23, 23, 20, 21, 13)
}.use(identity)

Extra Extension

libraryDependencies += "dev.naoh" %% "lettucef-extras" % version 

Connection Pool

It is useful to execute blocking command

https://lettuce.io/core/release/reference/#_connection_pooling

import dev.naoh.lettucef.api.LettuceF
import dev.naoh.lettucef.api.extras.ResourcePool

val count = new AtomicInteger(0)
val printResource: Resource[IO, Int] =
  Resource.make(IO(count.getAndIncrement()).flatTap(n => IO.println(s"$n >")))(n => IO.println(s"$n <"))
def run: IO[Unit] = {
  for {
    client <- LettuceF.cluster[IO](RedisClusterClient.create("redis://127.0.0.1:7000"))
    pub <- client.connect(StringCodec.UTF8).map(_.sync())
    pool <- client.connect(StringCodec.UTF8).map(_.sync())
            .flatMap(printResource.as)
            .pipe(ResourcePool(maxIdle = 2, minIdle = 1).make(_))
  } yield for {
    _ <- pub.unlink("hoge")
    _ <- List.range(0, 3).map(i => pub.rpush("hoge", i.toString)).sequence
    _ <- IO.println("start")
    _ <- pool.use(_.blpop(1, "hoge").flatTap(IO.println)).parReplicateA(4)
    _ <- IO.println("end")
  } yield ()
}.use(identity)
// 0 >
// start          # active=0 idle=1
// 1 >
// 2 >
// 3 >
// Some((hoge,0)) # active=4 idle=0
// Some((hoge,2)) # active=3 idle=1
// Some((hoge,1)) # active=2 idle=2
// 1 <
// None           # active=1 idle=2
// 3 <            
// end            # active=0 idle=2
// 0 <
// 2 <

Features

  • Support Scala 2.13 and 3.x
  • Convert RedisFuture I/F with Async of cats-effect 3
  • Convert Java collection types to scala collection
  • New type-safe Lua scripting I/F
  • Eliminate java.lang.Object I/F
  • Eliminate null
  • Add alternative models for Scala
  • Add PubSub I/F
  • Support cluster/non-cluster RedisClient
  • Support All Commands
    • Bitmaps
    • Cluster
    • Connection
    • Geo
    • Hashed
    • HyperLogLog
    • Keys
    • Lists
    • Pub/Sub
    • Scripting
    • Sentinel
    • Server
    • Sets
    • SortedSets
    • Streams
    • Strings
    • Transactions

Benchmark

Simple benchmark on laptop

Benchmark               Mode  Cnt  Score   Error  Units
LettuceF.parallel20k      ss   10  1.959 ± 0.135   s/op
LettuceF.pipeline50k      ss   10  2.546 ± 0.157   s/op
Redis4Cats.parallel20k    ss   10  2.081 ± 0.525   s/op
Redis4Cats.pipeline50k    Fail to complete run
ScalaRedis.parallel20k    ss   10  2.671 ± 0.404   s/op
ScalaRedis.pipeline50k    ss   10  5.860 ± 0.725   s/op