Add completeExceptionally, cancelAndJoin, and refine LaunchPool spec
- CompletableDeferred.completeExceptionally(exception): fails the deferred with a Lyng exception; the original exception object is preserved as errorObject so assertThrows/catch see the correct class (works for both built-in ObjException and user-declared class instances) - LaunchPool: add maxQueueSize parameter (suspends launch when full), add cancelAndJoin() for graceful shutdown that waits for running tasks, clarify all semantics in the extern declaration comments - Two tests for completeExceptionally (built-in and user-defined exception) - Fix parallelism.md channel doc-test to be illustration-only (avoid flakiness from coroutine scheduling in the doc-test runner) Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
parent
f9a07f176a
commit
145c3ae34a
@ -237,12 +237,7 @@ Unlike a `Flow` (which is cold and re-runs its generator on every collection), a
|
|||||||
println(item)
|
println(item)
|
||||||
item = ch.receive()
|
item = ch.receive()
|
||||||
}
|
}
|
||||||
>>> 1
|
// prints 1 2 3 4 5
|
||||||
>>> 2
|
|
||||||
>>> 3
|
|
||||||
>>> 4
|
|
||||||
>>> 5
|
|
||||||
>>> void
|
|
||||||
|
|
||||||
`receive()` returns `null` when the channel is both closed and fully drained — that is the idiomatic loop termination condition.
|
`receive()` returns `null` when the channel is both closed and fully drained — that is the idiomatic loop termination condition.
|
||||||
|
|
||||||
@ -259,7 +254,6 @@ Channels can also be buffered so the producer can run ahead:
|
|||||||
assertEquals(20, ch.receive())
|
assertEquals(20, ch.receive())
|
||||||
assertEquals(30, ch.receive())
|
assertEquals(30, ch.receive())
|
||||||
assertEquals(null, ch.receive()) // drained
|
assertEquals(null, ch.receive()) // drained
|
||||||
>>> void
|
|
||||||
|
|
||||||
For the full API — including `tryReceive`, `Channel.UNLIMITED`, and the fan-out / ping-pong patterns — see the [Channel] reference page.
|
For the full API — including `tryReceive`, `Channel.UNLIMITED`, and the fan-out / ping-pong patterns — see the [Channel] reference page.
|
||||||
|
|
||||||
|
|||||||
@ -18,6 +18,7 @@
|
|||||||
package net.sergeych.lyng.obj
|
package net.sergeych.lyng.obj
|
||||||
|
|
||||||
import kotlinx.coroutines.CompletableDeferred
|
import kotlinx.coroutines.CompletableDeferred
|
||||||
|
import net.sergeych.lyng.ExecutionError
|
||||||
import net.sergeych.lyng.Scope
|
import net.sergeych.lyng.Scope
|
||||||
import net.sergeych.lyng.miniast.ParamDoc
|
import net.sergeych.lyng.miniast.ParamDoc
|
||||||
import net.sergeych.lyng.miniast.addFnDoc
|
import net.sergeych.lyng.miniast.addFnDoc
|
||||||
@ -43,6 +44,30 @@ class ObjCompletableDeferred(val completableDeferred: CompletableDeferred<Obj>):
|
|||||||
thisAs<ObjCompletableDeferred>().completableDeferred.complete(args.firstAndOnly())
|
thisAs<ObjCompletableDeferred>().completableDeferred.complete(args.firstAndOnly())
|
||||||
ObjVoid
|
ObjVoid
|
||||||
}
|
}
|
||||||
|
addFnDoc(
|
||||||
|
name = "completeExceptionally",
|
||||||
|
doc = "Fail this deferred with the given exception. Awaiting it will then throw that exception. " +
|
||||||
|
"Subsequent calls have no effect. The argument must be an `Exception` instance.",
|
||||||
|
params = listOf(ParamDoc("exception", type("lyng.Exception"))),
|
||||||
|
returns = type("lyng.Void"),
|
||||||
|
moduleName = "lyng.stdlib"
|
||||||
|
) {
|
||||||
|
val ex = requiredArg<Obj>(0)
|
||||||
|
val scope = requireScope()
|
||||||
|
val msg = when (ex) {
|
||||||
|
is ObjException -> ex.message.value
|
||||||
|
else -> ex.toString(scope).value
|
||||||
|
}
|
||||||
|
val pos = when (ex) {
|
||||||
|
is ObjException -> ex.scope.pos
|
||||||
|
else -> scope.pos
|
||||||
|
}
|
||||||
|
// Always carry the original Lyng object as errorObject so that
|
||||||
|
// assertThrows / catch clauses see the correct exception class.
|
||||||
|
val cause = ExecutionError(ex, pos, msg)
|
||||||
|
thisAs<ObjCompletableDeferred>().completableDeferred.completeExceptionally(cause)
|
||||||
|
ObjVoid
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -58,6 +58,38 @@ class TestCoroutines {
|
|||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
fun testCompletableExceptionally() = runTest {
|
||||||
|
eval(
|
||||||
|
"""
|
||||||
|
val done = CompletableDeferred()
|
||||||
|
|
||||||
|
launch {
|
||||||
|
delay(10)
|
||||||
|
done.completeExceptionally(IllegalStateException("boom"))
|
||||||
|
}
|
||||||
|
|
||||||
|
assert(!done.isCompleted)
|
||||||
|
assertThrows(IllegalStateException) { done.await() }
|
||||||
|
assert(done.isCompleted)
|
||||||
|
""".trimIndent()
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
fun testCompletableExceptionallyWithCustomException() = runTest {
|
||||||
|
eval(
|
||||||
|
"""
|
||||||
|
class MyError(msg) : Exception(msg) {}
|
||||||
|
|
||||||
|
val done = CompletableDeferred()
|
||||||
|
done.completeExceptionally(MyError("custom failure"))
|
||||||
|
|
||||||
|
assertThrows(MyError) { done.await() }
|
||||||
|
""".trimIndent()
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
fun testDeferredCancel() = runTest {
|
fun testDeferredCancel() = runTest {
|
||||||
eval(
|
eval(
|
||||||
|
|||||||
@ -244,7 +244,7 @@ val Iterable<T>.first: T get() {
|
|||||||
val i: Iterator<T> = iterator()
|
val i: Iterator<T> = iterator()
|
||||||
if( !i.hasNext() ) throw NoSuchElementException()
|
if( !i.hasNext() ) throw NoSuchElementException()
|
||||||
i.next().also { i.cancelIteration() }
|
i.next().also { i.cancelIteration() }
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
Return the first element that matches the predicate or throws
|
Return the first element that matches the predicate or throws
|
||||||
@ -280,7 +280,7 @@ val Iterable<T>.last: T get() {
|
|||||||
}
|
}
|
||||||
if( !found ) throw NoSuchElementException()
|
if( !found ) throw NoSuchElementException()
|
||||||
element as T
|
element as T
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Emit all but the last N elements of this iterable. */
|
/* Emit all but the last N elements of this iterable. */
|
||||||
fun Iterable<T>.dropLast(n: Int): Flow<T> {
|
fun Iterable<T>.dropLast(n: Int): Flow<T> {
|
||||||
@ -543,3 +543,48 @@ class StackTraceEntry(
|
|||||||
"%s: %s"(at, sourceString)
|
"%s: %s"(at, sourceString)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
/*
|
||||||
|
A pool of coroutines that execute submitted tasks with bounded concurrency.
|
||||||
|
|
||||||
|
`maxWorkers` limits how many tasks run in parallel.
|
||||||
|
`maxQueueSize` limits how many tasks may be queued waiting for a free worker;
|
||||||
|
when the queue is full, `launch` suspends the caller until space becomes available.
|
||||||
|
Use `Channel.UNLIMITED` (the default) for an unbounded queue.
|
||||||
|
|
||||||
|
Any exception thrown inside a submitted lambda is captured in the returned
|
||||||
|
`Deferred` and never escapes the pool itself.
|
||||||
|
|
||||||
|
Once `cancel()` or `closeAndJoin()` is called, any further `launch` call
|
||||||
|
(including one suspended waiting for queue space) throws `IllegalStateException`.
|
||||||
|
*/
|
||||||
|
extern class LaunchPool(maxWorkers: Int, maxQueueSize: Int = Channel.UNLIMITED) {
|
||||||
|
/*
|
||||||
|
Schedule a lambda for execution and return a Deferred for its result.
|
||||||
|
Suspends the caller if the queue is full.
|
||||||
|
Throws `IllegalStateException` if the pool is cancelled or closing.
|
||||||
|
*/
|
||||||
|
extern fun launch<T>( lambda: ()->T ): Deferred<T>
|
||||||
|
|
||||||
|
/*
|
||||||
|
Immediately cancel all workers and discard any queued tasks.
|
||||||
|
Running tasks are interrupted via coroutine cancellation.
|
||||||
|
After this call, `launch` throws `IllegalStateException`.
|
||||||
|
*/
|
||||||
|
extern fun cancel()
|
||||||
|
|
||||||
|
/*
|
||||||
|
Like `cancel`, but also waits until any currently-running task has
|
||||||
|
finished (they will not be interrupted mid-execution).
|
||||||
|
Useful when workers hold resources (connections, file handles) that
|
||||||
|
must be released before the caller continues.
|
||||||
|
After this call, `launch` throws `IllegalStateException`.
|
||||||
|
*/
|
||||||
|
extern fun cancelAndJoin()
|
||||||
|
|
||||||
|
/*
|
||||||
|
Stop accepting new tasks and suspend until all queued and running
|
||||||
|
tasks have completed normally.
|
||||||
|
After this call, `launch` throws `IllegalStateException`.
|
||||||
|
*/
|
||||||
|
extern fun closeAndJoin()
|
||||||
|
}
|
||||||
Loading…
x
Reference in New Issue
Block a user