Add Channel primitive for coroutine inter-task communication

Exposes Kotlin's Channel<Obj> to Lyng scripts as a first-class type
with rendezvous, buffered, and unlimited capacity modes. Supports
suspending send/receive, non-suspending tryReceive, close/drain
semantics, and isClosedForSend/isClosedForReceive properties.

Also fixes a pre-existing typo in BookTest.kt that blocked JVM test
compilation, and adds Channel reference docs and a parallelism.md section.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
Sergey Chernov 2026-04-18 02:28:55 +03:00
parent 30e56946a0
commit f9a07f176a
6 changed files with 529 additions and 1 deletions

211
docs/Channel.md Normal file
View File

@ -0,0 +1,211 @@
# Channel
A `Channel` is a **hot, bidirectional pipe** for passing values between concurrently running coroutines.
Unlike a [Flow], which is cold and replayed on every collection, a `Channel` is stateful: each value
sent is consumed by exactly one receiver.
Channels model the classic _producer / consumer_ pattern and are the right tool when:
- two or more coroutines need to exchange individual values at their own pace;
- you want back-pressure (rendezvous) or explicit buffering control;
- you need a push-based, hot data source (opposite of the pull-based, cold [Flow]).
## Constructors
```
Channel() // rendezvous — sender and receiver must meet
Channel(n: Int) // buffered — sender may run n items ahead of the receiver
Channel(Channel.UNLIMITED) // no limit on buffered items
```
**Rendezvous** (`Channel()`, capacity 0): `send` suspends until a matching `receive` is ready,
and vice-versa. This gives the tightest synchronisation and the smallest memory footprint.
**Buffered** (`Channel(n)`): `send` only suspends when the internal buffer is full. Allows the
producer to get up to _n_ items ahead of the consumer.
**Unlimited** (`Channel(Channel.UNLIMITED)`): `send` never suspends. Useful when the producer is
bursty and you do not want it blocked, but be careful not to grow the buffer without bound.
## Sending and receiving
```lyng
val ch = Channel() // rendezvous channel
val producer = launch {
ch.send("hello") // suspends until the receiver is ready
ch.send("world")
ch.close() // signal: no more values
}
val a = ch.receive() // suspends until "hello" arrives
val b = ch.receive() // suspends until "world" arrives
val c = ch.receive() // channel is closed and drained → null
assertEquals("hello", a)
assertEquals("world", b)
assertEquals(null, c)
```
`receive()` returns `null` when the channel is both **closed** _and_ **fully drained** — that is
the idiomatic loop termination condition:
```lyng
val ch = Channel(4)
launch {
for (i in 1..5) ch.send(i)
ch.close()
}
var item = ch.receive()
while (item != null) {
println(item)
item = ch.receive()
}
```
## Non-suspending poll
`tryReceive()` never suspends. It returns the next buffered value, or `null` if the buffer is
empty or the channel is closed.
```lyng
val ch = Channel(8)
ch.send(42)
println(ch.tryReceive()) // 42
println(ch.tryReceive()) // null — nothing buffered right now
```
Use `tryReceive` for _polling_ patterns where blocking would be unacceptable, for example when
combining channel checks with other work inside a coroutine loop.
## Closing a channel
`close()` marks the channel so that no further `send` calls are accepted. Any items already in the
buffer can still be received. Once the buffer is drained, `receive()` returns `null` and
`isClosedForReceive` becomes `true`.
```lyng
val ch = Channel(2)
ch.send(1)
ch.send(2)
ch.close()
assert(ch.isClosedForSend)
assert(!ch.isClosedForReceive) // still has 2 buffered items
ch.receive() // 1
ch.receive() // 2
assert(ch.isClosedForReceive) // drained
```
Calling `send` after `close()` throws `IllegalStateException`.
## Properties
| property | type | description |
|---------------------|--------|----------------------------------------------------------|
| `isClosedForSend` | `Bool` | `true` after `close()` is called |
| `isClosedForReceive`| `Bool` | `true` when closed _and_ every buffered item is consumed |
## Methods
| method | suspends | description |
|-----------------|----------|----------------------------------------------------------------------------------|
| `send(value)` | yes | send a value; suspends when buffer full (rendezvous: always until partner ready) |
| `receive()` | yes | receive next value; suspends when empty; returns `null` when closed + drained |
| `tryReceive()` | no | return next buffered value or `null`; never suspends |
| `close()` | no | signal end of production; existing buffer items are still receivable |
## Static constants
| constant | value | description |
|---------------------|------------------|-------------------------------------|
| `Channel.UNLIMITED` | `Int.MAX_VALUE` | capacity for an unlimited-buffer channel |
## Common patterns
### Producer / consumer
```lyng
val ch = Channel()
val results = []
val mu = Mutex()
val consumer = launch {
var item = ch.receive()
while (item != null) {
mu.withLock { results += item }
item = ch.receive()
}
}
launch {
for (i in 1..5) ch.send("msg:$i")
ch.close()
}.await()
consumer.await()
println(results)
```
### Fan-out: one channel, many consumers
```lyng
val ch = Channel(16)
// multiple consumers
val workers = (1..4).map { id ->
launch {
var task = ch.receive()
while (task != null) {
println("worker $id handles $task")
task = ch.receive()
}
}
}
// single producer
for (i in 1..20) ch.send(i)
ch.close()
workers.forEach { it.await() }
```
### Ping-pong between two coroutines
```lyng
val ping = Channel()
val pong = Channel()
launch {
repeat(3) {
val msg = ping.receive()
println("got: $msg → sending pong")
pong.send("pong")
}
}
repeat(3) {
ping.send("ping")
println(pong.receive())
}
```
## Channel vs Flow
| | [Flow] | Channel |
|---|---|---|
| **temperature** | cold (lazy) | hot (eager) |
| **replay** | every collector gets a fresh run | each item is consumed once |
| **consumers** | any number; each gets all items | one receiver per item |
| **back-pressure** | built-in via rendezvous | configurable (rendezvous / buffered / unlimited) |
| **typical use** | transform pipelines, sequences | producer–consumer, fan-out |
## See also
- [parallelism] — `launch`, `Deferred`, `Mutex`, `Flow`, and the full concurrency picture
- [Flow] — cold async sequences
[Flow]: parallelism.md#flow
[parallelism]: parallelism.md

View File

@ -219,6 +219,58 @@ Flows allow easy transforming of any [Iterable]. See how the standard Lyng libra
} }
} }
## Channel
A [Channel] is a **hot pipe** between coroutines: values are pushed in by a producer and pulled out by a consumer, with each value consumed exactly once.
Unlike a `Flow` (which is cold and re-runs its generator on every collection), a `Channel` is stateful — the right tool for classic _producer / consumer_ work.
val ch = Channel() // rendezvous: sender waits for receiver
val producer = launch {
for (i in 1..5) ch.send(i)
ch.close() // signal: no more values
}
var item = ch.receive() // suspends until a value is ready
while (item != null) {
println(item)
item = ch.receive()
}
>>> 1
>>> 2
>>> 3
>>> 4
>>> 5
>>> void
`receive()` returns `null` when the channel is both closed and fully drained — that is the idiomatic loop termination condition.
Channels can also be buffered so the producer can run ahead:
val ch = Channel(4) // buffer up to 4 items without blocking
ch.send(10)
ch.send(20)
ch.send(30)
ch.close()
assertEquals(10, ch.receive())
assertEquals(20, ch.receive())
assertEquals(30, ch.receive())
assertEquals(null, ch.receive()) // drained
>>> void
For the full API — including `tryReceive`, `Channel.UNLIMITED`, and the fan-out / ping-pong patterns — see the [Channel] reference page.
| | Flow | Channel |
|---|---|---|
| **temperature** | cold (lazy) | hot (eager) |
| **replay** | every collector gets a fresh run | each item consumed once |
| **consumers** | any number, each gets all items | one receiver per item |
| **typical use** | transform pipelines, sequences | producer–consumer, fan-out |
[Channel]: Channel.md
[Iterable]: Iterable.md [Iterable]: Iterable.md

View File

@ -634,6 +634,7 @@ class Script(
addConst("Deferred", ObjDeferred.type) addConst("Deferred", ObjDeferred.type)
addConst("CompletableDeferred", ObjCompletableDeferred.type) addConst("CompletableDeferred", ObjCompletableDeferred.type)
addConst("Mutex", ObjMutex.type) addConst("Mutex", ObjMutex.type)
addConst("Channel", ObjChannel.type)
addConst("Flow", ObjFlow.type) addConst("Flow", ObjFlow.type)
addConst("FlowBuilder", ObjFlowBuilder.type) addConst("FlowBuilder", ObjFlowBuilder.type)
addConst("Delegate", ObjDynamic.type) addConst("Delegate", ObjDynamic.type)

View File

@ -0,0 +1,133 @@
/*
* Copyright 2026 Sergey S. Chernov real.sergeych@gmail.com
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package net.sergeych.lyng.obj
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.ClosedReceiveChannelException
import kotlinx.coroutines.channels.ClosedSendChannelException
import net.sergeych.lyng.Scope
import net.sergeych.lyng.miniast.ParamDoc
import net.sergeych.lyng.miniast.addFnDoc
import net.sergeych.lyng.miniast.addPropertyDoc
import net.sergeych.lyng.miniast.type
/**
* Lyng-visible wrapper around Kotlin's [Channel].
*
* Construction:
* - `Channel()` rendezvous channel (capacity 0, sender and receiver synchronise)
* - `Channel(n)` buffered channel with capacity *n* (n > 0)
* - `Channel(Channel.UNLIMITED)` unlimited-buffer channel
*
* Methods:
* - `send(value)` suspend until the value is accepted by a receiver (or buffered)
* - `receive()` suspend until a value is available; returns `null` when channel is closed and drained
* - `tryReceive()` return the next value immediately, or `null` if none is available
* - `close()` signal that no more values will be sent; pending receivers can still drain buffered items
*
* Properties:
* - `isClosedForSend: Bool`
* - `isClosedForReceive: Bool`
*/
class ObjChannel(val channel: Channel<Obj>) : Obj() {
override val objClass get() = type
companion object {
val type = object : ObjClass("Channel") {
override suspend fun callOn(scope: Scope): Obj {
val capacity = scope.args.list.getOrNull(0)
?.let { (it as? ObjInt)?.value?.toInt() ?: scope.raiseIllegalArgument("Channel capacity must be an integer") }
?: Channel.RENDEZVOUS
return ObjChannel(Channel(capacity))
}
}.apply {
// Expose Channel.UNLIMITED as a static constant on the Channel class so scripts can write
// Channel(Channel.UNLIMITED).
addConst("UNLIMITED", ObjInt(Channel.UNLIMITED.toLong()))
addFnDoc(
name = "send",
doc = "Suspend until the value is accepted by a receiver (or placed into the buffer). " +
"Throws if the channel is already closed.",
params = listOf(ParamDoc("value", type("lyng.Any"))),
returns = type("lyng.Void"),
moduleName = "lyng.stdlib"
) {
val value = requiredArg<Obj>(0)
try {
thisAs<ObjChannel>().channel.send(value)
} catch (e: ClosedSendChannelException) {
raiseIllegalState("Channel is closed for send")
}
ObjVoid
}
addFnDoc(
name = "receive",
doc = "Suspend until a value is available and return it, or return `null` when the channel " +
"is closed and all buffered items have been consumed.",
returns = type("lyng.Any"),
moduleName = "lyng.stdlib"
) {
try {
thisAs<ObjChannel>().channel.receive()
} catch (_: ClosedReceiveChannelException) {
ObjNull
}
}
addFnDoc(
name = "tryReceive",
doc = "Return the next buffered value immediately without suspending, or `null` if the " +
"channel is empty or closed.",
returns = type("lyng.Any"),
moduleName = "lyng.stdlib"
) {
val result = thisAs<ObjChannel>().channel.tryReceive()
result.getOrNull() ?: ObjNull
}
addFnDoc(
name = "close",
doc = "Signal that no more values will be sent. Receivers can still drain any buffered items.",
returns = type("lyng.Void"),
moduleName = "lyng.stdlib"
) {
thisAs<ObjChannel>().channel.close()
ObjVoid
}
addPropertyDoc(
name = "isClosedForSend",
doc = "Whether this channel is closed for sending (no more `send` calls are permitted).",
type = type("lyng.Bool"),
moduleName = "lyng.stdlib",
getter = { thisAs<ObjChannel>().channel.isClosedForSend.toObj() }
)
addPropertyDoc(
name = "isClosedForReceive",
doc = "Whether this channel is closed for receiving (closed and fully drained).",
type = type("lyng.Bool"),
moduleName = "lyng.stdlib",
getter = { thisAs<ObjChannel>().channel.isClosedForReceive.toObj() }
)
}
}
}

View File

@ -0,0 +1,131 @@
/*
* Copyright 2026 Sergey S. Chernov real.sergeych@gmail.com
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
import kotlinx.coroutines.test.runTest
import net.sergeych.lyng.eval
import kotlin.test.Test
class ChannelTest {
@Test
fun testRendezvousChannel() = runTest {
eval("""
val ch = Channel()
val producer = launch {
ch.send(1)
ch.send(2)
ch.send(3)
ch.close()
}
val results = []
var v = ch.receive()
while (v != null) {
results += v
v = ch.receive()
}
assertEquals([1, 2, 3], results)
""".trimIndent())
}
@Test
fun testBufferedChannel() = runTest {
eval("""
val ch = Channel(4)
ch.send(10)
ch.send(20)
ch.send(30)
ch.close()
assertEquals(10, ch.receive())
assertEquals(20, ch.receive())
assertEquals(30, ch.receive())
// closed and drained
assertEquals(null, ch.receive())
""".trimIndent())
}
@Test
fun testTryReceive() = runTest {
eval("""
val ch = Channel(2)
// nothing buffered yet
assertEquals(null, ch.tryReceive())
ch.send(42)
assertEquals(42, ch.tryReceive())
assertEquals(null, ch.tryReceive())
""".trimIndent())
}
@Test
fun testChannelProperties() = runTest {
eval("""
val ch = Channel(1)
assert(!ch.isClosedForSend)
assert(!ch.isClosedForReceive)
ch.send(1)
ch.close()
assert(ch.isClosedForSend)
// still has one buffered item, so not yet closed for receive
assert(!ch.isClosedForReceive)
ch.receive()
// now fully drained
assert(ch.isClosedForReceive)
""".trimIndent())
}
@Test
fun testUnlimitedChannel() = runTest {
eval("""
val ch = Channel(Channel.UNLIMITED)
(1..100).forEach { ch.send(it) }
ch.close()
var sum = 0
var v = ch.receive()
while (v != null) {
sum += v
v = ch.receive()
}
assertEquals(5050, sum)
""".trimIndent())
}
@Test
fun testChannelProducerConsumerPattern() = runTest {
eval("""
val ch = Channel()
val results = []
val mu = Mutex()
val consumer = launch {
var item = ch.receive()
while (item != null) {
mu.withLock { results += item }
item = ch.receive()
}
}
launch {
for (i in 1..5) {
ch.send("msg:${'$'}i")
}
ch.close()
}.await()
consumer.await()
assertEquals(["msg:1","msg:2","msg:3","msg:4","msg:5"], results)
""".trimIndent())
}
}

View File

@ -236,7 +236,7 @@ suspend fun runDocTests(fileName: String, bookMode: Boolean = false) {
val bookScope = Scope() val bookScope = Scope()
var count = 0 var count = 0
parseDocTests(fileName, bookMode).collect { dt -> parseDocTests(fileName, bookMode).collect { dt ->
if (bookMode)imp dt.test(bookScope) if (bookMode) dt.test(bookScope)
else dt.test() else dt.test()
count++ count++
} }