From b35ca92e48279670c0df6fe42e2f47d39857adf4 Mon Sep 17 00:00:00 2001 From: sergeych Date: Wed, 7 Sep 2022 22:38:21 +0300 Subject: [PATCH] added simple realtime server and client code (over ws0 --- build.gradle.kts | 12 +++- gradle.properties | 3 +- .../kotlin/net.sergeych.parsec3/Adapter.kt | 8 ++- .../net.sergeych.parsec3/AdapterBuilder.kt | 12 ++-- .../ExceptionsRegistry.kt | 9 ++- .../net.sergeych.parsec3/Parsec3WSClient.kt | 66 +++++++++++++++++++ src/commonTest/kotlin/parsec3/AdapterTest.kt | 12 +++- .../kotlin/net/sergeych/parsec3/WsServer.kt | 47 +++++++++++++ .../net/sergeych/parsec3/WsServerKtTest.kt | 46 +++++++++++++ 9 files changed, 202 insertions(+), 13 deletions(-) create mode 100644 src/commonMain/kotlin/net.sergeych.parsec3/Parsec3WSClient.kt create mode 100644 src/jvmMain/kotlin/net/sergeych/parsec3/WsServer.kt create mode 100644 src/jvmTest/kotlin/net/sergeych/parsec3/WsServerKtTest.kt diff --git a/build.gradle.kts b/build.gradle.kts index 8de9756..aa75348 100644 --- a/build.gradle.kts +++ b/build.gradle.kts @@ -1,5 +1,6 @@ val ktor_version: String by project val kotlin_version: String by project +val logback_version: String by project plugins { kotlin("multiplatform") version "1.7.10" @@ -58,9 +59,18 @@ kotlin { val jvmMain by getting { dependencies { implementation("org.mapdb:mapdb:3.0.8") + implementation("io.ktor:ktor-client-cio-jvm:$ktor_version") + implementation("io.ktor:ktor-server-websockets:$ktor_version") + } + } + val jvmTest by getting { + dependencies { + implementation("io.ktor:ktor-server-core:$ktor_version") + implementation("io.ktor:ktor-server-netty:$ktor_version") + implementation("ch.qos.logback:logback-classic:$logback_version") + } } - val jvmTest by getting val jsMain by getting val jsTest by getting } diff --git a/gradle.properties b/gradle.properties index 56a75d8..743d342 100644 --- a/gradle.properties +++ b/gradle.properties @@ -2,4 +2,5 @@ kotlin.code.style=official kotlin.mpp.enableGranularSourceSetsMetadata=true kotlin.native.enableDependencyPropagation=false kotlin.js.generate.executable.default=false -ktor_version=2.1.0 +ktor_version=2.1.1 +logback_version=1.2.10 diff --git a/src/commonMain/kotlin/net.sergeych.parsec3/Adapter.kt b/src/commonMain/kotlin/net.sergeych.parsec3/Adapter.kt index e923415..5ffcb82 100644 --- a/src/commonMain/kotlin/net.sergeych.parsec3/Adapter.kt +++ b/src/commonMain/kotlin/net.sergeych.parsec3/Adapter.kt @@ -117,8 +117,12 @@ open class Adapter( } catch (ae: ParsecException) { sendPackage(Package.Response(pe.id, null, ae.code, ae.text)) } catch (ex: Throwable) { - ex.printStackTrace() - sendPackage(Package.Response(pe.id, null, "UNKNOWN_ERROR", ex.toString())) + exceptionRegistry.classCodes[ex::class]?.let { code -> + sendPackage(Package.Response(pe.id, null, code, ex.toString())) + } ?: run { + ex.printStackTrace() + sendPackage(Package.Response(pe.id, null, "UNKNOWN_ERROR", ex.toString())) + } } } diff --git a/src/commonMain/kotlin/net.sergeych.parsec3/AdapterBuilder.kt b/src/commonMain/kotlin/net.sergeych.parsec3/AdapterBuilder.kt index 8f13869..c36b547 100644 --- a/src/commonMain/kotlin/net.sergeych.parsec3/AdapterBuilder.kt +++ b/src/commonMain/kotlin/net.sergeych.parsec3/AdapterBuilder.kt @@ -5,11 +5,11 @@ import net.sergeych.mp_tools.globalLaunch class AdapterBuilder>( val api: H, - private val exceptionRegistry: ExceptionsRegistry = ExceptionsRegistry(), + val exceptionRegistry: ExceptionsRegistry = ExceptionsRegistry(), f: AdapterBuilder.() -> Unit, ) { - internal var sessionProducer: (suspend () -> S)? = null + internal var sessionProducer: (suspend () -> S) = { Unit as S} private set @@ -24,15 +24,19 @@ class AdapterBuilder>( api.on(ca, block) } - fun addError(code: String, handler: (String?) -> T) { + inline fun addError(code: String, noinline handler: (String?) -> T) { exceptionRegistry.register(code, handler) } suspend fun createWith(input: Flow, f: suspend (ByteArray)->Unit ): Adapter { - return Adapter(sessionProducer!!(), api, exceptionRegistry) { f(it) } + return Adapter(sessionProducer(), api, exceptionRegistry) { f(it) } .also { a-> globalLaunch { input.collect { a.receiveFrame(it)} } } } + suspend fun create(f: suspend (ByteArray) -> Unit): Adapter { + return Adapter(sessionProducer(), api, exceptionRegistry, f) + } + init { f(this) } diff --git a/src/commonMain/kotlin/net.sergeych.parsec3/ExceptionsRegistry.kt b/src/commonMain/kotlin/net.sergeych.parsec3/ExceptionsRegistry.kt index 018b518..fd86715 100644 --- a/src/commonMain/kotlin/net.sergeych.parsec3/ExceptionsRegistry.kt +++ b/src/commonMain/kotlin/net.sergeych.parsec3/ExceptionsRegistry.kt @@ -1,5 +1,7 @@ package net.sergeych.parsec3 +import kotlin.reflect.KClass + /** * Registry to restore exceptions from parsec block data. Serializing exceptions is dangerous: being a OS-bound * objects, exceptions can carry too much sensitive or useless information (e.g. call stack), and serializng @@ -15,18 +17,21 @@ package net.sergeych.parsec3 */ open class ExceptionsRegistry { - private val handlers = mutableMapOfThrowable>().also { + val handlers = mutableMapOfThrowable>().also { // predefined exceptions: it[commandNotFoundCode] = { CommandNotFoundException(it ?: "???") } it[unknownErrorCode] = { UnknownException(it ?: "???") } } + val classCodes = mutableMapOf,String>() + /** * Register an exception with a code with a handler that creates its instance. Note that the * handler _should not throw anything_ but rather create an instance of the exception. */ - fun register(code: String, block: (message: String?) -> T) { + inline fun register(code: String, noinline block: (message: String?) -> T) { handlers[code] = block + classCodes[T::class] = code } /** diff --git a/src/commonMain/kotlin/net.sergeych.parsec3/Parsec3WSClient.kt b/src/commonMain/kotlin/net.sergeych.parsec3/Parsec3WSClient.kt new file mode 100644 index 0000000..6f72e47 --- /dev/null +++ b/src/commonMain/kotlin/net.sergeych.parsec3/Parsec3WSClient.kt @@ -0,0 +1,66 @@ +package net.sergeych.parsec3 + +import io.ktor.client.* +import io.ktor.client.plugins.websocket.* +import io.ktor.websocket.* +import kotlinx.coroutines.CompletableDeferred +import kotlinx.coroutines.flow.MutableStateFlow +import kotlinx.coroutines.flow.StateFlow +import kotlinx.coroutines.launch +import net.sergeych.mp_logger.LogTag +import net.sergeych.mp_logger.info +import net.sergeych.mp_tools.globalLaunch + +class Parsec3WSClient>( + val url: String, + val api: H, + val exceptionsRegistry: ExceptionsRegistry = ExceptionsRegistry(), + f: AdapterBuilder.() -> Unit, +) : LogTag("P3WSC") { + + val builder = AdapterBuilder(api, exceptionsRegistry, f) + + private val _connectionFlow = MutableStateFlow(false) + private val closeFlow = MutableStateFlow(false) + val connectedFlow: StateFlow = _connectionFlow + + + init { + start() + } + + fun close() { + if( closeFlow.value == false ) closeFlow.value = true + } + + var deferredAdapter = CompletableDeferred>() + private set + + suspend fun adapter(): Adapter = deferredAdapter.await() + + fun start() { + globalLaunch { + client.webSocket(url) { + info { "Connected to $url" } + val a = builder.create { send(Frame.Binary(true, it)) } + _connectionFlow.value = true + launch { closeFlow.collect { if( it == true ) close() } } + deferredAdapter.complete(a) + for (f in incoming) + if (f is Frame.Binary) a.receiveFrame(f.data) + // when we leave connection will be closed. So far we do not close the adapter, + // should we? + _connectionFlow.value = false + deferredAdapter = CompletableDeferred() + } + } + } + + companion object { + private val client = HttpClient { + install(WebSockets) { + // Configure WebSockets + } + } + } +} diff --git a/src/commonTest/kotlin/parsec3/AdapterTest.kt b/src/commonTest/kotlin/parsec3/AdapterTest.kt index c11efa9..485ad1a 100644 --- a/src/commonTest/kotlin/parsec3/AdapterTest.kt +++ b/src/commonTest/kotlin/parsec3/AdapterTest.kt @@ -56,6 +56,7 @@ internal class AdapterTest { // returns a string: val foo by command() val ex by command() + val ex2 by command() } class ApiS2 : CommandHost() { // create command `foo` that takes a string argument and @@ -69,7 +70,7 @@ internal class AdapterTest { val ch21 = Channel() val er = ExceptionsRegistry().also { - it.register("foo_x") { IllegalArgumentException("foo_x") } + it.register("foo_x") { IllegalArgumentException(it) } } val b1 = AdapterBuilder(ApiS1, er) { @@ -78,7 +79,10 @@ internal class AdapterTest { it + buzz + "foo" } on(ApiS1.ex) { - throw ParsecException("foo_x") + throw ParsecException("foo_x", it) + } + on(ApiS1.ex2) { + throw IllegalArgumentException() } } val b2 = AdapterBuilder(ApiS2(), er) { @@ -95,7 +99,9 @@ internal class AdapterTest { assertEquals("32142foo", a2.invokeCommand(ApiS1.foo, "321")) assertEquals("---42foo", ApiS1.foo.invoke(a2, "---")) - assertThrows { ApiS1.ex.invoke(a2, "foobar") } + val x = assertThrows { ApiS1.ex.invoke(a2, "foobar") } + assertEquals("foobar", x.message) + assertThrows { ApiS1.ex2.invoke(a2, "foobar") } ch12.cancel() ch21.cancel() diff --git a/src/jvmMain/kotlin/net/sergeych/parsec3/WsServer.kt b/src/jvmMain/kotlin/net/sergeych/parsec3/WsServer.kt new file mode 100644 index 0000000..800b789 --- /dev/null +++ b/src/jvmMain/kotlin/net/sergeych/parsec3/WsServer.kt @@ -0,0 +1,47 @@ +package net.sergeych.parsec3 + +import io.ktor.server.application.* +import io.ktor.server.routing.* +import io.ktor.server.websocket.* +import io.ktor.websocket.* +import net.sergeych.mp_logger.LogTag +import net.sergeych.mp_logger.warning +import java.time.Duration +import java.util.concurrent.atomic.AtomicInteger +import java.util.concurrent.atomic.AtomicLong + +fun >Application.parsec3Server( + api: H, + exceptionsRegistry: ExceptionsRegistry = ExceptionsRegistry(), + path: String = "/api/p3", + f: AdapterBuilder.() -> Unit, +) { + val log = LogTag("P3WSS") + install(WebSockets) { + pingPeriod = Duration.ofSeconds(45) + timeout = Duration.ofSeconds(15) + maxFrameSize = Long.MAX_VALUE + masking = false + } + + val builder = AdapterBuilder(api, exceptionsRegistry, f) + + routing { + webSocket(path) { // websocketSession + val adapter = builder.create { outgoing.send(Frame.Binary(true, it)) } + for (frame in incoming) { + when (frame) { + is Frame.Binary -> { + adapter.receiveFrame(frame.readBytes()) + } + else -> { + log.warning { "unsupported frame type: $frame" } + } + } + } + } + var totalConnections = AtomicLong(0) + var activeConnections = AtomicInteger(0) + + } +} \ No newline at end of file diff --git a/src/jvmTest/kotlin/net/sergeych/parsec3/WsServerKtTest.kt b/src/jvmTest/kotlin/net/sergeych/parsec3/WsServerKtTest.kt new file mode 100644 index 0000000..a04e43c --- /dev/null +++ b/src/jvmTest/kotlin/net/sergeych/parsec3/WsServerKtTest.kt @@ -0,0 +1,46 @@ +package net.sergeych.parsec3 + +import io.ktor.server.engine.* +import io.ktor.server.netty.* +import kotlinx.coroutines.runBlocking +import kotlin.test.Test +import kotlin.test.assertEquals + +internal class WsServerKtTest { + + data class TestSession(var buzz: String = "BuZZ") + + object TestApiServer: CommandHost() { + val foo by command() + } + object TestApiClient: CommandHost() { + val bar by command() + } + + @Test + fun testWsServer() { + + embeddedServer(Netty, port = 8080) { + parsec3Server( + TestApiServer, + ) { + newSession { TestSession() } + on(api.foo) { + it + buzz + "-foo" + } + } + }.start(wait = false) + + val client = Parsec3WSClient("ws://localhost:8080/api/p3", TestApiClient) { + on(api.bar) { + "bar:$it" + } + } + runBlocking { + val x = TestApiServer.foo.invoke(client.adapter(), "*great*") + assertEquals("*great*BuZZ-foo", x) + client.close() + } + + } +} \ No newline at end of file