0.6.9-snapshot: wbesock can use text frames too (good for xiaomi)

This commit is contained in:
Sergey Chernov 2025-04-30 14:16:07 +04:00
parent fe96ac69d7
commit cb696f5c0f
4 changed files with 137 additions and 29 deletions

View File

@ -19,7 +19,7 @@ plugins {
} }
group = "net.sergeych" group = "net.sergeych"
version = "0.6.8" version = "0.6.9-SNAPSHOT"
repositories { repositories {
mavenCentral() mavenCentral()
@ -64,7 +64,7 @@ kotlin {
val commonMain by getting { val commonMain by getting {
dependencies { dependencies {
implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:1.10.1") implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:1.10.1")
implementation("org.jetbrains.kotlinx:kotlinx-serialization-json:1.7.2") implementation("org.jetbrains.kotlinx:kotlinx-serialization-json:1.8.1")
api("io.ktor:ktor-client-core:$ktor_version") api("io.ktor:ktor-client-core:$ktor_version")
api("net.sergeych:crypto2:0.8.4") api("net.sergeych:crypto2:0.8.4")
} }
@ -84,7 +84,7 @@ kotlin {
dependencies { dependencies {
implementation(kotlin("test")) implementation(kotlin("test"))
implementation("org.slf4j:slf4j-simple:2.0.9") implementation("org.slf4j:slf4j-simple:2.0.9")
implementation("org.jetbrains.kotlinx:kotlinx-coroutines-test:1.8.1") implementation("org.jetbrains.kotlinx:kotlinx-coroutines-test:1.10.1")
} }
} }
val ktorSocketTest by creating { val ktorSocketTest by creating {

View File

@ -24,6 +24,8 @@ import kotlinx.io.IOException
import net.sergeych.crypto2.SigningKey import net.sergeych.crypto2.SigningKey
import net.sergeych.kiloparsec.* import net.sergeych.kiloparsec.*
import net.sergeych.mp_logger.* import net.sergeych.mp_logger.*
import net.sergeych.mp_tools.decodeBase64Compact
import net.sergeych.mp_tools.encodeToBase64Compact
import net.sergeych.mp_tools.globalLaunch import net.sergeych.mp_tools.globalLaunch
import net.sergeych.tools.AtomicCounter import net.sergeych.tools.AtomicCounter
@ -37,13 +39,14 @@ fun <S> websocketClient(
path: String, path: String,
clientInterface: KiloInterface<S> = KiloInterface(), clientInterface: KiloInterface<S> = KiloInterface(),
secretKey: SigningKey? = null, secretKey: SigningKey? = null,
useTextFrames: Boolean = false,
sessionMaker: () -> S = { sessionMaker: () -> S = {
@Suppress("UNCHECKED_CAST") @Suppress("UNCHECKED_CAST")
Unit as S Unit as S
}, },
): KiloClient<S> { ): KiloClient<S> {
return KiloClient(clientInterface, secretKey) { return KiloClient(clientInterface, secretKey) {
KiloConnectionData(websocketTransportDevice(path), sessionMaker()) KiloConnectionData(websocketTransportDevice(path, useTextFrames), sessionMaker())
} }
} }
@ -54,6 +57,7 @@ fun <S> websocketClient(
*/ */
fun websocketTransportDevice( fun websocketTransportDevice(
path: String, path: String,
useTextFrames: Boolean = false,
client: HttpClient = HttpClient { client: HttpClient = HttpClient {
install(WebSockets) install(WebSockets)
}, },
@ -87,7 +91,12 @@ fun websocketTransportDevice(
launch { launch {
try { try {
for (block in output) { for (block in output) {
send(block.toByteArray()) if (useTextFrames)
send(
Frame.Text(block.asByteArray().encodeToBase64Compact())
)
else
send(block.toByteArray())
} }
log.info { "input is closed, closing the websocket" } log.info { "input is closed, closing the websocket" }
if (closeHandle.isActive) closeHandle.complete(true) if (closeHandle.isActive) closeHandle.complete(true)
@ -103,10 +112,10 @@ fun websocketTransportDevice(
launch { launch {
try { try {
for (f in incoming) { for (f in incoming) {
if (f is Frame.Binary) { when (f) {
input.send(f.readBytes().toUByteArray()) is Frame.Binary -> input.send(f.readBytes().toUByteArray())
} else { is Frame.Text -> input.send(f.readText().decodeBase64Compact().toUByteArray())
log.warning { "ignoring unexpected frame of type ${f.frameType}" } else -> log.warning { "ignoring unexpected frame of type ${f.frameType}" }
} }
} }
if (closeHandle.isActive) closeHandle.complete(true) if (closeHandle.isActive) closeHandle.complete(true)

View File

@ -24,6 +24,8 @@ import net.sergeych.kiloparsec.KiloInterface
import net.sergeych.kiloparsec.KiloServerConnection import net.sergeych.kiloparsec.KiloServerConnection
import net.sergeych.kiloparsec.RemoteInterface import net.sergeych.kiloparsec.RemoteInterface
import net.sergeych.mp_logger.* import net.sergeych.mp_logger.*
import net.sergeych.mp_tools.decodeBase64Compact
import net.sergeych.mp_tools.encodeToBase64Compact
import net.sergeych.tools.AtomicCounter import net.sergeych.tools.AtomicCounter
import kotlin.time.Duration.Companion.seconds import kotlin.time.Duration.Companion.seconds
@ -66,13 +68,18 @@ fun <S> Application.setupWebsocketServer(
log.debug { "opening the connection" } log.debug { "opening the connection" }
val input = Channel<UByteArray>(256) val input = Channel<UByteArray>(256)
val output = Channel<UByteArray>(256) val output = Channel<UByteArray>(256)
var useBinary: Boolean? = null
launch { launch {
log.debug { "starting output pump" } log.debug { "starting output pump" }
while (isActive) { while (isActive) {
try { try {
send(output.receive().toByteArray()) val block = output.receive()
} if (useBinary == false)
catch(_: ClosedReceiveChannelException) { send(block.asByteArray().encodeToBase64Compact())
else
send(block.toByteArray())
} catch (_: ClosedReceiveChannelException) {
log.debug { "closing output pump as output channel is closed" } log.debug { "closing output pump as output channel is closed" }
break break
} }
@ -91,21 +98,38 @@ fun <S> Application.setupWebsocketServer(
} }
log.debug { "KSC started, looking for incoming frames" } log.debug { "KSC started, looking for incoming frames" }
for (f in incoming) { for (f in incoming) {
if (f is Frame.Binary) try {
try { when (f) {
input.send(f.readBytes().toUByteArray()) is Frame.Binary -> {
} catch (_: RemoteInterface.ClosedException) { if (useBinary == null) {
log.warning { "caught local closed exception (strange!), closing" } log.debug { "Setting binary frame mode ------------------------------------" }
break useBinary = true
} catch (_: ClosedReceiveChannelException) { }
log.info { "receive channel is closed, closing connection" } input.send(f.readBytes().toUByteArray())
break }
} catch (t: Throwable) {
log.exception { "unexpected exception, server connection will close" to t } is Frame.Text -> {
break if (useBinary == null) {
log.debug { "Setting text frame mode -----------------------------------" }
useBinary = false
}
input.send(f.readText().decodeBase64Compact().asUByteArray())
}
else -> {
log.warning { "unexpected frame type ${f.frameType}, ignoring" }
}
} }
else } catch (_: RemoteInterface.ClosedException) {
log.warning { "unknown frame type ${f.frameType}, ignoring" } log.warning { "caught local closed exception (strange!), closing" }
break
} catch (_: ClosedReceiveChannelException) {
log.info { "receive channel is closed, closing connection" }
break
} catch (t: Throwable) {
log.exception { "unexpected exception, server connection will close" to t }
break
}
} }
log.debug { "closing the server" } log.debug { "closing the server" }
close() close()

View File

@ -42,7 +42,7 @@ class ClientTest {
@Test @Test
fun webSocketTest() = runTest { fun webSocketTest1() = runTest {
initCrypto() initCrypto()
// fun Application. // fun Application.
val cmdClose by command<Unit, Unit>() val cmdClose by command<Unit, Unit>()
@ -111,6 +111,60 @@ class ClientTest {
// println("closed client") // println("closed client")
} }
@Test
fun webSocketTest2() = runTest {
initCrypto()
// fun Application.
val cmdClose by command<Unit, Unit>()
val cmdGetFoo by command<Unit, String>()
val cmdSetFoo by command<String, Unit>()
val cmdCheckConnected by command<Unit, Boolean>()
Log.connectConsole(Log.Level.DEBUG)
data class Session(var foo: String = "not set")
var closeCounter = 0
val serverInterface = KiloInterface<Session>().apply {
var connectedCalled = false
onConnected { connectedCalled = true }
on(cmdGetFoo) { session.foo }
on(cmdSetFoo) { session.foo = it }
on(cmdCheckConnected) { connectedCalled }
on(cmdClose) {
throw LocalInterface.BreakConnectionException()
}
}
val port = Random.nextInt(8080, 9090)
val ns = embeddedServer(Netty, port = port, host = "0.0.0.0", module = {
setupWebsocketServer(serverInterface) { Session() }
}).start(wait = false)
val client = websocketClient<Unit>("ws://localhost:$port/kp", useTextFrames = true)
val states = mutableListOf<Boolean>()
val collector = launch {
client.connectedStateFlow.collect {
println("got: $closeCounter/$it")
states += it
if (!it) {
closeCounter++
}
}
}
assertEquals(true, client.call(cmdCheckConnected))
assertTrue { client.connectedStateFlow.value }
assertEquals("not set", client.call(cmdGetFoo))
client.call(cmdSetFoo, "foo")
assertEquals("foo", client.call(cmdGetFoo))
client.close()
ns.stop()
collector.cancel()
}
@Test @Test
fun webSocketWaitForConnectTest() = runBlocking { fun webSocketWaitForConnectTest() = runBlocking {
initCrypto() initCrypto()
@ -124,7 +178,7 @@ class ClientTest {
// It should repeatedly reconnect, and we will count: // It should repeatedly reconnect, and we will count:
KiloClient(clientInterface, SigningSecretKey.new()) { KiloClient(clientInterface, SigningSecretKey.new()) {
clientConnectCalls++ clientConnectCalls++
KiloConnectionData(websocketTransportDevice("ws://localhost:$port/kp"), Unit) KiloConnectionData(websocketTransportDevice("ws://localhost:$port/kp", true), Unit)
} }
delay(1200) delay(1200)
// and check: // and check:
@ -132,4 +186,25 @@ class ClientTest {
assertTrue { clientConnectCalls > 1 } assertTrue { clientConnectCalls > 1 }
} }
} @Test
fun webSocketWaitForConnectTest2() = runBlocking {
initCrypto()
// fun Application.
Log.connectConsole(Log.Level.DEBUG)
val clientInterface = KiloInterface<Unit>().apply {}
val port = Random.nextInt(8080, 9090)
var clientConnectCalls = 0
// It should repeatedly reconnect, and we will count:
KiloClient(clientInterface, SigningSecretKey.new()) {
clientConnectCalls++
KiloConnectionData(websocketTransportDevice("ws://localhost:$port/kp", false), Unit)
}
delay(1200)
// and check:
// println("connection attemtps: $clientConnectCalls")
assertTrue { clientConnectCalls > 1 }
}
}