Compare commits

...

9 Commits

13 changed files with 320 additions and 110 deletions

1
.gitignore vendored
View File

@ -45,3 +45,4 @@ out/
.kotlin
/.idea/workspace.xml
/.gigaide/gigaide.properties
local.properties

View File

@ -1,5 +1,29 @@
<component name="ProjectCodeStyleConfiguration">
<code_scheme name="Project" version="173">
<DBN-PSQL>
<case-options enabled="true">
<option name="KEYWORD_CASE" value="lower" />
<option name="FUNCTION_CASE" value="lower" />
<option name="PARAMETER_CASE" value="lower" />
<option name="DATATYPE_CASE" value="lower" />
<option name="OBJECT_CASE" value="preserve" />
</case-options>
<formatting-settings enabled="false" />
</DBN-PSQL>
<DBN-SQL>
<case-options enabled="true">
<option name="KEYWORD_CASE" value="lower" />
<option name="FUNCTION_CASE" value="lower" />
<option name="PARAMETER_CASE" value="lower" />
<option name="DATATYPE_CASE" value="lower" />
<option name="OBJECT_CASE" value="preserve" />
</case-options>
<formatting-settings enabled="false">
<option name="STATEMENT_SPACING" value="one_line" />
<option name="CLAUSE_CHOP_DOWN" value="chop_down_if_statement_long" />
<option name="ITERATION_ELEMENTS_WRAPPING" value="chop_down_if_not_single" />
</formatting-settings>
</DBN-SQL>
<ScalaCodeStyleSettings>
<option name="MULTILINE_STRING_CLOSING_QUOTES_ON_NEW_LINE" value="true" />
</ScalaCodeStyleSettings>

View File

@ -1,6 +1,6 @@
# Kiloparsec
__Recommended version is `0.4.1`: to keep the code compatible with current and further versions we
__Recommended version is `0.6.8`: to keep the code compatible with current and further versions we
ask to upgrade to `0.4.2` at least.__ Starting from this version some package names are changed for
better clarity and fast UDP endpoints are added.
@ -19,6 +19,8 @@ provides the following transports:
### Note on version compatibility
Since version 0.6.9 websocket protocol supports both text and binary frames; old clients are backward compatible with mew servers, but new clients only can work with older servers only in default binary frame mode. Upgrade also your servers to get better websocket compatibility [^1].
Version 0.5.1 could be backward incompatible due to upgrade of the crypto2.
Protocols >= 0.3.0 are not binary compatible with previous version due to more compact binary
@ -82,7 +84,7 @@ It could be, depending on your project structure, something like:
```kotlin
val commonMain by getting {
dependencies {
api("net.sergeych:kiloparsec:0.4.1")
api("net.sergeych:kiloparsec:0.6.8")
}
}
```
@ -274,4 +276,5 @@ This is work in progress, not yet moved to public domain;
It will be moved to open source; we also guarantee that it will be moved to open source immediately if the software export restrictions will be lifted. We do not support such practices here at 8-rays.dev.
[MITM]: https://en.wikipedia.org/wiki/Man-in-the-middle_attack
[Sergey Chernov]: https://t.me/real_sergeych
[Sergey Chernov]: https://t.me/real_sergeych
[^1]: On some new Xiaomi phones we found problems with websocket binary frames, probably in ktor; use text frames otherwise.

View File

@ -8,15 +8,18 @@
* real dot sergeych at gmail.
*/
import org.jetbrains.kotlin.gradle.ExperimentalWasmDsl
plugins {
kotlin("multiplatform") version "2.1.0"
id("org.jetbrains.kotlin.plugin.serialization") version "2.1.0"
id("com.android.library") version "8.5.2" apply true
`maven-publish`
id("org.jetbrains.dokka") version "1.9.20"
}
group = "net.sergeych"
version = "0.6.3"
version = "0.6.9-SNAPSHOT"
repositories {
mavenCentral()
@ -34,19 +37,22 @@ kotlin {
}
nodejs()
}
// macosArm64()
// iosX64()
// iosArm64()
// iosSimulatorArm64()
macosArm64()
iosX64()
iosArm64()
iosSimulatorArm64()
linuxX64()
linuxArm64()
// macosX64()
// macosX64()
macosX64()
macosX64()
mingwX64()
// @OptIn(ExperimentalWasmDsl::class)
// wasmJs()
androidTarget()
@OptIn(ExperimentalWasmDsl::class)
wasmJs {
browser()
}
val ktor_version = "3.1.0"
val ktor_version = "3.1.1"
sourceSets {
all {
@ -58,9 +64,14 @@ kotlin {
val commonMain by getting {
dependencies {
implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:1.10.1")
implementation("org.jetbrains.kotlinx:kotlinx-serialization-json:1.7.2")
implementation("org.jetbrains.kotlinx:kotlinx-serialization-json:1.8.1")
api("io.ktor:ktor-client-core:$ktor_version")
api("net.sergeych:crypto2:0.7.4-SNAPSHOT")
api("net.sergeych:crypto2:0.8.4")
}
}
val androidMain by getting {
dependencies {
implementation("io.ktor:ktor-client-okhttp:$ktor_version")
}
}
val ktorSocketMain by creating {
@ -73,7 +84,7 @@ kotlin {
dependencies {
implementation(kotlin("test"))
implementation("org.slf4j:slf4j-simple:2.0.9")
implementation("org.jetbrains.kotlinx:kotlinx-coroutines-test:1.8.1")
implementation("org.jetbrains.kotlinx:kotlinx-coroutines-test:1.10.1")
}
}
val ktorSocketTest by creating {
@ -92,7 +103,6 @@ kotlin {
val jvmTest by getting {
dependsOn(ktorSocketTest)
}
val jsMain by getting {
dependencies {
implementation("io.ktor:ktor-client-js:$ktor_version")
@ -162,3 +172,15 @@ tasks.dokkaHtml.configure {
}
}
android {
namespace = "net.sergeych.kiloparsec"
compileSdk = 34
defaultConfig {
minSdk = 24
}
compileOptions {
sourceCompatibility = JavaVersion.VERSION_17
targetCompatibility = JavaVersion.VERSION_17
}
}

View File

@ -9,4 +9,6 @@
#
kotlin.code.style=official
kotlin.mpp.applyDefaultHierarchyTemplate=false
kotlin.mpp.applyDefaultHierarchyTemplate=false
kotlin.daemon.jvmargs=-Xmx2048m

View File

@ -10,6 +10,6 @@
distributionBase=GRADLE_USER_HOME
distributionPath=wrapper/dists
distributionUrl=https\://services.gradle.org/distributions/gradle-8.2-bin.zip
distributionUrl=https\://services.gradle.org/distributions/gradle-8.7-bin.zip
zipStoreBase=GRADLE_USER_HOME
zipStorePath=wrapper/dists

View File

@ -1,4 +1,4 @@
/*
/*
* Copyright (c) 2025. Sergey S. Chernov - All Rights Reserved
*
* You may use, distribute and modify this code under the
@ -10,6 +10,7 @@
pluginManagement {
repositories {
google()
mavenCentral()
gradlePluginPortal()
}

View File

@ -91,9 +91,11 @@ class KiloClient<S>(
debug { "get device and session" }
val client = KiloClientConnection(localInterface, kc, secretKey)
deferredClient.complete(client)
client.run {
debug { "starting client run"}
val r = runCatching { client.run {
_state.value = it
}
} }
debug { "----------- client run finished: $r" }
resetDeferredClient()
debug { "client run finished" }
} catch (_: RemoteInterface.ClosedException) {
@ -109,7 +111,7 @@ class KiloClient<S>(
_state.value = false
resetDeferredClient()
// reconnection timeout
delay(100)
delay(700)
}
}

View File

@ -49,7 +49,6 @@ class KiloClientConnection<S>(
try {
// in parallel: keys and connection
val deferredKeyPair = async { SafeKeyExchange() }
debug { "opening device" }
debug { "got a transport device $device" }
@ -62,10 +61,11 @@ class KiloClientConnection<S>(
debug { "transport started" }
val pair = deferredKeyPair.await()
debug { "keypair ready" }
debug { "keypair ready (1)" }
val serverHe = transport.call(L0Request, Handshake(1u, pair.publicKey))
debug { "got server HE (2)" }
val sk = pair.clientSessionKey(serverHe.publicKey)
var params = KiloParams(false, transport, sk, session, null, this@KiloClientConnection)
@ -97,8 +97,7 @@ class KiloClientConnection<S>(
} catch (x: CancellationException) {
info { "client is cancelled" }
} catch (x: RemoteInterface.ClosedException) {
x.printStackTrace()
info { "connection closed by remote" }
debug { "connection closed/refused by remote" }
} finally {
onConnectedStateChanged?.invoke(false)
job?.cancel()

View File

@ -134,7 +134,13 @@ class Transport<S>(
}
// now we have mutex freed so we can call:
val r = runCatching { device.output.send(pack(b)) }
val r = runCatching {
do {
val cr = device.output.trySend(pack(b))
if( cr.isClosed ) throw ClosedSendChannelException("can't send block: channel is closed")
delay(100)
} while(!cr.isSuccess)
}
if (!r.isSuccess) {
r.exceptionOrNull()?.let {
exception { "failed to send output block" to it }
@ -271,7 +277,7 @@ class Transport<S>(
}
debug { "no more active: $isActive / ${calls.size}" }
}
info { "exiting transport loop" }
debug { "exiting transport loop" }
}
private suspend fun send(block: Block) {

View File

@ -20,12 +20,12 @@ import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.ClosedReceiveChannelException
import kotlinx.coroutines.channels.ClosedSendChannelException
import kotlinx.coroutines.launch
import kotlinx.io.IOException
import net.sergeych.crypto2.SigningKey
import net.sergeych.kiloparsec.*
import net.sergeych.mp_logger.LogTag
import net.sergeych.mp_logger.exception
import net.sergeych.mp_logger.info
import net.sergeych.mp_logger.warning
import net.sergeych.mp_logger.*
import net.sergeych.mp_tools.decodeBase64Compact
import net.sergeych.mp_tools.encodeToBase64Compact
import net.sergeych.mp_tools.globalLaunch
import net.sergeych.tools.AtomicCounter
@ -39,13 +39,14 @@ fun <S> websocketClient(
path: String,
clientInterface: KiloInterface<S> = KiloInterface(),
secretKey: SigningKey? = null,
useTextFrames: Boolean = false,
sessionMaker: () -> S = {
@Suppress("UNCHECKED_CAST")
Unit as S
},
): KiloClient<S> {
return KiloClient(clientInterface, secretKey) {
KiloConnectionData(websocketTransportDevice(path), sessionMaker())
KiloConnectionData(websocketTransportDevice(path, useTextFrames), sessionMaker())
}
}
@ -56,7 +57,10 @@ fun <S> websocketClient(
*/
fun websocketTransportDevice(
path: String,
client: HttpClient = HttpClient { install(WebSockets) },
useTextFrames: Boolean = false,
client: HttpClient = HttpClient {
install(WebSockets)
},
): Transport.Device {
var u = Url(path)
if (u.encodedPath.length <= 1)
@ -67,70 +71,89 @@ fun websocketTransportDevice(
val input = Channel<UByteArray>()
val output = Channel<UByteArray>()
val closeHandle = CompletableDeferred<Boolean>()
val readyHandle = CompletableDeferred<Unit>()
globalLaunch {
val log = LogTag("KC:${counter.incrementAndGet()}")
client.webSocket({
url.protocol = u.protocol
url.host = u.host
url.port = u.port
url.encodedPath = u.encodedPath
url.parameters.appendAll(u.parameters)
log.info { "kiloparsec server URL: $url" }
}) {
log.info { "connected to the server" }
try {
client.webSocket({
url.protocol = u.protocol
url.host = u.host
url.port = u.port
url.encodedPath = u.encodedPath
url.parameters.appendAll(u.parameters)
log.info { "kiloparsec server URL: $url" }
}) {
log.info { "connected to the server" }
// println("SENDING!!!")
// send("Helluva")
launch {
try {
for (block in output) {
send(block.toByteArray())
}
log.info { "input is closed, closing the websocket" }
if (closeHandle.isActive) closeHandle.complete(true)
} catch (_: ClosedSendChannelException) {
log.info { "send channel closed" }
} catch (_: CancellationException) {
} catch (t: Throwable) {
log.info { "unexpected exception in websock sender: ${t.stackTraceToString()}" }
closeHandle.completeExceptionally(t)
}
if (closeHandle.isActive) closeHandle.complete(false)
}
launch {
try {
for (f in incoming) {
if (f is Frame.Binary) {
input.send(f.readBytes().toUByteArray())
} else {
log.warning { "ignoring unexpected frame of type ${f.frameType}" }
readyHandle.complete(Unit)
launch {
try {
for (block in output) {
if (useTextFrames)
send(
Frame.Text(block.asByteArray().encodeToBase64Compact())
)
else
send(block.toByteArray())
}
log.info { "input is closed, closing the websocket" }
if (closeHandle.isActive) closeHandle.complete(true)
} catch (_: ClosedSendChannelException) {
log.info { "send channel closed" }
} catch (_: CancellationException) {
} catch (t: Throwable) {
log.info { "unexpected exception in websock sender: ${t.stackTraceToString()}" }
closeHandle.completeExceptionally(t)
}
if (closeHandle.isActive) closeHandle.complete(true)
} catch (_: CancellationException) {
if (closeHandle.isActive) closeHandle.complete(false)
} catch (_: ClosedReceiveChannelException) {
log.warning { "receive channel closed unexpectedly" }
if (closeHandle.isActive) closeHandle.complete(false)
} catch (t: Throwable) {
log.exception { "unexpected error" to t }
if (closeHandle.isActive) closeHandle.complete(false)
}
launch {
try {
for (f in incoming) {
when (f) {
is Frame.Binary -> input.send(f.readBytes().toUByteArray())
is Frame.Text -> input.send(f.readText().decodeBase64Compact().toUByteArray())
else -> log.warning { "ignoring unexpected frame of type ${f.frameType}" }
}
}
if (closeHandle.isActive) closeHandle.complete(true)
} catch (_: CancellationException) {
if (closeHandle.isActive) closeHandle.complete(false)
} catch (_: ClosedReceiveChannelException) {
log.warning { "receive channel closed unexpectedly" }
if (closeHandle.isActive) closeHandle.complete(false)
} catch (t: Throwable) {
log.exception { "unexpected error" to t }
if (closeHandle.isActive) closeHandle.complete(false)
}
}
if (!closeHandle.await()) {
log.warning { "Client is closing with error" }
throw RemoteInterface.ClosedException()
}
runCatching { output.close() }
runCatching { input.close() }
runCatching { close() }
}
if (!closeHandle.await()) {
log.warning { "Client is closing with error" }
throw RemoteInterface.ClosedException()
}
output.close()
input.close()
} catch (x: IOException) {
if ("refused" in x.toString()) log.debug { "connection refused" }
else log.warning { "unexpected IO error $x" }
runCatching { output.close() }
runCatching { input.close() }
}
log.info { "closing connection" }
}
val device = ProxyDevice(input, output) {
// Wait for connection be established or failed
val device = ProxyDevice(input, output, doClose = {
// we need to explicitly close the coroutine job, or it can hang for a long time
// leaking resources.
runCatching { output.close() }
runCatching { input.close() }
closeHandle.complete(true)
// job.cancel()
}
})
return device
}

View File

@ -24,6 +24,8 @@ import net.sergeych.kiloparsec.KiloInterface
import net.sergeych.kiloparsec.KiloServerConnection
import net.sergeych.kiloparsec.RemoteInterface
import net.sergeych.mp_logger.*
import net.sergeych.mp_tools.decodeBase64Compact
import net.sergeych.mp_tools.encodeToBase64Compact
import net.sergeych.tools.AtomicCounter
import kotlin.time.Duration.Companion.seconds
@ -66,13 +68,18 @@ fun <S> Application.setupWebsocketServer(
log.debug { "opening the connection" }
val input = Channel<UByteArray>(256)
val output = Channel<UByteArray>(256)
var useBinary: Boolean? = null
launch {
log.debug { "starting output pump" }
while (isActive) {
try {
send(output.receive().toByteArray())
}
catch(_: ClosedReceiveChannelException) {
val block = output.receive()
if (useBinary == false)
send(block.asByteArray().encodeToBase64Compact())
else
send(block.toByteArray())
} catch (_: ClosedReceiveChannelException) {
log.debug { "closing output pump as output channel is closed" }
break
}
@ -91,21 +98,38 @@ fun <S> Application.setupWebsocketServer(
}
log.debug { "KSC started, looking for incoming frames" }
for (f in incoming) {
if (f is Frame.Binary)
try {
input.send(f.readBytes().toUByteArray())
} catch (_: RemoteInterface.ClosedException) {
log.warning { "caught local closed exception (strange!), closing" }
break
} catch (_: ClosedReceiveChannelException) {
log.info { "receive channel is closed, closing connection" }
break
} catch (t: Throwable) {
log.exception { "unexpected exception, server connection will close" to t }
break
try {
when (f) {
is Frame.Binary -> {
if (useBinary == null) {
log.debug { "Setting binary frame mode ------------------------------------" }
useBinary = true
}
input.send(f.readBytes().toUByteArray())
}
is Frame.Text -> {
if (useBinary == null) {
log.debug { "Setting text frame mode -----------------------------------" }
useBinary = false
}
input.send(f.readText().decodeBase64Compact().asUByteArray())
}
else -> {
log.warning { "unexpected frame type ${f.frameType}, ignoring" }
}
}
else
log.warning { "unknown frame type ${f.frameType}, ignoring" }
} catch (_: RemoteInterface.ClosedException) {
log.warning { "caught local closed exception (strange!), closing" }
break
} catch (_: ClosedReceiveChannelException) {
log.info { "receive channel is closed, closing connection" }
break
} catch (t: Throwable) {
log.exception { "unexpected exception, server connection will close" to t }
break
}
}
log.debug { "closing the server" }
close()

View File

@ -15,10 +15,13 @@ import io.ktor.server.engine.*
import io.ktor.server.netty.*
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.test.runTest
import net.sergeych.crypto2.SigningSecretKey
import net.sergeych.crypto2.initCrypto
import net.sergeych.kiloparsec.adapter.setupWebsocketServer
import net.sergeych.kiloparsec.adapter.websocketClient
import net.sergeych.kiloparsec.adapter.websocketTransportDevice
import net.sergeych.mp_logger.Log
import java.net.InetAddress
import kotlin.random.Random
@ -39,17 +42,18 @@ class ClientTest {
@Test
fun webSocketTest() = runTest {
fun webSocketTest1() = runTest {
initCrypto()
// fun Application.
val cmdClose by command<Unit,Unit>()
val cmdGetFoo by command<Unit,String>()
val cmdSetFoo by command<String,Unit>()
val cmdCheckConnected by command<Unit,Boolean>()
val cmdClose by command<Unit, Unit>()
val cmdGetFoo by command<Unit, String>()
val cmdSetFoo by command<String, Unit>()
val cmdCheckConnected by command<Unit, Boolean>()
Log.connectConsole(Log.Level.DEBUG)
data class Session(var foo: String="not set")
data class Session(var foo: String = "not set")
var closeCounter = 0
val serverInterface = KiloInterface<Session>().apply {
var connectedCalled = false
@ -62,7 +66,7 @@ class ClientTest {
}
}
val port = Random.nextInt(8080,9090)
val port = Random.nextInt(8080, 9090)
val ns = embeddedServer(Netty, port = port, host = "0.0.0.0", module = {
setupWebsocketServer(serverInterface) { Session() }
}).start(wait = false)
@ -73,7 +77,9 @@ class ClientTest {
client.connectedStateFlow.collect {
println("got: $closeCounter/$it")
states += it
if( !it) { closeCounter++ }
if (!it) {
closeCounter++
}
}
}
assertEquals(true, client.call(cmdCheckConnected))
@ -94,7 +100,7 @@ class ClientTest {
assertFalse { client.connectedStateFlow.value }
// this should be run on automatically reopen connection
client.call(cmdSetFoo,"superbar")
client.call(cmdSetFoo, "superbar")
assertTrue { client.connectedStateFlow.value }
assertEquals("superbar", client.call(cmdGetFoo))
client.close()
@ -104,4 +110,101 @@ class ClientTest {
// println("stopped server")
// println("closed client")
}
}
@Test
fun webSocketTest2() = runTest {
initCrypto()
// fun Application.
val cmdClose by command<Unit, Unit>()
val cmdGetFoo by command<Unit, String>()
val cmdSetFoo by command<String, Unit>()
val cmdCheckConnected by command<Unit, Boolean>()
Log.connectConsole(Log.Level.DEBUG)
data class Session(var foo: String = "not set")
var closeCounter = 0
val serverInterface = KiloInterface<Session>().apply {
var connectedCalled = false
onConnected { connectedCalled = true }
on(cmdGetFoo) { session.foo }
on(cmdSetFoo) { session.foo = it }
on(cmdCheckConnected) { connectedCalled }
on(cmdClose) {
throw LocalInterface.BreakConnectionException()
}
}
val port = Random.nextInt(8080, 9090)
val ns = embeddedServer(Netty, port = port, host = "0.0.0.0", module = {
setupWebsocketServer(serverInterface) { Session() }
}).start(wait = false)
val client = websocketClient<Unit>("ws://localhost:$port/kp", useTextFrames = true)
val states = mutableListOf<Boolean>()
val collector = launch {
client.connectedStateFlow.collect {
println("got: $closeCounter/$it")
states += it
if (!it) {
closeCounter++
}
}
}
assertEquals(true, client.call(cmdCheckConnected))
assertTrue { client.connectedStateFlow.value }
assertEquals("not set", client.call(cmdGetFoo))
client.call(cmdSetFoo, "foo")
assertEquals("foo", client.call(cmdGetFoo))
client.close()
ns.stop()
collector.cancel()
}
@Test
fun webSocketWaitForConnectTest() = runBlocking {
initCrypto()
// fun Application.
Log.connectConsole(Log.Level.DEBUG)
val clientInterface = KiloInterface<Unit>().apply {}
val port = Random.nextInt(8080, 9090)
var clientConnectCalls = 0
// It should repeatedly reconnect, and we will count:
KiloClient(clientInterface, SigningSecretKey.new()) {
clientConnectCalls++
KiloConnectionData(websocketTransportDevice("ws://localhost:$port/kp", true), Unit)
}
delay(1200)
// and check:
// println("connection attemtps: $clientConnectCalls")
assertTrue { clientConnectCalls > 1 }
}
@Test
fun webSocketWaitForConnectTest2() = runBlocking {
initCrypto()
// fun Application.
Log.connectConsole(Log.Level.DEBUG)
val clientInterface = KiloInterface<Unit>().apply {}
val port = Random.nextInt(8080, 9090)
var clientConnectCalls = 0
// It should repeatedly reconnect, and we will count:
KiloClient(clientInterface, SigningSecretKey.new()) {
clientConnectCalls++
KiloConnectionData(websocketTransportDevice("ws://localhost:$port/kp", false), Unit)
}
delay(1200)
// and check:
// println("connection attemtps: $clientConnectCalls")
assertTrue { clientConnectCalls > 1 }
}
}