From 4d178d951f8dbb3336f4d47febfe7dc508467a57 Mon Sep 17 00:00:00 2001 From: sergeych Date: Sun, 11 Aug 2024 14:17:11 +0200 Subject: [PATCH] 0.3.2: UDP support --- .idea/artifacts/kiloparsec_js_0_3_1.xml | 8 + .idea/artifacts/kiloparsec_jvm_0_3_1.xml | 8 + .idea/uiDesigner.xml | 124 ++++++++++++ README.md | 39 +++- build.gradle.kts | 4 +- .../net/sergeych/kiloparsec/KiloClient.kt | 1 + .../kotlin/net/sergeych/kiloparsec/tools.kt | 20 +- .../kiloparsec/adapter/TcpProvider.kt} | 4 +- .../sergeych/kiloparsec/adapter/UdpBlock.kt | 102 ++++++++++ .../kiloparsec/adapter/UdpConnector.kt | 18 ++ .../kiloparsec/adapter/UdpProvider.kt | 121 ++++++++++++ .../sergeych/kiloparsec/adapter/UdpServer.kt | 119 ++++++++++++ .../kiloparsec/adapter/UdpSocketTransport.kt | 171 +++++++++++++++++ src/ktorSocketTest/kotlin/InternetTest.kt | 181 ++++++++++++++++++ src/ktorSocketTest/kotlin/TcpTest.kt | 69 ------- 15 files changed, 905 insertions(+), 84 deletions(-) create mode 100644 .idea/artifacts/kiloparsec_js_0_3_1.xml create mode 100644 .idea/artifacts/kiloparsec_jvm_0_3_1.xml create mode 100644 .idea/uiDesigner.xml rename src/ktorSocketMain/kotlin/{adapter/socketClient.kt => net/sergeych/kiloparsec/adapter/TcpProvider.kt} (97%) create mode 100644 src/ktorSocketMain/kotlin/net/sergeych/kiloparsec/adapter/UdpBlock.kt create mode 100644 src/ktorSocketMain/kotlin/net/sergeych/kiloparsec/adapter/UdpConnector.kt create mode 100644 src/ktorSocketMain/kotlin/net/sergeych/kiloparsec/adapter/UdpProvider.kt create mode 100644 src/ktorSocketMain/kotlin/net/sergeych/kiloparsec/adapter/UdpServer.kt create mode 100644 src/ktorSocketMain/kotlin/net/sergeych/kiloparsec/adapter/UdpSocketTransport.kt create mode 100644 src/ktorSocketTest/kotlin/InternetTest.kt delete mode 100644 src/ktorSocketTest/kotlin/TcpTest.kt diff --git a/.idea/artifacts/kiloparsec_js_0_3_1.xml b/.idea/artifacts/kiloparsec_js_0_3_1.xml new file mode 100644 index 0000000..3f32c15 --- /dev/null +++ b/.idea/artifacts/kiloparsec_js_0_3_1.xml @@ -0,0 +1,8 @@ + + + $PROJECT_DIR$/build/libs + + + + + \ No newline at end of file diff --git a/.idea/artifacts/kiloparsec_jvm_0_3_1.xml b/.idea/artifacts/kiloparsec_jvm_0_3_1.xml new file mode 100644 index 0000000..1220c3e --- /dev/null +++ b/.idea/artifacts/kiloparsec_jvm_0_3_1.xml @@ -0,0 +1,8 @@ + + + $PROJECT_DIR$/build/libs + + + + + \ No newline at end of file diff --git a/.idea/uiDesigner.xml b/.idea/uiDesigner.xml new file mode 100644 index 0000000..2b63946 --- /dev/null +++ b/.idea/uiDesigner.xml @@ -0,0 +1,124 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + \ No newline at end of file diff --git a/README.md b/README.md index a56d6fc..61c52b8 100644 --- a/README.md +++ b/README.md @@ -1,18 +1,21 @@ # Kiloparsec -__Recommended version is `0.3.1`: to keep the code compatible with current and further versions we -ask to upgrade to `0.3.1` at least.__ +__Recommended version is `0.3.2`: to keep the code compatible with current and further versions we +ask to upgrade to `0.3.2` at least.__ Starting from this version some pacakage names are changed for +better clarity and fast UDP endpoints are added. The new generation of __PARanoid SECurity__ protocol, advanced, faster, more secure. It also allows connecting any " block device" transport to the same local interface. Out if the box it provides the following transports: -| name | JVM | JS | native | -|----------------|-----|----|----------| -| TCP/IP server | ✓ | | >= 0.2.6 | -| TCP/IP client | ✓ | | >= 0.2.6 | -| Websock server | ✓ | | | -| Websock client | ✓ | ✓ | ✓ | +| name | JVM | JS | native | +|----------------|--------|----|--------| +| TCP/IP server | ✓ | | 0.2.6+ | +| TCP/IP client | ✓ | | 0.2.6+ | +| UDP server | 0.3.2+ | | 0.3.2+ | +| UDP client | 0.3.2+ | | 0.3.2+ | +| Websock server | ✓ | | | +| Websock client | ✓ | ✓ | ✓ | ### Note on version compatibility @@ -23,7 +26,7 @@ format. The format from 0.3.0 onwards is supposed to keep compatible. - iosArm64, iosX64 - macosArm64, macosArm64 -- linxArm64, linuxX64 +- linuxArm64, linuxX64 ### Non-native targets @@ -67,7 +70,7 @@ It could be, depending on your project structure, something like: ```kotlin val commonMain by getting { dependencies { - api("net.sergeych:kiloparsec:0.3.1") + api("net.sergeych:kiloparsec:0.3.2") } } ``` @@ -171,6 +174,22 @@ In short, there are two functions that implements asynchronous TCP/IP transport - [connectTcpDevice](https://code.sergeych.net/docs/kiloparsec/kiloparsec/net.sergeych.kiloparsec.adapter/connect-tcp-device.html) to connect to the server +## UDP client and server + +Is very much straightforward, same as with TCP/IP: + +- [UDP server creation](https://code.sergeych.net/docs/kiloparsec/kiloparsec/net.sergeych.kiloparsec.adapter/accept-udp-device.html) +- [Connect UDP client](https://code.sergeych.net/docs/kiloparsec/kiloparsec/net.sergeych.kiloparsec.adapter/connect-udp-device.html) + +### UDP specifics + +Each command invocation and result are packed in a separate UDP diagram using effective binary packing. +Thus for the best results commands and results should be relatively short, best to fit into 240 bytes. While bigger datagrams are often transmitted successfully, the probability of the effective transmission drops with the size. + +Kiloparsec UDP transport does not retransmits not delivered packets. Use TCP/IP or websocket if it is a concern. + +For the best results we recommend using [push](https://code.sergeych.net/docs/kiloparsec/kiloparsec/net.sergeych.kiloparsec/-remote-interface/index.html#1558240250%2FFunctions%2F788909594) for remote interfaces with UDP. + ## Reusing code between servers The same instance of the [KiloInterface](https://code.sergeych.net/docs/kiloparsec/kiloparsec/net.sergeych.kiloparsec/-kilo-interface/index.html?query=open%20class%20KiloInterface%3CS%3E%20:%20LocalInterface%3CKiloScope%3CS%3E%3E) could easily be reused with all instances of servers with different protocols. diff --git a/build.gradle.kts b/build.gradle.kts index 9bb07da..3cad3a3 100644 --- a/build.gradle.kts +++ b/build.gradle.kts @@ -1,12 +1,12 @@ plugins { - kotlin("multiplatform") version "2.0.0" + kotlin("multiplatform") version "2.0.10" id("org.jetbrains.kotlin.plugin.serialization") version "2.0.0" `maven-publish` id("org.jetbrains.dokka") version "1.9.20" } group = "net.sergeych" -version = "0.3.1" +version = "0.3.2" repositories { mavenCentral() diff --git a/src/commonMain/kotlin/net/sergeych/kiloparsec/KiloClient.kt b/src/commonMain/kotlin/net/sergeych/kiloparsec/KiloClient.kt index a154954..f07ca4b 100644 --- a/src/commonMain/kotlin/net/sergeych/kiloparsec/KiloClient.kt +++ b/src/commonMain/kotlin/net/sergeych/kiloparsec/KiloClient.kt @@ -89,6 +89,7 @@ class KiloClient( } _state.value = false resetDeferredClient() + // reconnection timeout delay(100) } } diff --git a/src/commonMain/kotlin/net/sergeych/kiloparsec/tools.kt b/src/commonMain/kotlin/net/sergeych/kiloparsec/tools.kt index af7d3c3..7194733 100644 --- a/src/commonMain/kotlin/net/sergeych/kiloparsec/tools.kt +++ b/src/commonMain/kotlin/net/sergeych/kiloparsec/tools.kt @@ -1,6 +1,24 @@ package net.sergeych.kiloparsec +import kotlinx.coroutines.sync.Mutex +import kotlinx.coroutines.sync.withLock + fun String.encodeToUByteArray() = encodeToByteArray().toUByteArray() -fun UByteArray.decodeFromUByteArray(): String = toByteArray().decodeToString() \ No newline at end of file +class SyncValue(initialValue: T) { + private val access = Mutex() + + var value = initialValue + private set + + suspend fun mutate(f: suspend (T)->T): T = access.withLock { f(value).also { value = it } } + + @Suppress("unused") + suspend fun getAndSet(newValue: T): T = mutate { + val old = value + value = newValue + old + } + +} diff --git a/src/ktorSocketMain/kotlin/adapter/socketClient.kt b/src/ktorSocketMain/kotlin/net/sergeych/kiloparsec/adapter/TcpProvider.kt similarity index 97% rename from src/ktorSocketMain/kotlin/adapter/socketClient.kt rename to src/ktorSocketMain/kotlin/net/sergeych/kiloparsec/adapter/TcpProvider.kt index 28ce76a..1d51554 100644 --- a/src/ktorSocketMain/kotlin/adapter/socketClient.kt +++ b/src/ktorSocketMain/kotlin/net/sergeych/kiloparsec/adapter/TcpProvider.kt @@ -33,9 +33,9 @@ val PING_INACTIVITY_TIME = 30.seconds * Listen for incoming TCP/IP connections on all local interfaces and the specified [port] * anc create flow of [InetTransportDevice] suitable for [KiloClient]. */ -fun acceptTcpDevice(port: Int): Flow { +fun acceptTcpDevice(port: Int,localInterface: String = "0.0.0.0"): Flow { val selectorManager = SelectorManager(Dispatchers.IO) - val serverSocket = aSocket(selectorManager).tcp().bind("127.0.0.1", port) + val serverSocket = aSocket(selectorManager).tcp().bind(localInterface, port) return flow { while (true) { serverSocket.accept().let { sock -> diff --git a/src/ktorSocketMain/kotlin/net/sergeych/kiloparsec/adapter/UdpBlock.kt b/src/ktorSocketMain/kotlin/net/sergeych/kiloparsec/adapter/UdpBlock.kt new file mode 100644 index 0000000..89b665a --- /dev/null +++ b/src/ktorSocketMain/kotlin/net/sergeych/kiloparsec/adapter/UdpBlock.kt @@ -0,0 +1,102 @@ +package net.sergeych.kiloparsec.adapter + +import io.ktor.network.sockets.* +import io.ktor.utils.io.core.* +import net.sergeych.crypto2.toDump +import net.sergeych.kiloparsec.adapter.UdpBlock.Companion.CANCEL_BLOCK +import net.sergeych.kiloparsec.adapter.UdpBlock.Companion.ESCAPE_BYTE +import net.sergeych.kiloparsec.adapter.UdpBlock.Companion.PING_BLOCK +import net.sergeych.kiloparsec.adapter.UdpBlock.Companion.decode + +/** + * Encoded block for UDP datagram space-savvy. Minimum dara size is two bytes, which is fine + * for Kiloparsec blocks. + * + * First byte is encoded using [ESCAPE_BYTE] depending on the second byte: + * + * | 0 | 1 | meaning | + * |---|---|---------| + * | [ESCAPE_BYTE] | [ESCAPE_BYTE] | Data block, dropping first byte | + * | [ESCAPE_BYTE] | [PING_BLOCK] | Ping block, reset timers | + * | [ESCAPE_BYTE] | [CANCEL_BLOCK] | close connection | + * | any other | * | data block, all bytes | + * + * Use [encoded] and [toDatagram] to create binary or the datagram from a block, and [decode] to restore. + * + * We do not use serialization to speed up the transport layer. + */ +sealed class UdpBlock { + /** + * Block to show that the connection is closed and should also be closed on the other side + */ + object Cancel : UdpBlock() + + /** + * Parties show pings if there is no activity to keep it alive, detect connection loss and in some + * cases revive NAT/Proxy state in routers. + */ + object Ping : UdpBlock() + + /** + * Parsec data block. Could not be smaller than two bytes. + */ + class Data(val data: UByteArray) : UdpBlock() { + override fun toString(): String { + return "UDP Data (${data.size}):\n${data.toDump()}" + } + init { + if( data.size < 2) throw IllegalArgumentException("data must be at least 2 bytes") + } + } + + val encoded: UByteArray by lazy { + when(this) { + is Data -> { + // Do we need escaping? + if( data[0] == ESCAPE_BYTE ) + escapeAsArray + data + else + data + } + is Cancel -> cancelAsArray + is Ping -> pingAsArray + } + } + + fun toDatagram(address: SocketAddress): Datagram { + val encoded = encoded.toByteArray() + return Datagram(ByteReadPacket(encoded, 0, encoded.size), address) + } + + companion object { + val ESCAPE_BYTE = 255.toUByte() + val PING_BLOCK = 0.toUByte() + val CANCEL_BLOCK = 1.toUByte() + + private val escapeAsArray = ubyteArrayOf(ESCAPE_BYTE) + private val pingAsArray = ubyteArrayOf(ESCAPE_BYTE, PING_BLOCK) + private val cancelAsArray = ubyteArrayOf(ESCAPE_BYTE, CANCEL_BLOCK) + + fun decode(data: UByteArray): UdpBlock { + if (data.size < 2) + throw UdpTransportException("block too short: ${data.size}") + return if( data[0] != ESCAPE_BYTE ) + // plain data + Data(data) + else { + when(val b2 = data[1]) { + ESCAPE_BYTE -> { + // Escaped first byte, then plain data + Data(data.sliceArray(1 ..< data.size)) + } + PING_BLOCK -> Ping + CANCEL_BLOCK -> Cancel + else -> throw UdpTransportException("invalid block type: $b2") + } + } + } + + fun decode(datagram: Datagram) = + decode(datagram.packet.readBytes().toUByteArray()) + } +} \ No newline at end of file diff --git a/src/ktorSocketMain/kotlin/net/sergeych/kiloparsec/adapter/UdpConnector.kt b/src/ktorSocketMain/kotlin/net/sergeych/kiloparsec/adapter/UdpConnector.kt new file mode 100644 index 0000000..cc473ed --- /dev/null +++ b/src/ktorSocketMain/kotlin/net/sergeych/kiloparsec/adapter/UdpConnector.kt @@ -0,0 +1,18 @@ +package net.sergeych.kiloparsec.adapter + +import io.ktor.network.sockets.* + +/** + * The interface for common UDP connector shared by UDP components + */ +internal interface UdpConnector { + /** + * Called when client connection is done so the provider could free resources + */ + suspend fun disconnectClient(address: SocketAddress) + + /** + * Send a block from a proper UDP socket + */ + suspend fun sendBlock(block: UdpBlock, toAddress: SocketAddress) +} \ No newline at end of file diff --git a/src/ktorSocketMain/kotlin/net/sergeych/kiloparsec/adapter/UdpProvider.kt b/src/ktorSocketMain/kotlin/net/sergeych/kiloparsec/adapter/UdpProvider.kt new file mode 100644 index 0000000..a738c40 --- /dev/null +++ b/src/ktorSocketMain/kotlin/net/sergeych/kiloparsec/adapter/UdpProvider.kt @@ -0,0 +1,121 @@ +package net.sergeych.kiloparsec.adapter + +import io.ktor.network.selector.* +import io.ktor.network.sockets.* +import io.ktor.utils.io.CancellationException +import kotlinx.coroutines.* +import kotlinx.coroutines.flow.Flow +import net.sergeych.kiloparsec.KiloClient +import net.sergeych.kiloparsec.KiloServer +import net.sergeych.kiloparsec.RemoteInterface +import net.sergeych.mp_tools.globalLaunch +import net.sergeych.tools.AtomicCounter + +internal val udpCounter = AtomicCounter(0) + +class UdpTransportException(override val message: String) : RemoteInterface.InvalidDataException(message) + +/** + * Listen for incoming UDP connections and provide transport flow for it. See also [UdpServer.transportFlow] + * for another way to create a server. Use it with [KiloServer]: + * ```kotlin + * // Whatever server session data we might need: + * data class Session( + * var data: String, + * ) + * + * // declare some commands (normally in a shared module): + * val cmdSave by command() + * val cmdLoad by command() + * val cmdDrop by command() + * val cmdException by command() + * + * // Interface using the session above, can be shared between many + * // server types and instances (different ports and protocols): + * val cli = KiloInterface().apply { + * onConnected { session.data = "start" } + * on(cmdSave) { session.data = it } + * on(cmdLoad) { + * session.data + * } + * on(cmdException) { + * throw TestException() + * } + * on(cmdDrop) { + * throw LocalInterface.BreakConnectionException() + * } + * } + * // Now create a server to accept incoming UDPs on our port: + * val server = KiloServer(cli, acceptUdpDevice(port)) { + * // This initializes new session for each incoming command + * Session("unknown") + * } + * ``` + * + * See [connectUdpDevice] for the client sample. + * + * When it is necessary to stop listening to some port, use [UdpServer] instead. + */ +fun acceptUdpDevice(port: Int, localInterface: String = "0.0.0.0"): Flow = + UdpServer(port, localInterface).transportFlow + +/** + * Connect to UDP server (see [acceptUdpDevice]) and return a [InetTransportDevice] for it. It + * should be used with [KiloClient] as connection provider: + * ```kotlin + * val client = KiloClient() { + * connect { connectUdpDevice("localhost:$port") } + * } + * // now we can execute remote commands: + * assertEquals("start", client.call(cmdLoad)) + * ``` + */ +fun connectUdpDevice(hostPort: String) = connectUdpDevice(hostPort.toNetworkAddress()) + +/** + * Connect to UDP server (see [acceptUdpDevice]) and return a [InetTransportDevice] for it. It + * should be used with [KiloClient] as connection provider: + * ```kotlin + * val client = KiloClient() { + * connect { connectUdpDevice("localhost:$port") } + * } + * // now we can execute remote commands: + * assertEquals("start", client.call(cmdLoad)) + * ``` + */ +fun connectUdpDevice(addr: NetworkAddress): InetTransportDevice { + val selectorManager = SelectorManager(Dispatchers.IO) + val remoteAddress = InetSocketAddress(addr.host, addr.port) + val socket = aSocket(selectorManager).udp().connect(remoteAddress) + + val done = CompletableDeferred() + + val transport = UdpSocketTransport(object : UdpConnector { + override suspend fun sendBlock(block: UdpBlock, toAddress: SocketAddress) { + socket.send(block.toDatagram(remoteAddress)) + } + + override suspend fun disconnectClient(address: SocketAddress) { + done.complete(Unit) + } + + }, remoteAddress, false) + + globalLaunch { + launch { + while (isActive) { + try { + transport.processIncoming(UdpBlock.decode(socket.receive())) + } catch (_: CancellationException) { + break + } catch (e: Exception) { + transport.close() + break + } + } + } + done.await() + } + + return transport.transportDevice +} \ No newline at end of file diff --git a/src/ktorSocketMain/kotlin/net/sergeych/kiloparsec/adapter/UdpServer.kt b/src/ktorSocketMain/kotlin/net/sergeych/kiloparsec/adapter/UdpServer.kt new file mode 100644 index 0000000..6748146 --- /dev/null +++ b/src/ktorSocketMain/kotlin/net/sergeych/kiloparsec/adapter/UdpServer.kt @@ -0,0 +1,119 @@ +package net.sergeych.kiloparsec.adapter + +import io.ktor.network.selector.* +import io.ktor.network.sockets.* +import io.ktor.utils.io.* +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.IO +import kotlinx.coroutines.channels.ClosedReceiveChannelException +import kotlinx.coroutines.flow.flow +import kotlinx.coroutines.sync.Mutex +import kotlinx.coroutines.sync.withLock +import net.sergeych.kiloparsec.KiloServer +import net.sergeych.mp_logger.LogTag +import net.sergeych.mp_logger.Loggable +import net.sergeych.mp_logger.debug +import net.sergeych.mp_logger.exception + +/** + * UDP server for kiloparsec. Unlike [acceptUdpDevice], it allow stopping listening + * to the port when need with [close]. Use [transportFlow] with [KiloServer], here is the + * basic sample: + * + * ```kotlin + * val uServer = UdpServer(port) + * KiloServer(cli, uServer.transportFlow()) { + * Session("unknown") + * } + * + * // server is now active and accepts connections + * // ... + * + * // close and stop listening to the port: + * uServer.close() + * ``` + * + * See [acceptUdpDevice] for more information. + */ +class UdpServer(val port: Int,localInterface: String = "0.0.0.0") : + Loggable by LogTag("UDPS${udpCounter.incrementAndGet()}"), UdpConnector { + + private val sessions = mutableMapOf() + private val access = Mutex() + + private val selectorManager = SelectorManager(Dispatchers.IO) + private val serverSocket = aSocket(selectorManager).udp().bind(InetSocketAddress(localInterface, port)) + + override suspend fun disconnectClient(address: SocketAddress) { + access.withLock { sessions.remove(address) } + } + + /** + * a transport flow of [InetTransportDevice] suitable to be used with [KiloServer], see [UdpServer] for the + * usage sample. + */ + val transportFlow by lazy { + flow { + while(true) { + try { + val datagram = serverSocket.receive() + val block = UdpBlock.decode(datagram) + val remoteAddress = datagram.address + + access.withLock { + if (block == UdpBlock.Cancel) { + // if the cancel comes to already closed transport, do nothing + sessions.remove(remoteAddress)?.processIncoming(block) + } else { + sessions.getOrPut(remoteAddress) { + // new connection: create transport + debug { "Creating new connection to $remoteAddress" } + UdpSocketTransport(this@UdpServer, remoteAddress, true) + // and emit it: + .also { emit(it.transportDevice) } + }.processIncoming(block) + } + } + } + catch(_: CancellationException) { break } + catch(_: ClosedReceiveChannelException) { + break + } + catch(e: Exception) { + exception { "unexpected exception in incoming datagram processing" to e } + close() + break + } + } + } + } + + override suspend fun sendBlock(block: UdpBlock, toAddress: SocketAddress) { + serverSocket.send(block.toDatagram(toAddress)) + } + + val isClosed: Boolean get() = serverSocket.isClosed + + /** + * Close the UDP server. Calling it will cause: + * + * - Closing nound UDP socket on [port] + * - Closing all pending connections + * - cancelling the [transportFlow], which will cause Kiloparsec server to also stop + * + * Call suspends until socket and all sessions are closed. Later calls do nothing. + */ + suspend fun close() { + access.withLock { + if (!isClosed) { + runCatching { serverSocket.close() } + } + } + while(sessions.isNotEmpty()) { + runCatching { + access.withLock { sessions.values.firstOrNull() } + ?.close() + } + } + } +} \ No newline at end of file diff --git a/src/ktorSocketMain/kotlin/net/sergeych/kiloparsec/adapter/UdpSocketTransport.kt b/src/ktorSocketMain/kotlin/net/sergeych/kiloparsec/adapter/UdpSocketTransport.kt new file mode 100644 index 0000000..7dd3cfa --- /dev/null +++ b/src/ktorSocketMain/kotlin/net/sergeych/kiloparsec/adapter/UdpSocketTransport.kt @@ -0,0 +1,171 @@ +package net.sergeych.kiloparsec.adapter + +import io.ktor.network.sockets.* +import io.ktor.utils.io.* +import kotlinx.coroutines.channels.BufferOverflow +import kotlinx.coroutines.channels.Channel +import kotlinx.coroutines.channels.ClosedReceiveChannelException +import kotlinx.coroutines.channels.ClosedSendChannelException +import kotlinx.coroutines.coroutineScope +import kotlinx.coroutines.delay +import kotlinx.coroutines.launch +import kotlinx.datetime.Clock +import net.sergeych.kiloparsec.SyncValue +import net.sergeych.mp_logger.Log +import net.sergeych.mp_logger.Loggable +import net.sergeych.mp_logger.debug +import net.sergeych.mp_logger.exception +import net.sergeych.mp_tools.globalLaunch +import kotlin.time.Duration.Companion.seconds + +/** + * This is a common part of UDP transport shared between client and server connections. + * It should not be used directly but bu the [UdpServer], [acceptUdpDevice] and [connectUdpDevice] + * respectively. + */ +internal class UdpSocketTransport(private val server: UdpConnector, val socketAddress: SocketAddress, val isServer: Boolean) : + Loggable { + + // IMPORTANT! Log stuff must be the first (or you shot your leg): + val address = (socketAddress as InetSocketAddress).let { NetworkAddress(it.hostname, it.port) } + override var logTag: String = "UDPT:$address${if (isServer) ":server" else ":client"}" + override var logLevel: Log.Level? = Log.Level.DEBUG + + // Pinger params: keep them first! + private var lastSendAt = Clock.System.now() + private var lastReceived = Clock.System.now() + private val pingTimeout = 30.seconds + private val pingSleep = pingTimeout / 3 + private val pingMinTimeout = pingTimeout / 2 + + // TODO: break on inactivity + private val inactivityBreakTimeout = 30.seconds + val inputDataBlocks = Channel(256, onBufferOverflow = BufferOverflow.DROP_OLDEST) + val outputDataBlocks = Channel(256, onBufferOverflow = BufferOverflow.DROP_OLDEST) + + val inputUdpBlocks = Channel(256, onBufferOverflow = BufferOverflow.DROP_OLDEST) + + private val job = globalLaunch { + coroutineScope { + launch { convertOutput() } + launch { convertInput() } + launch { pinger() } + } + } + + init { + // This is iverly important: it requires that + // all members are initialized before use. Otherwise kotlin + // may execute class members pr + debug { "initialization done" } + } + + + val transportDevice: InetTransportDevice by lazy { + InetTransportDevice(inputDataBlocks, outputDataBlocks, address, { close() }, {}) + } + + private val closedFlag = SyncValue(false) + + val isClosed: Boolean = closedFlag.value + + suspend fun close() { + closedFlag.mutate { + if (!it) { + runCatching { server.sendBlock(UdpBlock.Cancel, socketAddress) } + server.disconnectClient(socketAddress) + runCatching { inputDataBlocks.close() } + runCatching { outputDataBlocks.close() } + job.cancel() + } + true + } + } + + + private suspend fun send(block: UdpBlock) { + server.sendBlock(block, socketAddress) + lastSendAt = Clock.System.now() + } + + /** + * Process the block recoded by the server. Note that it should properly process all + * block types, e.g. close on [UdpBlock.Cancel], etc. Server will not close us! + * + * __Important: it should not block, instead, server expects it to return ASAP__, so it + * executes in a local coroutine context. + * + * Also it should not throw exceptions. + */ + fun processIncoming(block: UdpBlock) { + inputUdpBlocks.trySend(block) + } + + suspend fun convertInput() { + while(!isClosed) { + when (val block = inputUdpBlocks.receiveCatching().getOrNull()) { + + null -> break + + is UdpBlock.Cancel -> globalLaunch { + debug { "received cancel block, requesting close" } + kotlin.runCatching { close() } + } + + is UdpBlock.Data -> { + // input does not block, it uses DROP_OLDEST policy + lastReceived = Clock.System.now() + val result = kotlin.runCatching { inputDataBlocks.send(block.data) } + when (val e = result.exceptionOrNull()) { + null -> {} + is ClosedSendChannelException -> { + debug { "received close channel" } + close() + } + + is CancellationException -> {} + else -> { + exception { "unexpected exception" to e } + close() + } + } + } + + UdpBlock.Ping -> { + lastReceived = Clock.System.now() + if (lastSendAt - lastReceived > pingMinTimeout) send(UdpBlock.Ping) + } + } + } + } + + private suspend fun pinger() { + while (!isClosed) { + delay(pingSleep) + if (Clock.System.now() - lastSendAt >= pingTimeout) { + debug { "pinger sends a ping on timeout" } + send(UdpBlock.Ping) + } + } + } + + private suspend fun convertOutput() { + while (!isClosed) { + try { + server.sendBlock(UdpBlock.Data(outputDataBlocks.receive()), socketAddress) + } catch (e: CancellationException) { + // this is ok + break + } catch (e: ClosedReceiveChannelException) { + debug { "input channel is closed, closing" } + close() + break + } catch (e: Exception) { + exception { "unexpected exception in convertOutput" to e } + close() + break + } + } + debug { "exiting convertOutput" } + } +} \ No newline at end of file diff --git a/src/ktorSocketTest/kotlin/InternetTest.kt b/src/ktorSocketTest/kotlin/InternetTest.kt new file mode 100644 index 0000000..4c718b3 --- /dev/null +++ b/src/ktorSocketTest/kotlin/InternetTest.kt @@ -0,0 +1,181 @@ +import kotlinx.coroutines.test.runTest +import net.sergeych.crypto2.initCrypto +import net.sergeych.kiloparsec.* +import net.sergeych.kiloparsec.adapter.* +import net.sergeych.mp_logger.Log +import kotlin.random.Random +import kotlin.test.Test +import kotlin.test.assertEquals +import kotlin.test.assertIs + +class InternetrTest { + class TestException : Exception("test1") + + @Test + fun tcpTest() = runTest { + initCrypto() + Log.connectConsole(Log.Level.DEBUG) + data class Session( + var data: String, + ) + + val port = 27170 + Random.nextInt(1, 200) + + val cmdSave by command() + val cmdLoad by command() + val cmdDrop by command() + val cmdException by command() + + val cli = KiloInterface().apply { + registerError { TestException() } + onConnected { session.data = "start" } + on(cmdSave) { session.data = it } + on(cmdLoad) { + session.data + } + on(cmdException) { + throw TestException() + } + on(cmdDrop) { + throw LocalInterface.BreakConnectionException() + } + } + val server = KiloServer(cli, acceptTcpDevice(port)) { + Session("unknown") + } + + val client = KiloClient() { + addErrors(cli) + // TODO: add register error variant + connect { connectTcpDevice("localhost:$port") } + } + + assertEquals("start", client.call(cmdLoad)) + + client.call(cmdSave, "foobar") + assertEquals("foobar", client.call(cmdLoad)) + + val res = kotlin.runCatching { client.call(cmdException) } + assertIs(res.exceptionOrNull()) + assertEquals("foobar", client.call(cmdLoad)) + + assertThrows { client.call(cmdDrop) } + + // reconnect? + assertEquals("start", client.call(cmdLoad)) + + server.close() + } + + @Test + fun udpTest() = runTest { + initCrypto() + Log.connectConsole(Log.Level.DEBUG) + data class Session( + var data: String, + ) + + val port = 27170 + Random.nextInt(1, 200) + + val cmdSave by command() + val cmdLoad by command() + val cmdDrop by command() + val cmdException by command() + + val cli = KiloInterface().apply { + registerError { TestException() } + onConnected { session.data = "start" } + on(cmdSave) { session.data = it } + on(cmdLoad) { + session.data + } + on(cmdException) { + throw TestException() + } + on(cmdDrop) { + throw LocalInterface.BreakConnectionException() + } + } + val server = KiloServer(cli, acceptUdpDevice(port)) { + Session("unknown") + } + + val client = KiloClient() { + addErrors(cli) + connect { connectUdpDevice("localhost:$port") } + } + + assertEquals("start", client.call(cmdLoad)) + + client.call(cmdSave, "foobar") + assertEquals("foobar", client.call(cmdLoad)) + + val res = kotlin.runCatching { client.call(cmdException) } + assertIs(res.exceptionOrNull()) + assertEquals("foobar", client.call(cmdLoad)) + + assertThrows { client.call(cmdDrop) } + + // reconnect? + assertEquals("start", client.call(cmdLoad)) + + server.close() + } + + @Test + fun udpServerTest() = runTest { + initCrypto() + Log.connectConsole(Log.Level.DEBUG) + data class Session( + var data: String, + ) + + val port = 27170 + Random.nextInt(1, 200) + + val cmdSave by command() + val cmdLoad by command() + val cmdDrop by command() + val cmdException by command() + + val cli = KiloInterface().apply { + registerError { TestException() } + onConnected { session.data = "start" } + on(cmdSave) { session.data = it } + on(cmdLoad) { + session.data + } + on(cmdException) { + throw TestException() + } + on(cmdDrop) { + throw LocalInterface.BreakConnectionException() + } + } + val uServer = UdpServer(port) + KiloServer(cli, uServer.transportFlow) { + Session("unknown") + } + + val client = KiloClient() { + addErrors(cli) + connect { connectUdpDevice("localhost:$port") } + } + + assertEquals("start", client.call(cmdLoad)) + + client.call(cmdSave, "foobar") + assertEquals("foobar", client.call(cmdLoad)) + + val res = kotlin.runCatching { client.call(cmdException) } + assertIs(res.exceptionOrNull()) + assertEquals("foobar", client.call(cmdLoad)) + + assertThrows { client.call(cmdDrop) } + + // reconnect? + assertEquals("start", client.call(cmdLoad)) + + uServer.close() +// server.close() + } +} \ No newline at end of file diff --git a/src/ktorSocketTest/kotlin/TcpTest.kt b/src/ktorSocketTest/kotlin/TcpTest.kt deleted file mode 100644 index e32ff76..0000000 --- a/src/ktorSocketTest/kotlin/TcpTest.kt +++ /dev/null @@ -1,69 +0,0 @@ -import kotlinx.coroutines.test.runTest -import net.sergeych.crypto2.initCrypto -import net.sergeych.kiloparsec.* -import net.sergeych.kiloparsec.adapter.acceptTcpDevice -import net.sergeych.kiloparsec.adapter.connectTcpDevice -import kotlin.random.Random -import kotlin.test.Test -import kotlin.test.assertEquals -import kotlin.test.assertIs - -class TcpTest { - class TestException : Exception("test1") - - @Test - fun tcpTest() = runTest { - initCrypto() - // Log.connectConsole(Log.Level.DEBUG) -data class Session( - var data: String, -) - - val port = 27170 + Random.nextInt(1, 200) - -val cmdSave by command() -val cmdLoad by command() -val cmdDrop by command() -val cmdException by command() - - val cli = KiloInterface().apply { - registerError { TestException() } - onConnected { session.data = "start" } - on(cmdSave) { session.data = it } - on(cmdLoad) { - session.data - } - on(cmdException) { - throw TestException() - } - on(cmdDrop) { - throw LocalInterface.BreakConnectionException() - } - } - val server = KiloServer(cli, acceptTcpDevice(port)) { - Session("unknown") - } - -val client = KiloClient() { - addErrors(cli) - // TODO: add register error variant - connect { connectTcpDevice("localhost:$port") } -} - -assertEquals("start", client.call(cmdLoad)) - -client.call(cmdSave, "foobar") -assertEquals("foobar", client.call(cmdLoad)) - - val res = kotlin.runCatching { client.call(cmdException) } - assertIs(res.exceptionOrNull()) - assertEquals("foobar", client.call(cmdLoad)) - - assertThrows { client.call(cmdDrop) } - - // reconnect? - assertEquals("start", client.call(cmdLoad)) - - server.close() - } -} \ No newline at end of file