From 07f9e720a14774ca80160e5009ccc7d6fe0381d8 Mon Sep 17 00:00:00 2001 From: sergeych Date: Wed, 28 Sep 2022 00:19:07 +0300 Subject: [PATCH] !refactored session to WithAdapter based to propery run many connections on the server --- build.gradle.kts | 2 +- .../kotlin/net.sergeych.parsec3/Adapter.kt | 7 +++--- .../net.sergeych.parsec3/AdapterBuilder.kt | 23 +++++++++++-------- .../net.sergeych.parsec3/CommandDescriptor.kt | 2 +- .../net.sergeych.parsec3/CommandHost.kt | 2 +- .../net.sergeych.parsec3/Parsec3Transport.kt | 2 +- .../net.sergeych.parsec3/Parsec3WSClient.kt | 2 +- src/commonTest/kotlin/parsec3/AdapterTest.kt | 17 +++++++------- .../kotlin/net/sergeych/parsec3/WsServer.kt | 2 +- .../net/sergeych/parsec3/WsServerKtTest.kt | 4 ++-- 10 files changed, 33 insertions(+), 30 deletions(-) diff --git a/build.gradle.kts b/build.gradle.kts index b5054e6..3202e3c 100644 --- a/build.gradle.kts +++ b/build.gradle.kts @@ -10,7 +10,7 @@ plugins { } group = "net.sergeych" -version = "0.1.0-SNAPSHOT" +version = "0.1.1-SNAPSHOT" repositories { mavenCentral() diff --git a/src/commonMain/kotlin/net.sergeych.parsec3/Adapter.kt b/src/commonMain/kotlin/net.sergeych.parsec3/Adapter.kt index 968fa0f..3a9a546 100644 --- a/src/commonMain/kotlin/net.sergeych.parsec3/Adapter.kt +++ b/src/commonMain/kotlin/net.sergeych.parsec3/Adapter.kt @@ -8,7 +8,6 @@ import net.sergeych.boss_serialization.BossDecoder import net.sergeych.boss_serialization_mp.BossEncoder import net.sergeych.boss_serialization_mp.decodeBoss import net.sergeych.mp_logger.LogTag -import net.sergeych.mp_logger.debug import net.sergeych.mp_logger.exception import net.sergeych.mp_logger.warning import net.sergeych.mptools.toDump @@ -67,7 +66,7 @@ import net.sergeych.mptools.toDump * parsec3 built-in exceptions. * @param sendEncoded a method that performs actual sending of the packed binary frame to the remote side */ -open class Adapter( +open class Adapter( private val instance: T, private val commandHost: CommandHost, private val exceptionRegistry: ExceptionsRegistry = ExceptionsRegistry(), @@ -95,14 +94,14 @@ open class Adapter( return CompletableDeferred().also { dr -> sendPackage( access.withLock { - debug { "calling $lastId:${ca.name}($args)" } +// debug { "calling $lastId:${ca.name}($args)" } completions[lastId] = dr myId = lastId Package.Command(lastId++, ca.name, BossEncoder.encode(ca.ass, args)) } ) }.await().let { - debug { "result $myId:$it" } +// debug { "result $myId:$it" } BossDecoder.decodeFrom(ca.rss, it) } } diff --git a/src/commonMain/kotlin/net.sergeych.parsec3/AdapterBuilder.kt b/src/commonMain/kotlin/net.sergeych.parsec3/AdapterBuilder.kt index fba5c8f..bef4c16 100644 --- a/src/commonMain/kotlin/net.sergeych.parsec3/AdapterBuilder.kt +++ b/src/commonMain/kotlin/net.sergeych.parsec3/AdapterBuilder.kt @@ -3,13 +3,19 @@ package net.sergeych.parsec3 import kotlinx.coroutines.flow.Flow import net.sergeych.mp_tools.globalLaunch -class AdapterBuilder>( +open class WithAdapter { + internal var _adapter: Adapter<*>? = null + + val adapter: Adapter<*> get() = _adapter ?: throw IllegalStateException("adapter is not yet initialized") +} + +class AdapterBuilder>( val api: H, val exceptionRegistry: ExceptionsRegistry = ExceptionsRegistry(), f: AdapterBuilder.() -> Unit, ) { - internal var sessionProducer: (suspend () -> S) = { Unit as S} + internal var sessionProducer: (suspend () -> S) = { WithAdapter() as S} private set @@ -17,10 +23,6 @@ class AdapterBuilder>( sessionProducer = f } - private var _adapter: Adapter? = null - - val adapter: Adapter get() = _adapter ?: throw IllegalStateException("adapter is not yet initialized") - /** * Register command implementation */ @@ -33,15 +35,18 @@ class AdapterBuilder>( } suspend fun createWith(input: Flow, f: suspend (ByteArray)->Unit ): Adapter { - return Adapter(sessionProducer(), api, exceptionRegistry) { f(it) } + val s = sessionProducer() + return Adapter( + s, api, exceptionRegistry) { f(it) } .also { a-> + s._adapter = a globalLaunch { input.collect { a.receiveFrame(it)} } - _adapter = a } } suspend fun create(f: suspend (ByteArray) -> Unit): Adapter { - return Adapter(sessionProducer(), api, exceptionRegistry, f) + val s = sessionProducer() + return Adapter(s, api, exceptionRegistry, f).also { s._adapter = it } } init { diff --git a/src/commonMain/kotlin/net.sergeych.parsec3/CommandDescriptor.kt b/src/commonMain/kotlin/net.sergeych.parsec3/CommandDescriptor.kt index 7dc0599..d3c02fc 100644 --- a/src/commonMain/kotlin/net.sergeych.parsec3/CommandDescriptor.kt +++ b/src/commonMain/kotlin/net.sergeych.parsec3/CommandDescriptor.kt @@ -13,7 +13,7 @@ class CommandDescriptor( @Suppress("UNCHECKED_CAST") suspend operator fun invoke(adapter: Adapter<*>): R = adapter.invokeCommand(this,Unit as A) - operator fun invoke(commandHost: CommandHost, block: suspend I.(A)->R) { + operator fun invoke(commandHost: CommandHost, block: suspend I.(A)->R) { commandHost.on(this, block) } } diff --git a/src/commonMain/kotlin/net.sergeych.parsec3/CommandHost.kt b/src/commonMain/kotlin/net.sergeych.parsec3/CommandHost.kt index 5e85c1c..87c7e4e 100644 --- a/src/commonMain/kotlin/net.sergeych.parsec3/CommandHost.kt +++ b/src/commonMain/kotlin/net.sergeych.parsec3/CommandHost.kt @@ -30,7 +30,7 @@ import kotlin.reflect.typeOf * * @param T the type of the `state` instance used to hold state, use `Unit` for stateless interfaces */ -open class CommandHost { +open class CommandHost { private val handlers = mutableMapOf ByteArray>() /** diff --git a/src/commonMain/kotlin/net.sergeych.parsec3/Parsec3Transport.kt b/src/commonMain/kotlin/net.sergeych.parsec3/Parsec3Transport.kt index d17c22f..0ce97ec 100644 --- a/src/commonMain/kotlin/net.sergeych.parsec3/Parsec3Transport.kt +++ b/src/commonMain/kotlin/net.sergeych.parsec3/Parsec3Transport.kt @@ -7,7 +7,7 @@ import kotlinx.coroutines.flow.StateFlow * asynchronous (push capable) calls, but normally it should be used as a transport for parsec 3.1 secure * protocol which uses this transport interface to run. */ -interface Parsec3Transport { +interface Parsec3Transport { val connectedFlow: StateFlow fun close() diff --git a/src/commonMain/kotlin/net.sergeych.parsec3/Parsec3WSClient.kt b/src/commonMain/kotlin/net.sergeych.parsec3/Parsec3WSClient.kt index 4c4ecd0..679bbb9 100644 --- a/src/commonMain/kotlin/net.sergeych.parsec3/Parsec3WSClient.kt +++ b/src/commonMain/kotlin/net.sergeych.parsec3/Parsec3WSClient.kt @@ -11,7 +11,7 @@ import net.sergeych.mp_logger.LogTag import net.sergeych.mp_logger.info import net.sergeych.mp_tools.globalLaunch -class Parsec3WSClient>( +class Parsec3WSClient>( val url: String, val api: H, val exceptionsRegistry: ExceptionsRegistry = ExceptionsRegistry(), diff --git a/src/commonTest/kotlin/parsec3/AdapterTest.kt b/src/commonTest/kotlin/parsec3/AdapterTest.kt index 7172abf..af9e820 100644 --- a/src/commonTest/kotlin/parsec3/AdapterTest.kt +++ b/src/commonTest/kotlin/parsec3/AdapterTest.kt @@ -11,14 +11,14 @@ import kotlin.test.assertEquals internal class AdapterTest { - object Api1 : CommandHost() { + object Api1 : CommandHost() { // create command `foo` that takes a string argument and // returns a string: val foo by command() } - object Api2 : CommandHost() { + object Api2 : CommandHost() { val bar by command() } @@ -36,8 +36,8 @@ internal class AdapterTest { it + "bar" } - val a1 = Adapter(Unit, api1) { ch12.send(it) } - val a2 = Adapter(Unit, api2) { ch21.send(it) } + val a1 = Adapter(WithAdapter(), api1) { ch12.send(it) } + val a2 = Adapter(WithAdapter(), api2) { ch21.send(it) } launch { for (b in ch12) a2.receiveFrame(b) } launch { for (b in ch21) a1.receiveFrame(b) } @@ -49,7 +49,7 @@ internal class AdapterTest { } - data class TestSession(var buzz: String) + data class TestSession(var buzz: String) : WithAdapter() object ApiS1 : CommandHost() { // create command `foo` that takes a string argument and @@ -58,7 +58,7 @@ internal class AdapterTest { val ex by command() val ex2 by command() } - class ApiS2 : CommandHost() { + class ApiS2 : CommandHost() { // create command `foo` that takes a string argument and // returns a string: val bar by command() @@ -86,8 +86,7 @@ internal class AdapterTest { throw IllegalArgumentException() } } - val b2 = AdapterBuilder(ApiS2(), er) { - newSession { } + val b2 = AdapterBuilder(ApiS2(), er) { on(api.bar) { it + "bar" } @@ -111,7 +110,7 @@ internal class AdapterTest { } // assertEquals("123bar", a1.invokeCommand(ApiS2().bar, "123")) - assertEquals("%% loop-42foo %%", a1.invokeCommand(ApiS2().loopCall, "123")) + assertEquals("%% loop-42foo %%", a1.invokeCommand(ApiS2().loopCall, "123")) assertEquals("32142foo", a2.invokeCommand(ApiS1.foo, "321")) assertEquals("---42foo", ApiS1.foo.invoke(a2, "---")) diff --git a/src/jvmMain/kotlin/net/sergeych/parsec3/WsServer.kt b/src/jvmMain/kotlin/net/sergeych/parsec3/WsServer.kt index 4899972..079d20f 100644 --- a/src/jvmMain/kotlin/net/sergeych/parsec3/WsServer.kt +++ b/src/jvmMain/kotlin/net/sergeych/parsec3/WsServer.kt @@ -14,7 +14,7 @@ import java.util.concurrent.atomic.AtomicLong * Creates a ktor server initialization module capable to perform p3 transport layer (not secure). * It could be used as is or as transport for p3.1 */ -fun >Application.parsec3TransportServer( +fun >Application.parsec3TransportServer( api: H, exceptionsRegistry: ExceptionsRegistry = ExceptionsRegistry(), path: String = "/api/p3", diff --git a/src/jvmTest/kotlin/net/sergeych/parsec3/WsServerKtTest.kt b/src/jvmTest/kotlin/net/sergeych/parsec3/WsServerKtTest.kt index 1432922..928a9d9 100644 --- a/src/jvmTest/kotlin/net/sergeych/parsec3/WsServerKtTest.kt +++ b/src/jvmTest/kotlin/net/sergeych/parsec3/WsServerKtTest.kt @@ -8,12 +8,12 @@ import kotlin.test.assertEquals internal class WsServerKtTest { - data class TestSession(var buzz: String = "BuZZ") + data class TestSession(var buzz: String = "BuZZ"): WithAdapter() object TestApiServer: CommandHost() { val foo by command() } - object TestApiClient: CommandHost() { + object TestApiClient: CommandHost() { val bar by command() }