Compare commits
9 Commits
Author | SHA1 | Date | |
---|---|---|---|
40f4352a31 | |||
cb696f5c0f | |||
fe96ac69d7 | |||
212f82dedd | |||
59b7310385 | |||
8837e49248 | |||
7e1f7ec4aa | |||
146878629e | |||
d7fb26c03b |
1
.gitignore
vendored
1
.gitignore
vendored
@ -45,3 +45,4 @@ out/
|
||||
.kotlin
|
||||
/.idea/workspace.xml
|
||||
/.gigaide/gigaide.properties
|
||||
local.properties
|
||||
|
24
.idea/codeStyles/Project.xml
generated
24
.idea/codeStyles/Project.xml
generated
@ -1,5 +1,29 @@
|
||||
<component name="ProjectCodeStyleConfiguration">
|
||||
<code_scheme name="Project" version="173">
|
||||
<DBN-PSQL>
|
||||
<case-options enabled="true">
|
||||
<option name="KEYWORD_CASE" value="lower" />
|
||||
<option name="FUNCTION_CASE" value="lower" />
|
||||
<option name="PARAMETER_CASE" value="lower" />
|
||||
<option name="DATATYPE_CASE" value="lower" />
|
||||
<option name="OBJECT_CASE" value="preserve" />
|
||||
</case-options>
|
||||
<formatting-settings enabled="false" />
|
||||
</DBN-PSQL>
|
||||
<DBN-SQL>
|
||||
<case-options enabled="true">
|
||||
<option name="KEYWORD_CASE" value="lower" />
|
||||
<option name="FUNCTION_CASE" value="lower" />
|
||||
<option name="PARAMETER_CASE" value="lower" />
|
||||
<option name="DATATYPE_CASE" value="lower" />
|
||||
<option name="OBJECT_CASE" value="preserve" />
|
||||
</case-options>
|
||||
<formatting-settings enabled="false">
|
||||
<option name="STATEMENT_SPACING" value="one_line" />
|
||||
<option name="CLAUSE_CHOP_DOWN" value="chop_down_if_statement_long" />
|
||||
<option name="ITERATION_ELEMENTS_WRAPPING" value="chop_down_if_not_single" />
|
||||
</formatting-settings>
|
||||
</DBN-SQL>
|
||||
<ScalaCodeStyleSettings>
|
||||
<option name="MULTILINE_STRING_CLOSING_QUOTES_ON_NEW_LINE" value="true" />
|
||||
</ScalaCodeStyleSettings>
|
||||
|
@ -1,6 +1,6 @@
|
||||
# Kiloparsec
|
||||
|
||||
__Recommended version is `0.4.1`: to keep the code compatible with current and further versions we
|
||||
__Recommended version is `0.6.8`: to keep the code compatible with current and further versions we
|
||||
ask to upgrade to `0.4.2` at least.__ Starting from this version some package names are changed for
|
||||
better clarity and fast UDP endpoints are added.
|
||||
|
||||
@ -19,6 +19,8 @@ provides the following transports:
|
||||
|
||||
### Note on version compatibility
|
||||
|
||||
Since version 0.6.9 websocket protocol supports both text and binary frames; old clients are backward compatible with mew servers, but new clients only can work with older servers only in default binary frame mode. Upgrade also your servers to get better websocket compatibility [^1].
|
||||
|
||||
Version 0.5.1 could be backward incompatible due to upgrade of the crypto2.
|
||||
|
||||
Protocols >= 0.3.0 are not binary compatible with previous version due to more compact binary
|
||||
@ -82,7 +84,7 @@ It could be, depending on your project structure, something like:
|
||||
```kotlin
|
||||
val commonMain by getting {
|
||||
dependencies {
|
||||
api("net.sergeych:kiloparsec:0.4.1")
|
||||
api("net.sergeych:kiloparsec:0.6.8")
|
||||
}
|
||||
}
|
||||
```
|
||||
@ -274,4 +276,5 @@ This is work in progress, not yet moved to public domain;
|
||||
It will be moved to open source; we also guarantee that it will be moved to open source immediately if the software export restrictions will be lifted. We do not support such practices here at 8-rays.dev.
|
||||
|
||||
[MITM]: https://en.wikipedia.org/wiki/Man-in-the-middle_attack
|
||||
[Sergey Chernov]: https://t.me/real_sergeych
|
||||
[Sergey Chernov]: https://t.me/real_sergeych
|
||||
[^1]: On some new Xiaomi phones we found problems with websocket binary frames, probably in ktor; use text frames otherwise.
|
@ -8,15 +8,18 @@
|
||||
* real dot sergeych at gmail.
|
||||
*/
|
||||
|
||||
import org.jetbrains.kotlin.gradle.ExperimentalWasmDsl
|
||||
|
||||
plugins {
|
||||
kotlin("multiplatform") version "2.1.0"
|
||||
id("org.jetbrains.kotlin.plugin.serialization") version "2.1.0"
|
||||
id("com.android.library") version "8.5.2" apply true
|
||||
`maven-publish`
|
||||
id("org.jetbrains.dokka") version "1.9.20"
|
||||
}
|
||||
|
||||
group = "net.sergeych"
|
||||
version = "0.6.3"
|
||||
version = "0.6.9-SNAPSHOT"
|
||||
|
||||
repositories {
|
||||
mavenCentral()
|
||||
@ -34,19 +37,22 @@ kotlin {
|
||||
}
|
||||
nodejs()
|
||||
}
|
||||
// macosArm64()
|
||||
// iosX64()
|
||||
// iosArm64()
|
||||
// iosSimulatorArm64()
|
||||
macosArm64()
|
||||
iosX64()
|
||||
iosArm64()
|
||||
iosSimulatorArm64()
|
||||
linuxX64()
|
||||
linuxArm64()
|
||||
// macosX64()
|
||||
// macosX64()
|
||||
macosX64()
|
||||
macosX64()
|
||||
mingwX64()
|
||||
// @OptIn(ExperimentalWasmDsl::class)
|
||||
// wasmJs()
|
||||
androidTarget()
|
||||
@OptIn(ExperimentalWasmDsl::class)
|
||||
wasmJs {
|
||||
browser()
|
||||
}
|
||||
|
||||
val ktor_version = "3.1.0"
|
||||
val ktor_version = "3.1.1"
|
||||
|
||||
sourceSets {
|
||||
all {
|
||||
@ -58,9 +64,14 @@ kotlin {
|
||||
val commonMain by getting {
|
||||
dependencies {
|
||||
implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:1.10.1")
|
||||
implementation("org.jetbrains.kotlinx:kotlinx-serialization-json:1.7.2")
|
||||
implementation("org.jetbrains.kotlinx:kotlinx-serialization-json:1.8.1")
|
||||
api("io.ktor:ktor-client-core:$ktor_version")
|
||||
api("net.sergeych:crypto2:0.7.4-SNAPSHOT")
|
||||
api("net.sergeych:crypto2:0.8.4")
|
||||
}
|
||||
}
|
||||
val androidMain by getting {
|
||||
dependencies {
|
||||
implementation("io.ktor:ktor-client-okhttp:$ktor_version")
|
||||
}
|
||||
}
|
||||
val ktorSocketMain by creating {
|
||||
@ -73,7 +84,7 @@ kotlin {
|
||||
dependencies {
|
||||
implementation(kotlin("test"))
|
||||
implementation("org.slf4j:slf4j-simple:2.0.9")
|
||||
implementation("org.jetbrains.kotlinx:kotlinx-coroutines-test:1.8.1")
|
||||
implementation("org.jetbrains.kotlinx:kotlinx-coroutines-test:1.10.1")
|
||||
}
|
||||
}
|
||||
val ktorSocketTest by creating {
|
||||
@ -92,7 +103,6 @@ kotlin {
|
||||
val jvmTest by getting {
|
||||
dependsOn(ktorSocketTest)
|
||||
}
|
||||
|
||||
val jsMain by getting {
|
||||
dependencies {
|
||||
implementation("io.ktor:ktor-client-js:$ktor_version")
|
||||
@ -162,3 +172,15 @@ tasks.dokkaHtml.configure {
|
||||
}
|
||||
}
|
||||
|
||||
android {
|
||||
namespace = "net.sergeych.kiloparsec"
|
||||
compileSdk = 34
|
||||
defaultConfig {
|
||||
minSdk = 24
|
||||
}
|
||||
compileOptions {
|
||||
sourceCompatibility = JavaVersion.VERSION_17
|
||||
targetCompatibility = JavaVersion.VERSION_17
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -9,4 +9,6 @@
|
||||
#
|
||||
|
||||
kotlin.code.style=official
|
||||
kotlin.mpp.applyDefaultHierarchyTemplate=false
|
||||
kotlin.mpp.applyDefaultHierarchyTemplate=false
|
||||
|
||||
kotlin.daemon.jvmargs=-Xmx2048m
|
||||
|
2
gradle/wrapper/gradle-wrapper.properties
vendored
2
gradle/wrapper/gradle-wrapper.properties
vendored
@ -10,6 +10,6 @@
|
||||
|
||||
distributionBase=GRADLE_USER_HOME
|
||||
distributionPath=wrapper/dists
|
||||
distributionUrl=https\://services.gradle.org/distributions/gradle-8.2-bin.zip
|
||||
distributionUrl=https\://services.gradle.org/distributions/gradle-8.7-bin.zip
|
||||
zipStoreBase=GRADLE_USER_HOME
|
||||
zipStorePath=wrapper/dists
|
@ -1,4 +1,4 @@
|
||||
/*
|
||||
/*
|
||||
* Copyright (c) 2025. Sergey S. Chernov - All Rights Reserved
|
||||
*
|
||||
* You may use, distribute and modify this code under the
|
||||
@ -10,6 +10,7 @@
|
||||
|
||||
pluginManagement {
|
||||
repositories {
|
||||
google()
|
||||
mavenCentral()
|
||||
gradlePluginPortal()
|
||||
}
|
||||
|
@ -91,9 +91,11 @@ class KiloClient<S>(
|
||||
debug { "get device and session" }
|
||||
val client = KiloClientConnection(localInterface, kc, secretKey)
|
||||
deferredClient.complete(client)
|
||||
client.run {
|
||||
debug { "starting client run"}
|
||||
val r = runCatching { client.run {
|
||||
_state.value = it
|
||||
}
|
||||
} }
|
||||
debug { "----------- client run finished: $r" }
|
||||
resetDeferredClient()
|
||||
debug { "client run finished" }
|
||||
} catch (_: RemoteInterface.ClosedException) {
|
||||
@ -109,7 +111,7 @@ class KiloClient<S>(
|
||||
_state.value = false
|
||||
resetDeferredClient()
|
||||
// reconnection timeout
|
||||
delay(100)
|
||||
delay(700)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -49,7 +49,6 @@ class KiloClientConnection<S>(
|
||||
try {
|
||||
// in parallel: keys and connection
|
||||
val deferredKeyPair = async { SafeKeyExchange() }
|
||||
debug { "opening device" }
|
||||
debug { "got a transport device $device" }
|
||||
|
||||
|
||||
@ -62,10 +61,11 @@ class KiloClientConnection<S>(
|
||||
debug { "transport started" }
|
||||
|
||||
val pair = deferredKeyPair.await()
|
||||
debug { "keypair ready" }
|
||||
debug { "keypair ready (1)" }
|
||||
|
||||
val serverHe = transport.call(L0Request, Handshake(1u, pair.publicKey))
|
||||
|
||||
debug { "got server HE (2)" }
|
||||
val sk = pair.clientSessionKey(serverHe.publicKey)
|
||||
var params = KiloParams(false, transport, sk, session, null, this@KiloClientConnection)
|
||||
|
||||
@ -97,8 +97,7 @@ class KiloClientConnection<S>(
|
||||
} catch (x: CancellationException) {
|
||||
info { "client is cancelled" }
|
||||
} catch (x: RemoteInterface.ClosedException) {
|
||||
x.printStackTrace()
|
||||
info { "connection closed by remote" }
|
||||
debug { "connection closed/refused by remote" }
|
||||
} finally {
|
||||
onConnectedStateChanged?.invoke(false)
|
||||
job?.cancel()
|
||||
|
@ -134,7 +134,13 @@ class Transport<S>(
|
||||
}
|
||||
|
||||
// now we have mutex freed so we can call:
|
||||
val r = runCatching { device.output.send(pack(b)) }
|
||||
val r = runCatching {
|
||||
do {
|
||||
val cr = device.output.trySend(pack(b))
|
||||
if( cr.isClosed ) throw ClosedSendChannelException("can't send block: channel is closed")
|
||||
delay(100)
|
||||
} while(!cr.isSuccess)
|
||||
}
|
||||
if (!r.isSuccess) {
|
||||
r.exceptionOrNull()?.let {
|
||||
exception { "failed to send output block" to it }
|
||||
@ -271,7 +277,7 @@ class Transport<S>(
|
||||
}
|
||||
debug { "no more active: $isActive / ${calls.size}" }
|
||||
}
|
||||
info { "exiting transport loop" }
|
||||
debug { "exiting transport loop" }
|
||||
}
|
||||
|
||||
private suspend fun send(block: Block) {
|
||||
|
@ -20,12 +20,12 @@ import kotlinx.coroutines.channels.Channel
|
||||
import kotlinx.coroutines.channels.ClosedReceiveChannelException
|
||||
import kotlinx.coroutines.channels.ClosedSendChannelException
|
||||
import kotlinx.coroutines.launch
|
||||
import kotlinx.io.IOException
|
||||
import net.sergeych.crypto2.SigningKey
|
||||
import net.sergeych.kiloparsec.*
|
||||
import net.sergeych.mp_logger.LogTag
|
||||
import net.sergeych.mp_logger.exception
|
||||
import net.sergeych.mp_logger.info
|
||||
import net.sergeych.mp_logger.warning
|
||||
import net.sergeych.mp_logger.*
|
||||
import net.sergeych.mp_tools.decodeBase64Compact
|
||||
import net.sergeych.mp_tools.encodeToBase64Compact
|
||||
import net.sergeych.mp_tools.globalLaunch
|
||||
import net.sergeych.tools.AtomicCounter
|
||||
|
||||
@ -39,13 +39,14 @@ fun <S> websocketClient(
|
||||
path: String,
|
||||
clientInterface: KiloInterface<S> = KiloInterface(),
|
||||
secretKey: SigningKey? = null,
|
||||
useTextFrames: Boolean = false,
|
||||
sessionMaker: () -> S = {
|
||||
@Suppress("UNCHECKED_CAST")
|
||||
Unit as S
|
||||
},
|
||||
): KiloClient<S> {
|
||||
return KiloClient(clientInterface, secretKey) {
|
||||
KiloConnectionData(websocketTransportDevice(path), sessionMaker())
|
||||
KiloConnectionData(websocketTransportDevice(path, useTextFrames), sessionMaker())
|
||||
}
|
||||
}
|
||||
|
||||
@ -56,7 +57,10 @@ fun <S> websocketClient(
|
||||
*/
|
||||
fun websocketTransportDevice(
|
||||
path: String,
|
||||
client: HttpClient = HttpClient { install(WebSockets) },
|
||||
useTextFrames: Boolean = false,
|
||||
client: HttpClient = HttpClient {
|
||||
install(WebSockets)
|
||||
},
|
||||
): Transport.Device {
|
||||
var u = Url(path)
|
||||
if (u.encodedPath.length <= 1)
|
||||
@ -67,70 +71,89 @@ fun websocketTransportDevice(
|
||||
val input = Channel<UByteArray>()
|
||||
val output = Channel<UByteArray>()
|
||||
val closeHandle = CompletableDeferred<Boolean>()
|
||||
val readyHandle = CompletableDeferred<Unit>()
|
||||
|
||||
globalLaunch {
|
||||
val log = LogTag("KC:${counter.incrementAndGet()}")
|
||||
client.webSocket({
|
||||
url.protocol = u.protocol
|
||||
url.host = u.host
|
||||
url.port = u.port
|
||||
url.encodedPath = u.encodedPath
|
||||
url.parameters.appendAll(u.parameters)
|
||||
log.info { "kiloparsec server URL: $url" }
|
||||
}) {
|
||||
log.info { "connected to the server" }
|
||||
try {
|
||||
client.webSocket({
|
||||
url.protocol = u.protocol
|
||||
url.host = u.host
|
||||
url.port = u.port
|
||||
url.encodedPath = u.encodedPath
|
||||
url.parameters.appendAll(u.parameters)
|
||||
log.info { "kiloparsec server URL: $url" }
|
||||
}) {
|
||||
log.info { "connected to the server" }
|
||||
// println("SENDING!!!")
|
||||
// send("Helluva")
|
||||
launch {
|
||||
try {
|
||||
for (block in output) {
|
||||
send(block.toByteArray())
|
||||
}
|
||||
log.info { "input is closed, closing the websocket" }
|
||||
if (closeHandle.isActive) closeHandle.complete(true)
|
||||
} catch (_: ClosedSendChannelException) {
|
||||
log.info { "send channel closed" }
|
||||
} catch (_: CancellationException) {
|
||||
} catch (t: Throwable) {
|
||||
log.info { "unexpected exception in websock sender: ${t.stackTraceToString()}" }
|
||||
closeHandle.completeExceptionally(t)
|
||||
}
|
||||
if (closeHandle.isActive) closeHandle.complete(false)
|
||||
}
|
||||
launch {
|
||||
try {
|
||||
for (f in incoming) {
|
||||
if (f is Frame.Binary) {
|
||||
input.send(f.readBytes().toUByteArray())
|
||||
} else {
|
||||
log.warning { "ignoring unexpected frame of type ${f.frameType}" }
|
||||
readyHandle.complete(Unit)
|
||||
launch {
|
||||
try {
|
||||
for (block in output) {
|
||||
if (useTextFrames)
|
||||
send(
|
||||
Frame.Text(block.asByteArray().encodeToBase64Compact())
|
||||
)
|
||||
else
|
||||
send(block.toByteArray())
|
||||
}
|
||||
log.info { "input is closed, closing the websocket" }
|
||||
if (closeHandle.isActive) closeHandle.complete(true)
|
||||
} catch (_: ClosedSendChannelException) {
|
||||
log.info { "send channel closed" }
|
||||
} catch (_: CancellationException) {
|
||||
} catch (t: Throwable) {
|
||||
log.info { "unexpected exception in websock sender: ${t.stackTraceToString()}" }
|
||||
closeHandle.completeExceptionally(t)
|
||||
}
|
||||
if (closeHandle.isActive) closeHandle.complete(true)
|
||||
} catch (_: CancellationException) {
|
||||
if (closeHandle.isActive) closeHandle.complete(false)
|
||||
} catch (_: ClosedReceiveChannelException) {
|
||||
log.warning { "receive channel closed unexpectedly" }
|
||||
if (closeHandle.isActive) closeHandle.complete(false)
|
||||
} catch (t: Throwable) {
|
||||
log.exception { "unexpected error" to t }
|
||||
if (closeHandle.isActive) closeHandle.complete(false)
|
||||
}
|
||||
launch {
|
||||
try {
|
||||
for (f in incoming) {
|
||||
when (f) {
|
||||
is Frame.Binary -> input.send(f.readBytes().toUByteArray())
|
||||
is Frame.Text -> input.send(f.readText().decodeBase64Compact().toUByteArray())
|
||||
else -> log.warning { "ignoring unexpected frame of type ${f.frameType}" }
|
||||
}
|
||||
}
|
||||
if (closeHandle.isActive) closeHandle.complete(true)
|
||||
} catch (_: CancellationException) {
|
||||
if (closeHandle.isActive) closeHandle.complete(false)
|
||||
} catch (_: ClosedReceiveChannelException) {
|
||||
log.warning { "receive channel closed unexpectedly" }
|
||||
if (closeHandle.isActive) closeHandle.complete(false)
|
||||
} catch (t: Throwable) {
|
||||
log.exception { "unexpected error" to t }
|
||||
if (closeHandle.isActive) closeHandle.complete(false)
|
||||
}
|
||||
}
|
||||
if (!closeHandle.await()) {
|
||||
log.warning { "Client is closing with error" }
|
||||
throw RemoteInterface.ClosedException()
|
||||
}
|
||||
runCatching { output.close() }
|
||||
runCatching { input.close() }
|
||||
runCatching { close() }
|
||||
}
|
||||
if (!closeHandle.await()) {
|
||||
log.warning { "Client is closing with error" }
|
||||
throw RemoteInterface.ClosedException()
|
||||
}
|
||||
output.close()
|
||||
input.close()
|
||||
} catch (x: IOException) {
|
||||
if ("refused" in x.toString()) log.debug { "connection refused" }
|
||||
else log.warning { "unexpected IO error $x" }
|
||||
runCatching { output.close() }
|
||||
runCatching { input.close() }
|
||||
}
|
||||
log.info { "closing connection" }
|
||||
}
|
||||
val device = ProxyDevice(input, output) {
|
||||
// Wait for connection be established or failed
|
||||
val device = ProxyDevice(input, output, doClose = {
|
||||
// we need to explicitly close the coroutine job, or it can hang for a long time
|
||||
// leaking resources.
|
||||
runCatching { output.close() }
|
||||
runCatching { input.close() }
|
||||
closeHandle.complete(true)
|
||||
// job.cancel()
|
||||
}
|
||||
})
|
||||
return device
|
||||
}
|
||||
|
||||
|
@ -24,6 +24,8 @@ import net.sergeych.kiloparsec.KiloInterface
|
||||
import net.sergeych.kiloparsec.KiloServerConnection
|
||||
import net.sergeych.kiloparsec.RemoteInterface
|
||||
import net.sergeych.mp_logger.*
|
||||
import net.sergeych.mp_tools.decodeBase64Compact
|
||||
import net.sergeych.mp_tools.encodeToBase64Compact
|
||||
import net.sergeych.tools.AtomicCounter
|
||||
import kotlin.time.Duration.Companion.seconds
|
||||
|
||||
@ -66,13 +68,18 @@ fun <S> Application.setupWebsocketServer(
|
||||
log.debug { "opening the connection" }
|
||||
val input = Channel<UByteArray>(256)
|
||||
val output = Channel<UByteArray>(256)
|
||||
var useBinary: Boolean? = null
|
||||
launch {
|
||||
log.debug { "starting output pump" }
|
||||
while (isActive) {
|
||||
try {
|
||||
send(output.receive().toByteArray())
|
||||
}
|
||||
catch(_: ClosedReceiveChannelException) {
|
||||
val block = output.receive()
|
||||
if (useBinary == false)
|
||||
send(block.asByteArray().encodeToBase64Compact())
|
||||
else
|
||||
send(block.toByteArray())
|
||||
|
||||
} catch (_: ClosedReceiveChannelException) {
|
||||
log.debug { "closing output pump as output channel is closed" }
|
||||
break
|
||||
}
|
||||
@ -91,21 +98,38 @@ fun <S> Application.setupWebsocketServer(
|
||||
}
|
||||
log.debug { "KSC started, looking for incoming frames" }
|
||||
for (f in incoming) {
|
||||
if (f is Frame.Binary)
|
||||
try {
|
||||
input.send(f.readBytes().toUByteArray())
|
||||
} catch (_: RemoteInterface.ClosedException) {
|
||||
log.warning { "caught local closed exception (strange!), closing" }
|
||||
break
|
||||
} catch (_: ClosedReceiveChannelException) {
|
||||
log.info { "receive channel is closed, closing connection" }
|
||||
break
|
||||
} catch (t: Throwable) {
|
||||
log.exception { "unexpected exception, server connection will close" to t }
|
||||
break
|
||||
try {
|
||||
when (f) {
|
||||
is Frame.Binary -> {
|
||||
if (useBinary == null) {
|
||||
log.debug { "Setting binary frame mode ------------------------------------" }
|
||||
useBinary = true
|
||||
}
|
||||
input.send(f.readBytes().toUByteArray())
|
||||
}
|
||||
|
||||
is Frame.Text -> {
|
||||
if (useBinary == null) {
|
||||
log.debug { "Setting text frame mode -----------------------------------" }
|
||||
useBinary = false
|
||||
}
|
||||
input.send(f.readText().decodeBase64Compact().asUByteArray())
|
||||
}
|
||||
|
||||
else -> {
|
||||
log.warning { "unexpected frame type ${f.frameType}, ignoring" }
|
||||
}
|
||||
}
|
||||
else
|
||||
log.warning { "unknown frame type ${f.frameType}, ignoring" }
|
||||
} catch (_: RemoteInterface.ClosedException) {
|
||||
log.warning { "caught local closed exception (strange!), closing" }
|
||||
break
|
||||
} catch (_: ClosedReceiveChannelException) {
|
||||
log.info { "receive channel is closed, closing connection" }
|
||||
break
|
||||
} catch (t: Throwable) {
|
||||
log.exception { "unexpected exception, server connection will close" to t }
|
||||
break
|
||||
}
|
||||
}
|
||||
log.debug { "closing the server" }
|
||||
close()
|
||||
|
@ -15,10 +15,13 @@ import io.ktor.server.engine.*
|
||||
import io.ktor.server.netty.*
|
||||
import kotlinx.coroutines.delay
|
||||
import kotlinx.coroutines.launch
|
||||
import kotlinx.coroutines.runBlocking
|
||||
import kotlinx.coroutines.test.runTest
|
||||
import net.sergeych.crypto2.SigningSecretKey
|
||||
import net.sergeych.crypto2.initCrypto
|
||||
import net.sergeych.kiloparsec.adapter.setupWebsocketServer
|
||||
import net.sergeych.kiloparsec.adapter.websocketClient
|
||||
import net.sergeych.kiloparsec.adapter.websocketTransportDevice
|
||||
import net.sergeych.mp_logger.Log
|
||||
import java.net.InetAddress
|
||||
import kotlin.random.Random
|
||||
@ -39,17 +42,18 @@ class ClientTest {
|
||||
|
||||
|
||||
@Test
|
||||
fun webSocketTest() = runTest {
|
||||
fun webSocketTest1() = runTest {
|
||||
initCrypto()
|
||||
// fun Application.
|
||||
val cmdClose by command<Unit,Unit>()
|
||||
val cmdGetFoo by command<Unit,String>()
|
||||
val cmdSetFoo by command<String,Unit>()
|
||||
val cmdCheckConnected by command<Unit,Boolean>()
|
||||
val cmdClose by command<Unit, Unit>()
|
||||
val cmdGetFoo by command<Unit, String>()
|
||||
val cmdSetFoo by command<String, Unit>()
|
||||
val cmdCheckConnected by command<Unit, Boolean>()
|
||||
|
||||
Log.connectConsole(Log.Level.DEBUG)
|
||||
|
||||
data class Session(var foo: String="not set")
|
||||
data class Session(var foo: String = "not set")
|
||||
|
||||
var closeCounter = 0
|
||||
val serverInterface = KiloInterface<Session>().apply {
|
||||
var connectedCalled = false
|
||||
@ -62,7 +66,7 @@ class ClientTest {
|
||||
}
|
||||
}
|
||||
|
||||
val port = Random.nextInt(8080,9090)
|
||||
val port = Random.nextInt(8080, 9090)
|
||||
val ns = embeddedServer(Netty, port = port, host = "0.0.0.0", module = {
|
||||
setupWebsocketServer(serverInterface) { Session() }
|
||||
}).start(wait = false)
|
||||
@ -73,7 +77,9 @@ class ClientTest {
|
||||
client.connectedStateFlow.collect {
|
||||
println("got: $closeCounter/$it")
|
||||
states += it
|
||||
if( !it) { closeCounter++ }
|
||||
if (!it) {
|
||||
closeCounter++
|
||||
}
|
||||
}
|
||||
}
|
||||
assertEquals(true, client.call(cmdCheckConnected))
|
||||
@ -94,7 +100,7 @@ class ClientTest {
|
||||
assertFalse { client.connectedStateFlow.value }
|
||||
|
||||
// this should be run on automatically reopen connection
|
||||
client.call(cmdSetFoo,"superbar")
|
||||
client.call(cmdSetFoo, "superbar")
|
||||
assertTrue { client.connectedStateFlow.value }
|
||||
assertEquals("superbar", client.call(cmdGetFoo))
|
||||
client.close()
|
||||
@ -104,4 +110,101 @@ class ClientTest {
|
||||
// println("stopped server")
|
||||
// println("closed client")
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
fun webSocketTest2() = runTest {
|
||||
initCrypto()
|
||||
// fun Application.
|
||||
val cmdClose by command<Unit, Unit>()
|
||||
val cmdGetFoo by command<Unit, String>()
|
||||
val cmdSetFoo by command<String, Unit>()
|
||||
val cmdCheckConnected by command<Unit, Boolean>()
|
||||
|
||||
Log.connectConsole(Log.Level.DEBUG)
|
||||
|
||||
data class Session(var foo: String = "not set")
|
||||
|
||||
var closeCounter = 0
|
||||
val serverInterface = KiloInterface<Session>().apply {
|
||||
var connectedCalled = false
|
||||
onConnected { connectedCalled = true }
|
||||
on(cmdGetFoo) { session.foo }
|
||||
on(cmdSetFoo) { session.foo = it }
|
||||
on(cmdCheckConnected) { connectedCalled }
|
||||
on(cmdClose) {
|
||||
throw LocalInterface.BreakConnectionException()
|
||||
}
|
||||
}
|
||||
|
||||
val port = Random.nextInt(8080, 9090)
|
||||
val ns = embeddedServer(Netty, port = port, host = "0.0.0.0", module = {
|
||||
setupWebsocketServer(serverInterface) { Session() }
|
||||
}).start(wait = false)
|
||||
|
||||
val client = websocketClient<Unit>("ws://localhost:$port/kp", useTextFrames = true)
|
||||
val states = mutableListOf<Boolean>()
|
||||
val collector = launch {
|
||||
client.connectedStateFlow.collect {
|
||||
println("got: $closeCounter/$it")
|
||||
states += it
|
||||
if (!it) {
|
||||
closeCounter++
|
||||
}
|
||||
}
|
||||
}
|
||||
assertEquals(true, client.call(cmdCheckConnected))
|
||||
assertTrue { client.connectedStateFlow.value }
|
||||
assertEquals("not set", client.call(cmdGetFoo))
|
||||
client.call(cmdSetFoo, "foo")
|
||||
assertEquals("foo", client.call(cmdGetFoo))
|
||||
|
||||
client.close()
|
||||
ns.stop()
|
||||
collector.cancel()
|
||||
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
fun webSocketWaitForConnectTest() = runBlocking {
|
||||
initCrypto()
|
||||
// fun Application.
|
||||
Log.connectConsole(Log.Level.DEBUG)
|
||||
|
||||
val clientInterface = KiloInterface<Unit>().apply {}
|
||||
|
||||
val port = Random.nextInt(8080, 9090)
|
||||
var clientConnectCalls = 0
|
||||
// It should repeatedly reconnect, and we will count:
|
||||
KiloClient(clientInterface, SigningSecretKey.new()) {
|
||||
clientConnectCalls++
|
||||
KiloConnectionData(websocketTransportDevice("ws://localhost:$port/kp", true), Unit)
|
||||
}
|
||||
delay(1200)
|
||||
// and check:
|
||||
// println("connection attemtps: $clientConnectCalls")
|
||||
assertTrue { clientConnectCalls > 1 }
|
||||
}
|
||||
|
||||
@Test
|
||||
fun webSocketWaitForConnectTest2() = runBlocking {
|
||||
initCrypto()
|
||||
// fun Application.
|
||||
Log.connectConsole(Log.Level.DEBUG)
|
||||
|
||||
val clientInterface = KiloInterface<Unit>().apply {}
|
||||
|
||||
val port = Random.nextInt(8080, 9090)
|
||||
var clientConnectCalls = 0
|
||||
// It should repeatedly reconnect, and we will count:
|
||||
KiloClient(clientInterface, SigningSecretKey.new()) {
|
||||
clientConnectCalls++
|
||||
KiloConnectionData(websocketTransportDevice("ws://localhost:$port/kp", false), Unit)
|
||||
}
|
||||
delay(1200)
|
||||
// and check:
|
||||
// println("connection attemtps: $clientConnectCalls")
|
||||
assertTrue { clientConnectCalls > 1 }
|
||||
}
|
||||
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user