Processor
statement
Processor is a simple building block for launching asynchronous processing tasks in the Scala programming language.
It is (C)opyright 2013–2021 by Hanns Holger Rutz. All rights reserved. This project is released under the
GNU Lesser General Public License v2.1+ and comes with
absolutely no warranties. To contact the author, send an e-mail to contact at sciss.de.
linking
To link to this library:
libraryDependencies += "de.sciss" %% "processor" % v
The current version v is "0.5.0".
building
This project builds with sbt against Scala 2.13, 2.12, Dotty (JVM) and Scala 2.13 (JS). The last version to support Scala 2.11 was v0.4.2.
contributing
Please see the file CONTRIBUTING.md
documentation
A Processor is a future which can be observed through the Model trait and aborted via an abort method.
The model dispatches an event when the processor completes, either successfully or with a failure. While
processing is under way, progress reports are dispatched as Processor.Progress events.
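The observation mechanism described above can be illustrated with a minimal sketch that uses only the Scala standard library. It is not the library's Model implementation: the names ModelSketch, Update, Progress, Result and Model are hypothetical stand-ins, and the payload is fixed to String for brevity. It shows listeners registered as partial functions that receive progress and result events.

```scala
import scala.collection.mutable.ListBuffer
import scala.util.{Success, Try}

// Hypothetical stand-in types; NOT the classes of the Processor library.
object ModelSketch {
  sealed trait Update
  final case class Progress(amount: Double)   extends Update
  final case class Result(value: Try[String]) extends Update

  final class Model {
    private val listeners = ListBuffer.empty[PartialFunction[Update, Unit]]

    // Registers a listener; it is only invoked for updates it is defined at.
    def addListener(pf: PartialFunction[Update, Unit]): Unit =
      listeners.synchronized { listeners += pf }

    // Delivers an update to a snapshot of the current listeners.
    def dispatch(u: Update): Unit = {
      val snapshot = listeners.synchronized(listeners.toList)
      snapshot.foreach(pf => if (pf.isDefinedAt(u)) pf(u))
    }
  }

  // Registers one listener, dispatches two events, returns what was logged.
  def demo(): List[String] = {
    val log   = ListBuffer.empty[String]
    val model = new Model
    model.addListener {
      case Progress(amount) => log += s"progress ${(amount * 100).toInt}%"
      case Result(value)    => log += s"result $value"
    }
    model.dispatch(Progress(0.5))
    model.dispatch(Result(Success("tea")))
    log.toList
  }
}
```

Using partial functions as listeners lets an observer react to only the events it cares about, for example the final result, and ignore the rest.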
The Processor trait takes as its type parameter the product or payload of the future. The super-trait
ProcessorLike additionally takes the self or representation type, which allows a more specific type to
appear in the processor's model updates.
Simple wrappers for existing Future or Process instances are provided on the Processor companion object:
import de.sciss.processor._
import de.sciss.processor.Ops._
import scala.concurrent._
import ExecutionContext.Implicits.global
import scala.sys.process._
val p0 = "sleep 100".run()
val p1 = Processor.fromProcess("sleep", p0)
p1.monitor()
println("Sleeping...")
Thread.sleep(2000)
p1.abort()
The Processor object also contains an apply method to create a processor from a simple function:
import de.sciss.processor._
import de.sciss.processor.Ops._
import scala.concurrent._
import ExecutionContext.Implicits.global
val p = Processor[String]("make-tea") { self =>
  blocking {
    for (i <- 1 to 10) {
      Thread.sleep(1000)
      self.checkAborted()
      self.progress = i * 0.1
    }
  }
  "tea"
}
p.monitor()
Processors are typically instantiated through a ProcessorFactory, which also specifies a Config type that
can be used to configure the processor. An implementation of ProcessorFactory needs to provide the prepare
method, which returns the processor with a mixin of Processor.Prepared (which essentially adds a start method).
A convenient trait ProcessorBase manages the whole processor and merely asks for the implementation of the
runBody: Future[Prod] method. For blocking API, the sub-trait ProcessorImpl can be used, where method
body: Prod must be implemented.
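The relationship between an asynchronous runBody and a blocking body can be sketched with the standard library alone. This is only an assumption about how such an adapter might look, not the code of ProcessorBase or ProcessorImpl. The names BodySketch, AsyncTask, BlockingTask and Brew are hypothetical.

```scala
import scala.concurrent.{Await, ExecutionContext, Future, blocking}
import scala.concurrent.duration._

// Hypothetical sketch; NOT the library's ProcessorBase / ProcessorImpl.
object BodySketch {
  // Asynchronous flavour: the implementer returns a Future directly.
  trait AsyncTask[A] {
    protected def runBody(): Future[A]
    final def start(): Future[A] = runBody()
  }

  // Blocking flavour: the implementer writes plain blocking code,
  // and the trait wraps it into a Future on the given execution context.
  trait BlockingTask[A] extends AsyncTask[A] {
    protected implicit def executionContext: ExecutionContext
    protected def body(): A
    protected final def runBody(): Future[A] = Future(blocking(body()))
  }

  final class Brew(variety: String, ec: ExecutionContext) extends BlockingTask[String] {
    protected implicit def executionContext: ExecutionContext = ec
    protected def body(): String = {
      Thread.sleep(50) // simulate blocking work
      s"$variety tea"
    }
  }

  // Starts one blocking task and waits for its product.
  def demo(): String =
    Await.result(new Brew("green", ExecutionContext.global).start(), 5.seconds)
}
```

Wrapping the body in blocking hints to the execution context that the thread may be parked for a while, which is why the examples in this document also use it.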
The process represented by the processor runs within runBody or body, and should typically call progress_= at
regular intervals so that observers such as GUIs can keep track of the advancement of the process. It should also
either override the notifyAborted method or make sure to call checkAborted at regular intervals in order
to respond quickly to an abort request.
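The idea of polling for an abort request can be sketched independently of the library. The following uses only the Scala standard library and hypothetical names (AbortSketch, Aborted, Task). It is a simplified illustration of cooperative cancellation, not how Processor implements checkAborted internally.

```scala
import java.util.concurrent.atomic.AtomicBoolean
import scala.concurrent.{Await, ExecutionContext, Future, blocking}
import scala.concurrent.duration._
import scala.util.Try

// Hypothetical sketch of cooperative cancellation; NOT the library's code.
object AbortSketch {
  final class Aborted extends RuntimeException("aborted")

  final class Task(steps: Int, stepMillis: Long)(implicit ec: ExecutionContext) {
    private val abortFlag = new AtomicBoolean(false)
    @volatile private var _progress = 0.0

    def progress: Double = _progress

    // Only sets a flag; the worker notices it at its next check.
    def abort(): Unit = abortFlag.set(true)

    private def checkAborted(): Unit =
      if (abortFlag.get()) throw new Aborted

    val future: Future[String] = Future {
      blocking {
        for (i <- 1 to steps) {
          Thread.sleep(stepMillis)
          checkAborted()                 // poll once per step
          _progress = i.toDouble / steps
        }
      }
      "done"
    }
  }

  // Starts a task, aborts it shortly afterwards, and reports how it ended.
  def demo(): Try[String] = {
    import ExecutionContext.Implicits.global
    val task = new Task(steps = 50, stepMillis = 20L)
    Thread.sleep(100)
    task.abort()
    Await.ready(task.future, 5.seconds)
    task.future.value.get
  }
}
```

The shorter the interval between checks, the faster the task reacts to an abort, at the cost of slightly more overhead per step.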
Here is an example that shows most of the functionality using the blocking API: it runs two instances of the same processor type in parallel, monitors their progress, and, as soon as either of them completes, aborts the remaining one:
import de.sciss.processor._
import scala.concurrent._
import de.sciss.processor.impl
import duration.Duration.Inf
case class Tea(variety: String)
object MakeTea extends ProcessorFactory {
  type Product = Tea
  case class Config(variety: String, minutes: Int)
  type Repr = MakeTea
  protected def prepare(config: Config): Prepared = new Impl(config)
  private class Impl(val config: Config) extends MakeTea with impl.ProcessorImpl[Tea, MakeTea] {
    protected def body(): Tea = blocking {
      val seconds = config.minutes * 60
      for (i <- 1 to seconds) {
        Thread.sleep(1000)
        checkAborted()
        progress = i.toDouble/seconds
      }
      Tea(config.variety)
    }
  }
}
trait MakeTea extends ProcessorLike[Tea, MakeTea] { def config: MakeTea.Config }
val black = MakeTea(MakeTea.Config("black", 4))
val green = MakeTea(MakeTea.Config("green", 3))
val all = black :: green :: Nil
val obs: MakeTea.Observer = {
  case prog @ Processor.Progress(p, _) => println(s"${p.config.variety} brew ${prog.toInt}%")
  case Processor.Result(_, v) => println(v)
}
all.foreach(_.addListener(obs))
import ExecutionContext.Implicits.global
all.foreach(_.start())
Await.ready(Future.firstCompletedOf(all), Inf)
all.foreach { p =>
  p.abort() // don't bother about the remaining teas
  Await.ready(p, Inf)
}