Memcontinuationed is an asynchronous memcached client for Scala. Memcontinuationed is the fastest memcached client on JVM, much faster than spymemcached or Whalin's client.
Memcontinuationed never blocks any threads. On the other hand, spymemcached does not block the IO thread but it does
block the user's thread. All Memcontinuationed API are
@suspendable,
which mean these methods can be invoked by a thread, and return to another thread.
Memcontinuationed can merge multiply get or gets requests into one request.
On the other hand, spymemcached sends all requests immediately, never waiting for previous response.
The spymemcached way consumes more CPU and more TCP overheads than Memcontinuationed.
Even worse, the spymemcached way is not compatible with some memcached server.
Note: Because Memcontinuationed queues requests until all the previous response have been received,
you may need to create a connection pool of com.dongxiguo.memcontinuationed.Memcontinuationed to maximize the IOPS.
import com.dongxiguo.memcontinuationed.Memcontinuationed
import com.dongxiguo.memcontinuationed.StorageAccessor
import java.io._
import java.net._
import java.nio.channels.AsynchronousChannelGroup
import java.util.concurrent.Executors
import scala.util.continuations.reset
import scala.util.control.Exception.Catcher
object Sample {
def main(args: Array[String]) {
val threadPool = Executors.newCachedThreadPool()
val channelGroup = AsynchronousChannelGroup.withThreadPool(threadPool)
// The locator determines where the memcached server is.
// You may want to implement ketama hashing here.
def locator(accessor: StorageAccessor[_]) = {
new InetSocketAddress("localhost", 1978)
}
val memcontinuationed = new Memcontinuationed(channelGroup, locator)
// The error handler
implicit def catcher:Catcher[Unit] = {
case e: Exception =>
scala.Console.err.print(e)
sys.exit(-1)
}
reset {
memcontinuationed.set(MyKey("hello"), "Hello, World!")
val result = memcontinuationed.require(MyKey("hello"))
assert(result == "Hello, World!")
println(result)
sys.exit()
}
}
}
/**
* `MyKey` specifies how to serialize the data of a key/value pair.
*/
case class MyKey(override val key: String) extends StorageAccessor[String] {
override def encode(output: OutputStream, data: String, flags: Int) {
output.write(data.getBytes("UTF-8"))
}
override def decode(input: InputStream, flags: Int): String = {
val result = new Array[Byte](input.available)
input.read(result)
new String(result, "UTF-8")
}
}There is something you need to know:
get,set, and most of other methods inMemcontinuationedare@suspendable. You must invoke them inresetor in another@suspendablefunction you defined.get,set, and most of other methods inMemcontinuationedaccept an implicit parameterCatcher. You must useCatcherto handle exceptions from@suspendablefunctions, instead oftry/catch.MyKeyis the key you passed to server, which is a customStorageAccessor. You should implement your ownStorageAccessorfor each type of data. If your value's format is Protocol Buffers, you can usecom.dongxiguo.memcontinuationed.ProtobufAccessoras your custom key's super class.
Add these lines to your build.sbt if you use Sbt:
libraryDependencies += "com.dongxiguo" %% "memcontinuationed" % "0.3.2"
libraryDependencies <++= scalaBinaryVersion { bv =>
bv match {
case "2.10" => {
Seq()
}
case _ => {
Seq("org.scala-lang.plugins" % s"scala-continuations-library_$bv" % "1.0.1")
}
}
}
libraryDependencies <+= scalaVersion { sv =>
if (sv.startsWith("2.10.")) {
compilerPlugin("org.scala-lang.plugins" % "continuations" % sv)
} else {
compilerPlugin("org.scala-lang.plugins" % s"scala-continuations-plugin_$sv" % "1.0.1")
}
}
scalacOptions += "-P:continuations:enable"Memcontinuationed requires Scala 2.10.x or 2.11.x, and JRE 7.
