Compare commits

..

No commits in common. "master" and "v0.6.3" have entirely different histories.

12 changed files with 88 additions and 187 deletions

1
.gitignore vendored
View File

@ -45,4 +45,3 @@ out/
.kotlin .kotlin
/.idea/workspace.xml /.idea/workspace.xml
/.gigaide/gigaide.properties /.gigaide/gigaide.properties
local.properties

View File

@ -1,29 +1,5 @@
<component name="ProjectCodeStyleConfiguration"> <component name="ProjectCodeStyleConfiguration">
<code_scheme name="Project" version="173"> <code_scheme name="Project" version="173">
<DBN-PSQL>
<case-options enabled="true">
<option name="KEYWORD_CASE" value="lower" />
<option name="FUNCTION_CASE" value="lower" />
<option name="PARAMETER_CASE" value="lower" />
<option name="DATATYPE_CASE" value="lower" />
<option name="OBJECT_CASE" value="preserve" />
</case-options>
<formatting-settings enabled="false" />
</DBN-PSQL>
<DBN-SQL>
<case-options enabled="true">
<option name="KEYWORD_CASE" value="lower" />
<option name="FUNCTION_CASE" value="lower" />
<option name="PARAMETER_CASE" value="lower" />
<option name="DATATYPE_CASE" value="lower" />
<option name="OBJECT_CASE" value="preserve" />
</case-options>
<formatting-settings enabled="false">
<option name="STATEMENT_SPACING" value="one_line" />
<option name="CLAUSE_CHOP_DOWN" value="chop_down_if_statement_long" />
<option name="ITERATION_ELEMENTS_WRAPPING" value="chop_down_if_not_single" />
</formatting-settings>
</DBN-SQL>
<ScalaCodeStyleSettings> <ScalaCodeStyleSettings>
<option name="MULTILINE_STRING_CLOSING_QUOTES_ON_NEW_LINE" value="true" /> <option name="MULTILINE_STRING_CLOSING_QUOTES_ON_NEW_LINE" value="true" />
</ScalaCodeStyleSettings> </ScalaCodeStyleSettings>

View File

@ -1,6 +1,6 @@
# Kiloparsec # Kiloparsec
__Recommended version is `0.6.8`: to keep the code compatible with current and further versions we __Recommended version is `0.4.1`: to keep the code compatible with current and further versions we
ask to upgrade to `0.4.2` at least.__ Starting from this version some package names are changed for ask to upgrade to `0.4.2` at least.__ Starting from this version some package names are changed for
better clarity and fast UDP endpoints are added. better clarity and fast UDP endpoints are added.
@ -82,7 +82,7 @@ It could be, depending on your project structure, something like:
```kotlin ```kotlin
val commonMain by getting { val commonMain by getting {
dependencies { dependencies {
api("net.sergeych:kiloparsec:0.6.8") api("net.sergeych:kiloparsec:0.4.1")
} }
} }
``` ```

View File

@ -8,18 +8,15 @@
* real dot sergeych at gmail. * real dot sergeych at gmail.
*/ */
import org.jetbrains.kotlin.gradle.ExperimentalWasmDsl
plugins { plugins {
kotlin("multiplatform") version "2.1.0" kotlin("multiplatform") version "2.1.0"
id("org.jetbrains.kotlin.plugin.serialization") version "2.1.0" id("org.jetbrains.kotlin.plugin.serialization") version "2.1.0"
id("com.android.library") version "8.5.2" apply true
`maven-publish` `maven-publish`
id("org.jetbrains.dokka") version "1.9.20" id("org.jetbrains.dokka") version "1.9.20"
} }
group = "net.sergeych" group = "net.sergeych"
version = "0.6.8" version = "0.6.3"
repositories { repositories {
mavenCentral() mavenCentral()
@ -37,22 +34,19 @@ kotlin {
} }
nodejs() nodejs()
} }
macosArm64() // macosArm64()
iosX64() // iosX64()
iosArm64() // iosArm64()
iosSimulatorArm64() // iosSimulatorArm64()
linuxX64() linuxX64()
linuxArm64() linuxArm64()
macosX64() // macosX64()
macosX64() // macosX64()
mingwX64() mingwX64()
androidTarget() // @OptIn(ExperimentalWasmDsl::class)
@OptIn(ExperimentalWasmDsl::class) // wasmJs()
wasmJs {
browser()
}
val ktor_version = "3.1.1" val ktor_version = "3.1.0"
sourceSets { sourceSets {
all { all {
@ -66,12 +60,7 @@ kotlin {
implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:1.10.1") implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:1.10.1")
implementation("org.jetbrains.kotlinx:kotlinx-serialization-json:1.7.2") implementation("org.jetbrains.kotlinx:kotlinx-serialization-json:1.7.2")
api("io.ktor:ktor-client-core:$ktor_version") api("io.ktor:ktor-client-core:$ktor_version")
api("net.sergeych:crypto2:0.8.4") api("net.sergeych:crypto2:0.7.4-SNAPSHOT")
}
}
val androidMain by getting {
dependencies {
implementation("io.ktor:ktor-client-okhttp:$ktor_version")
} }
} }
val ktorSocketMain by creating { val ktorSocketMain by creating {
@ -103,6 +92,7 @@ kotlin {
val jvmTest by getting { val jvmTest by getting {
dependsOn(ktorSocketTest) dependsOn(ktorSocketTest)
} }
val jsMain by getting { val jsMain by getting {
dependencies { dependencies {
implementation("io.ktor:ktor-client-js:$ktor_version") implementation("io.ktor:ktor-client-js:$ktor_version")
@ -172,15 +162,3 @@ tasks.dokkaHtml.configure {
} }
} }
android {
namespace = "net.sergeych.kiloparsec"
compileSdk = 34
defaultConfig {
minSdk = 24
}
compileOptions {
sourceCompatibility = JavaVersion.VERSION_17
targetCompatibility = JavaVersion.VERSION_17
}
}

View File

@ -10,5 +10,3 @@
kotlin.code.style=official kotlin.code.style=official
kotlin.mpp.applyDefaultHierarchyTemplate=false kotlin.mpp.applyDefaultHierarchyTemplate=false
kotlin.daemon.jvmargs=-Xmx2048m

View File

@ -10,6 +10,6 @@
distributionBase=GRADLE_USER_HOME distributionBase=GRADLE_USER_HOME
distributionPath=wrapper/dists distributionPath=wrapper/dists
distributionUrl=https\://services.gradle.org/distributions/gradle-8.7-bin.zip distributionUrl=https\://services.gradle.org/distributions/gradle-8.2-bin.zip
zipStoreBase=GRADLE_USER_HOME zipStoreBase=GRADLE_USER_HOME
zipStorePath=wrapper/dists zipStorePath=wrapper/dists

View File

@ -1,4 +1,4 @@
/* /*
* Copyright (c) 2025. Sergey S. Chernov - All Rights Reserved * Copyright (c) 2025. Sergey S. Chernov - All Rights Reserved
* *
* You may use, distribute and modify this code under the * You may use, distribute and modify this code under the
@ -10,7 +10,6 @@
pluginManagement { pluginManagement {
repositories { repositories {
google()
mavenCentral() mavenCentral()
gradlePluginPortal() gradlePluginPortal()
} }

View File

@ -91,11 +91,9 @@ class KiloClient<S>(
debug { "get device and session" } debug { "get device and session" }
val client = KiloClientConnection(localInterface, kc, secretKey) val client = KiloClientConnection(localInterface, kc, secretKey)
deferredClient.complete(client) deferredClient.complete(client)
debug { "starting client run"} client.run {
val r = runCatching { client.run {
_state.value = it _state.value = it
} } }
debug { "----------- client run finished: $r" }
resetDeferredClient() resetDeferredClient()
debug { "client run finished" } debug { "client run finished" }
} catch (_: RemoteInterface.ClosedException) { } catch (_: RemoteInterface.ClosedException) {
@ -111,7 +109,7 @@ class KiloClient<S>(
_state.value = false _state.value = false
resetDeferredClient() resetDeferredClient()
// reconnection timeout // reconnection timeout
delay(700) delay(100)
} }
} }

View File

@ -49,6 +49,7 @@ class KiloClientConnection<S>(
try { try {
// in parallel: keys and connection // in parallel: keys and connection
val deferredKeyPair = async { SafeKeyExchange() } val deferredKeyPair = async { SafeKeyExchange() }
debug { "opening device" }
debug { "got a transport device $device" } debug { "got a transport device $device" }
@ -61,11 +62,10 @@ class KiloClientConnection<S>(
debug { "transport started" } debug { "transport started" }
val pair = deferredKeyPair.await() val pair = deferredKeyPair.await()
debug { "keypair ready (1)" } debug { "keypair ready" }
val serverHe = transport.call(L0Request, Handshake(1u, pair.publicKey)) val serverHe = transport.call(L0Request, Handshake(1u, pair.publicKey))
debug { "got server HE (2)" }
val sk = pair.clientSessionKey(serverHe.publicKey) val sk = pair.clientSessionKey(serverHe.publicKey)
var params = KiloParams(false, transport, sk, session, null, this@KiloClientConnection) var params = KiloParams(false, transport, sk, session, null, this@KiloClientConnection)
@ -97,7 +97,8 @@ class KiloClientConnection<S>(
} catch (x: CancellationException) { } catch (x: CancellationException) {
info { "client is cancelled" } info { "client is cancelled" }
} catch (x: RemoteInterface.ClosedException) { } catch (x: RemoteInterface.ClosedException) {
debug { "connection closed/refused by remote" } x.printStackTrace()
info { "connection closed by remote" }
} finally { } finally {
onConnectedStateChanged?.invoke(false) onConnectedStateChanged?.invoke(false)
job?.cancel() job?.cancel()

View File

@ -134,13 +134,7 @@ class Transport<S>(
} }
// now we have mutex freed so we can call: // now we have mutex freed so we can call:
val r = runCatching { val r = runCatching { device.output.send(pack(b)) }
do {
val cr = device.output.trySend(pack(b))
if( cr.isClosed ) throw ClosedSendChannelException("can't send block: channel is closed")
delay(100)
} while(!cr.isSuccess)
}
if (!r.isSuccess) { if (!r.isSuccess) {
r.exceptionOrNull()?.let { r.exceptionOrNull()?.let {
exception { "failed to send output block" to it } exception { "failed to send output block" to it }
@ -277,7 +271,7 @@ class Transport<S>(
} }
debug { "no more active: $isActive / ${calls.size}" } debug { "no more active: $isActive / ${calls.size}" }
} }
debug { "exiting transport loop" } info { "exiting transport loop" }
} }
private suspend fun send(block: Block) { private suspend fun send(block: Block) {

View File

@ -20,10 +20,12 @@ import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.ClosedReceiveChannelException import kotlinx.coroutines.channels.ClosedReceiveChannelException
import kotlinx.coroutines.channels.ClosedSendChannelException import kotlinx.coroutines.channels.ClosedSendChannelException
import kotlinx.coroutines.launch import kotlinx.coroutines.launch
import kotlinx.io.IOException
import net.sergeych.crypto2.SigningKey import net.sergeych.crypto2.SigningKey
import net.sergeych.kiloparsec.* import net.sergeych.kiloparsec.*
import net.sergeych.mp_logger.* import net.sergeych.mp_logger.LogTag
import net.sergeych.mp_logger.exception
import net.sergeych.mp_logger.info
import net.sergeych.mp_logger.warning
import net.sergeych.mp_tools.globalLaunch import net.sergeych.mp_tools.globalLaunch
import net.sergeych.tools.AtomicCounter import net.sergeych.tools.AtomicCounter
@ -54,9 +56,7 @@ fun <S> websocketClient(
*/ */
fun websocketTransportDevice( fun websocketTransportDevice(
path: String, path: String,
client: HttpClient = HttpClient { client: HttpClient = HttpClient { install(WebSockets) },
install(WebSockets)
},
): Transport.Device { ): Transport.Device {
var u = Url(path) var u = Url(path)
if (u.encodedPath.length <= 1) if (u.encodedPath.length <= 1)
@ -67,11 +67,8 @@ fun websocketTransportDevice(
val input = Channel<UByteArray>() val input = Channel<UByteArray>()
val output = Channel<UByteArray>() val output = Channel<UByteArray>()
val closeHandle = CompletableDeferred<Boolean>() val closeHandle = CompletableDeferred<Boolean>()
val readyHandle = CompletableDeferred<Unit>()
globalLaunch { globalLaunch {
val log = LogTag("KC:${counter.incrementAndGet()}") val log = LogTag("KC:${counter.incrementAndGet()}")
try {
client.webSocket({ client.webSocket({
url.protocol = u.protocol url.protocol = u.protocol
url.host = u.host url.host = u.host
@ -83,7 +80,6 @@ fun websocketTransportDevice(
log.info { "connected to the server" } log.info { "connected to the server" }
// println("SENDING!!!") // println("SENDING!!!")
// send("Helluva") // send("Helluva")
readyHandle.complete(Unit)
launch { launch {
try { try {
for (block in output) { for (block in output) {
@ -124,27 +120,17 @@ fun websocketTransportDevice(
log.warning { "Client is closing with error" } log.warning { "Client is closing with error" }
throw RemoteInterface.ClosedException() throw RemoteInterface.ClosedException()
} }
runCatching { output.close() } output.close()
runCatching { input.close() } input.close()
runCatching { close() }
}
} catch (x: IOException) {
if ("refused" in x.toString()) log.debug { "connection refused" }
else log.warning { "unexpected IO error $x" }
runCatching { output.close() }
runCatching { input.close() }
} }
log.info { "closing connection" } log.info { "closing connection" }
} }
// Wait for connection be established or failed val device = ProxyDevice(input, output) {
val device = ProxyDevice(input, output, doClose = {
// we need to explicitly close the coroutine job, or it can hang for a long time // we need to explicitly close the coroutine job, or it can hang for a long time
// leaking resources. // leaking resources.
runCatching { output.close() }
runCatching { input.close() }
closeHandle.complete(true) closeHandle.complete(true)
// job.cancel() // job.cancel()
}) }
return device return device
} }

View File

@ -15,13 +15,10 @@ import io.ktor.server.engine.*
import io.ktor.server.netty.* import io.ktor.server.netty.*
import kotlinx.coroutines.delay import kotlinx.coroutines.delay
import kotlinx.coroutines.launch import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.test.runTest import kotlinx.coroutines.test.runTest
import net.sergeych.crypto2.SigningSecretKey
import net.sergeych.crypto2.initCrypto import net.sergeych.crypto2.initCrypto
import net.sergeych.kiloparsec.adapter.setupWebsocketServer import net.sergeych.kiloparsec.adapter.setupWebsocketServer
import net.sergeych.kiloparsec.adapter.websocketClient import net.sergeych.kiloparsec.adapter.websocketClient
import net.sergeych.kiloparsec.adapter.websocketTransportDevice
import net.sergeych.mp_logger.Log import net.sergeych.mp_logger.Log
import java.net.InetAddress import java.net.InetAddress
import kotlin.random.Random import kotlin.random.Random
@ -45,15 +42,14 @@ class ClientTest {
fun webSocketTest() = runTest { fun webSocketTest() = runTest {
initCrypto() initCrypto()
// fun Application. // fun Application.
val cmdClose by command<Unit, Unit>() val cmdClose by command<Unit,Unit>()
val cmdGetFoo by command<Unit, String>() val cmdGetFoo by command<Unit,String>()
val cmdSetFoo by command<String, Unit>() val cmdSetFoo by command<String,Unit>()
val cmdCheckConnected by command<Unit, Boolean>() val cmdCheckConnected by command<Unit,Boolean>()
Log.connectConsole(Log.Level.DEBUG) Log.connectConsole(Log.Level.DEBUG)
data class Session(var foo: String = "not set") data class Session(var foo: String="not set")
var closeCounter = 0 var closeCounter = 0
val serverInterface = KiloInterface<Session>().apply { val serverInterface = KiloInterface<Session>().apply {
var connectedCalled = false var connectedCalled = false
@ -66,7 +62,7 @@ class ClientTest {
} }
} }
val port = Random.nextInt(8080, 9090) val port = Random.nextInt(8080,9090)
val ns = embeddedServer(Netty, port = port, host = "0.0.0.0", module = { val ns = embeddedServer(Netty, port = port, host = "0.0.0.0", module = {
setupWebsocketServer(serverInterface) { Session() } setupWebsocketServer(serverInterface) { Session() }
}).start(wait = false) }).start(wait = false)
@ -77,9 +73,7 @@ class ClientTest {
client.connectedStateFlow.collect { client.connectedStateFlow.collect {
println("got: $closeCounter/$it") println("got: $closeCounter/$it")
states += it states += it
if (!it) { if( !it) { closeCounter++ }
closeCounter++
}
} }
} }
assertEquals(true, client.call(cmdCheckConnected)) assertEquals(true, client.call(cmdCheckConnected))
@ -100,7 +94,7 @@ class ClientTest {
assertFalse { client.connectedStateFlow.value } assertFalse { client.connectedStateFlow.value }
// this should be run on automatically reopen connection // this should be run on automatically reopen connection
client.call(cmdSetFoo, "superbar") client.call(cmdSetFoo,"superbar")
assertTrue { client.connectedStateFlow.value } assertTrue { client.connectedStateFlow.value }
assertEquals("superbar", client.call(cmdGetFoo)) assertEquals("superbar", client.call(cmdGetFoo))
client.close() client.close()
@ -110,26 +104,4 @@ class ClientTest {
// println("stopped server") // println("stopped server")
// println("closed client") // println("closed client")
} }
@Test
fun webSocketWaitForConnectTest() = runBlocking {
initCrypto()
// fun Application.
Log.connectConsole(Log.Level.DEBUG)
val clientInterface = KiloInterface<Unit>().apply {}
val port = Random.nextInt(8080, 9090)
var clientConnectCalls = 0
// It should repeatedly reconnect, and we will count:
KiloClient(clientInterface, SigningSecretKey.new()) {
clientConnectCalls++
KiloConnectionData(websocketTransportDevice("ws://localhost:$port/kp"), Unit)
}
delay(1200)
// and check:
// println("connection attemtps: $clientConnectCalls")
assertTrue { clientConnectCalls > 1 }
}
} }