Compare commits

..

No commits in common. "master" and "v0.6.3" have entirely different histories.

13 changed files with 111 additions and 321 deletions

1
.gitignore vendored
View File

@ -45,4 +45,3 @@ out/
.kotlin .kotlin
/.idea/workspace.xml /.idea/workspace.xml
/.gigaide/gigaide.properties /.gigaide/gigaide.properties
local.properties

View File

@ -1,29 +1,5 @@
<component name="ProjectCodeStyleConfiguration"> <component name="ProjectCodeStyleConfiguration">
<code_scheme name="Project" version="173"> <code_scheme name="Project" version="173">
<DBN-PSQL>
<case-options enabled="true">
<option name="KEYWORD_CASE" value="lower" />
<option name="FUNCTION_CASE" value="lower" />
<option name="PARAMETER_CASE" value="lower" />
<option name="DATATYPE_CASE" value="lower" />
<option name="OBJECT_CASE" value="preserve" />
</case-options>
<formatting-settings enabled="false" />
</DBN-PSQL>
<DBN-SQL>
<case-options enabled="true">
<option name="KEYWORD_CASE" value="lower" />
<option name="FUNCTION_CASE" value="lower" />
<option name="PARAMETER_CASE" value="lower" />
<option name="DATATYPE_CASE" value="lower" />
<option name="OBJECT_CASE" value="preserve" />
</case-options>
<formatting-settings enabled="false">
<option name="STATEMENT_SPACING" value="one_line" />
<option name="CLAUSE_CHOP_DOWN" value="chop_down_if_statement_long" />
<option name="ITERATION_ELEMENTS_WRAPPING" value="chop_down_if_not_single" />
</formatting-settings>
</DBN-SQL>
<ScalaCodeStyleSettings> <ScalaCodeStyleSettings>
<option name="MULTILINE_STRING_CLOSING_QUOTES_ON_NEW_LINE" value="true" /> <option name="MULTILINE_STRING_CLOSING_QUOTES_ON_NEW_LINE" value="true" />
</ScalaCodeStyleSettings> </ScalaCodeStyleSettings>

View File

@ -1,6 +1,6 @@
# Kiloparsec # Kiloparsec
__Recommended version is `0.6.8`: to keep the code compatible with current and further versions we __Recommended version is `0.4.1`: to keep the code compatible with current and further versions we
ask to upgrade to `0.4.2` at least.__ Starting from this version some package names are changed for ask to upgrade to `0.4.2` at least.__ Starting from this version some package names are changed for
better clarity and fast UDP endpoints are added. better clarity and fast UDP endpoints are added.
@ -19,8 +19,6 @@ provides the following transports:
### Note on version compatibility ### Note on version compatibility
Since version 0.6.9 websocket protocol supports both text and binary frames; old clients are backward compatible with mew servers, but new clients only can work with older servers only in default binary frame mode. Upgrade also your servers to get better websocket compatibility [^1].
Version 0.5.1 could be backward incompatible due to upgrade of the crypto2. Version 0.5.1 could be backward incompatible due to upgrade of the crypto2.
Protocols >= 0.3.0 are not binary compatible with previous version due to more compact binary Protocols >= 0.3.0 are not binary compatible with previous version due to more compact binary
@ -84,7 +82,7 @@ It could be, depending on your project structure, something like:
```kotlin ```kotlin
val commonMain by getting { val commonMain by getting {
dependencies { dependencies {
api("net.sergeych:kiloparsec:0.6.8") api("net.sergeych:kiloparsec:0.4.1")
} }
} }
``` ```
@ -277,4 +275,3 @@ It will be moved to open source; we also guarantee that it will be moved to open
[MITM]: https://en.wikipedia.org/wiki/Man-in-the-middle_attack [MITM]: https://en.wikipedia.org/wiki/Man-in-the-middle_attack
[Sergey Chernov]: https://t.me/real_sergeych [Sergey Chernov]: https://t.me/real_sergeych
[^1]: On some new Xiaomi phones we found problems with websocket binary frames, probably in ktor; use text frames otherwise.

View File

@ -8,18 +8,15 @@
* real dot sergeych at gmail. * real dot sergeych at gmail.
*/ */
import org.jetbrains.kotlin.gradle.ExperimentalWasmDsl
plugins { plugins {
kotlin("multiplatform") version "2.1.0" kotlin("multiplatform") version "2.1.0"
id("org.jetbrains.kotlin.plugin.serialization") version "2.1.0" id("org.jetbrains.kotlin.plugin.serialization") version "2.1.0"
id("com.android.library") version "8.5.2" apply true
`maven-publish` `maven-publish`
id("org.jetbrains.dokka") version "1.9.20" id("org.jetbrains.dokka") version "1.9.20"
} }
group = "net.sergeych" group = "net.sergeych"
version = "0.6.9-SNAPSHOT" version = "0.6.3"
repositories { repositories {
mavenCentral() mavenCentral()
@ -37,22 +34,19 @@ kotlin {
} }
nodejs() nodejs()
} }
macosArm64() // macosArm64()
iosX64() // iosX64()
iosArm64() // iosArm64()
iosSimulatorArm64() // iosSimulatorArm64()
linuxX64() linuxX64()
linuxArm64() linuxArm64()
macosX64() // macosX64()
macosX64() // macosX64()
mingwX64() mingwX64()
androidTarget() // @OptIn(ExperimentalWasmDsl::class)
@OptIn(ExperimentalWasmDsl::class) // wasmJs()
wasmJs {
browser()
}
val ktor_version = "3.1.1" val ktor_version = "3.1.0"
sourceSets { sourceSets {
all { all {
@ -64,14 +58,9 @@ kotlin {
val commonMain by getting { val commonMain by getting {
dependencies { dependencies {
implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:1.10.1") implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:1.10.1")
implementation("org.jetbrains.kotlinx:kotlinx-serialization-json:1.8.1") implementation("org.jetbrains.kotlinx:kotlinx-serialization-json:1.7.2")
api("io.ktor:ktor-client-core:$ktor_version") api("io.ktor:ktor-client-core:$ktor_version")
api("net.sergeych:crypto2:0.8.4") api("net.sergeych:crypto2:0.7.4-SNAPSHOT")
}
}
val androidMain by getting {
dependencies {
implementation("io.ktor:ktor-client-okhttp:$ktor_version")
} }
} }
val ktorSocketMain by creating { val ktorSocketMain by creating {
@ -84,7 +73,7 @@ kotlin {
dependencies { dependencies {
implementation(kotlin("test")) implementation(kotlin("test"))
implementation("org.slf4j:slf4j-simple:2.0.9") implementation("org.slf4j:slf4j-simple:2.0.9")
implementation("org.jetbrains.kotlinx:kotlinx-coroutines-test:1.10.1") implementation("org.jetbrains.kotlinx:kotlinx-coroutines-test:1.8.1")
} }
} }
val ktorSocketTest by creating { val ktorSocketTest by creating {
@ -103,6 +92,7 @@ kotlin {
val jvmTest by getting { val jvmTest by getting {
dependsOn(ktorSocketTest) dependsOn(ktorSocketTest)
} }
val jsMain by getting { val jsMain by getting {
dependencies { dependencies {
implementation("io.ktor:ktor-client-js:$ktor_version") implementation("io.ktor:ktor-client-js:$ktor_version")
@ -172,15 +162,3 @@ tasks.dokkaHtml.configure {
} }
} }
android {
namespace = "net.sergeych.kiloparsec"
compileSdk = 34
defaultConfig {
minSdk = 24
}
compileOptions {
sourceCompatibility = JavaVersion.VERSION_17
targetCompatibility = JavaVersion.VERSION_17
}
}

View File

@ -10,5 +10,3 @@
kotlin.code.style=official kotlin.code.style=official
kotlin.mpp.applyDefaultHierarchyTemplate=false kotlin.mpp.applyDefaultHierarchyTemplate=false
kotlin.daemon.jvmargs=-Xmx2048m

View File

@ -10,6 +10,6 @@
distributionBase=GRADLE_USER_HOME distributionBase=GRADLE_USER_HOME
distributionPath=wrapper/dists distributionPath=wrapper/dists
distributionUrl=https\://services.gradle.org/distributions/gradle-8.7-bin.zip distributionUrl=https\://services.gradle.org/distributions/gradle-8.2-bin.zip
zipStoreBase=GRADLE_USER_HOME zipStoreBase=GRADLE_USER_HOME
zipStorePath=wrapper/dists zipStorePath=wrapper/dists

View File

@ -1,4 +1,4 @@
/* /*
* Copyright (c) 2025. Sergey S. Chernov - All Rights Reserved * Copyright (c) 2025. Sergey S. Chernov - All Rights Reserved
* *
* You may use, distribute and modify this code under the * You may use, distribute and modify this code under the
@ -10,7 +10,6 @@
pluginManagement { pluginManagement {
repositories { repositories {
google()
mavenCentral() mavenCentral()
gradlePluginPortal() gradlePluginPortal()
} }

View File

@ -91,11 +91,9 @@ class KiloClient<S>(
debug { "get device and session" } debug { "get device and session" }
val client = KiloClientConnection(localInterface, kc, secretKey) val client = KiloClientConnection(localInterface, kc, secretKey)
deferredClient.complete(client) deferredClient.complete(client)
debug { "starting client run"} client.run {
val r = runCatching { client.run {
_state.value = it _state.value = it
} } }
debug { "----------- client run finished: $r" }
resetDeferredClient() resetDeferredClient()
debug { "client run finished" } debug { "client run finished" }
} catch (_: RemoteInterface.ClosedException) { } catch (_: RemoteInterface.ClosedException) {
@ -111,7 +109,7 @@ class KiloClient<S>(
_state.value = false _state.value = false
resetDeferredClient() resetDeferredClient()
// reconnection timeout // reconnection timeout
delay(700) delay(100)
} }
} }

View File

@ -49,6 +49,7 @@ class KiloClientConnection<S>(
try { try {
// in parallel: keys and connection // in parallel: keys and connection
val deferredKeyPair = async { SafeKeyExchange() } val deferredKeyPair = async { SafeKeyExchange() }
debug { "opening device" }
debug { "got a transport device $device" } debug { "got a transport device $device" }
@ -61,11 +62,10 @@ class KiloClientConnection<S>(
debug { "transport started" } debug { "transport started" }
val pair = deferredKeyPair.await() val pair = deferredKeyPair.await()
debug { "keypair ready (1)" } debug { "keypair ready" }
val serverHe = transport.call(L0Request, Handshake(1u, pair.publicKey)) val serverHe = transport.call(L0Request, Handshake(1u, pair.publicKey))
debug { "got server HE (2)" }
val sk = pair.clientSessionKey(serverHe.publicKey) val sk = pair.clientSessionKey(serverHe.publicKey)
var params = KiloParams(false, transport, sk, session, null, this@KiloClientConnection) var params = KiloParams(false, transport, sk, session, null, this@KiloClientConnection)
@ -97,7 +97,8 @@ class KiloClientConnection<S>(
} catch (x: CancellationException) { } catch (x: CancellationException) {
info { "client is cancelled" } info { "client is cancelled" }
} catch (x: RemoteInterface.ClosedException) { } catch (x: RemoteInterface.ClosedException) {
debug { "connection closed/refused by remote" } x.printStackTrace()
info { "connection closed by remote" }
} finally { } finally {
onConnectedStateChanged?.invoke(false) onConnectedStateChanged?.invoke(false)
job?.cancel() job?.cancel()

View File

@ -134,13 +134,7 @@ class Transport<S>(
} }
// now we have mutex freed so we can call: // now we have mutex freed so we can call:
val r = runCatching { val r = runCatching { device.output.send(pack(b)) }
do {
val cr = device.output.trySend(pack(b))
if( cr.isClosed ) throw ClosedSendChannelException("can't send block: channel is closed")
delay(100)
} while(!cr.isSuccess)
}
if (!r.isSuccess) { if (!r.isSuccess) {
r.exceptionOrNull()?.let { r.exceptionOrNull()?.let {
exception { "failed to send output block" to it } exception { "failed to send output block" to it }
@ -277,7 +271,7 @@ class Transport<S>(
} }
debug { "no more active: $isActive / ${calls.size}" } debug { "no more active: $isActive / ${calls.size}" }
} }
debug { "exiting transport loop" } info { "exiting transport loop" }
} }
private suspend fun send(block: Block) { private suspend fun send(block: Block) {

View File

@ -20,12 +20,12 @@ import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.ClosedReceiveChannelException import kotlinx.coroutines.channels.ClosedReceiveChannelException
import kotlinx.coroutines.channels.ClosedSendChannelException import kotlinx.coroutines.channels.ClosedSendChannelException
import kotlinx.coroutines.launch import kotlinx.coroutines.launch
import kotlinx.io.IOException
import net.sergeych.crypto2.SigningKey import net.sergeych.crypto2.SigningKey
import net.sergeych.kiloparsec.* import net.sergeych.kiloparsec.*
import net.sergeych.mp_logger.* import net.sergeych.mp_logger.LogTag
import net.sergeych.mp_tools.decodeBase64Compact import net.sergeych.mp_logger.exception
import net.sergeych.mp_tools.encodeToBase64Compact import net.sergeych.mp_logger.info
import net.sergeych.mp_logger.warning
import net.sergeych.mp_tools.globalLaunch import net.sergeych.mp_tools.globalLaunch
import net.sergeych.tools.AtomicCounter import net.sergeych.tools.AtomicCounter
@ -39,14 +39,13 @@ fun <S> websocketClient(
path: String, path: String,
clientInterface: KiloInterface<S> = KiloInterface(), clientInterface: KiloInterface<S> = KiloInterface(),
secretKey: SigningKey? = null, secretKey: SigningKey? = null,
useTextFrames: Boolean = false,
sessionMaker: () -> S = { sessionMaker: () -> S = {
@Suppress("UNCHECKED_CAST") @Suppress("UNCHECKED_CAST")
Unit as S Unit as S
}, },
): KiloClient<S> { ): KiloClient<S> {
return KiloClient(clientInterface, secretKey) { return KiloClient(clientInterface, secretKey) {
KiloConnectionData(websocketTransportDevice(path, useTextFrames), sessionMaker()) KiloConnectionData(websocketTransportDevice(path), sessionMaker())
} }
} }
@ -57,10 +56,7 @@ fun <S> websocketClient(
*/ */
fun websocketTransportDevice( fun websocketTransportDevice(
path: String, path: String,
useTextFrames: Boolean = false, client: HttpClient = HttpClient { install(WebSockets) },
client: HttpClient = HttpClient {
install(WebSockets)
},
): Transport.Device { ): Transport.Device {
var u = Url(path) var u = Url(path)
if (u.encodedPath.length <= 1) if (u.encodedPath.length <= 1)
@ -71,11 +67,8 @@ fun websocketTransportDevice(
val input = Channel<UByteArray>() val input = Channel<UByteArray>()
val output = Channel<UByteArray>() val output = Channel<UByteArray>()
val closeHandle = CompletableDeferred<Boolean>() val closeHandle = CompletableDeferred<Boolean>()
val readyHandle = CompletableDeferred<Unit>()
globalLaunch { globalLaunch {
val log = LogTag("KC:${counter.incrementAndGet()}") val log = LogTag("KC:${counter.incrementAndGet()}")
try {
client.webSocket({ client.webSocket({
url.protocol = u.protocol url.protocol = u.protocol
url.host = u.host url.host = u.host
@ -87,15 +80,9 @@ fun websocketTransportDevice(
log.info { "connected to the server" } log.info { "connected to the server" }
// println("SENDING!!!") // println("SENDING!!!")
// send("Helluva") // send("Helluva")
readyHandle.complete(Unit)
launch { launch {
try { try {
for (block in output) { for (block in output) {
if (useTextFrames)
send(
Frame.Text(block.asByteArray().encodeToBase64Compact())
)
else
send(block.toByteArray()) send(block.toByteArray())
} }
log.info { "input is closed, closing the websocket" } log.info { "input is closed, closing the websocket" }
@ -112,10 +99,10 @@ fun websocketTransportDevice(
launch { launch {
try { try {
for (f in incoming) { for (f in incoming) {
when (f) { if (f is Frame.Binary) {
is Frame.Binary -> input.send(f.readBytes().toUByteArray()) input.send(f.readBytes().toUByteArray())
is Frame.Text -> input.send(f.readText().decodeBase64Compact().toUByteArray()) } else {
else -> log.warning { "ignoring unexpected frame of type ${f.frameType}" } log.warning { "ignoring unexpected frame of type ${f.frameType}" }
} }
} }
if (closeHandle.isActive) closeHandle.complete(true) if (closeHandle.isActive) closeHandle.complete(true)
@ -133,27 +120,17 @@ fun websocketTransportDevice(
log.warning { "Client is closing with error" } log.warning { "Client is closing with error" }
throw RemoteInterface.ClosedException() throw RemoteInterface.ClosedException()
} }
runCatching { output.close() } output.close()
runCatching { input.close() } input.close()
runCatching { close() }
}
} catch (x: IOException) {
if ("refused" in x.toString()) log.debug { "connection refused" }
else log.warning { "unexpected IO error $x" }
runCatching { output.close() }
runCatching { input.close() }
} }
log.info { "closing connection" } log.info { "closing connection" }
} }
// Wait for connection be established or failed val device = ProxyDevice(input, output) {
val device = ProxyDevice(input, output, doClose = {
// we need to explicitly close the coroutine job, or it can hang for a long time // we need to explicitly close the coroutine job, or it can hang for a long time
// leaking resources. // leaking resources.
runCatching { output.close() }
runCatching { input.close() }
closeHandle.complete(true) closeHandle.complete(true)
// job.cancel() // job.cancel()
}) }
return device return device
} }

View File

@ -24,8 +24,6 @@ import net.sergeych.kiloparsec.KiloInterface
import net.sergeych.kiloparsec.KiloServerConnection import net.sergeych.kiloparsec.KiloServerConnection
import net.sergeych.kiloparsec.RemoteInterface import net.sergeych.kiloparsec.RemoteInterface
import net.sergeych.mp_logger.* import net.sergeych.mp_logger.*
import net.sergeych.mp_tools.decodeBase64Compact
import net.sergeych.mp_tools.encodeToBase64Compact
import net.sergeych.tools.AtomicCounter import net.sergeych.tools.AtomicCounter
import kotlin.time.Duration.Companion.seconds import kotlin.time.Duration.Companion.seconds
@ -68,18 +66,13 @@ fun <S> Application.setupWebsocketServer(
log.debug { "opening the connection" } log.debug { "opening the connection" }
val input = Channel<UByteArray>(256) val input = Channel<UByteArray>(256)
val output = Channel<UByteArray>(256) val output = Channel<UByteArray>(256)
var useBinary: Boolean? = null
launch { launch {
log.debug { "starting output pump" } log.debug { "starting output pump" }
while (isActive) { while (isActive) {
try { try {
val block = output.receive() send(output.receive().toByteArray())
if (useBinary == false) }
send(block.asByteArray().encodeToBase64Compact()) catch(_: ClosedReceiveChannelException) {
else
send(block.toByteArray())
} catch (_: ClosedReceiveChannelException) {
log.debug { "closing output pump as output channel is closed" } log.debug { "closing output pump as output channel is closed" }
break break
} }
@ -98,28 +91,9 @@ fun <S> Application.setupWebsocketServer(
} }
log.debug { "KSC started, looking for incoming frames" } log.debug { "KSC started, looking for incoming frames" }
for (f in incoming) { for (f in incoming) {
if (f is Frame.Binary)
try { try {
when (f) {
is Frame.Binary -> {
if (useBinary == null) {
log.debug { "Setting binary frame mode ------------------------------------" }
useBinary = true
}
input.send(f.readBytes().toUByteArray()) input.send(f.readBytes().toUByteArray())
}
is Frame.Text -> {
if (useBinary == null) {
log.debug { "Setting text frame mode -----------------------------------" }
useBinary = false
}
input.send(f.readText().decodeBase64Compact().asUByteArray())
}
else -> {
log.warning { "unexpected frame type ${f.frameType}, ignoring" }
}
}
} catch (_: RemoteInterface.ClosedException) { } catch (_: RemoteInterface.ClosedException) {
log.warning { "caught local closed exception (strange!), closing" } log.warning { "caught local closed exception (strange!), closing" }
break break
@ -130,6 +104,8 @@ fun <S> Application.setupWebsocketServer(
log.exception { "unexpected exception, server connection will close" to t } log.exception { "unexpected exception, server connection will close" to t }
break break
} }
else
log.warning { "unknown frame type ${f.frameType}, ignoring" }
} }
log.debug { "closing the server" } log.debug { "closing the server" }
close() close()

View File

@ -15,13 +15,10 @@ import io.ktor.server.engine.*
import io.ktor.server.netty.* import io.ktor.server.netty.*
import kotlinx.coroutines.delay import kotlinx.coroutines.delay
import kotlinx.coroutines.launch import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.test.runTest import kotlinx.coroutines.test.runTest
import net.sergeych.crypto2.SigningSecretKey
import net.sergeych.crypto2.initCrypto import net.sergeych.crypto2.initCrypto
import net.sergeych.kiloparsec.adapter.setupWebsocketServer import net.sergeych.kiloparsec.adapter.setupWebsocketServer
import net.sergeych.kiloparsec.adapter.websocketClient import net.sergeych.kiloparsec.adapter.websocketClient
import net.sergeych.kiloparsec.adapter.websocketTransportDevice
import net.sergeych.mp_logger.Log import net.sergeych.mp_logger.Log
import java.net.InetAddress import java.net.InetAddress
import kotlin.random.Random import kotlin.random.Random
@ -42,18 +39,17 @@ class ClientTest {
@Test @Test
fun webSocketTest1() = runTest { fun webSocketTest() = runTest {
initCrypto() initCrypto()
// fun Application. // fun Application.
val cmdClose by command<Unit, Unit>() val cmdClose by command<Unit,Unit>()
val cmdGetFoo by command<Unit, String>() val cmdGetFoo by command<Unit,String>()
val cmdSetFoo by command<String, Unit>() val cmdSetFoo by command<String,Unit>()
val cmdCheckConnected by command<Unit, Boolean>() val cmdCheckConnected by command<Unit,Boolean>()
Log.connectConsole(Log.Level.DEBUG) Log.connectConsole(Log.Level.DEBUG)
data class Session(var foo: String = "not set") data class Session(var foo: String="not set")
var closeCounter = 0 var closeCounter = 0
val serverInterface = KiloInterface<Session>().apply { val serverInterface = KiloInterface<Session>().apply {
var connectedCalled = false var connectedCalled = false
@ -66,7 +62,7 @@ class ClientTest {
} }
} }
val port = Random.nextInt(8080, 9090) val port = Random.nextInt(8080,9090)
val ns = embeddedServer(Netty, port = port, host = "0.0.0.0", module = { val ns = embeddedServer(Netty, port = port, host = "0.0.0.0", module = {
setupWebsocketServer(serverInterface) { Session() } setupWebsocketServer(serverInterface) { Session() }
}).start(wait = false) }).start(wait = false)
@ -77,9 +73,7 @@ class ClientTest {
client.connectedStateFlow.collect { client.connectedStateFlow.collect {
println("got: $closeCounter/$it") println("got: $closeCounter/$it")
states += it states += it
if (!it) { if( !it) { closeCounter++ }
closeCounter++
}
} }
} }
assertEquals(true, client.call(cmdCheckConnected)) assertEquals(true, client.call(cmdCheckConnected))
@ -100,7 +94,7 @@ class ClientTest {
assertFalse { client.connectedStateFlow.value } assertFalse { client.connectedStateFlow.value }
// this should be run on automatically reopen connection // this should be run on automatically reopen connection
client.call(cmdSetFoo, "superbar") client.call(cmdSetFoo,"superbar")
assertTrue { client.connectedStateFlow.value } assertTrue { client.connectedStateFlow.value }
assertEquals("superbar", client.call(cmdGetFoo)) assertEquals("superbar", client.call(cmdGetFoo))
client.close() client.close()
@ -110,101 +104,4 @@ class ClientTest {
// println("stopped server") // println("stopped server")
// println("closed client") // println("closed client")
} }
@Test
fun webSocketTest2() = runTest {
initCrypto()
// fun Application.
val cmdClose by command<Unit, Unit>()
val cmdGetFoo by command<Unit, String>()
val cmdSetFoo by command<String, Unit>()
val cmdCheckConnected by command<Unit, Boolean>()
Log.connectConsole(Log.Level.DEBUG)
data class Session(var foo: String = "not set")
var closeCounter = 0
val serverInterface = KiloInterface<Session>().apply {
var connectedCalled = false
onConnected { connectedCalled = true }
on(cmdGetFoo) { session.foo }
on(cmdSetFoo) { session.foo = it }
on(cmdCheckConnected) { connectedCalled }
on(cmdClose) {
throw LocalInterface.BreakConnectionException()
}
}
val port = Random.nextInt(8080, 9090)
val ns = embeddedServer(Netty, port = port, host = "0.0.0.0", module = {
setupWebsocketServer(serverInterface) { Session() }
}).start(wait = false)
val client = websocketClient<Unit>("ws://localhost:$port/kp", useTextFrames = true)
val states = mutableListOf<Boolean>()
val collector = launch {
client.connectedStateFlow.collect {
println("got: $closeCounter/$it")
states += it
if (!it) {
closeCounter++
}
}
}
assertEquals(true, client.call(cmdCheckConnected))
assertTrue { client.connectedStateFlow.value }
assertEquals("not set", client.call(cmdGetFoo))
client.call(cmdSetFoo, "foo")
assertEquals("foo", client.call(cmdGetFoo))
client.close()
ns.stop()
collector.cancel()
}
@Test
fun webSocketWaitForConnectTest() = runBlocking {
initCrypto()
// fun Application.
Log.connectConsole(Log.Level.DEBUG)
val clientInterface = KiloInterface<Unit>().apply {}
val port = Random.nextInt(8080, 9090)
var clientConnectCalls = 0
// It should repeatedly reconnect, and we will count:
KiloClient(clientInterface, SigningSecretKey.new()) {
clientConnectCalls++
KiloConnectionData(websocketTransportDevice("ws://localhost:$port/kp", true), Unit)
}
delay(1200)
// and check:
// println("connection attemtps: $clientConnectCalls")
assertTrue { clientConnectCalls > 1 }
}
@Test
fun webSocketWaitForConnectTest2() = runBlocking {
initCrypto()
// fun Application.
Log.connectConsole(Log.Level.DEBUG)
val clientInterface = KiloInterface<Unit>().apply {}
val port = Random.nextInt(8080, 9090)
var clientConnectCalls = 0
// It should repeatedly reconnect, and we will count:
KiloClient(clientInterface, SigningSecretKey.new()) {
clientConnectCalls++
KiloConnectionData(websocketTransportDevice("ws://localhost:$port/kp", false), Unit)
}
delay(1200)
// and check:
// println("connection attemtps: $clientConnectCalls")
assertTrue { clientConnectCalls > 1 }
}
} }