From 16b2d1780bb96503e717016e98de7c208e8d96e9 Mon Sep 17 00:00:00 2001 From: sergeych Date: Sun, 18 Dec 2022 03:30:11 +0100 Subject: [PATCH] added reconnection/connection waiting logic added way to close cannection from the adapter if the protocol implementation allows it (adapter.onCancel/adapter.cancel()) --- README.md | 2 + build.gradle.kts | 2 +- .../kotlin/net.sergeych.parsec3/Adapter.kt | 31 ++++++-- .../net.sergeych.parsec3/Parsec3WSClient.kt | 54 +++++++++----- .../kotlin/net/sergeych/parsec3/WsServer.kt | 2 + .../net/sergeych/parsec3/WsServerKtTest.kt | 70 ++++++++++++++++++- 6 files changed, 138 insertions(+), 23 deletions(-) diff --git a/README.md b/README.md index 5de8ecb..eec0054 100644 --- a/README.md +++ b/README.md @@ -10,6 +10,8 @@ Please note about versioning Current stable version is __0.3.3__. +- v.0.4.* - improved cancellation & fixed auto-reconnection logic. + This is a connection-agnostic, full-duplex RPC type binary protocol, effective to work with binary data, such as encrypted data, keys, multimedia, etc. Its key points are: - simple and practical transport RPC layer, which is a primary choice when, for exaple, `wss://` level by TSL is enough, e.g. when there is no sensitive data being transmitted (games, etc). diff --git a/build.gradle.kts b/build.gradle.kts index 1a7293a..a7fafd1 100644 --- a/build.gradle.kts +++ b/build.gradle.kts @@ -10,7 +10,7 @@ plugins { } group = "net.sergeych" -version = "0.3.3" +version = "0.4.0-SNAPSHOT" repositories { mavenCentral() diff --git a/src/commonMain/kotlin/net.sergeych.parsec3/Adapter.kt b/src/commonMain/kotlin/net.sergeych.parsec3/Adapter.kt index cf68aba..d47ed5d 100644 --- a/src/commonMain/kotlin/net.sergeych.parsec3/Adapter.kt +++ b/src/commonMain/kotlin/net.sergeych.parsec3/Adapter.kt @@ -2,6 +2,7 @@ package net.sergeych.parsec3 +import io.ktor.utils.io.errors.* import kotlinx.coroutines.* import kotlinx.coroutines.sync.Mutex import kotlinx.coroutines.sync.withLock @@ -77,6 +78,12 @@ open class Adapter( val scope = CoroutineScope(GlobalScope.coroutineContext) + /** + * If you plaan to cancel the adapter from outside its context (e.g. from API command or like) + * provide specific close code that frees resource for this adapter (like closing websocket connection). + */ + var onCancel: suspend ()->Unit = { throw NotImplementedError("this adapted has no onCancel implementation, provide it")} + private val completions = mutableMapOf>() private var lastId = 1 private val access = Mutex() @@ -92,13 +99,13 @@ open class Adapter( */ @Suppress("UNCHECKED_CAST") suspend fun invokeCommand(ca: CommandDescriptor, args: A = Unit as A): R { - var myId = -1 +// var myId = -1 return CompletableDeferred().also { dr -> sendPackage( access.withLock { // debug { "calling $lastId:${ca.name}($args)" } completions[lastId] = dr - myId = lastId +// myId = lastId Package.Command(lastId++, ca.name, BossEncoder.encode(ca.ass, args)) } ) @@ -114,8 +121,9 @@ open class Adapter( * * Not calling it might cause unknown number of pending command processing coroutines to remain active. */ - fun cancel() { + suspend fun cancel() { scope.cancel() + onCancel.invoke() } /** @@ -136,6 +144,8 @@ open class Adapter( sendPackage( Package.Response(pe.id, result) ) + } catch(_: CancellationException) { + // just ignore it } catch (ae: ParsecException) { sendPackage(Package.Response(pe.id, null, ae.code, ae.text)) } catch (ex: Throwable) { @@ -167,7 +177,7 @@ open class Adapter( } private suspend fun sendPackage(pe: Package) { - sendEncoded(BossEncoder.encode(pe)) + sendEncoded(BossEncoder.encode(pe)) } /** @@ -182,6 +192,19 @@ open class Adapter( } } + class CloseError : IOException("adapter is closed") + + /** + * Frees any allocater resources, for example, pending commands. + * Any protocol implementatino MUST call it when connection is closed. + */ + fun close() { + val error = CloseError() + completions.forEach { + it.value.completeExceptionally(error) + } + } + companion object { val format = Json { prettyPrint = true } } diff --git a/src/commonMain/kotlin/net.sergeych.parsec3/Parsec3WSClient.kt b/src/commonMain/kotlin/net.sergeych.parsec3/Parsec3WSClient.kt index 0d612d4..a0a756a 100644 --- a/src/commonMain/kotlin/net.sergeych.parsec3/Parsec3WSClient.kt +++ b/src/commonMain/kotlin/net.sergeych.parsec3/Parsec3WSClient.kt @@ -3,11 +3,12 @@ package net.sergeych.parsec3 import io.ktor.client.* import io.ktor.client.plugins.websocket.* import io.ktor.websocket.* -import kotlinx.coroutines.CompletableDeferred +import kotlinx.coroutines.* import kotlinx.coroutines.flow.MutableStateFlow import kotlinx.coroutines.flow.StateFlow -import kotlinx.coroutines.launch import net.sergeych.mp_logger.LogTag +import net.sergeych.mp_logger.debug +import net.sergeych.mp_logger.exception import net.sergeych.mp_logger.info import net.sergeych.mp_tools.globalLaunch @@ -29,10 +30,13 @@ class Parsec3WSClient>( builder: AdapterBuilder.() -> Unit, ) : LogTag("P3WSC"), Parsec3Transport { + private lateinit var connectionJob: Job val builder = AdapterBuilder(api, exceptionsRegistry, builder) private val _connectionFlow = MutableStateFlow(false) private val closeFlow = MutableStateFlow(false) + + // This one is used to send reconnection signal private val reconnectFlow = MutableStateFlow(false) override val connectedFlow: StateFlow = _connectionFlow @@ -55,24 +59,42 @@ class Parsec3WSClient>( override suspend fun adapter(): Adapter = deferredAdapter.await() fun start() { - globalLaunch { + connectionJob = globalLaunch { while (closeFlow.value != true) { - reconnectFlow.value = false - client.webSocket(url) { - info { "Connected to $url" } - val a = builder.create { send(Frame.Binary(true, it)) } - _connectionFlow.value = true - launch { closeFlow.collect { if (it == true) close() } } - launch { reconnectFlow.collect { if (it == true) close() } } - deferredAdapter.complete(a) - for (f in incoming) - if (f is Frame.Binary) a.receiveFrame(f.data) - // when we leave connection will be closed. So far we do not close the adapter, - // should we? + try { + reconnectFlow.value = false + debug { "trying to connect to $url" } + client.webSocket(url) { + info { "Connected to $url" } + val a = builder.create { send(Frame.Binary(true, it)) } + a.onCancel = { close() } + _connectionFlow.value = true + launch { closeFlow.collect { if (it == true) close() } } + launch { reconnectFlow.collect { if (it == true) close() } } + deferredAdapter.complete(a) + for (f in incoming) + if (f is Frame.Binary) a.receiveFrame(f.data) + debug { "disconnecting $url" } + _connectionFlow.value = false + deferredAdapter = CompletableDeferred() + a.close() + cancel() + } + info { "connection to $url is closed normally" } + } + catch(x: CancellationException) { + info { "parsec3 connector job cancelled" } + return@globalLaunch + } + catch(t: Throwable) { + exception { "connection process failed, will try to reconnect" to t } _connectionFlow.value = false - deferredAdapter = CompletableDeferred() + if( !deferredAdapter.isActive) + deferredAdapter = CompletableDeferred() + delay(200) } } + debug { "exiting ws connection loop" } } } diff --git a/src/jvmMain/kotlin/net/sergeych/parsec3/WsServer.kt b/src/jvmMain/kotlin/net/sergeych/parsec3/WsServer.kt index 14db8b1..a605b39 100644 --- a/src/jvmMain/kotlin/net/sergeych/parsec3/WsServer.kt +++ b/src/jvmMain/kotlin/net/sergeych/parsec3/WsServer.kt @@ -39,6 +39,7 @@ fun >Application.parsec3TransportServer( routing { webSocket(path) { val adapter = builder.create { outgoing.send(Frame.Binary(true, it)) } + adapter.onCancel = {close()} for (frame in (this@webSocket).incoming) { when (frame) { is Frame.Binary -> { @@ -49,6 +50,7 @@ fun >Application.parsec3TransportServer( } } } + adapter.close() } var totalConnections = AtomicLong(0) var activeConnections = AtomicInteger(0) diff --git a/src/jvmTest/kotlin/net/sergeych/parsec3/WsServerKtTest.kt b/src/jvmTest/kotlin/net/sergeych/parsec3/WsServerKtTest.kt index 928a9d9..0b993cf 100644 --- a/src/jvmTest/kotlin/net/sergeych/parsec3/WsServerKtTest.kt +++ b/src/jvmTest/kotlin/net/sergeych/parsec3/WsServerKtTest.kt @@ -1,8 +1,10 @@ package net.sergeych.parsec3 +import assertThrows import io.ktor.server.engine.* import io.ktor.server.netty.* import kotlinx.coroutines.runBlocking +import net.sergeych.mp_logger.Log import kotlin.test.Test import kotlin.test.assertEquals @@ -12,6 +14,7 @@ internal class WsServerKtTest { object TestApiServer: CommandHost() { val foo by command() + val ping by command() } object TestApiClient: CommandHost() { val bar by command() @@ -20,7 +23,7 @@ internal class WsServerKtTest { @Test fun testWsServer() { - embeddedServer(Netty, port = 8080) { + embeddedServer(Netty, port = 8081) { parsec3TransportServer( TestApiServer, ) { @@ -31,7 +34,7 @@ internal class WsServerKtTest { } }.start(wait = false) - val client = Parsec3WSClient("ws://localhost:8080/api/p3", TestApiClient) { + val client = Parsec3WSClient("ws://localhost:8081/api/p3", TestApiClient) { on(api.bar) { "bar:$it" } @@ -43,4 +46,67 @@ internal class WsServerKtTest { } } + @Test + fun testWsServerReconnect() { + + embeddedServer(Netty, port = 8080) { + parsec3TransportServer( + TestApiServer, + ) { + newSession { TestSession() } + var count = 0 + on(api.foo) { + adapter.cancel() + "cancelled:${count++}" + } + on(api.ping) { "pong!"} + } + }.start(wait = false) + + Log.connectConsole(Log.Level.DEBUG) + val client = Parsec3WSClient("ws://localhost:8080/api/p3", TestApiClient) { + on(api.bar) { + "bar:$it" + } + } + runBlocking { + assertEquals("pong!", TestApiServer.ping.invoke(client.adapter())) + assertThrows { TestApiServer.foo.invoke(client.adapter(), "*great*") } + assertEquals("pong!", TestApiServer.ping.invoke(client.adapter())) + assertThrows { TestApiServer.foo.invoke(client.adapter(), "*great*") } + } + + } + + @Test + fun testWsServerWaitForConnect() { + + val client = Parsec3WSClient("ws://localhost:8084/api/p3", TestApiClient) { + on(api.bar) { + "bar:$it" + } + } + println("---1") + embeddedServer(Netty, port = 8084) { + parsec3TransportServer( + TestApiServer, + ) { + newSession { TestSession() } + on(api.foo) { + it + buzz + "-foo" + } + } + }.start(wait = false) + + + runBlocking { + println("----2") + val x = TestApiServer.foo.invoke(client.adapter(), "*great*") + println(">> $x") + assertEquals("*great*BuZZ-foo", x) + client.close() + } + + } + } \ No newline at end of file