vigoo / prox

A Scala library for working with system processes




prox is a small library that helps you start system processes and redirect their input, output and error streams to files, fs2 streams or each other.

It works by first defining one or more processes and then starting them, which yields a set of running processes.


API reference

Getting started

Add the following dependency:

libraryDependencies += "io.github.vigoo" %% "prox" % "0.4"

Let's start by defining a single process with the Process constructor, taking a command and optionally a list of arguments and a working directory:

import cats.effect._
import cats.implicits._
import io.github.vigoo.prox._
import io.github.vigoo.prox.syntax._

val process = Process[IO]("echo", List("Hello world"))

This is just a definition of a process; no real effect has happened yet. We can start this process by calling the start method on it, which creates an effectful operation in the IO monad, defined by cats-effect:

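import scala.concurrent.ExecutionContext
// Any ExecutionContext can back the ContextShift; the global one is assumed here
implicit val contextShift: ContextShift[IO] = IO.contextShift(ExecutionContext.global)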

val runningProcess: IO[RunningProcess] = Blocker[IO].use { blocker => process.start(blocker) }

This, when executed, will start the process defined above and return an instance of RunningProcess, which allows things like waiting for the process to terminate, or killing it.
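The distinction between defining and running an effect can be illustrated without prox at all. The following is a minimal sketch, assuming cats-effect 2.x (the version line that provides ContextShift and Blocker); the object and value names are made up for the illustration.

```scala
import cats.effect.IO

object LazyEffectSketch {
  // Returns the flag's value before and after the IO is actually run.
  def demo(): (Boolean, Boolean) = {
    var started = false
    // Only a description of the effect; the block is not evaluated here
    val definition: IO[Unit] = IO { started = true }
    val before = started
    // The effect runs only when the IO is executed
    definition.unsafeRunSync()
    (before, started)
  }
}
```

Here demo() returns (false, true): building the IO value changes nothing, running it does. A Process definition and the IO returned by start relate to each other in the same way.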

Let's see a full example of running the above process and printing its exit code!

val program = 
  Blocker[IO].use { blocker => 
    for {
      echo <- Process[IO]("echo", List("Hello world")).start(blocker)
      result <- echo.waitForExit()
    } yield result.exitCode
  }
val exitCode = program.unsafeRunSync()


Let's take a look at the type of process we defined above:

process: Process[IO, Byte, Byte, Unit, Unit, NotRedirected, NotRedirected, NotRedirected]

The last three type parameters indicate the redirection status of the process's input, output and error streams. By default they are not redirected and inherit the parent process's streams.

Each stream can be redirected at most once using the <, > and redirectErrorTo operators. The targets of these redirections are described by three type classes: CanBeProcessOutputTarget, CanBeProcessErrorTarget and CanBeProcessInputSource.

One type with such instances is Path. Let's see how to redirect the output:

import io.github.vigoo.prox.path._

val process = Process[IO]("echo", List("Hello world")) > (home / "tmp" / "out.txt")

Running this process will redirect the process's output to the given file directly, using the Java process builder API. We can't use this operator twice as it would be ambiguous (and outputting to multiple files directly is not supported by the system), so the following does not typecheck:

val process = Process[IO]("echo", List("Hello world")) > (home / "tmp" / "out.txt") > (home / "tmp" / "out2.txt")
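How a type parameter can rule out a second redirection is easiest to see on a stripped-down model. The sketch below shows the general phantom-type technique only; Proc, RedirectionState and Redirected are hypothetical names for this illustration, and it does not claim to mirror prox's actual implementation.

```scala
object PhantomSketch {
  // Hypothetical, simplified model: the type parameter tracks the output state
  sealed trait RedirectionState
  sealed trait NotRedirected extends RedirectionState
  sealed trait Redirected extends RedirectionState

  final case class Proc[Out <: RedirectionState](command: String, outputFile: Option[String]) {
    // Only available while the output is still NotRedirected
    def >(file: String)(implicit ev: Out =:= NotRedirected): Proc[Redirected] =
      Proc[Redirected](command, Some(file))
  }

  val plain: Proc[NotRedirected] = Proc[NotRedirected]("echo", None)
  val once: Proc[Redirected] = plain > "out.txt"

  // Does not compile, because no evidence Redirected =:= NotRedirected exists:
  // val twice = once > "out2.txt"
}
```

The first redirection changes the type from Proc[NotRedirected] to Proc[Redirected], so the compiler has no way to accept a second one.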

Similarly we can redirect the input and the error:

val p1 = Process[IO]("cat") < (home / "something")
val p2 = Process[IO]("make") redirectErrorTo (home / "errors.log")
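For comparison, this is roughly what a direct file redirection looks like at the level of the Java process builder API mentioned above. It is a sketch using only java.lang.ProcessBuilder; how prox maps its operators to these calls is an assumption, and the example expects an echo executable on the PATH (a Unix-like system).

```scala
import java.io.File
import java.nio.file.Files

object ProcessBuilderSketch {
  // Runs `echo Hello world` with stdout redirected to a temporary file and
  // returns the exit code together with the file's content.
  def echoToFile(): (Int, String) = {
    val target = File.createTempFile("prox-sketch", ".txt")
    try {
      val builder = new ProcessBuilder("echo", "Hello world")
      builder.redirectOutput(target)                         // counterpart of `> file`
      builder.redirectError(ProcessBuilder.Redirect.INHERIT) // not redirected: inherit from the parent
      val exitCode = builder.start().waitFor()
      (exitCode, new String(Files.readAllBytes(target.toPath), "UTF-8"))
    } finally {
      target.delete()
    }
  }
}
```

redirectInput(File) and redirectError(File) are the corresponding ProcessBuilder methods for the other two streams.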


fs2 streams of bytes can be used as inputs for processes in the same way:

import fs2._

val source = Stream("Hello world").through(text.utf8Encode)
val printSource = Process[IO]("cat") < source

Similarly the output can be redirected to a pipe as follows:

val captured = Process[IO]("cat") < source > identity[Stream[IO, Byte]]

Calling start on a process which has its streams connected to fs2 streams sets up the IO operation and starts all the involved streams asynchronously.

The default type classes implement the following behavior depending on the target:

  • If the target is a Sink, the stream is started by drain and the result type is Unit
  • If the pipe's output is Out and there is a Monoid instance for Out, the stream is started by fold and the result type is Out
  • Otherwise if the pipe's output is Out, the stream is started by toVector and the result type is Vector[Out]
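The three behaviors listed above have direct counterparts among fs2's compile operations. The sketch below uses plain fs2 streams without any process, assuming fs2 2.x with cats-effect 2.x; that prox delegates to exactly these calls is an assumption, and the object and value names are invented for the example.

```scala
import cats.effect.IO
import cats.implicits._
import fs2.Stream

object DefaultTargetsSketch {
  val words: Stream[IO, String] = Stream("Hello", " ", "world").covary[IO]

  // Sink-like target: run for the effects only, result is Unit
  val drained: IO[Unit] = words.compile.drain

  // Element type with a Monoid (String here): combine all elements
  val folded: IO[String] = words.compile.foldMonoid

  // No Monoid used: collect every element
  val collected: IO[Vector[String]] = words.compile.toVector
}
```

Running folded yields "Hello world", while collected yields Vector("Hello", " ", "world").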

For example to send a string through cat and capture the output:

val source = Stream("Hello world").through(text.utf8Encode)
val program: IO[String] = Blocker[IO].use { blocker =>
  for {
    runningProcess <- (Process[IO]("cat") < source > text.utf8Decode[IO]).start(blocker)
    result <- runningProcess.waitForExit()
  } yield result.fullOutput
}

There are three wrappers for pipes to customize this behavior without implementing a custom instance of CanBeProcessOutputTarget[T]:

  • Drain(pipe) results in running the stream with drain, having a result of Unit
  • ToVector(pipe) results in running the stream with toVector even if Out is a Monoid, having a result of Vector[Out]
  • Fold(pipe, init: Res, f: (Res, Out) => Res) results in running the stream with runFold(init)(f), having a result of Res
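To make the Fold case concrete, here is what folding a piped stream with an initial value and a combining function looks like in plain fs2. This is a sketch assuming fs2 2.x, where the operation is spelled compile.fold; it does not use the prox wrappers themselves, and FoldSketch and lineCount are names made up for the example.

```scala
import cats.effect.IO
import fs2.{Stream, text}

object FoldSketch {
  // Counts the lines produced by the text.lines pipe:
  // init = 0, f = (count, line) => count + 1
  val lineCount: IO[Int] =
    Stream("first\nsecond\nthird").covary[IO]
      .through(text.lines)
      .compile
      .fold(0)((count, _) => count + 1)
}
```

Running lineCount yields 3. With the wrapper from the list above, the pipe, 0 and the counting function would be the three arguments of Fold.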


The library also provides a way to pipe two or more processes together. Internally, this is implemented with the stream support described above.

Let's start by piping two processes together:

val echoProcess = Process[IO]("echo", List("This is an output"))
val wordCountProcess = Process[IO]("wc", List("-w"))
val combined = echoProcess | wordCountProcess

The combined process is no longer a Process; it is a PipedProcess. Otherwise it works exactly the same: you can redirect its input and outputs or pipe it to another process:

val multipleProcesses = Process[IO]("cat", List("log.txt")) | Process[IO]("grep", List("ERROR")) | Process[IO]("sort") | Process[IO]("uniq", List("-c"))

The start method for piped processes no longer returns a single IO[RunningProcess], but a tuple containing all the RunningProcess instances for the involved processes:

for {
  runningProcs1 <- (echoProcess | wordCountProcess).start(blocker)
  (echo, wordCount) = runningProcs1
  runningProcs2 <- multipleProcesses.start(blocker)
  (cat, grep, sort, uniq) = runningProcs2
  _ <- wordCount.waitForExit()
  _ <- uniq.waitForExit()
} yield ()

The pipe between the two processes can be customized with the following syntax:

val customPipe: Pipe[IO, Byte, Byte] = ???
val process = echoProcess.via(customPipe).to(wordCountProcess)
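The ??? above can be any fs2 pipe from bytes to bytes. As an illustration, the following sketch defines a hypothetical upperCase pipe that decodes the bytes, upper-cases the text and encodes it again; it assumes fs2 2.x and is exercised here on an in-memory stream instead of a real process.

```scala
import cats.effect.IO
import cats.implicits._
import fs2.{Pipe, Stream, text}

object CustomPipeSketch {
  // Hypothetical custom pipe: bytes -> text -> upper-cased text -> bytes
  val upperCase: Pipe[IO, Byte, Byte] =
    _.through(text.utf8Decode).map(_.toUpperCase).through(text.utf8Encode)

  // Applies the pipe to an in-memory byte stream and decodes the result
  val demo: IO[String] =
    Stream("This is an output").covary[IO]
      .through(text.utf8Encode)
      .through(upperCase)
      .through(text.utf8Decode)
      .compile
      .foldMonoid
}
```

Running demo yields "THIS IS AN OUTPUT". A pipe like this could be passed to via in place of customPipe.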

Controlling the execution context

There are two parameters controlling the execution of the processes.

  • The implicit ContextShift[IO] is used for all the async operations necessary for running the involved streams in parallel. See cats-effect's concurrency basics for more information.
  • A Blocker has to be provided to the process start method. It is used to run the blocking IO of reading and writing the process streams. See fs2's migration guide to 1.0 for more information.
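The two pieces can be seen working together in a small sketch that involves no process at all. It assumes cats-effect 2.x; the sleep merely stands in for a blocking read, and ExecutionSketch is a name invented for the example.

```scala
import cats.effect.{Blocker, ContextShift, IO}
import scala.concurrent.ExecutionContext

object ExecutionSketch {
  // Used for shifting between thread pools and for async operations
  implicit val contextShift: ContextShift[IO] = IO.contextShift(ExecutionContext.global)

  // Blocker[IO] is a Resource; its thread pool is shut down when `use` completes
  val program: IO[Int] = Blocker[IO].use { blocker =>
    blocker.delay[IO, Int] {
      Thread.sleep(10) // stands in for a blocking read; runs on the blocker's pool
      42
    }
  }
}
```

After the blocking section finishes, execution shifts back to the pool behind the ContextShift, so blocking work does not occupy the threads used for the rest of the program.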


from 0.1.x to 0.2

  • The start method on processes now requires a blockingExecutionContext argument
  • Ignore has been renamed to Drain
  • Log has been renamed to ToVector

from 0.2 to 0.4

  • Process now takes the effect type as parameter, so in case of cats-effect, Process(...) becomes Process[IO](...)
  • The start method on processes now gets a Blocker instead of an execution context