From 40b8723132107d3c37276db45dc41ada28ab0696 Mon Sep 17 00:00:00 2001 From: sergeych Date: Sun, 11 Aug 2024 15:10:00 +0200 Subject: [PATCH] 0.3.2: UDP timeout support, fix #4 --- .idea/artifacts/kiloparsec_js_0_3_2.xml | 8 +++ .idea/artifacts/kiloparsec_js_0_3_3.xml | 8 +++ .idea/artifacts/kiloparsec_jvm_0_3_2.xml | 8 +++ .idea/artifacts/kiloparsec_jvm_0_3_3.xml | 8 +++ .idea/markdown.xml | 6 ++ README.md | 12 +++- build.gradle.kts | 2 +- .../kiloparsec/adapter/TcpProvider.kt | 2 +- .../kiloparsec/adapter/UdpProvider.kt | 61 +++++++++++++------ .../sergeych/kiloparsec/adapter/UdpServer.kt | 26 +++++--- .../kiloparsec/adapter/UdpSocketTransport.kt | 22 ++++--- .../kiloparsec/adapter}/InternetTest.kt | 4 +- 12 files changed, 130 insertions(+), 37 deletions(-) create mode 100644 .idea/artifacts/kiloparsec_js_0_3_2.xml create mode 100644 .idea/artifacts/kiloparsec_js_0_3_3.xml create mode 100644 .idea/artifacts/kiloparsec_jvm_0_3_2.xml create mode 100644 .idea/artifacts/kiloparsec_jvm_0_3_3.xml create mode 100644 .idea/markdown.xml rename src/ktorSocketTest/kotlin/{ => net/sergeych/kiloparsec/adapter}/InternetTest.kt (98%) diff --git a/.idea/artifacts/kiloparsec_js_0_3_2.xml b/.idea/artifacts/kiloparsec_js_0_3_2.xml new file mode 100644 index 0000000..29616d4 --- /dev/null +++ b/.idea/artifacts/kiloparsec_js_0_3_2.xml @@ -0,0 +1,8 @@ + + + $PROJECT_DIR$/build/libs + + + + + \ No newline at end of file diff --git a/.idea/artifacts/kiloparsec_js_0_3_3.xml b/.idea/artifacts/kiloparsec_js_0_3_3.xml new file mode 100644 index 0000000..7060d4c --- /dev/null +++ b/.idea/artifacts/kiloparsec_js_0_3_3.xml @@ -0,0 +1,8 @@ + + + $PROJECT_DIR$/build/libs + + + + + \ No newline at end of file diff --git a/.idea/artifacts/kiloparsec_jvm_0_3_2.xml b/.idea/artifacts/kiloparsec_jvm_0_3_2.xml new file mode 100644 index 0000000..a4a0dc5 --- /dev/null +++ b/.idea/artifacts/kiloparsec_jvm_0_3_2.xml @@ -0,0 +1,8 @@ + + + $PROJECT_DIR$/build/libs + + + + + \ No newline at end of file diff --git a/.idea/artifacts/kiloparsec_jvm_0_3_3.xml b/.idea/artifacts/kiloparsec_jvm_0_3_3.xml new file mode 100644 index 0000000..0bc79fb --- /dev/null +++ b/.idea/artifacts/kiloparsec_jvm_0_3_3.xml @@ -0,0 +1,8 @@ + + + $PROJECT_DIR$/build/libs + + + + + \ No newline at end of file diff --git a/.idea/markdown.xml b/.idea/markdown.xml new file mode 100644 index 0000000..f6d2542 --- /dev/null +++ b/.idea/markdown.xml @@ -0,0 +1,6 @@ + + + + + \ No newline at end of file diff --git a/README.md b/README.md index 61c52b8..5005847 100644 --- a/README.md +++ b/README.md @@ -1,6 +1,6 @@ # Kiloparsec -__Recommended version is `0.3.2`: to keep the code compatible with current and further versions we +__Recommended version is `0.3.3`: to keep the code compatible with current and further versions we ask to upgrade to `0.3.2` at least.__ Starting from this version some pacakage names are changed for better clarity and fast UDP endpoints are added. @@ -183,6 +183,8 @@ Is very much straightforward, same as with TCP/IP: ### UDP specifics +#### Command size + Each command invocation and result are packed in a separate UDP diagram using effective binary packing. Thus for the best results commands and results should be relatively short, best to fit into 240 bytes. While bigger datagrams are often transmitted successfully, the probability of the effective transmission drops with the size. @@ -190,6 +192,14 @@ Kiloparsec UDP transport does not retransmits not delivered packets. Use TCP/IP For the best results we recommend using [push](https://code.sergeych.net/docs/kiloparsec/kiloparsec/net.sergeych.kiloparsec/-remote-interface/index.html#1558240250%2FFunctions%2F788909594) for remote interfaces with UDP. +#### Timeouts + +As Datagrams do not form protocol itself, kiloparsec issues pings when no data is circulated between parties. +When no pings are received long enough, kiloparsec connection is closed. There are `maxInactivityTimeout` in all +relevant functions and constructors. + +Client shoudl not issue pings manually. + ## Reusing code between servers The same instance of the [KiloInterface](https://code.sergeych.net/docs/kiloparsec/kiloparsec/net.sergeych.kiloparsec/-kilo-interface/index.html?query=open%20class%20KiloInterface%3CS%3E%20:%20LocalInterface%3CKiloScope%3CS%3E%3E) could easily be reused with all instances of servers with different protocols. diff --git a/build.gradle.kts b/build.gradle.kts index 3cad3a3..725cfb1 100644 --- a/build.gradle.kts +++ b/build.gradle.kts @@ -6,7 +6,7 @@ plugins { } group = "net.sergeych" -version = "0.3.2" +version = "0.3.3" repositories { mavenCentral() diff --git a/src/ktorSocketMain/kotlin/net/sergeych/kiloparsec/adapter/TcpProvider.kt b/src/ktorSocketMain/kotlin/net/sergeych/kiloparsec/adapter/TcpProvider.kt index 1d51554..8c9f87a 100644 --- a/src/ktorSocketMain/kotlin/net/sergeych/kiloparsec/adapter/TcpProvider.kt +++ b/src/ktorSocketMain/kotlin/net/sergeych/kiloparsec/adapter/TcpProvider.kt @@ -27,7 +27,7 @@ private val logCounter = AtomicCounter(0) class ProtocolException(text: String, cause: Throwable? = null) : RuntimeException(text, cause) const val MAX_TCP_BLOCK_SIZE = 16776216 -val PING_INACTIVITY_TIME = 30.seconds +internal val PING_INACTIVITY_TIME = 30.seconds /** * Listen for incoming TCP/IP connections on all local interfaces and the specified [port] diff --git a/src/ktorSocketMain/kotlin/net/sergeych/kiloparsec/adapter/UdpProvider.kt b/src/ktorSocketMain/kotlin/net/sergeych/kiloparsec/adapter/UdpProvider.kt index a738c40..fa5e2b4 100644 --- a/src/ktorSocketMain/kotlin/net/sergeych/kiloparsec/adapter/UdpProvider.kt +++ b/src/ktorSocketMain/kotlin/net/sergeych/kiloparsec/adapter/UdpProvider.kt @@ -10,6 +10,8 @@ import net.sergeych.kiloparsec.KiloServer import net.sergeych.kiloparsec.RemoteInterface import net.sergeych.mp_tools.globalLaunch import net.sergeych.tools.AtomicCounter +import kotlin.time.Duration +import kotlin.time.Duration.Companion.minutes internal val udpCounter = AtomicCounter(0) @@ -55,9 +57,40 @@ class UdpTransportException(override val message: String) : RemoteInterface.Inva * See [connectUdpDevice] for the client sample. * * When it is necessary to stop listening to some port, use [UdpServer] instead. + * + * @param port port to listen + * @param localInterface string form local interface to listen + * @param maxInactivityTimeout maximum silence time after which the connection is supposed to be lost. + * the module automatically issues pings on inactivity when there is no data often enough + * to maintain the connection open. */ -fun acceptUdpDevice(port: Int, localInterface: String = "0.0.0.0"): Flow = - UdpServer(port, localInterface).transportFlow +fun acceptUdpDevice( + port: Int, + localInterface: String = "0.0.0.0", + maxInactivityTimeout: Duration = 2.minutes, +): Flow = + UdpServer(port, localInterface,maxInactivityTimeout).transportFlow + +/** + * Connect to UDP server (see [acceptUdpDevice] or [UdpServer]) and return a [InetTransportDevice] for it. It + * should be used with [KiloClient] as connection provider: + * ```kotlin + * val client = KiloClient() { + * connect { connectUdpDevice("localhost:$port") } + * } + * // now we can execute remote commands: + * assertEquals("start", client.call(cmdLoad)) + * ``` + * + * @param hostPort "host:port" string address of the remote UDP port to connect to + * @param maxInactivityTimeout maximum silence time after which the connection is supposed to be lost. + * the module automatically issues pings on inactivity when there is no data often enough + * to maintain the connection open. + */ +fun connectUdpDevice( + hostPort: String, + maxInactivityTimeout: Duration = 2.minutes, +) = connectUdpDevice(hostPort.toNetworkAddress(), maxInactivityTimeout) /** * Connect to UDP server (see [acceptUdpDevice]) and return a [InetTransportDevice] for it. It @@ -69,21 +102,15 @@ fun acceptUdpDevice(port: Int, localInterface: String = "0.0.0.0"): Flow() { - * connect { connectUdpDevice("localhost:$port") } - * } - * // now we can execute remote commands: - * assertEquals("start", client.call(cmdLoad)) - * ``` - */ -fun connectUdpDevice(addr: NetworkAddress): InetTransportDevice { +fun connectUdpDevice( + addr: NetworkAddress, + maxInactivityTimeout: Duration = 2.minutes, +): InetTransportDevice { val selectorManager = SelectorManager(Dispatchers.IO) val remoteAddress = InetSocketAddress(addr.host, addr.port) val socket = aSocket(selectorManager).udp().connect(remoteAddress) @@ -99,7 +126,7 @@ fun connectUdpDevice(addr: NetworkAddress): InetTransportDevice { done.complete(Unit) } - }, remoteAddress, false) + }, remoteAddress, false, maxInactivityTimeout) globalLaunch { launch { diff --git a/src/ktorSocketMain/kotlin/net/sergeych/kiloparsec/adapter/UdpServer.kt b/src/ktorSocketMain/kotlin/net/sergeych/kiloparsec/adapter/UdpServer.kt index 6748146..378a252 100644 --- a/src/ktorSocketMain/kotlin/net/sergeych/kiloparsec/adapter/UdpServer.kt +++ b/src/ktorSocketMain/kotlin/net/sergeych/kiloparsec/adapter/UdpServer.kt @@ -14,6 +14,8 @@ import net.sergeych.mp_logger.LogTag import net.sergeych.mp_logger.Loggable import net.sergeych.mp_logger.debug import net.sergeych.mp_logger.exception +import kotlin.time.Duration +import kotlin.time.Duration.Companion.minutes /** * UDP server for kiloparsec. Unlike [acceptUdpDevice], it allow stopping listening @@ -34,8 +36,15 @@ import net.sergeych.mp_logger.exception * ``` * * See [acceptUdpDevice] for more information. + * + * @param port port to listen to + * @param localInterface string form of local interface to listen to + * @param maxInactivityTimeout maximum silence time after which the connection is supposed to be lost. + * the module automatically issues pings on inactivity when there is no data often enough + * to maintain the connection open. + */ -class UdpServer(val port: Int,localInterface: String = "0.0.0.0") : +class UdpServer(val port: Int, localInterface: String = "0.0.0.0", maxInactivityTimeout: Duration = 2.minutes) : Loggable by LogTag("UDPS${udpCounter.incrementAndGet()}"), UdpConnector { private val sessions = mutableMapOf() @@ -54,7 +63,7 @@ class UdpServer(val port: Int,localInterface: String = "0.0.0.0") : */ val transportFlow by lazy { flow { - while(true) { + while (true) { try { val datagram = serverSocket.receive() val block = UdpBlock.decode(datagram) @@ -68,18 +77,17 @@ class UdpServer(val port: Int,localInterface: String = "0.0.0.0") : sessions.getOrPut(remoteAddress) { // new connection: create transport debug { "Creating new connection to $remoteAddress" } - UdpSocketTransport(this@UdpServer, remoteAddress, true) + UdpSocketTransport(this@UdpServer, remoteAddress, true, maxInactivityTimeout) // and emit it: .also { emit(it.transportDevice) } }.processIncoming(block) } } - } - catch(_: CancellationException) { break } - catch(_: ClosedReceiveChannelException) { + } catch (_: CancellationException) { break - } - catch(e: Exception) { + } catch (_: ClosedReceiveChannelException) { + break + } catch (e: Exception) { exception { "unexpected exception in incoming datagram processing" to e } close() break @@ -109,7 +117,7 @@ class UdpServer(val port: Int,localInterface: String = "0.0.0.0") : runCatching { serverSocket.close() } } } - while(sessions.isNotEmpty()) { + while (sessions.isNotEmpty()) { runCatching { access.withLock { sessions.values.firstOrNull() } ?.close() diff --git a/src/ktorSocketMain/kotlin/net/sergeych/kiloparsec/adapter/UdpSocketTransport.kt b/src/ktorSocketMain/kotlin/net/sergeych/kiloparsec/adapter/UdpSocketTransport.kt index 7dd3cfa..dd23662 100644 --- a/src/ktorSocketMain/kotlin/net/sergeych/kiloparsec/adapter/UdpSocketTransport.kt +++ b/src/ktorSocketMain/kotlin/net/sergeych/kiloparsec/adapter/UdpSocketTransport.kt @@ -16,14 +16,19 @@ import net.sergeych.mp_logger.Loggable import net.sergeych.mp_logger.debug import net.sergeych.mp_logger.exception import net.sergeych.mp_tools.globalLaunch -import kotlin.time.Duration.Companion.seconds +import kotlin.time.Duration /** * This is a common part of UDP transport shared between client and server connections. * It should not be used directly but bu the [UdpServer], [acceptUdpDevice] and [connectUdpDevice] * respectively. */ -internal class UdpSocketTransport(private val server: UdpConnector, val socketAddress: SocketAddress, val isServer: Boolean) : +internal class UdpSocketTransport( + private val server: UdpConnector, + val socketAddress: SocketAddress, + val isServer: Boolean, + val maxInactivityTimeout: Duration +) : Loggable { // IMPORTANT! Log stuff must be the first (or you shot your leg): @@ -34,12 +39,10 @@ internal class UdpSocketTransport(private val server: UdpConnector, val socketAd // Pinger params: keep them first! private var lastSendAt = Clock.System.now() private var lastReceived = Clock.System.now() - private val pingTimeout = 30.seconds + private val pingTimeout = maxInactivityTimeout / 3 private val pingSleep = pingTimeout / 3 - private val pingMinTimeout = pingTimeout / 2 + private val pingMinTimeout = pingTimeout * 2 / 3 - // TODO: break on inactivity - private val inactivityBreakTimeout = 30.seconds val inputDataBlocks = Channel(256, onBufferOverflow = BufferOverflow.DROP_OLDEST) val outputDataBlocks = Channel(256, onBufferOverflow = BufferOverflow.DROP_OLDEST) @@ -142,7 +145,12 @@ internal class UdpSocketTransport(private val server: UdpConnector, val socketAd private suspend fun pinger() { while (!isClosed) { delay(pingSleep) - if (Clock.System.now() - lastSendAt >= pingTimeout) { + val inactivity = Clock.System.now() - lastSendAt + if( inactivity > maxInactivityTimeout) { + debug { "inactivity timout: closing the connection" } + close() + } + if (inactivity >= pingTimeout) { debug { "pinger sends a ping on timeout" } send(UdpBlock.Ping) } diff --git a/src/ktorSocketTest/kotlin/InternetTest.kt b/src/ktorSocketTest/kotlin/net/sergeych/kiloparsec/adapter/InternetTest.kt similarity index 98% rename from src/ktorSocketTest/kotlin/InternetTest.kt rename to src/ktorSocketTest/kotlin/net/sergeych/kiloparsec/adapter/InternetTest.kt index 4c718b3..a25bdcd 100644 --- a/src/ktorSocketTest/kotlin/InternetTest.kt +++ b/src/ktorSocketTest/kotlin/net/sergeych/kiloparsec/adapter/InternetTest.kt @@ -1,7 +1,9 @@ +package net.sergeych.kiloparsec.adapter + +import assertThrows import kotlinx.coroutines.test.runTest import net.sergeych.crypto2.initCrypto import net.sergeych.kiloparsec.* -import net.sergeych.kiloparsec.adapter.* import net.sergeych.mp_logger.Log import kotlin.random.Random import kotlin.test.Test