From 192f7e135ffda483efce6a8bf8dbd50cdde4a8b2 Mon Sep 17 00:00:00 2001 From: sergeych Date: Thu, 23 Nov 2023 10:58:14 +0300 Subject: [PATCH] WS layer with tests --- .../sergeych/kiloparsec/CommandDelegate.kt | 3 ++ .../net/sergeych/kiloparsec/KiloClient.kt | 2 + .../kiloparsec/KiloClientConnection.kt | 1 + .../net/sergeych/kiloparsec/Transport.kt | 12 +++++- .../kiloparsec/adapter/ProxyDevice.kt | 6 +++ .../kiloparsec/adapter/websocketClient.kt | 22 +++++++--- .../kiloparsec/adapter/WebsocketServer.kt | 11 ++++- .../net/sergeych/kiloparsec/ClientTest.kt | 41 ++++++++++++++++++- 8 files changed, 87 insertions(+), 11 deletions(-) diff --git a/src/commonMain/kotlin/net/sergeych/kiloparsec/CommandDelegate.kt b/src/commonMain/kotlin/net/sergeych/kiloparsec/CommandDelegate.kt index 33d68d6..061a6e3 100644 --- a/src/commonMain/kotlin/net/sergeych/kiloparsec/CommandDelegate.kt +++ b/src/commonMain/kotlin/net/sergeych/kiloparsec/CommandDelegate.kt @@ -7,6 +7,9 @@ import kotlin.reflect.KProperty /** * delegate returning function that creates a [Command] in the current context which by default has the name of * the property. + * + * The Default name is a property name except the "cmd" prefix if present, which will be + * removed automatically. */ inline fun command(overrideName: String? = null): CommandDelegate { return CommandDelegate( diff --git a/src/commonMain/kotlin/net/sergeych/kiloparsec/KiloClient.kt b/src/commonMain/kotlin/net/sergeych/kiloparsec/KiloClient.kt index 1a2b63f..46909e1 100644 --- a/src/commonMain/kotlin/net/sergeych/kiloparsec/KiloClient.kt +++ b/src/commonMain/kotlin/net/sergeych/kiloparsec/KiloClient.kt @@ -63,11 +63,13 @@ class KiloClient( _state.value = false if (deferredClient.isActive) deferredClient = CompletableDeferred() + delay(1000) } } fun close() { job.cancel() + debug { "client is closed" } } override suspend fun call(cmd: Command, args: A): R = deferredClient.await().call(cmd, args) diff --git a/src/commonMain/kotlin/net/sergeych/kiloparsec/KiloClientConnection.kt b/src/commonMain/kotlin/net/sergeych/kiloparsec/KiloClientConnection.kt index 4579e34..5e4a7f6 100644 --- a/src/commonMain/kotlin/net/sergeych/kiloparsec/KiloClientConnection.kt +++ b/src/commonMain/kotlin/net/sergeych/kiloparsec/KiloClientConnection.kt @@ -86,6 +86,7 @@ class KiloClientConnection( } catch (x: CancellationException) { info { "client is cancelled" } } catch (x: RemoteInterface.ClosedException) { + x.printStackTrace() info { "connection closed by remote" } } finally { onConnectedStateChanged?.invoke(false) diff --git a/src/commonMain/kotlin/net/sergeych/kiloparsec/Transport.kt b/src/commonMain/kotlin/net/sergeych/kiloparsec/Transport.kt index 59e125b..dd1c561 100644 --- a/src/commonMain/kotlin/net/sergeych/kiloparsec/Transport.kt +++ b/src/commonMain/kotlin/net/sergeych/kiloparsec/Transport.kt @@ -112,8 +112,16 @@ class Transport( } // now we have mutex freed so we can call: - val r = device.output.trySend(pack(b).also { debug { ">>>\n${it.toDump()}" } }) - if (!r.isSuccess) deferred.completeExceptionally(RemoteInterface.ClosedException()) + val r = runCatching { device.output.send(pack(b).also { debug { ">>>\n${it.toDump()}" } }) } + if (!r.isSuccess) { + r.exceptionOrNull()?.let { + exception { "failed to send output block" to it } + } ?: run { + error { "It should not happen: empty exception on block send failure" } + throw RuntimeException("unexpected failure in sending block") + } + deferred.completeExceptionally(RemoteInterface.ClosedException()) + } // it returns packed result or throws a proper error: return deferred.await() diff --git a/src/commonMain/kotlin/net/sergeych/kiloparsec/adapter/ProxyDevice.kt b/src/commonMain/kotlin/net/sergeych/kiloparsec/adapter/ProxyDevice.kt index 9c58692..5ffd8d3 100644 --- a/src/commonMain/kotlin/net/sergeych/kiloparsec/adapter/ProxyDevice.kt +++ b/src/commonMain/kotlin/net/sergeych/kiloparsec/adapter/ProxyDevice.kt @@ -4,7 +4,9 @@ import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.channels.ReceiveChannel import kotlinx.coroutines.channels.SendChannel import net.sergeych.kiloparsec.Transport +import net.sergeych.tools.AtomicCounter +private val counter = AtomicCounter() open class ProxyDevice( inputChannel: Channel, outputChannel: Channel, @@ -15,4 +17,8 @@ open class ProxyDevice( override suspend fun close() { onClose() } + + private val id = counter.incrementAndGet() + + override fun toString(): String = "PX$id" } \ No newline at end of file diff --git a/src/commonMain/kotlin/net/sergeych/kiloparsec/adapter/websocketClient.kt b/src/commonMain/kotlin/net/sergeych/kiloparsec/adapter/websocketClient.kt index 265363e..ffab2a5 100644 --- a/src/commonMain/kotlin/net/sergeych/kiloparsec/adapter/websocketClient.kt +++ b/src/commonMain/kotlin/net/sergeych/kiloparsec/adapter/websocketClient.kt @@ -26,7 +26,10 @@ fun websocketClient( clientInterface: KiloInterface = KiloInterface(), client: HttpClient = HttpClient { install(WebSockets) }, secretKey: SigningKey.Secret? = null, - sessionMaker: () -> S, + sessionMaker: () -> S = { + @Suppress("UNCHECKED_CAST") + Unit as S + }, ): KiloClient { var u = Url(path) if (u.encodedPath.length <= 1) @@ -37,16 +40,20 @@ fun websocketClient( return KiloClient(clientInterface, secretKey) { val input = Channel() val output = Channel() - globalLaunch { - val log = LogTag("KC:${counter.incrementAndGet()}:$u") + val job = globalLaunch { + val log = LogTag("KC:${counter.incrementAndGet()}") client.webSocket({ url.protocol = u.protocol url.host = u.host + url.port = u.port url.encodedPath = u.encodedPath url.parameters.appendAll(u.parameters) + log.info { "kiloparsec server URL: $url" } }) { try { - log.info { "connected to server" } + log.info { "connected to the server" } + println("SENDING!!!") + send("Helluva") launch { for (block in output) { send(block.toByteArray()) @@ -70,7 +77,12 @@ fun websocketClient( log.info { "closing connection" } } } - val device = ProxyDevice(input,output) { input.close() } + val device = ProxyDevice(input,output) { + input.close() + // we need to explicitly close the coroutine job or it can hang active + // forever and leak resources: + job.cancel() + } KiloConnectionData(device, sessionMaker()) } } diff --git a/src/jvmMain/kotlin/net/sergeych/kiloparsec/adapter/WebsocketServer.kt b/src/jvmMain/kotlin/net/sergeych/kiloparsec/adapter/WebsocketServer.kt index b73dc63..50b9be6 100644 --- a/src/jvmMain/kotlin/net/sergeych/kiloparsec/adapter/WebsocketServer.kt +++ b/src/jvmMain/kotlin/net/sergeych/kiloparsec/adapter/WebsocketServer.kt @@ -9,6 +9,7 @@ import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.isActive import kotlinx.coroutines.launch import net.sergeych.crypto2.SigningKey +import net.sergeych.crypto2.toDump import net.sergeych.kiloparsec.KiloInterface import net.sergeych.kiloparsec.KiloServerConnection import net.sergeych.mp_logger.LogTag @@ -34,14 +35,17 @@ fun Application.setupWebsocketServer( val counter = AtomicCounter() routing { webSocket(path) { + println("--------------------------------------------") val log = LogTag("KWS:${counter.incrementAndGet()}") log.debug { "opening the connection" } val input = Channel(256) val output = Channel(256) launch { + log.debug { "starting output pump" } while (isActive) { send(output.receive().toByteArray()) } + log.debug { "closing output pump" } } val server = KiloServerConnection( localInterface, @@ -50,12 +54,15 @@ fun Application.setupWebsocketServer( serverKey ) launch { server.run() } + log.debug { "KSC started, looking for incoming frames" } for( f in incoming) { + log.debug { "incoming frame: ${f.frameType}" } if (f is Frame.Binary) - input.send(f.readBytes().toUByteArray()) + input.send(f.readBytes().toUByteArray().also { + log.debug { "in frame\n${it.toDump()}" } + }) else log.warning { "unknown frame type ${f.frameType}, ignoring" } - } log.debug { "closing the server" } cancel() diff --git a/src/jvmTest/kotlin/net/sergeych/kiloparsec/ClientTest.kt b/src/jvmTest/kotlin/net/sergeych/kiloparsec/ClientTest.kt index 67fbbac..6c398f6 100644 --- a/src/jvmTest/kotlin/net/sergeych/kiloparsec/ClientTest.kt +++ b/src/jvmTest/kotlin/net/sergeych/kiloparsec/ClientTest.kt @@ -1,9 +1,13 @@ package net.sergeych.kiloparsec +import io.ktor.server.engine.* +import io.ktor.server.netty.* import kotlinx.coroutines.test.runTest import net.sergeych.crypto2.initCrypto import net.sergeych.kiloparsec.adapter.acceptTcpDevice import net.sergeych.kiloparsec.adapter.connectTcpDevice +import net.sergeych.kiloparsec.adapter.setupWebsocketServer +import net.sergeych.kiloparsec.adapter.websocketClient import net.sergeych.mp_logger.Log import java.net.InetAddress import kotlin.test.Test @@ -46,12 +50,45 @@ class ClientTest { client.call(cmdSave, "foobar") assertEquals("foobar", client.call(cmdLoad)) server.close() -// client.close() - // Todo } @Test fun webSocketTest() = runTest { + initCrypto() +// fun Application. + val cmdGetFoo by command() + val cmdSetFoo by command() + val cmdCheckConnected by command() + + Log.connectConsole(Log.Level.DEBUG) + + data class Session(var foo: String="not set") + val serverInterface = KiloInterface().apply { + var connectedCalled = false + onConnected { connectedCalled = true } + on(cmdGetFoo) { session.foo } + on(cmdSetFoo) { session.foo = it } + on(cmdCheckConnected) { connectedCalled } + } // val server = setupWebsoketServer() + val ns: NettyApplicationEngine = embeddedServer(Netty, port = 8080, host = "0.0.0.0", module = { + setupWebsocketServer(serverInterface) { Session() } + }).start(wait = false) + + val client = websocketClient("ws://localhost:8080/kp") + println(1) + assertEquals(true, client.call(cmdCheckConnected)) + println(2) + assertEquals("not set", client.call(cmdGetFoo)) + println(3) + client.call(cmdSetFoo, "foo") + println(4) + assertEquals("foo", client.call(cmdGetFoo)) + println(5) + + client.close() + ns.stop() + println("stopped server") + println("closed client") } } \ No newline at end of file