Cuttle

An embedded job scheduler/executor for your Scala projects.

Concepts

Embedded means that cuttle is not an hosted service where you submit jobs to schedule/execute. Instead it is a Scala library that you embed into your own project to schedule and execute a DAG of jobs. The DAG and the jobs definitions are all written using the cuttle Scala API. The scheduling mechanism can be customized.

Jobs and Workflows

A cuttle project is composed of a single Workflow to execute. This workflow is actually a Directed Acyclic Graph (DAG) of Jobs.

Each job is defined by a set of metadata (such as the job identifier, name, etc.) and most importantly by a side effect function. This function handles the actual job execution, and its Scala signature is something like Context => Future[Completed] (which can be read as “execute the job for this input parameter and signal me the completion or failure with the returned Future value”).

The side effect function is opaque for cuttle, so it can't exactly know what will happen there (it can be any Scala code), but it assumes that the function:

  • Is asynchronous and non-blocking. It will immediatly return a Future value that will be resolved upon execution success or failure.
  • Produces a side effect, so calling it actually will do some job and mutate some state somewhere.
  • Is idempotent, so calling it twice for the same input (context) won't be a problem.

Being idempotent is important because cuttle is an at least once executor. It will ensure that the job has been successfully executed at least once for a given input. In case of failure or crash it may have to execute it again and so it may happen that the side effect function will succeed more that once. It would be very brittle otherwise.

Scheduler

The workflow is interpreted by a Scheduler. Actually a workflow or a job is always configured for a specific Scheduling and this is the type S you usually see in the Scala API (only one scheduling type can exist for a given workflow, meaning that jobs can only be composed together if they share the same scheduling). This scheduling information allows to provide more information to the scheduler about how the jobs must be triggered.

The scheduler gets the workflow as input and starts producing Executions for it. Because the workflow allows to express some dependencies via the DAG, a classical scheduler will start by executing root jobs, and then follow the dependencies until the graph is fully executed.

But more sophisticated schedulers can exist. Cuttle comes with a TimeSeries scheduler that executes the whole graph accross time partitions. For example it can execute the graph hourly or daily. And it can even execute it accross different time partitions such as a daily job depending on several executions of an hourly job.

The input context given to the side effect function depends of the scheduling. For example the input for a time series job is TimeSeriesContext and contains basically the start and end time for the partition for which the job is being executed.

Executor

The cuttle Executor handles the job executions triggered by the scheduler. When it has to execute a job for a given SchedulingContext it creates and execution, and then invoke the job's side effect function for it.

As soon a the execution starts, it is in the Started state. Started executions are displayed in the UI with a special status indicating if they are Running or Waiting. This actually indicates if the Scala code being currently executed is waiting for some external resources (the permit to fork an external process for example). But as soon as the execution is Started it means that the Scala lambda behind is running!

An execution can also be in the Stuck state. It happens when a given execution keeps failing: Let's say the scheduler wants to execute the job a for the X context. So it asks the executor which eventually executes the job side effect. If the function fails, the returned Future fails and the scheduler is notified of that failure. Because the scheduler really wants that job to be executed for the X context, it will submit it again. When the executor sees this new execution coming back after a failure it will apply a RetryStrategy. The default strategy is to use an exponential backoff to delay the retry of these failing executions. While being in this state Stuck executions are displayed in a special tab of the UI and it means that it is something you should take care of.

An execution can also be in Paused state. It happens when the job itself has been paused. Note that this is a temporary state; eventually the job has to be unpaused and so the executions will be triggered, otherwise more and more paused executions will stack forever.

Finally executions can be Finished either with a Success of Failed state. You can retrieve these old executions in the log for finished executions.

Execution Platforms

The way to manage external resources in cuttle is via ExecutionPlatform. An execution platforms defines the contract about how to use the resources. They are configured at project bootstrap and usually set limits on how resources will be used (for example to only allow 10 external processes to be forked at the same time).

This is necessary because potentially thousand of concurrent executions can happen in cuttle. These executions will fight for shared resources via these execution platforms. Usually a platform will use priority queue to prioritize access to these shared resources, and the priority is based on the SchedulingContext of each execution (so the most prioritary executions get access to the shared resources first). For example the TimeSeriesContext defines its Ordering in such way that oldest partitions are more prioritary.

Time series scheduling

The built-in TimeSeriesScheduler executes the workflow for the time partitions defined in a calendar. Each job defines how it maps to the calendar (for example Hourly or Daily CEST), and the scheduler ensures that at least one execution is created and successfully run for each defined (Job, Period) pair.

In case of failure the time series scheduler will submit the execution again and again until the partition is successfully completed (depending of the retry strategy you have configured the delay between retries will vary).

It is also possible to Backfill successfully completed past partitions, meaning that we want to recompute them anyway. The whole graph or only a part of the graph can be backfilled depending of what you need. A priority can be given to the backfill so the executions triggered by this backfill can be more or less prioritary than the day to day workload.

Documentation

The API documentation is the main reference for Scala programmers.

For a project example, you can also follow this hands-on introduction.

To run the example application, checkout the repository, launch the sbt console in the project (you will need yarn as well to compile the UI part), and run the example HelloWorld command.

Usage

The library is cross-built for Scala 2.11 and Scala 2.12.

The core module to use is "com.criteo.cuttle" %% "cuttle" % "0.2".

You also need to fetch one Scheduler implementation:

  • TimeSeries: "com.criteo.cuttle" %% "timeseries" % "0.2".

License

This project is licensed under the Apache 2.0 license.

Copyright

Copyright © Criteo, 2017.