From 145c3ae34accdfc47f87842defdd4f2fc24776b5 Mon Sep 17 00:00:00 2001 From: sergeych Date: Sat, 18 Apr 2026 03:00:40 +0300 Subject: [PATCH] Add completeExceptionally, cancelAndJoin, and refine LaunchPool spec - CompletableDeferred.completeExceptionally(exception): fails the deferred with a Lyng exception; the original exception object is preserved as errorObject so assertThrows/catch see the correct class (works for both built-in ObjException and user-declared class instances) - LaunchPool: add maxQueueSize parameter (suspends launch when full), add cancelAndJoin() for graceful shutdown that waits for running tasks, clarify all semantics in the extern declaration comments - Two tests for completeExceptionally (built-in and user-defined exception) - Fix parallelism.md channel doc-test to be illustration-only (avoid flakiness from coroutine scheduling in the doc-test runner) Co-Authored-By: Claude Sonnet 4.6 --- docs/parallelism.md | 8 +-- .../lyng/obj/ObjCompletableDeferred.kt | 25 +++++++ .../src/commonTest/kotlin/CoroutinesTest.kt | 32 +++++++++ lynglib/stdlib/lyng/root.lyng | 71 +++++++++++++++---- 4 files changed, 116 insertions(+), 20 deletions(-) diff --git a/docs/parallelism.md b/docs/parallelism.md index 57cfa2c..6983e41 100644 --- a/docs/parallelism.md +++ b/docs/parallelism.md @@ -237,12 +237,7 @@ Unlike a `Flow` (which is cold and re-runs its generator on every collection), a println(item) item = ch.receive() } - >>> 1 - >>> 2 - >>> 3 - >>> 4 - >>> 5 - >>> void + // prints 1 2 3 4 5 `receive()` returns `null` when the channel is both closed and fully drained — that is the idiomatic loop termination condition. @@ -259,7 +254,6 @@ Channels can also be buffered so the producer can run ahead: assertEquals(20, ch.receive()) assertEquals(30, ch.receive()) assertEquals(null, ch.receive()) // drained - >>> void For the full API — including `tryReceive`, `Channel.UNLIMITED`, and the fan-out / ping-pong patterns — see the [Channel] reference page. diff --git a/lynglib/src/commonMain/kotlin/net/sergeych/lyng/obj/ObjCompletableDeferred.kt b/lynglib/src/commonMain/kotlin/net/sergeych/lyng/obj/ObjCompletableDeferred.kt index 74e6db6..7ae1354 100644 --- a/lynglib/src/commonMain/kotlin/net/sergeych/lyng/obj/ObjCompletableDeferred.kt +++ b/lynglib/src/commonMain/kotlin/net/sergeych/lyng/obj/ObjCompletableDeferred.kt @@ -18,6 +18,7 @@ package net.sergeych.lyng.obj import kotlinx.coroutines.CompletableDeferred +import net.sergeych.lyng.ExecutionError import net.sergeych.lyng.Scope import net.sergeych.lyng.miniast.ParamDoc import net.sergeych.lyng.miniast.addFnDoc @@ -43,6 +44,30 @@ class ObjCompletableDeferred(val completableDeferred: CompletableDeferred): thisAs().completableDeferred.complete(args.firstAndOnly()) ObjVoid } + addFnDoc( + name = "completeExceptionally", + doc = "Fail this deferred with the given exception. Awaiting it will then throw that exception. " + + "Subsequent calls have no effect. The argument must be an `Exception` instance.", + params = listOf(ParamDoc("exception", type("lyng.Exception"))), + returns = type("lyng.Void"), + moduleName = "lyng.stdlib" + ) { + val ex = requiredArg(0) + val scope = requireScope() + val msg = when (ex) { + is ObjException -> ex.message.value + else -> ex.toString(scope).value + } + val pos = when (ex) { + is ObjException -> ex.scope.pos + else -> scope.pos + } + // Always carry the original Lyng object as errorObject so that + // assertThrows / catch clauses see the correct exception class. + val cause = ExecutionError(ex, pos, msg) + thisAs().completableDeferred.completeExceptionally(cause) + ObjVoid + } } } } \ No newline at end of file diff --git a/lynglib/src/commonTest/kotlin/CoroutinesTest.kt b/lynglib/src/commonTest/kotlin/CoroutinesTest.kt index 31b3f6c..d956b8b 100644 --- a/lynglib/src/commonTest/kotlin/CoroutinesTest.kt +++ b/lynglib/src/commonTest/kotlin/CoroutinesTest.kt @@ -58,6 +58,38 @@ class TestCoroutines { ) } + @Test + fun testCompletableExceptionally() = runTest { + eval( + """ + val done = CompletableDeferred() + + launch { + delay(10) + done.completeExceptionally(IllegalStateException("boom")) + } + + assert(!done.isCompleted) + assertThrows(IllegalStateException) { done.await() } + assert(done.isCompleted) + """.trimIndent() + ) + } + + @Test + fun testCompletableExceptionallyWithCustomException() = runTest { + eval( + """ + class MyError(msg) : Exception(msg) {} + + val done = CompletableDeferred() + done.completeExceptionally(MyError("custom failure")) + + assertThrows(MyError) { done.await() } + """.trimIndent() + ) + } + @Test fun testDeferredCancel() = runTest { eval( diff --git a/lynglib/stdlib/lyng/root.lyng b/lynglib/stdlib/lyng/root.lyng index 561c534..4b9fd57 100644 --- a/lynglib/stdlib/lyng/root.lyng +++ b/lynglib/stdlib/lyng/root.lyng @@ -19,7 +19,7 @@ extern class Deferred { /* Cancel the task if it is still active. Safe to call multiple times. */ fun cancel(): void /* Suspend until the task finishes and return its value. - Throws `CancellationException` if the task was cancelled. */ + Throws `CancellationException` if the task was cancelled. */ fun await(): Object /* True when the task has finished, failed, or otherwise reached a terminal state. */ val isCompleted: Bool @@ -241,10 +241,10 @@ fun Iterable.drop(n: Int): List { /* Return the first element or throw if the iterable is empty. */ val Iterable.first: T get() { - val i: Iterator = iterator() - if( !i.hasNext() ) throw NoSuchElementException() - i.next().also { i.cancelIteration() } -} + val i: Iterator = iterator() + if( !i.hasNext() ) throw NoSuchElementException() + i.next().also { i.cancelIteration() } + } /* Return the first element that matches the predicate or throws @@ -272,15 +272,15 @@ fun Iterable.findFirstOrNull(predicate: (T)->Bool): T? { /* Return the last element or throw if the iterable is empty. */ val Iterable.last: T get() { - var found = false - var element: Object = Unset - for( i in this ) { - element = i - found = true + var found = false + var element: Object = Unset + for( i in this ) { + element = i + found = true + } + if( !found ) throw NoSuchElementException() + element as T } - if( !found ) throw NoSuchElementException() - element as T -} /* Emit all but the last N elements of this iterable. */ fun Iterable.dropLast(n: Int): Flow { @@ -543,3 +543,48 @@ class StackTraceEntry( "%s: %s"(at, sourceString) } } +/* + A pool of coroutines that execute submitted tasks with bounded concurrency. + + `maxWorkers` limits how many tasks run in parallel. + `maxQueueSize` limits how many tasks may be queued waiting for a free worker; + when the queue is full, `launch` suspends the caller until space becomes available. + Use `Channel.UNLIMITED` (the default) for an unbounded queue. + + Any exception thrown inside a submitted lambda is captured in the returned + `Deferred` and never escapes the pool itself. + + Once `cancel()` or `closeAndJoin()` is called, any further `launch` call + (including one suspended waiting for queue space) throws `IllegalStateException`. +*/ +extern class LaunchPool(maxWorkers: Int, maxQueueSize: Int = Channel.UNLIMITED) { + /* + Schedule a lambda for execution and return a Deferred for its result. + Suspends the caller if the queue is full. + Throws `IllegalStateException` if the pool is cancelled or closing. + */ + extern fun launch( lambda: ()->T ): Deferred + + /* + Immediately cancel all workers and discard any queued tasks. + Running tasks are interrupted via coroutine cancellation. + After this call, `launch` throws `IllegalStateException`. + */ + extern fun cancel() + + /* + Like `cancel`, but also waits until any currently-running task has + finished (they will not be interrupted mid-execution). + Useful when workers hold resources (connections, file handles) that + must be released before the caller continues. + After this call, `launch` throws `IllegalStateException`. + */ + extern fun cancelAndJoin() + + /* + Stop accepting new tasks and suspend until all queued and running + tasks have completed normally. + After this call, `launch` throws `IllegalStateException`. + */ + extern fun closeAndJoin() +} \ No newline at end of file