+async commands allow adapter access (remote calls) from command handlers.

This commit is contained in:
Sergey Chernov 2022-09-27 16:06:06 +03:00
parent b68da9237a
commit f455f2b955
6 changed files with 81 additions and 22 deletions

View File

@ -1,5 +1,7 @@
# PARanodi SECuruty 3 protocol # PARanodi SECuruty 3 protocol
> v0.1.*+ __are incompatible with 0.0.* versions due to binary protocol optimization.
This is a connection-agnostic, full-duplex RPC type binary protocol, effective to work with binary data, such as encrypted data, keys, multimedia, etc. Its key points are: This is a connection-agnostic, full-duplex RPC type binary protocol, effective to work with binary data, such as encrypted data, keys, multimedia, etc. Its key points are:
- simple and practical transport RPC layer, which is a primary choice when, for exaple, `wss://` level by TSL is enough, e.g. when there is no sensitive data being transmitted (games, etc). - simple and practical transport RPC layer, which is a primary choice when, for exaple, `wss://` level by TSL is enough, e.g. when there is no sensitive data being transmitted (games, etc).

View File

@ -10,7 +10,7 @@ plugins {
} }
group = "net.sergeych" group = "net.sergeych"
version = "0.0.2-SNAPSHOT" version = "0.1.0-SNAPSHOT"
repositories { repositories {
mavenCentral() mavenCentral()
@ -19,6 +19,9 @@ repositories {
} }
kotlin { kotlin {
jvmToolchain {
languageVersion.set(JavaLanguageVersion.of("11"))
}
jvm { jvm {
compilations.all { compilations.all {
kotlinOptions.jvmTarget = "11" kotlinOptions.jvmTarget = "11"
@ -38,6 +41,7 @@ kotlin {
sourceSets { sourceSets {
val commonMain by getting { val commonMain by getting {
dependencies { dependencies {
implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:1.6.3")
implementation("io.ktor:ktor-client-core:$ktor_version") implementation("io.ktor:ktor-client-core:$ktor_version")
implementation("io.ktor:ktor-client-websockets:$ktor_version") implementation("io.ktor:ktor-client-websockets:$ktor_version")
api("net.sergeych:unikrypto:1.2.0-SNAPSHOT") api("net.sergeych:unikrypto:1.2.0-SNAPSHOT")
@ -74,4 +78,15 @@ kotlin {
val jsMain by getting val jsMain by getting
val jsTest by getting val jsTest by getting
} }
publishing {
repositories {
maven {
url = uri("https://maven.universablockchain.com/")
credentials {
username = System.getenv("maven_user")
password = System.getenv("maven_password")
}
}
}
}
} }

View File

@ -1,6 +1,6 @@
package net.sergeych.parsec3 package net.sergeych.parsec3
import kotlinx.coroutines.CompletableDeferred import kotlinx.coroutines.*
import kotlinx.coroutines.sync.Mutex import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock import kotlinx.coroutines.sync.withLock
import kotlinx.serialization.json.Json import kotlinx.serialization.json.Json
@ -74,6 +74,8 @@ open class Adapter<T>(
private val sendEncoded: suspend (data: ByteArray) -> Unit, private val sendEncoded: suspend (data: ByteArray) -> Unit,
) : LogTag("ADPTR") { ) : LogTag("ADPTR") {
val scope = CoroutineScope(GlobalScope.coroutineContext)
private val completions = mutableMapOf<Int, CompletableDeferred<ByteArray>>() private val completions = mutableMapOf<Int, CompletableDeferred<ByteArray>>()
private var lastId = 1 private var lastId = 1
private val access = Mutex() private val access = Mutex()
@ -105,25 +107,37 @@ open class Adapter<T>(
} }
} }
/**
* Cancels the scope that is used to call incoming commands. Cancelling the scope effectively cancels any
* unfinished commands. It _will not wait for its completion_.
*
* Not calling it might cause unknown number of pending command processing coroutines to remain active.
*/
fun cancel() {
scope.cancel()
}
private suspend fun processIncomingPackage(pe: Package) { private suspend fun processIncomingPackage(pe: Package) {
when (pe) { when (pe) {
is Package.Command -> { is Package.Command -> {
try { scope.launch {
val handler = commandHost.handler(pe.name) try {
val result = handler.invoke(instance, pe.args) val handler = commandHost.handler(pe.name)
sendPackage( val result = handler.invoke(instance, pe.args)
Package.Response(pe.id, result) sendPackage(
) Package.Response(pe.id, result)
} catch (ae: ParsecException) { )
sendPackage(Package.Response(pe.id, null, ae.code, ae.text)) } catch (ae: ParsecException) {
} catch (ex: Throwable) { sendPackage(Package.Response(pe.id, null, ae.code, ae.text))
exceptionRegistry.classCodes[ex::class]?.let { code -> } catch (ex: Throwable) {
sendPackage(Package.Response(pe.id, null, code, ex.toString())) exceptionRegistry.classCodes[ex::class]?.let { code ->
} ?: run { sendPackage(Package.Response(pe.id, null, code, ex.toString()))
ex.printStackTrace() } ?: run {
sendPackage(Package.Response(pe.id, null, "UNKNOWN_ERROR", ex.toString())) ex.printStackTrace()
sendPackage(Package.Response(pe.id, null, "UNKNOWN_ERROR", ex.toString()))
}
}
} }
}
} }
is Package.Response -> { is Package.Response -> {

View File

@ -17,6 +17,10 @@ class AdapterBuilder<S, H : CommandHost<S>>(
sessionProducer = f sessionProducer = f
} }
private var _adapter: Adapter<S>? = null
val adapter: Adapter<S> get() = _adapter ?: throw IllegalStateException("adapter is not yet initialized")
/** /**
* Register command implementation * Register command implementation
*/ */
@ -30,7 +34,10 @@ class AdapterBuilder<S, H : CommandHost<S>>(
suspend fun createWith(input: Flow<ByteArray>, f: suspend (ByteArray)->Unit ): Adapter<S> { suspend fun createWith(input: Flow<ByteArray>, f: suspend (ByteArray)->Unit ): Adapter<S> {
return Adapter<S>(sessionProducer(), api, exceptionRegistry) { f(it) } return Adapter<S>(sessionProducer(), api, exceptionRegistry) { f(it) }
.also { a-> globalLaunch { input.collect { a.receiveFrame(it)} } } .also { a->
globalLaunch { input.collect { a.receiveFrame(it)} }
_adapter = a
}
} }
suspend fun create(f: suspend (ByteArray) -> Unit): Adapter<S> { suspend fun create(f: suspend (ByteArray) -> Unit): Adapter<S> {

View File

@ -1,11 +1,13 @@
package net.sergeych.parsec3 package net.sergeych.parsec3
import kotlinx.serialization.SerialName
import kotlinx.serialization.Serializable import kotlinx.serialization.Serializable
/** /**
* The parsec3 package transmit requests and responses over the parsec3 channel. * The parsec3 package transmit requests and responses over the parsec3 channel.
*/ */
@Serializable @Serializable
@SerialName("p3")
sealed class Package { sealed class Package {
/** /**
* Invoke a remote command. * Invoke a remote command.
@ -15,6 +17,7 @@ sealed class Package {
* @param args whatever arguments the command accepts serialized with BOSS. * @param args whatever arguments the command accepts serialized with BOSS.
*/ */
@Serializable @Serializable
@SerialName("cmd")
data class Command(val id: Int, val name: String, val args: ByteArray) : Package() data class Command(val id: Int, val name: String, val args: ByteArray) : Package()
@ -28,6 +31,7 @@ sealed class Package {
* @param errorCode exception code, if not null then result must be ignored (and assumed to be null). * @param errorCode exception code, if not null then result must be ignored (and assumed to be null).
*/ */
@Serializable @Serializable
@SerialName("rsp")
data class Response( data class Response(
val toId: Int, val toId: Int,
val result: ByteArray? = null, val result: ByteArray? = null,

View File

@ -62,6 +62,7 @@ internal class AdapterTest {
// create command `foo` that takes a string argument and // create command `foo` that takes a string argument and
// returns a string: // returns a string:
val bar by command<String, String>() val bar by command<String, String>()
val loopCall by command<String, String>()
} }
@Test @Test
@ -90,12 +91,27 @@ internal class AdapterTest {
on(api.bar) { on(api.bar) {
it + "bar" it + "bar"
} }
on(api.loopCall) {
try {
val res = ApiS1.foo(adapter, "loop-")
"%% $res %%"
}
catch(t: Throwable) {
t.printStackTrace()
throw t
}
}
} }
val a1 = b1.createWith(ch21.receiveAsFlow()) { ch12.send(it) } val a1 = b1.createWith(ch21.receiveAsFlow()) {
val a2 = b2.createWith(ch12.receiveAsFlow()) { ch21.send(it) } ch12.send(it)
}
val a2 = b2.createWith(ch12.receiveAsFlow()) {
ch21.send(it)
}
assertEquals("123bar", a1.invokeCommand(ApiS2<Unit>().bar, "123")) // assertEquals("123bar", a1.invokeCommand(ApiS2<Unit>().bar, "123"))
assertEquals("%% loop-42foo %%", a1.invokeCommand(ApiS2<Unit>().loopCall, "123"))
assertEquals("32142foo", a2.invokeCommand(ApiS1.foo, "321")) assertEquals("32142foo", a2.invokeCommand(ApiS1.foo, "321"))
assertEquals("---42foo", ApiS1.foo.invoke(a2, "---")) assertEquals("---42foo", ApiS1.foo.invoke(a2, "---"))
@ -105,7 +121,8 @@ internal class AdapterTest {
ch12.cancel() ch12.cancel()
ch21.cancel() ch21.cancel()
a1.cancel()
a2.cancel()
} }
} }