0.3.2: UDP support

This commit is contained in:
Sergey Chernov 2024-08-11 14:17:11 +02:00
parent f6fbf8e58e
commit 4d178d951f
15 changed files with 905 additions and 84 deletions

8
.idea/artifacts/kiloparsec_js_0_3_1.xml generated Normal file
View File

@ -0,0 +1,8 @@
<component name="ArtifactManager">
<artifact type="jar" name="kiloparsec-js-0.3.1">
<output-path>$PROJECT_DIR$/build/libs</output-path>
<root id="archive" name="kiloparsec-js-0.3.1.jar">
<element id="module-output" name="kiloparsec.jsMain" />
</root>
</artifact>
</component>

View File

@ -0,0 +1,8 @@
<component name="ArtifactManager">
<artifact type="jar" name="kiloparsec-jvm-0.3.1">
<output-path>$PROJECT_DIR$/build/libs</output-path>
<root id="archive" name="kiloparsec-jvm-0.3.1.jar">
<element id="module-output" name="kiloparsec.jvmMain" />
</root>
</artifact>
</component>

124
.idea/uiDesigner.xml generated Normal file
View File

@ -0,0 +1,124 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="Palette2">
<group name="Swing">
<item class="com.intellij.uiDesigner.HSpacer" tooltip-text="Horizontal Spacer" icon="/com/intellij/uiDesigner/icons/hspacer.svg" removable="false" auto-create-binding="false" can-attach-label="false">
<default-constraints vsize-policy="1" hsize-policy="6" anchor="0" fill="1" />
</item>
<item class="com.intellij.uiDesigner.VSpacer" tooltip-text="Vertical Spacer" icon="/com/intellij/uiDesigner/icons/vspacer.svg" removable="false" auto-create-binding="false" can-attach-label="false">
<default-constraints vsize-policy="6" hsize-policy="1" anchor="0" fill="2" />
</item>
<item class="javax.swing.JPanel" icon="/com/intellij/uiDesigner/icons/panel.svg" removable="false" auto-create-binding="false" can-attach-label="false">
<default-constraints vsize-policy="3" hsize-policy="3" anchor="0" fill="3" />
</item>
<item class="javax.swing.JScrollPane" icon="/com/intellij/uiDesigner/icons/scrollPane.svg" removable="false" auto-create-binding="false" can-attach-label="true">
<default-constraints vsize-policy="7" hsize-policy="7" anchor="0" fill="3" />
</item>
<item class="javax.swing.JButton" icon="/com/intellij/uiDesigner/icons/button.svg" removable="false" auto-create-binding="true" can-attach-label="false">
<default-constraints vsize-policy="0" hsize-policy="3" anchor="0" fill="1" />
<initial-values>
<property name="text" value="Button" />
</initial-values>
</item>
<item class="javax.swing.JRadioButton" icon="/com/intellij/uiDesigner/icons/radioButton.svg" removable="false" auto-create-binding="true" can-attach-label="false">
<default-constraints vsize-policy="0" hsize-policy="3" anchor="8" fill="0" />
<initial-values>
<property name="text" value="RadioButton" />
</initial-values>
</item>
<item class="javax.swing.JCheckBox" icon="/com/intellij/uiDesigner/icons/checkBox.svg" removable="false" auto-create-binding="true" can-attach-label="false">
<default-constraints vsize-policy="0" hsize-policy="3" anchor="8" fill="0" />
<initial-values>
<property name="text" value="CheckBox" />
</initial-values>
</item>
<item class="javax.swing.JLabel" icon="/com/intellij/uiDesigner/icons/label.svg" removable="false" auto-create-binding="false" can-attach-label="false">
<default-constraints vsize-policy="0" hsize-policy="0" anchor="8" fill="0" />
<initial-values>
<property name="text" value="Label" />
</initial-values>
</item>
<item class="javax.swing.JTextField" icon="/com/intellij/uiDesigner/icons/textField.svg" removable="false" auto-create-binding="true" can-attach-label="true">
<default-constraints vsize-policy="0" hsize-policy="6" anchor="8" fill="1">
<preferred-size width="150" height="-1" />
</default-constraints>
</item>
<item class="javax.swing.JPasswordField" icon="/com/intellij/uiDesigner/icons/passwordField.svg" removable="false" auto-create-binding="true" can-attach-label="true">
<default-constraints vsize-policy="0" hsize-policy="6" anchor="8" fill="1">
<preferred-size width="150" height="-1" />
</default-constraints>
</item>
<item class="javax.swing.JFormattedTextField" icon="/com/intellij/uiDesigner/icons/formattedTextField.svg" removable="false" auto-create-binding="true" can-attach-label="true">
<default-constraints vsize-policy="0" hsize-policy="6" anchor="8" fill="1">
<preferred-size width="150" height="-1" />
</default-constraints>
</item>
<item class="javax.swing.JTextArea" icon="/com/intellij/uiDesigner/icons/textArea.svg" removable="false" auto-create-binding="true" can-attach-label="true">
<default-constraints vsize-policy="6" hsize-policy="6" anchor="0" fill="3">
<preferred-size width="150" height="50" />
</default-constraints>
</item>
<item class="javax.swing.JTextPane" icon="/com/intellij/uiDesigner/icons/textPane.svg" removable="false" auto-create-binding="true" can-attach-label="true">
<default-constraints vsize-policy="6" hsize-policy="6" anchor="0" fill="3">
<preferred-size width="150" height="50" />
</default-constraints>
</item>
<item class="javax.swing.JEditorPane" icon="/com/intellij/uiDesigner/icons/editorPane.svg" removable="false" auto-create-binding="true" can-attach-label="true">
<default-constraints vsize-policy="6" hsize-policy="6" anchor="0" fill="3">
<preferred-size width="150" height="50" />
</default-constraints>
</item>
<item class="javax.swing.JComboBox" icon="/com/intellij/uiDesigner/icons/comboBox.svg" removable="false" auto-create-binding="true" can-attach-label="true">
<default-constraints vsize-policy="0" hsize-policy="2" anchor="8" fill="1" />
</item>
<item class="javax.swing.JTable" icon="/com/intellij/uiDesigner/icons/table.svg" removable="false" auto-create-binding="true" can-attach-label="false">
<default-constraints vsize-policy="6" hsize-policy="6" anchor="0" fill="3">
<preferred-size width="150" height="50" />
</default-constraints>
</item>
<item class="javax.swing.JList" icon="/com/intellij/uiDesigner/icons/list.svg" removable="false" auto-create-binding="true" can-attach-label="false">
<default-constraints vsize-policy="6" hsize-policy="2" anchor="0" fill="3">
<preferred-size width="150" height="50" />
</default-constraints>
</item>
<item class="javax.swing.JTree" icon="/com/intellij/uiDesigner/icons/tree.svg" removable="false" auto-create-binding="true" can-attach-label="false">
<default-constraints vsize-policy="6" hsize-policy="6" anchor="0" fill="3">
<preferred-size width="150" height="50" />
</default-constraints>
</item>
<item class="javax.swing.JTabbedPane" icon="/com/intellij/uiDesigner/icons/tabbedPane.svg" removable="false" auto-create-binding="true" can-attach-label="false">
<default-constraints vsize-policy="3" hsize-policy="3" anchor="0" fill="3">
<preferred-size width="200" height="200" />
</default-constraints>
</item>
<item class="javax.swing.JSplitPane" icon="/com/intellij/uiDesigner/icons/splitPane.svg" removable="false" auto-create-binding="false" can-attach-label="false">
<default-constraints vsize-policy="3" hsize-policy="3" anchor="0" fill="3">
<preferred-size width="200" height="200" />
</default-constraints>
</item>
<item class="javax.swing.JSpinner" icon="/com/intellij/uiDesigner/icons/spinner.svg" removable="false" auto-create-binding="true" can-attach-label="true">
<default-constraints vsize-policy="0" hsize-policy="6" anchor="8" fill="1" />
</item>
<item class="javax.swing.JSlider" icon="/com/intellij/uiDesigner/icons/slider.svg" removable="false" auto-create-binding="true" can-attach-label="false">
<default-constraints vsize-policy="0" hsize-policy="6" anchor="8" fill="1" />
</item>
<item class="javax.swing.JSeparator" icon="/com/intellij/uiDesigner/icons/separator.svg" removable="false" auto-create-binding="false" can-attach-label="false">
<default-constraints vsize-policy="6" hsize-policy="6" anchor="0" fill="3" />
</item>
<item class="javax.swing.JProgressBar" icon="/com/intellij/uiDesigner/icons/progressbar.svg" removable="false" auto-create-binding="true" can-attach-label="false">
<default-constraints vsize-policy="0" hsize-policy="6" anchor="0" fill="1" />
</item>
<item class="javax.swing.JToolBar" icon="/com/intellij/uiDesigner/icons/toolbar.svg" removable="false" auto-create-binding="false" can-attach-label="false">
<default-constraints vsize-policy="0" hsize-policy="6" anchor="0" fill="1">
<preferred-size width="-1" height="20" />
</default-constraints>
</item>
<item class="javax.swing.JToolBar$Separator" icon="/com/intellij/uiDesigner/icons/toolbarSeparator.svg" removable="false" auto-create-binding="false" can-attach-label="false">
<default-constraints vsize-policy="0" hsize-policy="0" anchor="0" fill="1" />
</item>
<item class="javax.swing.JScrollBar" icon="/com/intellij/uiDesigner/icons/scrollbar.svg" removable="false" auto-create-binding="true" can-attach-label="false">
<default-constraints vsize-policy="6" hsize-policy="0" anchor="0" fill="2" />
</item>
</group>
</component>
</project>

View File

@ -1,18 +1,21 @@
# Kiloparsec
__Recommended version is `0.3.1`: to keep the code compatible with current and further versions we
ask to upgrade to `0.3.1` at least.__
__Recommended version is `0.3.2`: to keep the code compatible with current and further versions we
ask to upgrade to `0.3.2` at least.__ Starting from this version some pacakage names are changed for
better clarity and fast UDP endpoints are added.
The new generation of __PARanoid SECurity__ protocol, advanced, faster, more secure. It also allows connecting any "
block device" transport to the same local interface. Out if the box it
provides the following transports:
| name | JVM | JS | native |
|----------------|-----|----|----------|
| TCP/IP server | ✓ | | >= 0.2.6 |
| TCP/IP client | ✓ | | >= 0.2.6 |
| Websock server | ✓ | | |
| Websock client | ✓ | ✓ | ✓ |
| name | JVM | JS | native |
|----------------|--------|----|--------|
| TCP/IP server | ✓ | | 0.2.6+ |
| TCP/IP client | ✓ | | 0.2.6+ |
| UDP server | 0.3.2+ | | 0.3.2+ |
| UDP client | 0.3.2+ | | 0.3.2+ |
| Websock server | ✓ | | |
| Websock client | ✓ | ✓ | ✓ |
### Note on version compatibility
@ -23,7 +26,7 @@ format. The format from 0.3.0 onwards is supposed to keep compatible.
- iosArm64, iosX64
- macosArm64, macosArm64
- linxArm64, linuxX64
- linuxArm64, linuxX64
### Non-native targets
@ -67,7 +70,7 @@ It could be, depending on your project structure, something like:
```kotlin
val commonMain by getting {
dependencies {
api("net.sergeych:kiloparsec:0.3.1")
api("net.sergeych:kiloparsec:0.3.2")
}
}
```
@ -171,6 +174,22 @@ In short, there are two functions that implements asynchronous TCP/IP transport
- [connectTcpDevice](https://code.sergeych.net/docs/kiloparsec/kiloparsec/net.sergeych.kiloparsec.adapter/connect-tcp-device.html) to connect to the server
## UDP client and server
Is very much straightforward, same as with TCP/IP:
- [UDP server creation](https://code.sergeych.net/docs/kiloparsec/kiloparsec/net.sergeych.kiloparsec.adapter/accept-udp-device.html)
- [Connect UDP client](https://code.sergeych.net/docs/kiloparsec/kiloparsec/net.sergeych.kiloparsec.adapter/connect-udp-device.html)
### UDP specifics
Each command invocation and result are packed in a separate UDP diagram using effective binary packing.
Thus for the best results commands and results should be relatively short, best to fit into 240 bytes. While bigger datagrams are often transmitted successfully, the probability of the effective transmission drops with the size.
Kiloparsec UDP transport does not retransmits not delivered packets. Use TCP/IP or websocket if it is a concern.
For the best results we recommend using [push](https://code.sergeych.net/docs/kiloparsec/kiloparsec/net.sergeych.kiloparsec/-remote-interface/index.html#1558240250%2FFunctions%2F788909594) for remote interfaces with UDP.
## Reusing code between servers
The same instance of the [KiloInterface](https://code.sergeych.net/docs/kiloparsec/kiloparsec/net.sergeych.kiloparsec/-kilo-interface/index.html?query=open%20class%20KiloInterface%3CS%3E%20:%20LocalInterface%3CKiloScope%3CS%3E%3E) could easily be reused with all instances of servers with different protocols.

View File

@ -1,12 +1,12 @@
plugins {
kotlin("multiplatform") version "2.0.0"
kotlin("multiplatform") version "2.0.10"
id("org.jetbrains.kotlin.plugin.serialization") version "2.0.0"
`maven-publish`
id("org.jetbrains.dokka") version "1.9.20"
}
group = "net.sergeych"
version = "0.3.1"
version = "0.3.2"
repositories {
mavenCentral()

View File

@ -89,6 +89,7 @@ class KiloClient<S>(
}
_state.value = false
resetDeferredClient()
// reconnection timeout
delay(100)
}
}

View File

@ -1,6 +1,24 @@
package net.sergeych.kiloparsec
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
fun String.encodeToUByteArray() =
encodeToByteArray().toUByteArray()
fun UByteArray.decodeFromUByteArray(): String = toByteArray().decodeToString()
class SyncValue<T>(initialValue: T) {
private val access = Mutex()
var value = initialValue
private set
suspend fun mutate(f: suspend (T)->T): T = access.withLock { f(value).also { value = it } }
@Suppress("unused")
suspend fun getAndSet(newValue: T): T = mutate {
val old = value
value = newValue
old
}
}

View File

@ -33,9 +33,9 @@ val PING_INACTIVITY_TIME = 30.seconds
* Listen for incoming TCP/IP connections on all local interfaces and the specified [port]
* anc create flow of [InetTransportDevice] suitable for [KiloClient].
*/
fun acceptTcpDevice(port: Int): Flow<InetTransportDevice> {
fun acceptTcpDevice(port: Int,localInterface: String = "0.0.0.0"): Flow<InetTransportDevice> {
val selectorManager = SelectorManager(Dispatchers.IO)
val serverSocket = aSocket(selectorManager).tcp().bind("127.0.0.1", port)
val serverSocket = aSocket(selectorManager).tcp().bind(localInterface, port)
return flow {
while (true) {
serverSocket.accept().let { sock ->

View File

@ -0,0 +1,102 @@
package net.sergeych.kiloparsec.adapter
import io.ktor.network.sockets.*
import io.ktor.utils.io.core.*
import net.sergeych.crypto2.toDump
import net.sergeych.kiloparsec.adapter.UdpBlock.Companion.CANCEL_BLOCK
import net.sergeych.kiloparsec.adapter.UdpBlock.Companion.ESCAPE_BYTE
import net.sergeych.kiloparsec.adapter.UdpBlock.Companion.PING_BLOCK
import net.sergeych.kiloparsec.adapter.UdpBlock.Companion.decode
/**
* Encoded block for UDP datagram space-savvy. Minimum dara size is two bytes, which is fine
* for Kiloparsec blocks.
*
* First byte is encoded using [ESCAPE_BYTE] depending on the second byte:
*
* | 0 | 1 | meaning |
* |---|---|---------|
* | [ESCAPE_BYTE] | [ESCAPE_BYTE] | Data block, dropping first byte |
* | [ESCAPE_BYTE] | [PING_BLOCK] | Ping block, reset timers |
* | [ESCAPE_BYTE] | [CANCEL_BLOCK] | close connection |
* | any other | * | data block, all bytes |
*
* Use [encoded] and [toDatagram] to create binary or the datagram from a block, and [decode] to restore.
*
* We do not use serialization to speed up the transport layer.
*/
sealed class UdpBlock {
/**
* Block to show that the connection is closed and should also be closed on the other side
*/
object Cancel : UdpBlock()
/**
* Parties show pings if there is no activity to keep it alive, detect connection loss and in some
* cases revive NAT/Proxy state in routers.
*/
object Ping : UdpBlock()
/**
* Parsec data block. Could not be smaller than two bytes.
*/
class Data(val data: UByteArray) : UdpBlock() {
override fun toString(): String {
return "UDP Data (${data.size}):\n${data.toDump()}"
}
init {
if( data.size < 2) throw IllegalArgumentException("data must be at least 2 bytes")
}
}
val encoded: UByteArray by lazy {
when(this) {
is Data -> {
// Do we need escaping?
if( data[0] == ESCAPE_BYTE )
escapeAsArray + data
else
data
}
is Cancel -> cancelAsArray
is Ping -> pingAsArray
}
}
fun toDatagram(address: SocketAddress): Datagram {
val encoded = encoded.toByteArray()
return Datagram(ByteReadPacket(encoded, 0, encoded.size), address)
}
companion object {
val ESCAPE_BYTE = 255.toUByte()
val PING_BLOCK = 0.toUByte()
val CANCEL_BLOCK = 1.toUByte()
private val escapeAsArray = ubyteArrayOf(ESCAPE_BYTE)
private val pingAsArray = ubyteArrayOf(ESCAPE_BYTE, PING_BLOCK)
private val cancelAsArray = ubyteArrayOf(ESCAPE_BYTE, CANCEL_BLOCK)
fun decode(data: UByteArray): UdpBlock {
if (data.size < 2)
throw UdpTransportException("block too short: ${data.size}")
return if( data[0] != ESCAPE_BYTE )
// plain data
Data(data)
else {
when(val b2 = data[1]) {
ESCAPE_BYTE -> {
// Escaped first byte, then plain data
Data(data.sliceArray(1 ..< data.size))
}
PING_BLOCK -> Ping
CANCEL_BLOCK -> Cancel
else -> throw UdpTransportException("invalid block type: $b2")
}
}
}
fun decode(datagram: Datagram) =
decode(datagram.packet.readBytes().toUByteArray())
}
}

View File

@ -0,0 +1,18 @@
package net.sergeych.kiloparsec.adapter
import io.ktor.network.sockets.*
/**
* The interface for common UDP connector shared by UDP components
*/
internal interface UdpConnector {
/**
* Called when client connection is done so the provider could free resources
*/
suspend fun disconnectClient(address: SocketAddress)
/**
* Send a block from a proper UDP socket
*/
suspend fun sendBlock(block: UdpBlock, toAddress: SocketAddress)
}

View File

@ -0,0 +1,121 @@
package net.sergeych.kiloparsec.adapter
import io.ktor.network.selector.*
import io.ktor.network.sockets.*
import io.ktor.utils.io.CancellationException
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.Flow
import net.sergeych.kiloparsec.KiloClient
import net.sergeych.kiloparsec.KiloServer
import net.sergeych.kiloparsec.RemoteInterface
import net.sergeych.mp_tools.globalLaunch
import net.sergeych.tools.AtomicCounter
internal val udpCounter = AtomicCounter(0)
class UdpTransportException(override val message: String) : RemoteInterface.InvalidDataException(message)
/**
* Listen for incoming UDP connections and provide transport flow for it. See also [UdpServer.transportFlow]
* for another way to create a server. Use it with [KiloServer]:
* ```kotlin
* // Whatever server session data we might need:
* data class Session(
* var data: String,
* )
*
* // declare some commands (normally in a shared module):
* val cmdSave by command<String, Unit>()
* val cmdLoad by command<Unit, String>()
* val cmdDrop by command<Unit, Unit>()
* val cmdException by command<Unit, Unit>()
*
* // Interface using the session above, can be shared between many
* // server types and instances (different ports and protocols):
* val cli = KiloInterface<Session>().apply {
* onConnected { session.data = "start" }
* on(cmdSave) { session.data = it }
* on(cmdLoad) {
* session.data
* }
* on(cmdException) {
* throw TestException()
* }
* on(cmdDrop) {
* throw LocalInterface.BreakConnectionException()
* }
* }
* // Now create a server to accept incoming UDPs on our port:
* val server = KiloServer(cli, acceptUdpDevice(port)) {
* // This initializes new session for each incoming command
* Session("unknown")
* }
* ```
*
* See [connectUdpDevice] for the client sample.
*
* When it is necessary to stop listening to some port, use [UdpServer] instead.
*/
fun acceptUdpDevice(port: Int, localInterface: String = "0.0.0.0"): Flow<InetTransportDevice> =
UdpServer(port, localInterface).transportFlow
/**
* Connect to UDP server (see [acceptUdpDevice]) and return a [InetTransportDevice] for it. It
* should be used with [KiloClient] as connection provider:
* ```kotlin
* val client = KiloClient<Unit>() {
* connect { connectUdpDevice("localhost:$port") }
* }
* // now we can execute remote commands:
* assertEquals("start", client.call(cmdLoad))
* ```
*/
fun connectUdpDevice(hostPort: String) = connectUdpDevice(hostPort.toNetworkAddress())
/**
* Connect to UDP server (see [acceptUdpDevice]) and return a [InetTransportDevice] for it. It
* should be used with [KiloClient] as connection provider:
* ```kotlin
* val client = KiloClient<Unit>() {
* connect { connectUdpDevice("localhost:$port") }
* }
* // now we can execute remote commands:
* assertEquals("start", client.call(cmdLoad))
* ```
*/
fun connectUdpDevice(addr: NetworkAddress): InetTransportDevice {
val selectorManager = SelectorManager(Dispatchers.IO)
val remoteAddress = InetSocketAddress(addr.host, addr.port)
val socket = aSocket(selectorManager).udp().connect(remoteAddress)
val done = CompletableDeferred<Unit>()
val transport = UdpSocketTransport(object : UdpConnector {
override suspend fun sendBlock(block: UdpBlock, toAddress: SocketAddress) {
socket.send(block.toDatagram(remoteAddress))
}
override suspend fun disconnectClient(address: SocketAddress) {
done.complete(Unit)
}
}, remoteAddress, false)
globalLaunch {
launch {
while (isActive) {
try {
transport.processIncoming(UdpBlock.decode(socket.receive()))
} catch (_: CancellationException) {
break
} catch (e: Exception) {
transport.close()
break
}
}
}
done.await()
}
return transport.transportDevice
}

View File

@ -0,0 +1,119 @@
package net.sergeych.kiloparsec.adapter
import io.ktor.network.selector.*
import io.ktor.network.sockets.*
import io.ktor.utils.io.*
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.IO
import kotlinx.coroutines.channels.ClosedReceiveChannelException
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
import net.sergeych.kiloparsec.KiloServer
import net.sergeych.mp_logger.LogTag
import net.sergeych.mp_logger.Loggable
import net.sergeych.mp_logger.debug
import net.sergeych.mp_logger.exception
/**
* UDP server for kiloparsec. Unlike [acceptUdpDevice], it allow stopping listening
* to the port when need with [close]. Use [transportFlow] with [KiloServer], here is the
* basic sample:
*
* ```kotlin
* val uServer = UdpServer(port)
* KiloServer(cli, uServer.transportFlow()) {
* Session("unknown")
* }
*
* // server is now active and accepts connections
* // ...
*
* // close and stop listening to the port:
* uServer.close()
* ```
*
* See [acceptUdpDevice] for more information.
*/
class UdpServer(val port: Int,localInterface: String = "0.0.0.0") :
Loggable by LogTag("UDPS${udpCounter.incrementAndGet()}"), UdpConnector {
private val sessions = mutableMapOf<SocketAddress, UdpSocketTransport>()
private val access = Mutex()
private val selectorManager = SelectorManager(Dispatchers.IO)
private val serverSocket = aSocket(selectorManager).udp().bind(InetSocketAddress(localInterface, port))
override suspend fun disconnectClient(address: SocketAddress) {
access.withLock { sessions.remove(address) }
}
/**
* a transport flow of [InetTransportDevice] suitable to be used with [KiloServer], see [UdpServer] for the
* usage sample.
*/
val transportFlow by lazy {
flow {
while(true) {
try {
val datagram = serverSocket.receive()
val block = UdpBlock.decode(datagram)
val remoteAddress = datagram.address
access.withLock {
if (block == UdpBlock.Cancel) {
// if the cancel comes to already closed transport, do nothing
sessions.remove(remoteAddress)?.processIncoming(block)
} else {
sessions.getOrPut(remoteAddress) {
// new connection: create transport
debug { "Creating new connection to $remoteAddress" }
UdpSocketTransport(this@UdpServer, remoteAddress, true)
// and emit it:
.also { emit(it.transportDevice) }
}.processIncoming(block)
}
}
}
catch(_: CancellationException) { break }
catch(_: ClosedReceiveChannelException) {
break
}
catch(e: Exception) {
exception { "unexpected exception in incoming datagram processing" to e }
close()
break
}
}
}
}
override suspend fun sendBlock(block: UdpBlock, toAddress: SocketAddress) {
serverSocket.send(block.toDatagram(toAddress))
}
val isClosed: Boolean get() = serverSocket.isClosed
/**
* Close the UDP server. Calling it will cause:
*
* - Closing nound UDP socket on [port]
* - Closing all pending connections
* - cancelling the [transportFlow], which will cause Kiloparsec server to also stop
*
* Call suspends until socket and all sessions are closed. Later calls do nothing.
*/
suspend fun close() {
access.withLock {
if (!isClosed) {
runCatching { serverSocket.close() }
}
}
while(sessions.isNotEmpty()) {
runCatching {
access.withLock { sessions.values.firstOrNull() }
?.close()
}
}
}
}

View File

@ -0,0 +1,171 @@
package net.sergeych.kiloparsec.adapter
import io.ktor.network.sockets.*
import io.ktor.utils.io.*
import kotlinx.coroutines.channels.BufferOverflow
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.ClosedReceiveChannelException
import kotlinx.coroutines.channels.ClosedSendChannelException
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.datetime.Clock
import net.sergeych.kiloparsec.SyncValue
import net.sergeych.mp_logger.Log
import net.sergeych.mp_logger.Loggable
import net.sergeych.mp_logger.debug
import net.sergeych.mp_logger.exception
import net.sergeych.mp_tools.globalLaunch
import kotlin.time.Duration.Companion.seconds
/**
* This is a common part of UDP transport shared between client and server connections.
* It should not be used directly but bu the [UdpServer], [acceptUdpDevice] and [connectUdpDevice]
* respectively.
*/
internal class UdpSocketTransport(private val server: UdpConnector, val socketAddress: SocketAddress, val isServer: Boolean) :
Loggable {
// IMPORTANT! Log stuff must be the first (or you shot your leg):
val address = (socketAddress as InetSocketAddress).let { NetworkAddress(it.hostname, it.port) }
override var logTag: String = "UDPT:$address${if (isServer) ":server" else ":client"}"
override var logLevel: Log.Level? = Log.Level.DEBUG
// Pinger params: keep them first!
private var lastSendAt = Clock.System.now()
private var lastReceived = Clock.System.now()
private val pingTimeout = 30.seconds
private val pingSleep = pingTimeout / 3
private val pingMinTimeout = pingTimeout / 2
// TODO: break on inactivity
private val inactivityBreakTimeout = 30.seconds
val inputDataBlocks = Channel<UByteArray>(256, onBufferOverflow = BufferOverflow.DROP_OLDEST)
val outputDataBlocks = Channel<UByteArray>(256, onBufferOverflow = BufferOverflow.DROP_OLDEST)
val inputUdpBlocks = Channel<UdpBlock>(256, onBufferOverflow = BufferOverflow.DROP_OLDEST)
private val job = globalLaunch {
coroutineScope {
launch { convertOutput() }
launch { convertInput() }
launch { pinger() }
}
}
init {
// This is iverly important: it requires that
// all members are initialized before use. Otherwise kotlin
// may execute class members pr
debug { "initialization done" }
}
val transportDevice: InetTransportDevice by lazy {
InetTransportDevice(inputDataBlocks, outputDataBlocks, address, { close() }, {})
}
private val closedFlag = SyncValue(false)
val isClosed: Boolean = closedFlag.value
suspend fun close() {
closedFlag.mutate {
if (!it) {
runCatching { server.sendBlock(UdpBlock.Cancel, socketAddress) }
server.disconnectClient(socketAddress)
runCatching { inputDataBlocks.close() }
runCatching { outputDataBlocks.close() }
job.cancel()
}
true
}
}
private suspend fun send(block: UdpBlock) {
server.sendBlock(block, socketAddress)
lastSendAt = Clock.System.now()
}
/**
* Process the block recoded by the server. Note that it should properly process all
* block types, e.g. close on [UdpBlock.Cancel], etc. Server will not close us!
*
* __Important: it should not block, instead, server expects it to return ASAP__, so it
* executes in a local coroutine context.
*
* Also it should not throw exceptions.
*/
fun processIncoming(block: UdpBlock) {
inputUdpBlocks.trySend(block)
}
suspend fun convertInput() {
while(!isClosed) {
when (val block = inputUdpBlocks.receiveCatching().getOrNull()) {
null -> break
is UdpBlock.Cancel -> globalLaunch {
debug { "received cancel block, requesting close" }
kotlin.runCatching { close() }
}
is UdpBlock.Data -> {
// input does not block, it uses DROP_OLDEST policy
lastReceived = Clock.System.now()
val result = kotlin.runCatching { inputDataBlocks.send(block.data) }
when (val e = result.exceptionOrNull()) {
null -> {}
is ClosedSendChannelException -> {
debug { "received close channel" }
close()
}
is CancellationException -> {}
else -> {
exception { "unexpected exception" to e }
close()
}
}
}
UdpBlock.Ping -> {
lastReceived = Clock.System.now()
if (lastSendAt - lastReceived > pingMinTimeout) send(UdpBlock.Ping)
}
}
}
}
private suspend fun pinger() {
while (!isClosed) {
delay(pingSleep)
if (Clock.System.now() - lastSendAt >= pingTimeout) {
debug { "pinger sends a ping on timeout" }
send(UdpBlock.Ping)
}
}
}
private suspend fun convertOutput() {
while (!isClosed) {
try {
server.sendBlock(UdpBlock.Data(outputDataBlocks.receive()), socketAddress)
} catch (e: CancellationException) {
// this is ok
break
} catch (e: ClosedReceiveChannelException) {
debug { "input channel is closed, closing" }
close()
break
} catch (e: Exception) {
exception { "unexpected exception in convertOutput" to e }
close()
break
}
}
debug { "exiting convertOutput" }
}
}

View File

@ -0,0 +1,181 @@
import kotlinx.coroutines.test.runTest
import net.sergeych.crypto2.initCrypto
import net.sergeych.kiloparsec.*
import net.sergeych.kiloparsec.adapter.*
import net.sergeych.mp_logger.Log
import kotlin.random.Random
import kotlin.test.Test
import kotlin.test.assertEquals
import kotlin.test.assertIs
class InternetrTest {
class TestException : Exception("test1")
@Test
fun tcpTest() = runTest {
initCrypto()
Log.connectConsole(Log.Level.DEBUG)
data class Session(
var data: String,
)
val port = 27170 + Random.nextInt(1, 200)
val cmdSave by command<String, Unit>()
val cmdLoad by command<Unit, String>()
val cmdDrop by command<Unit, Unit>()
val cmdException by command<Unit, Unit>()
val cli = KiloInterface<Session>().apply {
registerError { TestException() }
onConnected { session.data = "start" }
on(cmdSave) { session.data = it }
on(cmdLoad) {
session.data
}
on(cmdException) {
throw TestException()
}
on(cmdDrop) {
throw LocalInterface.BreakConnectionException()
}
}
val server = KiloServer(cli, acceptTcpDevice(port)) {
Session("unknown")
}
val client = KiloClient<Unit>() {
addErrors(cli)
// TODO: add register error variant
connect { connectTcpDevice("localhost:$port") }
}
assertEquals("start", client.call(cmdLoad))
client.call(cmdSave, "foobar")
assertEquals("foobar", client.call(cmdLoad))
val res = kotlin.runCatching { client.call(cmdException) }
assertIs<TestException>(res.exceptionOrNull())
assertEquals("foobar", client.call(cmdLoad))
assertThrows<RemoteInterface.ClosedException> { client.call(cmdDrop) }
// reconnect?
assertEquals("start", client.call(cmdLoad))
server.close()
}
@Test
fun udpTest() = runTest {
initCrypto()
Log.connectConsole(Log.Level.DEBUG)
data class Session(
var data: String,
)
val port = 27170 + Random.nextInt(1, 200)
val cmdSave by command<String, Unit>()
val cmdLoad by command<Unit, String>()
val cmdDrop by command<Unit, Unit>()
val cmdException by command<Unit, Unit>()
val cli = KiloInterface<Session>().apply {
registerError { TestException() }
onConnected { session.data = "start" }
on(cmdSave) { session.data = it }
on(cmdLoad) {
session.data
}
on(cmdException) {
throw TestException()
}
on(cmdDrop) {
throw LocalInterface.BreakConnectionException()
}
}
val server = KiloServer(cli, acceptUdpDevice(port)) {
Session("unknown")
}
val client = KiloClient<Unit>() {
addErrors(cli)
connect { connectUdpDevice("localhost:$port") }
}
assertEquals("start", client.call(cmdLoad))
client.call(cmdSave, "foobar")
assertEquals("foobar", client.call(cmdLoad))
val res = kotlin.runCatching { client.call(cmdException) }
assertIs<TestException>(res.exceptionOrNull())
assertEquals("foobar", client.call(cmdLoad))
assertThrows<RemoteInterface.ClosedException> { client.call(cmdDrop) }
// reconnect?
assertEquals("start", client.call(cmdLoad))
server.close()
}
@Test
fun udpServerTest() = runTest {
initCrypto()
Log.connectConsole(Log.Level.DEBUG)
data class Session(
var data: String,
)
val port = 27170 + Random.nextInt(1, 200)
val cmdSave by command<String, Unit>()
val cmdLoad by command<Unit, String>()
val cmdDrop by command<Unit, Unit>()
val cmdException by command<Unit, Unit>()
val cli = KiloInterface<Session>().apply {
registerError { TestException() }
onConnected { session.data = "start" }
on(cmdSave) { session.data = it }
on(cmdLoad) {
session.data
}
on(cmdException) {
throw TestException()
}
on(cmdDrop) {
throw LocalInterface.BreakConnectionException()
}
}
val uServer = UdpServer(port)
KiloServer(cli, uServer.transportFlow) {
Session("unknown")
}
val client = KiloClient<Unit>() {
addErrors(cli)
connect { connectUdpDevice("localhost:$port") }
}
assertEquals("start", client.call(cmdLoad))
client.call(cmdSave, "foobar")
assertEquals("foobar", client.call(cmdLoad))
val res = kotlin.runCatching { client.call(cmdException) }
assertIs<TestException>(res.exceptionOrNull())
assertEquals("foobar", client.call(cmdLoad))
assertThrows<RemoteInterface.ClosedException> { client.call(cmdDrop) }
// reconnect?
assertEquals("start", client.call(cmdLoad))
uServer.close()
// server.close()
}
}

View File

@ -1,69 +0,0 @@
import kotlinx.coroutines.test.runTest
import net.sergeych.crypto2.initCrypto
import net.sergeych.kiloparsec.*
import net.sergeych.kiloparsec.adapter.acceptTcpDevice
import net.sergeych.kiloparsec.adapter.connectTcpDevice
import kotlin.random.Random
import kotlin.test.Test
import kotlin.test.assertEquals
import kotlin.test.assertIs
class TcpTest {
class TestException : Exception("test1")
@Test
fun tcpTest() = runTest {
initCrypto()
// Log.connectConsole(Log.Level.DEBUG)
data class Session(
var data: String,
)
val port = 27170 + Random.nextInt(1, 200)
val cmdSave by command<String, Unit>()
val cmdLoad by command<Unit, String>()
val cmdDrop by command<Unit, Unit>()
val cmdException by command<Unit, Unit>()
val cli = KiloInterface<Session>().apply {
registerError { TestException() }
onConnected { session.data = "start" }
on(cmdSave) { session.data = it }
on(cmdLoad) {
session.data
}
on(cmdException) {
throw TestException()
}
on(cmdDrop) {
throw LocalInterface.BreakConnectionException()
}
}
val server = KiloServer(cli, acceptTcpDevice(port)) {
Session("unknown")
}
val client = KiloClient<Unit>() {
addErrors(cli)
// TODO: add register error variant
connect { connectTcpDevice("localhost:$port") }
}
assertEquals("start", client.call(cmdLoad))
client.call(cmdSave, "foobar")
assertEquals("foobar", client.call(cmdLoad))
val res = kotlin.runCatching { client.call(cmdException) }
assertIs<TestException>(res.exceptionOrNull())
assertEquals("foobar", client.call(cmdLoad))
assertThrows<RemoteInterface.ClosedException> { client.call(cmdDrop) }
// reconnect?
assertEquals("start", client.call(cmdLoad))
server.close()
}
}