From cb696f5c0fbb2b1dc34477d60f6bb1ea3a795237 Mon Sep 17 00:00:00 2001 From: sergeych Date: Wed, 30 Apr 2025 14:16:07 +0400 Subject: [PATCH] 0.6.9-snapshot: wbesock can use text frames too (good for xiaomi) --- build.gradle.kts | 6 +- .../kiloparsec/adapter/websocketClient.kt | 21 +++-- .../kiloparsec/adapter/WebsocketServer.kt | 58 +++++++++---- .../net/sergeych/kiloparsec/ClientTest.kt | 81 ++++++++++++++++++- 4 files changed, 137 insertions(+), 29 deletions(-) diff --git a/build.gradle.kts b/build.gradle.kts index 74de9c9..c4e004e 100644 --- a/build.gradle.kts +++ b/build.gradle.kts @@ -19,7 +19,7 @@ plugins { } group = "net.sergeych" -version = "0.6.8" +version = "0.6.9-SNAPSHOT" repositories { mavenCentral() @@ -64,7 +64,7 @@ kotlin { val commonMain by getting { dependencies { implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:1.10.1") - implementation("org.jetbrains.kotlinx:kotlinx-serialization-json:1.7.2") + implementation("org.jetbrains.kotlinx:kotlinx-serialization-json:1.8.1") api("io.ktor:ktor-client-core:$ktor_version") api("net.sergeych:crypto2:0.8.4") } @@ -84,7 +84,7 @@ kotlin { dependencies { implementation(kotlin("test")) implementation("org.slf4j:slf4j-simple:2.0.9") - implementation("org.jetbrains.kotlinx:kotlinx-coroutines-test:1.8.1") + implementation("org.jetbrains.kotlinx:kotlinx-coroutines-test:1.10.1") } } val ktorSocketTest by creating { diff --git a/src/commonMain/kotlin/net/sergeych/kiloparsec/adapter/websocketClient.kt b/src/commonMain/kotlin/net/sergeych/kiloparsec/adapter/websocketClient.kt index 17aba30..6838843 100644 --- a/src/commonMain/kotlin/net/sergeych/kiloparsec/adapter/websocketClient.kt +++ b/src/commonMain/kotlin/net/sergeych/kiloparsec/adapter/websocketClient.kt @@ -24,6 +24,8 @@ import kotlinx.io.IOException import net.sergeych.crypto2.SigningKey import net.sergeych.kiloparsec.* import net.sergeych.mp_logger.* +import net.sergeych.mp_tools.decodeBase64Compact +import net.sergeych.mp_tools.encodeToBase64Compact import net.sergeych.mp_tools.globalLaunch import net.sergeych.tools.AtomicCounter @@ -37,13 +39,14 @@ fun websocketClient( path: String, clientInterface: KiloInterface = KiloInterface(), secretKey: SigningKey? = null, + useTextFrames: Boolean = false, sessionMaker: () -> S = { @Suppress("UNCHECKED_CAST") Unit as S }, ): KiloClient { return KiloClient(clientInterface, secretKey) { - KiloConnectionData(websocketTransportDevice(path), sessionMaker()) + KiloConnectionData(websocketTransportDevice(path, useTextFrames), sessionMaker()) } } @@ -54,6 +57,7 @@ fun websocketClient( */ fun websocketTransportDevice( path: String, + useTextFrames: Boolean = false, client: HttpClient = HttpClient { install(WebSockets) }, @@ -87,7 +91,12 @@ fun websocketTransportDevice( launch { try { for (block in output) { - send(block.toByteArray()) + if (useTextFrames) + send( + Frame.Text(block.asByteArray().encodeToBase64Compact()) + ) + else + send(block.toByteArray()) } log.info { "input is closed, closing the websocket" } if (closeHandle.isActive) closeHandle.complete(true) @@ -103,10 +112,10 @@ fun websocketTransportDevice( launch { try { for (f in incoming) { - if (f is Frame.Binary) { - input.send(f.readBytes().toUByteArray()) - } else { - log.warning { "ignoring unexpected frame of type ${f.frameType}" } + when (f) { + is Frame.Binary -> input.send(f.readBytes().toUByteArray()) + is Frame.Text -> input.send(f.readText().decodeBase64Compact().toUByteArray()) + else -> log.warning { "ignoring unexpected frame of type ${f.frameType}" } } } if (closeHandle.isActive) closeHandle.complete(true) diff --git a/src/jvmMain/kotlin/net/sergeych/kiloparsec/adapter/WebsocketServer.kt b/src/jvmMain/kotlin/net/sergeych/kiloparsec/adapter/WebsocketServer.kt index 4da624a..5a6961f 100644 --- a/src/jvmMain/kotlin/net/sergeych/kiloparsec/adapter/WebsocketServer.kt +++ b/src/jvmMain/kotlin/net/sergeych/kiloparsec/adapter/WebsocketServer.kt @@ -24,6 +24,8 @@ import net.sergeych.kiloparsec.KiloInterface import net.sergeych.kiloparsec.KiloServerConnection import net.sergeych.kiloparsec.RemoteInterface import net.sergeych.mp_logger.* +import net.sergeych.mp_tools.decodeBase64Compact +import net.sergeych.mp_tools.encodeToBase64Compact import net.sergeych.tools.AtomicCounter import kotlin.time.Duration.Companion.seconds @@ -66,13 +68,18 @@ fun Application.setupWebsocketServer( log.debug { "opening the connection" } val input = Channel(256) val output = Channel(256) + var useBinary: Boolean? = null launch { log.debug { "starting output pump" } while (isActive) { try { - send(output.receive().toByteArray()) - } - catch(_: ClosedReceiveChannelException) { + val block = output.receive() + if (useBinary == false) + send(block.asByteArray().encodeToBase64Compact()) + else + send(block.toByteArray()) + + } catch (_: ClosedReceiveChannelException) { log.debug { "closing output pump as output channel is closed" } break } @@ -91,21 +98,38 @@ fun Application.setupWebsocketServer( } log.debug { "KSC started, looking for incoming frames" } for (f in incoming) { - if (f is Frame.Binary) - try { - input.send(f.readBytes().toUByteArray()) - } catch (_: RemoteInterface.ClosedException) { - log.warning { "caught local closed exception (strange!), closing" } - break - } catch (_: ClosedReceiveChannelException) { - log.info { "receive channel is closed, closing connection" } - break - } catch (t: Throwable) { - log.exception { "unexpected exception, server connection will close" to t } - break + try { + when (f) { + is Frame.Binary -> { + if (useBinary == null) { + log.debug { "Setting binary frame mode ------------------------------------" } + useBinary = true + } + input.send(f.readBytes().toUByteArray()) + } + + is Frame.Text -> { + if (useBinary == null) { + log.debug { "Setting text frame mode -----------------------------------" } + useBinary = false + } + input.send(f.readText().decodeBase64Compact().asUByteArray()) + } + + else -> { + log.warning { "unexpected frame type ${f.frameType}, ignoring" } + } } - else - log.warning { "unknown frame type ${f.frameType}, ignoring" } + } catch (_: RemoteInterface.ClosedException) { + log.warning { "caught local closed exception (strange!), closing" } + break + } catch (_: ClosedReceiveChannelException) { + log.info { "receive channel is closed, closing connection" } + break + } catch (t: Throwable) { + log.exception { "unexpected exception, server connection will close" to t } + break + } } log.debug { "closing the server" } close() diff --git a/src/jvmTest/kotlin/net/sergeych/kiloparsec/ClientTest.kt b/src/jvmTest/kotlin/net/sergeych/kiloparsec/ClientTest.kt index 238ff17..369dfa0 100644 --- a/src/jvmTest/kotlin/net/sergeych/kiloparsec/ClientTest.kt +++ b/src/jvmTest/kotlin/net/sergeych/kiloparsec/ClientTest.kt @@ -42,7 +42,7 @@ class ClientTest { @Test - fun webSocketTest() = runTest { + fun webSocketTest1() = runTest { initCrypto() // fun Application. val cmdClose by command() @@ -111,6 +111,60 @@ class ClientTest { // println("closed client") } + @Test + fun webSocketTest2() = runTest { + initCrypto() +// fun Application. + val cmdClose by command() + val cmdGetFoo by command() + val cmdSetFoo by command() + val cmdCheckConnected by command() + + Log.connectConsole(Log.Level.DEBUG) + + data class Session(var foo: String = "not set") + + var closeCounter = 0 + val serverInterface = KiloInterface().apply { + var connectedCalled = false + onConnected { connectedCalled = true } + on(cmdGetFoo) { session.foo } + on(cmdSetFoo) { session.foo = it } + on(cmdCheckConnected) { connectedCalled } + on(cmdClose) { + throw LocalInterface.BreakConnectionException() + } + } + + val port = Random.nextInt(8080, 9090) + val ns = embeddedServer(Netty, port = port, host = "0.0.0.0", module = { + setupWebsocketServer(serverInterface) { Session() } + }).start(wait = false) + + val client = websocketClient("ws://localhost:$port/kp", useTextFrames = true) + val states = mutableListOf() + val collector = launch { + client.connectedStateFlow.collect { + println("got: $closeCounter/$it") + states += it + if (!it) { + closeCounter++ + } + } + } + assertEquals(true, client.call(cmdCheckConnected)) + assertTrue { client.connectedStateFlow.value } + assertEquals("not set", client.call(cmdGetFoo)) + client.call(cmdSetFoo, "foo") + assertEquals("foo", client.call(cmdGetFoo)) + + client.close() + ns.stop() + collector.cancel() + + } + + @Test fun webSocketWaitForConnectTest() = runBlocking { initCrypto() @@ -124,7 +178,7 @@ class ClientTest { // It should repeatedly reconnect, and we will count: KiloClient(clientInterface, SigningSecretKey.new()) { clientConnectCalls++ - KiloConnectionData(websocketTransportDevice("ws://localhost:$port/kp"), Unit) + KiloConnectionData(websocketTransportDevice("ws://localhost:$port/kp", true), Unit) } delay(1200) // and check: @@ -132,4 +186,25 @@ class ClientTest { assertTrue { clientConnectCalls > 1 } } -} \ No newline at end of file + @Test + fun webSocketWaitForConnectTest2() = runBlocking { + initCrypto() +// fun Application. + Log.connectConsole(Log.Level.DEBUG) + + val clientInterface = KiloInterface().apply {} + + val port = Random.nextInt(8080, 9090) + var clientConnectCalls = 0 + // It should repeatedly reconnect, and we will count: + KiloClient(clientInterface, SigningSecretKey.new()) { + clientConnectCalls++ + KiloConnectionData(websocketTransportDevice("ws://localhost:$port/kp", false), Unit) + } + delay(1200) + // and check: +// println("connection attemtps: $clientConnectCalls") + assertTrue { clientConnectCalls > 1 } + } + +}