diff --git a/src/commonMain/kotlin/net/sergeych/kiloparsec/Command.kt b/src/commonMain/kotlin/net/sergeych/kiloparsec/Command.kt index e2a8f08..a6faee0 100644 --- a/src/commonMain/kotlin/net/sergeych/kiloparsec/Command.kt +++ b/src/commonMain/kotlin/net/sergeych/kiloparsec/Command.kt @@ -31,7 +31,8 @@ class Command( suspend fun exec(packedArgs: UByteArray, handler: suspend (A) -> R): UByteArray = BipackEncoder.encode( resultSerializer, - handler(BipackDecoder.decode(packedArgs.toDataSource(), argsSerializer)) + handler( + BipackDecoder.decode(packedArgs.toDataSource(), argsSerializer)) ).toUByteArray() companion object { diff --git a/src/commonMain/kotlin/net/sergeych/kiloparsec/KiloClient.kt b/src/commonMain/kotlin/net/sergeych/kiloparsec/KiloClient.kt index 362d174..680734f 100644 --- a/src/commonMain/kotlin/net/sergeych/kiloparsec/KiloClient.kt +++ b/src/commonMain/kotlin/net/sergeych/kiloparsec/KiloClient.kt @@ -56,6 +56,7 @@ class KiloClient( delay(1000) } catch (_: CancellationException) { debug { "cancelled" } + break } catch (t: Throwable) { exception { "unexpected exception" to t } delay(1000) diff --git a/src/commonMain/kotlin/net/sergeych/kiloparsec/KiloL0Interface.kt b/src/commonMain/kotlin/net/sergeych/kiloparsec/KiloL0Interface.kt index 6eaa7ad..a787a17 100644 --- a/src/commonMain/kotlin/net/sergeych/kiloparsec/KiloL0Interface.kt +++ b/src/commonMain/kotlin/net/sergeych/kiloparsec/KiloL0Interface.kt @@ -1,8 +1,15 @@ package net.sergeych.kiloparsec import kotlinx.coroutines.CompletableDeferred +import net.sergeych.mp_logger.LogTag +import net.sergeych.mp_logger.Loggable +import net.sergeych.mp_logger.debug +import net.sergeych.mp_logger.info +import net.sergeych.tools.AtomicCounter import net.sergeych.utools.pack +private val idCounter = AtomicCounter(0) + /** * This class is not normally used directly. This is a local interface that supports * secure transport command layer (encrypted calls/results) to work with [KiloRemoteInterface]. @@ -12,7 +19,7 @@ import net.sergeych.utools.pack internal class KiloL0Interface( private val clientInterface: LocalInterface>, private val deferredParams: CompletableDeferred>, -): LocalInterface() { +) : LocalInterface(), Loggable by LogTag("KL0:${idCounter.incrementAndGet()}") { init { // local interface uses the same session as a client: addErrorProvider(clientInterface) @@ -27,7 +34,10 @@ internal class KiloL0Interface( 0u, clientInterface.execute(params.scope, call.name, call.serializedArgs) ) - } catch (t: Throwable) { + } catch(t: RemoteInterface.ClosedException) { + throw t + } + catch (t: Throwable) { clientInterface.encodeError(0u, t) } params.encrypt(pack(result)) diff --git a/src/commonMain/kotlin/net/sergeych/kiloparsec/KiloParams.kt b/src/commonMain/kotlin/net/sergeych/kiloparsec/KiloParams.kt index 9c83885..f3650a1 100644 --- a/src/commonMain/kotlin/net/sergeych/kiloparsec/KiloParams.kt +++ b/src/commonMain/kotlin/net/sergeych/kiloparsec/KiloParams.kt @@ -36,7 +36,7 @@ data class KiloParams( val sessionKeyPair: KeyExchangeSessionKeyPair, val scopeSession: S, val remoteIdentity: SigningKey.Public?, - val remoteTransport: RemoteInterface + val remoteTransport: RemoteInterface, ) { @Serializable data class Package( diff --git a/src/commonMain/kotlin/net/sergeych/kiloparsec/KiloServer.kt b/src/commonMain/kotlin/net/sergeych/kiloparsec/KiloServer.kt index 51f76e3..c9e10ee 100644 --- a/src/commonMain/kotlin/net/sergeych/kiloparsec/KiloServer.kt +++ b/src/commonMain/kotlin/net/sergeych/kiloparsec/KiloServer.kt @@ -32,6 +32,9 @@ class KiloServer( } catch(_: CancellationException) { } + catch(_: RemoteInterface.ClosedException) { + info { "Closed exception caught, closing" } + } catch (t: Throwable) { exception { "unexpected while creating kiloclient" to t } } diff --git a/src/commonMain/kotlin/net/sergeych/kiloparsec/KiloServerConnection.kt b/src/commonMain/kotlin/net/sergeych/kiloparsec/KiloServerConnection.kt index 6a664d9..43ff703 100644 --- a/src/commonMain/kotlin/net/sergeych/kiloparsec/KiloServerConnection.kt +++ b/src/commonMain/kotlin/net/sergeych/kiloparsec/KiloServerConnection.kt @@ -59,7 +59,6 @@ class KiloServerConnection( null, this@KiloServerConnection ) - Handshake(1u, pair.publicKey, serverSigningKey?.seal(params!!.token)) } diff --git a/src/commonMain/kotlin/net/sergeych/kiloparsec/LocalInterface.kt b/src/commonMain/kotlin/net/sergeych/kiloparsec/LocalInterface.kt index f126c49..cc15c8e 100644 --- a/src/commonMain/kotlin/net/sergeych/kiloparsec/LocalInterface.kt +++ b/src/commonMain/kotlin/net/sergeych/kiloparsec/LocalInterface.kt @@ -1,11 +1,17 @@ package net.sergeych.kiloparsec +import net.sergeych.mp_logger.LogTag +import net.sergeych.mp_logger.Loggable +import net.sergeych.mp_logger.info +import net.sergeych.tools.AtomicCounter import net.sergeych.utools.firstNonNull import kotlin.reflect.KClass private typealias RawCommandHandler = suspend (C, UByteArray) -> UByteArray -open class LocalInterface { +private val idCounter = AtomicCounter() + +open class LocalInterface: Loggable by LogTag("LocalInterface${idCounter.incrementAndGet()}") { private val commands = mutableMapOf>() @@ -72,14 +78,16 @@ open class LocalInterface { fun encodeError(forId: UInt, t: Throwable): Transport.Block.Error = getErrorCode(t)?.let { Transport.Block.Error(forId, it, t.message) } - ?: Transport.Block.Error(forId, "UnknownError", t.message) + ?: Transport.Block.Error(forId, "UnknownError", "${t::class.simpleName}: ${t.message}") open fun getErrorBuilder(code: String): ((String, UByteArray?) -> Throwable)? = errorBuilder[code] ?: errorProviders.firstNonNull { it.getErrorBuilder(code) } fun decodeError(tbe: Transport.Block.Error): Throwable = getErrorBuilder(tbe.code)?.invoke(tbe.message, tbe.extra) - ?: RemoteInterface.RemoteException(tbe) + ?: RemoteInterface.RemoteException(tbe).also { + info { "can't decode error ${tbe.code}: ${tbe.message}" } + } fun decodeAndThrow(tbe: Transport.Block.Error): Nothing { throw decodeError(tbe) diff --git a/src/commonMain/kotlin/net/sergeych/kiloparsec/Transport.kt b/src/commonMain/kotlin/net/sergeych/kiloparsec/Transport.kt index dd1c561..ffbe974 100644 --- a/src/commonMain/kotlin/net/sergeych/kiloparsec/Transport.kt +++ b/src/commonMain/kotlin/net/sergeych/kiloparsec/Transport.kt @@ -1,6 +1,8 @@ package net.sergeych.kiloparsec import kotlinx.coroutines.* +import kotlinx.coroutines.channels.ClosedReceiveChannelException +import kotlinx.coroutines.channels.ClosedSendChannelException import kotlinx.coroutines.channels.ReceiveChannel import kotlinx.coroutines.channels.SendChannel import kotlinx.coroutines.sync.Mutex @@ -147,6 +149,12 @@ class Transport( debug { "awaiting incoming blocks" } while (isActive && !isClosed) { try { + debug { "input step starting closed=$isClosed active=$isActive"} + if( isClosed ) { + info { "breaking transort loop on closed"} + break + } + device.input.receive().let { packed -> debug { "<<<\n${packed.toDump()}" } val b = unpack(packed) @@ -178,9 +186,11 @@ class Transport( ) ) } catch (x: RemoteInterface.ClosedException) { - // strange case: handler throws closed? - error { "not supported: command handler for $b has thrown ClosedException" } - send(Block.Error(b.id, "UnexpectedException", x.message)) + // handler forced close + warning { "handler requested closing of the connection"} + isClosed = true + runCatching { device.close() } + throw x } catch (x: RemoteInterface.RemoteException) { send(Block.Error(b.id, x.code, x.text, x.extra)) } catch (t: Throwable) { @@ -189,19 +199,34 @@ class Transport( .also { debug { "command executed: ${b.name}" } } } } + debug { "=---------------------------------------------"} } - } catch (_: CancellationException) { + debug { "input step performed closed=$isClosed active=$isActive"} + } catch (_: ClosedSendChannelException) { + info { "closed send channel" } + isClosed = true + } catch (_: ClosedReceiveChannelException) { + info { "closed receive channel"} + isClosed = true + } + catch (_: CancellationException) { info { "loop is cancelled" } isClosed = true + } catch( _: RemoteInterface.ClosedException) { + debug { "git closed exception here, ignoring" } + isClosed = true } catch (t: Throwable) { exception { "channel closed on error" to t } info { "isa? $isActive / $isClosed" } - runCatching { device.close() } isClosed = true } } + debug { "leaving transport loop" } access.withLock { + debug { "access lock obtained"} isClosed = true + debug { "closgin device $device" } + runCatching { device.close() } for (c in calls.values) c.completeExceptionally(RemoteInterface.ClosedException()) calls.clear() } @@ -211,7 +236,12 @@ class Transport( } private suspend fun send(block: Block) { - device.output.send(pack(block)) + try { + device.output.send(pack(block)) + } + catch(_: ClosedSendChannelException) { + throw RemoteInterface.ClosedException() + } } } diff --git a/src/commonMain/kotlin/net/sergeych/kiloparsec/adapter/ProxyDevice.kt b/src/commonMain/kotlin/net/sergeych/kiloparsec/adapter/ProxyDevice.kt index 5ffd8d3..00264c7 100644 --- a/src/commonMain/kotlin/net/sergeych/kiloparsec/adapter/ProxyDevice.kt +++ b/src/commonMain/kotlin/net/sergeych/kiloparsec/adapter/ProxyDevice.kt @@ -3,6 +3,7 @@ package net.sergeych.kiloparsec.adapter import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.channels.ReceiveChannel import kotlinx.coroutines.channels.SendChannel +import net.sergeych.kiloparsec.RemoteInterface import net.sergeych.kiloparsec.Transport import net.sergeych.tools.AtomicCounter @@ -10,7 +11,7 @@ private val counter = AtomicCounter() open class ProxyDevice( inputChannel: Channel, outputChannel: Channel, - private val onClose: suspend ()->Unit = {}): Transport.Device { + private val onClose: suspend ()->Unit = { throw RemoteInterface.ClosedException() }): Transport.Device { override val input: ReceiveChannel = inputChannel override val output: SendChannel = outputChannel diff --git a/src/commonMain/kotlin/net/sergeych/kiloparsec/adapter/websocketClient.kt b/src/commonMain/kotlin/net/sergeych/kiloparsec/adapter/websocketClient.kt index 83f3083..898b2ed 100644 --- a/src/commonMain/kotlin/net/sergeych/kiloparsec/adapter/websocketClient.kt +++ b/src/commonMain/kotlin/net/sergeych/kiloparsec/adapter/websocketClient.kt @@ -7,8 +7,12 @@ import io.ktor.websocket.* import kotlinx.coroutines.CancellationException import kotlinx.coroutines.cancel import kotlinx.coroutines.channels.Channel +import kotlinx.coroutines.channels.ClosedReceiveChannelException +import kotlinx.coroutines.channels.ClosedSendChannelException +import kotlinx.coroutines.channels.ReceiveChannel import kotlinx.coroutines.launch import net.sergeych.crypto2.SigningKey +import net.sergeych.crypto2.toDump import net.sergeych.kiloparsec.KiloClient import net.sergeych.kiloparsec.KiloConnectionData import net.sergeych.kiloparsec.KiloInterface @@ -21,7 +25,7 @@ import net.sergeych.tools.AtomicCounter private val counter = AtomicCounter() -fun websocketClient( +fun websocketClient( path: String, clientInterface: KiloInterface = KiloInterface(), client: HttpClient = HttpClient { install(WebSockets) }, @@ -48,36 +52,42 @@ fun websocketClient( url.port = u.port url.encodedPath = u.encodedPath url.parameters.appendAll(u.parameters) - log.info { "kiloparsec server URL: $url" } + log.info { "kiloparsec server URL: $url" } }) { try { log.info { "connected to the server" } println("SENDING!!!") send("Helluva") launch { - for (block in output) { - send(block.toByteArray()) + try { + for (block in output) { + send(block.toByteArray()) + } + log.info { "input is closed, closing the websocket" } + } catch (_: ClosedSendChannelException) { + log.info { "send channel closed" } } - log.info { "input is closed, closing the websocket" } cancel() } for (f in incoming) { if (f is Frame.Binary) { - input.send(f.readBytes().toUByteArray()) + input.send(f.readBytes().toUByteArray().also { + println("incoming\n${it.toDump()}") + }) } else { log.warning { "ignoring unexpected frame of type ${f.frameType}" } } } - } - catch(_:CancellationException) { - } - catch(t: Throwable) { + } catch (_: CancellationException) { + } catch( _: ClosedReceiveChannelException) { + log.warning { "receive channel closed unexpectedly" } + } catch (t: Throwable) { log.exception { "unexpected error" to t } } log.info { "closing connection" } } } - val device = ProxyDevice(input,output) { + val device = ProxyDevice(input, output) { input.close() // we need to explicitly close the coroutine job, or it can hang for a long time // leaking resources. diff --git a/src/commonTest/kotlin/assertThrows.kt b/src/commonTest/kotlin/assertThrows.kt index dc63eeb..3e2d4f1 100644 --- a/src/commonTest/kotlin/assertThrows.kt +++ b/src/commonTest/kotlin/assertThrows.kt @@ -8,6 +8,7 @@ inline fun assertThrows(f: ()->Unit): T { } catch(x: Throwable) { if( x is T ) return x - fail("expected to throw $name but instead threw ${x::class.simpleName}: $x") + println("expected to throw $name but instead threw ${x::class.simpleName}: $x\b\n${x.stackTraceToString()}") + fail("expected to throw $name but instead threw ${x::class.simpleName}: $x\b\n${x.stackTraceToString()}") } } \ No newline at end of file diff --git a/src/jvmMain/kotlin/net/sergeych/kiloparsec/adapter/WebsocketServer.kt b/src/jvmMain/kotlin/net/sergeych/kiloparsec/adapter/WebsocketServer.kt index 50b9be6..fb21c66 100644 --- a/src/jvmMain/kotlin/net/sergeych/kiloparsec/adapter/WebsocketServer.kt +++ b/src/jvmMain/kotlin/net/sergeych/kiloparsec/adapter/WebsocketServer.kt @@ -6,15 +6,16 @@ import io.ktor.server.websocket.* import io.ktor.websocket.* import kotlinx.coroutines.cancel import kotlinx.coroutines.channels.Channel +import kotlinx.coroutines.channels.ClosedReceiveChannelException +import kotlinx.coroutines.channels.ClosedSendChannelException import kotlinx.coroutines.isActive import kotlinx.coroutines.launch import net.sergeych.crypto2.SigningKey import net.sergeych.crypto2.toDump import net.sergeych.kiloparsec.KiloInterface import net.sergeych.kiloparsec.KiloServerConnection -import net.sergeych.mp_logger.LogTag -import net.sergeych.mp_logger.debug -import net.sergeych.mp_logger.warning +import net.sergeych.kiloparsec.RemoteInterface +import net.sergeych.mp_logger.* import net.sergeych.tools.AtomicCounter import java.time.Duration @@ -49,23 +50,36 @@ fun Application.setupWebsocketServer( } val server = KiloServerConnection( localInterface, - ProxyDevice(input, output) { input.close() }, + ProxyDevice(input, output), createSession(), serverKey ) launch { server.run() } log.debug { "KSC started, looking for incoming frames" } - for( f in incoming) { + for (f in incoming) { log.debug { "incoming frame: ${f.frameType}" } if (f is Frame.Binary) - input.send(f.readBytes().toUByteArray().also { - log.debug { "in frame\n${it.toDump()}" } - }) + try { + input.send(f.readBytes().toUByteArray().also { + log.debug { "in frame\n${it.toDump()}" } + }) + } catch (_: RemoteInterface.ClosedException) { + log.info { "caught local closed exception, closing" } + break + } catch (_: ClosedReceiveChannelException) { + log.info { "receive channel is closed, closing connection" } + break + } catch (t: Throwable) { + log.exception { "unexpected exception, server connection will close" to t } + break + } else log.warning { "unknown frame type ${f.frameType}, ignoring" } } log.debug { "closing the server" } + println("****************prec") cancel() + println("****************postc") } } } diff --git a/src/jvmTest/kotlin/net/sergeych/kiloparsec/ClientTest.kt b/src/jvmTest/kotlin/net/sergeych/kiloparsec/ClientTest.kt index 3684c2d..d7e3426 100644 --- a/src/jvmTest/kotlin/net/sergeych/kiloparsec/ClientTest.kt +++ b/src/jvmTest/kotlin/net/sergeych/kiloparsec/ClientTest.kt @@ -1,7 +1,11 @@ package net.sergeych.kiloparsec +import assertThrows import io.ktor.server.engine.* import io.ktor.server.netty.* +import kotlinx.coroutines.delay +import kotlinx.coroutines.flow.take +import kotlinx.coroutines.launch import kotlinx.coroutines.test.runTest import net.sergeych.crypto2.initCrypto import net.sergeych.kiloparsec.adapter.acceptTcpDevice @@ -10,9 +14,7 @@ import net.sergeych.kiloparsec.adapter.setupWebsocketServer import net.sergeych.kiloparsec.adapter.websocketClient import net.sergeych.mp_logger.Log import java.net.InetAddress -import kotlin.test.Test -import kotlin.test.assertEquals -import kotlin.test.assertTrue +import kotlin.test.* class ClientTest { @@ -57,6 +59,7 @@ class ClientTest { fun webSocketTest() = runTest { initCrypto() // fun Application. + val cmdClose by command() val cmdGetFoo by command() val cmdSetFoo by command() val cmdCheckConnected by command() @@ -64,12 +67,23 @@ class ClientTest { Log.connectConsole(Log.Level.DEBUG) data class Session(var foo: String="not set") + var closeCounter = 0 val serverInterface = KiloInterface().apply { var connectedCalled = false onConnected { connectedCalled = true } on(cmdGetFoo) { session.foo } on(cmdSetFoo) { session.foo = it } on(cmdCheckConnected) { connectedCalled } + on(cmdClose) { + throw RemoteInterface.ClosedException() +// if( closeCounter < 2 ) { +// println("-------------------------- call close!") +// throw RemoteInterface.ClosedException() +// } +// else { +// println("close counter $closeCounter, ignoring") +// } + } } // val server = setupWebsoketServer() val ns: NettyApplicationEngine = embeddedServer(Netty, port = 8080, host = "0.0.0.0", module = { @@ -77,6 +91,14 @@ class ClientTest { }).start(wait = false) val client = websocketClient("ws://localhost:8080/kp") + val states = mutableListOf() + val collector = launch { + client.state.collect { + println("got: $closeCounter/$it") + states += it + if( !it) { closeCounter++ } + } + } println(1) assertEquals(true, client.call(cmdCheckConnected)) assertTrue { client.state.value } @@ -87,9 +109,16 @@ class ClientTest { println(4) assertEquals("foo", client.call(cmdGetFoo)) println(5) - + assertThrows { + client.call(cmdClose) + } + println("0------------------------------------------------------------------------------connection should be closed") +// assertFalse { client.state.value } +// assertEquals("foo", client.call(cmdGetFoo)) client.close() ns.stop() + collector.cancel() + println("----= states: $states") println("stopped server") println("closed client") }