From 38fbca955c84ba612d24a184235e907be2829a01 Mon Sep 17 00:00:00 2001 From: sergeych Date: Tue, 18 Jun 2024 16:11:13 +0700 Subject: [PATCH] fixed automatic connection reestablishing for web and tcp --- .idea/misc.xml | 1 - .../net/sergeych/kiloparsec/KiloClient.kt | 2 +- .../kiloparsec/adapter/ProxyDevice.kt | 3 +- .../kiloparsec/adapter/websocketClient.kt | 18 ++++---- .../kiloparsec/adapter/WebsocketServer.kt | 21 ++++++---- .../net/sergeych/kiloparsec/ClientTest.kt | 41 +++++++++---------- 6 files changed, 45 insertions(+), 41 deletions(-) diff --git a/.idea/misc.xml b/.idea/misc.xml index c049206..eec43a0 100644 --- a/.idea/misc.xml +++ b/.idea/misc.xml @@ -1,4 +1,3 @@ - diff --git a/src/commonMain/kotlin/net/sergeych/kiloparsec/KiloClient.kt b/src/commonMain/kotlin/net/sergeych/kiloparsec/KiloClient.kt index 3c4f66c..5ce02c9 100644 --- a/src/commonMain/kotlin/net/sergeych/kiloparsec/KiloClient.kt +++ b/src/commonMain/kotlin/net/sergeych/kiloparsec/KiloClient.kt @@ -35,7 +35,7 @@ class KiloClient( * to authenticate a client on connection restore, for example. */ @Suppress("unused") - val state = _state.asStateFlow() + val connectedStateFlow = _state.asStateFlow() private var deferredClient = CompletableDeferred>() diff --git a/src/commonMain/kotlin/net/sergeych/kiloparsec/adapter/ProxyDevice.kt b/src/commonMain/kotlin/net/sergeych/kiloparsec/adapter/ProxyDevice.kt index 2d4f7b9..a11d2d3 100644 --- a/src/commonMain/kotlin/net/sergeych/kiloparsec/adapter/ProxyDevice.kt +++ b/src/commonMain/kotlin/net/sergeych/kiloparsec/adapter/ProxyDevice.kt @@ -17,8 +17,9 @@ open class ProxyDevice( override val input: ReceiveChannel = inputChannel override val output: SendChannel = outputChannel + override suspend fun close() { - doClose?.invoke() + kotlin.runCatching { doClose?.invoke() } runCatching { inputChannel.close() } runCatching { outputChannel.close() } } diff --git a/src/commonMain/kotlin/net/sergeych/kiloparsec/adapter/websocketClient.kt b/src/commonMain/kotlin/net/sergeych/kiloparsec/adapter/websocketClient.kt index f2d2250..145e0c5 100644 --- a/src/commonMain/kotlin/net/sergeych/kiloparsec/adapter/websocketClient.kt +++ b/src/commonMain/kotlin/net/sergeych/kiloparsec/adapter/websocketClient.kt @@ -6,14 +6,11 @@ import io.ktor.http.* import io.ktor.websocket.* import kotlinx.coroutines.CancellationException import kotlinx.coroutines.CompletableDeferred -import kotlinx.coroutines.cancel import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.channels.ClosedReceiveChannelException import kotlinx.coroutines.channels.ClosedSendChannelException -import kotlinx.coroutines.channels.ReceiveChannel import kotlinx.coroutines.launch import net.sergeych.crypto2.SigningKey -import net.sergeych.crypto2.toDump import net.sergeych.kiloparsec.KiloClient import net.sergeych.kiloparsec.KiloConnectionData import net.sergeych.kiloparsec.KiloInterface @@ -46,7 +43,8 @@ fun websocketClient( return KiloClient(clientInterface, secretKey) { val input = Channel() val output = Channel() - val job = globalLaunch { + val closeHandle = CompletableDeferred() + globalLaunch { val log = LogTag("KC:${counter.incrementAndGet()}") client.webSocket({ url.protocol = u.protocol @@ -56,7 +54,6 @@ fun websocketClient( url.parameters.appendAll(u.parameters) log.info { "kiloparsec server URL: $url" } }) { - val closeHandle = CompletableDeferred() log.info { "connected to the server" } // println("SENDING!!!") // send("Helluva") @@ -70,6 +67,7 @@ fun websocketClient( } catch (_: ClosedSendChannelException) { log.info { "send channel closed" } } + catch(_: CancellationException) {} catch(t: Throwable) { log.info { "unexpected exception in websock sender: ${t.stackTraceToString()}" } closeHandle.completeExceptionally(t) @@ -80,9 +78,7 @@ fun websocketClient( try { for (f in incoming) { if (f is Frame.Binary) { - input.send(f.readBytes().toUByteArray().also { - println("incoming\n${it.toDump()}") - }) + input.send(f.readBytes().toUByteArray()) } else { log.warning { "ignoring unexpected frame of type ${f.frameType}" } } @@ -102,14 +98,16 @@ fun websocketClient( log.warning { "Client is closing with error" } throw RemoteInterface.ClosedException() } + output.close() + input.close() } log.info { "closing connection" } } val device = ProxyDevice(input, output) { - input.close() // we need to explicitly close the coroutine job, or it can hang for a long time // leaking resources. - job.cancel() + closeHandle.complete(true) +// job.cancel() } KiloConnectionData(device, sessionMaker()) } diff --git a/src/jvmMain/kotlin/net/sergeych/kiloparsec/adapter/WebsocketServer.kt b/src/jvmMain/kotlin/net/sergeych/kiloparsec/adapter/WebsocketServer.kt index fb21c66..8a8ed5f 100644 --- a/src/jvmMain/kotlin/net/sergeych/kiloparsec/adapter/WebsocketServer.kt +++ b/src/jvmMain/kotlin/net/sergeych/kiloparsec/adapter/WebsocketServer.kt @@ -7,7 +7,6 @@ import io.ktor.websocket.* import kotlinx.coroutines.cancel import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.channels.ClosedReceiveChannelException -import kotlinx.coroutines.channels.ClosedSendChannelException import kotlinx.coroutines.isActive import kotlinx.coroutines.launch import net.sergeych.crypto2.SigningKey @@ -36,7 +35,6 @@ fun Application.setupWebsocketServer( val counter = AtomicCounter() routing { webSocket(path) { - println("--------------------------------------------") val log = LogTag("KWS:${counter.incrementAndGet()}") log.debug { "opening the connection" } val input = Channel(256) @@ -44,7 +42,13 @@ fun Application.setupWebsocketServer( launch { log.debug { "starting output pump" } while (isActive) { - send(output.receive().toByteArray()) + try { + send(output.receive().toByteArray()) + } + catch(_: ClosedReceiveChannelException) { + log.debug { "closing output pump as output channel is closed" } + break + } } log.debug { "closing output pump" } } @@ -54,7 +58,10 @@ fun Application.setupWebsocketServer( createSession(), serverKey ) - launch { server.run() } + launch { + server.run() + close() + } log.debug { "KSC started, looking for incoming frames" } for (f in incoming) { log.debug { "incoming frame: ${f.frameType}" } @@ -64,7 +71,7 @@ fun Application.setupWebsocketServer( log.debug { "in frame\n${it.toDump()}" } }) } catch (_: RemoteInterface.ClosedException) { - log.info { "caught local closed exception, closing" } + log.warning { "caught local closed exception (strange!), closing" } break } catch (_: ClosedReceiveChannelException) { log.info { "receive channel is closed, closing connection" } @@ -77,9 +84,9 @@ fun Application.setupWebsocketServer( log.warning { "unknown frame type ${f.frameType}, ignoring" } } log.debug { "closing the server" } - println("****************prec") + close() cancel() - println("****************postc") + log.debug { "server wbesock processing done" } } } } diff --git a/src/jvmTest/kotlin/net/sergeych/kiloparsec/ClientTest.kt b/src/jvmTest/kotlin/net/sergeych/kiloparsec/ClientTest.kt index f56ab43..1343bba 100644 --- a/src/jvmTest/kotlin/net/sergeych/kiloparsec/ClientTest.kt +++ b/src/jvmTest/kotlin/net/sergeych/kiloparsec/ClientTest.kt @@ -12,10 +12,7 @@ import net.sergeych.kiloparsec.adapter.setupWebsocketServer import net.sergeych.kiloparsec.adapter.websocketClient import net.sergeych.mp_logger.Log import java.net.InetAddress -import kotlin.test.Test -import kotlin.test.assertEquals -import kotlin.test.assertIs -import kotlin.test.assertTrue +import kotlin.test.* class ClientTest { @@ -102,7 +99,7 @@ class ClientTest { on(cmdSetFoo) { session.foo = it } on(cmdCheckConnected) { connectedCalled } on(cmdClose) { - throw RemoteInterface.ClosedException() + throw LocalInterface.BreakConnectionException() // if( closeCounter < 2 ) { // println("-------------------------- call close!") // throw RemoteInterface.ClosedException() @@ -120,33 +117,35 @@ class ClientTest { val client = websocketClient("ws://localhost:8080/kp") val states = mutableListOf() val collector = launch { - client.state.collect { + client.connectedStateFlow.collect { println("got: $closeCounter/$it") states += it if( !it) { closeCounter++ } } } - println(1) assertEquals(true, client.call(cmdCheckConnected)) - assertTrue { client.state.value } - println(2) + assertTrue { client.connectedStateFlow.value } assertEquals("not set", client.call(cmdGetFoo)) - println(3) client.call(cmdSetFoo, "foo") - println(4) assertEquals("foo", client.call(cmdGetFoo)) - println(5) -// assertThrows { -// client.call(cmdClose) -// } - println("0------------------------------------------------------------------------------connection should be closed") -// assertFalse { client.state.value } -// assertEquals("foo", client.call(cmdGetFoo)) + + assertTrue { client.connectedStateFlow.value } + assertThrows { + client.call(cmdClose) + } + + // connection should now be closed + assertFalse { client.connectedStateFlow.value } + + // this should be run on automatically reopen connection + client.call(cmdSetFoo,"superbar") + assertTrue { client.connectedStateFlow.value } + assertEquals("superbar", client.call(cmdGetFoo)) client.close() ns.stop() collector.cancel() - println("----= states: $states") - println("stopped server") - println("closed client") +// println("----= states: $states") +// println("stopped server") +// println("closed client") } } \ No newline at end of file