Go channels provide synchronization and messaging, 'select' provides multi-way concurrent control.
Rob Pike - Concurrency is not parallelism / another link with better slide view
This is one of Scala ports of Go channels. The idea of this particular port is to match as much as possible the features provided by GoLang so channels can be used for concurrency coordination. The library uses locks, conditional variables and semaphores as underlying concurrency primitives so the performance is not expected to match applications written in Go.
Scala 2.11 | Scala 2.12 | Scala 2.13 |
---|---|---|
Scala provides channels as a part of the standard library (https://www.scala-lang.org/api/2.13.0/scala/concurrent/Channel.html).
However, the real power of channels comes from select()
.
Scala promotes Actor model as a way of handling concurrency which is based on works of Carl Hewitt (1973). Channels are based on CSP model by Tony Hoare (1978). At first glance these models are similar, but they are in fact very different. Good deescripton on the differences are explained here: https://stackoverflow.com/a/22622880/1038282
CSP channels provide an extremely simple and uniform building block for designing concurrent applications.
Examples for this library are inspired by the Go By Example page by Mark McGranaghan.
You can try these example in Scala REPL.
In this example a channel and a thread are created. A message is send from the new thread and received in the main thread. Since synchronous channels are created by default we can be sure by the time the main thread receives the message, the child thread is already finished.
Note. Go relies on the builtin operator
<-
for sending and receiving messages. Our implementaion definessend()
andrecv()
methods instead to make the code look more Scala-like. Another reason is that in Scala it is not possible to define an infix<-
.
import com.github.yruslan.channel.Channel
import scala.concurrent.Future
// In these examples we always import global execution context
// so that futures could use the default thread pool.
import scala.concurrent.ExecutionContext.Implicits.global
val channel = Channel.make[String]
Future { channel.send("ping") }
val msg = channel.recv()
println(msg)
Output:
ping
Channels created by default are synchronous. That means that when a thread sends a message the method won't return
until the message is received by some thread. You may want to make channels asynchronous so that send()
method
returns immediately. To do that you need to specify the maximum number of items the channel can store in the buffer.
As long as there is still a free space in the buffer, send()
will return immediately.
import com.github.yruslan.channel.Channel
val channel = Channel.make[String](2)
channel.send("buffered")
channel.send("channel")
println(channel.recv())
println(channel.recv())
Output:
buffered
channel
Some use cases require limited but unknown buffer size in order to satisfy progress guarantees. This port of channels library supports unbounded channels.
import com.github.yruslan.channel.Channel
val channel = Channel.makeUnbounded[String]
channel.send("unbounded")
channel.send("buffered")
channel.send("channel")
println(channel.recv())
println(channel.recv())
println(channel.recv())
Output:
unbounded
buffered
channel
Channels can be used for synchronization. Here we use a channel to wait for a job executing in another thread to complete.
import com.github.yruslan.channel.Channel
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global
def worker(done: Channel[Boolean]): Unit = {
println("working...")
Thread.sleep(1000 /*1 second*/)
println("done")
done.send(true)
}
val done = Channel.make[Boolean]
Future { worker(done) }
done.recv()
Output:
working...
done
You can define channel directions. That is channels that can either only send or only receive messages, but not both.
When you define a method, define an argument as ReadChannel
if you want the method to be able to only receive messages.
Define the argument as WriteChannel
so the method can only send messages to the channel.
import com.github.yruslan.channel._
def ping(pings: WriteChannel[String], msg: String): Unit = {
pings.send(msg)
}
def pong(pings: ReadChannel[String], pongs: WriteChannel[String]): Unit = {
val msg = pings.recv()
pongs.send(msg)
}
val pings = Channel.make[String](1)
val pongs = Channel.make[String](1)
ping(pings, "message")
pong(pings, pongs)
println(pongs.recv())
Output:
message
Channels can be closed which prevents sending more messages to it. It can be checked by consumers to determine when the processing has finished.
val ch = Channel.make[Int](5)
ch.send(1)
ch.send(2)
ch.send(3)
ch.close()
while (!ch.isClosed) {
println(ch.recv())
}
Output:
1
2
3
You can iterate over a channel using foreach()
. Several threads can do the same. Each thread will receive only one
copy of the sent message. Be careful, foreach()
is blocking and will exit only when the channel is closed. If you
forget to close a channel, the foreach loop will block the thread.
Here is an example how a stream of tasks can be processed in parallel by 2 workers using foreach()
.
Future {
ch.foreach(v => {
println(s"Worker 1 received $v")
Thread.sleep(500) // Simulate processing
})
}
Future {
ch.foreach(v => {
println(s"Worker 2 received $v")
Thread.sleep(600) // Simulate processing
})
}
ch.send(1)
ch.send(2)
ch.send(3)
ch.send(4)
ch.close()
Output:
Worker 1 received 1
Worker 2 received 2
Worker 1 received 3
Worker 2 received 4
What makes channels great is that a program can wait for events in several channels at the same time.
In this example select()
is used to wait for any of two channels to have an incoming message. Once a message
is available it is received.
import com.github.yruslan.channel.Channel
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global
val channel1 = Channel.make[String]
val channel2 = Channel.make[String]
Future {
Thread.sleep(1000 /*1 second*/)
channel1.send("one")
}
Future {
Thread.sleep(2000 /*2 seconds*/)
channel2.send("two")
}
for (_ <- Range(0, 2)) {
Channel.select(
channel1.recver{v => println(v)},
channel2.recver{v => println(v)}
)
}
Output:
one
two
Like in Go, you can have default blocks that will be executed if none of other blocks are ready:
Channel.select(
channel1.recver{v => println(v)},
channel2.sender(v) { /* An action to do after the send. */ },
Channel.default { /* The action to do if nether channel1 nor channel2 can receive and send. */ }
)
In Go, you can use Time.After(duration)
to generate a channel that sends a message after the specified time has
passed. You don't need to close the channel afterward. The Scala implementation is TimeChannels.after(duration)
import com.github.yruslan.channel._
Channel.select(
someChannel.recver { v => println(s"Got a message in time: $v") },
TimeChannels.after(Duration(10, TimeUnit.SECONDS)).recver { v => println("Time is out!") }
)
In Go, you can use Time.newTicker(duration)
to generate a channel that sends a message after the specified time.
When the message is consumed, the ticker will generate another message after the same time duration.
Tickers should be closed after they are not in use. The Scala implementation is TimeChannels.ticker(duration)
import com.github.yruslan.channel._
val ticker = TimeChannels.ticker(Duration(10, TimeUnit.SECONDS))
Channel.select(
someChannel.recver { v => println(s"Got a message: $v") },
ticker.recver { _ => println("Tick.") }
)
ticker.close()
Similar to GoLang you can use WaitGroup
to coordinate multiple threads doing work. Use add()
, done()
, and await()
.
val wg = WaitGroup()
val n = 10
for (_ <- 1 to n) {
wg.add()
new Thread {
// do some work
wg.done()
}.start()
}
wg.await()
Go supports non-blocking channel operation by the elegant default
clause in the select
statement. The scala port
adds separate methods that support non-blocking operations: trySend()
, tryRecv()
and trySelect()
. There is an
optional timeout parameter for each of these methods. If it is not specified, all methods return immediately without
any waiting. If the timeout is specified the methods will wait the specified amount of time if the expected conditions
are not met. Timeout can be set to Duration.Inf
. In this case these methods are equivalent to their blocking variants
with the exception of the type of returned value.
trySend()
returns a boolean. Iftrue
, the message has been sent successfully, otherwise it is failed for whatever reason (maybe the channel was closed or the buffer is full).tryRecv()
returns a optional value. If there are no pending messages the method returnsNone
.trySelect()
returns true if any of specified operations have executed.
Here is an example of non-blocking methods:
val ch1 = Channel.make[Int](1)
val ch2 = Channel.make[String](1)
var ok = ch1.trySend(1)
println(s"msg1 -> channel1: $ok")
ok = ch1.trySend(2)
println(s"msg2 -> channel1: $ok")
val msg1 = ch1.tryRecv()
println(s"msg1 <- channel1: $msg1")
val msg2 = ch1.tryRecv()
println(s"msg2 <- channel1: $msg2")
val okSelect = Channel.trySelect(
ch1.recver{v => println(v)},
ch2.recver{v => println(v)})
println(s"selected: $okSelect")
Output:
msg1 -> channel1: true
msg2 -> channel1: false
msg1 <- channel1: Some(1)
msg2 <- channel1: false
selected: None
Since channels can be copied back and forth netween threads, each channel can have multiple readers and writers. So
if sclect()
returns a channel there are no guarantees that another thread won't fetch the message before the current
thread can receive it.
Here is an example where the worker is written so it would work correctly in case channels have multiple readers and writers.
import com.github.yruslan.channel.Channel
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global
import Channel._
def worker(channel1: Channel[Int], channel2: Channel[String]): Unit = {
while(!channel1.isClosed && !channel2.isClosed) {
select(
channel1.recver{i => println(s"Int received: $i")},
channel2.recver{s => println(s"String received: $s")}
)
}
}
val channell = Channel.make[Int]
val channel2 = Channel.make[String]
val fut = Future {
worker(channell, channel2)
}
channell.send(1)
channel2.send("abc")
channell.send(2)
channel2.send("edef")
channell.close()
channel2.close()
Output:
Int received: 1
String received: abc
Int received: 2
String received: edef
Here is example function that balances inputs from two channels into two output channels.
Notice that select()
is used to wait for any of the channels to have an incoming message, as well as
to select a free output channel. You can mix sender()
and recver()
calls in the same select()
statement.
A finish channel is used to signal the end of the balancing process.
def balancer(input1: ReadChannel[Int],
input2: ReadChannel[Int],
output1: WriteChannel[Int],
output2: WriteChannel[Int],
finishChannel: ReadChannel[Boolean]): Unit = {
var v: Int = 0
var exit = false
while (!exit) {
select(
input1.recver(x => v = x),
input2.recver(x => v = x),
finishChannel.recver(_ => exit = true)
)
if (!exit) {
select(
output1.sender(v) {},
output2.sender(v) {}
)
}
}
}
Since Scala is a functional language, this implementation of channels supports functional operations used in for comprehension.
You can lazy map messages in a channel.
// Creating a channel of integers
val chInt = Channel.make[Int](2)
// The channel of strings is the result of mapping the channel of integer
val chString = chInt.map(v => v.toString)
// Send some integers
chInt.send(1)
chInt.send(2)
chInt.close()
// Receive some strings
val s1: String = chString.recv()
val s2: String = chString.recv()
You can lazy filer messages in a channel.
// Creating a channel of integers
val chInput = Channel.make[Int](3)
// Filter the original channel
val chFiltered = chInput.filter(v => v != 2)
// Send some integers
chInput.send(1)
chInput.send(2)
chInput.send(3)
chInput.close()
// Receive filtered values
val v1 = chFiltered.recv() // 1
val v2 = chFiltered.recv() // 3
You can use for
comprehension for channels.
// Creating a channel of integers
val chInput = Channel.make[Int](3)
// Applying maps and filters
val chOutput = chInput
.map(v => v * 2)
.filter(v => v != 4)
// Sending values to the input channel
chInput.send(1)
chInput.send(2)
chInput.send(3)
chInput.close()
// Traversing the output channel using for comprehension
for {
a <- chOutput
if a > 5
} println(a)
// Outputs:
// 6
You can convert a channel to List
by collecting all messages. This operation will block until the channel is closed.
val chInput = Channel.make[Int](3)
chInput.send(1)
chInput.send(2)
chInput.send(3)
chInput.close()
val lst = chInput.toList // List(1, 2, 3)
-
- Add wait groups.
- Improve the performance of time-based channels.
-
- Add support for
default
block inselect()
. - Add
after()
andticker()
. - Change select() logic for synchronous channels to match behavior of GoLang.
- Add support for
-
- Fix channel filtering does not filter some values at random.
-
- Add handling of
InterruptedException
that can occur while waiting on a channel. - Add support for priority
prioritySelect()
for channels. When several selectors are ready the first one will take precedence. - Fix race condition in
foreach()
when an exception is thrown inside the action.
- Add handling of
-
- Add covariance for read-only channels.
- E.g.
val ch1: ReadChannel[Animal] = ch2: ReadChannel[Dog]
.
- E.g.
- Add contravariance for write-only channels.
- E.g.
val ch1: Writehannel[Dog] = ch2: Writehannel[Animal]
.
- E.g.
- Add covariance for read-only channels.
-
- Add support for unbounded channels (use
Channel.makeUnbounded[MyType]()
.
- Add support for unbounded channels (use
-
- Add support for
map()
,filter()
andfor
comprehension for channels.
- Add support for
-
- Fix one corner case of trySelect().
-
- The initial release.