j5ik2o / reactive-redis

Akka-Stream based Redis Client for Scala

GitHub

reactive-redis

CircleCI Maven Central Scaladoc License: MIT

Akka-Stream based Redis Client for Scala

Concept

  • Transport is akka-stream 2.5.x.
  • Response parser is fastparse.
  • monix.eval.Task support.

Installation

Add the following to your sbt build (Scala 2.11.x, 2.12.x):

Release Version

resolvers += "Sonatype OSS Release Repository" at "https://oss.sonatype.org/content/repositories/releases/"

libraryDependencies += "com.github.j5ik2o" %% "reactive-redis-core" % "1.0.14"

Snapshot Version

resolvers += "Sonatype OSS Snapshot Repository" at "https://oss.sonatype.org/content/repositories/snapshots/"

libraryDependencies += "com.github.j5ik2o" %% "reactive-redis-core" % "1.0.15-SNAPSHOT"

Support Commands

  • Cluster
Command Support
CLUSTER ADDSLOTS TODO
CLUSTER ADDSLOTS TODO
CLUSTER ADDSCLUSTER COUNT-FAILURE-REPORTSLOTS TODO
CLUSTER COUNTKEYSINSLOT TODO
CLUSTER DELSLOTS TODO
CLUSTER FAILOVER TODO
CLUSTER FORGET TODO
CLUSTER GETKEYSINSLOT TODO
CLUSTER INFO TODO
CLUSTER KEYSLOT TODO
CLUSTER MEET TODO
CLUSTER NODES TODO
CLUSTER REPLICATE TODO
CLUSTER RESET TODO
CLUSTER SAVECONFIG TODO
CLUSTER SET-CONFIG-EPOCH TODO
CLUSTER SETSLOT TODO
CLUSTER SLAVES TODO
CLUSTER SLOTS TODO
READONLY TODO
READWRITE TODO
  • Connection
Command Support
AUTH Supported
ECHO Supported
PING Supported
QUIT Supported
SELECT Supported
SWAPDB Supported
  • Geo
Command Support
GEOADD TODO
GEODIST TODO
GEOHASH TODO
GEOPOS TODO
GEORADIUS TODO
GEORADIUSBYMEMBER TODO
  • Hashes
Command Support
HDEL Supported
HEXISTS Supported
HGET Supported
HGETALL Supported
HINCRBY TODO
HINCRBYFLOAT TODO
HKEYS TODO
HLEN TODO
HMGET TODO
HMSET TODO
HSCAN TODO
HSET Supported
HSETNX Supported
HSTRLEN TODO
HVALS TODO
  • HyperLogLog
Command Support
PFADD TODO
PFCOUNT TODO
PFMERGE TODO
  • Keys
Command Support
DEL Supported
DUMP Supported
EXISTS Supported
EXPIRE Supported
EXPIREAT Supported
KEYS Supported
MIGRATE Supported
MOVE Supported
OBJECT Supported
PERSIST Supported
PEXPIRE Supported
PEXPIREAT Supported
PTTL Supported
RANDOMKEY Supported
RENAME Supported
RENAMENX Supported
RESTORE TODO
SCAN Supported
SORT Supported
TOUCH Supported
TTL Supported
TYPE Supported
UNLINK Supported
WAIT Supported
  • Lists
BLPOP Supported
BRPOP Supported
BRPOPLPUSH Supported
LINDEX
LINSERT
LLEN Supported
LPOP
LPUSH Supported
LPUSHX
LRANGE Supported
LREM
LSET
LTRIM
RPOP
RPOPLPUSH
RPUSH Supported
RPUSHX
  • Pub/Sub
PSUBSCRIBE
PUBLISH
PUBSUB
PUNSUBSCRIBE
SUBSCRIBE
UNSUBSCRIBE
  • Scripting
EVAL
EVALSHA
SCRIPT DEBUG
SCRIPT EXISTS
SCRIPT FLUSH
SCRIPT KILL
SCRIPT LOAD
  • Server
BGREWRITEAOF
BGSAVE
CLIENT GETNAME
CLIENT KILL
CLIENT LIST
CLIENT PAUSE
CLIENT REPLY
CLIENT SETNAME
COMMAND
COMMAND COUNT
COMMAND GETKEYS
COMMAND INFO
CONFIG GET
CONFIG RESETSTAT
CONFIG REWRITE
CONFIG SET
DBSIZE
DEBUG OBJECT
DEBUG SEGFAULT
FLUSHALL
FLUSHDB
INFO
LASTSAVE
MEMORY DOCTOR
MEMORY HELP
MEMORY MALLOC-STATS
MEMORY PURGE
MEMORY STATS
MEMORY USAGE
MONITOR
ROLE
SAVE
SHUTDOWN
SLAVEOF
SLOWLOG
SYNC
TIME
  • Sets
Command Support
SADD Supported
SCARD TODO
SDIFF TODO
SDIFFSTORE TODO
SINTER TODO
SINTERSTORE TODO
SISMEMBER TODO
SMEMBERS TODO
SMOVE TODO
SPOP TODO
SRANDMEMBER TODO
SREM TODO
SSCAN TODO
SUNION TODO
SUNIONSTORE TODO
  • SortedSets
Command Support
BZPOPMAX TODO
BZPOPMIN TODO
ZADD TODO
ZCARD TODO
ZCOUNT TODO
ZINCRBY TODO
ZINTERSTORE TODO
ZLEXCOUNT TODO
ZPOPMAX TODO
ZPOPMIN TODO
ZRANGE TODO
ZRANGEBYLEX TODO
ZRANGEBYSCORE TODO
ZRANK TODO
ZREM TODO
ZREMRANGEBYLEX TODO
ZREMRANGEBYRANK TODO
ZREMRANGEBYSCORE TODO
ZREVRANGE TODO
ZREVRANGEBYLEX TODO
ZREVRANGEBYSCORE TODO
ZREVRANK TODO
ZSCAN TODO
ZSCORE TODO
ZUNIONSTORE TODO
  • Streams
Command Support
XADD TODO
XLEN TODO
XPENDING TODO
XRANGE TODO
XREAD TODO
XREADGROUP TODO
XREVRANGE TODO
  • Strings
Command Support
APPEND Supported
BITCOUNT Supported
BITFIELD Supported
BITOP Supported
BITPOS Supported
DECR Supported
DECRBY Supported
GET Supported
GETBIT Supported
GETRANGE Supported
GETSET Supported
INCR Supported
INCRBY Supported
INCRBYFLOAT Supported
MGET Supported
MSET Supported
MSETNX Supported
PSETEX Supported
SET Supported
SETBIT Supported
SETEX Supported
SETNX Supported
SETRANGE Supported
STRLEN Supported
  • Transaction
Command Support
DISCARD Supported
EXEC Supported
MULTI Supported
UNWATCH Supported
WATCH Supported

Usage

Non connection pooling

import monix.execution.Scheduler.Implicits.global

implicit val system = ActorSystem()

val peerConfig = PeerConfig(remoteAddress = new InetSocketAddress("127.0.0.1", 6379))
val connection = RedisConnection(peerConfig)
val client = RedisClient()

val result = (for{
  _ <- client.set("foo", "bar")
  r <- client.get("foo")
} yield r).run(connection).runAsync

println(result) // bar

Connection pooling

import monix.execution.Scheduler.Implicits.global

implicit val system = ActorSystem()

val peerConfig = PeerConfig(remoteAddress = new InetSocketAddress("127.0.0.1", 6379))
val pool = RedisConnectionPool.ofRoundRobin(sizePerPeer = 5, Seq(peerConfig), RedisConnection(_)) // powered by RoundRobinPool
val connection = RedisConnection(connectionConfig)
val client = RedisClient()

// Fucntion style
val result1 = pool.withConnectionF{ con =>
  (for{
    _ <- client.set("foo", "bar")
    r <- client.get("foo")
  } yield r).run(con) 
}.runAsync

println(result1) // bar

// Monadic style
val result2 = (for {
  _ <- ConnectionAutoClose(pool)(client.set("foo", "bar").run)
  r <- ConnectionAutoClose(pool)(client.get("foo").run)
} yield r).run().runAsync

println(result2) // bar

if you want to use other pooling implementation, please select from the following modules.

  • reactive-redis-pool-commons (commons-pool2)
  • reactive-redis-pool-scala (scala-pool)
  • reactive-redis-pool-fop (fast-object-pool)
  • reactive-redis-pool-stormpot (stormpot)

Master & Slaves aggregate connection

import monix.execution.Scheduler.Implicits.global

implicit val system = ActorSystem()

val masterPeerConfig = PeerConfig(remoteAddress = new InetSocketAddress("127.0.0.1", 6379))
val slavePeerConfigs = Seq(
  PeerConfig(remoteAddress = new InetSocketAddress("127.0.0.1", 6380)),
  PeerConfig(remoteAddress = new InetSocketAddress("127.0.0.1", 6381)),
  PeerConfig(remoteAddress = new InetSocketAddress("127.0.0.1", 6382))
)

val connection = new RedisMasterSlavesConnection(
  masterConnectionPoolFactory = RedisConnectionPool.ofRoundRobin(sizePerPeer = 2, Seq(masterPeerConfig), RedisConnection(_)),
  slaveConnectionPoolFactory = RedisConnectionPool.ofRoundRobin(sizePerPeer = 2, slavePeerConfigs, RedisConnection(_))
)

val client = RedisClient()

val result = (for{
  _ <- client.set("foo", "bar") // write to master
  r <- client.get("foo")        // read from any slave
} yield r).run(connection).runAsync

println(result) // bar

License

MIT License / Copyright (c) 2016 Junichi Kato