From fbb1683ba3e1cff544e58d1e57801c549c76dd2b Mon Sep 17 00:00:00 2001 From: sergeych Date: Thu, 9 Apr 2026 11:55:01 +0300 Subject: [PATCH] Fix concurrent TCP CLI scope and add regressions --- examples/tcpserver.lyng | 85 +++++++++--- lyng/src/commonMain/kotlin/Common.kt | 21 ++- .../sergeych/CliTcpServerRegressionTest.kt | 121 ++++++++++++++++++ .../io/net/LyngNetTcpServerExampleTest.kt | 108 +++++++++++----- .../lyngio/net/NetConcurrentLoopbackTest.kt | 68 ++++++++++ .../kotlin/net/sergeych/lyng/Script.kt | 13 +- .../lyng/bytecode/BytecodeCompiler.kt | 11 +- .../net/sergeych/lyng/bytecode/CmdRuntime.kt | 39 +++++- .../net/sergeych/lyng/pacman/ImportManager.kt | 6 + .../src/commonTest/kotlin/CoroutinesTest.kt | 60 +++++++++ 10 files changed, 471 insertions(+), 61 deletions(-) create mode 100644 lyngio/src/commonTest/kotlin/net/sergeych/lyngio/net/NetConcurrentLoopbackTest.kt diff --git a/examples/tcpserver.lyng b/examples/tcpserver.lyng index db72fb4..8c11c97 100644 --- a/examples/tcpserver.lyng +++ b/examples/tcpserver.lyng @@ -1,21 +1,72 @@ -import lyng.buffer import lyng.io.net -val server = Net.tcpListen(0, "127.0.0.1") -val port = server.localAddress().port -val accepted = launch { - val client = server.accept() - val line = (client.read(4) as Buffer).decodeUtf8() - client.writeUtf8("echo:" + line) - client.flush() - client.close() - server.close() - line +val host = "127.0.0.1" +val clientCount = 1000 +val server: TcpServer = Net.tcpListen(0, host, clientCount, true) as TcpServer +val port: Int = server.localAddress().port + +fun payloadFor(index: Int): String { + "$index:${Random.nextInt()}:${Random.nextInt()}" } -val socket = Net.tcpConnect("127.0.0.1", port) -socket.writeUtf8("ping") -socket.flush() -val reply = (socket.read(16) as Buffer).decodeUtf8() -socket.close() -println("${accepted.await()}: $reply") +fun handleClient(client: TcpSocket): String { + try { + val source = client.readLine() + if( source == null ) { + return "server-eof" + } + val reply = "pong: $source" + client.writeUtf8(reply + "\n") + client.flush() + reply + } finally { + client.close() + } +} + +val serverJob: Deferred = launch { + var handlers: List = List() + try { + for( i in 0..<1000 ) { + val client: TcpSocket = server.accept() as TcpSocket + handlers += launch { + handleClient(client) + } + } + handlers.joinAll() + } finally { + server.close() + } +} + +val clientJobs: List = (0.. + val payload = payloadFor(index) + launch { + val socket: TcpSocket = Net.tcpConnect(host, port) as TcpSocket + try { + socket.writeUtf8(payload + "\n") + socket.flush() + val reply = socket.readLine() + if( reply == null ) { + "client-eof:$payload" + } + else { + assertEquals("pong: $payload", reply) + reply + } + } finally { + socket.close() + } + } +} + +val replies = clientJobs.joinAll() +val serverReplies = serverJob.await() as List + +assertEquals(clientCount, replies.size) +assertEquals(clientCount, serverReplies.size) +assertEquals(replies.toSet, serverReplies.toSet) + +val summary = "OK: $clientCount concurrent tcp clients" +println(summary) +summary diff --git a/lyng/src/commonMain/kotlin/Common.kt b/lyng/src/commonMain/kotlin/Common.kt index c3bec89..7b89c4d 100644 --- a/lyng/src/commonMain/kotlin/Common.kt +++ b/lyng/src/commonMain/kotlin/Common.kt @@ -79,8 +79,18 @@ private val baseCliImportManagerDefer = globalDefer { manager } +private fun ImportManager.invalidateCliModuleCaches() { + invalidatePackageCache("lyng.io.fs") + invalidatePackageCache("lyng.io.console") + invalidatePackageCache("lyng.io.http") + invalidatePackageCache("lyng.io.ws") + invalidatePackageCache("lyng.io.net") +} + val baseScopeDefer = globalDefer { - baseCliImportManagerDefer.await().copy().newStdScope().apply { + baseCliImportManagerDefer.await().copy().apply { + invalidateCliModuleCaches() + }.newStdScope().apply { installCliBuiltins() addConst("ARGV", ObjList(mutableListOf())) } @@ -232,7 +242,9 @@ private suspend fun ImportManager.newCliScope(argv: List): Scope = } internal suspend fun newCliScope(argv: List, entryFileName: String? = null): Scope { - val baseManager = baseCliImportManagerDefer.await() + val baseManager = baseCliImportManagerDefer.await().copy().apply { + invalidateCliModuleCaches() + } if (entryFileName == null) { return baseManager.newCliScope(argv) } @@ -241,9 +253,8 @@ internal suspend fun newCliScope(argv: List, entryFileName: String? = nu if (localModules.isEmpty()) { return baseManager.newCliScope(argv) } - val manager = baseManager.copy() - registerLocalCliModules(manager, localModules) - return manager.newCliScope(argv) + registerLocalCliModules(baseManager, localModules) + return baseManager.newCliScope(argv) } fun runMain(args: Array) { diff --git a/lyng/src/commonTest/kotlin/net/sergeych/CliTcpServerRegressionTest.kt b/lyng/src/commonTest/kotlin/net/sergeych/CliTcpServerRegressionTest.kt index 8bd0dc9..5e63187 100644 --- a/lyng/src/commonTest/kotlin/net/sergeych/CliTcpServerRegressionTest.kt +++ b/lyng/src/commonTest/kotlin/net/sergeych/CliTcpServerRegressionTest.kt @@ -6,6 +6,7 @@ import net.sergeych.lyng.Source import net.sergeych.lyng.obj.ObjString import kotlin.test.Test import kotlin.test.assertEquals +import kotlin.test.assertTrue class CliTcpServerRegressionTest { @@ -48,4 +49,124 @@ class CliTcpServerRegressionTest { session.cancelAndJoin() } } + + @Test + fun concurrentTcpExampleRunsInCliScope() = runBlocking { + val cliScope = newCliScope(emptyList()) + val session = EvalSession(cliScope) + + try { + val result = evalOnCliDispatcher( + session, + Source( + "", + """ + import lyng.io.net + + val host = "127.0.0.1" + val clientCount = 32 + val server: TcpServer = Net.tcpListen(0, host, 32, true) as TcpServer + val port: Int = server.localAddress().port + + fun payloadFor(index: Int): String { + "${'$'}index:${'$'}{Random.nextInt()}:${'$'}{Random.nextInt()}" + } + + fun handleClient(client: TcpSocket): String { + try { + val source = client.readLine() + if( source == null ) { + return "server-eof" + } + val reply = "pong: ${'$'}source" + client.writeUtf8(reply + "\n") + client.flush() + reply + } finally { + client.close() + } + } + + val serverJob: Deferred = launch { + var handlers: List = List() + try { + for( i in 0..<32 ) { + val client: TcpSocket = server.accept() as TcpSocket + handlers += launch { + handleClient(client) + } + } + handlers.joinAll() + } finally { + server.close() + } + } + + val clientJobs = (0.. + val payload = payloadFor(index) + launch { + val socket: TcpSocket = Net.tcpConnect(host, port) as TcpSocket + try { + socket.writeUtf8(payload + "\n") + socket.flush() + val reply = socket.readLine() + if( reply == null ) { + "client-eof:${'$'}payload" + } else { + assertEquals("pong: ${'$'}payload", reply) + reply + } + } finally { + socket.close() + } + } + } + + val replies = clientJobs.joinAll() + val serverReplies = serverJob.await() as List + assertEquals(clientCount, replies.size) + assertEquals(clientCount, serverReplies.size) + assertEquals(replies.toSet, serverReplies.toSet) + "OK:${'$'}clientCount:${'$'}{replies.toSet}:${'$'}{serverReplies.toSet}" + """.trimIndent() + ) + ) + + val text = (result as ObjString).value + assertTrue(text.startsWith("OK:32:"), text) + } finally { + session.cancelAndJoin() + } + } + + @Test + fun mixedModuleAndLocalCapturesWorkInCliScope() = runBlocking { + val cliScope = newCliScope(emptyList()) + val session = EvalSession(cliScope) + + try { + val result = evalOnCliDispatcher( + session, + Source( + "", + """ + val prefix = "pong" + val jobs = (0..<32).map { index -> + val payload = "${'$'}index:${'$'}{Random.nextInt()}" + launch { + delay(5) + "${'$'}prefix:${'$'}payload" + } + } + jobs.joinAll() + """.trimIndent() + ) + ) as net.sergeych.lyng.obj.ObjList + + assertEquals(32, result.list.size) + assertEquals(32, result.list.map { (it as ObjString).value }.toSet().size) + } finally { + session.cancelAndJoin() + } + } } diff --git a/lyngio/src/commonTest/kotlin/net/sergeych/lyng/io/net/LyngNetTcpServerExampleTest.kt b/lyngio/src/commonTest/kotlin/net/sergeych/lyng/io/net/LyngNetTcpServerExampleTest.kt index dfb7bff..590b1e6 100644 --- a/lyngio/src/commonTest/kotlin/net/sergeych/lyng/io/net/LyngNetTcpServerExampleTest.kt +++ b/lyngio/src/commonTest/kotlin/net/sergeych/lyng/io/net/LyngNetTcpServerExampleTest.kt @@ -18,7 +18,7 @@ package net.sergeych.lyng.io.net import kotlinx.coroutines.Dispatchers -import kotlinx.coroutines.test.runTest +import kotlinx.coroutines.runBlocking import kotlinx.coroutines.withContext import kotlinx.coroutines.withTimeout import net.sergeych.lyng.Compiler @@ -30,44 +30,92 @@ import kotlin.test.assertEquals class LyngNetTcpServerExampleTest { + private fun concurrentTcpScript(clientCount: Int): String = """ + import lyng.io.net + + val host = "127.0.0.1" + val clientCount = $clientCount + val server: TcpServer = Net.tcpListen(0, host, clientCount, true) as TcpServer + val port: Int = server.localAddress().port + + fun payloadFor(index: Int): String { + "${'$'}index:${'$'}{Random.nextInt()}:${'$'}{Random.nextInt()}" + } + + fun handleClient(client: TcpSocket): String { + try { + val source = client.readLine() + if( source == null ) { + return "server-eof" + } + val reply = "pong: ${'$'}source" + client.writeUtf8(reply + "\n") + client.flush() + reply + } finally { + client.close() + } + } + + val serverJob: Deferred = launch { + var handlers: List = List() + try { + for( i in 0..<${clientCount} ) { + val client: TcpSocket = server.accept() as TcpSocket + handlers += launch { + handleClient(client) + } + } + handlers.joinAll() + } finally { + server.close() + } + } + + val clientJobs: List = (0.. + val payload = payloadFor(index) + launch { + val socket: TcpSocket = Net.tcpConnect(host, port) as TcpSocket + try { + socket.writeUtf8(payload + "\n") + socket.flush() + val reply = socket.readLine() + if( reply == null ) { + "client-eof:${'$'}payload" + } + else { + assertEquals("pong: ${'$'}payload", reply) + reply + } + } finally { + socket.close() + } + } + } + + val replies = clientJobs.joinAll() + val serverReplies = serverJob.await() as List + + assertEquals(clientCount, replies.size) + assertEquals(clientCount, serverReplies.size) + assertEquals(replies.toSet, serverReplies.toSet) + "OK:${'$'}clientCount" + """.trimIndent() + @Test - fun tcpServerExampleRoundTripsOverLoopback() = runTest { + fun tcpServerExampleSurvivesConcurrentLoopbackLoad() = runBlocking { val engine = getSystemNetEngine() - if (!engine.isSupported || !engine.isTcpAvailable || !engine.isTcpServerAvailable) return@runTest + if (!engine.isSupported || !engine.isTcpAvailable || !engine.isTcpServerAvailable) return@runBlocking val scope = Script.newScope() createNetModule(PermitAllNetAccessPolicy, scope) - val code = """ - import lyng.buffer - import lyng.io.net - - val server = Net.tcpListen(0, "127.0.0.1") - val port = server.localAddress().port - val accepted = launch { - val client = server.accept() - val line = (client.read(4) as Buffer).decodeUtf8() - client.writeUtf8("echo:" + line) - client.flush() - client.close() - server.close() - line - } - - val socket = Net.tcpConnect("127.0.0.1", port) - socket.writeUtf8("ping") - socket.flush() - val reply = (socket.read(16) as Buffer).decodeUtf8() - socket.close() - "${'$'}{accepted.await()}: ${'$'}reply" - """.trimIndent() - val result = withContext(Dispatchers.Default) { - withTimeout(5_000) { - Compiler.compile(code).execute(scope).inspect(scope) + withTimeout(20_000) { + Compiler.compile(concurrentTcpScript(clientCount = 32)).execute(scope).inspect(scope) } } - assertEquals("\"ping: echo:ping\"", result) + assertEquals("\"OK:32\"", result) } } diff --git a/lyngio/src/commonTest/kotlin/net/sergeych/lyngio/net/NetConcurrentLoopbackTest.kt b/lyngio/src/commonTest/kotlin/net/sergeych/lyngio/net/NetConcurrentLoopbackTest.kt new file mode 100644 index 0000000..d2b5e87 --- /dev/null +++ b/lyngio/src/commonTest/kotlin/net/sergeych/lyngio/net/NetConcurrentLoopbackTest.kt @@ -0,0 +1,68 @@ +package net.sergeych.lyngio.net + +import kotlinx.coroutines.async +import kotlinx.coroutines.awaitAll +import kotlinx.coroutines.coroutineScope +import kotlinx.coroutines.runBlocking +import kotlinx.coroutines.withTimeout +import kotlin.test.Test +import kotlin.test.assertEquals +import kotlin.test.assertTrue + +class NetConcurrentLoopbackTest { + + @Test + fun concurrentTcpRoundTripsWorkAtEngineLevel() = runBlocking { + val engine = getSystemNetEngine() + if (!engine.isSupported || !engine.isTcpAvailable || !engine.isTcpServerAvailable) return@runBlocking + + withTimeout(10_000) { + val clients = 32 + val server = engine.tcpListen(host = "127.0.0.1", port = 0, backlog = 64, reuseAddress = true) + val serverJob = async { + coroutineScope { + val handlers = ArrayList>(clients) + repeat(clients) { + val client = server.accept() + handlers += async { + try { + val payload = client.readLine() + val reply = "pong:$payload" + client.writeUtf8("$reply\n") + client.flush() + reply + } finally { + client.close() + } + } + } + handlers.awaitAll() + } + } + + val clientJobs = coroutineScope { + (0 until clients).map { index -> + async { + val payload = "ping:$index" + val socket = engine.tcpConnect("127.0.0.1", server.localAddress().port, timeoutMillis = 2_000, noDelay = true) + try { + socket.writeUtf8("$payload\n") + socket.flush() + socket.readLine() + } finally { + socket.close() + } + } + } + } + + val clientReplies = clientJobs.awaitAll() + val serverReplies = serverJob.await() + server.close() + + assertEquals((0 until clients).map { "pong:ping:$it" }, clientReplies) + assertEquals((0 until clients).map { "pong:ping:$it" }, serverReplies) + assertTrue(clientReplies.all { it != null }) + } + } +} diff --git a/lynglib/src/commonMain/kotlin/net/sergeych/lyng/Script.kt b/lynglib/src/commonMain/kotlin/net/sergeych/lyng/Script.kt index 06a936b..14a0998 100644 --- a/lynglib/src/commonMain/kotlin/net/sergeych/lyng/Script.kt +++ b/lynglib/src/commonMain/kotlin/net/sergeych/lyng/Script.kt @@ -24,6 +24,7 @@ import net.sergeych.lyng.bridge.bind import net.sergeych.lyng.bridge.bindObject import net.sergeych.lyng.bytecode.CmdFunction import net.sergeych.lyng.bytecode.CmdVm +import net.sergeych.lyng.bytecode.BytecodeLambdaCallable import net.sergeych.lyng.miniast.* import net.sergeych.lyng.obj.* import net.sergeych.lyng.pacman.ImportManager @@ -635,16 +636,20 @@ class Script( addConst("MapEntry", ObjMapEntry.type) addFn("launch") { - val callable = requireOnlyArg() - val captured = this + val rawCallable = requireOnlyArg() + val currentScope = requireScope() + // Freeze non-module lexical state at launch time so each coroutine gets + // its own view of loop locals and other transient frame values. + val captured = if (currentScope is ModuleScope) currentScope else currentScope.snapshotForClosure() + val callable = (rawCallable as? BytecodeLambdaCallable)?.freezeForLaunch(captured) ?: rawCallable val session = EvalSession.currentOrNull() val deferred = if (session != null) { session.launchTrackedDeferred { - captured.call(callable) + ScopeBridge(captured).call(callable) } } else { globalDefer { - captured.call(callable) + ScopeBridge(captured).call(callable) } } ObjDeferred(deferred) diff --git a/lynglib/src/commonMain/kotlin/net/sergeych/lyng/bytecode/BytecodeCompiler.kt b/lynglib/src/commonMain/kotlin/net/sergeych/lyng/bytecode/BytecodeCompiler.kt index 1a456fd..d6d0d78 100644 --- a/lynglib/src/commonMain/kotlin/net/sergeych/lyng/bytecode/BytecodeCompiler.kt +++ b/lynglib/src/commonMain/kotlin/net/sergeych/lyng/bytecode/BytecodeCompiler.kt @@ -1034,15 +1034,18 @@ class BytecodeCompiler( val typeValue = compileRefWithFallback(ref.castTypeRef(), null, ref.castPos()) ?: return null val objValue = ensureObjSlot(value) val typeObj = ensureObjSlot(typeValue) + val sourceSlot = allocSlot() + builder.emit(Opcode.MOVE_OBJ, objValue.slot, sourceSlot) + updateSlotType(sourceSlot, SlotType.OBJ) if (!ref.castIsNullable()) { - builder.emit(Opcode.ASSERT_IS, objValue.slot, typeObj.slot) + builder.emit(Opcode.ASSERT_IS, sourceSlot, typeObj.slot) val resultSlot = allocSlot() - builder.emit(Opcode.MAKE_QUALIFIED_VIEW, objValue.slot, typeObj.slot, resultSlot) + builder.emit(Opcode.MAKE_QUALIFIED_VIEW, sourceSlot, typeObj.slot, resultSlot) updateSlotType(resultSlot, SlotType.OBJ) return CompiledValue(resultSlot, SlotType.OBJ) } val checkSlot = allocSlot() - builder.emit(Opcode.CHECK_IS, objValue.slot, typeObj.slot, checkSlot) + builder.emit(Opcode.CHECK_IS, sourceSlot, typeObj.slot, checkSlot) updateSlotType(checkSlot, SlotType.BOOL) val resultSlot = allocSlot() val nullSlot = allocSlot() @@ -1056,7 +1059,7 @@ class BytecodeCompiler( builder.emit(Opcode.MOVE_OBJ, nullSlot, resultSlot) builder.emit(Opcode.JMP, listOf(CmdBuilder.Operand.LabelRef(endLabel))) builder.mark(okLabel) - builder.emit(Opcode.MAKE_QUALIFIED_VIEW, objValue.slot, typeObj.slot, resultSlot) + builder.emit(Opcode.MAKE_QUALIFIED_VIEW, sourceSlot, typeObj.slot, resultSlot) builder.mark(endLabel) updateSlotType(resultSlot, SlotType.OBJ) return CompiledValue(resultSlot, SlotType.OBJ) diff --git a/lynglib/src/commonMain/kotlin/net/sergeych/lyng/bytecode/CmdRuntime.kt b/lynglib/src/commonMain/kotlin/net/sergeych/lyng/bytecode/CmdRuntime.kt index 1d807d3..df45f71 100644 --- a/lynglib/src/commonMain/kotlin/net/sergeych/lyng/bytecode/CmdRuntime.kt +++ b/lynglib/src/commonMain/kotlin/net/sergeych/lyng/bytecode/CmdRuntime.kt @@ -3783,11 +3783,48 @@ class BytecodeLambdaCallable( private val returnLabels: Set, override val pos: Pos, ) : Statement(), BytecodeCallable { + private fun freezeRecord(record: ObjRecord): ObjRecord { + val frozenValue = when (val raw = record.value) { + is net.sergeych.lyng.FrameSlotRef -> raw.read() + is net.sergeych.lyng.RecordSlotRef -> raw.read() + is net.sergeych.lyng.ScopeSlotRef -> raw.read() + else -> raw + } + return record.copy(value = frozenValue) + } + + private fun resolveCaptureRecords(base: Scope): List? { + if (captureNames.isEmpty()) return null + return captureNames.map { name -> + base.chainLookupIgnoreClosure( + name, + followClosure = true, + caller = base.currentClassCtx + ) ?: base.raiseSymbolNotFound("symbol $name not found") + } + } + fun rebindClosure(newClosureScope: Scope): BytecodeLambdaCallable { return BytecodeLambdaCallable( fn = fn, closureScope = newClosureScope, - captureRecords = captureRecords, + captureRecords = resolveCaptureRecords(newClosureScope) ?: captureRecords, + captureNames = captureNames, + paramSlotPlan = paramSlotPlan, + argsDeclaration = argsDeclaration, + preferredThisType = preferredThisType, + returnLabels = returnLabels, + pos = pos + ) + } + + fun freezeForLaunch(newClosureScope: Scope): BytecodeLambdaCallable { + val frozenCaptures = captureRecords?.map(::freezeRecord) + ?: resolveCaptureRecords(newClosureScope)?.map(::freezeRecord) + return BytecodeLambdaCallable( + fn = fn, + closureScope = newClosureScope, + captureRecords = frozenCaptures, captureNames = captureNames, paramSlotPlan = paramSlotPlan, argsDeclaration = argsDeclaration, diff --git a/lynglib/src/commonMain/kotlin/net/sergeych/lyng/pacman/ImportManager.kt b/lynglib/src/commonMain/kotlin/net/sergeych/lyng/pacman/ImportManager.kt index d6956bd..dec219c 100644 --- a/lynglib/src/commonMain/kotlin/net/sergeych/lyng/pacman/ImportManager.kt +++ b/lynglib/src/commonMain/kotlin/net/sergeych/lyng/pacman/ImportManager.kt @@ -157,4 +157,10 @@ class ImportManager( } } + fun invalidatePackageCache(name: String) { + op.withLock { + imports[name]?.cachedScope = null + } + } + } diff --git a/lynglib/src/commonTest/kotlin/CoroutinesTest.kt b/lynglib/src/commonTest/kotlin/CoroutinesTest.kt index a597c96..5dd3de1 100644 --- a/lynglib/src/commonTest/kotlin/CoroutinesTest.kt +++ b/lynglib/src/commonTest/kotlin/CoroutinesTest.kt @@ -139,6 +139,66 @@ class TestCoroutines { ) } + @Test + fun testLaunchCapturesDistinctLoopValues() = runTest { + eval( + """ + val jobs = [] + for( i in 0..<8 ) { + val x = i + jobs += launch { + delay(5) + x + } + } + + assertEquals([0,1,2,3,4,5,6,7], jobs.joinAll()) + """.trimIndent() + ) + } + + @Test + fun testNestedLaunchCapturesDistinctLoopValues() = runTest { + eval( + """ + val outer = launch { + val jobs = [] + for( i in 0..<8 ) { + val x = i + jobs += launch { + delay(5) + x + } + } + jobs.joinAll() + } + + assertEquals([0,1,2,3,4,5,6,7], outer.await()) + """.trimIndent() + ) + } + + @Test + fun testLaunchCapturesDistinctObjectValues() = runTest { + eval( + """ + val jobs = [] + for( i in 0..<8 ) { + val box = ["item:${'$'}i"] + jobs += launch { + delay(5) + box[0] + } + } + + assertEquals( + ["item:0", "item:1", "item:2", "item:3", "item:4", "item:5", "item:6", "item:7"], + jobs.joinAll() + ) + """.trimIndent() + ) + } + @Test fun testFlows() = runTest { eval("""