diff --git a/.idea/misc.xml b/.idea/misc.xml
index 45281f7..94a01e0 100644
--- a/.idea/misc.xml
+++ b/.idea/misc.xml
@@ -3,7 +3,7 @@
-
+
\ No newline at end of file
diff --git a/build.gradle.kts b/build.gradle.kts
index 35bba14..725d247 100644
--- a/build.gradle.kts
+++ b/build.gradle.kts
@@ -47,12 +47,12 @@ kotlin {
// else -> throw GradleException("Host OS is not supported in Kotlin/Native.")
// }
- macosArm64()
- iosX64()
- iosArm64()
- iosSimulatorArm64()
+// macosArm64()
+// iosX64()
+// iosArm64()
+// iosSimulatorArm64()
linuxX64()
- macosX64()
+// macosX64()
val ktor_version = "2.3.6"
diff --git a/src/commonMain/kotlin/net/sergeych/kiloparsec/Transport.kt b/src/commonMain/kotlin/net/sergeych/kiloparsec/Transport.kt
index ffbe974..e83fafd 100644
--- a/src/commonMain/kotlin/net/sergeych/kiloparsec/Transport.kt
+++ b/src/commonMain/kotlin/net/sergeych/kiloparsec/Transport.kt
@@ -147,14 +147,9 @@ class Transport(
suspend fun run() {
coroutineScope {
debug { "awaiting incoming blocks" }
+ // todo: rewrite it to close the job with no exceptions at all, always
while (isActive && !isClosed) {
try {
- debug { "input step starting closed=$isClosed active=$isActive"}
- if( isClosed ) {
- info { "breaking transort loop on closed"}
- break
- }
-
device.input.receive().let { packed ->
debug { "<<<\n${packed.toDump()}" }
val b = unpack(packed)
@@ -189,7 +184,6 @@ class Transport(
// handler forced close
warning { "handler requested closing of the connection"}
isClosed = true
- runCatching { device.close() }
throw x
} catch (x: RemoteInterface.RemoteException) {
send(Block.Error(b.id, x.code, x.text, x.extra))
@@ -210,10 +204,7 @@ class Transport(
isClosed = true
}
catch (_: CancellationException) {
- info { "loop is cancelled" }
- isClosed = true
- } catch( _: RemoteInterface.ClosedException) {
- debug { "git closed exception here, ignoring" }
+ info { "loop is cancelled with CancellationException" }
isClosed = true
} catch (t: Throwable) {
exception { "channel closed on error" to t }
@@ -225,10 +216,12 @@ class Transport(
access.withLock {
debug { "access lock obtained"}
isClosed = true
- debug { "closgin device $device" }
+ debug { "closing device $device, calls in queue ${calls.size}" }
runCatching { device.close() }
- for (c in calls.values) c.completeExceptionally(RemoteInterface.ClosedException())
+ for (c in calls.values)
+ c.completeExceptionally(RemoteInterface.ClosedException())
calls.clear()
+ debug { "calls clear has been called" }
}
debug { "no more active: $isActive / ${calls.size}" }
}
diff --git a/src/commonMain/kotlin/net/sergeych/kiloparsec/adapter/websocketClient.kt b/src/commonMain/kotlin/net/sergeych/kiloparsec/adapter/websocketClient.kt
index 898b2ed..f2d2250 100644
--- a/src/commonMain/kotlin/net/sergeych/kiloparsec/adapter/websocketClient.kt
+++ b/src/commonMain/kotlin/net/sergeych/kiloparsec/adapter/websocketClient.kt
@@ -5,6 +5,7 @@ import io.ktor.client.plugins.websocket.*
import io.ktor.http.*
import io.ktor.websocket.*
import kotlinx.coroutines.CancellationException
+import kotlinx.coroutines.CompletableDeferred
import kotlinx.coroutines.cancel
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.ClosedReceiveChannelException
@@ -16,6 +17,7 @@ import net.sergeych.crypto2.toDump
import net.sergeych.kiloparsec.KiloClient
import net.sergeych.kiloparsec.KiloConnectionData
import net.sergeych.kiloparsec.KiloInterface
+import net.sergeych.kiloparsec.RemoteInterface
import net.sergeych.mp_logger.LogTag
import net.sergeych.mp_logger.exception
import net.sergeych.mp_logger.info
@@ -54,38 +56,54 @@ fun websocketClient(
url.parameters.appendAll(u.parameters)
log.info { "kiloparsec server URL: $url" }
}) {
- try {
- log.info { "connected to the server" }
- println("SENDING!!!")
- send("Helluva")
- launch {
- try {
- for (block in output) {
- send(block.toByteArray())
- }
- log.info { "input is closed, closing the websocket" }
- } catch (_: ClosedSendChannelException) {
- log.info { "send channel closed" }
+ val closeHandle = CompletableDeferred()
+ log.info { "connected to the server" }
+// println("SENDING!!!")
+// send("Helluva")
+ launch {
+ try {
+ for (block in output) {
+ send(block.toByteArray())
}
- cancel()
+ log.info { "input is closed, closing the websocket" }
+ if (closeHandle.isActive) closeHandle.complete(true)
+ } catch (_: ClosedSendChannelException) {
+ log.info { "send channel closed" }
}
- for (f in incoming) {
- if (f is Frame.Binary) {
- input.send(f.readBytes().toUByteArray().also {
- println("incoming\n${it.toDump()}")
- })
- } else {
- log.warning { "ignoring unexpected frame of type ${f.frameType}" }
- }
+ catch(t: Throwable) {
+ log.info { "unexpected exception in websock sender: ${t.stackTraceToString()}" }
+ closeHandle.completeExceptionally(t)
}
- } catch (_: CancellationException) {
- } catch( _: ClosedReceiveChannelException) {
- log.warning { "receive channel closed unexpectedly" }
- } catch (t: Throwable) {
- log.exception { "unexpected error" to t }
+ if (closeHandle.isActive) closeHandle.complete(false)
+ }
+ launch {
+ try {
+ for (f in incoming) {
+ if (f is Frame.Binary) {
+ input.send(f.readBytes().toUByteArray().also {
+ println("incoming\n${it.toDump()}")
+ })
+ } else {
+ log.warning { "ignoring unexpected frame of type ${f.frameType}" }
+ }
+ }
+ if (closeHandle.isActive) closeHandle.complete(true)
+ } catch (_: CancellationException) {
+ if (closeHandle.isActive) closeHandle.complete(false)
+ } catch (_: ClosedReceiveChannelException) {
+ log.warning { "receive channel closed unexpectedly" }
+ if (closeHandle.isActive) closeHandle.complete(false)
+ } catch (t: Throwable) {
+ log.exception { "unexpected error" to t }
+ if (closeHandle.isActive) closeHandle.complete(false)
+ }
+ }
+ if(!closeHandle.await()) {
+ log.warning { "Client is closing with error" }
+ throw RemoteInterface.ClosedException()
}
- log.info { "closing connection" }
}
+ log.info { "closing connection" }
}
val device = ProxyDevice(input, output) {
input.close()
diff --git a/src/commonTest/kotlin/TransportTest.kt b/src/commonTest/kotlin/TransportTest.kt
index 254af21..1503597 100644
--- a/src/commonTest/kotlin/TransportTest.kt
+++ b/src/commonTest/kotlin/TransportTest.kt
@@ -193,7 +193,8 @@ class TransportTest {
registerError { IllegalStateException() }
registerError { IllegalArgumentException(it) }
}
- val kiloServerConnection = KiloServerConnection(serverInterface, d1, "server session", serverId.secretKey)
+ val kiloServerConnection = KiloServerConnection(serverInterface, d1, "server session", serverId.secretKey
+ )
launch { kiloServerConnection.run() }
var cnt = 0