0.3.1 release: async pushes and better binary format

This commit is contained in:
Sergey Chernov 2024-08-10 11:14:13 +02:00
parent 8a21a836e5
commit d6f257de14
26 changed files with 290 additions and 93 deletions

8
.idea/artifacts/kiloparsec_js_0_2_4.xml generated Normal file
View File

@ -0,0 +1,8 @@
<component name="ArtifactManager">
<artifact type="jar" name="kiloparsec-js-0.2.4">
<output-path>$PROJECT_DIR$/build/libs</output-path>
<root id="archive" name="kiloparsec-js-0.2.4.jar">
<element id="module-output" name="kiloparsec.jsMain" />
</root>
</artifact>
</component>

8
.idea/artifacts/kiloparsec_js_0_2_5.xml generated Normal file
View File

@ -0,0 +1,8 @@
<component name="ArtifactManager">
<artifact type="jar" name="kiloparsec-js-0.2.5">
<output-path>$PROJECT_DIR$/build/libs</output-path>
<root id="archive" name="kiloparsec-js-0.2.5.jar">
<element id="module-output" name="kiloparsec.jsMain" />
</root>
</artifact>
</component>

View File

@ -0,0 +1,8 @@
<component name="ArtifactManager">
<artifact type="jar" name="kiloparsec-js-0.2.5-SNAPSHOT">
<output-path>$PROJECT_DIR$/build/libs</output-path>
<root id="archive" name="kiloparsec-js-0.2.5-SNAPSHOT.jar">
<element id="module-output" name="kiloparsec.jsMain" />
</root>
</artifact>
</component>

6
.idea/artifacts/kiloparsec_js_0_2_6.xml generated Normal file
View File

@ -0,0 +1,6 @@
<component name="ArtifactManager">
<artifact type="jar" name="kiloparsec-js-0.2.6">
<output-path>$PROJECT_DIR$/build/libs</output-path>
<root id="archive" name="kiloparsec-js-0.2.6.jar" />
</artifact>
</component>

View File

@ -0,0 +1,8 @@
<component name="ArtifactManager">
<artifact type="jar" name="kiloparsec-js-0.3.1-SNAPSHOT">
<output-path>$PROJECT_DIR$/build/libs</output-path>
<root id="archive" name="kiloparsec-js-0.3.1-SNAPSHOT.jar">
<element id="module-output" name="kiloparsec.jsMain" />
</root>
</artifact>
</component>

View File

@ -0,0 +1,8 @@
<component name="ArtifactManager">
<artifact type="jar" name="kiloparsec-jvm-0.2.4">
<output-path>$PROJECT_DIR$/build/libs</output-path>
<root id="archive" name="kiloparsec-jvm-0.2.4.jar">
<element id="module-output" name="kiloparsec.jvmMain" />
</root>
</artifact>
</component>

View File

@ -0,0 +1,8 @@
<component name="ArtifactManager">
<artifact type="jar" name="kiloparsec-jvm-0.2.5">
<output-path>$PROJECT_DIR$/build/libs</output-path>
<root id="archive" name="kiloparsec-jvm-0.2.5.jar">
<element id="module-output" name="kiloparsec.jvmMain" />
</root>
</artifact>
</component>

View File

@ -0,0 +1,8 @@
<component name="ArtifactManager">
<artifact type="jar" name="kiloparsec-jvm-0.2.5-SNAPSHOT">
<output-path>$PROJECT_DIR$/build/libs</output-path>
<root id="archive" name="kiloparsec-jvm-0.2.5-SNAPSHOT.jar">
<element id="module-output" name="kiloparsec.jvmMain" />
</root>
</artifact>
</component>

View File

@ -0,0 +1,6 @@
<component name="ArtifactManager">
<artifact type="jar" name="kiloparsec-jvm-0.2.6">
<output-path>$PROJECT_DIR$/build/libs</output-path>
<root id="archive" name="kiloparsec-jvm-0.2.6.jar" />
</artifact>
</component>

View File

@ -0,0 +1,8 @@
<component name="ArtifactManager">
<artifact type="jar" name="kiloparsec-jvm-0.3.1-SNAPSHOT">
<output-path>$PROJECT_DIR$/build/libs</output-path>
<root id="archive" name="kiloparsec-jvm-0.3.1-SNAPSHOT.jar">
<element id="module-output" name="kiloparsec.jvmMain" />
</root>
</artifact>
</component>

6
.idea/scala_compiler.xml generated Normal file
View File

@ -0,0 +1,6 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="ScalaCompilerConfiguration">
<option name="separateProdTestSources" value="false" />
</component>
</project>

View File

@ -1,15 +1,23 @@
# Kiloparsec # Kiloparsec
__Recommended version is `0.3.1`: to keep the code compatible with current and further versions we
ask to upgrade to `0.3.1` at least.__
The new generation of __PARanoid SECurity__ protocol, advanced, faster, more secure. It also allows connecting any " The new generation of __PARanoid SECurity__ protocol, advanced, faster, more secure. It also allows connecting any "
block device" transport to the same local interface. Out if the box it block device" transport to the same local interface. Out if the box it
provides the following transports: provides the following transports:
| name | JVM | JS | native | | name | JVM | JS | native |
|----------------|-----|----|----------| |----------------|-----|----|-----------|
| TCP/IP server | ✓ | | β @0.2.6 | | TCP/IP server | ✓ | | >= 0.2.6 |
| TCP/IP client | ✓ | | β @0.2.6 | | TCP/IP client | ✓ | | >= @0.2.6 |
| Websock server | ✓ | | | | Websock server | ✓ | | |
| Websock client | ✓ | ✓ | ✓ | | Websock client | ✓ | ✓ | ✓ |
### Note on version compatibility
Protocols >= 0.3.0 are not binary compatible with previous version due to more compact binary
format. The format from 0.3.0 onwards is supposed to keep compatible.
### Supported native targets ### Supported native targets
@ -59,7 +67,7 @@ It could be, depending on your project structure, something like:
```kotlin ```kotlin
val commonMain by getting { val commonMain by getting {
dependencies { dependencies {
api("net.sergeych:kiloparsec:0.2.6") api("net.sergeych:kiloparsec:0.3.1")
} }
} }
``` ```
@ -118,7 +126,7 @@ assertEquals(FooArgs("bar", 117), client.call(cmdGetFoo))
## Create ktor-based server ## Create ktor-based server
Normally server side needs some session. It is convenient and avoid sending repeating data on each request speeding up Normally server side needs some session. It is convenient and avoid sending repeating data on each request speeding up
the protocol. With KILOPARSEC it is rather basic operation:\ the protocol. With KILOPARSEC it is rather basic operation:
~~~kotlin ~~~kotlin
// Our session just keeps Foo for cmd{Get|Set}Foo: // Our session just keeps Foo for cmd{Get|Set}Foo:
@ -143,9 +151,32 @@ val ns: NettyApplicationEngine = embeddedServer(Netty, port = 8080, host = "0.0.
setupWebsocketServer(serverInterface) { Session() } setupWebsocketServer(serverInterface) { Session() }
}).start(wait = false) }).start(wait = false)
~~~ ~~~
### TCP/IP client and server
Using plain TCP/IP is even simpler, and it works way faster than websocket one, and is _the same
protected as `wss://` variant abovve due to same kiloparsec encryption in both cases. Still, a TCP/IP
client is not available in Javascript browser targets and custom TCP ports could often be blocked by firewalls.
Documentation is available in samples here:
- [TCP/IP server creation](https://code.sergeych.net/docs/kiloparsec/kiloparsec/net.sergeych.kiloparsec/-kilo-server/index.html)
- [TCP/IP client](https://code.sergeych.net/docs/kiloparsec/kiloparsec/net.sergeych.kiloparsec/-kilo-client/index.html)
In short, there are two functions that implements aysnchronous TCP/IP transport on all platforms buy JS:
- [acceptTcpDevice](https://code.sergeych.net/docs/kiloparsec/kiloparsec/net.sergeych.kiloparsec.adapter/accept-tcp-device.html?query=fun%20acceptTcpDevice(port:%20Int):%20Flow%3CInetTransportDevice%3E) to create a server
- [connectTcpDevice](https://code.sergeych.net/docs/kiloparsec/kiloparsec/net.sergeych.kiloparsec.adapter/connect-tcp-device.html) to connect to the server
### Reusing code between servers
The same instance of the [KiloInterface](https://code.sergeych.net/docs/kiloparsec/kiloparsec/net.sergeych.kiloparsec/-kilo-interface/index.html?query=open%20class%20KiloInterface%3CS%3E%20:%20LocalInterface%3CKiloScope%3CS%3E%3E) could easily be reused with all instances of servers with different protocols.
This is a common proactive to create a business logic in a `KiloInterface`, then create a TCP/IP and Websocket servers passing the same instance of the logic to both.
## See also: ## See also:
- [Source documentation](https://code.sergeych.net/docs/kiloparsec/) - [Source documentation](https://code.sergeych.net/docs/kiloparsec/)

View File

@ -6,7 +6,7 @@ plugins {
} }
group = "net.sergeych" group = "net.sergeych"
version = "0.2.6" version = "0.3.1"
repositories { repositories {
mavenCentral() mavenCentral()

View File

@ -19,6 +19,11 @@ inline fun <reified A, reified R> command(overrideName: String? = null): Command
) )
} }
/**
* Declare a Push: Unit-returning command usually used with [RemoteInterface.push]
*/
inline fun <reified A> push(overrideName: String? = null): CommandDelegate<A, Unit> = command(overrideName)
/** /**
* Delegate to create [Command] via property * Delegate to create [Command] via property
*/ */

View File

@ -17,10 +17,33 @@ import net.sergeych.mp_logger.exception
import net.sergeych.mp_tools.globalLaunch import net.sergeych.mp_tools.globalLaunch
/** /**
* The auto-connecting client that reconnects to the kiloparsec server * The auto-connecting client that reconnects to the kiloparsec server,
* [KiloServer],
* and maintain connection state flow. Client factory launches a disconnected * and maintain connection state flow. Client factory launches a disconnected
* set of coroutines to support automatic reconnection, so you _must_ [close] * set of coroutines to support automatic reconnection, so you _must_ [close]
* it manually when it is not needed, otherwise it will continue to reconnect. * it manually when it is unnecessary, otherwise it will continue to reconnect.
*
* ## Usage
*
* Suppose we have TCP/IP server as in the [KiloServer] usage sample. Then we can connect
* to it providing TCP/IP connector like:
*
* ```kotlin
* val client = KiloClient<Unit>() {
* connect { connectTcpDevice("localhost:$port") }
* }
*
* // now we can invoke remote commands:
* assertEquals("unknown", client.call(cmdLoad))
*
* client.call(cmdSave, "foobar")
* assertEquals("foobar", client.call(cmdLoad))
* ```
*
* ## See also
*
* [KiloServer]
*
*/ */
class KiloClient<S>( class KiloClient<S>(
val localInterface: KiloInterface<S>, val localInterface: KiloInterface<S>,
@ -28,7 +51,6 @@ class KiloClient<S>(
connectionDataFactory: ConnectionDataFactory<S>, connectionDataFactory: ConnectionDataFactory<S>,
) : RemoteInterface, ) : RemoteInterface,
Loggable by LogTag("CLIF") { Loggable by LogTag("CLIF") {
val _state = MutableStateFlow(false) val _state = MutableStateFlow(false)
/** /**
@ -94,6 +116,15 @@ class KiloClient<S>(
throw t throw t
} }
override suspend fun <A> push(cmd: Command<A, Unit>, args: A) {
try {
deferredClient.await().push(cmd, args)
} catch (t: RemoteInterface.ClosedException) {
resetDeferredClient()
throw t
}
}
/** /**
* Current session token. This is a per-connection unique random value same on the client and server part so * Current session token. This is a per-connection unique random value same on the client and server part so
* it could be used as a nonce to pair MITM and like attacks, be sure that the server is actually * it could be used as a nonce to pair MITM and like attacks, be sure that the server is actually

View File

@ -100,4 +100,8 @@ class KiloClientConnection<S>(
suspend fun token() = deferredParams.await().token suspend fun token() = deferredParams.await().token
override suspend fun <A, R> call(cmd: Command<A, R>, args: A): R = override suspend fun <A, R> call(cmd: Command<A, R>, args: A): R =
kiloRemoteInterface.await().call(cmd, args) kiloRemoteInterface.await().call(cmd, args)
override suspend fun <A> push(cmd: Command<A, Unit>, args: A) {
kiloRemoteInterface.await().push(cmd, args)
}
} }

View File

@ -4,7 +4,13 @@ package net.sergeych.kiloparsec
* The local interface to provide functions, register errors for Kiloparsec users. Use it * The local interface to provide functions, register errors for Kiloparsec users. Use it
* with [KiloClient], [KiloClientConnection], [KiloServerConnection], etc. * with [KiloClient], [KiloClientConnection], [KiloServerConnection], etc.
* *
* BAse implementation registers relevant exceptions. * Base class implementation does the following:
*
* - It registers common exceptions from [RemoteInterface] and kotlin/java `IllegalArgumentException` and
* `IllegalStateException`
* - It provides [onConnected] handler
*
* See [KiloServer] for usage sample.
*/ */
open class KiloInterface<S> : LocalInterface<KiloScope<S>>() { open class KiloInterface<S> : LocalInterface<KiloScope<S>>() {

View File

@ -35,5 +35,10 @@ class KiloRemoteInterface<S>(
else -> throw RemoteInterface.Exception("unexpected block type: $block") else -> throw RemoteInterface.Exception("unexpected block type: $block")
} }
} }
override suspend fun <A> push(cmd: Command<A, Unit>, args: A) {
val params = deferredParams.await()
params.transport.call(L0Call, params.encrypt(cmd.packCall(args)))
}
} }

View File

@ -91,6 +91,9 @@ private val instances = AtomicCounter()
* Session("unknown") * Session("unknown")
* } * }
* ``` * ```
*
* See [KiloClient] to connect to the server.
*
* @param S the type of the server session object, returned by [sessionBuilder]. See above * @param S the type of the server session object, returned by [sessionBuilder]. See above
* @param clientInterface the interface available for remote calls * @param clientInterface the interface available for remote calls
* @param connections flow of incoming connections. Server stops when the flow is fully collected (normally * @param connections flow of incoming connections. Server stops when the flow is fully collected (normally

View File

@ -94,4 +94,8 @@ class KiloServerConnection<S>(
override suspend fun <A, R> call(cmd: Command<A, R>, args: A): R { override suspend fun <A, R> call(cmd: Command<A, R>, args: A): R {
return kiloRemoteInterface.await().call(cmd, args) return kiloRemoteInterface.await().call(cmd, args)
} }
override suspend fun <A> push(cmd: Command<A, Unit>, args: A) {
kiloRemoteInterface.await().push(cmd, args)
}
} }

View File

@ -50,4 +50,18 @@ interface RemoteInterface {
* Call the remote procedure with specified args and return its result * Call the remote procedure with specified args and return its result
*/ */
suspend fun <A, R> call(cmd: Command<A, R>, args: A): R suspend fun <A, R> call(cmd: Command<A, R>, args: A): R
/**
* Push the notification without waiting for reception or processing.
* It returns immediately after sending data to the transport (e.g., to the network).
* Use [call] if it is necessary to wait until the command will be received and processed by the remote.
*/
suspend fun <A> push(cmd: Command<A, Unit>, args: A)
/**
* Push the command with no args.
* It returns immediately after sending data to the transport (e.g., to the network).
* Use [call] if it is necessary to wait until the command will be received and processed by the remote.
*/
suspend fun push(cmd: Command<Unit,Unit>) = push(cmd,Unit)
} }

View File

@ -15,6 +15,7 @@ import kotlinx.serialization.descriptors.SerialDescriptor
import kotlinx.serialization.encoding.Decoder import kotlinx.serialization.encoding.Decoder
import kotlinx.serialization.encoding.Encoder import kotlinx.serialization.encoding.Encoder
import kotlinx.serialization.serializer import kotlinx.serialization.serializer
import net.sergeych.bipack.Unsigned
import net.sergeych.crypto2.toDump import net.sergeych.crypto2.toDump
import net.sergeych.kiloparsec.Transport.Device import net.sergeych.kiloparsec.Transport.Device
import net.sergeych.mp_logger.* import net.sergeych.mp_logger.*
@ -63,7 +64,12 @@ class Transport<S>(
@Serializable(TransportBlockSerializer::class) @Serializable(TransportBlockSerializer::class)
sealed class Block { sealed class Block {
@Serializable @Serializable
data class Call(val id: UInt, val name: String, val packedArgs: UByteArray) : Block() { data class Call(
@Unsigned
val id: UInt,
val name: String,
val packedArgs: UByteArray
) : Block() {
override fun equals(other: Any?): Boolean { override fun equals(other: Any?): Boolean {
if (this === other) return true if (this === other) return true
if (other !is Call) return false if (other !is Call) return false
@ -83,10 +89,10 @@ class Transport<S>(
} }
@Serializable @Serializable
data class Response(val forId: UInt, val packedResult: UByteArray) : Block() data class Response(@Unsigned val forId: UInt, val packedResult: UByteArray) : Block()
@Serializable @Serializable
data class Error(val forId: UInt, val code: String, val text: String? = null, val extra: UByteArray? = null) : data class Error(@Unsigned val forId: UInt, val code: String, val text: String? = null, val extra: UByteArray? = null) :
Block() { Block() {
val message by lazy { text ?: "remote exception: $code" } val message by lazy { text ?: "remote exception: $code" }
} }
@ -98,7 +104,8 @@ class Transport<S>(
var isClosed: Boolean = false var isClosed: Boolean = false
/** /**
* Send a call block for a command and packed args and return packed result if it is not an error * Send a call block for a command and packed args and return packed result if it is not an error. It suspends
* until receiving answer from the remote side, even if returns `Unit`.
* @throws RemoteInterface.RemoteException if the remote call caused an exception. Normally use [call] instead. * @throws RemoteInterface.RemoteException if the remote call caused an exception. Normally use [call] instead.
* @throws RemoteInterface.ClosedException * @throws RemoteInterface.ClosedException
*/ */
@ -111,12 +118,13 @@ class Transport<S>(
// We need to shield calls and lastID with mutex, but nothing more: // We need to shield calls and lastID with mutex, but nothing more:
access.withLock { access.withLock {
if (isClosed) throw RemoteInterface.ClosedException() if (isClosed) throw RemoteInterface.ClosedException()
// the order is important: first id in use MUST BE >= 1, not zero:
b = Block.Call(++lastId, name, packedArgs) b = Block.Call(++lastId, name, packedArgs)
calls[b.id] = deferred calls[b.id] = deferred
} }
// now we have mutex freed so we can call: // now we have mutex freed so we can call:
val r = runCatching { device.output.send(pack(b).also { debug { ">>>\n${it.toDump()}" } }) } val r = runCatching { device.output.send(pack(b).also { debug { ">>>\n${it.toDump()}" } }) }
if (!r.isSuccess) { if (!r.isSuccess) {
r.exceptionOrNull()?.let { r.exceptionOrNull()?.let {
exception { "failed to send output block" to it } exception { "failed to send output block" to it }
@ -131,6 +139,25 @@ class Transport<S>(
return deferred.await() return deferred.await()
} }
/**
* Send a call block for a command and packed args and return packed result if it is not an error. It suspends
* until receiving answer from the remote side, even if returns `Unit`.
* @throws RemoteInterface.RemoteException if the remote call caused an exception. Normally use [call] instead.
* @throws RemoteInterface.ClosedException
*/
private suspend fun sendPushBlock(name: String, packedArgs: UByteArray) {
if (isClosed) throw RemoteInterface.ClosedException()
// All push blocks have the same id == 0:
val b = Block.Call(0u, name, packedArgs)
val r = runCatching { device.output.send(pack(b).also { debug { ">>$\n${it.toDump()}" } }) }
when(val e = r.exceptionOrNull()) {
is RemoteInterface.ClosedException, is CancellationException, is RemoteInterface.RemoteException
-> throw e
else -> throw RemoteInterface.ClosedException()
}
}
/** /**
* Call the remote procedure with specified args and return its result * Call the remote procedure with specified args and return its result
*/ */
@ -139,6 +166,10 @@ class Transport<S>(
return unpack(cmd.resultSerializer, result) return unpack(cmd.resultSerializer, result)
} }
override suspend fun <A>push(cmd: Command<A,Unit>,args: A) {
sendPushBlock(cmd.name, pack(cmd.argsSerializer, args))
}
/** /**
* Start running the transport. This function suspends until the transport is closed * Start running the transport. This function suspends until the transport is closed
* normally or by error. If you need to cancel it prematurely, cancel the coroutine * normally or by error. If you need to cancel it prematurely, cancel the coroutine
@ -163,7 +194,7 @@ class Transport<S>(
warning { "decoded error: ${error::class.simpleName}: $error" } warning { "decoded error: ${error::class.simpleName}: $error" }
calls.remove(b.forId)?.completeExceptionally(localInterface.decodeError(b)) calls.remove(b.forId)?.completeExceptionally(localInterface.decodeError(b))
?: warning { "error handler not found for ${b.forId}" } ?: warning { "error handler not found for ${b.forId}" }
info { "error processed"} info { "error processed" }
} }
is Block.Response -> access.withLock { is Block.Response -> access.withLock {
@ -176,17 +207,21 @@ class Transport<S>(
is Block.Call -> launch { is Block.Call -> launch {
try { try {
send( if (b.id == 0u)
Block.Response( // Command does not waits return
b.id, localInterface.execute(commandContext, b.name, b.packedArgs)
localInterface.execute(commandContext, b.name, b.packedArgs) else
send(
Block.Response(
b.id,
localInterface.execute(commandContext, b.name, b.packedArgs)
)
) )
)
} catch (x: LocalInterface.BreakConnectionException) { } catch (x: LocalInterface.BreakConnectionException) {
// handler forced close // handler forced close
warning { "handler requested closing of the connection (${x.flushSendQueue}"} warning { "handler requested closing of the connection (${x.flushSendQueue}" }
isClosed = true isClosed = true
if( x.flushSendQueue ) device.flush() if (x.flushSendQueue) device.flush()
device.close() device.close()
} catch (x: RemoteInterface.RemoteException) { } catch (x: RemoteInterface.RemoteException) {
send(Block.Error(b.id, x.code, x.text, x.extra)) send(Block.Error(b.id, x.code, x.text, x.extra))
@ -196,26 +231,23 @@ class Transport<S>(
.also { debug { "command executed: ${b.name}" } } .also { debug { "command executed: ${b.name}" } }
} }
} }
debug { "=---------------------------------------------"} debug { "=---------------------------------------------" }
} }
debug { "input step performed closed=$isClosed active=$isActive"} debug { "input step performed closed=$isClosed active=$isActive" }
} catch (_: ClosedSendChannelException) { } catch (_: ClosedSendChannelException) {
info { "closed send channel" } info { "closed send channel" }
isClosed = true isClosed = true
} catch (_: ClosedReceiveChannelException) { } catch (_: ClosedReceiveChannelException) {
info { "closed receive channel"} info { "closed receive channel" }
isClosed = true isClosed = true
} } catch (cce: LocalInterface.BreakConnectionException) {
catch(cce: LocalInterface.BreakConnectionException) { info { "closing connection by local request ($cce)" }
info { "closing connection by local request ($cce)"}
device.close() device.close()
} } catch (t: RemoteInterface.ClosedException) {
catch(t: RemoteInterface.ClosedException) {
// it is ok: we just exit the coroutine normally // it is ok: we just exit the coroutine normally
// and mark we're closing // and mark we're closing
isClosed = true isClosed = true
} } catch (_: CancellationException) {
catch (_: CancellationException) {
info { "loop is cancelled with CancellationException" } info { "loop is cancelled with CancellationException" }
isClosed = true isClosed = true
} catch (t: Throwable) { } catch (t: Throwable) {
@ -226,7 +258,7 @@ class Transport<S>(
} }
debug { "leaving transport loop" } debug { "leaving transport loop" }
access.withLock { access.withLock {
debug { "access lock obtained"} debug { "access lock obtained" }
isClosed = true isClosed = true
debug { "closing device $device, calls in queue ${calls.size}" } debug { "closing device $device, calls in queue ${calls.size}" }
runCatching { device.close() } runCatching { device.close() }
@ -243,8 +275,7 @@ class Transport<S>(
private suspend fun send(block: Block) { private suspend fun send(block: Block) {
try { try {
device.output.send(pack(block)) device.output.send(pack(block))
} } catch (_: ClosedSendChannelException) {
catch(_: ClosedSendChannelException) {
throw RemoteInterface.ClosedException() throw RemoteInterface.ClosedException()
} }
} }
@ -275,7 +306,7 @@ object TransportBlockSerializer : KSerializer<Transport.Block> {
override fun deserialize(decoder: Decoder): Transport.Block = override fun deserialize(decoder: Decoder): Transport.Block =
when( val id = decoder.decodeByte().toInt()) { when (val id = decoder.decodeByte().toInt()) {
0 -> decoder.decodeSerializableValue(serializer<Transport.Block.Call>()) 0 -> decoder.decodeSerializableValue(serializer<Transport.Block.Call>())
1 -> decoder.decodeSerializableValue(serializer<Transport.Block.Error>()) 1 -> decoder.decodeSerializableValue(serializer<Transport.Block.Error>())
2 -> decoder.decodeSerializableValue(serializer<Transport.Block.Response>()) 2 -> decoder.decodeSerializableValue(serializer<Transport.Block.Response>())

View File

@ -11,46 +11,3 @@ data class NetworkAddress(
return "$host:$port" return "$host:$port"
} }
} }
//
///**
// * Multiplatform datagram abstraction
// */
//interface Datagram {
// /**
// * Received message
// */
// val message: UByteArray
//
// /**
// * Address from where the message was sent
// */
// val address: NetworkAddress
//}
//
//@OptIn(ExperimentalStdlibApi::class)
//interface DatagramConnector: AutoCloseable {
//
// val incoming: ReceiveChannel<Datagram>
// suspend fun send(message: UByteArray, networkAddress: NetworkAddress)
// @Suppress("unused")
// suspend fun send(message: UByteArray, datagramAddress: String) {
// send(message, datagramAddress.toNetworkAddress())
// }
//
// suspend fun send(message: UByteArray,host: String,port: Int) =
// send(message, NetworkAddress(host,port))
// override fun close()
//}
//
//expect fun NetworkAddress(host: String,port: Int): NetworkAddress
//
//fun String.toNetworkAddress() : NetworkAddress {
// val (host, port) = this.split(":").map { it.trim()}
// return NetworkAddress(host, port.toInt())
//}
//
//expect fun acceptTcpDevice(port: Int): Flow<InetTransportDevice>
//
//expect suspend fun connectTcpDevice(address: NetworkAddress): InetTransportDevice
//
//suspend fun connectTcpDevice(address: String) = connectTcpDevice(address.toNetworkAddress())

View File

@ -184,10 +184,16 @@ class TransportTest {
val cmdRemoteExceptionTest by command<Unit, String>() val cmdRemoteExceptionTest by command<Unit, String>()
val cmdBreak by command<Unit, Unit>() val cmdBreak by command<Unit, Unit>()
val cmdPushServer by push<String>()
val pushedFromServer = CompletableDeferred<String>()
val serverInterface = KiloInterface<String>().apply { val serverInterface = KiloInterface<String>().apply {
on(cmdPing) { on(cmdPing) {
"pong! [$it]" "pong! [$it]"
} }
on(cmdPushServer) {
pushedFromServer.complete(it)
}
on(cmdGetToken) { on(cmdGetToken) {
sessionToken sessionToken
} }
@ -249,11 +255,17 @@ class TransportTest {
assertEquals("client pong: foo", kiloServerConnection.call(cmdPing, "foo")) assertEquals("client pong: foo", kiloServerConnection.call(cmdPing, "foo"))
assertEquals("server push: bar", kiloServerConnection.call(cmdPush, "bar")) assertEquals("server push: bar", kiloServerConnection.call(cmdPush, "bar"))
client.push(cmdPushServer, "42")
assertEquals("**-s1-c1-s2-c2", client.call(cmdChainCallServer1, "**")) assertEquals("**-s1-c1-s2-c2", client.call(cmdChainCallServer1, "**"))
assertThrows<TestException> { client.call(cmdException) } assertThrows<TestException> { client.call(cmdException) }
assertEquals("ok: te-local", client.call(cmdRemoteExceptionTest)) assertEquals("ok: te-local", client.call(cmdRemoteExceptionTest))
// wait for push to be received and check
assertEquals("42", pushedFromServer.await())
assertThrows<RemoteInterface.ClosedException> { assertThrows<RemoteInterface.ClosedException> {
client.call(cmdBreak) client.call(cmdBreak)
} }

View File

@ -13,6 +13,8 @@ import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock import kotlinx.coroutines.sync.withLock
import kotlinx.datetime.Clock import kotlinx.datetime.Clock
import net.sergeych.kiloparsec.AsyncVarint import net.sergeych.kiloparsec.AsyncVarint
import net.sergeych.kiloparsec.KiloClient
import net.sergeych.kiloparsec.KiloServer
import net.sergeych.kiloparsec.LocalInterface import net.sergeych.kiloparsec.LocalInterface
import net.sergeych.mp_logger.* import net.sergeych.mp_logger.*
import net.sergeych.mp_tools.globalLaunch import net.sergeych.mp_tools.globalLaunch
@ -27,6 +29,10 @@ class ProtocolException(text: String, cause: Throwable? = null) : RuntimeExcepti
const val MAX_TCP_BLOCK_SIZE = 16776216 const val MAX_TCP_BLOCK_SIZE = 16776216
val PING_INACTIVITY_TIME = 30.seconds val PING_INACTIVITY_TIME = 30.seconds
/**
* Listen for incoming TCP/IP connections on all local interfaces and the specified [port]
* anc create flow of [InetTransportDevice] suitable for [KiloClient].
*/
fun acceptTcpDevice(port: Int): Flow<InetTransportDevice> { fun acceptTcpDevice(port: Int): Flow<InetTransportDevice> {
val selectorManager = SelectorManager(Dispatchers.IO) val selectorManager = SelectorManager(Dispatchers.IO)
val serverSocket = aSocket(selectorManager).tcp().bind("127.0.0.1", port) val serverSocket = aSocket(selectorManager).tcp().bind("127.0.0.1", port)
@ -41,12 +47,19 @@ fun acceptTcpDevice(port: Int): Flow<InetTransportDevice> {
suspend fun connectTcpDevice(address: String) = connectTcpDevice(address.toNetworkAddress()) suspend fun connectTcpDevice(address: String) = connectTcpDevice(address.toNetworkAddress())
/**
* Connect to the TCP/IP server (see [KiloServer]) at the specified address and provide th compatible
* [InetTransportDevice] to use with [KiloClient].
*/
suspend fun connectTcpDevice(address: NetworkAddress): InetTransportDevice { suspend fun connectTcpDevice(address: NetworkAddress): InetTransportDevice {
val selectorManager = SelectorManager(Dispatchers.IO) val selectorManager = SelectorManager(Dispatchers.IO)
val socket = aSocket(selectorManager).tcp().connect(address.host, address.port) val socket = aSocket(selectorManager).tcp().connect(address.host, address.port)
return inetTransportDevice(socket) return inetTransportDevice(socket)
} }
/**
* Parse `host:port` string into the [NetworkAddress]
*/
fun String.toNetworkAddress(): NetworkAddress { fun String.toNetworkAddress(): NetworkAddress {
val (host, port) = this.split(":").map { it.trim() } val (host, port) = this.split(":").map { it.trim() }
return NetworkAddress(host, port.toInt()) return NetworkAddress(host, port.toInt())

View File

@ -1,4 +1,3 @@
import kotlinx.coroutines.delay
import kotlinx.coroutines.test.runTest import kotlinx.coroutines.test.runTest
import net.sergeych.crypto2.initCrypto import net.sergeych.crypto2.initCrypto
import net.sergeych.kiloparsec.* import net.sergeych.kiloparsec.*
@ -45,16 +44,16 @@ val cmdException by command<Unit, Unit>()
Session("unknown") Session("unknown")
} }
val client = KiloClient<Unit>() { val client = KiloClient<Unit>() {
addErrors(cli) addErrors(cli)
connect { connectTcpDevice("localhost:$port") } // TODO: add register error variant
} connect { connectTcpDevice("localhost:$port") }
delay(500) }
assertEquals("start", client.call(cmdLoad)) assertEquals("start", client.call(cmdLoad))
client.call(cmdSave, "foobar") client.call(cmdSave, "foobar")
assertEquals("foobar", client.call(cmdLoad)) assertEquals("foobar", client.call(cmdLoad))
val res = kotlin.runCatching { client.call(cmdException) } val res = kotlin.runCatching { client.call(cmdException) }
assertIs<TestException>(res.exceptionOrNull()) assertIs<TestException>(res.exceptionOrNull())