0.3.2: UDP timeout support, fix #4

This commit is contained in:
Sergey Chernov 2024-08-11 15:10:00 +02:00
parent 4d178d951f
commit 40b8723132
12 changed files with 130 additions and 37 deletions

8
.idea/artifacts/kiloparsec_js_0_3_2.xml generated Normal file
View File

@ -0,0 +1,8 @@
<component name="ArtifactManager">
<artifact type="jar" name="kiloparsec-js-0.3.2">
<output-path>$PROJECT_DIR$/build/libs</output-path>
<root id="archive" name="kiloparsec-js-0.3.2.jar">
<element id="module-output" name="kiloparsec.jsMain" />
</root>
</artifact>
</component>

8
.idea/artifacts/kiloparsec_js_0_3_3.xml generated Normal file
View File

@ -0,0 +1,8 @@
<component name="ArtifactManager">
<artifact type="jar" name="kiloparsec-js-0.3.3">
<output-path>$PROJECT_DIR$/build/libs</output-path>
<root id="archive" name="kiloparsec-js-0.3.3.jar">
<element id="module-output" name="kiloparsec.jsMain" />
</root>
</artifact>
</component>

View File

@ -0,0 +1,8 @@
<component name="ArtifactManager">
<artifact type="jar" name="kiloparsec-jvm-0.3.2">
<output-path>$PROJECT_DIR$/build/libs</output-path>
<root id="archive" name="kiloparsec-jvm-0.3.2.jar">
<element id="module-output" name="kiloparsec.jvmMain" />
</root>
</artifact>
</component>

View File

@ -0,0 +1,8 @@
<component name="ArtifactManager">
<artifact type="jar" name="kiloparsec-jvm-0.3.3">
<output-path>$PROJECT_DIR$/build/libs</output-path>
<root id="archive" name="kiloparsec-jvm-0.3.3.jar">
<element id="module-output" name="kiloparsec.jvmMain" />
</root>
</artifact>
</component>

6
.idea/markdown.xml generated Normal file
View File

@ -0,0 +1,6 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="MarkdownSettings">
<option name="showProblemsInCodeBlocks" value="false" />
</component>
</project>

View File

@ -1,6 +1,6 @@
# Kiloparsec # Kiloparsec
__Recommended version is `0.3.2`: to keep the code compatible with current and further versions we __Recommended version is `0.3.3`: to keep the code compatible with current and further versions we
ask to upgrade to `0.3.2` at least.__ Starting from this version some pacakage names are changed for ask to upgrade to `0.3.2` at least.__ Starting from this version some pacakage names are changed for
better clarity and fast UDP endpoints are added. better clarity and fast UDP endpoints are added.
@ -183,6 +183,8 @@ Is very much straightforward, same as with TCP/IP:
### UDP specifics ### UDP specifics
#### Command size
Each command invocation and result are packed in a separate UDP diagram using effective binary packing. Each command invocation and result are packed in a separate UDP diagram using effective binary packing.
Thus for the best results commands and results should be relatively short, best to fit into 240 bytes. While bigger datagrams are often transmitted successfully, the probability of the effective transmission drops with the size. Thus for the best results commands and results should be relatively short, best to fit into 240 bytes. While bigger datagrams are often transmitted successfully, the probability of the effective transmission drops with the size.
@ -190,6 +192,14 @@ Kiloparsec UDP transport does not retransmits not delivered packets. Use TCP/IP
For the best results we recommend using [push](https://code.sergeych.net/docs/kiloparsec/kiloparsec/net.sergeych.kiloparsec/-remote-interface/index.html#1558240250%2FFunctions%2F788909594) for remote interfaces with UDP. For the best results we recommend using [push](https://code.sergeych.net/docs/kiloparsec/kiloparsec/net.sergeych.kiloparsec/-remote-interface/index.html#1558240250%2FFunctions%2F788909594) for remote interfaces with UDP.
#### Timeouts
As Datagrams do not form protocol itself, kiloparsec issues pings when no data is circulated between parties.
When no pings are received long enough, kiloparsec connection is closed. There are `maxInactivityTimeout` in all
relevant functions and constructors.
Client shoudl not issue pings manually.
## Reusing code between servers ## Reusing code between servers
The same instance of the [KiloInterface](https://code.sergeych.net/docs/kiloparsec/kiloparsec/net.sergeych.kiloparsec/-kilo-interface/index.html?query=open%20class%20KiloInterface%3CS%3E%20:%20LocalInterface%3CKiloScope%3CS%3E%3E) could easily be reused with all instances of servers with different protocols. The same instance of the [KiloInterface](https://code.sergeych.net/docs/kiloparsec/kiloparsec/net.sergeych.kiloparsec/-kilo-interface/index.html?query=open%20class%20KiloInterface%3CS%3E%20:%20LocalInterface%3CKiloScope%3CS%3E%3E) could easily be reused with all instances of servers with different protocols.

View File

@ -6,7 +6,7 @@ plugins {
} }
group = "net.sergeych" group = "net.sergeych"
version = "0.3.2" version = "0.3.3"
repositories { repositories {
mavenCentral() mavenCentral()

View File

@ -27,7 +27,7 @@ private val logCounter = AtomicCounter(0)
class ProtocolException(text: String, cause: Throwable? = null) : RuntimeException(text, cause) class ProtocolException(text: String, cause: Throwable? = null) : RuntimeException(text, cause)
const val MAX_TCP_BLOCK_SIZE = 16776216 const val MAX_TCP_BLOCK_SIZE = 16776216
val PING_INACTIVITY_TIME = 30.seconds internal val PING_INACTIVITY_TIME = 30.seconds
/** /**
* Listen for incoming TCP/IP connections on all local interfaces and the specified [port] * Listen for incoming TCP/IP connections on all local interfaces and the specified [port]

View File

@ -10,6 +10,8 @@ import net.sergeych.kiloparsec.KiloServer
import net.sergeych.kiloparsec.RemoteInterface import net.sergeych.kiloparsec.RemoteInterface
import net.sergeych.mp_tools.globalLaunch import net.sergeych.mp_tools.globalLaunch
import net.sergeych.tools.AtomicCounter import net.sergeych.tools.AtomicCounter
import kotlin.time.Duration
import kotlin.time.Duration.Companion.minutes
internal val udpCounter = AtomicCounter(0) internal val udpCounter = AtomicCounter(0)
@ -55,9 +57,40 @@ class UdpTransportException(override val message: String) : RemoteInterface.Inva
* See [connectUdpDevice] for the client sample. * See [connectUdpDevice] for the client sample.
* *
* When it is necessary to stop listening to some port, use [UdpServer] instead. * When it is necessary to stop listening to some port, use [UdpServer] instead.
*
* @param port port to listen
* @param localInterface string form local interface to listen
* @param maxInactivityTimeout maximum silence time after which the connection is supposed to be lost.
* the module automatically issues pings on inactivity when there is no data often enough
* to maintain the connection open.
*/ */
fun acceptUdpDevice(port: Int, localInterface: String = "0.0.0.0"): Flow<InetTransportDevice> = fun acceptUdpDevice(
UdpServer(port, localInterface).transportFlow port: Int,
localInterface: String = "0.0.0.0",
maxInactivityTimeout: Duration = 2.minutes,
): Flow<InetTransportDevice> =
UdpServer(port, localInterface,maxInactivityTimeout).transportFlow
/**
* Connect to UDP server (see [acceptUdpDevice] or [UdpServer]) and return a [InetTransportDevice] for it. It
* should be used with [KiloClient] as connection provider:
* ```kotlin
* val client = KiloClient<Unit>() {
* connect { connectUdpDevice("localhost:$port") }
* }
* // now we can execute remote commands:
* assertEquals("start", client.call(cmdLoad))
* ```
*
* @param hostPort "host:port" string address of the remote UDP port to connect to
* @param maxInactivityTimeout maximum silence time after which the connection is supposed to be lost.
* the module automatically issues pings on inactivity when there is no data often enough
* to maintain the connection open.
*/
fun connectUdpDevice(
hostPort: String,
maxInactivityTimeout: Duration = 2.minutes,
) = connectUdpDevice(hostPort.toNetworkAddress(), maxInactivityTimeout)
/** /**
* Connect to UDP server (see [acceptUdpDevice]) and return a [InetTransportDevice] for it. It * Connect to UDP server (see [acceptUdpDevice]) and return a [InetTransportDevice] for it. It
@ -69,21 +102,15 @@ fun acceptUdpDevice(port: Int, localInterface: String = "0.0.0.0"): Flow<InetTra
* // now we can execute remote commands: * // now we can execute remote commands:
* assertEquals("start", client.call(cmdLoad)) * assertEquals("start", client.call(cmdLoad))
* ``` * ```
* @param addr the network address where to connect to
* @param maxInactivityTimeout maximum silence time after which the connection is supposed to be lost.
* the module automatically issues pings on inactivity when there is no data often enough
* to maintain the connection open.
*/ */
fun connectUdpDevice(hostPort: String) = connectUdpDevice(hostPort.toNetworkAddress()) fun connectUdpDevice(
addr: NetworkAddress,
/** maxInactivityTimeout: Duration = 2.minutes,
* Connect to UDP server (see [acceptUdpDevice]) and return a [InetTransportDevice] for it. It ): InetTransportDevice {
* should be used with [KiloClient] as connection provider:
* ```kotlin
* val client = KiloClient<Unit>() {
* connect { connectUdpDevice("localhost:$port") }
* }
* // now we can execute remote commands:
* assertEquals("start", client.call(cmdLoad))
* ```
*/
fun connectUdpDevice(addr: NetworkAddress): InetTransportDevice {
val selectorManager = SelectorManager(Dispatchers.IO) val selectorManager = SelectorManager(Dispatchers.IO)
val remoteAddress = InetSocketAddress(addr.host, addr.port) val remoteAddress = InetSocketAddress(addr.host, addr.port)
val socket = aSocket(selectorManager).udp().connect(remoteAddress) val socket = aSocket(selectorManager).udp().connect(remoteAddress)
@ -99,7 +126,7 @@ fun connectUdpDevice(addr: NetworkAddress): InetTransportDevice {
done.complete(Unit) done.complete(Unit)
} }
}, remoteAddress, false) }, remoteAddress, false, maxInactivityTimeout)
globalLaunch { globalLaunch {
launch { launch {

View File

@ -14,6 +14,8 @@ import net.sergeych.mp_logger.LogTag
import net.sergeych.mp_logger.Loggable import net.sergeych.mp_logger.Loggable
import net.sergeych.mp_logger.debug import net.sergeych.mp_logger.debug
import net.sergeych.mp_logger.exception import net.sergeych.mp_logger.exception
import kotlin.time.Duration
import kotlin.time.Duration.Companion.minutes
/** /**
* UDP server for kiloparsec. Unlike [acceptUdpDevice], it allow stopping listening * UDP server for kiloparsec. Unlike [acceptUdpDevice], it allow stopping listening
@ -34,8 +36,15 @@ import net.sergeych.mp_logger.exception
* ``` * ```
* *
* See [acceptUdpDevice] for more information. * See [acceptUdpDevice] for more information.
*
* @param port port to listen to
* @param localInterface string form of local interface to listen to
* @param maxInactivityTimeout maximum silence time after which the connection is supposed to be lost.
* the module automatically issues pings on inactivity when there is no data often enough
* to maintain the connection open.
*/ */
class UdpServer(val port: Int,localInterface: String = "0.0.0.0") : class UdpServer(val port: Int, localInterface: String = "0.0.0.0", maxInactivityTimeout: Duration = 2.minutes) :
Loggable by LogTag("UDPS${udpCounter.incrementAndGet()}"), UdpConnector { Loggable by LogTag("UDPS${udpCounter.incrementAndGet()}"), UdpConnector {
private val sessions = mutableMapOf<SocketAddress, UdpSocketTransport>() private val sessions = mutableMapOf<SocketAddress, UdpSocketTransport>()
@ -54,7 +63,7 @@ class UdpServer(val port: Int,localInterface: String = "0.0.0.0") :
*/ */
val transportFlow by lazy { val transportFlow by lazy {
flow { flow {
while(true) { while (true) {
try { try {
val datagram = serverSocket.receive() val datagram = serverSocket.receive()
val block = UdpBlock.decode(datagram) val block = UdpBlock.decode(datagram)
@ -68,18 +77,17 @@ class UdpServer(val port: Int,localInterface: String = "0.0.0.0") :
sessions.getOrPut(remoteAddress) { sessions.getOrPut(remoteAddress) {
// new connection: create transport // new connection: create transport
debug { "Creating new connection to $remoteAddress" } debug { "Creating new connection to $remoteAddress" }
UdpSocketTransport(this@UdpServer, remoteAddress, true) UdpSocketTransport(this@UdpServer, remoteAddress, true, maxInactivityTimeout)
// and emit it: // and emit it:
.also { emit(it.transportDevice) } .also { emit(it.transportDevice) }
}.processIncoming(block) }.processIncoming(block)
} }
} }
} } catch (_: CancellationException) {
catch(_: CancellationException) { break }
catch(_: ClosedReceiveChannelException) {
break break
} } catch (_: ClosedReceiveChannelException) {
catch(e: Exception) { break
} catch (e: Exception) {
exception { "unexpected exception in incoming datagram processing" to e } exception { "unexpected exception in incoming datagram processing" to e }
close() close()
break break
@ -109,7 +117,7 @@ class UdpServer(val port: Int,localInterface: String = "0.0.0.0") :
runCatching { serverSocket.close() } runCatching { serverSocket.close() }
} }
} }
while(sessions.isNotEmpty()) { while (sessions.isNotEmpty()) {
runCatching { runCatching {
access.withLock { sessions.values.firstOrNull() } access.withLock { sessions.values.firstOrNull() }
?.close() ?.close()

View File

@ -16,14 +16,19 @@ import net.sergeych.mp_logger.Loggable
import net.sergeych.mp_logger.debug import net.sergeych.mp_logger.debug
import net.sergeych.mp_logger.exception import net.sergeych.mp_logger.exception
import net.sergeych.mp_tools.globalLaunch import net.sergeych.mp_tools.globalLaunch
import kotlin.time.Duration.Companion.seconds import kotlin.time.Duration
/** /**
* This is a common part of UDP transport shared between client and server connections. * This is a common part of UDP transport shared between client and server connections.
* It should not be used directly but bu the [UdpServer], [acceptUdpDevice] and [connectUdpDevice] * It should not be used directly but bu the [UdpServer], [acceptUdpDevice] and [connectUdpDevice]
* respectively. * respectively.
*/ */
internal class UdpSocketTransport(private val server: UdpConnector, val socketAddress: SocketAddress, val isServer: Boolean) : internal class UdpSocketTransport(
private val server: UdpConnector,
val socketAddress: SocketAddress,
val isServer: Boolean,
val maxInactivityTimeout: Duration
) :
Loggable { Loggable {
// IMPORTANT! Log stuff must be the first (or you shot your leg): // IMPORTANT! Log stuff must be the first (or you shot your leg):
@ -34,12 +39,10 @@ internal class UdpSocketTransport(private val server: UdpConnector, val socketAd
// Pinger params: keep them first! // Pinger params: keep them first!
private var lastSendAt = Clock.System.now() private var lastSendAt = Clock.System.now()
private var lastReceived = Clock.System.now() private var lastReceived = Clock.System.now()
private val pingTimeout = 30.seconds private val pingTimeout = maxInactivityTimeout / 3
private val pingSleep = pingTimeout / 3 private val pingSleep = pingTimeout / 3
private val pingMinTimeout = pingTimeout / 2 private val pingMinTimeout = pingTimeout * 2 / 3
// TODO: break on inactivity
private val inactivityBreakTimeout = 30.seconds
val inputDataBlocks = Channel<UByteArray>(256, onBufferOverflow = BufferOverflow.DROP_OLDEST) val inputDataBlocks = Channel<UByteArray>(256, onBufferOverflow = BufferOverflow.DROP_OLDEST)
val outputDataBlocks = Channel<UByteArray>(256, onBufferOverflow = BufferOverflow.DROP_OLDEST) val outputDataBlocks = Channel<UByteArray>(256, onBufferOverflow = BufferOverflow.DROP_OLDEST)
@ -142,7 +145,12 @@ internal class UdpSocketTransport(private val server: UdpConnector, val socketAd
private suspend fun pinger() { private suspend fun pinger() {
while (!isClosed) { while (!isClosed) {
delay(pingSleep) delay(pingSleep)
if (Clock.System.now() - lastSendAt >= pingTimeout) { val inactivity = Clock.System.now() - lastSendAt
if( inactivity > maxInactivityTimeout) {
debug { "inactivity timout: closing the connection" }
close()
}
if (inactivity >= pingTimeout) {
debug { "pinger sends a ping on timeout" } debug { "pinger sends a ping on timeout" }
send(UdpBlock.Ping) send(UdpBlock.Ping)
} }

View File

@ -1,7 +1,9 @@
package net.sergeych.kiloparsec.adapter
import assertThrows
import kotlinx.coroutines.test.runTest import kotlinx.coroutines.test.runTest
import net.sergeych.crypto2.initCrypto import net.sergeych.crypto2.initCrypto
import net.sergeych.kiloparsec.* import net.sergeych.kiloparsec.*
import net.sergeych.kiloparsec.adapter.*
import net.sergeych.mp_logger.Log import net.sergeych.mp_logger.Log
import kotlin.random.Random import kotlin.random.Random
import kotlin.test.Test import kotlin.test.Test