Compare commits

...

17 Commits

Author SHA1 Message Date
2b35112566 added copyright & tradewartime licensing 2025-02-20 12:10:09 +03:00
a80820b9a8 fixed/improved readme for publications 2025-02-20 11:58:13 +03:00
c0bc0e3dfe fix some ktor deprecations 2025-02-18 11:31:21 +03:00
2d593e4107 coroutines core version upgraded 2025-02-18 11:30:47 +03:00
c1bd6f09a9 0.6.* started:
kotlin upgrade to 2.1.0
ktor updgared to 3.1.6
less debug noise
yet no wasmJS
2025-02-18 11:24:47 +03:00
0ff27e6de9 kotlin upgrade to 2.1.0
less debug noise
use upgraded crypto2 version
2025-02-03 17:14:29 +03:00
59906bbd2f kotlin upgrade to 2.0.21 2024-12-26 03:25:11 +03:00
7871dc2d3d better support for code-based exceptions 2024-12-10 10:36:57 +03:00
a0dce8e604 we don't need to install Routing in web socket server - it is not used and an conflict 2024-11-27 10:45:23 +07:00
04ffde421d 0.5.1-SNAPSHOT: crypto2 upgraded 2024-11-26 18:57:59 +07:00
9545ca28cf websocket client now includes transport device to use in higher order protocols 2024-11-23 11:47:56 +07:00
4098358233 +client.localIdentity 2024-11-22 09:18:07 +07:00
f2d8330ccc multiple onConnect in sessions support 2024-11-19 18:20:53 +07:00
1032eebbbe extending functionality for kilogin 2024-11-19 12:12:41 +07:00
93ab8ddf91 extending functionality for kilogin 2024-09-15 12:28:40 +03:00
6ce1b576ee version bump 2024-09-01 19:42:23 +02:00
99e98827f7 version bump 2024-08-30 11:05:19 +02:00
47 changed files with 824 additions and 231 deletions

2
.gitignore vendored
View File

@ -5,6 +5,7 @@ build/
!**/src/test/**/build/
### IntelliJ IDEA ###
.idea
.idea/modules.xml
.idea/jarRepositories.xml
.idea/compiler.xml
@ -43,3 +44,4 @@ out/
# More
.kotlin
/.idea/workspace.xml
/.gigaide/gigaide.properties

2
.idea/misc.xml generated
View File

@ -3,7 +3,7 @@
<component name="FrameworkDetectionExcludesConfiguration">
<file type="web" url="file://$PROJECT_DIR$" />
</component>
<component name="ProjectRootManager" version="2" languageLevel="JDK_17" default="true" project-jdk-name="17 (5)" project-jdk-type="JavaSDK">
<component name="ProjectRootManager" version="2" languageLevel="JDK_17" default="true" project-jdk-name="corretto-17" project-jdk-type="JavaSDK">
<output url="file://$PROJECT_DIR$/out" />
</component>
</project>

View File

@ -1,24 +1,26 @@
# Kiloparsec
__Recommended version is `0.4.1`: to keep the code compatible with current and further versions we
ask to upgrade to `0.4.2` at least.__ Starting from this version some pacakage names are changed for
ask to upgrade to `0.4.2` at least.__ Starting from this version some package names are changed for
better clarity and fast UDP endpoints are added.
The new generation of __PARanoid SECurity__ protocol, advanced, faster, more secure. It also allows connecting any "
block device" transport to the same local interface. Out if the box it
provides the following transports:
| name | JVM | JS | native |
|----------------|--------|----|--------|
| TCP/IP server | ✓ | | 0.2.6+ |
| TCP/IP client | ✓ | | 0.2.6+ |
| UDP server | 0.3.2+ | | 0.3.2+ |
| UDP client | 0.3.2+ | | 0.3.2+ |
| Websock server | ✓ | | |
| Websock client | ✓ | ✓ | ✓ |
| name | JVM | JS | native |
|-------------------|--------|----|--------|
| TCP/IP server | ✓ | | 0.2.6+ |
| TCP/IP client | ✓ | | 0.2.6+ |
| UDP server | 0.3.2+ | | 0.3.2+ |
| UDP client | 0.3.2+ | | 0.3.2+ |
| Websockets server | ✓ | | |
| Websockets client | ✓ | ✓ | ✓ |
### Note on version compatibility
Version 0.5.1 could be backward incompatible due to upgrade of the crypto2.
Protocols >= 0.3.0 are not binary compatible with previous version due to more compact binary
format. The format from 0.3.0 onwards is supposed to keep compatible.
@ -34,8 +36,8 @@ We recommend to upgrade to 0.4+ ASAP as public/shared key id derivation method w
### Non-native targets
- JS (browser and nodeJS)
- JVM (android, macos, windows, linx, everywhere where JRE is installed)
- JS (browser and Node.js)
- JVM (android, macOS, windows, linux, everywhere where JRE is installed)
## TCP/IP and UDP transports
@ -44,13 +46,13 @@ there is currently no widely adopted sockets for browser javascript.
While UDP is faster than TCP/IP, it is less reliable, especially with commands and return values that serializes to more than 240 bytes approx, and has no retransmission facilities (use TCP!). UDP though shines when all you need is to [push](https://code.sergeych.net/docs/kiloparsec/kiloparsec/net.sergeych.kiloparsec/-remote-interface/push.html) with little or no data in it.
## Websock server
## Websockets server
While it is much slower than TCP or UDP, it is still faster than any http-based API; it uses binary frames based on
the Ktor server framework to easily integrate with web services. We recommend using it instead of a classic HTTP API as
it beats it in terms of speed and server load even with HTTP/2.
We recommend to create the `KiloInterface<S>` instance and connect it to the websock and tcp servers in real
We recommend to create the `KiloInterface<S>` instance and connect it to the websockets and tcp servers in real
applications to get easy access from anywhere.
## Websocket client
@ -109,7 +111,7 @@ val cmdPushClient by command<String, Unit>()
## Call it from the client:
Remember, we need to implement client interface `cmdPushClient` in our example, so we need to provide
local interace too:
local interface too:
```kotlin
// Unit: no session on the client:
@ -139,7 +141,7 @@ assertEquals(FooArgs("bar", 117), client.call(cmdGetFoo))
## Create ktor-based server
Normally server side needs some session. It is convenient and avoid sending repeating data on each request speeding up
the protocol. With KILOPARSEC it is rather basic operation:
the protocol. With KILOPARSEC, it is rather basic operation:
~~~kotlin
// Our session just keeps Foo for cmd{Get|Set}Foo:
@ -196,9 +198,9 @@ Is very much straightforward, same as with TCP/IP:
#### Command size
Each command invocation and result are packed in a separate UDP diagram using effective binary packing.
Thus for the best results commands and results should be relatively short, best to fit into 240 bytes. While bigger datagrams are often transmitted successfully, the probability of the effective transmission drops with the size.
Thus, for the best results commands and results should be relatively short, best to fit into 240 bytes. While bigger datagrams are often transmitted successfully, the probability of the effective transmission drops with the size.
Kiloparsec UDP transport does not retransmits not delivered packets. Use TCP/IP or websocket if it is a concern.
Kiloparsec UDP transport does not retransmit not delivered packets. Use TCP/IP or websocket if it is a concern.
For the best results we recommend using [push](https://code.sergeych.net/docs/kiloparsec/kiloparsec/net.sergeych.kiloparsec/-remote-interface/index.html#1558240250%2FFunctions%2F788909594) for remote interfaces with UDP.
@ -208,7 +210,7 @@ As Datagrams do not form protocol itself, kiloparsec issues pings when no data i
When no pings are received long enough, kiloparsec connection is closed. There are `maxInactivityTimeout` in all
relevant functions and constructors.
Client shoudl not issue pings manually.
Client should not issue pings manually.
## Reusing code between servers
@ -220,12 +222,11 @@ This is a common proactive to create a business logic in a `KiloInterface`, then
We do not recommend to rely on TLS (HTTPS://, WSS://) host identification solely, in the modern world there is
a high probability of attacks on unfriendly (in respect to at least some of your users) states to the SSL certificates
chain, in which case the MITM and spoofing will be undetected. Check the [remoteId](https://code.sergeych.net/docs/kiloparsec/kiloparsec/net.sergeych.kiloparsec/-kilo-client/remote-id.html?query=suspend%20fun%20remoteId():%20VerifyingPublicKey?) in your client on each connection and provide the safe [serverSecretKey](https://code.sergeych.net/docs/kiloparsec/kiloparsec/net.sergeych.kiloparsec/-kilo-server/index.html) when creating a server.
chain, in which case the [MITM attack] and spoofing will be undetected. Check the [remoteId](https://code.sergeych.net/docs/kiloparsec/kiloparsec/net.sergeych.kiloparsec/-kilo-client/remote-id.html?query=suspend%20fun%20remoteId():%20VerifyingPublicKey?) in your client on each connection and provide the safe [serverSecretKey](https://code.sergeych.net/docs/kiloparsec/kiloparsec/net.sergeych.kiloparsec/-kilo-server/index.html) when creating a server.
This will effectively protetcs against certificate chain spoofing in the case of the application installed from the trusted source.
This will effectively protect against certificate chain spoofing in the case of the application installed from the trusted source.
__Important note__. The web application could not be completely secured this way unless is loaded from the IP-address, as the DNS could be spoofed the same, especially when used with `Cloudflare` or other CDN that can
transparently substitute the whole site. In the case of we applications we strongly recommend not to use CDN except your own where you can control actual traffic rules.
__Important note__. The web application could not be completely secured this way unless is loaded from the IP-address, as the DNS could be spoofed the same, especially when used with `Cloudflare` or other CDN that can transparently substitute the whole site. For applications, we strongly recommend not to use CDN except your own, controlled ones. You generally can't neither detect nor repel [MITM attack] performed from _any single cloudflare 'ray'_.
## See also:
@ -252,13 +253,13 @@ All RPC is performed over the encrypted connection.
# Technical description
Kiloparsec is a dull-duplex fully async (coroutine based) Remote Procedure Call protocol with typed parameters
Kiloparsec is a full-duplex fully async (coroutine based) Remote Procedure Call protocol with typed parameters
and support for serializing exceptions (e.g. exception thrown while executing remote command will be caught and
rethrown at the caller context).
Kiloparsec is not REST, it _has advanced session mechanisms_ and built-in authentication based on the same curve keys.
Integrated tools to prevent MITM attacks include also non-transferred independently generated token that is calculated
independently on the ends and is never transferred with the network. Comparing it somehow (visually, with QR code, etc)
independently on the ends and is never transferred with the network. Comparing it somehow (visually, with QR code, etc.)
could add a very robust guarantee of the connection safety and ingenuity.
Kiloparsec has built-in completely asynchronous (coroutine based top-down) transport layer based on TCP (JVM only as for
@ -267,4 +268,10 @@ JVM only insofar.
# Licensing
Currently, you need to obtain a license from https://8-rays.dev or Sergey Chernov.
This is work in progress, not yet moved to public domain;
you need to obtain a license from https://8-rays.dev or [Sergey Chernov]. For open source projects it will most be free on some special terms.
It will be moved to open source; we also guarantee that it will be moved to open source immediately if the software export restrictions will be lifted. We do not support such practices here at 8-rays.dev.
[MITM]: https://en.wikipedia.org/wiki/Man-in-the-middle_attack
[Sergey Chernov]: https://t.me/real_sergeych

View File

@ -1,4 +1,14 @@
#!/bin/bash
#
# Copyright (c) 2025. Sergey S. Chernov - All Rights Reserved
#
# You may use, distribute and modify this code under the
# terms of the private license, which you must obtain from the author
#
# To obtain the license, contact the author: https://t.me/real_sergeych or email to
# real dot sergeych at gmail.
#
set -e
./gradlew dokkaHtml
rsync -avz ./build/dokka/* code.sergeych.net:/bigstore/sergeych_pub/code/docs/kiloparsec

View File

@ -1,36 +1,52 @@
/*
* Copyright (c) 2025. Sergey S. Chernov - All Rights Reserved
*
* You may use, distribute and modify this code under the
* terms of the private license, which you must obtain from the author
*
* To obtain the license, contact the author: https://t.me/real_sergeych or email to
* real dot sergeych at gmail.
*/
plugins {
kotlin("multiplatform") version "2.0.10"
id("org.jetbrains.kotlin.plugin.serialization") version "2.0.0"
kotlin("multiplatform") version "2.1.0"
id("org.jetbrains.kotlin.plugin.serialization") version "2.1.0"
`maven-publish`
id("org.jetbrains.dokka") version "1.9.20"
}
group = "net.sergeych"
version = "0.4.1"
version = "0.6.1-SNAPSHOT"
repositories {
mavenCentral()
mavenLocal()
maven("https://maven.universablockchain.com/")
maven("https://gitea.sergeych.net/api/packages/SergeychWorks/maven")
maven("https://gitea.sergeych.net/api/packages/YoungBlood/maven")
}
kotlin {
jvmToolchain(17)
jvm()
js {
browser {
}
nodejs()
}
macosArm64()
iosX64()
iosArm64()
iosSimulatorArm64()
// macosArm64()
// iosX64()
// iosArm64()
// iosSimulatorArm64()
linuxX64()
linuxArm64()
macosX64()
// macosX64()
// macosX64()
mingwX64()
// @OptIn(ExperimentalWasmDsl::class)
// wasmJs()
val ktor_version = "2.3.12"
val ktor_version = "3.1.0"
sourceSets {
all {
@ -41,10 +57,10 @@ kotlin {
val commonMain by getting {
dependencies {
implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:1.7.3")
implementation("org.jetbrains.kotlinx:kotlinx-serialization-json:1.7.1")
implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:1.10.1")
implementation("org.jetbrains.kotlinx:kotlinx-serialization-json:1.7.2")
api("io.ktor:ktor-client-core:$ktor_version")
api("net.sergeych:crypto2:0.5.3")
api("net.sergeych:crypto2:0.7.2-SNAPSHOT")
}
}
val ktorSocketMain by creating {
@ -83,27 +99,27 @@ kotlin {
}
}
val jsTest by getting
val macosArm64Main by getting {
dependsOn(ktorSocketMain)
}
val macosArm64Test by getting {
dependsOn(ktorSocketTest)
}
val macosX64Main by getting {
dependsOn(ktorSocketMain)
}
val iosX64Main by getting {
dependsOn(ktorSocketMain)
}
val iosX64Test by getting {
dependsOn(ktorSocketTest)
}
val iosArm64Main by getting {
dependsOn(ktorSocketMain)
}
val iosArm64Test by getting {
dependsOn(ktorSocketTest)
}
// val macosArm64Main by getting {
// dependsOn(ktorSocketMain)
// }
// val macosArm64Test by getting {
// dependsOn(ktorSocketTest)
// }
// val macosX64Main by getting {
// dependsOn(ktorSocketMain)
// }
// val iosX64Main by getting {
// dependsOn(ktorSocketMain)
// }
// val iosX64Test by getting {
// dependsOn(ktorSocketTest)
// }
// val iosArm64Main by getting {
// dependsOn(ktorSocketMain)
// }
// val iosArm64Test by getting {
// dependsOn(ktorSocketTest)
// }
val linuxArm64Main by getting {
dependsOn(ktorSocketMain)
}

View File

@ -1,2 +1,12 @@
#
# Copyright (c) 2025. Sergey S. Chernov - All Rights Reserved
#
# You may use, distribute and modify this code under the
# terms of the private license, which you must obtain from the author
#
# To obtain the license, contact the author: https://t.me/real_sergeych or email to
# real dot sergeych at gmail.
#
kotlin.code.style=official
kotlin.mpp.applyDefaultHierarchyTemplate=false

View File

@ -1,3 +1,13 @@
#
# Copyright (c) 2025. Sergey S. Chernov - All Rights Reserved
#
# You may use, distribute and modify this code under the
# terms of the private license, which you must obtain from the author
#
# To obtain the license, contact the author: https://t.me/real_sergeych or email to
# real dot sergeych at gmail.
#
distributionBase=GRADLE_USER_HOME
distributionPath=wrapper/dists
distributionUrl=https\://services.gradle.org/distributions/gradle-8.2-bin.zip

16
gradlew vendored
View File

@ -1,19 +1,13 @@
#!/bin/sh
#
# Copyright © 2015-2021 the original authors.
# Copyright (c) 2025. Sergey S. Chernov - All Rights Reserved
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# You may use, distribute and modify this code under the
# terms of the private license, which you must obtain from the author
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# To obtain the license, contact the author: https://t.me/real_sergeych or email to
# real dot sergeych at gmail.
#
##############################################################################

View File

@ -1,3 +1,13 @@
/*
* Copyright (c) 2025. Sergey S. Chernov - All Rights Reserved
*
* You may use, distribute and modify this code under the
* terms of the private license, which you must obtain from the author
*
* To obtain the license, contact the author: https://t.me/real_sergeych or email to
* real dot sergeych at gmail.
*/
pluginManagement {
repositories {
mavenCentral()

View File

@ -1,3 +1,13 @@
/*
* Copyright (c) 2025. Sergey S. Chernov - All Rights Reserved
*
* You may use, distribute and modify this code under the
* terms of the private license, which you must obtain from the author
*
* To obtain the license, contact the author: https://t.me/real_sergeych or email to
* real dot sergeych at gmail.
*/
package net.sergeych.kiloparsec
import io.ktor.utils.io.*

View File

@ -0,0 +1,52 @@
/*
* Copyright (c) 2025. Sergey S. Chernov - All Rights Reserved
*
* You may use, distribute and modify this code under the
* terms of the private license, which you must obtain from the author
*
* To obtain the license, contact the author: https://t.me/real_sergeych or email to
* real dot sergeych at gmail.
*/
package net.sergeych.kiloparsec
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
/**
* Multiplatform atomically mutable value to be used in [kotlinx.coroutines],
* with suspending mutating operations, see [mutate].
*
* Actual value can be either changed in a block of [mutate] when
* new value _depends on the current value_ or with [reset].
*
* [value] getter is suspended because it waits until the mutation finishes
*/
open class AtomicAsyncValue<T>(initialValue: T) {
private var actualValue = initialValue
private val access = Mutex()
/**
* Change the value: get the current and set to the returned, all in the
* atomic suspend operation. All other mutating requests including assigning to [value]
* will be blocked and queued.
* @return result of the mutation. Note that immediate call to property [value]
* could already return modified bu some other thread value!
*/
suspend fun mutate(mutator: suspend (T) -> T): T = access.withLock {
actualValue = mutator(actualValue)
actualValue
}
/**
* Atomic get or set the value. Atomic get means if there is a [mutate] in progress
* it will wait until the mutation finishes and then return the correct result.
*/
suspend fun value() = access.withLock { actualValue }
/**
* Set the new value without checking it. Shortcut to
* ```mutate { value = newValue }```
*/
suspend fun reset(value: T) = mutate { value }
}

View File

@ -1,3 +1,13 @@
/*
* Copyright (c) 2025. Sergey S. Chernov - All Rights Reserved
*
* You may use, distribute and modify this code under the
* terms of the private license, which you must obtain from the author
*
* To obtain the license, contact the author: https://t.me/real_sergeych or email to
* real dot sergeych at gmail.
*/
package net.sergeych.kiloparsec
import kotlinx.serialization.KSerializer
@ -5,6 +15,8 @@ import kotlinx.serialization.Serializable
import net.sergeych.bintools.toDataSource
import net.sergeych.bipack.BipackDecoder
import net.sergeych.bipack.BipackEncoder
import net.sergeych.kiloparsec.Command.Call
import net.sergeych.kiloparsec.Command.Companion.unpackCall
import net.sergeych.utools.unpack
/**
@ -12,8 +24,20 @@ import net.sergeych.utools.unpack
* in node-2-node protocols and client API, and most importantly in calling smart contract
* methods. This is essentially a Kotlin binding to typesafe serialize command calls and
* deserialize results.
*
* To create command instances, it is recommended to use [command] that returns [CommandDelegate].
*
* Use [packCall] to serialize the command call with some arguments.
*
* Note that `Command` instances themselves are not serialized, instead, the call is serialized,
* in the form of [Call], containing name and properly serialized arguments.
*
* [unpackCall] deserialized result of the [packCall] so the proper handler for the command could
* be used. Then the result of the execution could be packed with [exec] and then unpacked with
* [unpackResult].
*
*/
class Command<A, R>(
open class Command<A, R>(
val name: String,
val argsSerializer: KSerializer<A>,
val resultSerializer: KSerializer<R>
@ -21,13 +45,29 @@ class Command<A, R>(
@Serializable
data class Call(val name: String,val serializedArgs: UByteArray)
fun packCall(args: A): UByteArray = BipackEncoder.encode(
Call(name, BipackEncoder.encode(argsSerializer, args).toUByteArray())
).toUByteArray()
/**
* Pack command invocation with specified arguments.
*/
fun packCall(args: A): UByteArray = BipackEncoder.encode(createCall(args)).toUByteArray()
/**
* Create [Call] instance for specified args vy serializing it properly
*/
fun createCall(args: A): Call = Call(name, BipackEncoder.encode(argsSerializer, args).toUByteArray())
/**
* Unpack result, obtained by [exec].
*/
fun unpackResult(packedResult: UByteArray): R =
unpack(resultSerializer, packedResult)
/**
* Execute a command unpacking args.
*
* @param packedArgs arguments, as provided by [packCall] in the [Call] instance
* @param handler actual code to execute the command
* @return properly serialized result to be unpacked with [unpackResult].
*/
suspend fun exec(packedArgs: UByteArray, handler: suspend (A) -> R): UByteArray =
BipackEncoder.encode(
resultSerializer,
@ -36,6 +76,10 @@ class Command<A, R>(
).toUByteArray()
companion object {
/**
* Unpack command invocation instance from [packCall]. Use [exec] to deserialize arguments and
* perform command.
*/
fun unpackCall(packedCall: UByteArray): Call = BipackDecoder.decode(packedCall.toDataSource())
}
}

View File

@ -1,3 +1,13 @@
/*
* Copyright (c) 2025. Sergey S. Chernov - All Rights Reserved
*
* You may use, distribute and modify this code under the
* terms of the private license, which you must obtain from the author
*
* To obtain the license, contact the author: https://t.me/real_sergeych or email to
* real dot sergeych at gmail.
*/
package net.sergeych.kiloparsec
import kotlinx.serialization.KSerializer

View File

@ -1,3 +1,13 @@
/*
* Copyright (c) 2025. Sergey S. Chernov - All Rights Reserved
*
* You may use, distribute and modify this code under the
* terms of the private license, which you must obtain from the author
*
* To obtain the license, contact the author: https://t.me/real_sergeych or email to
* real dot sergeych at gmail.
*/
package net.sergeych.kiloparsec
/**

View File

@ -0,0 +1,16 @@
/*
* Copyright (c) 2025. Sergey S. Chernov - All Rights Reserved
*
* You may use, distribute and modify this code under the
* terms of the private license, which you must obtain from the author
*
* To obtain the license, contact the author: https://t.me/real_sergeych or email to
* real dot sergeych at gmail.
*/
package net.sergeych.kiloparsec
interface ExceptionWithCode {
val code: String
val message: String?
}

View File

@ -1,3 +1,13 @@
/*
* Copyright (c) 2025. Sergey S. Chernov - All Rights Reserved
*
* You may use, distribute and modify this code under the
* terms of the private license, which you must obtain from the author
*
* To obtain the license, contact the author: https://t.me/real_sergeych or email to
* real dot sergeych at gmail.
*/
package net.sergeych.kiloparsec
import kotlinx.coroutines.CancellationException
@ -9,6 +19,7 @@ import kotlinx.coroutines.isActive
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
import net.sergeych.crypto2.SigningKey
import net.sergeych.crypto2.VerifyingKey
import net.sergeych.crypto2.VerifyingPublicKey
import net.sergeych.mp_logger.LogTag
import net.sergeych.mp_logger.Loggable
@ -60,6 +71,14 @@ class KiloClient<S>(
@Suppress("unused")
val connectedStateFlow = _state.asStateFlow()
/**
* The verifying, or public, key identifying client sessions. It could be used to
* restore environment on reconnection. This is what remote side, e.g. server, sees as
* [KiloScope.remoteIdentity].
*/
@Suppress("unused")
val localIdentity: VerifyingKey? = secretKey?.verifyingKey
private var deferredClient = CompletableDeferred<KiloClientConnection<S>>()
private val job =
@ -141,7 +160,7 @@ class KiloClient<S>(
* a key. Connection is established either with a properly authenticated key or no key at all.
*/
@Suppress("unused")
suspend fun remoteId(): VerifyingPublicKey? = deferredClient.await().remoteId()
suspend fun remoteIdentity(): VerifyingPublicKey? = deferredClient.await().remoteId()
companion object {
class Builder<S>() {
@ -156,8 +175,19 @@ class KiloClient<S>(
var secretIdKey: SigningKey? = null
/**
* Build local command implementations (remotely callable ones), exception
* class handlers, etc.
* Build local command implementations, those callable from the server, exception
* class handlers, and anything else [KiloInterface] allows. Usage sample:
*
* ```kotlin
* val client = KiloClient {
* connect { connectTcpDevice("localhost:$port") }
* local {
* on(cmdPing) {
* "pong! $it"
* }
* }
* }
* ```
*/
fun local(f: KiloInterface<S>.() -> Unit) {
interfaceBuilder = f

View File

@ -1,3 +1,13 @@
/*
* Copyright (c) 2025. Sergey S. Chernov - All Rights Reserved
*
* You may use, distribute and modify this code under the
* terms of the private license, which you must obtain from the author
*
* To obtain the license, contact the author: https://t.me/real_sergeych or email to
* real dot sergeych at gmail.
*/
package net.sergeych.kiloparsec
import kotlinx.coroutines.*
@ -80,7 +90,7 @@ class KiloClientConnection<S>(
kiloRemoteInterface.complete(
KiloRemoteInterface(deferredParams, clientInterface)
)
clientInterface.onConnectHandler?.invoke(params.scope)
clientInterface.onConnectHandlers.invokeAll(params.scope)
onConnectedStateChanged?.invoke(true)
job.join()
@ -104,4 +114,7 @@ class KiloClientConnection<S>(
override suspend fun <A> push(cmd: Command<A, Unit>, args: A) {
kiloRemoteInterface.await().push(cmd, args)
}
}
}
internal fun <S>Collection<KiloHandler<S>>.invokeAll(scope: KiloScope<S>) =
forEach { runCatching { scope.it() } }

View File

@ -1,5 +1,16 @@
/*
* Copyright (c) 2025. Sergey S. Chernov - All Rights Reserved
*
* You may use, distribute and modify this code under the
* terms of the private license, which you must obtain from the author
*
* To obtain the license, contact the author: https://t.me/real_sergeych or email to
* real dot sergeych at gmail.
*/
package net.sergeych.kiloparsec
typealias KiloHandler<S> = KiloScope<S>.()->Unit
/**
* The local interface to provide functions, register errors for Kiloparsec users. Use it
* with [KiloClient], [KiloClientConnection], [KiloServerConnection], etc.
@ -14,12 +25,19 @@ package net.sergeych.kiloparsec
*/
open class KiloInterface<S> : LocalInterface<KiloScope<S>>() {
internal var onConnectHandler: (KiloScope<S>.()->Unit) ? = null
internal val onConnectHandlers = mutableListOf<KiloHandler<S>>()
fun onConnected(f: KiloScope<S>.()->Unit) { onConnectHandler = f }
/**
* Registers handler [f] for [onConnected] event, to the head or the end of handler list.
*
* @param addFirst if true, [f] will be added to the beginning of the list of handlers
*/
fun onConnected(addFirst: Boolean = false, f: KiloScope<S>.()->Unit) {
if( addFirst ) onConnectHandlers.add(0, f) else onConnectHandlers += f
}
init {
registerError { RemoteInterface.UnknownCommand() }
registerError { RemoteInterface.UnknownCommand(it) }
registerError { RemoteInterface.InternalError(it) }
registerError { RemoteInterface.ClosedException(it) }
// registerError { RemoteInterface.SecurityException(it) }

View File

@ -1,3 +1,13 @@
/*
* Copyright (c) 2025. Sergey S. Chernov - All Rights Reserved
*
* You may use, distribute and modify this code under the
* terms of the private license, which you must obtain from the author
*
* To obtain the license, contact the author: https://t.me/real_sergeych or email to
* real dot sergeych at gmail.
*/
package net.sergeych.kiloparsec
import kotlinx.coroutines.CompletableDeferred

View File

@ -1,3 +1,13 @@
/*
* Copyright (c) 2025. Sergey S. Chernov - All Rights Reserved
*
* You may use, distribute and modify this code under the
* terms of the private license, which you must obtain from the author
*
* To obtain the license, contact the author: https://t.me/real_sergeych or email to
* real dot sergeych at gmail.
*/
package net.sergeych.kiloparsec
import kotlinx.serialization.Serializable

View File

@ -1,3 +1,13 @@
/*
* Copyright (c) 2025. Sergey S. Chernov - All Rights Reserved
*
* You may use, distribute and modify this code under the
* terms of the private license, which you must obtain from the author
*
* To obtain the license, contact the author: https://t.me/real_sergeych or email to
* real dot sergeych at gmail.
*/
package net.sergeych.kiloparsec
import kotlinx.coroutines.CompletableDeferred

View File

@ -1,3 +1,13 @@
/*
* Copyright (c) 2025. Sergey S. Chernov - All Rights Reserved
*
* You may use, distribute and modify this code under the
* terms of the private license, which you must obtain from the author
*
* To obtain the license, contact the author: https://t.me/real_sergeych or email to
* real dot sergeych at gmail.
*/
package net.sergeych.kiloparsec
import net.sergeych.crypto2.SigningKey

View File

@ -1,3 +1,13 @@
/*
* Copyright (c) 2025. Sergey S. Chernov - All Rights Reserved
*
* You may use, distribute and modify this code under the
* terms of the private license, which you must obtain from the author
*
* To obtain the license, contact the author: https://t.me/real_sergeych or email to
* real dot sergeych at gmail.
*/
package net.sergeych.kiloparsec
import kotlinx.coroutines.CancellationException

View File

@ -1,3 +1,13 @@
/*
* Copyright (c) 2025. Sergey S. Chernov - All Rights Reserved
*
* You may use, distribute and modify this code under the
* terms of the private license, which you must obtain from the author
*
* To obtain the license, contact the author: https://t.me/real_sergeych or email to
* real dot sergeych at gmail.
*/
package net.sergeych.kiloparsec
import kotlinx.coroutines.CompletableDeferred
@ -75,7 +85,7 @@ class KiloServerConnection<S>(
kiloRemoteInterface.complete(
KiloRemoteInterface(deferredParams, clientInterface)
)
clientInterface.onConnectHandler?.invoke(p.scope)
clientInterface.onConnectHandlers.invokeAll(p.scope)
}
}

View File

@ -1,3 +1,13 @@
/*
* Copyright (c) 2025. Sergey S. Chernov - All Rights Reserved
*
* You may use, distribute and modify this code under the
* terms of the private license, which you must obtain from the author
*
* To obtain the license, contact the author: https://t.me/real_sergeych or email to
* real dot sergeych at gmail.
*/
package net.sergeych.kiloparsec
import net.sergeych.mp_logger.LogTag
@ -67,7 +77,7 @@ open class LocalInterface<S> : Loggable by LogTag("LocalInterface${idCounter.inc
name: String,
packedArgs: UByteArray,
): UByteArray =
(commands[name] ?: throw RemoteInterface.UnknownCommand())
(commands[name] ?: throw RemoteInterface.UnknownCommand(name))
.invoke(scope, packedArgs)
@ -106,7 +116,8 @@ open class LocalInterface<S> : Loggable by LogTag("LocalInterface${idCounter.inc
}
fun getErrorCode(t: Throwable): String? =
errorByClass[t::class] ?: errorProviders.firstNonNull { it.getErrorCode(t) }
(t as? ExceptionWithCode)?.code
?: errorByClass[t::class] ?: errorProviders.firstNonNull { it.getErrorCode(t) }
fun encodeError(forId: UInt, t: Throwable): Transport.Block.Error =
if (t is RemoteInterface.ClosedException) {

View File

@ -1,3 +1,13 @@
/*
* Copyright (c) 2025. Sergey S. Chernov - All Rights Reserved
*
* You may use, distribute and modify this code under the
* terms of the private license, which you must obtain from the author
*
* To obtain the license, contact the author: https://t.me/real_sergeych or email to
* real dot sergeych at gmail.
*/
package net.sergeych.kiloparsec
/**
@ -40,7 +50,7 @@ interface RemoteInterface {
/**
* Command is not supported by the remote party
*/
class UnknownCommand : RemoteException("UnknownCommand")
class UnknownCommand(commandName: String) : RemoteException("UnknownCommand: $commandName")
open class InternalError(code: String="0"): RemoteException("Internal error: $code")

View File

@ -1,3 +1,13 @@
/*
* Copyright (c) 2025. Sergey S. Chernov - All Rights Reserved
*
* You may use, distribute and modify this code under the
* terms of the private license, which you must obtain from the author
*
* To obtain the license, contact the author: https://t.me/real_sergeych or email to
* real dot sergeych at gmail.
*/
package net.sergeych.kiloparsec
import kotlinx.coroutines.*
@ -124,7 +134,7 @@ class Transport<S>(
}
// now we have mutex freed so we can call:
val r = runCatching { device.output.send(pack(b).also { debug { ">>>\n${it.toDump()}" } }) }
val r = runCatching { device.output.send(pack(b)) }
if (!r.isSuccess) {
r.exceptionOrNull()?.let {
exception { "failed to send output block" to it }
@ -184,10 +194,7 @@ class Transport<S>(
while (isActive && !isClosed) {
try {
device.input.receive().let { packed ->
debug { "<<<\n${packed.toDump()}" }
val b = unpack<Block>(packed)
debug { "<<$ $b" }
debug { "access state: ${access.isLocked}" }
when (b) {
is Block.Error -> access.withLock {
val error = localInterface.decodeError(b)
@ -198,10 +205,7 @@ class Transport<S>(
}
is Block.Response -> access.withLock {
calls.remove(b.forId)?.let {
debug { "activating wait handle for ${b.forId}" }
it.complete(b.packedResult)
}
calls.remove(b.forId)?.complete(b.packedResult)
?: warning { "wait handle not found for ${b.forId}" }
}
@ -228,11 +232,10 @@ class Transport<S>(
} catch (t: Throwable) {
send(Block.Error(b.id, "UnknownError", t.message))
}
.also { debug { "command executed: ${b.name}" } }
}
}
}
debug { "input step performed closed=$isClosed active=$isActive" }
// debug { "input step performed closed=$isClosed active=$isActive" }
} catch (_: ClosedSendChannelException) {
info { "closed send channel" }
isClosed = true

View File

@ -1,3 +1,13 @@
/*
* Copyright (c) 2025. Sergey S. Chernov - All Rights Reserved
*
* You may use, distribute and modify this code under the
* terms of the private license, which you must obtain from the author
*
* To obtain the license, contact the author: https://t.me/real_sergeych or email to
* real dot sergeych at gmail.
*/
package net.sergeych.kiloparsec.adapter
import kotlinx.coroutines.channels.Channel

View File

@ -1,3 +1,13 @@
/*
* Copyright (c) 2025. Sergey S. Chernov - All Rights Reserved
*
* You may use, distribute and modify this code under the
* terms of the private license, which you must obtain from the author
*
* To obtain the license, contact the author: https://t.me/real_sergeych or email to
* real dot sergeych at gmail.
*/
package net.sergeych.kiloparsec.adapter
/**

View File

@ -1,3 +1,13 @@
/*
* Copyright (c) 2025. Sergey S. Chernov - All Rights Reserved
*
* You may use, distribute and modify this code under the
* terms of the private license, which you must obtain from the author
*
* To obtain the license, contact the author: https://t.me/real_sergeych or email to
* real dot sergeych at gmail.
*/
package net.sergeych.kiloparsec.adapter
import kotlinx.coroutines.channels.Channel

View File

@ -1,3 +1,13 @@
/*
* Copyright (c) 2025. Sergey S. Chernov - All Rights Reserved
*
* You may use, distribute and modify this code under the
* terms of the private license, which you must obtain from the author
*
* To obtain the license, contact the author: https://t.me/real_sergeych or email to
* real dot sergeych at gmail.
*/
package net.sergeych.kiloparsec.adapter
import io.ktor.client.*
@ -11,10 +21,7 @@ import kotlinx.coroutines.channels.ClosedReceiveChannelException
import kotlinx.coroutines.channels.ClosedSendChannelException
import kotlinx.coroutines.launch
import net.sergeych.crypto2.SigningKey
import net.sergeych.kiloparsec.KiloClient
import net.sergeych.kiloparsec.KiloConnectionData
import net.sergeych.kiloparsec.KiloInterface
import net.sergeych.kiloparsec.RemoteInterface
import net.sergeych.kiloparsec.*
import net.sergeych.mp_logger.LogTag
import net.sergeych.mp_logger.exception
import net.sergeych.mp_logger.info
@ -24,91 +31,106 @@ import net.sergeych.tools.AtomicCounter
private val counter = AtomicCounter()
/**
* Shortcut to create websocket client. Use [websocketTransportDevice] with [KiloClient]
* for fine-grained control.
*/
fun <S> websocketClient(
path: String,
clientInterface: KiloInterface<S> = KiloInterface(),
client: HttpClient = HttpClient { install(WebSockets) },
secretKey: SigningKey? = null,
sessionMaker: () -> S = {
@Suppress("UNCHECKED_CAST")
Unit as S
},
): KiloClient<S> {
return KiloClient(clientInterface, secretKey) {
KiloConnectionData(websocketTransportDevice(path), sessionMaker())
}
}
/**
* Create kilopaarsec transport over websocket (ws or wss).
* @param path websocket path (must start with ws:// or wss:// and contain a path part)
* @client use default [HttpClient], it installs [WebSockets] plugin
*/
fun websocketTransportDevice(
path: String,
client: HttpClient = HttpClient { install(WebSockets) },
): Transport.Device {
var u = Url(path)
if (u.encodedPath.length <= 1)
u = URLBuilder(u).apply {
encodedPath = "/kp"
}.build()
return KiloClient(clientInterface, secretKey) {
val input = Channel<UByteArray>()
val output = Channel<UByteArray>()
val closeHandle = CompletableDeferred<Boolean>()
globalLaunch {
val log = LogTag("KC:${counter.incrementAndGet()}")
client.webSocket({
url.protocol = u.protocol
url.host = u.host
url.port = u.port
url.encodedPath = u.encodedPath
url.parameters.appendAll(u.parameters)
log.info { "kiloparsec server URL: $url" }
}) {
log.info { "connected to the server" }
val input = Channel<UByteArray>()
val output = Channel<UByteArray>()
val closeHandle = CompletableDeferred<Boolean>()
globalLaunch {
val log = LogTag("KC:${counter.incrementAndGet()}")
client.webSocket({
url.protocol = u.protocol
url.host = u.host
url.port = u.port
url.encodedPath = u.encodedPath
url.parameters.appendAll(u.parameters)
log.info { "kiloparsec server URL: $url" }
}) {
log.info { "connected to the server" }
// println("SENDING!!!")
// send("Helluva")
launch {
try {
for (block in output) {
send(block.toByteArray())
launch {
try {
for (block in output) {
send(block.toByteArray())
}
log.info { "input is closed, closing the websocket" }
if (closeHandle.isActive) closeHandle.complete(true)
} catch (_: ClosedSendChannelException) {
log.info { "send channel closed" }
} catch (_: CancellationException) {
} catch (t: Throwable) {
log.info { "unexpected exception in websock sender: ${t.stackTraceToString()}" }
closeHandle.completeExceptionally(t)
}
if (closeHandle.isActive) closeHandle.complete(false)
}
launch {
try {
for (f in incoming) {
if (f is Frame.Binary) {
input.send(f.readBytes().toUByteArray())
} else {
log.warning { "ignoring unexpected frame of type ${f.frameType}" }
}
log.info { "input is closed, closing the websocket" }
if (closeHandle.isActive) closeHandle.complete(true)
} catch (_: ClosedSendChannelException) {
log.info { "send channel closed" }
}
catch(_: CancellationException) {}
catch(t: Throwable) {
log.info { "unexpected exception in websock sender: ${t.stackTraceToString()}" }
closeHandle.completeExceptionally(t)
}
if (closeHandle.isActive) closeHandle.complete(true)
} catch (_: CancellationException) {
if (closeHandle.isActive) closeHandle.complete(false)
} catch (_: ClosedReceiveChannelException) {
log.warning { "receive channel closed unexpectedly" }
if (closeHandle.isActive) closeHandle.complete(false)
} catch (t: Throwable) {
log.exception { "unexpected error" to t }
if (closeHandle.isActive) closeHandle.complete(false)
}
launch {
try {
for (f in incoming) {
if (f is Frame.Binary) {
input.send(f.readBytes().toUByteArray())
} else {
log.warning { "ignoring unexpected frame of type ${f.frameType}" }
}
}
if (closeHandle.isActive) closeHandle.complete(true)
} catch (_: CancellationException) {
if (closeHandle.isActive) closeHandle.complete(false)
} catch (_: ClosedReceiveChannelException) {
log.warning { "receive channel closed unexpectedly" }
if (closeHandle.isActive) closeHandle.complete(false)
} catch (t: Throwable) {
log.exception { "unexpected error" to t }
if (closeHandle.isActive) closeHandle.complete(false)
}
}
if(!closeHandle.await()) {
log.warning { "Client is closing with error" }
throw RemoteInterface.ClosedException()
}
output.close()
input.close()
}
log.info { "closing connection" }
if (!closeHandle.await()) {
log.warning { "Client is closing with error" }
throw RemoteInterface.ClosedException()
}
output.close()
input.close()
}
val device = ProxyDevice(input, output) {
// we need to explicitly close the coroutine job, or it can hang for a long time
// leaking resources.
closeHandle.complete(true)
// job.cancel()
}
KiloConnectionData(device, sessionMaker())
log.info { "closing connection" }
}
val device = ProxyDevice(input, output) {
// we need to explicitly close the coroutine job, or it can hang for a long time
// leaking resources.
closeHandle.complete(true)
// job.cancel()
}
return device
}

View File

@ -1,3 +1,13 @@
/*
* Copyright (c) 2025. Sergey S. Chernov - All Rights Reserved
*
* You may use, distribute and modify this code under the
* terms of the private license, which you must obtain from the author
*
* To obtain the license, contact the author: https://t.me/real_sergeych or email to
* real dot sergeych at gmail.
*/
package net.sergeych.kiloparsec
import kotlinx.serialization.Serializable

View File

@ -1,3 +1,13 @@
/*
* Copyright (c) 2025. Sergey S. Chernov - All Rights Reserved
*
* You may use, distribute and modify this code under the
* terms of the private license, which you must obtain from the author
*
* To obtain the license, contact the author: https://t.me/real_sergeych or email to
* real dot sergeych at gmail.
*/
package net.sergeych.kiloparsec
import kotlinx.coroutines.sync.Mutex

View File

@ -1,3 +1,13 @@
/*
* Copyright (c) 2025. Sergey S. Chernov - All Rights Reserved
*
* You may use, distribute and modify this code under the
* terms of the private license, which you must obtain from the author
*
* To obtain the license, contact the author: https://t.me/real_sergeych or email to
* real dot sergeych at gmail.
*/
import kotlinx.coroutines.test.runTest
import net.sergeych.crypto2.IllegalSignatureException
import net.sergeych.crypto2.SealedBox

View File

@ -1,3 +1,13 @@
/*
* Copyright (c) 2025. Sergey S. Chernov - All Rights Reserved
*
* You may use, distribute and modify this code under the
* terms of the private license, which you must obtain from the author
*
* To obtain the license, contact the author: https://t.me/real_sergeych or email to
* real dot sergeych at gmail.
*/
import kotlinx.coroutines.test.runTest
import kotlinx.datetime.Instant
import net.sergeych.bipack.BipackEncoder

View File

@ -1,3 +1,13 @@
/*
* Copyright (c) 2025. Sergey S. Chernov - All Rights Reserved
*
* You may use, distribute and modify this code under the
* terms of the private license, which you must obtain from the author
*
* To obtain the license, contact the author: https://t.me/real_sergeych or email to
* real dot sergeych at gmail.
*/
class ToolsTest {
// @Test
// fun testRemoceCmd() {

View File

@ -1,3 +1,13 @@
/*
* Copyright (c) 2025. Sergey S. Chernov - All Rights Reserved
*
* You may use, distribute and modify this code under the
* terms of the private license, which you must obtain from the author
*
* To obtain the license, contact the author: https://t.me/real_sergeych or email to
* real dot sergeych at gmail.
*/
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.ReceiveChannel

View File

@ -1,3 +1,13 @@
/*
* Copyright (c) 2025. Sergey S. Chernov - All Rights Reserved
*
* You may use, distribute and modify this code under the
* terms of the private license, which you must obtain from the author
*
* To obtain the license, contact the author: https://t.me/real_sergeych or email to
* real dot sergeych at gmail.
*/
import kotlin.test.fail
inline fun <reified T: Throwable>assertThrows(f: ()->Unit): T {

View File

@ -1,3 +1,13 @@
/*
* Copyright (c) 2025. Sergey S. Chernov - All Rights Reserved
*
* You may use, distribute and modify this code under the
* terms of the private license, which you must obtain from the author
*
* To obtain the license, contact the author: https://t.me/real_sergeych or email to
* real dot sergeych at gmail.
*/
package net.sergeych.kiloparsec.adapter
import io.ktor.server.application.*
@ -10,25 +20,42 @@ import kotlinx.coroutines.channels.ClosedReceiveChannelException
import kotlinx.coroutines.isActive
import kotlinx.coroutines.launch
import net.sergeych.crypto2.SigningKey
import net.sergeych.crypto2.toDump
import net.sergeych.kiloparsec.KiloInterface
import net.sergeych.kiloparsec.KiloServerConnection
import net.sergeych.kiloparsec.RemoteInterface
import net.sergeych.mp_logger.*
import net.sergeych.tools.AtomicCounter
import java.time.Duration
import kotlin.time.Duration.Companion.seconds
/**
* Create a ktor-based websocket server.
* This call install Routing and WebSockets with proper configuration.
*
* The course of action is:
*
* - create LocalInterface and populate it with functionality
* - call this method with localInterface
* - optionally, connect the same interface with TCP or UDP providers on supported platforms,
* in which case it might be useful to hae session creating function [createSession] separate.
*
* _Note_: [KiloInterface] as for now does not contain session creation in it as we suggest
* session could be transport specific.
*
* @param localInterface where the actual work is performed.
* @param path default http path to the websocket.
* @param serverKey optional key to authenticate the connection. If the client specify expected
* server key it should match of connection will not be established.
* @param createSession function to create a server session.
*/
fun <S> Application.setupWebsocketServer(
localInterface: KiloInterface<S>,
path: String = "/kp",
serverKey: SigningKey? = null,
createSession: () -> S,
) {
install(Routing)
install(WebSockets) {
pingPeriod = Duration.ofSeconds(15)
timeout = Duration.ofSeconds(15)
pingPeriod = 60.seconds //Duration.ofSeconds(15)
timeout = 45.seconds
maxFrameSize = Long.MAX_VALUE
masking = false
}
@ -64,12 +91,9 @@ fun <S> Application.setupWebsocketServer(
}
log.debug { "KSC started, looking for incoming frames" }
for (f in incoming) {
log.debug { "incoming frame: ${f.frameType}" }
if (f is Frame.Binary)
try {
input.send(f.readBytes().toUByteArray().also {
log.debug { "in frame\n${it.toDump()}" }
})
input.send(f.readBytes().toUByteArray())
} catch (_: RemoteInterface.ClosedException) {
log.warning { "caught local closed exception (strange!), closing" }
break

View File

@ -1,8 +1,19 @@
/*
* Copyright (c) 2025. Sergey S. Chernov - All Rights Reserved
*
* You may use, distribute and modify this code under the
* terms of the private license, which you must obtain from the author
*
* To obtain the license, contact the author: https://t.me/real_sergeych or email to
* real dot sergeych at gmail.
*/
package net.sergeych.kiloparsec
import assertThrows
import io.ktor.server.engine.*
import io.ktor.server.netty.*
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.test.runTest
import net.sergeych.crypto2.initCrypto
@ -10,6 +21,7 @@ import net.sergeych.kiloparsec.adapter.setupWebsocketServer
import net.sergeych.kiloparsec.adapter.websocketClient
import net.sergeych.mp_logger.Log
import java.net.InetAddress
import kotlin.random.Random
import kotlin.test.Test
import kotlin.test.assertEquals
import kotlin.test.assertFalse
@ -50,11 +62,12 @@ class ClientTest {
}
}
val ns: NettyApplicationEngine = embeddedServer(Netty, port = 8080, host = "0.0.0.0", module = {
val port = Random.nextInt(8080,9090)
val ns = embeddedServer(Netty, port = port, host = "0.0.0.0", module = {
setupWebsocketServer(serverInterface) { Session() }
}).start(wait = false)
val client = websocketClient<Unit>("ws://localhost:8080/kp")
val client = websocketClient<Unit>("ws://localhost:$port/kp")
val states = mutableListOf<Boolean>()
val collector = launch {
client.connectedStateFlow.collect {
@ -75,6 +88,9 @@ class ClientTest {
}
// connection should now be closed
// the problem is: it needs some unspecified time to close
// as it is async process.
delay(100)
assertFalse { client.connectedStateFlow.value }
// this should be run on automatically reopen connection

View File

@ -1,8 +1,19 @@
/*
* Copyright (c) 2025. Sergey S. Chernov - All Rights Reserved
*
* You may use, distribute and modify this code under the
* terms of the private license, which you must obtain from the author
*
* To obtain the license, contact the author: https://t.me/real_sergeych or email to
* real dot sergeych at gmail.
*/
package net.sergeych.kiloparsec.adapter
import io.ktor.network.selector.*
import io.ktor.network.sockets.*
import io.ktor.utils.io.*
import io.ktor.utils.io.writeByte
import kotlinx.coroutines.*
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.channels.Channel
@ -12,14 +23,10 @@ import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
import kotlinx.datetime.Clock
import net.sergeych.kiloparsec.AsyncVarint
import net.sergeych.kiloparsec.KiloClient
import net.sergeych.kiloparsec.KiloServer
import net.sergeych.kiloparsec.LocalInterface
import net.sergeych.kiloparsec.*
import net.sergeych.mp_logger.*
import net.sergeych.mp_tools.globalLaunch
import net.sergeych.tools.AtomicCounter
import net.sergeych.tools.AtomicValue
import kotlin.time.Duration.Companion.seconds
private val logCounter = AtomicCounter(0)
@ -33,10 +40,10 @@ internal val PING_INACTIVITY_TIME = 30.seconds
* Listen for incoming TCP/IP connections on all local interfaces and the specified [port]
* anc create flow of [InetTransportDevice] suitable for [KiloClient].
*/
fun acceptTcpDevice(port: Int,localInterface: String = "0.0.0.0"): Flow<InetTransportDevice> {
fun acceptTcpDevice(port: Int, localInterface: String = "0.0.0.0"): Flow<InetTransportDevice> {
val selectorManager = SelectorManager(Dispatchers.IO)
val serverSocket = aSocket(selectorManager).tcp().bind(localInterface, port)
return flow {
val serverSocket = aSocket(selectorManager).tcp().bind(localInterface, port)
while (true) {
serverSocket.accept().let { sock ->
emit(inetTransportDevice(sock, "srv"))
@ -74,7 +81,7 @@ private fun inetTransportDevice(
val outputBlocks = Channel<UByteArray>(4096)
val log = LogTag("TCPT${logCounter.incrementAndGet()}:$suffix:$networkAddress")
val job = AtomicValue<Job?>(null)
val job = AtomicAsyncValue<Job?>(null)
val sockOutput = sock.openWriteChannel()
val sockInput = runCatching { sock.openReadChannel() }.getOrElse {
@ -82,16 +89,16 @@ private fun inetTransportDevice(
throw IllegalStateException("failed to open read channel")
}
fun stop() {
suspend fun stop() {
job.mutate {
if ( it != null ) {
if (it != null) {
log.debug { "stopping" }
runCatching { inputBlocks.close() }
runCatching { outputBlocks.close() }
// The problem: on mac platofrms closing the socket does not close its input
// and output channels!
runCatching { sockInput.cancel() }
runCatching { sockOutput.close() }
runCatching { sockOutput.flushAndClose() }
if (!sock.isClosed)
runCatching {
log.debug { "closing socket by stop" }
@ -108,46 +115,47 @@ private fun inetTransportDevice(
}
var lastActiveAt = Clock.System.now()
job.value = globalLaunch {
launch {
globalLaunch {
job.reset(globalLaunch {
launch {
log.debug { "opening read channel" }
log.debug { "opening read channel" }
while (isActive && sock.isActive) {
try {
val size = AsyncVarint.decodeUnsigned(sockInput).toInt()
if (size > MAX_TCP_BLOCK_SIZE) // 16M is a max command block
throw ProtocolException("Illegal block size: $size should be < $MAX_TCP_BLOCK_SIZE")
val data = ByteArray(size)
if (size == 0) {
log.debug { "ping received" }
lastActiveAt = Clock.System.now()
} else {
sockInput.readFully(data, 0, size)
inputBlocks.send(data.toUByteArray())
while (isActive && sock.isActive) {
try {
val size = AsyncVarint.decodeUnsigned(sockInput).toInt()
if (size > MAX_TCP_BLOCK_SIZE) // 16M is a max command block
throw ProtocolException("Illegal block size: $size should be < $MAX_TCP_BLOCK_SIZE")
val data = ByteArray(size)
if (size == 0) {
log.debug { "ping received" }
lastActiveAt = Clock.System.now()
} else {
sockInput.readFully(data, 0, size)
inputBlocks.send(data.toUByteArray())
}
} catch (e: ClosedReceiveChannelException) {
log.error { "closed receive channel " }
stop()
break
} catch (_: CancellationException) {
log.error { "cancellation exception " }
break
} catch (e: Exception) {
log.exception { "unexpected exception in TCP socket read" to e }
stop()
break
}
} catch (e: ClosedReceiveChannelException) {
log.error { "closed receive channel " }
stop()
break
} catch (_: CancellationException) {
log.error { "cancellation exception " }
break
} catch (e: Exception) {
log.exception { "unexpected exception in TCP socket read" to e }
stop()
break
}
}
}
})
launch {
val outAccess = Mutex()
var lastSentAt = Clock.System.now()
launch {
while (isActive && sock.isActive) {
delay(500)
val activityTime = if(lastSentAt > lastActiveAt) lastSentAt else lastActiveAt
val activityTime = if (lastSentAt > lastActiveAt) lastSentAt else lastActiveAt
if (Clock.System.now() - activityTime > PING_INACTIVITY_TIME) {
log.debug { "pinging for inactivity" }
val repeat = outAccess.withLock {

View File

@ -1,7 +1,18 @@
/*
* Copyright (c) 2025. Sergey S. Chernov - All Rights Reserved
*
* You may use, distribute and modify this code under the
* terms of the private license, which you must obtain from the author
*
* To obtain the license, contact the author: https://t.me/real_sergeych or email to
* real dot sergeych at gmail.
*/
package net.sergeych.kiloparsec.adapter
import io.ktor.network.sockets.*
import io.ktor.utils.io.core.*
import kotlinx.io.readByteArray
import net.sergeych.crypto2.toDump
import net.sergeych.kiloparsec.adapter.UdpBlock.Companion.CANCEL_BLOCK
import net.sergeych.kiloparsec.adapter.UdpBlock.Companion.ESCAPE_BYTE
@ -97,6 +108,6 @@ sealed class UdpBlock {
}
fun decode(datagram: Datagram) =
decode(datagram.packet.readBytes().toUByteArray())
decode(datagram.packet.readByteArray().toUByteArray())
}
}

View File

@ -1,3 +1,13 @@
/*
* Copyright (c) 2025. Sergey S. Chernov - All Rights Reserved
*
* You may use, distribute and modify this code under the
* terms of the private license, which you must obtain from the author
*
* To obtain the license, contact the author: https://t.me/real_sergeych or email to
* real dot sergeych at gmail.
*/
package net.sergeych.kiloparsec.adapter
import io.ktor.network.sockets.*

View File

@ -1,3 +1,13 @@
/*
* Copyright (c) 2025. Sergey S. Chernov - All Rights Reserved
*
* You may use, distribute and modify this code under the
* terms of the private license, which you must obtain from the author
*
* To obtain the license, contact the author: https://t.me/real_sergeych or email to
* real dot sergeych at gmail.
*/
package net.sergeych.kiloparsec.adapter
import io.ktor.network.selector.*
@ -87,7 +97,7 @@ fun acceptUdpDevice(
* the module automatically issues pings on inactivity when there is no data often enough
* to maintain the connection open.
*/
fun connectUdpDevice(
suspend fun connectUdpDevice(
hostPort: String,
maxInactivityTimeout: Duration = 2.minutes,
) = connectUdpDevice(hostPort.toNetworkAddress(), maxInactivityTimeout)
@ -107,16 +117,16 @@ fun connectUdpDevice(
* the module automatically issues pings on inactivity when there is no data often enough
* to maintain the connection open.
*/
fun connectUdpDevice(
suspend fun connectUdpDevice(
addr: NetworkAddress,
maxInactivityTimeout: Duration = 2.minutes,
): InetTransportDevice {
val selectorManager = SelectorManager(Dispatchers.IO)
val remoteAddress = InetSocketAddress(addr.host, addr.port)
val socket = aSocket(selectorManager).udp().connect(remoteAddress)
val done = CompletableDeferred<Unit>()
val socket = aSocket(selectorManager).udp().connect(remoteAddress)
val transport = UdpSocketTransport(object : UdpConnector {
override suspend fun sendBlock(block: UdpBlock, toAddress: SocketAddress) {
socket.send(block.toDatagram(remoteAddress))

View File

@ -1,3 +1,13 @@
/*
* Copyright (c) 2025. Sergey S. Chernov - All Rights Reserved
*
* You may use, distribute and modify this code under the
* terms of the private license, which you must obtain from the author
*
* To obtain the license, contact the author: https://t.me/real_sergeych or email to
* real dot sergeych at gmail.
*/
package net.sergeych.kiloparsec.adapter
import io.ktor.network.selector.*
@ -14,6 +24,7 @@ import net.sergeych.mp_logger.LogTag
import net.sergeych.mp_logger.Loggable
import net.sergeych.mp_logger.debug
import net.sergeych.mp_logger.exception
import net.sergeych.mp_tools.globalDefer
import kotlin.time.Duration
import kotlin.time.Duration.Companion.minutes
@ -51,7 +62,9 @@ class UdpServer(val port: Int, localInterface: String = "0.0.0.0", maxInactivity
private val access = Mutex()
private val selectorManager = SelectorManager(Dispatchers.IO)
private val serverSocket = aSocket(selectorManager).udp().bind(InetSocketAddress(localInterface, port))
private val serverSocket = globalDefer {
aSocket(selectorManager).udp().bind(InetSocketAddress(localInterface, port))
}
override suspend fun disconnectClient(address: SocketAddress) {
access.withLock { sessions.remove(address) }
@ -65,7 +78,7 @@ class UdpServer(val port: Int, localInterface: String = "0.0.0.0", maxInactivity
flow {
while (true) {
try {
val datagram = serverSocket.receive()
val datagram = serverSocket.await().receive()
val block = UdpBlock.decode(datagram)
val remoteAddress = datagram.address
@ -97,10 +110,10 @@ class UdpServer(val port: Int, localInterface: String = "0.0.0.0", maxInactivity
}
override suspend fun sendBlock(block: UdpBlock, toAddress: SocketAddress) {
serverSocket.send(block.toDatagram(toAddress))
serverSocket.await().send(block.toDatagram(toAddress))
}
val isClosed: Boolean get() = serverSocket.isClosed
suspend fun isClosed(): Boolean = serverSocket.await().isClosed
/**
* Close the UDP server. Calling it will cause:
@ -113,8 +126,8 @@ class UdpServer(val port: Int, localInterface: String = "0.0.0.0", maxInactivity
*/
suspend fun close() {
access.withLock {
if (!isClosed) {
runCatching { serverSocket.close() }
if (!isClosed()) {
runCatching { serverSocket.await().close() }
}
}
while (sessions.isNotEmpty()) {

View File

@ -1,3 +1,13 @@
/*
* Copyright (c) 2025. Sergey S. Chernov - All Rights Reserved
*
* You may use, distribute and modify this code under the
* terms of the private license, which you must obtain from the author
*
* To obtain the license, contact the author: https://t.me/real_sergeych or email to
* real dot sergeych at gmail.
*/
package net.sergeych.kiloparsec.adapter
import io.ktor.network.sockets.*

View File

@ -1,3 +1,13 @@
/*
* Copyright (c) 2025. Sergey S. Chernov - All Rights Reserved
*
* You may use, distribute and modify this code under the
* terms of the private license, which you must obtain from the author
*
* To obtain the license, contact the author: https://t.me/real_sergeych or email to
* real dot sergeych at gmail.
*/
package net.sergeych.kiloparsec.adapter
import assertThrows
@ -10,7 +20,7 @@ import kotlin.test.Test
import kotlin.test.assertEquals
import kotlin.test.assertIs
class InternetrTest {
class InternetTest {
class TestException : Exception("test1")
@Test
@ -26,9 +36,11 @@ class InternetrTest {
val cmdSave by command<String, Unit>()
val cmdLoad by command<Unit, String>()
val cmdDrop by command<Unit, Unit>()
val cmdPing by command<String, String>()
val cmdException by command<Unit, Unit>()
val cmdCallClient by command<String, String>()
val cli = KiloInterface<Session>().apply {
val serverInterface = KiloInterface<Session>().apply {
registerError { TestException() }
onConnected { session.data = "start" }
on(cmdSave) { session.data = it }
@ -41,15 +53,26 @@ class InternetrTest {
on(cmdDrop) {
throw LocalInterface.BreakConnectionException()
}
on(cmdCallClient) {
remote.call(cmdPing, it)
}
}
val server = KiloServer(cli, acceptTcpDevice(port)) {
val server = KiloServer(serverInterface, acceptTcpDevice(port)) {
Session("unknown")
}
val client = KiloClient<Unit>() {
addErrors(cli)
data class LocalSession(val localFoo: String)
val client = KiloClient {
addErrors(serverInterface)
session { LocalSession("unknown") }
// TODO: add register error variant
connect { connectTcpDevice("localhost:$port") }
local {
on(cmdPing) {
"pong! $it"
}
}
}
assertEquals("start", client.call(cmdLoad))
@ -65,7 +88,7 @@ class InternetrTest {
// reconnect?
assertEquals("start", client.call(cmdLoad))
assertEquals("pong! 42", client.call(cmdCallClient, "42"))
server.close()
}