fix #47 parallelism: launch, Deferred, CompletableDeferred, Mutex (delay was already implemented)

This commit is contained in:
Sergey Chernov 2025-08-07 20:31:36 +03:00
parent de71c7c8ec
commit 19f8b6605b
9 changed files with 280 additions and 2 deletions

View File

@ -1,5 +1,7 @@
# Advanced topics
__See also:__ [parallelism].
## Closures/scopes isolation
Each block has own scope, in which it can safely use closures and override
@ -111,3 +113,5 @@ arguments list in almost arbitrary ways. For example:
>>> void
,
[parallelism]: parallelism.md

118
docs/parallelism.md Normal file
View File

@ -0,0 +1,118 @@
# Parallelism in Lyng
Lyng is built to me multithreaded where possible (e.g. all targets byt JS and wasmJS as for now)
and cooperatively parallel (coroutine based) everywhere.
In Lyng, every function, every lambda are _coroutines_. It means, you can have as many of these as you want without risking running out of memory on threads stack, or get too many threads.
Depending on the platform, these coroutines may be executed on different CPU and cores, too, truly in parallel. Where not, like Javascript browser, they are still executed cooperatively. You should not care about the platform capabilities, just call `launch`:
// track coroutine call:
var xIsCalled = false
// launch coroutine in parallel
val x = launch {
// wait 10ms to let main code to be executed
delay(10)
// now set the flag
xIsCalled = true
// and return something useful:
"ok"
}
// corouine is launhed, but not yet executed
// due to delay call:
assert(!xIsCalled)
// now we wait for it to be executed:
assertEquals( x.await(), "ok")
// now glag should be set:
assert(xIsCalled)
>>> void
This example shows how to launch a coroutine with `launch` which returns [Deferred] instance, the latter have ways to await for the coroutine completion and retrieve possible result.
Launch has the only argument which should be a callable (lambda usually) that is run in parallel (or cooperatively in parallel), and return anything as the result.
## Synchronization: Mutex
Suppose we have a resource, that could be used concurrently, a coutner in our case. If we won'r protect it, concurrent usage cause RC, Race Condition, providing wrong result:
var counter = 0
(1..4).map {
launch {
// slow increment:
val c = counter
delay(10)
counter = c + 1
}
}.forEach { it.await() }
assert(counter < 4)
>>> void
The obviously wrong result is not 4, as all coroutines capture the counter value, which is 1, then sleep for 5ms, then save 1 + 1 as result. May some coroutines will pass, so it will be 1 or 2, most likely.
Using [Mutex] makes it all working:
var counter = 0
val mutex = Mutex()
(1..4).map {
launch {
// slow increment:
mutex.withLock {
val c = counter
delay(10)
counter = c + 1
}
}
}.forEach { it.await() }
assertEquals(4, counter)
>>> void
now everything works as expected: `mutex.withLock` makes them all be executed in sequence, not in parallel.
## Completable deferred
Sometimes it is convenient to manually set completion status of some deferred result. This is when [CompletableDeferred] is used:
// this variable will be completed later:
val done = CompletableDeferred()
// complete it ater delay
launch {
delay(10)
// complete it setting the result:
done.complete("ok")
}
// now it is still not completed: coroutine is delayed
// (ot not started on sinthe-threaded platforms):
assert(!done.isCompleted)
assert(done.isActive)
// then we can just await it as any other deferred:
assertEquals( done.await(), "ok")
// and as any other deferred it is now complete:
assert(done.isCompleted)
## True parallelism
Cooperative, coroutine-based parallelism is automatically available on all platforms. Depending on the platform, though, the coroutines could be dispatched also in different threads; where there are multiple cores and/or CPU available, it means the coroutines could be exuted truly in parallel, unless [Mutex] is used:
| platofrm | multithreaded |
|------------|---------------|
| JVM | yes |
| Android | yes |
| Javascript | NO |
| wasmJS | NO |
| IOS | yes |
| MacOSX | yes |
| Linux | yes |
| Windows | yes |
So it is important to always use [Mutex] where concurrent execution could be a problem (so called Race Conditions, or RC).

View File

@ -4,7 +4,7 @@ clikt = "5.0.3"
kotlin = "2.1.21"
android-minSdk = "24"
android-compileSdk = "34"
kotlinx-coroutines = "1.10.1"
kotlinx-coroutines = "1.9.0"
mp_bintools = "0.1.12"
firebaseCrashlyticsBuildtools = "3.0.3"
okioVersion = "3.10.2"

View File

@ -48,7 +48,6 @@ data class Arguments(val list: List<Obj>, val tailBlockMode: Boolean = false) :
fun inspect(): String = list.joinToString(", ") { it.inspect() }
companion object {
val EMPTY = Arguments(emptyList())
fun from(values: Collection<Obj>) = Arguments(values.toList())

View File

@ -4,6 +4,7 @@ import kotlinx.coroutines.delay
import net.sergeych.lyng.obj.*
import net.sergeych.lyng.pacman.ImportManager
import net.sergeych.lynon.ObjLynonClass
import net.sergeych.mp_tools.globalDefer
import kotlin.math.*
class Script(
@ -175,6 +176,18 @@ class Script(
addConst("Array", ObjArray)
addConst("Class", ObjClassType)
addConst("Deferred", ObjDeferred.type)
addConst("CompletableDeferred", ObjCompletableDeferred.type)
addConst("Mutex", ObjMutex.type)
addFn("launch") {
val callable = args.firstAndOnly() as Statement
ObjDeferred(globalDefer {
callable.execute(this@addFn)
})
}
val pi = ObjReal(PI)
addConst("π", pi)
getOrCreateNamespace("Math").apply {

View File

@ -0,0 +1,22 @@
package net.sergeych.lyng.obj
import kotlinx.coroutines.CompletableDeferred
import net.sergeych.lyng.Scope
class ObjCompletableDeferred(val completableDeferred: CompletableDeferred<Obj>): ObjDeferred(completableDeferred) {
override val objClass = type
companion object {
val type = object: ObjClass("CompletableDeferred", ObjDeferred.type){
override suspend fun callOn(scope: Scope): Obj {
return ObjCompletableDeferred(CompletableDeferred())
}
}.apply {
addFn("complete") {
thisAs<ObjCompletableDeferred>().completableDeferred.complete(args.firstAndOnly())
ObjVoid
}
}
}
}

View File

@ -0,0 +1,51 @@
package net.sergeych.lyng.obj
import kotlinx.coroutines.Deferred
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
import net.sergeych.lyng.Scope
import net.sergeych.lyng.Statement
open class ObjDeferred(val deferred: Deferred<Obj>): Obj() {
override val objClass = type
companion object {
val type = object: ObjClass("Deferred"){
override suspend fun callOn(scope: Scope): Obj {
scope.raiseError("Deferred constructor is not directly callable")
}
}.apply {
addFn("await") {
thisAs<ObjDeferred>().deferred.await()
}
addFn("isCompleted") {
thisAs<ObjDeferred>().deferred.isCompleted.toObj()
}
addFn("isActive") {
thisAs<ObjDeferred>().deferred.isActive.toObj()
}
addFn("isCancelled") {
thisAs<ObjDeferred>().deferred.isCancelled.toObj()
}
}
}
}
class ObjMutex(val mutex: Mutex): Obj() {
override val objClass = type
companion object {
val type = object: ObjClass("Mutex") {
override suspend fun callOn(scope: Scope): Obj {
return ObjMutex(Mutex())
}
}.apply {
addFn("withLock") {
val f = requiredArg<Statement>(0)
thisAs<ObjMutex>().mutex.withLock { f.execute(this) }
}
}
}
}

View File

@ -0,0 +1,66 @@
import kotlinx.coroutines.test.runTest
import net.sergeych.lyng.eval
import kotlin.test.Test
class TestCoroutines {
@Test
fun testLaunch() = runTest {
eval(
"""
var passed = false
val x = launch {
delay(10)
passed = true
"ok"
}
assert(!passed)
assertEquals( x.await(), "ok")
assert(passed)
assert(x.isCompleted)
""".trimIndent()
)
}
@Test
fun testCompletableDeferred() = runTest {
eval(
"""
val done = CompletableDeferred()
launch {
delay(10)
done.complete("ok")
}
assert(!done.isCompleted)
assert(done.isActive)
assertEquals( done.await(), "ok")
assert(done.isCompleted)
""".trimIndent()
)
}
@Test
fun testMutex() = runTest {
eval(
"""
var counter = 0
val mutex = Mutex()
(1..4).map {
launch {
// mutex.withLock {
val c = counter
delay(5)
counter = c + 1
// }
}
}.forEach { it.await() }
println(counter)
assert( counter < 10 )
""".trimIndent()
)
}
}

View File

@ -299,4 +299,9 @@ class BookTest {
runDocTests("../docs/time.md")
}
@Test
fun testParallelismBook() = runBlocking {
runDocTests("../docs/parallelism.md")
}
}