From f455f2b955b5b16ec2963e05b266ec3f3dd56eb7 Mon Sep 17 00:00:00 2001 From: sergeych Date: Tue, 27 Sep 2022 16:06:06 +0300 Subject: [PATCH] +async commands allow adapter access (remote calls) from command handlers. --- README.md | 2 + build.gradle.kts | 17 ++++++- .../kotlin/net.sergeych.parsec3/Adapter.kt | 46 ++++++++++++------- .../net.sergeych.parsec3/AdapterBuilder.kt | 9 +++- .../kotlin/net.sergeych.parsec3/Package.kt | 4 ++ src/commonTest/kotlin/parsec3/AdapterTest.kt | 25 ++++++++-- 6 files changed, 81 insertions(+), 22 deletions(-) diff --git a/README.md b/README.md index 89898ff..b052f6c 100644 --- a/README.md +++ b/README.md @@ -1,5 +1,7 @@ # PARanodi SECuruty 3 protocol +> v0.1.*+ __are incompatible with 0.0.* versions due to binary protocol optimization. + This is a connection-agnostic, full-duplex RPC type binary protocol, effective to work with binary data, such as encrypted data, keys, multimedia, etc. Its key points are: - simple and practical transport RPC layer, which is a primary choice when, for exaple, `wss://` level by TSL is enough, e.g. when there is no sensitive data being transmitted (games, etc). diff --git a/build.gradle.kts b/build.gradle.kts index 394034a..b5054e6 100644 --- a/build.gradle.kts +++ b/build.gradle.kts @@ -10,7 +10,7 @@ plugins { } group = "net.sergeych" -version = "0.0.2-SNAPSHOT" +version = "0.1.0-SNAPSHOT" repositories { mavenCentral() @@ -19,6 +19,9 @@ repositories { } kotlin { + jvmToolchain { + languageVersion.set(JavaLanguageVersion.of("11")) + } jvm { compilations.all { kotlinOptions.jvmTarget = "11" @@ -38,6 +41,7 @@ kotlin { sourceSets { val commonMain by getting { dependencies { + implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:1.6.3") implementation("io.ktor:ktor-client-core:$ktor_version") implementation("io.ktor:ktor-client-websockets:$ktor_version") api("net.sergeych:unikrypto:1.2.0-SNAPSHOT") @@ -74,4 +78,15 @@ kotlin { val jsMain by getting val jsTest by getting } + publishing { + repositories { + maven { + url = uri("https://maven.universablockchain.com/") + credentials { + username = System.getenv("maven_user") + password = System.getenv("maven_password") + } + } + } + } } diff --git a/src/commonMain/kotlin/net.sergeych.parsec3/Adapter.kt b/src/commonMain/kotlin/net.sergeych.parsec3/Adapter.kt index 5ffcb82..968fa0f 100644 --- a/src/commonMain/kotlin/net.sergeych.parsec3/Adapter.kt +++ b/src/commonMain/kotlin/net.sergeych.parsec3/Adapter.kt @@ -1,6 +1,6 @@ package net.sergeych.parsec3 -import kotlinx.coroutines.CompletableDeferred +import kotlinx.coroutines.* import kotlinx.coroutines.sync.Mutex import kotlinx.coroutines.sync.withLock import kotlinx.serialization.json.Json @@ -74,6 +74,8 @@ open class Adapter( private val sendEncoded: suspend (data: ByteArray) -> Unit, ) : LogTag("ADPTR") { + val scope = CoroutineScope(GlobalScope.coroutineContext) + private val completions = mutableMapOf>() private var lastId = 1 private val access = Mutex() @@ -105,25 +107,37 @@ open class Adapter( } } + /** + * Cancels the scope that is used to call incoming commands. Cancelling the scope effectively cancels any + * unfinished commands. It _will not wait for its completion_. + * + * Not calling it might cause unknown number of pending command processing coroutines to remain active. + */ + fun cancel() { + scope.cancel() + } + private suspend fun processIncomingPackage(pe: Package) { when (pe) { is Package.Command -> { - try { - val handler = commandHost.handler(pe.name) - val result = handler.invoke(instance, pe.args) - sendPackage( - Package.Response(pe.id, result) - ) - } catch (ae: ParsecException) { - sendPackage(Package.Response(pe.id, null, ae.code, ae.text)) - } catch (ex: Throwable) { - exceptionRegistry.classCodes[ex::class]?.let { code -> - sendPackage(Package.Response(pe.id, null, code, ex.toString())) - } ?: run { - ex.printStackTrace() - sendPackage(Package.Response(pe.id, null, "UNKNOWN_ERROR", ex.toString())) + scope.launch { + try { + val handler = commandHost.handler(pe.name) + val result = handler.invoke(instance, pe.args) + sendPackage( + Package.Response(pe.id, result) + ) + } catch (ae: ParsecException) { + sendPackage(Package.Response(pe.id, null, ae.code, ae.text)) + } catch (ex: Throwable) { + exceptionRegistry.classCodes[ex::class]?.let { code -> + sendPackage(Package.Response(pe.id, null, code, ex.toString())) + } ?: run { + ex.printStackTrace() + sendPackage(Package.Response(pe.id, null, "UNKNOWN_ERROR", ex.toString())) + } + } } - } } is Package.Response -> { diff --git a/src/commonMain/kotlin/net.sergeych.parsec3/AdapterBuilder.kt b/src/commonMain/kotlin/net.sergeych.parsec3/AdapterBuilder.kt index c36b547..fba5c8f 100644 --- a/src/commonMain/kotlin/net.sergeych.parsec3/AdapterBuilder.kt +++ b/src/commonMain/kotlin/net.sergeych.parsec3/AdapterBuilder.kt @@ -17,6 +17,10 @@ class AdapterBuilder>( sessionProducer = f } + private var _adapter: Adapter? = null + + val adapter: Adapter get() = _adapter ?: throw IllegalStateException("adapter is not yet initialized") + /** * Register command implementation */ @@ -30,7 +34,10 @@ class AdapterBuilder>( suspend fun createWith(input: Flow, f: suspend (ByteArray)->Unit ): Adapter { return Adapter(sessionProducer(), api, exceptionRegistry) { f(it) } - .also { a-> globalLaunch { input.collect { a.receiveFrame(it)} } } + .also { a-> + globalLaunch { input.collect { a.receiveFrame(it)} } + _adapter = a + } } suspend fun create(f: suspend (ByteArray) -> Unit): Adapter { diff --git a/src/commonMain/kotlin/net.sergeych.parsec3/Package.kt b/src/commonMain/kotlin/net.sergeych.parsec3/Package.kt index 656256d..c8e0f69 100644 --- a/src/commonMain/kotlin/net.sergeych.parsec3/Package.kt +++ b/src/commonMain/kotlin/net.sergeych.parsec3/Package.kt @@ -1,11 +1,13 @@ package net.sergeych.parsec3 +import kotlinx.serialization.SerialName import kotlinx.serialization.Serializable /** * The parsec3 package transmit requests and responses over the parsec3 channel. */ @Serializable +@SerialName("p3") sealed class Package { /** * Invoke a remote command. @@ -15,6 +17,7 @@ sealed class Package { * @param args whatever arguments the command accepts serialized with BOSS. */ @Serializable + @SerialName("cmd") data class Command(val id: Int, val name: String, val args: ByteArray) : Package() @@ -28,6 +31,7 @@ sealed class Package { * @param errorCode exception code, if not null then result must be ignored (and assumed to be null). */ @Serializable + @SerialName("rsp") data class Response( val toId: Int, val result: ByteArray? = null, diff --git a/src/commonTest/kotlin/parsec3/AdapterTest.kt b/src/commonTest/kotlin/parsec3/AdapterTest.kt index 485ad1a..7172abf 100644 --- a/src/commonTest/kotlin/parsec3/AdapterTest.kt +++ b/src/commonTest/kotlin/parsec3/AdapterTest.kt @@ -62,6 +62,7 @@ internal class AdapterTest { // create command `foo` that takes a string argument and // returns a string: val bar by command() + val loopCall by command() } @Test @@ -90,12 +91,27 @@ internal class AdapterTest { on(api.bar) { it + "bar" } + on(api.loopCall) { + try { + val res = ApiS1.foo(adapter, "loop-") + "%% $res %%" + } + catch(t: Throwable) { + t.printStackTrace() + throw t + } + } } - val a1 = b1.createWith(ch21.receiveAsFlow()) { ch12.send(it) } - val a2 = b2.createWith(ch12.receiveAsFlow()) { ch21.send(it) } + val a1 = b1.createWith(ch21.receiveAsFlow()) { + ch12.send(it) + } + val a2 = b2.createWith(ch12.receiveAsFlow()) { + ch21.send(it) + } - assertEquals("123bar", a1.invokeCommand(ApiS2().bar, "123")) +// assertEquals("123bar", a1.invokeCommand(ApiS2().bar, "123")) + assertEquals("%% loop-42foo %%", a1.invokeCommand(ApiS2().loopCall, "123")) assertEquals("32142foo", a2.invokeCommand(ApiS1.foo, "321")) assertEquals("---42foo", ApiS1.foo.invoke(a2, "---")) @@ -105,7 +121,8 @@ internal class AdapterTest { ch12.cancel() ch21.cancel() - + a1.cancel() + a2.cancel() } } \ No newline at end of file