Compare commits
No commits in common. "145c3ae34accdfc47f87842defdd4f2fc24776b5" and "30e56946a011081fa38aa1c20f1c78e8a5c58e8a" have entirely different histories.
145c3ae34a
...
30e56946a0
211
docs/Channel.md
211
docs/Channel.md
@ -1,211 +0,0 @@
|
|||||||
# Channel
|
|
||||||
|
|
||||||
A `Channel` is a **hot, bidirectional pipe** for passing values between concurrently running coroutines.
|
|
||||||
Unlike a [Flow], which is cold and replayed on every collection, a `Channel` is stateful: each value
|
|
||||||
sent is consumed by exactly one receiver.
|
|
||||||
|
|
||||||
Channels model the classic _producer / consumer_ pattern and are the right tool when:
|
|
||||||
|
|
||||||
- two or more coroutines need to exchange individual values at their own pace;
|
|
||||||
- you want back-pressure (rendezvous) or explicit buffering control;
|
|
||||||
- you need a push-based, hot data source (opposite of the pull-based, cold [Flow]).
|
|
||||||
|
|
||||||
## Constructors
|
|
||||||
|
|
||||||
```
|
|
||||||
Channel() // rendezvous — sender and receiver must meet
|
|
||||||
Channel(n: Int) // buffered — sender may run n items ahead of the receiver
|
|
||||||
Channel(Channel.UNLIMITED) // no limit on buffered items
|
|
||||||
```
|
|
||||||
|
|
||||||
**Rendezvous** (`Channel()`, capacity 0): `send` suspends until a matching `receive` is ready,
|
|
||||||
and vice-versa. This gives the tightest synchronisation and the smallest memory footprint.
|
|
||||||
|
|
||||||
**Buffered** (`Channel(n)`): `send` only suspends when the internal buffer is full. Allows the
|
|
||||||
producer to get up to _n_ items ahead of the consumer.
|
|
||||||
|
|
||||||
**Unlimited** (`Channel(Channel.UNLIMITED)`): `send` never suspends. Useful when the producer is
|
|
||||||
bursty and you do not want it blocked, but be careful not to grow the buffer without bound.
|
|
||||||
|
|
||||||
## Sending and receiving
|
|
||||||
|
|
||||||
```lyng
|
|
||||||
val ch = Channel() // rendezvous channel
|
|
||||||
|
|
||||||
val producer = launch {
|
|
||||||
ch.send("hello") // suspends until the receiver is ready
|
|
||||||
ch.send("world")
|
|
||||||
ch.close() // signal: no more values
|
|
||||||
}
|
|
||||||
|
|
||||||
val a = ch.receive() // suspends until "hello" arrives
|
|
||||||
val b = ch.receive() // suspends until "world" arrives
|
|
||||||
val c = ch.receive() // channel is closed and drained → null
|
|
||||||
assertEquals("hello", a)
|
|
||||||
assertEquals("world", b)
|
|
||||||
assertEquals(null, c)
|
|
||||||
```
|
|
||||||
|
|
||||||
`receive()` returns `null` when the channel is both **closed** _and_ **fully drained** — that is
|
|
||||||
the idiomatic loop termination condition:
|
|
||||||
|
|
||||||
```lyng
|
|
||||||
val ch = Channel(4)
|
|
||||||
|
|
||||||
launch {
|
|
||||||
for (i in 1..5) ch.send(i)
|
|
||||||
ch.close()
|
|
||||||
}
|
|
||||||
|
|
||||||
var item = ch.receive()
|
|
||||||
while (item != null) {
|
|
||||||
println(item)
|
|
||||||
item = ch.receive()
|
|
||||||
}
|
|
||||||
```
|
|
||||||
|
|
||||||
## Non-suspending poll
|
|
||||||
|
|
||||||
`tryReceive()` never suspends. It returns the next buffered value, or `null` if the buffer is
|
|
||||||
empty or the channel is closed.
|
|
||||||
|
|
||||||
```lyng
|
|
||||||
val ch = Channel(8)
|
|
||||||
ch.send(42)
|
|
||||||
println(ch.tryReceive()) // 42
|
|
||||||
println(ch.tryReceive()) // null — nothing buffered right now
|
|
||||||
```
|
|
||||||
|
|
||||||
Use `tryReceive` for _polling_ patterns where blocking would be unacceptable, for example when
|
|
||||||
combining channel checks with other work inside a coroutine loop.
|
|
||||||
|
|
||||||
## Closing a channel
|
|
||||||
|
|
||||||
`close()` marks the channel so that no further `send` calls are accepted. Any items already in the
|
|
||||||
buffer can still be received. Once the buffer is drained, `receive()` returns `null` and
|
|
||||||
`isClosedForReceive` becomes `true`.
|
|
||||||
|
|
||||||
```lyng
|
|
||||||
val ch = Channel(2)
|
|
||||||
ch.send(1)
|
|
||||||
ch.send(2)
|
|
||||||
ch.close()
|
|
||||||
|
|
||||||
assert(ch.isClosedForSend)
|
|
||||||
assert(!ch.isClosedForReceive) // still has 2 buffered items
|
|
||||||
|
|
||||||
ch.receive() // 1
|
|
||||||
ch.receive() // 2
|
|
||||||
assert(ch.isClosedForReceive) // drained
|
|
||||||
```
|
|
||||||
|
|
||||||
Calling `send` after `close()` throws `IllegalStateException`.
|
|
||||||
|
|
||||||
## Properties
|
|
||||||
|
|
||||||
| property | type | description |
|
|
||||||
|---------------------|--------|----------------------------------------------------------|
|
|
||||||
| `isClosedForSend` | `Bool` | `true` after `close()` is called |
|
|
||||||
| `isClosedForReceive`| `Bool` | `true` when closed _and_ every buffered item is consumed |
|
|
||||||
|
|
||||||
## Methods
|
|
||||||
|
|
||||||
| method | suspends | description |
|
|
||||||
|-----------------|----------|----------------------------------------------------------------------------------|
|
|
||||||
| `send(value)` | yes | send a value; suspends when buffer full (rendezvous: always until partner ready) |
|
|
||||||
| `receive()` | yes | receive next value; suspends when empty; returns `null` when closed + drained |
|
|
||||||
| `tryReceive()` | no | return next buffered value or `null`; never suspends |
|
|
||||||
| `close()` | no | signal end of production; existing buffer items are still receivable |
|
|
||||||
|
|
||||||
## Static constants
|
|
||||||
|
|
||||||
| constant | value | description |
|
|
||||||
|---------------------|------------------|-------------------------------------|
|
|
||||||
| `Channel.UNLIMITED` | `Int.MAX_VALUE` | capacity for an unlimited-buffer channel |
|
|
||||||
|
|
||||||
## Common patterns
|
|
||||||
|
|
||||||
### Producer / consumer
|
|
||||||
|
|
||||||
```lyng
|
|
||||||
val ch = Channel()
|
|
||||||
val results = []
|
|
||||||
val mu = Mutex()
|
|
||||||
|
|
||||||
val consumer = launch {
|
|
||||||
var item = ch.receive()
|
|
||||||
while (item != null) {
|
|
||||||
mu.withLock { results += item }
|
|
||||||
item = ch.receive()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
launch {
|
|
||||||
for (i in 1..5) ch.send("msg:$i")
|
|
||||||
ch.close()
|
|
||||||
}.await()
|
|
||||||
|
|
||||||
consumer.await()
|
|
||||||
println(results)
|
|
||||||
```
|
|
||||||
|
|
||||||
### Fan-out: one channel, many consumers
|
|
||||||
|
|
||||||
```lyng
|
|
||||||
val ch = Channel(16)
|
|
||||||
|
|
||||||
// multiple consumers
|
|
||||||
val workers = (1..4).map { id ->
|
|
||||||
launch {
|
|
||||||
var task = ch.receive()
|
|
||||||
while (task != null) {
|
|
||||||
println("worker $id handles $task")
|
|
||||||
task = ch.receive()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// single producer
|
|
||||||
for (i in 1..20) ch.send(i)
|
|
||||||
ch.close()
|
|
||||||
|
|
||||||
workers.forEach { it.await() }
|
|
||||||
```
|
|
||||||
|
|
||||||
### Ping-pong between two coroutines
|
|
||||||
|
|
||||||
```lyng
|
|
||||||
val ping = Channel()
|
|
||||||
val pong = Channel()
|
|
||||||
|
|
||||||
launch {
|
|
||||||
repeat(3) {
|
|
||||||
val msg = ping.receive()
|
|
||||||
println("got: $msg → sending pong")
|
|
||||||
pong.send("pong")
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
repeat(3) {
|
|
||||||
ping.send("ping")
|
|
||||||
println(pong.receive())
|
|
||||||
}
|
|
||||||
```
|
|
||||||
|
|
||||||
## Channel vs Flow
|
|
||||||
|
|
||||||
| | [Flow] | Channel |
|
|
||||||
|---|---|---|
|
|
||||||
| **temperature** | cold (lazy) | hot (eager) |
|
|
||||||
| **replay** | every collector gets a fresh run | each item is consumed once |
|
|
||||||
| **consumers** | any number; each gets all items | one receiver per item |
|
|
||||||
| **back-pressure** | built-in via rendezvous | configurable (rendezvous / buffered / unlimited) |
|
|
||||||
| **typical use** | transform pipelines, sequences | producer–consumer, fan-out |
|
|
||||||
|
|
||||||
## See also
|
|
||||||
|
|
||||||
- [parallelism] — `launch`, `Deferred`, `Mutex`, `Flow`, and the full concurrency picture
|
|
||||||
- [Flow] — cold async sequences
|
|
||||||
|
|
||||||
[Flow]: parallelism.md#flow
|
|
||||||
[parallelism]: parallelism.md
|
|
||||||
@ -219,52 +219,6 @@ Flows allow easy transforming of any [Iterable]. See how the standard Lyng libra
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
## Channel
|
|
||||||
|
|
||||||
A [Channel] is a **hot pipe** between coroutines: values are pushed in by a producer and pulled out by a consumer, with each value consumed exactly once.
|
|
||||||
|
|
||||||
Unlike a `Flow` (which is cold and re-runs its generator on every collection), a `Channel` is stateful — the right tool for classic _producer / consumer_ work.
|
|
||||||
|
|
||||||
val ch = Channel() // rendezvous: sender waits for receiver
|
|
||||||
|
|
||||||
val producer = launch {
|
|
||||||
for (i in 1..5) ch.send(i)
|
|
||||||
ch.close() // signal: no more values
|
|
||||||
}
|
|
||||||
|
|
||||||
var item = ch.receive() // suspends until a value is ready
|
|
||||||
while (item != null) {
|
|
||||||
println(item)
|
|
||||||
item = ch.receive()
|
|
||||||
}
|
|
||||||
// prints 1 2 3 4 5
|
|
||||||
|
|
||||||
`receive()` returns `null` when the channel is both closed and fully drained — that is the idiomatic loop termination condition.
|
|
||||||
|
|
||||||
Channels can also be buffered so the producer can run ahead:
|
|
||||||
|
|
||||||
val ch = Channel(4) // buffer up to 4 items without blocking
|
|
||||||
|
|
||||||
ch.send(10)
|
|
||||||
ch.send(20)
|
|
||||||
ch.send(30)
|
|
||||||
ch.close()
|
|
||||||
|
|
||||||
assertEquals(10, ch.receive())
|
|
||||||
assertEquals(20, ch.receive())
|
|
||||||
assertEquals(30, ch.receive())
|
|
||||||
assertEquals(null, ch.receive()) // drained
|
|
||||||
|
|
||||||
For the full API — including `tryReceive`, `Channel.UNLIMITED`, and the fan-out / ping-pong patterns — see the [Channel] reference page.
|
|
||||||
|
|
||||||
| | Flow | Channel |
|
|
||||||
|---|---|---|
|
|
||||||
| **temperature** | cold (lazy) | hot (eager) |
|
|
||||||
| **replay** | every collector gets a fresh run | each item consumed once |
|
|
||||||
| **consumers** | any number, each gets all items | one receiver per item |
|
|
||||||
| **typical use** | transform pipelines, sequences | producer–consumer, fan-out |
|
|
||||||
|
|
||||||
[Channel]: Channel.md
|
|
||||||
|
|
||||||
[Iterable]: Iterable.md
|
[Iterable]: Iterable.md
|
||||||
|
|
||||||
|
|||||||
@ -634,7 +634,6 @@ class Script(
|
|||||||
addConst("Deferred", ObjDeferred.type)
|
addConst("Deferred", ObjDeferred.type)
|
||||||
addConst("CompletableDeferred", ObjCompletableDeferred.type)
|
addConst("CompletableDeferred", ObjCompletableDeferred.type)
|
||||||
addConst("Mutex", ObjMutex.type)
|
addConst("Mutex", ObjMutex.type)
|
||||||
addConst("Channel", ObjChannel.type)
|
|
||||||
addConst("Flow", ObjFlow.type)
|
addConst("Flow", ObjFlow.type)
|
||||||
addConst("FlowBuilder", ObjFlowBuilder.type)
|
addConst("FlowBuilder", ObjFlowBuilder.type)
|
||||||
addConst("Delegate", ObjDynamic.type)
|
addConst("Delegate", ObjDynamic.type)
|
||||||
|
|||||||
@ -1,133 +0,0 @@
|
|||||||
/*
|
|
||||||
* Copyright 2026 Sergey S. Chernov real.sergeych@gmail.com
|
|
||||||
*
|
|
||||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
|
||||||
* you may not use this file except in compliance with the License.
|
|
||||||
* You may obtain a copy of the License at
|
|
||||||
*
|
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
*
|
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
* See the License for the specific language governing permissions and
|
|
||||||
* limitations under the License.
|
|
||||||
*
|
|
||||||
*/
|
|
||||||
|
|
||||||
package net.sergeych.lyng.obj
|
|
||||||
|
|
||||||
import kotlinx.coroutines.channels.Channel
|
|
||||||
import kotlinx.coroutines.channels.ClosedReceiveChannelException
|
|
||||||
import kotlinx.coroutines.channels.ClosedSendChannelException
|
|
||||||
import net.sergeych.lyng.Scope
|
|
||||||
import net.sergeych.lyng.miniast.ParamDoc
|
|
||||||
import net.sergeych.lyng.miniast.addFnDoc
|
|
||||||
import net.sergeych.lyng.miniast.addPropertyDoc
|
|
||||||
import net.sergeych.lyng.miniast.type
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Lyng-visible wrapper around Kotlin's [Channel].
|
|
||||||
*
|
|
||||||
* Construction:
|
|
||||||
* - `Channel()` – rendezvous channel (capacity 0, sender and receiver synchronise)
|
|
||||||
* - `Channel(n)` – buffered channel with capacity *n* (n > 0)
|
|
||||||
* - `Channel(Channel.UNLIMITED)` – unlimited-buffer channel
|
|
||||||
*
|
|
||||||
* Methods:
|
|
||||||
* - `send(value)` – suspend until the value is accepted by a receiver (or buffered)
|
|
||||||
* - `receive()` – suspend until a value is available; returns `null` when channel is closed and drained
|
|
||||||
* - `tryReceive()` – return the next value immediately, or `null` if none is available
|
|
||||||
* - `close()` – signal that no more values will be sent; pending receivers can still drain buffered items
|
|
||||||
*
|
|
||||||
* Properties:
|
|
||||||
* - `isClosedForSend: Bool`
|
|
||||||
* - `isClosedForReceive: Bool`
|
|
||||||
*/
|
|
||||||
class ObjChannel(val channel: Channel<Obj>) : Obj() {
|
|
||||||
|
|
||||||
override val objClass get() = type
|
|
||||||
|
|
||||||
companion object {
|
|
||||||
val type = object : ObjClass("Channel") {
|
|
||||||
override suspend fun callOn(scope: Scope): Obj {
|
|
||||||
val capacity = scope.args.list.getOrNull(0)
|
|
||||||
?.let { (it as? ObjInt)?.value?.toInt() ?: scope.raiseIllegalArgument("Channel capacity must be an integer") }
|
|
||||||
?: Channel.RENDEZVOUS
|
|
||||||
return ObjChannel(Channel(capacity))
|
|
||||||
}
|
|
||||||
}.apply {
|
|
||||||
// Expose Channel.UNLIMITED as a static constant on the Channel class so scripts can write
|
|
||||||
// Channel(Channel.UNLIMITED).
|
|
||||||
addConst("UNLIMITED", ObjInt(Channel.UNLIMITED.toLong()))
|
|
||||||
|
|
||||||
addFnDoc(
|
|
||||||
name = "send",
|
|
||||||
doc = "Suspend until the value is accepted by a receiver (or placed into the buffer). " +
|
|
||||||
"Throws if the channel is already closed.",
|
|
||||||
params = listOf(ParamDoc("value", type("lyng.Any"))),
|
|
||||||
returns = type("lyng.Void"),
|
|
||||||
moduleName = "lyng.stdlib"
|
|
||||||
) {
|
|
||||||
val value = requiredArg<Obj>(0)
|
|
||||||
try {
|
|
||||||
thisAs<ObjChannel>().channel.send(value)
|
|
||||||
} catch (e: ClosedSendChannelException) {
|
|
||||||
raiseIllegalState("Channel is closed for send")
|
|
||||||
}
|
|
||||||
ObjVoid
|
|
||||||
}
|
|
||||||
|
|
||||||
addFnDoc(
|
|
||||||
name = "receive",
|
|
||||||
doc = "Suspend until a value is available and return it, or return `null` when the channel " +
|
|
||||||
"is closed and all buffered items have been consumed.",
|
|
||||||
returns = type("lyng.Any"),
|
|
||||||
moduleName = "lyng.stdlib"
|
|
||||||
) {
|
|
||||||
try {
|
|
||||||
thisAs<ObjChannel>().channel.receive()
|
|
||||||
} catch (_: ClosedReceiveChannelException) {
|
|
||||||
ObjNull
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
addFnDoc(
|
|
||||||
name = "tryReceive",
|
|
||||||
doc = "Return the next buffered value immediately without suspending, or `null` if the " +
|
|
||||||
"channel is empty or closed.",
|
|
||||||
returns = type("lyng.Any"),
|
|
||||||
moduleName = "lyng.stdlib"
|
|
||||||
) {
|
|
||||||
val result = thisAs<ObjChannel>().channel.tryReceive()
|
|
||||||
result.getOrNull() ?: ObjNull
|
|
||||||
}
|
|
||||||
|
|
||||||
addFnDoc(
|
|
||||||
name = "close",
|
|
||||||
doc = "Signal that no more values will be sent. Receivers can still drain any buffered items.",
|
|
||||||
returns = type("lyng.Void"),
|
|
||||||
moduleName = "lyng.stdlib"
|
|
||||||
) {
|
|
||||||
thisAs<ObjChannel>().channel.close()
|
|
||||||
ObjVoid
|
|
||||||
}
|
|
||||||
|
|
||||||
addPropertyDoc(
|
|
||||||
name = "isClosedForSend",
|
|
||||||
doc = "Whether this channel is closed for sending (no more `send` calls are permitted).",
|
|
||||||
type = type("lyng.Bool"),
|
|
||||||
moduleName = "lyng.stdlib",
|
|
||||||
getter = { thisAs<ObjChannel>().channel.isClosedForSend.toObj() }
|
|
||||||
)
|
|
||||||
|
|
||||||
addPropertyDoc(
|
|
||||||
name = "isClosedForReceive",
|
|
||||||
doc = "Whether this channel is closed for receiving (closed and fully drained).",
|
|
||||||
type = type("lyng.Bool"),
|
|
||||||
moduleName = "lyng.stdlib",
|
|
||||||
getter = { thisAs<ObjChannel>().channel.isClosedForReceive.toObj() }
|
|
||||||
)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
@ -18,7 +18,6 @@
|
|||||||
package net.sergeych.lyng.obj
|
package net.sergeych.lyng.obj
|
||||||
|
|
||||||
import kotlinx.coroutines.CompletableDeferred
|
import kotlinx.coroutines.CompletableDeferred
|
||||||
import net.sergeych.lyng.ExecutionError
|
|
||||||
import net.sergeych.lyng.Scope
|
import net.sergeych.lyng.Scope
|
||||||
import net.sergeych.lyng.miniast.ParamDoc
|
import net.sergeych.lyng.miniast.ParamDoc
|
||||||
import net.sergeych.lyng.miniast.addFnDoc
|
import net.sergeych.lyng.miniast.addFnDoc
|
||||||
@ -44,30 +43,6 @@ class ObjCompletableDeferred(val completableDeferred: CompletableDeferred<Obj>):
|
|||||||
thisAs<ObjCompletableDeferred>().completableDeferred.complete(args.firstAndOnly())
|
thisAs<ObjCompletableDeferred>().completableDeferred.complete(args.firstAndOnly())
|
||||||
ObjVoid
|
ObjVoid
|
||||||
}
|
}
|
||||||
addFnDoc(
|
|
||||||
name = "completeExceptionally",
|
|
||||||
doc = "Fail this deferred with the given exception. Awaiting it will then throw that exception. " +
|
|
||||||
"Subsequent calls have no effect. The argument must be an `Exception` instance.",
|
|
||||||
params = listOf(ParamDoc("exception", type("lyng.Exception"))),
|
|
||||||
returns = type("lyng.Void"),
|
|
||||||
moduleName = "lyng.stdlib"
|
|
||||||
) {
|
|
||||||
val ex = requiredArg<Obj>(0)
|
|
||||||
val scope = requireScope()
|
|
||||||
val msg = when (ex) {
|
|
||||||
is ObjException -> ex.message.value
|
|
||||||
else -> ex.toString(scope).value
|
|
||||||
}
|
|
||||||
val pos = when (ex) {
|
|
||||||
is ObjException -> ex.scope.pos
|
|
||||||
else -> scope.pos
|
|
||||||
}
|
|
||||||
// Always carry the original Lyng object as errorObject so that
|
|
||||||
// assertThrows / catch clauses see the correct exception class.
|
|
||||||
val cause = ExecutionError(ex, pos, msg)
|
|
||||||
thisAs<ObjCompletableDeferred>().completableDeferred.completeExceptionally(cause)
|
|
||||||
ObjVoid
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -1,131 +0,0 @@
|
|||||||
/*
|
|
||||||
* Copyright 2026 Sergey S. Chernov real.sergeych@gmail.com
|
|
||||||
*
|
|
||||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
|
||||||
* you may not use this file except in compliance with the License.
|
|
||||||
* You may obtain a copy of the License at
|
|
||||||
*
|
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
*
|
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
* See the License for the specific language governing permissions and
|
|
||||||
* limitations under the License.
|
|
||||||
*
|
|
||||||
*/
|
|
||||||
|
|
||||||
import kotlinx.coroutines.test.runTest
|
|
||||||
import net.sergeych.lyng.eval
|
|
||||||
import kotlin.test.Test
|
|
||||||
|
|
||||||
class ChannelTest {
|
|
||||||
|
|
||||||
@Test
|
|
||||||
fun testRendezvousChannel() = runTest {
|
|
||||||
eval("""
|
|
||||||
val ch = Channel()
|
|
||||||
val producer = launch {
|
|
||||||
ch.send(1)
|
|
||||||
ch.send(2)
|
|
||||||
ch.send(3)
|
|
||||||
ch.close()
|
|
||||||
}
|
|
||||||
val results = []
|
|
||||||
var v = ch.receive()
|
|
||||||
while (v != null) {
|
|
||||||
results += v
|
|
||||||
v = ch.receive()
|
|
||||||
}
|
|
||||||
assertEquals([1, 2, 3], results)
|
|
||||||
""".trimIndent())
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
|
||||||
fun testBufferedChannel() = runTest {
|
|
||||||
eval("""
|
|
||||||
val ch = Channel(4)
|
|
||||||
ch.send(10)
|
|
||||||
ch.send(20)
|
|
||||||
ch.send(30)
|
|
||||||
ch.close()
|
|
||||||
assertEquals(10, ch.receive())
|
|
||||||
assertEquals(20, ch.receive())
|
|
||||||
assertEquals(30, ch.receive())
|
|
||||||
// closed and drained
|
|
||||||
assertEquals(null, ch.receive())
|
|
||||||
""".trimIndent())
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
|
||||||
fun testTryReceive() = runTest {
|
|
||||||
eval("""
|
|
||||||
val ch = Channel(2)
|
|
||||||
// nothing buffered yet
|
|
||||||
assertEquals(null, ch.tryReceive())
|
|
||||||
ch.send(42)
|
|
||||||
assertEquals(42, ch.tryReceive())
|
|
||||||
assertEquals(null, ch.tryReceive())
|
|
||||||
""".trimIndent())
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
|
||||||
fun testChannelProperties() = runTest {
|
|
||||||
eval("""
|
|
||||||
val ch = Channel(1)
|
|
||||||
assert(!ch.isClosedForSend)
|
|
||||||
assert(!ch.isClosedForReceive)
|
|
||||||
ch.send(1)
|
|
||||||
ch.close()
|
|
||||||
assert(ch.isClosedForSend)
|
|
||||||
// still has one buffered item, so not yet closed for receive
|
|
||||||
assert(!ch.isClosedForReceive)
|
|
||||||
ch.receive()
|
|
||||||
// now fully drained
|
|
||||||
assert(ch.isClosedForReceive)
|
|
||||||
""".trimIndent())
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
|
||||||
fun testUnlimitedChannel() = runTest {
|
|
||||||
eval("""
|
|
||||||
val ch = Channel(Channel.UNLIMITED)
|
|
||||||
(1..100).forEach { ch.send(it) }
|
|
||||||
ch.close()
|
|
||||||
var sum = 0
|
|
||||||
var v = ch.receive()
|
|
||||||
while (v != null) {
|
|
||||||
sum += v
|
|
||||||
v = ch.receive()
|
|
||||||
}
|
|
||||||
assertEquals(5050, sum)
|
|
||||||
""".trimIndent())
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
|
||||||
fun testChannelProducerConsumerPattern() = runTest {
|
|
||||||
eval("""
|
|
||||||
val ch = Channel()
|
|
||||||
val results = []
|
|
||||||
val mu = Mutex()
|
|
||||||
|
|
||||||
val consumer = launch {
|
|
||||||
var item = ch.receive()
|
|
||||||
while (item != null) {
|
|
||||||
mu.withLock { results += item }
|
|
||||||
item = ch.receive()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
launch {
|
|
||||||
for (i in 1..5) {
|
|
||||||
ch.send("msg:${'$'}i")
|
|
||||||
}
|
|
||||||
ch.close()
|
|
||||||
}.await()
|
|
||||||
|
|
||||||
consumer.await()
|
|
||||||
assertEquals(["msg:1","msg:2","msg:3","msg:4","msg:5"], results)
|
|
||||||
""".trimIndent())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
@ -58,38 +58,6 @@ class TestCoroutines {
|
|||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
|
||||||
fun testCompletableExceptionally() = runTest {
|
|
||||||
eval(
|
|
||||||
"""
|
|
||||||
val done = CompletableDeferred()
|
|
||||||
|
|
||||||
launch {
|
|
||||||
delay(10)
|
|
||||||
done.completeExceptionally(IllegalStateException("boom"))
|
|
||||||
}
|
|
||||||
|
|
||||||
assert(!done.isCompleted)
|
|
||||||
assertThrows(IllegalStateException) { done.await() }
|
|
||||||
assert(done.isCompleted)
|
|
||||||
""".trimIndent()
|
|
||||||
)
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
|
||||||
fun testCompletableExceptionallyWithCustomException() = runTest {
|
|
||||||
eval(
|
|
||||||
"""
|
|
||||||
class MyError(msg) : Exception(msg) {}
|
|
||||||
|
|
||||||
val done = CompletableDeferred()
|
|
||||||
done.completeExceptionally(MyError("custom failure"))
|
|
||||||
|
|
||||||
assertThrows(MyError) { done.await() }
|
|
||||||
""".trimIndent()
|
|
||||||
)
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
fun testDeferredCancel() = runTest {
|
fun testDeferredCancel() = runTest {
|
||||||
eval(
|
eval(
|
||||||
|
|||||||
@ -236,7 +236,7 @@ suspend fun runDocTests(fileName: String, bookMode: Boolean = false) {
|
|||||||
val bookScope = Scope()
|
val bookScope = Scope()
|
||||||
var count = 0
|
var count = 0
|
||||||
parseDocTests(fileName, bookMode).collect { dt ->
|
parseDocTests(fileName, bookMode).collect { dt ->
|
||||||
if (bookMode) dt.test(bookScope)
|
if (bookMode)imp dt.test(bookScope)
|
||||||
else dt.test()
|
else dt.test()
|
||||||
count++
|
count++
|
||||||
}
|
}
|
||||||
|
|||||||
@ -19,7 +19,7 @@ extern class Deferred {
|
|||||||
/* Cancel the task if it is still active. Safe to call multiple times. */
|
/* Cancel the task if it is still active. Safe to call multiple times. */
|
||||||
fun cancel(): void
|
fun cancel(): void
|
||||||
/* Suspend until the task finishes and return its value.
|
/* Suspend until the task finishes and return its value.
|
||||||
Throws `CancellationException` if the task was cancelled. */
|
Throws `CancellationException` if the task was cancelled. */
|
||||||
fun await(): Object
|
fun await(): Object
|
||||||
/* True when the task has finished, failed, or otherwise reached a terminal state. */
|
/* True when the task has finished, failed, or otherwise reached a terminal state. */
|
||||||
val isCompleted: Bool
|
val isCompleted: Bool
|
||||||
@ -241,10 +241,10 @@ fun Iterable<T>.drop(n: Int): List<T> {
|
|||||||
|
|
||||||
/* Return the first element or throw if the iterable is empty. */
|
/* Return the first element or throw if the iterable is empty. */
|
||||||
val Iterable<T>.first: T get() {
|
val Iterable<T>.first: T get() {
|
||||||
val i: Iterator<T> = iterator()
|
val i: Iterator<T> = iterator()
|
||||||
if( !i.hasNext() ) throw NoSuchElementException()
|
if( !i.hasNext() ) throw NoSuchElementException()
|
||||||
i.next().also { i.cancelIteration() }
|
i.next().also { i.cancelIteration() }
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
Return the first element that matches the predicate or throws
|
Return the first element that matches the predicate or throws
|
||||||
@ -272,15 +272,15 @@ fun Iterable<T>.findFirstOrNull(predicate: (T)->Bool): T? {
|
|||||||
|
|
||||||
/* Return the last element or throw if the iterable is empty. */
|
/* Return the last element or throw if the iterable is empty. */
|
||||||
val Iterable<T>.last: T get() {
|
val Iterable<T>.last: T get() {
|
||||||
var found = false
|
var found = false
|
||||||
var element: Object = Unset
|
var element: Object = Unset
|
||||||
for( i in this ) {
|
for( i in this ) {
|
||||||
element = i
|
element = i
|
||||||
found = true
|
found = true
|
||||||
}
|
|
||||||
if( !found ) throw NoSuchElementException()
|
|
||||||
element as T
|
|
||||||
}
|
}
|
||||||
|
if( !found ) throw NoSuchElementException()
|
||||||
|
element as T
|
||||||
|
}
|
||||||
|
|
||||||
/* Emit all but the last N elements of this iterable. */
|
/* Emit all but the last N elements of this iterable. */
|
||||||
fun Iterable<T>.dropLast(n: Int): Flow<T> {
|
fun Iterable<T>.dropLast(n: Int): Flow<T> {
|
||||||
@ -543,48 +543,3 @@ class StackTraceEntry(
|
|||||||
"%s: %s"(at, sourceString)
|
"%s: %s"(at, sourceString)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
/*
|
|
||||||
A pool of coroutines that execute submitted tasks with bounded concurrency.
|
|
||||||
|
|
||||||
`maxWorkers` limits how many tasks run in parallel.
|
|
||||||
`maxQueueSize` limits how many tasks may be queued waiting for a free worker;
|
|
||||||
when the queue is full, `launch` suspends the caller until space becomes available.
|
|
||||||
Use `Channel.UNLIMITED` (the default) for an unbounded queue.
|
|
||||||
|
|
||||||
Any exception thrown inside a submitted lambda is captured in the returned
|
|
||||||
`Deferred` and never escapes the pool itself.
|
|
||||||
|
|
||||||
Once `cancel()` or `closeAndJoin()` is called, any further `launch` call
|
|
||||||
(including one suspended waiting for queue space) throws `IllegalStateException`.
|
|
||||||
*/
|
|
||||||
extern class LaunchPool(maxWorkers: Int, maxQueueSize: Int = Channel.UNLIMITED) {
|
|
||||||
/*
|
|
||||||
Schedule a lambda for execution and return a Deferred for its result.
|
|
||||||
Suspends the caller if the queue is full.
|
|
||||||
Throws `IllegalStateException` if the pool is cancelled or closing.
|
|
||||||
*/
|
|
||||||
extern fun launch<T>( lambda: ()->T ): Deferred<T>
|
|
||||||
|
|
||||||
/*
|
|
||||||
Immediately cancel all workers and discard any queued tasks.
|
|
||||||
Running tasks are interrupted via coroutine cancellation.
|
|
||||||
After this call, `launch` throws `IllegalStateException`.
|
|
||||||
*/
|
|
||||||
extern fun cancel()
|
|
||||||
|
|
||||||
/*
|
|
||||||
Like `cancel`, but also waits until any currently-running task has
|
|
||||||
finished (they will not be interrupted mid-execution).
|
|
||||||
Useful when workers hold resources (connections, file handles) that
|
|
||||||
must be released before the caller continues.
|
|
||||||
After this call, `launch` throws `IllegalStateException`.
|
|
||||||
*/
|
|
||||||
extern fun cancelAndJoin()
|
|
||||||
|
|
||||||
/*
|
|
||||||
Stop accepting new tasks and suspend until all queued and running
|
|
||||||
tasks have completed normally.
|
|
||||||
After this call, `launch` throws `IllegalStateException`.
|
|
||||||
*/
|
|
||||||
extern fun closeAndJoin()
|
|
||||||
}
|
|
||||||
Loading…
x
Reference in New Issue
Block a user