Fix concurrent TCP CLI scope and add regressions
This commit is contained in:
parent
c9eb3df93d
commit
fbb1683ba3
@ -1,21 +1,72 @@
|
|||||||
import lyng.buffer
|
|
||||||
import lyng.io.net
|
import lyng.io.net
|
||||||
|
|
||||||
val server = Net.tcpListen(0, "127.0.0.1")
|
val host = "127.0.0.1"
|
||||||
val port = server.localAddress().port
|
val clientCount = 1000
|
||||||
val accepted = launch {
|
val server: TcpServer = Net.tcpListen(0, host, clientCount, true) as TcpServer
|
||||||
val client = server.accept()
|
val port: Int = server.localAddress().port
|
||||||
val line = (client.read(4) as Buffer).decodeUtf8()
|
|
||||||
client.writeUtf8("echo:" + line)
|
fun payloadFor(index: Int): String {
|
||||||
client.flush()
|
"$index:${Random.nextInt()}:${Random.nextInt()}"
|
||||||
client.close()
|
|
||||||
server.close()
|
|
||||||
line
|
|
||||||
}
|
}
|
||||||
|
|
||||||
val socket = Net.tcpConnect("127.0.0.1", port)
|
fun handleClient(client: TcpSocket): String {
|
||||||
socket.writeUtf8("ping")
|
try {
|
||||||
socket.flush()
|
val source = client.readLine()
|
||||||
val reply = (socket.read(16) as Buffer).decodeUtf8()
|
if( source == null ) {
|
||||||
socket.close()
|
return "server-eof"
|
||||||
println("${accepted.await()}: $reply")
|
}
|
||||||
|
val reply = "pong: $source"
|
||||||
|
client.writeUtf8(reply + "\n")
|
||||||
|
client.flush()
|
||||||
|
reply
|
||||||
|
} finally {
|
||||||
|
client.close()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
val serverJob: Deferred = launch {
|
||||||
|
var handlers: List<Deferred> = List()
|
||||||
|
try {
|
||||||
|
for( i in 0..<1000 ) {
|
||||||
|
val client: TcpSocket = server.accept() as TcpSocket
|
||||||
|
handlers += launch {
|
||||||
|
handleClient(client)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
handlers.joinAll()
|
||||||
|
} finally {
|
||||||
|
server.close()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
val clientJobs: List<Deferred> = (0..<clientCount).map { index ->
|
||||||
|
val payload = payloadFor(index)
|
||||||
|
launch {
|
||||||
|
val socket: TcpSocket = Net.tcpConnect(host, port) as TcpSocket
|
||||||
|
try {
|
||||||
|
socket.writeUtf8(payload + "\n")
|
||||||
|
socket.flush()
|
||||||
|
val reply = socket.readLine()
|
||||||
|
if( reply == null ) {
|
||||||
|
"client-eof:$payload"
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
assertEquals("pong: $payload", reply)
|
||||||
|
reply
|
||||||
|
}
|
||||||
|
} finally {
|
||||||
|
socket.close()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
val replies = clientJobs.joinAll()
|
||||||
|
val serverReplies = serverJob.await() as List<Object>
|
||||||
|
|
||||||
|
assertEquals(clientCount, replies.size)
|
||||||
|
assertEquals(clientCount, serverReplies.size)
|
||||||
|
assertEquals(replies.toSet, serverReplies.toSet)
|
||||||
|
|
||||||
|
val summary = "OK: $clientCount concurrent tcp clients"
|
||||||
|
println(summary)
|
||||||
|
summary
|
||||||
|
|||||||
@ -79,8 +79,18 @@ private val baseCliImportManagerDefer = globalDefer {
|
|||||||
manager
|
manager
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private fun ImportManager.invalidateCliModuleCaches() {
|
||||||
|
invalidatePackageCache("lyng.io.fs")
|
||||||
|
invalidatePackageCache("lyng.io.console")
|
||||||
|
invalidatePackageCache("lyng.io.http")
|
||||||
|
invalidatePackageCache("lyng.io.ws")
|
||||||
|
invalidatePackageCache("lyng.io.net")
|
||||||
|
}
|
||||||
|
|
||||||
val baseScopeDefer = globalDefer {
|
val baseScopeDefer = globalDefer {
|
||||||
baseCliImportManagerDefer.await().copy().newStdScope().apply {
|
baseCliImportManagerDefer.await().copy().apply {
|
||||||
|
invalidateCliModuleCaches()
|
||||||
|
}.newStdScope().apply {
|
||||||
installCliBuiltins()
|
installCliBuiltins()
|
||||||
addConst("ARGV", ObjList(mutableListOf()))
|
addConst("ARGV", ObjList(mutableListOf()))
|
||||||
}
|
}
|
||||||
@ -232,7 +242,9 @@ private suspend fun ImportManager.newCliScope(argv: List<String>): Scope =
|
|||||||
}
|
}
|
||||||
|
|
||||||
internal suspend fun newCliScope(argv: List<String>, entryFileName: String? = null): Scope {
|
internal suspend fun newCliScope(argv: List<String>, entryFileName: String? = null): Scope {
|
||||||
val baseManager = baseCliImportManagerDefer.await()
|
val baseManager = baseCliImportManagerDefer.await().copy().apply {
|
||||||
|
invalidateCliModuleCaches()
|
||||||
|
}
|
||||||
if (entryFileName == null) {
|
if (entryFileName == null) {
|
||||||
return baseManager.newCliScope(argv)
|
return baseManager.newCliScope(argv)
|
||||||
}
|
}
|
||||||
@ -241,9 +253,8 @@ internal suspend fun newCliScope(argv: List<String>, entryFileName: String? = nu
|
|||||||
if (localModules.isEmpty()) {
|
if (localModules.isEmpty()) {
|
||||||
return baseManager.newCliScope(argv)
|
return baseManager.newCliScope(argv)
|
||||||
}
|
}
|
||||||
val manager = baseManager.copy()
|
registerLocalCliModules(baseManager, localModules)
|
||||||
registerLocalCliModules(manager, localModules)
|
return baseManager.newCliScope(argv)
|
||||||
return manager.newCliScope(argv)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
fun runMain(args: Array<String>) {
|
fun runMain(args: Array<String>) {
|
||||||
|
|||||||
@ -6,6 +6,7 @@ import net.sergeych.lyng.Source
|
|||||||
import net.sergeych.lyng.obj.ObjString
|
import net.sergeych.lyng.obj.ObjString
|
||||||
import kotlin.test.Test
|
import kotlin.test.Test
|
||||||
import kotlin.test.assertEquals
|
import kotlin.test.assertEquals
|
||||||
|
import kotlin.test.assertTrue
|
||||||
|
|
||||||
class CliTcpServerRegressionTest {
|
class CliTcpServerRegressionTest {
|
||||||
|
|
||||||
@ -48,4 +49,124 @@ class CliTcpServerRegressionTest {
|
|||||||
session.cancelAndJoin()
|
session.cancelAndJoin()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
fun concurrentTcpExampleRunsInCliScope() = runBlocking {
|
||||||
|
val cliScope = newCliScope(emptyList())
|
||||||
|
val session = EvalSession(cliScope)
|
||||||
|
|
||||||
|
try {
|
||||||
|
val result = evalOnCliDispatcher(
|
||||||
|
session,
|
||||||
|
Source(
|
||||||
|
"<tcp-server-concurrency-cli>",
|
||||||
|
"""
|
||||||
|
import lyng.io.net
|
||||||
|
|
||||||
|
val host = "127.0.0.1"
|
||||||
|
val clientCount = 32
|
||||||
|
val server: TcpServer = Net.tcpListen(0, host, 32, true) as TcpServer
|
||||||
|
val port: Int = server.localAddress().port
|
||||||
|
|
||||||
|
fun payloadFor(index: Int): String {
|
||||||
|
"${'$'}index:${'$'}{Random.nextInt()}:${'$'}{Random.nextInt()}"
|
||||||
|
}
|
||||||
|
|
||||||
|
fun handleClient(client: TcpSocket): String {
|
||||||
|
try {
|
||||||
|
val source = client.readLine()
|
||||||
|
if( source == null ) {
|
||||||
|
return "server-eof"
|
||||||
|
}
|
||||||
|
val reply = "pong: ${'$'}source"
|
||||||
|
client.writeUtf8(reply + "\n")
|
||||||
|
client.flush()
|
||||||
|
reply
|
||||||
|
} finally {
|
||||||
|
client.close()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
val serverJob: Deferred = launch {
|
||||||
|
var handlers: List<Deferred> = List()
|
||||||
|
try {
|
||||||
|
for( i in 0..<32 ) {
|
||||||
|
val client: TcpSocket = server.accept() as TcpSocket
|
||||||
|
handlers += launch {
|
||||||
|
handleClient(client)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
handlers.joinAll()
|
||||||
|
} finally {
|
||||||
|
server.close()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
val clientJobs = (0..<clientCount).map { index ->
|
||||||
|
val payload = payloadFor(index)
|
||||||
|
launch {
|
||||||
|
val socket: TcpSocket = Net.tcpConnect(host, port) as TcpSocket
|
||||||
|
try {
|
||||||
|
socket.writeUtf8(payload + "\n")
|
||||||
|
socket.flush()
|
||||||
|
val reply = socket.readLine()
|
||||||
|
if( reply == null ) {
|
||||||
|
"client-eof:${'$'}payload"
|
||||||
|
} else {
|
||||||
|
assertEquals("pong: ${'$'}payload", reply)
|
||||||
|
reply
|
||||||
|
}
|
||||||
|
} finally {
|
||||||
|
socket.close()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
val replies = clientJobs.joinAll()
|
||||||
|
val serverReplies = serverJob.await() as List<Object>
|
||||||
|
assertEquals(clientCount, replies.size)
|
||||||
|
assertEquals(clientCount, serverReplies.size)
|
||||||
|
assertEquals(replies.toSet, serverReplies.toSet)
|
||||||
|
"OK:${'$'}clientCount:${'$'}{replies.toSet}:${'$'}{serverReplies.toSet}"
|
||||||
|
""".trimIndent()
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
|
val text = (result as ObjString).value
|
||||||
|
assertTrue(text.startsWith("OK:32:"), text)
|
||||||
|
} finally {
|
||||||
|
session.cancelAndJoin()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
fun mixedModuleAndLocalCapturesWorkInCliScope() = runBlocking {
|
||||||
|
val cliScope = newCliScope(emptyList())
|
||||||
|
val session = EvalSession(cliScope)
|
||||||
|
|
||||||
|
try {
|
||||||
|
val result = evalOnCliDispatcher(
|
||||||
|
session,
|
||||||
|
Source(
|
||||||
|
"<cli-capture-regression>",
|
||||||
|
"""
|
||||||
|
val prefix = "pong"
|
||||||
|
val jobs = (0..<32).map { index ->
|
||||||
|
val payload = "${'$'}index:${'$'}{Random.nextInt()}"
|
||||||
|
launch {
|
||||||
|
delay(5)
|
||||||
|
"${'$'}prefix:${'$'}payload"
|
||||||
|
}
|
||||||
|
}
|
||||||
|
jobs.joinAll()
|
||||||
|
""".trimIndent()
|
||||||
|
)
|
||||||
|
) as net.sergeych.lyng.obj.ObjList
|
||||||
|
|
||||||
|
assertEquals(32, result.list.size)
|
||||||
|
assertEquals(32, result.list.map { (it as ObjString).value }.toSet().size)
|
||||||
|
} finally {
|
||||||
|
session.cancelAndJoin()
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@ -18,7 +18,7 @@
|
|||||||
package net.sergeych.lyng.io.net
|
package net.sergeych.lyng.io.net
|
||||||
|
|
||||||
import kotlinx.coroutines.Dispatchers
|
import kotlinx.coroutines.Dispatchers
|
||||||
import kotlinx.coroutines.test.runTest
|
import kotlinx.coroutines.runBlocking
|
||||||
import kotlinx.coroutines.withContext
|
import kotlinx.coroutines.withContext
|
||||||
import kotlinx.coroutines.withTimeout
|
import kotlinx.coroutines.withTimeout
|
||||||
import net.sergeych.lyng.Compiler
|
import net.sergeych.lyng.Compiler
|
||||||
@ -30,44 +30,92 @@ import kotlin.test.assertEquals
|
|||||||
|
|
||||||
class LyngNetTcpServerExampleTest {
|
class LyngNetTcpServerExampleTest {
|
||||||
|
|
||||||
|
private fun concurrentTcpScript(clientCount: Int): String = """
|
||||||
|
import lyng.io.net
|
||||||
|
|
||||||
|
val host = "127.0.0.1"
|
||||||
|
val clientCount = $clientCount
|
||||||
|
val server: TcpServer = Net.tcpListen(0, host, clientCount, true) as TcpServer
|
||||||
|
val port: Int = server.localAddress().port
|
||||||
|
|
||||||
|
fun payloadFor(index: Int): String {
|
||||||
|
"${'$'}index:${'$'}{Random.nextInt()}:${'$'}{Random.nextInt()}"
|
||||||
|
}
|
||||||
|
|
||||||
|
fun handleClient(client: TcpSocket): String {
|
||||||
|
try {
|
||||||
|
val source = client.readLine()
|
||||||
|
if( source == null ) {
|
||||||
|
return "server-eof"
|
||||||
|
}
|
||||||
|
val reply = "pong: ${'$'}source"
|
||||||
|
client.writeUtf8(reply + "\n")
|
||||||
|
client.flush()
|
||||||
|
reply
|
||||||
|
} finally {
|
||||||
|
client.close()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
val serverJob: Deferred = launch {
|
||||||
|
var handlers: List<Deferred> = List()
|
||||||
|
try {
|
||||||
|
for( i in 0..<${clientCount} ) {
|
||||||
|
val client: TcpSocket = server.accept() as TcpSocket
|
||||||
|
handlers += launch {
|
||||||
|
handleClient(client)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
handlers.joinAll()
|
||||||
|
} finally {
|
||||||
|
server.close()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
val clientJobs: List<Deferred> = (0..<clientCount).map { index ->
|
||||||
|
val payload = payloadFor(index)
|
||||||
|
launch {
|
||||||
|
val socket: TcpSocket = Net.tcpConnect(host, port) as TcpSocket
|
||||||
|
try {
|
||||||
|
socket.writeUtf8(payload + "\n")
|
||||||
|
socket.flush()
|
||||||
|
val reply = socket.readLine()
|
||||||
|
if( reply == null ) {
|
||||||
|
"client-eof:${'$'}payload"
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
assertEquals("pong: ${'$'}payload", reply)
|
||||||
|
reply
|
||||||
|
}
|
||||||
|
} finally {
|
||||||
|
socket.close()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
val replies = clientJobs.joinAll()
|
||||||
|
val serverReplies = serverJob.await() as List<Object>
|
||||||
|
|
||||||
|
assertEquals(clientCount, replies.size)
|
||||||
|
assertEquals(clientCount, serverReplies.size)
|
||||||
|
assertEquals(replies.toSet, serverReplies.toSet)
|
||||||
|
"OK:${'$'}clientCount"
|
||||||
|
""".trimIndent()
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
fun tcpServerExampleRoundTripsOverLoopback() = runTest {
|
fun tcpServerExampleSurvivesConcurrentLoopbackLoad() = runBlocking {
|
||||||
val engine = getSystemNetEngine()
|
val engine = getSystemNetEngine()
|
||||||
if (!engine.isSupported || !engine.isTcpAvailable || !engine.isTcpServerAvailable) return@runTest
|
if (!engine.isSupported || !engine.isTcpAvailable || !engine.isTcpServerAvailable) return@runBlocking
|
||||||
|
|
||||||
val scope = Script.newScope()
|
val scope = Script.newScope()
|
||||||
createNetModule(PermitAllNetAccessPolicy, scope)
|
createNetModule(PermitAllNetAccessPolicy, scope)
|
||||||
|
|
||||||
val code = """
|
|
||||||
import lyng.buffer
|
|
||||||
import lyng.io.net
|
|
||||||
|
|
||||||
val server = Net.tcpListen(0, "127.0.0.1")
|
|
||||||
val port = server.localAddress().port
|
|
||||||
val accepted = launch {
|
|
||||||
val client = server.accept()
|
|
||||||
val line = (client.read(4) as Buffer).decodeUtf8()
|
|
||||||
client.writeUtf8("echo:" + line)
|
|
||||||
client.flush()
|
|
||||||
client.close()
|
|
||||||
server.close()
|
|
||||||
line
|
|
||||||
}
|
|
||||||
|
|
||||||
val socket = Net.tcpConnect("127.0.0.1", port)
|
|
||||||
socket.writeUtf8("ping")
|
|
||||||
socket.flush()
|
|
||||||
val reply = (socket.read(16) as Buffer).decodeUtf8()
|
|
||||||
socket.close()
|
|
||||||
"${'$'}{accepted.await()}: ${'$'}reply"
|
|
||||||
""".trimIndent()
|
|
||||||
|
|
||||||
val result = withContext(Dispatchers.Default) {
|
val result = withContext(Dispatchers.Default) {
|
||||||
withTimeout(5_000) {
|
withTimeout(20_000) {
|
||||||
Compiler.compile(code).execute(scope).inspect(scope)
|
Compiler.compile(concurrentTcpScript(clientCount = 32)).execute(scope).inspect(scope)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
assertEquals("\"ping: echo:ping\"", result)
|
assertEquals("\"OK:32\"", result)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@ -0,0 +1,68 @@
|
|||||||
|
package net.sergeych.lyngio.net
|
||||||
|
|
||||||
|
import kotlinx.coroutines.async
|
||||||
|
import kotlinx.coroutines.awaitAll
|
||||||
|
import kotlinx.coroutines.coroutineScope
|
||||||
|
import kotlinx.coroutines.runBlocking
|
||||||
|
import kotlinx.coroutines.withTimeout
|
||||||
|
import kotlin.test.Test
|
||||||
|
import kotlin.test.assertEquals
|
||||||
|
import kotlin.test.assertTrue
|
||||||
|
|
||||||
|
class NetConcurrentLoopbackTest {
|
||||||
|
|
||||||
|
@Test
|
||||||
|
fun concurrentTcpRoundTripsWorkAtEngineLevel() = runBlocking {
|
||||||
|
val engine = getSystemNetEngine()
|
||||||
|
if (!engine.isSupported || !engine.isTcpAvailable || !engine.isTcpServerAvailable) return@runBlocking
|
||||||
|
|
||||||
|
withTimeout(10_000) {
|
||||||
|
val clients = 32
|
||||||
|
val server = engine.tcpListen(host = "127.0.0.1", port = 0, backlog = 64, reuseAddress = true)
|
||||||
|
val serverJob = async {
|
||||||
|
coroutineScope {
|
||||||
|
val handlers = ArrayList<kotlinx.coroutines.Deferred<String>>(clients)
|
||||||
|
repeat(clients) {
|
||||||
|
val client = server.accept()
|
||||||
|
handlers += async {
|
||||||
|
try {
|
||||||
|
val payload = client.readLine()
|
||||||
|
val reply = "pong:$payload"
|
||||||
|
client.writeUtf8("$reply\n")
|
||||||
|
client.flush()
|
||||||
|
reply
|
||||||
|
} finally {
|
||||||
|
client.close()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
handlers.awaitAll()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
val clientJobs = coroutineScope {
|
||||||
|
(0 until clients).map { index ->
|
||||||
|
async {
|
||||||
|
val payload = "ping:$index"
|
||||||
|
val socket = engine.tcpConnect("127.0.0.1", server.localAddress().port, timeoutMillis = 2_000, noDelay = true)
|
||||||
|
try {
|
||||||
|
socket.writeUtf8("$payload\n")
|
||||||
|
socket.flush()
|
||||||
|
socket.readLine()
|
||||||
|
} finally {
|
||||||
|
socket.close()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
val clientReplies = clientJobs.awaitAll()
|
||||||
|
val serverReplies = serverJob.await()
|
||||||
|
server.close()
|
||||||
|
|
||||||
|
assertEquals((0 until clients).map { "pong:ping:$it" }, clientReplies)
|
||||||
|
assertEquals((0 until clients).map { "pong:ping:$it" }, serverReplies)
|
||||||
|
assertTrue(clientReplies.all { it != null })
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
@ -24,6 +24,7 @@ import net.sergeych.lyng.bridge.bind
|
|||||||
import net.sergeych.lyng.bridge.bindObject
|
import net.sergeych.lyng.bridge.bindObject
|
||||||
import net.sergeych.lyng.bytecode.CmdFunction
|
import net.sergeych.lyng.bytecode.CmdFunction
|
||||||
import net.sergeych.lyng.bytecode.CmdVm
|
import net.sergeych.lyng.bytecode.CmdVm
|
||||||
|
import net.sergeych.lyng.bytecode.BytecodeLambdaCallable
|
||||||
import net.sergeych.lyng.miniast.*
|
import net.sergeych.lyng.miniast.*
|
||||||
import net.sergeych.lyng.obj.*
|
import net.sergeych.lyng.obj.*
|
||||||
import net.sergeych.lyng.pacman.ImportManager
|
import net.sergeych.lyng.pacman.ImportManager
|
||||||
@ -635,16 +636,20 @@ class Script(
|
|||||||
addConst("MapEntry", ObjMapEntry.type)
|
addConst("MapEntry", ObjMapEntry.type)
|
||||||
|
|
||||||
addFn("launch") {
|
addFn("launch") {
|
||||||
val callable = requireOnlyArg<Obj>()
|
val rawCallable = requireOnlyArg<Obj>()
|
||||||
val captured = this
|
val currentScope = requireScope()
|
||||||
|
// Freeze non-module lexical state at launch time so each coroutine gets
|
||||||
|
// its own view of loop locals and other transient frame values.
|
||||||
|
val captured = if (currentScope is ModuleScope) currentScope else currentScope.snapshotForClosure()
|
||||||
|
val callable = (rawCallable as? BytecodeLambdaCallable)?.freezeForLaunch(captured) ?: rawCallable
|
||||||
val session = EvalSession.currentOrNull()
|
val session = EvalSession.currentOrNull()
|
||||||
val deferred = if (session != null) {
|
val deferred = if (session != null) {
|
||||||
session.launchTrackedDeferred {
|
session.launchTrackedDeferred {
|
||||||
captured.call(callable)
|
ScopeBridge(captured).call(callable)
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
globalDefer {
|
globalDefer {
|
||||||
captured.call(callable)
|
ScopeBridge(captured).call(callable)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
ObjDeferred(deferred)
|
ObjDeferred(deferred)
|
||||||
|
|||||||
@ -1034,15 +1034,18 @@ class BytecodeCompiler(
|
|||||||
val typeValue = compileRefWithFallback(ref.castTypeRef(), null, ref.castPos()) ?: return null
|
val typeValue = compileRefWithFallback(ref.castTypeRef(), null, ref.castPos()) ?: return null
|
||||||
val objValue = ensureObjSlot(value)
|
val objValue = ensureObjSlot(value)
|
||||||
val typeObj = ensureObjSlot(typeValue)
|
val typeObj = ensureObjSlot(typeValue)
|
||||||
|
val sourceSlot = allocSlot()
|
||||||
|
builder.emit(Opcode.MOVE_OBJ, objValue.slot, sourceSlot)
|
||||||
|
updateSlotType(sourceSlot, SlotType.OBJ)
|
||||||
if (!ref.castIsNullable()) {
|
if (!ref.castIsNullable()) {
|
||||||
builder.emit(Opcode.ASSERT_IS, objValue.slot, typeObj.slot)
|
builder.emit(Opcode.ASSERT_IS, sourceSlot, typeObj.slot)
|
||||||
val resultSlot = allocSlot()
|
val resultSlot = allocSlot()
|
||||||
builder.emit(Opcode.MAKE_QUALIFIED_VIEW, objValue.slot, typeObj.slot, resultSlot)
|
builder.emit(Opcode.MAKE_QUALIFIED_VIEW, sourceSlot, typeObj.slot, resultSlot)
|
||||||
updateSlotType(resultSlot, SlotType.OBJ)
|
updateSlotType(resultSlot, SlotType.OBJ)
|
||||||
return CompiledValue(resultSlot, SlotType.OBJ)
|
return CompiledValue(resultSlot, SlotType.OBJ)
|
||||||
}
|
}
|
||||||
val checkSlot = allocSlot()
|
val checkSlot = allocSlot()
|
||||||
builder.emit(Opcode.CHECK_IS, objValue.slot, typeObj.slot, checkSlot)
|
builder.emit(Opcode.CHECK_IS, sourceSlot, typeObj.slot, checkSlot)
|
||||||
updateSlotType(checkSlot, SlotType.BOOL)
|
updateSlotType(checkSlot, SlotType.BOOL)
|
||||||
val resultSlot = allocSlot()
|
val resultSlot = allocSlot()
|
||||||
val nullSlot = allocSlot()
|
val nullSlot = allocSlot()
|
||||||
@ -1056,7 +1059,7 @@ class BytecodeCompiler(
|
|||||||
builder.emit(Opcode.MOVE_OBJ, nullSlot, resultSlot)
|
builder.emit(Opcode.MOVE_OBJ, nullSlot, resultSlot)
|
||||||
builder.emit(Opcode.JMP, listOf(CmdBuilder.Operand.LabelRef(endLabel)))
|
builder.emit(Opcode.JMP, listOf(CmdBuilder.Operand.LabelRef(endLabel)))
|
||||||
builder.mark(okLabel)
|
builder.mark(okLabel)
|
||||||
builder.emit(Opcode.MAKE_QUALIFIED_VIEW, objValue.slot, typeObj.slot, resultSlot)
|
builder.emit(Opcode.MAKE_QUALIFIED_VIEW, sourceSlot, typeObj.slot, resultSlot)
|
||||||
builder.mark(endLabel)
|
builder.mark(endLabel)
|
||||||
updateSlotType(resultSlot, SlotType.OBJ)
|
updateSlotType(resultSlot, SlotType.OBJ)
|
||||||
return CompiledValue(resultSlot, SlotType.OBJ)
|
return CompiledValue(resultSlot, SlotType.OBJ)
|
||||||
|
|||||||
@ -3783,11 +3783,48 @@ class BytecodeLambdaCallable(
|
|||||||
private val returnLabels: Set<String>,
|
private val returnLabels: Set<String>,
|
||||||
override val pos: Pos,
|
override val pos: Pos,
|
||||||
) : Statement(), BytecodeCallable {
|
) : Statement(), BytecodeCallable {
|
||||||
|
private fun freezeRecord(record: ObjRecord): ObjRecord {
|
||||||
|
val frozenValue = when (val raw = record.value) {
|
||||||
|
is net.sergeych.lyng.FrameSlotRef -> raw.read()
|
||||||
|
is net.sergeych.lyng.RecordSlotRef -> raw.read()
|
||||||
|
is net.sergeych.lyng.ScopeSlotRef -> raw.read()
|
||||||
|
else -> raw
|
||||||
|
}
|
||||||
|
return record.copy(value = frozenValue)
|
||||||
|
}
|
||||||
|
|
||||||
|
private fun resolveCaptureRecords(base: Scope): List<ObjRecord>? {
|
||||||
|
if (captureNames.isEmpty()) return null
|
||||||
|
return captureNames.map { name ->
|
||||||
|
base.chainLookupIgnoreClosure(
|
||||||
|
name,
|
||||||
|
followClosure = true,
|
||||||
|
caller = base.currentClassCtx
|
||||||
|
) ?: base.raiseSymbolNotFound("symbol $name not found")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
fun rebindClosure(newClosureScope: Scope): BytecodeLambdaCallable {
|
fun rebindClosure(newClosureScope: Scope): BytecodeLambdaCallable {
|
||||||
return BytecodeLambdaCallable(
|
return BytecodeLambdaCallable(
|
||||||
fn = fn,
|
fn = fn,
|
||||||
closureScope = newClosureScope,
|
closureScope = newClosureScope,
|
||||||
captureRecords = captureRecords,
|
captureRecords = resolveCaptureRecords(newClosureScope) ?: captureRecords,
|
||||||
|
captureNames = captureNames,
|
||||||
|
paramSlotPlan = paramSlotPlan,
|
||||||
|
argsDeclaration = argsDeclaration,
|
||||||
|
preferredThisType = preferredThisType,
|
||||||
|
returnLabels = returnLabels,
|
||||||
|
pos = pos
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
fun freezeForLaunch(newClosureScope: Scope): BytecodeLambdaCallable {
|
||||||
|
val frozenCaptures = captureRecords?.map(::freezeRecord)
|
||||||
|
?: resolveCaptureRecords(newClosureScope)?.map(::freezeRecord)
|
||||||
|
return BytecodeLambdaCallable(
|
||||||
|
fn = fn,
|
||||||
|
closureScope = newClosureScope,
|
||||||
|
captureRecords = frozenCaptures,
|
||||||
captureNames = captureNames,
|
captureNames = captureNames,
|
||||||
paramSlotPlan = paramSlotPlan,
|
paramSlotPlan = paramSlotPlan,
|
||||||
argsDeclaration = argsDeclaration,
|
argsDeclaration = argsDeclaration,
|
||||||
|
|||||||
@ -157,4 +157,10 @@ class ImportManager(
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fun invalidatePackageCache(name: String) {
|
||||||
|
op.withLock {
|
||||||
|
imports[name]?.cachedScope = null
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|||||||
@ -139,6 +139,66 @@ class TestCoroutines {
|
|||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
fun testLaunchCapturesDistinctLoopValues() = runTest {
|
||||||
|
eval(
|
||||||
|
"""
|
||||||
|
val jobs = []
|
||||||
|
for( i in 0..<8 ) {
|
||||||
|
val x = i
|
||||||
|
jobs += launch {
|
||||||
|
delay(5)
|
||||||
|
x
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
assertEquals([0,1,2,3,4,5,6,7], jobs.joinAll())
|
||||||
|
""".trimIndent()
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
fun testNestedLaunchCapturesDistinctLoopValues() = runTest {
|
||||||
|
eval(
|
||||||
|
"""
|
||||||
|
val outer = launch {
|
||||||
|
val jobs = []
|
||||||
|
for( i in 0..<8 ) {
|
||||||
|
val x = i
|
||||||
|
jobs += launch {
|
||||||
|
delay(5)
|
||||||
|
x
|
||||||
|
}
|
||||||
|
}
|
||||||
|
jobs.joinAll()
|
||||||
|
}
|
||||||
|
|
||||||
|
assertEquals([0,1,2,3,4,5,6,7], outer.await())
|
||||||
|
""".trimIndent()
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
fun testLaunchCapturesDistinctObjectValues() = runTest {
|
||||||
|
eval(
|
||||||
|
"""
|
||||||
|
val jobs = []
|
||||||
|
for( i in 0..<8 ) {
|
||||||
|
val box = ["item:${'$'}i"]
|
||||||
|
jobs += launch {
|
||||||
|
delay(5)
|
||||||
|
box[0]
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
assertEquals(
|
||||||
|
["item:0", "item:1", "item:2", "item:3", "item:4", "item:5", "item:6", "item:7"],
|
||||||
|
jobs.joinAll()
|
||||||
|
)
|
||||||
|
""".trimIndent()
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
fun testFlows() = runTest {
|
fun testFlows() = runTest {
|
||||||
eval("""
|
eval("""
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user