A simple mechanism for running asynchronous processes in Scala. Mirror of https://codeberg.org/sciss/Processor

Processor

statement

Processor is a simple building block for the Scala programming language, launching asynchronous processing tasks. It is (C)opyright 2013–2021 by Hanns Holger Rutz. All rights reserved. This project is released under the GNU Lesser General Public License v2.1+ and comes with absolutely no warranties. To contact the author, send an e-mail to contact at sciss.de.

linking

To link to this library:

    libraryDependencies += "de.sciss" %% "processor" % v

The current version v is "0.5.0"
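As a concrete sketch, the entry in `build.sbt` with the version filled in could look as follows. The commented-out Scala.js variant is an assumption: it presumes the sbt-scalajs plugin is enabled, which provides the `%%%` operator, and it is not part of the instructions above.

```scala
// build.sbt (JVM project), with the version v substituted
libraryDependencies += "de.sciss" %% "processor" % "0.5.0"

// Scala.js project (assumption: the sbt-scalajs plugin is enabled and provides %%%)
// libraryDependencies += "de.sciss" %%% "processor" % "0.5.0"
```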

building

This project builds with sbt against Scala 2.13, 2.12, Dotty (JVM) and Scala 2.13 (JS). The last version to support Scala 2.11 was v0.4.2.

contributing

Please see the file CONTRIBUTING.md

documentation

A Processor is a future that can be observed through the Model trait and aborted via its abort method. The model dispatches an event when the processor completes, whether successfully or with a failure. While processing is under way, progress reports are dispatched as Processor.Progress events.
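To illustrate the idea of an observable future, here is a minimal sketch that uses only the Scala standard library. It is not the library's implementation: `Observable`, `Update`, `Progress` and `Result` are hypothetical stand-ins chosen for this example, and aborting is left out.

```scala
import scala.collection.mutable.ListBuffer
import scala.concurrent.{Await, ExecutionContext, Future, Promise}
import scala.concurrent.duration._
import scala.util.Try

// Hypothetical names throughout; this only mimics the concept described above.
object ObservableFutureSketch {
  sealed trait Update[+A]
  final case class Progress(amount: Double) extends Update[Nothing]
  final case class Result[A](value: Try[A]) extends Update[A]

  final class Observable[A](body: (Double => Unit) => A)(implicit ec: ExecutionContext) {
    private val promise = Promise[A]()
    @volatile private var listeners = List.empty[PartialFunction[Update[A], Unit]]

    def future: Future[A] = promise.future

    def addListener(pf: PartialFunction[Update[A], Unit]): Unit =
      synchronized { listeners ::= pf }

    private def dispatch(u: Update[A]): Unit =
      listeners.foreach(pf => if (pf.isDefinedAt(u)) pf(u))

    // Runs the body asynchronously, reporting progress and then the result.
    def start(): Unit = {
      Future {
        val res = Try(body(amount => dispatch(Progress(amount))))
        dispatch(Result(res))   // listeners are notified before the future completes
        promise.complete(res)
      }
      ()
    }
  }

  def demo(): (List[String], Int) = {
    import ExecutionContext.Implicits.global
    val log = ListBuffer.empty[String]
    val obs = new Observable[Int](report => {
      for (i <- 1 to 4) report(i * 0.25)
      42
    })
    obs.addListener {
      case Progress(amt) => log.synchronized { log += s"progress ${(amt * 100).toInt}%"; () }
      case Result(v)     => log.synchronized { log += s"result $v"; () }
    }
    obs.start()
    val value = Await.result(obs.future, 5.seconds)
    (log.synchronized(log.toList), value)
  }
}
```

Calling `ObservableFutureSketch.demo()` returns the collected log entries together with the computed value, so the order of progress and result events can be inspected.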

The Processor trait takes the product (or payload) of the future as its type parameter. Its super-trait ProcessorLike additionally takes the self (or representation) type, which allows a more specific type to appear in the processor's model updates.
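The benefit of a representation type parameter can be shown with a small standalone sketch. All names here (`Like`, `Plain`, `Brew`, `Done`) are hypothetical and do not come from the library; the point is only that a listener receives the specific type and can access its members without a cast.

```scala
// Hypothetical miniature of the "representation type" pattern; not the library's code.
object ReprTypeSketch {
  // An update that carries the source object with its specific type.
  final case class Done[Repr](source: Repr)

  trait Like[Prod, Repr] { self: Repr =>
    private var listeners = List.empty[Done[Repr] => Unit]
    def addListener(f: Done[Repr] => Unit): Unit = listeners ::= f
    protected def fire(): Unit = listeners.foreach(_.apply(Done[Repr](this)))
  }

  // A general form fixes the representation type to itself ...
  trait Plain[Prod] extends Like[Prod, Plain[Prod]]

  // ... whereas a specific type exposes its extra members through Repr.
  final class Brew(val variety: String) extends Like[String, Brew] {
    def finish(): Unit = fire()
  }

  def demo(): String = {
    var seen = ""
    val b = new Brew("green")
    b.addListener(d => seen = d.source.variety) // no cast needed: source is a Brew
    b.finish()
    seen
  }
}
```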

Simple wrappers for existing Future or Process instances are provided on the Processor companion object:

    import de.sciss.processor._
    import de.sciss.processor.Ops._
    import scala.concurrent._
    import ExecutionContext.Implicits.global
    import scala.sys.process._

    val p0  = "sleep 100".run()
    val p1  = Processor.fromProcess("sleep", p0)
    p1.monitor()
    println("Sleeping...")
    Thread.sleep(2000)
    p1.abort()
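For orientation, the following sketch shows what treating an external process as a future can look like with the standard library alone. It is an illustration of the concept and makes no claim about how `Processor.fromProcess` is implemented; it assumes a Unix-like system where the `sleep` command is available.

```scala
import scala.concurrent.{Await, ExecutionContext, Future, blocking}
import scala.concurrent.duration._
import scala.sys.process._

// Conceptual illustration only; assumes a Unix-like `sleep` command.
object ProcessFutureSketch {
  def demo(): Int = {
    import ExecutionContext.Implicits.global
    val proc: Process = "sleep 100".run()
    // The future completes with the exit code once the process has ended.
    val exit: Future[Int] = Future(blocking(proc.exitValue()))
    Thread.sleep(500)
    proc.destroy()                   // terminating the process plays the role of an abort
    Await.result(exit, 10.seconds)   // returns promptly because the process was destroyed
  }
}
```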

The Processor object also contains an apply method to create a processor from a simple function:

    import de.sciss.processor._
    import de.sciss.processor.Ops._
    import scala.concurrent._
    import ExecutionContext.Implicits.global

    val p = Processor[String]("make-tea") { self =>
      blocking {
        for (i <- 1 to 10) {
          Thread.sleep(1000)
          self.checkAborted()
          self.progress = i * 0.1
        }
      }
      "tea"
    }

    p.monitor()
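Because a processor is a future, it can also be awaited directly. The following sketch reuses only calls shown in this document (`Processor.apply`, `checkAborted`, `progress`, `abort`) and aborts the processor shortly after it starts. It assumes the library is on the classpath, and that an aborted processor completes as a failed future; the exact exception type is deliberately not inspected here.

```scala
import de.sciss.processor._
import scala.concurrent._
import scala.concurrent.duration._
import scala.util.Try

object AbortTeaSketch {
  def demo(): Try[String] = {
    import ExecutionContext.Implicits.global
    val p = Processor[String]("make-tea") { self =>
      blocking {
        for (i <- 1 to 10) {
          Thread.sleep(200)
          self.checkAborted()      // gives the processor a chance to stop early
          self.progress = i * 0.1
        }
      }
      "tea"
    }
    Thread.sleep(500)
    p.abort()
    // Assumption: after an abort, the processor's future completes with a failure.
    Try(Await.result(p, 5.seconds))
  }
}
```

The short sleep intervals are chosen so that the abort request is noticed within a fraction of a second.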

Processors are typically instantiated through a ProcessorFactory, which also specifies a Config type used to configure the processor. An implementation of ProcessorFactory needs to provide the prepare method, which returns the processor with Processor.Prepared mixed in (essentially adding a start method).

A convenient trait ProcessorBase manages the whole processor and merely asks for the implementation of the runBody: Future[Prod] method. For a blocking API, the sub-trait ProcessorImpl can be used, where the method body: Prod must be implemented. The process represented by the processor runs within runBody or body, and should typically call progress_= at regular intervals so that observers such as GUIs can keep track of the advancement of the process. It should also either override the notifyAborted method or make sure to call checkAborted at regular intervals in order to respond quickly to an abort request.
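The reason for polling at regular intervals is that aborting is cooperative: the running body has to notice the request itself. The standalone sketch below shows this with the standard library only. `Worker` and this `Aborted` exception are hypothetical stand-ins defined for the example, not the library's classes.

```scala
import java.util.concurrent.atomic.AtomicBoolean
import scala.concurrent.{Await, ExecutionContext, Future, blocking}
import scala.concurrent.duration._
import scala.util.Try

// Hypothetical stand-ins; this only illustrates cooperative abortion.
object CooperativeAbortSketch {
  final case class Aborted() extends RuntimeException

  final class Worker(steps: Int)(implicit ec: ExecutionContext) {
    private val aborted = new AtomicBoolean(false)
    @volatile var progress = 0.0

    def abort(): Unit = aborted.set(true)

    private def checkAborted(): Unit = if (aborted.get()) throw Aborted()

    // The body polls the flag once per step, so an abort takes effect
    // within one step's duration.
    val future: Future[Int] = Future {
      blocking {
        for (i <- 1 to steps) {
          Thread.sleep(20)
          checkAborted()
          progress = i.toDouble / steps
        }
        steps
      }
    }
  }

  def demo(): (Try[Int], Double) = {
    import ExecutionContext.Implicits.global
    val w = new Worker(steps = 500)   // about ten seconds if left alone
    Thread.sleep(100)
    w.abort()
    val res = Try(Await.result(w.future, 5.seconds))
    (res, w.progress)
  }
}
```

A body that never polls the flag would simply run to completion, which is why long-running loops should check it in every iteration.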

Here is an example that shows most of the functionality using the blocking API: it runs two instances of the same processor type in parallel, monitors their progress, and, upon completion of either of them, aborts the remaining processor:

    import de.sciss.processor._
    import scala.concurrent._
    import de.sciss.processor.impl
    import duration.Duration.Inf

    case class Tea(variety: String)

    object MakeTea extends ProcessorFactory {
      type Product = Tea
      case class Config(variety: String, minutes: Int)
      type Repr = MakeTea

      protected def prepare(config: Config): Prepared = new Impl(config)

      private class Impl(val config: Config) extends MakeTea with impl.ProcessorImpl[Tea, MakeTea] {
        protected def body(): Tea = blocking {
          val seconds = config.minutes * 60
          for (i <- 1 to seconds) {
            Thread.sleep(1000)
            checkAborted()
            progress = i.toDouble/seconds
          }
          Tea(config.variety)
        }
      }
    }
    trait MakeTea extends ProcessorLike[Tea, MakeTea] { def config: MakeTea.Config }

    val black = MakeTea(MakeTea.Config("black", 4))
    val green = MakeTea(MakeTea.Config("green", 3))
    val all   = black :: green :: Nil

    val obs: MakeTea.Observer = {
      case prog @ Processor.Progress(p, _)  => println(s"${p.config.variety} brew ${prog.toInt}%")
      case Processor.Result(_, v)           => println(v)
    }

    all.foreach(_.addListener(obs))

    import ExecutionContext.Implicits.global
    all.foreach(_.start())

    Await.ready(Future.firstCompletedOf(all), Inf)
    all.foreach { p =>
      p.abort() // don't bother about the remaining teas
      Await.ready(p, Inf)
    }