Let-it-crash style ZooKeeper client based on Akka actor
- Asynchronous: ZooKeeper event callback is asynchronously executed and should never block in its execution. Actors are also asynchronous and well-designed actor never blocks.
- Passive: ZooKeeper client application is basically designed in passive fashion. Actors are also passive. They wait for messages and until they receive a message nothing happens. This characteristic fits in ZooKeeper's event-driven style.
- State Machine: When a ZooKeeper event is fired, application's state is changed accordingly. Actors are state machines. They can change its state and behavior in thread-safe manner.
- Fault Tolerant: ZooKeeper is distributed system so failures are inevitable. Actor can confine fault, crush safely, and escalate faults it cannot handle to its supervisor.
- Graceful Recovery: When you use ZooKeeper operations, you must retry on failures such as connection loss. Akka scheduler, backoff supervision and circuit breaker can help recover gracefully.
A example project shows how to use reactive-zookeepr by building a example application from ZooKeeper book from O'Reilly.
- Scala
- 2.11.12, 2.12.13, 2.13.5
- ZooKeeper
- 3.4.x, 3.5.x, 3.6.x, 3.7.x
- Akka
- 2.5.x, 2.6.x
libraryDependencies ++= Seq(
"org.apache.zookeeper" % "zookeeper" % "3.4.8",
"com.chatwork" %% "reactive-zookeeper" % "1.0.0"
)
Note) package name starts with tanukki
since this is a fork of https://github.com/TanUkkii007/reactive-zookeeper
The following import is assumed.
import tanukkii.reactivezk._
ZooKeeper session is represented as an actor.
You can create a session ActorRef
with
val connectString = "localhost:2181"
val sessionTimeout = 5000
system.actorOf(ZooKeeperSessionActor.props(connectString, sessionTimeout)
ZooKeeperSessionActor
manage ZooKeeper session in a following way
- If a session is expired,
ZooKeeperSessionActor
throwsZooKeeperSessionRestartException
and closes expired session and reestablishes a new session. - If a connection is disconnected, ZooKeeperSessionActor throws
ConnectionRecoveryTimeoutException
after connection-timeout periods. This results in closing previous session and reestablishing a new session.
Actors that communicate with ZooKeeperSessionActor
must handle ZooKeeper session state changes.
Especially Expired sate and Disconnected state transition must be carefully handled by clients.
It is not a simple task and bad implementation could cause disastrous effect to your system.
Since ZooKeeperSessionActor
is aware of ZooKeeper session state change
and its lifecycle as an actor is synchronised with the session state transition, one way to handle ZooKeeper session state change is
to have your actor to be a child of ZooKeeperSessionActor
.
When ZooKeeperSessionActor
restarts for some reason caused by session state changes, your actor is also restarted and its internal state is cleared.
To register your actor as a child of ZooKeeperSessionActor
, pass its Props to argument of ZooKeeperSessionActor
.
val settings = ZKSessionSettings(system)
val supervisorSettings = ZKSessionSupervisorSettings(ActorUsingZooKeeper.props, "client", isConnectionStateAware = false)
val yourActor = system.actorOf(ZooKeeperSessionActor.props(settings, supervisorSettings), "zookeeper-session")
You can obtain ActorRef of ZooKeeperSessionActor
with context.parent
.
class ActorUsingZooKeeper extends Actor {
val zkSession = context.parent
def receive: Receive = {
case "Ping" =>
zkSession ! ZKOperations.Exists("/pings")
sender() ! "Pong"
}
}
object ActorUsingZooKeeper [
def props = Props(new ActorUsingZooKeeper)
}
You can communicate with your actor via ZooKeeperSessionActor
. ZooKeeperSessionActor
can forward messages to the registered actor.
yourActor ! "Ping"
expectMsg("Pong")
If a registered actor wants to behave passive or read-only mode when disconnected from ZooKeeper, you can use ZKConnectionStateAwareActor
trait.
trait ZKConnectionStateAwareActor extends Actor {
// `receive` called under SyncConnected state
def receiveSyncConnected: Receive
// `receive` called under Disconnected state
def receiveDisconnected: Receive
}
If a registered actor implements ZKConnectionStateAwareActor
, you can enable passive mode transition with isConnectionStateAware=true
.
val supervisorSettings = ZKSessionSupervisorSettings(ActorUsingZooKeeper.props, "client", isConnectionStateAware = true)
Each ZooKeeper operation is represented as a message and its result is also a message. The result of operation have two types of message. One is successful message, and the other is failure message with suffix "Failure".
These message can be sent to the ZooKeeper session ActorRef
.
The result response message will be returned to the sender
.
In the following document a zookeeperSession
variable is assumed to be ActorRef
representing ZooKeeper session.
GetData
, Exists
, GetChildren
message support watch feature. They have a watch
flag parameter.
When it is a true
, a subsequent change event will be sent to its sender
as ZooKeeperWatchEvent
.
object ZKOperations {
case class Create(path: String, data: Array[Byte], acl: List[ACL], createMode: CreateMode, ctx: Any = NoContext)
sealed trait CreateResponse
case class Created(path: String, name: String, ctx: Any) extends CreateResponse
case class CreateFailure(error: KeeperException, path: String, ctx: Any) extends CreateResponse
case class GetData(path: String, watch: Boolean = false, ctx: Any = NoContext)
sealed trait GetDataResponse
case class DataGot(path: String, data: Array[Byte], stat: Stat, ctx: Any) extends GetDataResponse
case class GetDataFailure(error: KeeperException, path: String, ctx: Any) extends GetDataResponse
case class SetData(path: String, data: Array[Byte], version: Int, ctx: Any = NoContext)
sealed trait SetDataResponse
case class DataSet(path: String, stat: Stat, ctx: Any) extends SetDataResponse
case class SetDataFailure(error: KeeperException, path: String, ctx: Any) extends SetDataResponse
case class Exists(path: String, watch: Boolean = false, ctx: Any = NoContext)
sealed trait ExistsResponse
case class DoesExist(path: String, stat: Option[Stat], ctx: Any) extends ExistsResponse
case class ExistsFailure(error: KeeperException, path: String, ctx: Any) extends ExistsResponse
case class GetChildren(path: String, watch: Boolean = false, ctx: Any = NoContext)
sealed trait GetChildrenResponse
case class ChildrenGot(path: String, children: List[String], ctx: Any)
case class GetChildrenFailure(error: KeeperException, path: String, ctx: Any)
case class Delete(path: String, version: Int, ctx: Any = NoContext)
sealed trait DeleteResponse
case class Deleted(path: String, ctx: Any) extends DeleteResponse
case class DeleteFailure(error: KeeperException, path: String, ctx: Any) extends DeleteResponse
}
ZKOperations.Create
has following signature. Its fields are corresponding to ZooKeeper#create
API.
if ctx
is not specified NoContext
is used.
case class Create(path: String, data: Array[Byte], acl: List[ACL], createMode: CreateMode, ctx: Any = NoContext)
The Create
has successful response Created
and failed response CreateFailure
.
The ctx: Any
parameter value is the value specified at Create
message.
The error: KeeperException
parameter in CreateFailure
is the same type provided by ZooKeeper Java API.
case class Created(path: String, name: String, ctx: Any) extends CreateResponse
case class CreateFailure(error: KeeperException, path: String, ctx: Any) extends CreateResponse
A usage example is below.
def receive: Receive = {
case CreateExample(path: String, data: Array[Byte]) => {
zookeeperSession ! Create(path, data, Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL, data)
}
case Created(path, name, ctx) =>
case CreateFailure(e, path, ctx) if e.code() == Code.NODEEXISTS =>
case CreateFailure(e, path, ctx: Array[Byte]) if e.code() == Code.CONNECTIONLOSS =>
self ! CreateExample(path, data)
case CreateFailure(e, _, _) => throw e
}
ZKOperations.GetData
has following signature. Its fields are corresponding to ZooKeeper#getData
API.
if ctx
is not specified NoContext
is used.
case class GetData(path: String, watch: Boolean = false, ctx: Any = NoContext)
The GetData
has successful response DataGot
and failed response GetDataFailure
.
The ctx: Any
parameter value is the value specified at GetData
message.
The error: KeeperException
parameter in GetDataFailure
is the same type provided by ZooKeeper Java API.
case class DataGot(path: String, data: Array[Byte], stat: Stat, ctx: Any) extends GetDataResponse
case class GetDataFailure(error: KeeperException, path: String, ctx: Any) extends GetDataResponse
When watch = true
in the GetData
is specified, ZooKeeperWatchEvent(e: WatchedEvent)
will be sent to its sender
.
A usage example is below.
def receive: Receive = {
case GetDataExample => zookeeperSession ! GetData("/example")
case DataGot(path, data, stat, _) =>
case GetDataFailure(e, path, _) if e.code() == Code.NONODE =>
case GetDataFailure(e, path, _) if e.code() == Code.CONNECTIONLOSS =>
self ! GetDataExample
case GetDataFailure(e, _, _) => throw e
}
ZKOperations.SetData
has following signature. Its fields are corresponding to ZooKeeper#setData
API.
if ctx
is not specified NoContext
is used.
case class SetData(path: String, data: Array[Byte], version: Int, ctx: Any = NoContext)
The SetData
has successful response DataSet
and failed response SetDataFailure
.
The ctx: Any
parameter value is the value specified at SetData
message.
The error: KeeperException
parameter in SetDataFailure
is the same type provided by ZooKeeper Java API.
case class DataSet(path: String, stat: Stat, ctx: Any) extends SetDataResponse
case class SetDataFailure(error: KeeperException, path: String, ctx: Any) extends SetDataResponse
A usage example is below.
def setStatus: Receive = {
case SetDataExample(data: Array[Byte]) =>
zookeeperSession ! SetData(SetDataExample, data, -1, data)
case DataSet(path, stat, _) =>
case SetDataFailure(e, path, data: Array[Byte]) if e.code() == Code.CONNECTIONLOSS => self ! SetDataExample(path, data)
}
ZKOperations.Exists
has following signature. Its fields are corresponding to ZooKeeper#exists
API.
if ctx
is not specified NoContext
is used.
case class Exists(path: String, watch: Boolean = false, ctx: Any = NoContext)
The Exists
has successful response DoesExist
and failed response ExistsFailure
.
The ctx: Any
parameter value is the value specified at Exists
message.
Note that stat: Option[Stat]
in DoesExist
response has type Option[Stat]
.
The error: KeeperException
parameter in ExistsFailure
is the same type provided by ZooKeeper Java API.
case class DoesExist(path: String, stat: Option[Stat], ctx: Any) extends ExistsResponse
case class ExistsFailure(error: KeeperException, path: String, ctx: Any) extends ExistsResponse
When watch = true
in the Exists
is specified, ZooKeeperWatchEvent(e: WatchedEvent)
will be sent to its sender
.
A usage example is below.
def receive: Receive = {
case ExistsExample => zookeeperSession ! Exists("/example", watch = true)
case DoesExist(path, stat, _) =>
case ExistsFailure(e, _, _) if e.code() == Code.CONNECTIONLOSS => self ! ExistsExample
case ExistsFailure(e, _, _) if e.code() == Code.NONODE =>
case ZooKeeperWatchEvent(e) if e.getType == EventType.NodeDeleted =>
}
ZKOperations.GetChildren
has following signature. Its fields are corresponding to ZooKeeper#getChildren
API.
if ctx
is not specified NoContext
is used.
case class GetChildren(path: String, watch: Boolean = false, ctx: Any = NoContext)
The GetChildren
has successful response ChildrenGot
and failed response GetChildrenFailure
.
The ctx: Any
parameter value is the value specified at GetChildren
message.
The error: KeeperException
parameter in GetChildrenFailure
is the same type provided by ZooKeeper Java API.
case class ChildrenGot(path: String, children: List[String], ctx: Any)
case class GetChildrenFailure(error: KeeperException, path: String, ctx: Any)
When watch = true
in the GetChildren
is specified, ZooKeeperWatchEvent(e: WatchedEvent)
will be sent to its sender
.
A usage example is below.
def receive: Receive = {
case GetChildrenExample => zookeeperSession ! GetChildren("/examples", watch = true)
case ChildrenGot(path, children, _) =>
case GetChildrenFailure(e, _, _) if e.code() == Code.CONNECTIONLOSS => self ! GetChildrenExample
case GetChildrenFailure(e, _, _) => throw e
case ZooKeeperWatchEvent(e) if e.getType == EventType.NodeChildrenChanged => self ! GetChildrenExample
}
ZKOperations.Delete
has following signature. Its fields are corresponding to ZooKeeper#delete
API.
if ctx
is not specified NoContext
is used.
case class Delete(path: String, version: Int, ctx: Any = NoContext)
The Delete
has successful response Deleted
and failed response DeleteFailure
.
The ctx: Any
parameter value is the value specified at Delete
message.
The error: KeeperException
parameter in DeleteFailure
is the same type provided by ZooKeeper Java API.
case class Deleted(path: String, ctx: Any) extends DeleteResponse
case class DeleteFailure(error: KeeperException, path: String, ctx: Any) extends DeleteResponse
A usage example is below.
def receive: Receive = {
case DeleteExample(path) => zookeeperSession ! Delete(path, -1, path)
case Deleted(path, _) =>
case DeleteFailure(e, _, path: String) if e.code() == Code.CONNECTIONLOSS => self ! DeleteExample(path)
case DeleteFailure(e, _, _) => throw e
}
See reference.conf.
Default values are follow.
reactive-zookeeper {
connect-string = "localhost:2181"
session-timeout = 5000
connection-timeout = ${reactive-zookeeper.session-timeout}
}
As you know ZooKeeper ensure the order of operation and event. When you use ZooKeeper with Actor, you must be careful with ordering. Actor is a concurrent unit. Within an actor the order of message passing to a specific receiver is preserved. However when multiple actors are sending to Zookeeper session actor, no one knows the order of the messages reception.