From 51873aa9b114c34ce6b57e6f78b8acdd19b68c8f Mon Sep 17 00:00:00 2001 From: sergeych Date: Tue, 23 Apr 2024 21:49:15 +0300 Subject: [PATCH] work in progress: restore connection --- .idea/misc.xml | 2 +- build.gradle.kts | 10 +-- .../net/sergeych/kiloparsec/Transport.kt | 19 ++--- .../kiloparsec/adapter/websocketClient.kt | 72 ++++++++++++------- src/commonTest/kotlin/TransportTest.kt | 3 +- 5 files changed, 59 insertions(+), 47 deletions(-) diff --git a/.idea/misc.xml b/.idea/misc.xml index 45281f7..94a01e0 100644 --- a/.idea/misc.xml +++ b/.idea/misc.xml @@ -3,7 +3,7 @@ - + \ No newline at end of file diff --git a/build.gradle.kts b/build.gradle.kts index 35bba14..725d247 100644 --- a/build.gradle.kts +++ b/build.gradle.kts @@ -47,12 +47,12 @@ kotlin { // else -> throw GradleException("Host OS is not supported in Kotlin/Native.") // } - macosArm64() - iosX64() - iosArm64() - iosSimulatorArm64() +// macosArm64() +// iosX64() +// iosArm64() +// iosSimulatorArm64() linuxX64() - macosX64() +// macosX64() val ktor_version = "2.3.6" diff --git a/src/commonMain/kotlin/net/sergeych/kiloparsec/Transport.kt b/src/commonMain/kotlin/net/sergeych/kiloparsec/Transport.kt index ffbe974..e83fafd 100644 --- a/src/commonMain/kotlin/net/sergeych/kiloparsec/Transport.kt +++ b/src/commonMain/kotlin/net/sergeych/kiloparsec/Transport.kt @@ -147,14 +147,9 @@ class Transport( suspend fun run() { coroutineScope { debug { "awaiting incoming blocks" } + // todo: rewrite it to close the job with no exceptions at all, always while (isActive && !isClosed) { try { - debug { "input step starting closed=$isClosed active=$isActive"} - if( isClosed ) { - info { "breaking transort loop on closed"} - break - } - device.input.receive().let { packed -> debug { "<<<\n${packed.toDump()}" } val b = unpack(packed) @@ -189,7 +184,6 @@ class Transport( // handler forced close warning { "handler requested closing of the connection"} isClosed = true - runCatching { device.close() } throw x } catch (x: RemoteInterface.RemoteException) { send(Block.Error(b.id, x.code, x.text, x.extra)) @@ -210,10 +204,7 @@ class Transport( isClosed = true } catch (_: CancellationException) { - info { "loop is cancelled" } - isClosed = true - } catch( _: RemoteInterface.ClosedException) { - debug { "git closed exception here, ignoring" } + info { "loop is cancelled with CancellationException" } isClosed = true } catch (t: Throwable) { exception { "channel closed on error" to t } @@ -225,10 +216,12 @@ class Transport( access.withLock { debug { "access lock obtained"} isClosed = true - debug { "closgin device $device" } + debug { "closing device $device, calls in queue ${calls.size}" } runCatching { device.close() } - for (c in calls.values) c.completeExceptionally(RemoteInterface.ClosedException()) + for (c in calls.values) + c.completeExceptionally(RemoteInterface.ClosedException()) calls.clear() + debug { "calls clear has been called" } } debug { "no more active: $isActive / ${calls.size}" } } diff --git a/src/commonMain/kotlin/net/sergeych/kiloparsec/adapter/websocketClient.kt b/src/commonMain/kotlin/net/sergeych/kiloparsec/adapter/websocketClient.kt index 898b2ed..f2d2250 100644 --- a/src/commonMain/kotlin/net/sergeych/kiloparsec/adapter/websocketClient.kt +++ b/src/commonMain/kotlin/net/sergeych/kiloparsec/adapter/websocketClient.kt @@ -5,6 +5,7 @@ import io.ktor.client.plugins.websocket.* import io.ktor.http.* import io.ktor.websocket.* import kotlinx.coroutines.CancellationException +import kotlinx.coroutines.CompletableDeferred import kotlinx.coroutines.cancel import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.channels.ClosedReceiveChannelException @@ -16,6 +17,7 @@ import net.sergeych.crypto2.toDump import net.sergeych.kiloparsec.KiloClient import net.sergeych.kiloparsec.KiloConnectionData import net.sergeych.kiloparsec.KiloInterface +import net.sergeych.kiloparsec.RemoteInterface import net.sergeych.mp_logger.LogTag import net.sergeych.mp_logger.exception import net.sergeych.mp_logger.info @@ -54,38 +56,54 @@ fun websocketClient( url.parameters.appendAll(u.parameters) log.info { "kiloparsec server URL: $url" } }) { - try { - log.info { "connected to the server" } - println("SENDING!!!") - send("Helluva") - launch { - try { - for (block in output) { - send(block.toByteArray()) - } - log.info { "input is closed, closing the websocket" } - } catch (_: ClosedSendChannelException) { - log.info { "send channel closed" } + val closeHandle = CompletableDeferred() + log.info { "connected to the server" } +// println("SENDING!!!") +// send("Helluva") + launch { + try { + for (block in output) { + send(block.toByteArray()) } - cancel() + log.info { "input is closed, closing the websocket" } + if (closeHandle.isActive) closeHandle.complete(true) + } catch (_: ClosedSendChannelException) { + log.info { "send channel closed" } } - for (f in incoming) { - if (f is Frame.Binary) { - input.send(f.readBytes().toUByteArray().also { - println("incoming\n${it.toDump()}") - }) - } else { - log.warning { "ignoring unexpected frame of type ${f.frameType}" } - } + catch(t: Throwable) { + log.info { "unexpected exception in websock sender: ${t.stackTraceToString()}" } + closeHandle.completeExceptionally(t) } - } catch (_: CancellationException) { - } catch( _: ClosedReceiveChannelException) { - log.warning { "receive channel closed unexpectedly" } - } catch (t: Throwable) { - log.exception { "unexpected error" to t } + if (closeHandle.isActive) closeHandle.complete(false) + } + launch { + try { + for (f in incoming) { + if (f is Frame.Binary) { + input.send(f.readBytes().toUByteArray().also { + println("incoming\n${it.toDump()}") + }) + } else { + log.warning { "ignoring unexpected frame of type ${f.frameType}" } + } + } + if (closeHandle.isActive) closeHandle.complete(true) + } catch (_: CancellationException) { + if (closeHandle.isActive) closeHandle.complete(false) + } catch (_: ClosedReceiveChannelException) { + log.warning { "receive channel closed unexpectedly" } + if (closeHandle.isActive) closeHandle.complete(false) + } catch (t: Throwable) { + log.exception { "unexpected error" to t } + if (closeHandle.isActive) closeHandle.complete(false) + } + } + if(!closeHandle.await()) { + log.warning { "Client is closing with error" } + throw RemoteInterface.ClosedException() } - log.info { "closing connection" } } + log.info { "closing connection" } } val device = ProxyDevice(input, output) { input.close() diff --git a/src/commonTest/kotlin/TransportTest.kt b/src/commonTest/kotlin/TransportTest.kt index 254af21..1503597 100644 --- a/src/commonTest/kotlin/TransportTest.kt +++ b/src/commonTest/kotlin/TransportTest.kt @@ -193,7 +193,8 @@ class TransportTest { registerError { IllegalStateException() } registerError { IllegalArgumentException(it) } } - val kiloServerConnection = KiloServerConnection(serverInterface, d1, "server session", serverId.secretKey) + val kiloServerConnection = KiloServerConnection(serverInterface, d1, "server session", serverId.secretKey + ) launch { kiloServerConnection.run() } var cnt = 0