verizon / ark   0.1.3

Apache License 2.0

Thread safe implementation of org.apache.mesos.Scheduler based on scalaz streams

Scala versions: 2.10




This library provides a functional Scala implementation of the org.apache.mesos.Scheduler interface from the Mesos Java API.

The goal of this library is to ease the development of Mesos schedulers by providing out-of-the-box implementations of the common operational requirements of a framework, so that developers can focus on the domain logic of task state transitions.


  • Purely functional implementation of Mesos scheduler task state.
  • A scalaz stream queues every message sent to the framework (from the Mesos master or custom user-defined messages) and processes them one at a time, which makes the scheduler thread safe.
  • Recurring reconciliation based on the Mesos Reconciliation Algorithm.
  • TODO: Re-registration on Mesos master failures.
  • TODO: High-availability mode and leader election.

From the current state of the project, there is a clear path to implementing the missing features above by enhancing the oncue.mesos.Scheduler.processMessage function.


The core of this Mesos scheduler implementation is a scalaz async message queue. When Mesos calls any of the functions of the Scheduler interface, the scheduler creates one or more oncue.mesos.MesosMessage instances and enqueues them in the scalaz stream.

sealed trait MesosMessage { def driver: org.apache.mesos.SchedulerDriver }
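The idea of funnelling every callback through one queue can be sketched without scalaz. The following is a minimal illustration only, not the library's implementation: it swaps the scalaz stream for a plain java.util.concurrent.LinkedBlockingQueue, and the Msg, StatusUpdate, Stop and State names are hypothetical. Several threads enqueue messages, but a single consumer applies a pure transition function, so the state is never touched concurrently.

```scala
import java.util.concurrent.LinkedBlockingQueue

object QueueSketch {
  // Hypothetical message and state types, not the library's own.
  sealed trait Msg
  case class StatusUpdate(taskId: String, state: String) extends Msg
  case object Stop extends Msg

  type State = Map[String, String]

  // Pure transition: the same state and message always give the same result.
  def step(state: State, msg: Msg): State = msg match {
    case StatusUpdate(id, s) => state.updated(id, s)
    case Stop                => state
  }

  // Drain the queue on the calling thread until Stop arrives.
  def consume(queue: LinkedBlockingQueue[Msg], initial: State): State = {
    @annotation.tailrec
    def loop(state: State): State = queue.take() match {
      case Stop => state
      case msg  => loop(step(state, msg))
    }
    loop(initial)
  }

  // Four threads enqueue concurrently; only the consumer updates the state.
  def demo(): State = {
    val queue = new LinkedBlockingQueue[Msg]()
    val producers = (1 to 4).map { i =>
      new Thread(new Runnable {
        def run(): Unit = queue.put(StatusUpdate(s"task-$i", "TASK_RUNNING"))
      })
    }
    producers.foreach(_.start())
    producers.foreach(_.join())
    queue.put(Stop)
    consume(queue, Map.empty)
  }
}
```

Calling QueueSketch.demo() returns a map with one entry per producer thread, whatever order the threads ran in.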

The main scalaz stream is created inside oncue.mesos.Scheduler to handle calls from Mesos to the Scheduler interface. Users can provide any number of Process[scalaz.concurrent.Task, CustomMessage] streams when initializing the Scheduler. These custom streams are merged into the internal scalaz stream. This way the user can send any CustomMessage to the scheduler, and it is handled by the same processMessage function that handles messages from Mesos.

trait CustomMessage extends MesosMessage


Mesos has very good documentation on how to implement the Reconciliation Algorithm. Since most frameworks need to perform reconciliation, this was the first feature to address in a common Mesos scheduler library.

Reconciliation is triggered by sending a ReconcileMessage to the stream:

case class ReconcileMessage(override val driver: SchedulerDriver) extends CustomMessage

The oncue.mesos.Scheduler companion object provides a convenient function to initialize a timed reconciliation stream:

def reconcileProcess(driver: SchedulerDriver, reconcileInterval: FiniteDuration): Process[Task, ReconcileMessage] = {
  // emit a ReconcileMessage every reconcileInterval
  time.awakeEvery(reconcileInterval)(defaultExecutor, timeOutScheduler)
    .map(_ => ReconcileMessage(driver))
}

The user can create a reconcile process by calling the function above and passing the result to the scheduler init function. This triggers reconciliation every reconcileInterval, and all offers are declined until reconciliation is over.

val reconciliationInterval = 1 hour
val customStreams = Seq( Scheduler.reconcileProcess(driver, reconciliationInterval) )
scheduler.init(state, driver, customStreams).run
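The "decline offers while reconciling" behaviour can be pictured as a small pure state machine. This is a sketch under assumptions, not the library's code: every type below is hypothetical, and it ends reconciliation once every known task has reported a status, whereas the library currently uses a fixed wait time.

```scala
object ReconcileSketch {
  // Hypothetical messages; the library's own messages carry a SchedulerDriver.
  sealed trait Msg
  case object Reconcile extends Msg
  case class StatusUpdate(taskId: String) extends Msg
  case class Offer(offerId: String) extends Msg

  sealed trait Action
  case class Accept(offerId: String) extends Action
  case class Decline(offerId: String) extends Action

  // `pending` holds the tasks whose status has not been confirmed yet.
  case class State(known: Set[String], pending: Set[String]) {
    def reconciling: Boolean = pending.nonEmpty
  }

  // Pure transition returning the next state and an optional action.
  def step(state: State, msg: Msg): (State, Option[Action]) = msg match {
    case Reconcile        => (state.copy(pending = state.known), None)
    case StatusUpdate(id) => (state.copy(pending = state.pending - id), None)
    case Offer(id) =>
      val action: Action = if (state.reconciling) Decline(id) else Accept(id)
      (state, Some(action))
  }

  // Run a message sequence and collect the actions it produced, in order.
  def run(initial: State, msgs: Seq[Msg]): (State, Vector[Action]) =
    msgs.foldLeft((initial, Vector.empty[Action])) { case ((s, acc), m) =>
      val (next, action) = step(s, m)
      (next, acc ++ action.toList)
    }
}
```

Because step is pure, a whole scenario can be replayed from a list of messages, which makes the transition logic easy to unit test without a running Mesos cluster.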

TODO: The wait time to reconcile all tasks is currently fixed. Mesos recommends using truncated exponential backoff to "avoid a snowball effect in the case of the driver or master being backed up".
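For reference, truncated exponential backoff doubles the wait after each attempt and caps it at a maximum. The helper below is a hypothetical sketch of that calculation, not part of the library; the name delay and its parameters are assumptions chosen for illustration.

```scala
import scala.concurrent.duration._

object BackoffSketch {
  // Wait before retry number `attempt` (0-based): base * 2^attempt, capped at `max`.
  def delay(attempt: Int, base: FiniteDuration, max: FiniteDuration): FiniteDuration = {
    require(attempt >= 0, "attempt must be non-negative")
    // Cap the exponent so the shift itself stays well inside a Long.
    val factor = 1L << math.min(attempt, 30)
    val baseMs = base.toMillis
    val maxMs = max.toMillis
    // Compare by division first so the multiplication below cannot overflow.
    if (baseMs > maxMs / factor) max else (baseMs * factor).millis
  }
}
```

With a base of 1 second and a cap of 1 minute, the waits grow as 1s, 2s, 4s, 8s and so on until they reach the 1 minute cap, where they stay.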


A full Mesos scheduler implementation needs to implement the oncue.mesos.SchedulerState and oncue.mesos.SchedulerStateManager traits and run the scheduler like this:

  // implement state and state manager
  case class MyState( ... ) extends SchedulerState
  class MyStateManager extends SchedulerStateManager[MyState] { ... }

  // initialize state and state manager
  val initialState = MyState( ... )
  val stateManager = new MyStateManager( ... )

  // define framework info
  val frameworkInfo = Protos.FrameworkInfo.newBuilder
    .setOtherFrameworkattributes( ... )
    .build

  // initialize scheduler and mesos driver
  val scheduler = new oncue.mesos.Scheduler(stateManager)
  val driver = new org.apache.mesos.MesosSchedulerDriver(scheduler, frameworkInfo, mesosMaster)

  // shutdown scheduler on exit
  sys addShutdownHook {
    ... // stop the scheduler and driver here
  }

  // Seq[Process[Task,CustomMessage]] pass custom state mutation messages
  // Scheduler.reconcileProcess triggers reconciliation every "reconciliationInterval"
  val reconciliationInterval = 1 hour
  val customStreams = Seq(Scheduler.reconcileProcess(driver, reconciliationInterval))

  // run scheduler (blocking)
  scheduler.init(initialState, driver, customStreams).run


The provided example implementation creates a scheduler that launches the provided task on every slave in the cluster. The example also uses http4s to set up REST endpoints that query the current scheduler state by sending custom messages to the queue. Users can query scheduler info and add or remove slaves from a blacklist.

To run the example module on a local Mesos cluster with 2 slaves using docker-machine on a Mac:

  1. Run ZK:

    docker run -d --net=host netflixoss/exhibitor:1.5.2
  2. Run master:

     docker run -d --net=host \
       -e LIBPROCESS_IP=$(docker-machine ip) \
       -e HOSTNAME=$(docker-machine ip) \
       -e MESOS_PORT=5050 \
       -e MESOS_ZK=zk:// \
       -e MESOS_QUORUM=1 \
       -e MESOS_REGISTRY=in_memory \
       -e MESOS_LOG_DIR=/var/log/mesos \
       -e MESOS_WORK_DIR=/var/tmp/mesos \
       -v "$(pwd)/log/mesos:/var/log/mesos" \
       -v "$(pwd)/tmp/mesos:/var/tmp/mesos" \
  3. Run the slaves. Note that MESOS_PORT and the mount points for /var/log/mesos and /var/tmp/mesos differ between the two:

    docker run -d --net=host --privileged \
       -e LIBPROCESS_IP=$(docker-machine ip) \
       -e HOSTNAME=$(docker-machine ip)  \
       -e MESOS_PORT=5051 \
       -e MESOS_MASTER=zk:// \
       -e MESOS_SWITCH_USER=0 \
       -e MESOS_CONTAINERIZERS=docker,mesos \
       -e MESOS_LOG_DIR=/var/log/mesos \
       -e MESOS_WORK_DIR=/var/tmp/mesos \
       -v "$(pwd)/log/mesos1:/var/log/mesos" \
       -v "$(pwd)/tmp/mesos1:/var/tmp/mesos" \
       -v /var/run/docker.sock:/var/run/docker.sock \
       -v /cgroup:/cgroup \
       -v /sys:/sys \
       -v /usr/local/bin/docker:/usr/local/bin/docker \
    docker run -d --net=host --privileged \
       -e LIBPROCESS_IP=$(docker-machine ip) \
       -e HOSTNAME=$(docker-machine ip)  \
       -e MESOS_PORT=5052 \
       -e MESOS_MASTER=zk:// \
       -e MESOS_SWITCH_USER=0 \
       -e MESOS_CONTAINERIZERS=docker,mesos \
       -e MESOS_LOG_DIR=/var/log/mesos \
       -e MESOS_WORK_DIR=/var/tmp/mesos \
       -v "$(pwd)/log/mesos2:/var/log/mesos" \
       -v "$(pwd)/tmp/mesos2:/var/tmp/mesos" \
       -v /var/run/docker.sock:/var/run/docker.sock \
       -v /cgroup:/cgroup \
       -v /sys:/sys \
       -v /usr/local/bin/docker:/usr/local/bin/docker \
  4. Build the scheduler assembly jar:

    sbt "project example" assembly
  5. Build scheduler container from example/Dockerfile:

    docker build -t mysched example/
  6. Run scheduler container interactively:

    docker run --rm --net=host -it \
       -e LIBPROCESS_IP=$(docker-machine ip) \
       -v $(pwd)/example/target/scala-2.10:/opt/app \