diff --git a/docs/Channel.md b/docs/Channel.md new file mode 100644 index 0000000..9d6cbe7 --- /dev/null +++ b/docs/Channel.md @@ -0,0 +1,211 @@ +# Channel + +A `Channel` is a **hot, bidirectional pipe** for passing values between concurrently running coroutines. +Unlike a [Flow], which is cold and replayed on every collection, a `Channel` is stateful: each value +sent is consumed by exactly one receiver. + +Channels model the classic _producer / consumer_ pattern and are the right tool when: + +- two or more coroutines need to exchange individual values at their own pace; +- you want back-pressure (rendezvous) or explicit buffering control; +- you need a push-based, hot data source (opposite of the pull-based, cold [Flow]). + +## Constructors + +``` +Channel() // rendezvous — sender and receiver must meet +Channel(n: Int) // buffered — sender may run n items ahead of the receiver +Channel(Channel.UNLIMITED) // no limit on buffered items +``` + +**Rendezvous** (`Channel()`, capacity 0): `send` suspends until a matching `receive` is ready, +and vice-versa. This gives the tightest synchronisation and the smallest memory footprint. + +**Buffered** (`Channel(n)`): `send` only suspends when the internal buffer is full. Allows the +producer to get up to _n_ items ahead of the consumer. + +**Unlimited** (`Channel(Channel.UNLIMITED)`): `send` never suspends. Useful when the producer is +bursty and you do not want it blocked, but be careful not to grow the buffer without bound. + +## Sending and receiving + +```lyng +val ch = Channel() // rendezvous channel + +val producer = launch { + ch.send("hello") // suspends until the receiver is ready + ch.send("world") + ch.close() // signal: no more values +} + +val a = ch.receive() // suspends until "hello" arrives +val b = ch.receive() // suspends until "world" arrives +val c = ch.receive() // channel is closed and drained → null +assertEquals("hello", a) +assertEquals("world", b) +assertEquals(null, c) +``` + +`receive()` returns `null` when the channel is both **closed** _and_ **fully drained** — that is +the idiomatic loop termination condition: + +```lyng +val ch = Channel(4) + +launch { + for (i in 1..5) ch.send(i) + ch.close() +} + +var item = ch.receive() +while (item != null) { + println(item) + item = ch.receive() +} +``` + +## Non-suspending poll + +`tryReceive()` never suspends. It returns the next buffered value, or `null` if the buffer is +empty or the channel is closed. + +```lyng +val ch = Channel(8) +ch.send(42) +println(ch.tryReceive()) // 42 +println(ch.tryReceive()) // null — nothing buffered right now +``` + +Use `tryReceive` for _polling_ patterns where blocking would be unacceptable, for example when +combining channel checks with other work inside a coroutine loop. + +## Closing a channel + +`close()` marks the channel so that no further `send` calls are accepted. Any items already in the +buffer can still be received. Once the buffer is drained, `receive()` returns `null` and +`isClosedForReceive` becomes `true`. + +```lyng +val ch = Channel(2) +ch.send(1) +ch.send(2) +ch.close() + +assert(ch.isClosedForSend) +assert(!ch.isClosedForReceive) // still has 2 buffered items + +ch.receive() // 1 +ch.receive() // 2 +assert(ch.isClosedForReceive) // drained +``` + +Calling `send` after `close()` throws `IllegalStateException`. + +## Properties + +| property | type | description | +|---------------------|--------|----------------------------------------------------------| +| `isClosedForSend` | `Bool` | `true` after `close()` is called | +| `isClosedForReceive`| `Bool` | `true` when closed _and_ every buffered item is consumed | + +## Methods + +| method | suspends | description | +|-----------------|----------|----------------------------------------------------------------------------------| +| `send(value)` | yes | send a value; suspends when buffer full (rendezvous: always until partner ready) | +| `receive()` | yes | receive next value; suspends when empty; returns `null` when closed + drained | +| `tryReceive()` | no | return next buffered value or `null`; never suspends | +| `close()` | no | signal end of production; existing buffer items are still receivable | + +## Static constants + +| constant | value | description | +|---------------------|------------------|-------------------------------------| +| `Channel.UNLIMITED` | `Int.MAX_VALUE` | capacity for an unlimited-buffer channel | + +## Common patterns + +### Producer / consumer + +```lyng +val ch = Channel() +val results = [] +val mu = Mutex() + +val consumer = launch { + var item = ch.receive() + while (item != null) { + mu.withLock { results += item } + item = ch.receive() + } +} + +launch { + for (i in 1..5) ch.send("msg:$i") + ch.close() +}.await() + +consumer.await() +println(results) +``` + +### Fan-out: one channel, many consumers + +```lyng +val ch = Channel(16) + +// multiple consumers +val workers = (1..4).map { id -> + launch { + var task = ch.receive() + while (task != null) { + println("worker $id handles $task") + task = ch.receive() + } + } +} + +// single producer +for (i in 1..20) ch.send(i) +ch.close() + +workers.forEach { it.await() } +``` + +### Ping-pong between two coroutines + +```lyng +val ping = Channel() +val pong = Channel() + +launch { + repeat(3) { + val msg = ping.receive() + println("got: $msg → sending pong") + pong.send("pong") + } +} + +repeat(3) { + ping.send("ping") + println(pong.receive()) +} +``` + +## Channel vs Flow + +| | [Flow] | Channel | +|---|---|---| +| **temperature** | cold (lazy) | hot (eager) | +| **replay** | every collector gets a fresh run | each item is consumed once | +| **consumers** | any number; each gets all items | one receiver per item | +| **back-pressure** | built-in via rendezvous | configurable (rendezvous / buffered / unlimited) | +| **typical use** | transform pipelines, sequences | producer–consumer, fan-out | + +## See also + +- [parallelism] — `launch`, `Deferred`, `Mutex`, `Flow`, and the full concurrency picture +- [Flow] — cold async sequences + +[Flow]: parallelism.md#flow +[parallelism]: parallelism.md diff --git a/docs/parallelism.md b/docs/parallelism.md index 08f8cb3..57cfa2c 100644 --- a/docs/parallelism.md +++ b/docs/parallelism.md @@ -219,6 +219,58 @@ Flows allow easy transforming of any [Iterable]. See how the standard Lyng libra } } +## Channel + +A [Channel] is a **hot pipe** between coroutines: values are pushed in by a producer and pulled out by a consumer, with each value consumed exactly once. + +Unlike a `Flow` (which is cold and re-runs its generator on every collection), a `Channel` is stateful — the right tool for classic _producer / consumer_ work. + + val ch = Channel() // rendezvous: sender waits for receiver + + val producer = launch { + for (i in 1..5) ch.send(i) + ch.close() // signal: no more values + } + + var item = ch.receive() // suspends until a value is ready + while (item != null) { + println(item) + item = ch.receive() + } + >>> 1 + >>> 2 + >>> 3 + >>> 4 + >>> 5 + >>> void + +`receive()` returns `null` when the channel is both closed and fully drained — that is the idiomatic loop termination condition. + +Channels can also be buffered so the producer can run ahead: + + val ch = Channel(4) // buffer up to 4 items without blocking + + ch.send(10) + ch.send(20) + ch.send(30) + ch.close() + + assertEquals(10, ch.receive()) + assertEquals(20, ch.receive()) + assertEquals(30, ch.receive()) + assertEquals(null, ch.receive()) // drained + >>> void + +For the full API — including `tryReceive`, `Channel.UNLIMITED`, and the fan-out / ping-pong patterns — see the [Channel] reference page. + +| | Flow | Channel | +|---|---|---| +| **temperature** | cold (lazy) | hot (eager) | +| **replay** | every collector gets a fresh run | each item consumed once | +| **consumers** | any number, each gets all items | one receiver per item | +| **typical use** | transform pipelines, sequences | producer–consumer, fan-out | + +[Channel]: Channel.md [Iterable]: Iterable.md diff --git a/lynglib/src/commonMain/kotlin/net/sergeych/lyng/Script.kt b/lynglib/src/commonMain/kotlin/net/sergeych/lyng/Script.kt index 8f6ffa2..279d0b8 100644 --- a/lynglib/src/commonMain/kotlin/net/sergeych/lyng/Script.kt +++ b/lynglib/src/commonMain/kotlin/net/sergeych/lyng/Script.kt @@ -634,6 +634,7 @@ class Script( addConst("Deferred", ObjDeferred.type) addConst("CompletableDeferred", ObjCompletableDeferred.type) addConst("Mutex", ObjMutex.type) + addConst("Channel", ObjChannel.type) addConst("Flow", ObjFlow.type) addConst("FlowBuilder", ObjFlowBuilder.type) addConst("Delegate", ObjDynamic.type) diff --git a/lynglib/src/commonMain/kotlin/net/sergeych/lyng/obj/ObjChannel.kt b/lynglib/src/commonMain/kotlin/net/sergeych/lyng/obj/ObjChannel.kt new file mode 100644 index 0000000..be68542 --- /dev/null +++ b/lynglib/src/commonMain/kotlin/net/sergeych/lyng/obj/ObjChannel.kt @@ -0,0 +1,133 @@ +/* + * Copyright 2026 Sergey S. Chernov real.sergeych@gmail.com + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package net.sergeych.lyng.obj + +import kotlinx.coroutines.channels.Channel +import kotlinx.coroutines.channels.ClosedReceiveChannelException +import kotlinx.coroutines.channels.ClosedSendChannelException +import net.sergeych.lyng.Scope +import net.sergeych.lyng.miniast.ParamDoc +import net.sergeych.lyng.miniast.addFnDoc +import net.sergeych.lyng.miniast.addPropertyDoc +import net.sergeych.lyng.miniast.type + +/** + * Lyng-visible wrapper around Kotlin's [Channel]. + * + * Construction: + * - `Channel()` – rendezvous channel (capacity 0, sender and receiver synchronise) + * - `Channel(n)` – buffered channel with capacity *n* (n > 0) + * - `Channel(Channel.UNLIMITED)` – unlimited-buffer channel + * + * Methods: + * - `send(value)` – suspend until the value is accepted by a receiver (or buffered) + * - `receive()` – suspend until a value is available; returns `null` when channel is closed and drained + * - `tryReceive()` – return the next value immediately, or `null` if none is available + * - `close()` – signal that no more values will be sent; pending receivers can still drain buffered items + * + * Properties: + * - `isClosedForSend: Bool` + * - `isClosedForReceive: Bool` + */ +class ObjChannel(val channel: Channel) : Obj() { + + override val objClass get() = type + + companion object { + val type = object : ObjClass("Channel") { + override suspend fun callOn(scope: Scope): Obj { + val capacity = scope.args.list.getOrNull(0) + ?.let { (it as? ObjInt)?.value?.toInt() ?: scope.raiseIllegalArgument("Channel capacity must be an integer") } + ?: Channel.RENDEZVOUS + return ObjChannel(Channel(capacity)) + } + }.apply { + // Expose Channel.UNLIMITED as a static constant on the Channel class so scripts can write + // Channel(Channel.UNLIMITED). + addConst("UNLIMITED", ObjInt(Channel.UNLIMITED.toLong())) + + addFnDoc( + name = "send", + doc = "Suspend until the value is accepted by a receiver (or placed into the buffer). " + + "Throws if the channel is already closed.", + params = listOf(ParamDoc("value", type("lyng.Any"))), + returns = type("lyng.Void"), + moduleName = "lyng.stdlib" + ) { + val value = requiredArg(0) + try { + thisAs().channel.send(value) + } catch (e: ClosedSendChannelException) { + raiseIllegalState("Channel is closed for send") + } + ObjVoid + } + + addFnDoc( + name = "receive", + doc = "Suspend until a value is available and return it, or return `null` when the channel " + + "is closed and all buffered items have been consumed.", + returns = type("lyng.Any"), + moduleName = "lyng.stdlib" + ) { + try { + thisAs().channel.receive() + } catch (_: ClosedReceiveChannelException) { + ObjNull + } + } + + addFnDoc( + name = "tryReceive", + doc = "Return the next buffered value immediately without suspending, or `null` if the " + + "channel is empty or closed.", + returns = type("lyng.Any"), + moduleName = "lyng.stdlib" + ) { + val result = thisAs().channel.tryReceive() + result.getOrNull() ?: ObjNull + } + + addFnDoc( + name = "close", + doc = "Signal that no more values will be sent. Receivers can still drain any buffered items.", + returns = type("lyng.Void"), + moduleName = "lyng.stdlib" + ) { + thisAs().channel.close() + ObjVoid + } + + addPropertyDoc( + name = "isClosedForSend", + doc = "Whether this channel is closed for sending (no more `send` calls are permitted).", + type = type("lyng.Bool"), + moduleName = "lyng.stdlib", + getter = { thisAs().channel.isClosedForSend.toObj() } + ) + + addPropertyDoc( + name = "isClosedForReceive", + doc = "Whether this channel is closed for receiving (closed and fully drained).", + type = type("lyng.Bool"), + moduleName = "lyng.stdlib", + getter = { thisAs().channel.isClosedForReceive.toObj() } + ) + } + } +} diff --git a/lynglib/src/commonTest/kotlin/ChannelTest.kt b/lynglib/src/commonTest/kotlin/ChannelTest.kt new file mode 100644 index 0000000..c4d73ed --- /dev/null +++ b/lynglib/src/commonTest/kotlin/ChannelTest.kt @@ -0,0 +1,131 @@ +/* + * Copyright 2026 Sergey S. Chernov real.sergeych@gmail.com + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +import kotlinx.coroutines.test.runTest +import net.sergeych.lyng.eval +import kotlin.test.Test + +class ChannelTest { + + @Test + fun testRendezvousChannel() = runTest { + eval(""" + val ch = Channel() + val producer = launch { + ch.send(1) + ch.send(2) + ch.send(3) + ch.close() + } + val results = [] + var v = ch.receive() + while (v != null) { + results += v + v = ch.receive() + } + assertEquals([1, 2, 3], results) + """.trimIndent()) + } + + @Test + fun testBufferedChannel() = runTest { + eval(""" + val ch = Channel(4) + ch.send(10) + ch.send(20) + ch.send(30) + ch.close() + assertEquals(10, ch.receive()) + assertEquals(20, ch.receive()) + assertEquals(30, ch.receive()) + // closed and drained + assertEquals(null, ch.receive()) + """.trimIndent()) + } + + @Test + fun testTryReceive() = runTest { + eval(""" + val ch = Channel(2) + // nothing buffered yet + assertEquals(null, ch.tryReceive()) + ch.send(42) + assertEquals(42, ch.tryReceive()) + assertEquals(null, ch.tryReceive()) + """.trimIndent()) + } + + @Test + fun testChannelProperties() = runTest { + eval(""" + val ch = Channel(1) + assert(!ch.isClosedForSend) + assert(!ch.isClosedForReceive) + ch.send(1) + ch.close() + assert(ch.isClosedForSend) + // still has one buffered item, so not yet closed for receive + assert(!ch.isClosedForReceive) + ch.receive() + // now fully drained + assert(ch.isClosedForReceive) + """.trimIndent()) + } + + @Test + fun testUnlimitedChannel() = runTest { + eval(""" + val ch = Channel(Channel.UNLIMITED) + (1..100).forEach { ch.send(it) } + ch.close() + var sum = 0 + var v = ch.receive() + while (v != null) { + sum += v + v = ch.receive() + } + assertEquals(5050, sum) + """.trimIndent()) + } + + @Test + fun testChannelProducerConsumerPattern() = runTest { + eval(""" + val ch = Channel() + val results = [] + val mu = Mutex() + + val consumer = launch { + var item = ch.receive() + while (item != null) { + mu.withLock { results += item } + item = ch.receive() + } + } + + launch { + for (i in 1..5) { + ch.send("msg:${'$'}i") + } + ch.close() + }.await() + + consumer.await() + assertEquals(["msg:1","msg:2","msg:3","msg:4","msg:5"], results) + """.trimIndent()) + } +} diff --git a/lynglib/src/jvmTest/kotlin/BookTest.kt b/lynglib/src/jvmTest/kotlin/BookTest.kt index c4d48e5..ddacdac 100644 --- a/lynglib/src/jvmTest/kotlin/BookTest.kt +++ b/lynglib/src/jvmTest/kotlin/BookTest.kt @@ -236,7 +236,7 @@ suspend fun runDocTests(fileName: String, bookMode: Boolean = false) { val bookScope = Scope() var count = 0 parseDocTests(fileName, bookMode).collect { dt -> - if (bookMode)imp dt.test(bookScope) + if (bookMode) dt.test(bookScope) else dt.test() count++ }