From a8f73dc8bddaf81c181c71fd1adae1cf4dc8862e Mon Sep 17 00:00:00 2001 From: sergeych Date: Sat, 18 Apr 2026 23:32:35 +0300 Subject: [PATCH] added LaunchPool --- docs/LaunchPool.md | 110 ++++++++++ docs/parallelism.md | 21 ++ lynglib/build.gradle.kts | 1 - .../kotlin/net/sergeych/lyng/CodeContext.kt | 7 +- .../kotlin/net/sergeych/lyng/Compiler.kt | 20 +- .../lyng/bytecode/BytecodeCompiler.kt | 25 ++- .../src/commonTest/kotlin/LaunchPoolTest.kt | 189 ++++++++++++++++++ .../commonTest/kotlin/TypeInferenceTest.kt | 107 ++++++++++ lynglib/stdlib/lyng/root.lyng | 83 ++++++-- 9 files changed, 533 insertions(+), 30 deletions(-) create mode 100644 docs/LaunchPool.md create mode 100644 lynglib/src/commonTest/kotlin/LaunchPoolTest.kt create mode 100644 lynglib/src/commonTest/kotlin/TypeInferenceTest.kt diff --git a/docs/LaunchPool.md b/docs/LaunchPool.md new file mode 100644 index 0000000..1d0ca5d --- /dev/null +++ b/docs/LaunchPool.md @@ -0,0 +1,110 @@ +# LaunchPool + +`LaunchPool` is a bounded-concurrency task pool: you submit lambdas with `launch`, and the pool runs them using a fixed number of worker coroutines. + +## Constructor + +``` +LaunchPool(maxWorkers, maxQueueSize = Channel.UNLIMITED) +``` + +| Parameter | Description | +|-----------|-------------| +| `maxWorkers` | Maximum number of tasks that run in parallel. | +| `maxQueueSize` | Maximum number of tasks that may wait in the queue. When the queue is full, `launch` suspends the caller until space becomes available. Defaults to `Channel.UNLIMITED` (no bound). | + +## Methods + +### `launch(lambda): Deferred` + +Schedules `lambda` for execution and returns a `Deferred` for its result. + +- Suspends if the queue is full (`maxQueueSize` reached). +- Throws `IllegalStateException` if the pool is already closed or cancelled. +- Any exception thrown by `lambda` is captured in the returned `Deferred` and **does not escape the pool**. + +```lyng +val pool = LaunchPool(4) +val d1 = pool.launch { computeSomething() } +val d2 = pool.launch { computeOther() } +pool.closeAndJoin() +println(d1.await()) +println(d2.await()) +``` + +### `closeAndJoin()` + +Stops accepting new tasks and suspends until all queued and running tasks complete normally. After this call, any further `launch` throws `IllegalStateException`. Idempotent — safe to call multiple times. + +### `cancel()` + +Immediately closes the queue and cancels all worker coroutines. Queued but unstarted tasks are discarded. After this call, `launch` throws `IllegalStateException`. Idempotent. + +### `cancelAndJoin()` + +Like `cancel()`, but also suspends until all worker coroutines have stopped. Useful when you need to be sure no worker code is still running before proceeding. Idempotent. + +## Exception handling + +Exceptions from submitted lambdas are captured per-task in the returned `Deferred`. The pool itself continues running after a task failure: + +```lyng +val pool = LaunchPool(2) +val good = pool.launch { 42 } +val bad = pool.launch { throw IllegalArgumentException("boom") } +pool.closeAndJoin() + +assertEquals(42, good.await()) +assertThrows(IllegalArgumentException) { bad.await() } +``` + +## Bounded queue / back-pressure + +When `maxQueueSize` is set, the producer suspends if the queue fills up, providing automatic back-pressure: + +```lyng +// 1 worker, queue of 2 — producer can be at most 2 tasks ahead of what's running +val pool = LaunchPool(1, 2) +val d1 = pool.launch { delay(10); "a" } +val d2 = pool.launch { delay(10); "b" } +val d3 = pool.launch { delay(10); "c" } // suspends until d1 is picked up by the worker +pool.closeAndJoin() +``` + +## Collecting all results + +`launch` returns a `Deferred`, so you can collect results via `map`: + +```lyng +val pool = LaunchPool(4) +val jobs = (1..10).map { n -> pool.launch { n * n } } +pool.closeAndJoin() +val results = jobs.map { (it as Deferred).await() } +// results == [1, 4, 9, 16, 25, 36, 49, 64, 81, 100] +``` + +## Concurrency limit in practice + +With `maxWorkers = 2`, at most 2 tasks run simultaneously regardless of how many are queued: + +```lyng +val mu = Mutex() +var active = 0 +var maxSeen = 0 + +val pool = LaunchPool(2) +(1..8).map { + pool.launch { + mu.withLock { active++; if (active > maxSeen) maxSeen = active } + delay(5) + mu.withLock { active-- } + } +} +pool.closeAndJoin() +assert(maxSeen <= 2) +``` + +## See also + +- [parallelism.md](parallelism.md) — `launch`, `Deferred`, `Mutex`, `Channel`, and coroutine basics +- [Channel.md](Channel.md) — the underlying channel primitive used by `LaunchPool` diff --git a/docs/parallelism.md b/docs/parallelism.md index 6983e41..3b5b88d 100644 --- a/docs/parallelism.md +++ b/docs/parallelism.md @@ -257,6 +257,27 @@ Channels can also be buffered so the producer can run ahead: For the full API — including `tryReceive`, `Channel.UNLIMITED`, and the fan-out / ping-pong patterns — see the [Channel] reference page. +## LaunchPool + +When you need **bounded concurrency** — run at most *N* tasks at the same time without spawning a new coroutine per task — use [LaunchPool]: + +```lyng +val pool = LaunchPool(4) // at most 4 tasks run in parallel + +val jobs = (1..20).map { n -> + pool.launch { expensiveCompute(n) } +} +pool.closeAndJoin() // wait for all tasks to complete + +val results = jobs.map { (it as Deferred).await() } +``` + +Exceptions thrown inside a submitted lambda are captured in the returned `Deferred` and do not crash the pool, so other tasks continue running normally. + +See [LaunchPool] for the full API including bounded queues and cancellation. + +[LaunchPool]: LaunchPool.md + | | Flow | Channel | |---|---|---| | **temperature** | cold (lazy) | hot (eager) | diff --git a/lynglib/build.gradle.kts b/lynglib/build.gradle.kts index abd1313..226af69 100644 --- a/lynglib/build.gradle.kts +++ b/lynglib/build.gradle.kts @@ -345,7 +345,6 @@ tasks.withType { // Make the flag visible inside tests if they want to branch on it systemProperty("LYNG_BENCHMARKS", benchmarksEnabled.toString()) - if (!benchmarksEnabled) { // Exclude all JVM tests whose class name ends with or contains BenchmarkTest // This keeps CI fast and avoids noisy timing logs by default. diff --git a/lynglib/src/commonMain/kotlin/net/sergeych/lyng/CodeContext.kt b/lynglib/src/commonMain/kotlin/net/sergeych/lyng/CodeContext.kt index 1a1e7e4..f4ceddd 100644 --- a/lynglib/src/commonMain/kotlin/net/sergeych/lyng/CodeContext.kt +++ b/lynglib/src/commonMain/kotlin/net/sergeych/lyng/CodeContext.kt @@ -1,5 +1,5 @@ /* - * Copyright 2025 Sergey S. Chernov real.sergeych@gmail.com + * Copyright 2026 Sergey S. Chernov real.sergeych@gmail.com * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -24,7 +24,10 @@ sealed class CodeContext { val implicitThisMembers: Boolean = false, val implicitThisTypeName: String? = null, val typeParams: Set = emptySet(), - val typeParamDecls: List = emptyList() + val typeParamDecls: List = emptyList(), + /** True for static methods and top-level functions: they have no implicit `this`, + * so class-body field initializers inside them should not inherit the class name. */ + val noImplicitThis: Boolean = false ): CodeContext() class ClassBody(val name: String, val isExtern: Boolean = false): CodeContext() { var typeParams: Set = emptySet() diff --git a/lynglib/src/commonMain/kotlin/net/sergeych/lyng/Compiler.kt b/lynglib/src/commonMain/kotlin/net/sergeych/lyng/Compiler.kt index 9157640..9965c2b 100644 --- a/lynglib/src/commonMain/kotlin/net/sergeych/lyng/Compiler.kt +++ b/lynglib/src/commonMain/kotlin/net/sergeych/lyng/Compiler.kt @@ -812,8 +812,18 @@ class Compiler( private fun currentImplicitThisTypeName(): String? { for (ctx in codeContexts.asReversed()) { - val fn = ctx as? CodeContext.Function ?: continue - if (fn.implicitThisTypeName != null) return fn.implicitThisTypeName + when (ctx) { + is CodeContext.Function -> { + if (ctx.implicitThisTypeName != null) return ctx.implicitThisTypeName + // A static method or top-level function explicitly has no implicit `this`. + // Stop here — do not fall through to an enclosing ClassBody. + if (ctx.noImplicitThis) return null + } + // Class field initializers are compiled directly under ClassBody with no wrapping + // Function. Lambdas inside those initializers must still see `this` as the class. + is CodeContext.ClassBody -> return ctx.name + else -> {} + } } return null } @@ -9214,13 +9224,17 @@ class Compiler( parentContext is CodeContext.ClassBody && !isStatic -> parentContext.name else -> null } + // Static methods and top-level functions have no implicit `this`; mark them so that + // currentImplicitThisTypeName() does not fall through to an enclosing ClassBody. + val noImplicitThis = implicitThisTypeName == null && extTypeName == null return inCodeContext( CodeContext.Function( name, implicitThisMembers = implicitThisMembers, implicitThisTypeName = implicitThisTypeName, typeParams = typeParams, - typeParamDecls = typeParamDecls + typeParamDecls = typeParamDecls, + noImplicitThis = noImplicitThis ) ) { cc.labels.add(name) diff --git a/lynglib/src/commonMain/kotlin/net/sergeych/lyng/bytecode/BytecodeCompiler.kt b/lynglib/src/commonMain/kotlin/net/sergeych/lyng/bytecode/BytecodeCompiler.kt index 705f64f..98ab1ee 100644 --- a/lynglib/src/commonMain/kotlin/net/sergeych/lyng/bytecode/BytecodeCompiler.kt +++ b/lynglib/src/commonMain/kotlin/net/sergeych/lyng/bytecode/BytecodeCompiler.kt @@ -5869,17 +5869,22 @@ class BytecodeCompiler( } private fun compileCatchClassSlot(name: String): CompiledValue? { - val ref = LocalVarRef(name, Pos.builtIn) - val compiled = compileRef(ref) - if (compiled != null) { - return ensureObjSlot(compiled) + // resolveTypeNameClass always returns the correct Lyng class for built-in type names. + // nameObjClass[name] tracks variable type inference — for class declarations it stores + // ObjClassType (the "Class" meta-type), NOT the actual class object. Using nameObjClass + // for catch clause matching would cause CHECK_IS to compare against the wrong class, + // silently failing to catch exceptions. Always prefer resolveTypeNameClass first. + val cls = resolveTypeNameClass(name) ?: nameObjClass[name]?.takeIf { it != ObjClassType } + if (cls != null) { + val id = builder.addConst(BytecodeConst.ObjRef(cls)) + val slot = allocSlot() + builder.emit(Opcode.CONST_OBJ, id, slot) + updateSlotType(slot, SlotType.OBJ) + return CompiledValue(slot, SlotType.OBJ) } - val cls = nameObjClass[name] ?: resolveTypeNameClass(name) ?: return null - val id = builder.addConst(BytecodeConst.ObjRef(cls)) - val slot = allocSlot() - builder.emit(Opcode.CONST_OBJ, id, slot) - updateSlotType(slot, SlotType.OBJ) - return CompiledValue(slot, SlotType.OBJ) + val ref = LocalVarRef(name, Pos.builtIn) + val compiled = compileRef(ref) ?: return null + return ensureObjSlot(compiled) } private fun emitInlineStatements(statements: List, needResult: Boolean): CompiledValue? { diff --git a/lynglib/src/commonTest/kotlin/LaunchPoolTest.kt b/lynglib/src/commonTest/kotlin/LaunchPoolTest.kt new file mode 100644 index 0000000..968c13f --- /dev/null +++ b/lynglib/src/commonTest/kotlin/LaunchPoolTest.kt @@ -0,0 +1,189 @@ +/* + * Copyright 2026 Sergey S. Chernov real.sergeych@gmail.com + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +import kotlinx.coroutines.runBlocking +import kotlinx.coroutines.withTimeout +import kotlin.test.Test +import net.sergeych.lyng.eval as lyngEval + +class LaunchPoolTest { + + private suspend fun eval(code: String) = withTimeout(2_000L) { lyngEval(code) } + + @Test + fun testBasicExecution() = runBlocking { + eval(""" + val pool = LaunchPool(2) + val d1 = pool.launch { 1 + 1 } + val d2 = pool.launch { "hello" } + pool.closeAndJoin() + assertEquals(2, d1.await()) + assertEquals("hello", d2.await()) + """.trimIndent()) + } + + @Test + fun testResultsCollected() = runBlocking { + eval(""" + val pool = LaunchPool(4) + val jobs = (1..10).map { n -> pool.launch { n * n } } + pool.closeAndJoin() + val results = jobs.map { (it as Deferred).await() } + assertEquals([1,4,9,16,25,36,49,64,81,100], results) + """.trimIndent()) + } + + @Test + fun testConcurrencyLimit() = runBlocking { + eval(""" + // With maxWorkers=2, at most 2 tasks run at the same time. + val mu = Mutex() + var active = 0 + var maxSeen = 0 + + val pool = LaunchPool(2) + val jobs = (1..8).map { + pool.launch { + mu.withLock { active++; if (active > maxSeen) maxSeen = active } + delay(5) + mu.withLock { active-- } + } + } + pool.closeAndJoin() + + assert(maxSeen <= 2) { "maxSeen was " + maxSeen + ", expected <= 2" } + """.trimIndent()) + } + + @Test + fun testExceptionCapturedInDeferred() = runBlocking { + eval(""" + val pool = LaunchPool(2) + val good = pool.launch { 42 } + val bad = pool.launch { throw IllegalArgumentException("boom") } + pool.closeAndJoin() + + assertEquals(42, good.await()) + assertThrows(IllegalArgumentException) { bad.await() } + """.trimIndent()) + } + + @Test + fun testPoolContinuesAfterLambdaException() = runBlocking { + eval(""" + val pool = LaunchPool(1) + val bad = pool.launch { throw IllegalArgumentException("fail") } + val good = pool.launch { "ok" } + pool.closeAndJoin() + + assertThrows(IllegalArgumentException) { bad.await() } + assertEquals("ok", good.await()) + """.trimIndent()) + } + + @Test + fun testLaunchAfterCloseAndJoinThrows() = runBlocking { + eval(""" + val pool = LaunchPool(2) + pool.launch { 1 } + pool.closeAndJoin() + + assertThrows(IllegalStateException) { pool.launch { 2 } } + """.trimIndent()) + } + + @Test + fun testLaunchAfterCancelThrows() = runBlocking { + eval(""" + val pool = LaunchPool(2) + pool.cancel() + + assertThrows(IllegalStateException) { pool.launch { 1 } } + """.trimIndent()) + } + + @Test + fun testCancelAndJoinWaitsForWorkers() = runBlocking { + eval(""" + val pool = LaunchPool(2) + pool.launch { delay(5) } + pool.cancelAndJoin() + // pool is now closed — further launches must throw + assertThrows(IllegalStateException) { pool.launch { 1 } } + """.trimIndent()) + } + + @Test + fun testCloseAndJoinDrainsQueue() = runBlocking { + eval(""" + val mu = Mutex() + val results = [] + val pool = LaunchPool(1) // single worker to force sequential execution + + (1..5).forEach { n -> + pool.launch { + delay(1) + mu.withLock { results += n } + } + } + pool.closeAndJoin() // waits for all 5 to complete + + assertEquals(5, results.size) + """.trimIndent()) + } + + @Test + fun testBoundedQueueSuspendsProducer() = runBlocking { + eval(""" + // queue of 2 + 1 worker; producer can only be 1 ahead of what's running + val pool = LaunchPool(1, 2) + val order = [] + val mu = Mutex() + + // fill the queue + val d1 = pool.launch { delay(5); mu.withLock { order += 1 } } + val d2 = pool.launch { delay(3); mu.withLock { order += 2 } } + val d3 = pool.launch { delay(3); mu.withLock { order += 3 } } + + pool.closeAndJoin() + assertEquals(3, order.size) + """.trimIndent()) + } + + @Test + fun testUnlimitedQueueDefault() = runBlocking { + eval(""" + val pool = LaunchPool(4) + val jobs = (1..50).map { n -> pool.launch { n } } + pool.closeAndJoin() + var sum = 0 + for (j in jobs) { sum += (j as Deferred).await() } + assertEquals(1275, sum) // 1+2+...+50 + """.trimIndent()) + } + + @Test + fun testIdempotentClose() = runBlocking { + eval(""" + val pool = LaunchPool(2) + pool.closeAndJoin() + pool.closeAndJoin() // calling again must not throw + pool.cancel() // mixing close calls must not throw either + """.trimIndent()) + } + +} diff --git a/lynglib/src/commonTest/kotlin/TypeInferenceTest.kt b/lynglib/src/commonTest/kotlin/TypeInferenceTest.kt new file mode 100644 index 0000000..5b40fe8 --- /dev/null +++ b/lynglib/src/commonTest/kotlin/TypeInferenceTest.kt @@ -0,0 +1,107 @@ +/* + * Copyright 2026 Sergey S. Chernov real.sergeych@gmail.com + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +import kotlinx.coroutines.runBlocking +import net.sergeych.lyng.eval +import kotlin.test.Test + +/** + * Regression tests for type inference of class fields accessed inside nested closures. + * + * A class field declared as `val foo = SomeType(...)` should have its type inferred as + * `SomeType` everywhere inside the class body, including inside lambdas and closures + * that capture the field via the implicit `this` receiver. + */ +class TypeInferenceTest { + + /** Channel field type inferred from constructor — accessed in a launch closure */ + @Test + fun testChannelFieldInLaunchClosure() = runBlocking { + eval(""" + class Foo { + private val ch = Channel(Channel.UNLIMITED) + private val worker = launch { + var item = ch.receive() + while (item != null) { + item = ch.receive() + } + } + fun start() { + ch.send(1) + ch.close() + (worker as Deferred).await() + } + } + Foo().start() + """.trimIndent()) + } + + /** Mutex field type inferred from constructor — used directly in a method body */ + @Test + fun testMutexFieldDirectUse() = runBlocking { + eval(""" + class Bar { + private val mu = Mutex() + private var count = 0 + fun inc() { mu.withLock { count++ } } + fun get() { count } + } + val b = Bar() + b.inc() + b.inc() + assertEquals(2, b.get()) + """.trimIndent()) + } + + /** CompletableDeferred field type inferred — complete/await used directly */ + @Test + fun testCompletableDeferredFieldDirectUse() = runBlocking { + eval(""" + class Baz { + private val d = CompletableDeferred() + fun complete(v) { d.complete(v) } + fun result() { (d as Deferred).await() } + } + val baz = Baz() + launch { baz.complete(42) } + assertEquals(42, baz.result()) + """.trimIndent()) + } + + /** Channel field accessed inside a map closure within class initializer */ + @Test + fun testChannelFieldInMapAndLaunchClosure() = runBlocking { + eval(""" + class Pool(n) { + private val ch = Channel(Channel.UNLIMITED) + private val workers = (1..n).map { + launch { + var item = ch.receive() + while (item != null) { + item = ch.receive() + } + } + } + fun closeAll() { + ch.close() + for (w in workers) { (w as Deferred).await() } + } + } + Pool(2).closeAll() + """.trimIndent()) + } +} diff --git a/lynglib/stdlib/lyng/root.lyng b/lynglib/stdlib/lyng/root.lyng index 4b9fd57..11eb0e1 100644 --- a/lynglib/stdlib/lyng/root.lyng +++ b/lynglib/stdlib/lyng/root.lyng @@ -543,6 +543,19 @@ class StackTraceEntry( "%s: %s"(at, sourceString) } } +// Private helper: starts one LaunchPool worker coroutine for the given queue. +// Defined outside LaunchPool so the global `launch` is not shadowed by the method. +private fun _launchPoolWorker(q) { + val ch = q as Channel + launch { + var task = ch.receive() + while (task != null) { + task() + task = ch.receive() + } + } +} + /* A pool of coroutines that execute submitted tasks with bounded concurrency. @@ -554,37 +567,79 @@ class StackTraceEntry( Any exception thrown inside a submitted lambda is captured in the returned `Deferred` and never escapes the pool itself. - Once `cancel()` or `closeAndJoin()` is called, any further `launch` call - (including one suspended waiting for queue space) throws `IllegalStateException`. + Once `cancel()`, `cancelAndJoin()`, or `closeAndJoin()` is called, any further + `launch` call (including one that was suspended waiting for queue space) throws + `IllegalStateException`. */ -extern class LaunchPool(maxWorkers: Int, maxQueueSize: Int = Channel.UNLIMITED) { +class LaunchPool(maxWorkers, maxQueueSize = Channel.UNLIMITED) { + private val queue = Channel(maxQueueSize) + private var isClosed = false + private val mu = Mutex() + private val workers = (1..maxWorkers).map { _launchPoolWorker(queue) } + /* Schedule a lambda for execution and return a Deferred for its result. - Suspends the caller if the queue is full. - Throws `IllegalStateException` if the pool is cancelled or closing. + Suspends the caller if the queue is full (maxQueueSize reached). + Throws `IllegalStateException` if the pool is closed or cancelled. + Any exception thrown by the lambda is captured in the returned Deferred + and does not escape the pool. */ - extern fun launch( lambda: ()->T ): Deferred + fun launch(lambda) { + mu.withLock { + if (isClosed) throw IllegalStateException("LaunchPool is closed") + } + val d = CompletableDeferred() + val wrapper = { + try { d.complete(lambda()) } + catch(e) { d.completeExceptionally(e) } + } + // send may suspend if the queue is full; throws if pool is closed meanwhile + try { + queue.send(wrapper) + } catch(e: IllegalStateException) { + throw IllegalStateException("LaunchPool is closed") + } + d + } + + private fun closeQueue() { + mu.withLock { + if (!isClosed) { + isClosed = true + queue.close() + } + } + } /* Immediately cancel all workers and discard any queued tasks. - Running tasks are interrupted via coroutine cancellation. + Tasks suspended at an IO point are interrupted via coroutine cancellation. After this call, `launch` throws `IllegalStateException`. */ - extern fun cancel() + fun cancel() { + closeQueue() + workers.forEach { (it as Deferred).cancel() } + } /* - Like `cancel`, but also waits until any currently-running task has - finished (they will not be interrupted mid-execution). - Useful when workers hold resources (connections, file handles) that - must be released before the caller continues. + Like `cancel`, but cancels all workers and suspends until they all stop. + Worker cancellations are silent — exceptions from cancelled workers are ignored. After this call, `launch` throws `IllegalStateException`. */ - extern fun cancelAndJoin() + fun cancelAndJoin() { + closeQueue() + workers.forEach { (it as Deferred).cancel() } + } /* Stop accepting new tasks and suspend until all queued and running tasks have completed normally. After this call, `launch` throws `IllegalStateException`. */ - extern fun closeAndJoin() + fun closeAndJoin() { + closeQueue() + for (w in workers) { + (w as Deferred).await() + } + } } \ No newline at end of file