diff --git a/.idea/artifacts/kiloparsec_js_0_2_4.xml b/.idea/artifacts/kiloparsec_js_0_2_4.xml new file mode 100644 index 0000000..57ef9b8 --- /dev/null +++ b/.idea/artifacts/kiloparsec_js_0_2_4.xml @@ -0,0 +1,8 @@ + + + $PROJECT_DIR$/build/libs + + + + + \ No newline at end of file diff --git a/.idea/artifacts/kiloparsec_js_0_2_5.xml b/.idea/artifacts/kiloparsec_js_0_2_5.xml new file mode 100644 index 0000000..73941c0 --- /dev/null +++ b/.idea/artifacts/kiloparsec_js_0_2_5.xml @@ -0,0 +1,8 @@ + + + $PROJECT_DIR$/build/libs + + + + + \ No newline at end of file diff --git a/.idea/artifacts/kiloparsec_js_0_2_5_SNAPSHOT.xml b/.idea/artifacts/kiloparsec_js_0_2_5_SNAPSHOT.xml new file mode 100644 index 0000000..95de44f --- /dev/null +++ b/.idea/artifacts/kiloparsec_js_0_2_5_SNAPSHOT.xml @@ -0,0 +1,8 @@ + + + $PROJECT_DIR$/build/libs + + + + + \ No newline at end of file diff --git a/.idea/artifacts/kiloparsec_js_0_2_6.xml b/.idea/artifacts/kiloparsec_js_0_2_6.xml new file mode 100644 index 0000000..36447e6 --- /dev/null +++ b/.idea/artifacts/kiloparsec_js_0_2_6.xml @@ -0,0 +1,6 @@ + + + $PROJECT_DIR$/build/libs + + + \ No newline at end of file diff --git a/.idea/artifacts/kiloparsec_js_0_3_1_SNAPSHOT.xml b/.idea/artifacts/kiloparsec_js_0_3_1_SNAPSHOT.xml new file mode 100644 index 0000000..5b8624b --- /dev/null +++ b/.idea/artifacts/kiloparsec_js_0_3_1_SNAPSHOT.xml @@ -0,0 +1,8 @@ + + + $PROJECT_DIR$/build/libs + + + + + \ No newline at end of file diff --git a/.idea/artifacts/kiloparsec_jvm_0_2_4.xml b/.idea/artifacts/kiloparsec_jvm_0_2_4.xml new file mode 100644 index 0000000..6f1b67c --- /dev/null +++ b/.idea/artifacts/kiloparsec_jvm_0_2_4.xml @@ -0,0 +1,8 @@ + + + $PROJECT_DIR$/build/libs + + + + + \ No newline at end of file diff --git a/.idea/artifacts/kiloparsec_jvm_0_2_5.xml b/.idea/artifacts/kiloparsec_jvm_0_2_5.xml new file mode 100644 index 0000000..ee10a70 --- /dev/null +++ b/.idea/artifacts/kiloparsec_jvm_0_2_5.xml @@ -0,0 +1,8 @@ + + + $PROJECT_DIR$/build/libs + + + + + \ No newline at end of file diff --git a/.idea/artifacts/kiloparsec_jvm_0_2_5_SNAPSHOT.xml b/.idea/artifacts/kiloparsec_jvm_0_2_5_SNAPSHOT.xml new file mode 100644 index 0000000..6f80efe --- /dev/null +++ b/.idea/artifacts/kiloparsec_jvm_0_2_5_SNAPSHOT.xml @@ -0,0 +1,8 @@ + + + $PROJECT_DIR$/build/libs + + + + + \ No newline at end of file diff --git a/.idea/artifacts/kiloparsec_jvm_0_2_6.xml b/.idea/artifacts/kiloparsec_jvm_0_2_6.xml new file mode 100644 index 0000000..e625a5f --- /dev/null +++ b/.idea/artifacts/kiloparsec_jvm_0_2_6.xml @@ -0,0 +1,6 @@ + + + $PROJECT_DIR$/build/libs + + + \ No newline at end of file diff --git a/.idea/artifacts/kiloparsec_jvm_0_3_1_SNAPSHOT.xml b/.idea/artifacts/kiloparsec_jvm_0_3_1_SNAPSHOT.xml new file mode 100644 index 0000000..5727a66 --- /dev/null +++ b/.idea/artifacts/kiloparsec_jvm_0_3_1_SNAPSHOT.xml @@ -0,0 +1,8 @@ + + + $PROJECT_DIR$/build/libs + + + + + \ No newline at end of file diff --git a/.idea/scala_compiler.xml b/.idea/scala_compiler.xml new file mode 100644 index 0000000..3c0e0f6 --- /dev/null +++ b/.idea/scala_compiler.xml @@ -0,0 +1,6 @@ + + + + + \ No newline at end of file diff --git a/README.md b/README.md index cef920e..e653958 100644 --- a/README.md +++ b/README.md @@ -1,15 +1,23 @@ # Kiloparsec +__Recommended version is `0.3.1`: to keep the code compatible with current and further versions we +ask to upgrade to `0.3.1` at least.__ + The new generation of __PARanoid SECurity__ protocol, advanced, faster, more secure. It also allows connecting any " block device" transport to the same local interface. Out if the box it provides the following transports: -| name | JVM | JS | native | -|----------------|-----|----|----------| -| TCP/IP server | ✓ | | β @0.2.6 | -| TCP/IP client | ✓ | | β @0.2.6 | -| Websock server | ✓ | | | -| Websock client | ✓ | ✓ | ✓ | +| name | JVM | JS | native | +|----------------|-----|----|-----------| +| TCP/IP server | ✓ | | >= 0.2.6 | +| TCP/IP client | ✓ | | >= @0.2.6 | +| Websock server | ✓ | | | +| Websock client | ✓ | ✓ | ✓ | + +### Note on version compatibility + +Protocols >= 0.3.0 are not binary compatible with previous version due to more compact binary +format. The format from 0.3.0 onwards is supposed to keep compatible. ### Supported native targets @@ -59,7 +67,7 @@ It could be, depending on your project structure, something like: ```kotlin val commonMain by getting { dependencies { - api("net.sergeych:kiloparsec:0.2.6") + api("net.sergeych:kiloparsec:0.3.1") } } ``` @@ -118,7 +126,7 @@ assertEquals(FooArgs("bar", 117), client.call(cmdGetFoo)) ## Create ktor-based server Normally server side needs some session. It is convenient and avoid sending repeating data on each request speeding up -the protocol. With KILOPARSEC it is rather basic operation:\ +the protocol. With KILOPARSEC it is rather basic operation: ~~~kotlin // Our session just keeps Foo for cmd{Get|Set}Foo: @@ -143,9 +151,32 @@ val ns: NettyApplicationEngine = embeddedServer(Netty, port = 8080, host = "0.0. setupWebsocketServer(serverInterface) { Session() } }).start(wait = false) - ~~~ +### TCP/IP client and server + +Using plain TCP/IP is even simpler, and it works way faster than websocket one, and is _the same +protected as `wss://` variant abovve due to same kiloparsec encryption in both cases. Still, a TCP/IP +client is not available in Javascript browser targets and custom TCP ports could often be blocked by firewalls. + +Documentation is available in samples here: + +- [TCP/IP server creation](https://code.sergeych.net/docs/kiloparsec/kiloparsec/net.sergeych.kiloparsec/-kilo-server/index.html) + +- [TCP/IP client](https://code.sergeych.net/docs/kiloparsec/kiloparsec/net.sergeych.kiloparsec/-kilo-client/index.html) + +In short, there are two functions that implements aysnchronous TCP/IP transport on all platforms buy JS: + +- [acceptTcpDevice](https://code.sergeych.net/docs/kiloparsec/kiloparsec/net.sergeych.kiloparsec.adapter/accept-tcp-device.html?query=fun%20acceptTcpDevice(port:%20Int):%20Flow%3CInetTransportDevice%3E) to create a server + +- [connectTcpDevice](https://code.sergeych.net/docs/kiloparsec/kiloparsec/net.sergeych.kiloparsec.adapter/connect-tcp-device.html) to connect to the server + +### Reusing code between servers + +The same instance of the [KiloInterface](https://code.sergeych.net/docs/kiloparsec/kiloparsec/net.sergeych.kiloparsec/-kilo-interface/index.html?query=open%20class%20KiloInterface%3CS%3E%20:%20LocalInterface%3CKiloScope%3CS%3E%3E) could easily be reused with all instances of servers with different protocols. + +This is a common proactive to create a business logic in a `KiloInterface`, then create a TCP/IP and Websocket servers passing the same instance of the logic to both. + ## See also: - [Source documentation](https://code.sergeych.net/docs/kiloparsec/) diff --git a/build.gradle.kts b/build.gradle.kts index c040344..9bb07da 100644 --- a/build.gradle.kts +++ b/build.gradle.kts @@ -6,7 +6,7 @@ plugins { } group = "net.sergeych" -version = "0.2.6" +version = "0.3.1" repositories { mavenCentral() diff --git a/src/commonMain/kotlin/net/sergeych/kiloparsec/CommandDelegate.kt b/src/commonMain/kotlin/net/sergeych/kiloparsec/CommandDelegate.kt index 061a6e3..aa12131 100644 --- a/src/commonMain/kotlin/net/sergeych/kiloparsec/CommandDelegate.kt +++ b/src/commonMain/kotlin/net/sergeych/kiloparsec/CommandDelegate.kt @@ -19,6 +19,11 @@ inline fun command(overrideName: String? = null): Command ) } +/** + * Declare a Push: Unit-returning command usually used with [RemoteInterface.push] + */ +inline fun push(overrideName: String? = null): CommandDelegate = command(overrideName) + /** * Delegate to create [Command] via property */ diff --git a/src/commonMain/kotlin/net/sergeych/kiloparsec/KiloClient.kt b/src/commonMain/kotlin/net/sergeych/kiloparsec/KiloClient.kt index 144deb8..a154954 100644 --- a/src/commonMain/kotlin/net/sergeych/kiloparsec/KiloClient.kt +++ b/src/commonMain/kotlin/net/sergeych/kiloparsec/KiloClient.kt @@ -17,10 +17,33 @@ import net.sergeych.mp_logger.exception import net.sergeych.mp_tools.globalLaunch /** - * The auto-connecting client that reconnects to the kiloparsec server + * The auto-connecting client that reconnects to the kiloparsec server, + * [KiloServer], * and maintain connection state flow. Client factory launches a disconnected * set of coroutines to support automatic reconnection, so you _must_ [close] - * it manually when it is not needed, otherwise it will continue to reconnect. + * it manually when it is unnecessary, otherwise it will continue to reconnect. + * + * ## Usage + * + * Suppose we have TCP/IP server as in the [KiloServer] usage sample. Then we can connect + * to it providing TCP/IP connector like: + * + * ```kotlin + * val client = KiloClient() { + * connect { connectTcpDevice("localhost:$port") } + * } + * + * // now we can invoke remote commands: + * assertEquals("unknown", client.call(cmdLoad)) + * + * client.call(cmdSave, "foobar") + * assertEquals("foobar", client.call(cmdLoad)) + * ``` + * + * ## See also + * + * [KiloServer] + * */ class KiloClient( val localInterface: KiloInterface, @@ -28,7 +51,6 @@ class KiloClient( connectionDataFactory: ConnectionDataFactory, ) : RemoteInterface, Loggable by LogTag("CLIF") { - val _state = MutableStateFlow(false) /** @@ -94,6 +116,15 @@ class KiloClient( throw t } + override suspend fun push(cmd: Command, args: A) { + try { + deferredClient.await().push(cmd, args) + } catch (t: RemoteInterface.ClosedException) { + resetDeferredClient() + throw t + } + } + /** * Current session token. This is a per-connection unique random value same on the client and server part so * it could be used as a nonce to pair MITM and like attacks, be sure that the server is actually diff --git a/src/commonMain/kotlin/net/sergeych/kiloparsec/KiloClientConnection.kt b/src/commonMain/kotlin/net/sergeych/kiloparsec/KiloClientConnection.kt index 02396c9..476ac28 100644 --- a/src/commonMain/kotlin/net/sergeych/kiloparsec/KiloClientConnection.kt +++ b/src/commonMain/kotlin/net/sergeych/kiloparsec/KiloClientConnection.kt @@ -100,4 +100,8 @@ class KiloClientConnection( suspend fun token() = deferredParams.await().token override suspend fun call(cmd: Command, args: A): R = kiloRemoteInterface.await().call(cmd, args) + + override suspend fun push(cmd: Command, args: A) { + kiloRemoteInterface.await().push(cmd, args) + } } \ No newline at end of file diff --git a/src/commonMain/kotlin/net/sergeych/kiloparsec/KiloInterface.kt b/src/commonMain/kotlin/net/sergeych/kiloparsec/KiloInterface.kt index 25fb03a..aa20536 100644 --- a/src/commonMain/kotlin/net/sergeych/kiloparsec/KiloInterface.kt +++ b/src/commonMain/kotlin/net/sergeych/kiloparsec/KiloInterface.kt @@ -4,7 +4,13 @@ package net.sergeych.kiloparsec * The local interface to provide functions, register errors for Kiloparsec users. Use it * with [KiloClient], [KiloClientConnection], [KiloServerConnection], etc. * - * BAse implementation registers relevant exceptions. + * Base class implementation does the following: + * + * - It registers common exceptions from [RemoteInterface] and kotlin/java `IllegalArgumentException` and + * `IllegalStateException` + * - It provides [onConnected] handler + * + * See [KiloServer] for usage sample. */ open class KiloInterface : LocalInterface>() { diff --git a/src/commonMain/kotlin/net/sergeych/kiloparsec/KiloRemoteInterface.kt b/src/commonMain/kotlin/net/sergeych/kiloparsec/KiloRemoteInterface.kt index cf15943..bcf6283 100644 --- a/src/commonMain/kotlin/net/sergeych/kiloparsec/KiloRemoteInterface.kt +++ b/src/commonMain/kotlin/net/sergeych/kiloparsec/KiloRemoteInterface.kt @@ -35,5 +35,10 @@ class KiloRemoteInterface( else -> throw RemoteInterface.Exception("unexpected block type: $block") } } + + override suspend fun push(cmd: Command, args: A) { + val params = deferredParams.await() + params.transport.call(L0Call, params.encrypt(cmd.packCall(args))) + } } diff --git a/src/commonMain/kotlin/net/sergeych/kiloparsec/KiloServer.kt b/src/commonMain/kotlin/net/sergeych/kiloparsec/KiloServer.kt index 8f4ebda..0ab32fb 100644 --- a/src/commonMain/kotlin/net/sergeych/kiloparsec/KiloServer.kt +++ b/src/commonMain/kotlin/net/sergeych/kiloparsec/KiloServer.kt @@ -91,6 +91,9 @@ private val instances = AtomicCounter() * Session("unknown") * } * ``` + * + * See [KiloClient] to connect to the server. + * * @param S the type of the server session object, returned by [sessionBuilder]. See above * @param clientInterface the interface available for remote calls * @param connections flow of incoming connections. Server stops when the flow is fully collected (normally diff --git a/src/commonMain/kotlin/net/sergeych/kiloparsec/KiloServerConnection.kt b/src/commonMain/kotlin/net/sergeych/kiloparsec/KiloServerConnection.kt index 654eae0..5944dde 100644 --- a/src/commonMain/kotlin/net/sergeych/kiloparsec/KiloServerConnection.kt +++ b/src/commonMain/kotlin/net/sergeych/kiloparsec/KiloServerConnection.kt @@ -94,4 +94,8 @@ class KiloServerConnection( override suspend fun call(cmd: Command, args: A): R { return kiloRemoteInterface.await().call(cmd, args) } + + override suspend fun push(cmd: Command, args: A) { + kiloRemoteInterface.await().push(cmd, args) + } } \ No newline at end of file diff --git a/src/commonMain/kotlin/net/sergeych/kiloparsec/RemoteInterface.kt b/src/commonMain/kotlin/net/sergeych/kiloparsec/RemoteInterface.kt index 5bb2b2f..c250f1a 100644 --- a/src/commonMain/kotlin/net/sergeych/kiloparsec/RemoteInterface.kt +++ b/src/commonMain/kotlin/net/sergeych/kiloparsec/RemoteInterface.kt @@ -50,4 +50,18 @@ interface RemoteInterface { * Call the remote procedure with specified args and return its result */ suspend fun call(cmd: Command, args: A): R + + /** + * Push the notification without waiting for reception or processing. + * It returns immediately after sending data to the transport (e.g., to the network). + * Use [call] if it is necessary to wait until the command will be received and processed by the remote. + */ + suspend fun push(cmd: Command, args: A) + + /** + * Push the command with no args. + * It returns immediately after sending data to the transport (e.g., to the network). + * Use [call] if it is necessary to wait until the command will be received and processed by the remote. + */ + suspend fun push(cmd: Command) = push(cmd,Unit) } \ No newline at end of file diff --git a/src/commonMain/kotlin/net/sergeych/kiloparsec/Transport.kt b/src/commonMain/kotlin/net/sergeych/kiloparsec/Transport.kt index f5ec4d8..6a4384e 100644 --- a/src/commonMain/kotlin/net/sergeych/kiloparsec/Transport.kt +++ b/src/commonMain/kotlin/net/sergeych/kiloparsec/Transport.kt @@ -15,6 +15,7 @@ import kotlinx.serialization.descriptors.SerialDescriptor import kotlinx.serialization.encoding.Decoder import kotlinx.serialization.encoding.Encoder import kotlinx.serialization.serializer +import net.sergeych.bipack.Unsigned import net.sergeych.crypto2.toDump import net.sergeych.kiloparsec.Transport.Device import net.sergeych.mp_logger.* @@ -63,7 +64,12 @@ class Transport( @Serializable(TransportBlockSerializer::class) sealed class Block { @Serializable - data class Call(val id: UInt, val name: String, val packedArgs: UByteArray) : Block() { + data class Call( + @Unsigned + val id: UInt, + val name: String, + val packedArgs: UByteArray + ) : Block() { override fun equals(other: Any?): Boolean { if (this === other) return true if (other !is Call) return false @@ -83,10 +89,10 @@ class Transport( } @Serializable - data class Response(val forId: UInt, val packedResult: UByteArray) : Block() + data class Response(@Unsigned val forId: UInt, val packedResult: UByteArray) : Block() @Serializable - data class Error(val forId: UInt, val code: String, val text: String? = null, val extra: UByteArray? = null) : + data class Error(@Unsigned val forId: UInt, val code: String, val text: String? = null, val extra: UByteArray? = null) : Block() { val message by lazy { text ?: "remote exception: $code" } } @@ -98,7 +104,8 @@ class Transport( var isClosed: Boolean = false /** - * Send a call block for a command and packed args and return packed result if it is not an error + * Send a call block for a command and packed args and return packed result if it is not an error. It suspends + * until receiving answer from the remote side, even if returns `Unit`. * @throws RemoteInterface.RemoteException if the remote call caused an exception. Normally use [call] instead. * @throws RemoteInterface.ClosedException */ @@ -111,12 +118,13 @@ class Transport( // We need to shield calls and lastID with mutex, but nothing more: access.withLock { if (isClosed) throw RemoteInterface.ClosedException() + // the order is important: first id in use MUST BE >= 1, not zero: b = Block.Call(++lastId, name, packedArgs) calls[b.id] = deferred } // now we have mutex freed so we can call: - val r = runCatching { device.output.send(pack(b).also { debug { ">>>\n${it.toDump()}" } }) } + val r = runCatching { device.output.send(pack(b).also { debug { ">>>\n${it.toDump()}" } }) } if (!r.isSuccess) { r.exceptionOrNull()?.let { exception { "failed to send output block" to it } @@ -131,6 +139,25 @@ class Transport( return deferred.await() } + /** + * Send a call block for a command and packed args and return packed result if it is not an error. It suspends + * until receiving answer from the remote side, even if returns `Unit`. + * @throws RemoteInterface.RemoteException if the remote call caused an exception. Normally use [call] instead. + * @throws RemoteInterface.ClosedException + */ + private suspend fun sendPushBlock(name: String, packedArgs: UByteArray) { + if (isClosed) throw RemoteInterface.ClosedException() + + // All push blocks have the same id == 0: + val b = Block.Call(0u, name, packedArgs) + val r = runCatching { device.output.send(pack(b).also { debug { ">>$\n${it.toDump()}" } }) } + when(val e = r.exceptionOrNull()) { + is RemoteInterface.ClosedException, is CancellationException, is RemoteInterface.RemoteException + -> throw e + else -> throw RemoteInterface.ClosedException() + } + } + /** * Call the remote procedure with specified args and return its result */ @@ -139,6 +166,10 @@ class Transport( return unpack(cmd.resultSerializer, result) } + override suspend fun push(cmd: Command,args: A) { + sendPushBlock(cmd.name, pack(cmd.argsSerializer, args)) + } + /** * Start running the transport. This function suspends until the transport is closed * normally or by error. If you need to cancel it prematurely, cancel the coroutine @@ -163,7 +194,7 @@ class Transport( warning { "decoded error: ${error::class.simpleName}: $error" } calls.remove(b.forId)?.completeExceptionally(localInterface.decodeError(b)) ?: warning { "error handler not found for ${b.forId}" } - info { "error processed"} + info { "error processed" } } is Block.Response -> access.withLock { @@ -176,17 +207,21 @@ class Transport( is Block.Call -> launch { try { - send( - Block.Response( - b.id, - localInterface.execute(commandContext, b.name, b.packedArgs) + if (b.id == 0u) + // Command does not waits return + localInterface.execute(commandContext, b.name, b.packedArgs) + else + send( + Block.Response( + b.id, + localInterface.execute(commandContext, b.name, b.packedArgs) + ) ) - ) } catch (x: LocalInterface.BreakConnectionException) { // handler forced close - warning { "handler requested closing of the connection (${x.flushSendQueue}"} + warning { "handler requested closing of the connection (${x.flushSendQueue}" } isClosed = true - if( x.flushSendQueue ) device.flush() + if (x.flushSendQueue) device.flush() device.close() } catch (x: RemoteInterface.RemoteException) { send(Block.Error(b.id, x.code, x.text, x.extra)) @@ -196,26 +231,23 @@ class Transport( .also { debug { "command executed: ${b.name}" } } } } - debug { "=---------------------------------------------"} + debug { "=---------------------------------------------" } } - debug { "input step performed closed=$isClosed active=$isActive"} + debug { "input step performed closed=$isClosed active=$isActive" } } catch (_: ClosedSendChannelException) { info { "closed send channel" } isClosed = true } catch (_: ClosedReceiveChannelException) { - info { "closed receive channel"} + info { "closed receive channel" } isClosed = true - } - catch(cce: LocalInterface.BreakConnectionException) { - info { "closing connection by local request ($cce)"} + } catch (cce: LocalInterface.BreakConnectionException) { + info { "closing connection by local request ($cce)" } device.close() - } - catch(t: RemoteInterface.ClosedException) { + } catch (t: RemoteInterface.ClosedException) { // it is ok: we just exit the coroutine normally // and mark we're closing isClosed = true - } - catch (_: CancellationException) { + } catch (_: CancellationException) { info { "loop is cancelled with CancellationException" } isClosed = true } catch (t: Throwable) { @@ -226,7 +258,7 @@ class Transport( } debug { "leaving transport loop" } access.withLock { - debug { "access lock obtained"} + debug { "access lock obtained" } isClosed = true debug { "closing device $device, calls in queue ${calls.size}" } runCatching { device.close() } @@ -243,8 +275,7 @@ class Transport( private suspend fun send(block: Block) { try { device.output.send(pack(block)) - } - catch(_: ClosedSendChannelException) { + } catch (_: ClosedSendChannelException) { throw RemoteInterface.ClosedException() } } @@ -275,7 +306,7 @@ object TransportBlockSerializer : KSerializer { override fun deserialize(decoder: Decoder): Transport.Block = - when( val id = decoder.decodeByte().toInt()) { + when (val id = decoder.decodeByte().toInt()) { 0 -> decoder.decodeSerializableValue(serializer()) 1 -> decoder.decodeSerializableValue(serializer()) 2 -> decoder.decodeSerializableValue(serializer()) diff --git a/src/commonMain/kotlin/net/sergeych/kiloparsec/adapter/NetworkProvider.kt b/src/commonMain/kotlin/net/sergeych/kiloparsec/adapter/NetworkProvider.kt index 4e2e790..f026cca 100644 --- a/src/commonMain/kotlin/net/sergeych/kiloparsec/adapter/NetworkProvider.kt +++ b/src/commonMain/kotlin/net/sergeych/kiloparsec/adapter/NetworkProvider.kt @@ -11,46 +11,3 @@ data class NetworkAddress( return "$host:$port" } } -// -///** -// * Multiplatform datagram abstraction -// */ -//interface Datagram { -// /** -// * Received message -// */ -// val message: UByteArray -// -// /** -// * Address from where the message was sent -// */ -// val address: NetworkAddress -//} -// -//@OptIn(ExperimentalStdlibApi::class) -//interface DatagramConnector: AutoCloseable { -// -// val incoming: ReceiveChannel -// suspend fun send(message: UByteArray, networkAddress: NetworkAddress) -// @Suppress("unused") -// suspend fun send(message: UByteArray, datagramAddress: String) { -// send(message, datagramAddress.toNetworkAddress()) -// } -// -// suspend fun send(message: UByteArray,host: String,port: Int) = -// send(message, NetworkAddress(host,port)) -// override fun close() -//} -// -//expect fun NetworkAddress(host: String,port: Int): NetworkAddress -// -//fun String.toNetworkAddress() : NetworkAddress { -// val (host, port) = this.split(":").map { it.trim()} -// return NetworkAddress(host, port.toInt()) -//} -// -//expect fun acceptTcpDevice(port: Int): Flow -// -//expect suspend fun connectTcpDevice(address: NetworkAddress): InetTransportDevice -// -//suspend fun connectTcpDevice(address: String) = connectTcpDevice(address.toNetworkAddress()) diff --git a/src/commonTest/kotlin/TransportTest.kt b/src/commonTest/kotlin/TransportTest.kt index 818264d..a18a5ac 100644 --- a/src/commonTest/kotlin/TransportTest.kt +++ b/src/commonTest/kotlin/TransportTest.kt @@ -184,10 +184,16 @@ class TransportTest { val cmdRemoteExceptionTest by command() val cmdBreak by command() + val cmdPushServer by push() + val pushedFromServer = CompletableDeferred() + val serverInterface = KiloInterface().apply { on(cmdPing) { "pong! [$it]" } + on(cmdPushServer) { + pushedFromServer.complete(it) + } on(cmdGetToken) { sessionToken } @@ -249,11 +255,17 @@ class TransportTest { assertEquals("client pong: foo", kiloServerConnection.call(cmdPing, "foo")) assertEquals("server push: bar", kiloServerConnection.call(cmdPush, "bar")) + client.push(cmdPushServer, "42") + assertEquals("**-s1-c1-s2-c2", client.call(cmdChainCallServer1, "**")) + assertThrows { client.call(cmdException) } assertEquals("ok: te-local", client.call(cmdRemoteExceptionTest)) + // wait for push to be received and check + assertEquals("42", pushedFromServer.await()) + assertThrows { client.call(cmdBreak) } diff --git a/src/ktorSocketMain/kotlin/adapter/socketClient.kt b/src/ktorSocketMain/kotlin/adapter/socketClient.kt index 5d67f19..28ce76a 100644 --- a/src/ktorSocketMain/kotlin/adapter/socketClient.kt +++ b/src/ktorSocketMain/kotlin/adapter/socketClient.kt @@ -13,6 +13,8 @@ import kotlinx.coroutines.sync.Mutex import kotlinx.coroutines.sync.withLock import kotlinx.datetime.Clock import net.sergeych.kiloparsec.AsyncVarint +import net.sergeych.kiloparsec.KiloClient +import net.sergeych.kiloparsec.KiloServer import net.sergeych.kiloparsec.LocalInterface import net.sergeych.mp_logger.* import net.sergeych.mp_tools.globalLaunch @@ -27,6 +29,10 @@ class ProtocolException(text: String, cause: Throwable? = null) : RuntimeExcepti const val MAX_TCP_BLOCK_SIZE = 16776216 val PING_INACTIVITY_TIME = 30.seconds +/** + * Listen for incoming TCP/IP connections on all local interfaces and the specified [port] + * anc create flow of [InetTransportDevice] suitable for [KiloClient]. + */ fun acceptTcpDevice(port: Int): Flow { val selectorManager = SelectorManager(Dispatchers.IO) val serverSocket = aSocket(selectorManager).tcp().bind("127.0.0.1", port) @@ -41,12 +47,19 @@ fun acceptTcpDevice(port: Int): Flow { suspend fun connectTcpDevice(address: String) = connectTcpDevice(address.toNetworkAddress()) +/** + * Connect to the TCP/IP server (see [KiloServer]) at the specified address and provide th compatible + * [InetTransportDevice] to use with [KiloClient]. + */ suspend fun connectTcpDevice(address: NetworkAddress): InetTransportDevice { val selectorManager = SelectorManager(Dispatchers.IO) val socket = aSocket(selectorManager).tcp().connect(address.host, address.port) return inetTransportDevice(socket) } +/** + * Parse `host:port` string into the [NetworkAddress] + */ fun String.toNetworkAddress(): NetworkAddress { val (host, port) = this.split(":").map { it.trim() } return NetworkAddress(host, port.toInt()) diff --git a/src/ktorSocketTest/kotlin/TcpTest.kt b/src/ktorSocketTest/kotlin/TcpTest.kt index 2d44ad2..e32ff76 100644 --- a/src/ktorSocketTest/kotlin/TcpTest.kt +++ b/src/ktorSocketTest/kotlin/TcpTest.kt @@ -1,4 +1,3 @@ -import kotlinx.coroutines.delay import kotlinx.coroutines.test.runTest import net.sergeych.crypto2.initCrypto import net.sergeych.kiloparsec.* @@ -45,16 +44,16 @@ val cmdException by command() Session("unknown") } - val client = KiloClient() { - addErrors(cli) - connect { connectTcpDevice("localhost:$port") } - } - delay(500) +val client = KiloClient() { + addErrors(cli) + // TODO: add register error variant + connect { connectTcpDevice("localhost:$port") } +} - assertEquals("start", client.call(cmdLoad)) +assertEquals("start", client.call(cmdLoad)) - client.call(cmdSave, "foobar") - assertEquals("foobar", client.call(cmdLoad)) +client.call(cmdSave, "foobar") +assertEquals("foobar", client.call(cmdLoad)) val res = kotlin.runCatching { client.call(cmdException) } assertIs(res.exceptionOrNull())