basic udp: need a package defrag/bust layer, IP won't work well
This commit is contained in:
parent
6c10c2e578
commit
d8f73e21b4
6
.idea/GitLink.xml
generated
Normal file
6
.idea/GitLink.xml
generated
Normal file
@ -0,0 +1,6 @@
|
|||||||
|
<?xml version="1.0" encoding="UTF-8"?>
|
||||||
|
<project version="4">
|
||||||
|
<component name="uk.co.ben_gibson.git.link.SettingsState">
|
||||||
|
<option name="host" value="e0f86390-1091-4871-8aeb-f534fbc99cf0" />
|
||||||
|
</component>
|
||||||
|
</project>
|
1
.idea/gradle.xml
generated
1
.idea/gradle.xml
generated
@ -1,5 +1,6 @@
|
|||||||
<?xml version="1.0" encoding="UTF-8"?>
|
<?xml version="1.0" encoding="UTF-8"?>
|
||||||
<project version="4">
|
<project version="4">
|
||||||
|
<component name="GradleMigrationSettings" migrationVersion="1" />
|
||||||
<component name="GradleSettings">
|
<component name="GradleSettings">
|
||||||
<option name="linkedExternalProjectsSettings">
|
<option name="linkedExternalProjectsSettings">
|
||||||
<GradleProjectSettings>
|
<GradleProjectSettings>
|
||||||
|
12
src/commonMain/kotlin/net/sergeych/kiloparsec/ProxyDevice.kt
Normal file
12
src/commonMain/kotlin/net/sergeych/kiloparsec/ProxyDevice.kt
Normal file
@ -0,0 +1,12 @@
|
|||||||
|
package net.sergeych.kiloparsec
|
||||||
|
|
||||||
|
import kotlinx.coroutines.channels.Channel
|
||||||
|
|
||||||
|
class ProxyDevice(capacity: Int=1024) : Transport.Device {
|
||||||
|
override val input = Channel<UByteArray?>(capacity)
|
||||||
|
override val output= Channel<UByteArray?>(capacity)
|
||||||
|
override suspend fun close() {
|
||||||
|
runCatching { input.close() }
|
||||||
|
runCatching { output.close() }
|
||||||
|
}
|
||||||
|
}
|
@ -0,0 +1,6 @@
|
|||||||
|
package net.sergeych.kiloparsec.adapter
|
||||||
|
|
||||||
|
internal data class ConnectionId(
|
||||||
|
val address: NetworkAddress,
|
||||||
|
val id: UInt,
|
||||||
|
)
|
@ -0,0 +1,117 @@
|
|||||||
|
package net.sergeych.kiloparsec.adapter
|
||||||
|
|
||||||
|
import kotlinx.coroutines.*
|
||||||
|
import net.sergeych.crypto.Key
|
||||||
|
import net.sergeych.kiloparsec.KiloClientConnection
|
||||||
|
import net.sergeych.kiloparsec.KiloInterface
|
||||||
|
import net.sergeych.kiloparsec.ProxyDevice
|
||||||
|
import net.sergeych.mp_logger.*
|
||||||
|
import net.sergeych.mp_tools.globalLaunch
|
||||||
|
import net.sergeych.tools.AtomicCounter
|
||||||
|
import net.sergeych.utools.pack
|
||||||
|
import net.sergeych.utools.unpack
|
||||||
|
|
||||||
|
private val instanceCounter = AtomicCounter()
|
||||||
|
|
||||||
|
class DatagramClient<S>(
|
||||||
|
private val connector: DatagramConnector,
|
||||||
|
val serverAddress: NetworkAddress,
|
||||||
|
private val localInterface: KiloInterface<S>,
|
||||||
|
private val clientSigningKey: Key.Signing? = null,
|
||||||
|
private val createSession: () -> S,
|
||||||
|
) : LogTag("DGC:${instanceCounter.next()}") {
|
||||||
|
|
||||||
|
private var sessionId = 0u
|
||||||
|
private var isClosed = false
|
||||||
|
val job = globalLaunch {
|
||||||
|
while (isActive && !isClosed) {
|
||||||
|
openSession()
|
||||||
|
pumpSession()
|
||||||
|
debug { "closing session $sessionId" }
|
||||||
|
sessionId = 0u
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private suspend fun pumpSession() {
|
||||||
|
try {
|
||||||
|
val proxyDevice = ProxyDevice()
|
||||||
|
val client = KiloClientConnection<S>(localInterface, proxyDevice, createSession(), clientSigningKey)
|
||||||
|
coroutineScope {
|
||||||
|
fun stop() {
|
||||||
|
cancel()
|
||||||
|
}
|
||||||
|
launch {
|
||||||
|
while (isActive) {
|
||||||
|
if (sessionId == 0u) {
|
||||||
|
error { "invalid zero sessionId in pump" }
|
||||||
|
stop()
|
||||||
|
} else {
|
||||||
|
val data = proxyDevice.output.receive()
|
||||||
|
if (data == null) {
|
||||||
|
warning { "kiloconnection send empty frame: closing" }
|
||||||
|
stop()
|
||||||
|
} else {
|
||||||
|
send(OuterPacket(sessionId, data))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
while (isActive) {
|
||||||
|
val packet: OuterPacket = unpack(connector.incoming.receive().message)
|
||||||
|
if (packet.id == 0u) {
|
||||||
|
val result: DatagramResult = unpack(packet.payload)
|
||||||
|
if (result is DatagramResult.Error) {
|
||||||
|
if (result.error == DatagramError.Closed) {
|
||||||
|
info { "connection is closed on the remote part" }
|
||||||
|
} else {
|
||||||
|
warning { "unexpected error while pumping: ${result.error}" }
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
warning { "unexpected result: $result" }
|
||||||
|
}
|
||||||
|
stop()
|
||||||
|
} else {
|
||||||
|
proxyDevice.input.send(packet.payload)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
proxyDevice.close()
|
||||||
|
} catch (x: CancellationException) {
|
||||||
|
throw x
|
||||||
|
} catch (x: Exception) {
|
||||||
|
x.printStackTrace()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private suspend fun openSession() {
|
||||||
|
while (!isClosed) {
|
||||||
|
send(OuterPacket(0u, ubyteArrayOf()))
|
||||||
|
val packet: OuterPacket = unpack(connector.incoming.receive().message)
|
||||||
|
if (packet.id == 0u) {
|
||||||
|
when (val result: DatagramResult = unpack(packet.payload)) {
|
||||||
|
is DatagramResult.Error -> {
|
||||||
|
warning { "error opening connection: ${result.error}" }
|
||||||
|
}
|
||||||
|
|
||||||
|
is DatagramResult.Ok -> {
|
||||||
|
debug { "connection established, our id is ${result.id}" }
|
||||||
|
sessionId = result.id
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
warning { "received unexpected openSession id: ${packet.id}" }
|
||||||
|
}
|
||||||
|
delay(500)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
private suspend fun send(packet: OuterPacket) {
|
||||||
|
connector.send(pack(packet), serverAddress)
|
||||||
|
}
|
||||||
|
|
||||||
|
fun close() {
|
||||||
|
job.cancel()
|
||||||
|
}
|
||||||
|
}
|
@ -0,0 +1,7 @@
|
|||||||
|
package net.sergeych.kiloparsec.adapter
|
||||||
|
|
||||||
|
enum class DatagramError {
|
||||||
|
Closed,
|
||||||
|
NotExists,
|
||||||
|
Busy
|
||||||
|
}
|
@ -0,0 +1,15 @@
|
|||||||
|
package net.sergeych.kiloparsec.adapter
|
||||||
|
|
||||||
|
import kotlinx.serialization.SerialName
|
||||||
|
import kotlinx.serialization.Serializable
|
||||||
|
|
||||||
|
@Serializable
|
||||||
|
sealed class DatagramResult {
|
||||||
|
@Serializable
|
||||||
|
@SerialName("ok")
|
||||||
|
data class Ok(val id: UInt) : DatagramResult()
|
||||||
|
|
||||||
|
@Serializable
|
||||||
|
@SerialName("err")
|
||||||
|
data class Error(val error: DatagramError) : DatagramResult()
|
||||||
|
}
|
@ -0,0 +1,136 @@
|
|||||||
|
package net.sergeych.kiloparsec.adapter
|
||||||
|
|
||||||
|
import kotlinx.coroutines.channels.ClosedReceiveChannelException
|
||||||
|
import kotlinx.coroutines.isActive
|
||||||
|
import kotlinx.coroutines.sync.Mutex
|
||||||
|
import kotlinx.coroutines.sync.withLock
|
||||||
|
import kotlinx.datetime.Instant
|
||||||
|
import net.sergeych.crypto.Key
|
||||||
|
import net.sergeych.kiloparsec.KiloInterface
|
||||||
|
import net.sergeych.kiloparsec.KiloServerConnection
|
||||||
|
import net.sergeych.kiloparsec.ProxyDevice
|
||||||
|
import net.sergeych.mp_logger.LogTag
|
||||||
|
import net.sergeych.mp_logger.debug
|
||||||
|
import net.sergeych.mp_tools.globalLaunch
|
||||||
|
import net.sergeych.tools.AtomicCounter
|
||||||
|
import net.sergeych.utools.now
|
||||||
|
import net.sergeych.utools.pack
|
||||||
|
import net.sergeych.utools.unpack
|
||||||
|
|
||||||
|
private val instanceCounter = AtomicCounter()
|
||||||
|
|
||||||
|
private class Connection<S>(
|
||||||
|
val server: DatagramServer<S>,
|
||||||
|
val connection: KiloServerConnection<S>,
|
||||||
|
val device: ProxyDevice,
|
||||||
|
val connectionId: ConnectionId,
|
||||||
|
var lastActiveAt: Instant = now(),
|
||||||
|
) {
|
||||||
|
|
||||||
|
suspend fun proceedFrame(frame: OuterPacket) {
|
||||||
|
lastActiveAt = now()
|
||||||
|
if (frame.payload.isEmpty()) {
|
||||||
|
// ping
|
||||||
|
server.sendData(connectionId,ubyteArrayOf())
|
||||||
|
} else
|
||||||
|
device.input.send(frame.payload)
|
||||||
|
}
|
||||||
|
|
||||||
|
private val job = globalLaunch {
|
||||||
|
while (isActive) {
|
||||||
|
val x = device.output.receive() ?: break
|
||||||
|
server.sendData(connectionId, x)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
suspend fun close() {
|
||||||
|
job.cancel()
|
||||||
|
server.remove(connectionId)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
class DatagramServer<S>(
|
||||||
|
private val connector: DatagramConnector,
|
||||||
|
private val localInterface: KiloInterface<S>,
|
||||||
|
private val serverSigningKey: Key.Signing? = null,
|
||||||
|
private val createSession: () -> S,
|
||||||
|
) : LogTag("DGS:${instanceCounter.next()}") {
|
||||||
|
|
||||||
|
private val job = globalLaunch {
|
||||||
|
while (isActive) {
|
||||||
|
try {
|
||||||
|
val datagram = connector.incoming.receive()
|
||||||
|
val outer = unpack<OuterPacket>(datagram.message)
|
||||||
|
if (outer.id == 0u)
|
||||||
|
setupNewConnection(datagram.address, outer)
|
||||||
|
else
|
||||||
|
proceedExistingConnection(datagram.address, outer)
|
||||||
|
} catch (_: ClosedReceiveChannelException) {
|
||||||
|
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private val access = Mutex()
|
||||||
|
private val connections = mutableMapOf<ConnectionId, Connection<S>>()
|
||||||
|
|
||||||
|
private fun findFreeId(address: NetworkAddress): UInt {
|
||||||
|
val existingIds = connections.keys.filter { it.address == address }.map { it.id }.toSet()
|
||||||
|
debug { "existing connections from $address: $existingIds" }
|
||||||
|
var i = 0u
|
||||||
|
while (i in existingIds) i++
|
||||||
|
debug { "found the free id: $i" }
|
||||||
|
return i
|
||||||
|
}
|
||||||
|
|
||||||
|
private suspend fun setupNewConnection(address: NetworkAddress, packet: OuterPacket) {
|
||||||
|
val id = access.withLock {
|
||||||
|
val connectionId = ConnectionId(address, findFreeId(address))
|
||||||
|
val proxy = ProxyDevice()
|
||||||
|
connections[connectionId] = Connection(
|
||||||
|
this,
|
||||||
|
KiloServerConnection(localInterface, proxy, createSession(), serverSigningKey),
|
||||||
|
proxy,
|
||||||
|
connectionId,
|
||||||
|
)
|
||||||
|
connectionId
|
||||||
|
}
|
||||||
|
sendResult(id,DatagramResult.Ok(id.id))
|
||||||
|
}
|
||||||
|
|
||||||
|
private suspend fun proceedExistingConnection(address: NetworkAddress, outerPacket: OuterPacket) {
|
||||||
|
val connectionId = ConnectionId(address, outerPacket.id)
|
||||||
|
val c = access.withLock { connections[connectionId] }
|
||||||
|
if (c == null) {
|
||||||
|
sendResult(connectionId, DatagramResult.Error(DatagramError.NotExists))
|
||||||
|
} else {
|
||||||
|
c.proceedFrame(outerPacket)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Important. Closing the server also closes the connector which is supposed to be "owned"
|
||||||
|
* exclusively by `DatagramServer`
|
||||||
|
*/
|
||||||
|
fun close() {
|
||||||
|
job.cancel()
|
||||||
|
runCatching {
|
||||||
|
connector.close()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
internal suspend fun remove(connectionId: ConnectionId) {
|
||||||
|
access.withLock {
|
||||||
|
connections.remove(connectionId)
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
internal suspend fun sendData(connectionId: ConnectionId,data: UByteArray) {
|
||||||
|
connector.send(pack(OuterPacket(connectionId.id, data)), connectionId.address)
|
||||||
|
}
|
||||||
|
|
||||||
|
internal suspend fun sendResult(connectionId: ConnectionId, result: DatagramResult) {
|
||||||
|
connector.send(pack(OuterPacket(0u, pack(result))), connectionId.address)
|
||||||
|
}
|
||||||
|
}
|
@ -15,11 +15,25 @@ interface NetworkAddress {
|
|||||||
* Multiplatform datagram abstraction
|
* Multiplatform datagram abstraction
|
||||||
*/
|
*/
|
||||||
interface Datagram {
|
interface Datagram {
|
||||||
|
/**
|
||||||
|
* Received message
|
||||||
|
*/
|
||||||
val message: UByteArray
|
val message: UByteArray
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Address from where the message was sent
|
||||||
|
*/
|
||||||
val address: NetworkAddress
|
val address: NetworkAddress
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Send a datagram in response, e.g., to the [address].
|
||||||
|
* This method is optimized per single per-datagram use. If you need to send many datagram, use [DatagramConnector].
|
||||||
|
*/
|
||||||
suspend fun respondWith(message: UByteArray)
|
suspend fun respondWith(message: UByteArray)
|
||||||
}
|
}
|
||||||
interface DatagramReceiver {
|
|
||||||
|
@OptIn(ExperimentalStdlibApi::class)
|
||||||
|
interface DatagramConnector: AutoCloseable {
|
||||||
|
|
||||||
val incoming: ReceiveChannel<Datagram>
|
val incoming: ReceiveChannel<Datagram>
|
||||||
suspend fun send(message: UByteArray, networkAddress: NetworkAddress)
|
suspend fun send(message: UByteArray, networkAddress: NetworkAddress)
|
||||||
@ -30,7 +44,7 @@ interface DatagramReceiver {
|
|||||||
|
|
||||||
suspend fun send(message: UByteArray,host: String,port: Int) =
|
suspend fun send(message: UByteArray,host: String,port: Int) =
|
||||||
send(message, NetworkAddress(host,port))
|
send(message, NetworkAddress(host,port))
|
||||||
fun close()
|
override fun close()
|
||||||
}
|
}
|
||||||
|
|
||||||
expect fun networkAddressOf(address: String): NetworkAddress
|
expect fun networkAddressOf(address: String): NetworkAddress
|
||||||
|
@ -0,0 +1,9 @@
|
|||||||
|
package net.sergeych.kiloparsec.adapter
|
||||||
|
|
||||||
|
import kotlinx.serialization.Serializable
|
||||||
|
|
||||||
|
@Serializable
|
||||||
|
internal data class OuterPacket(
|
||||||
|
val id: UInt,
|
||||||
|
val payload: UByteArray,
|
||||||
|
)
|
@ -0,0 +1,8 @@
|
|||||||
|
package net.sergeych.tools
|
||||||
|
|
||||||
|
class AtomicCounter(startWith: Long=0) {
|
||||||
|
private var op = ProtectedOp()
|
||||||
|
private var counter = startWith
|
||||||
|
|
||||||
|
fun next(): Long = op { counter++ }
|
||||||
|
}
|
@ -3,8 +3,6 @@ package net.sergeych.kiloparsec.adapter
|
|||||||
import kotlinx.coroutines.*
|
import kotlinx.coroutines.*
|
||||||
import kotlinx.coroutines.channels.BufferOverflow
|
import kotlinx.coroutines.channels.BufferOverflow
|
||||||
import kotlinx.coroutines.channels.Channel
|
import kotlinx.coroutines.channels.Channel
|
||||||
import kotlinx.coroutines.sync.Mutex
|
|
||||||
import kotlinx.coroutines.sync.withLock
|
|
||||||
import net.sergeych.mp_logger.LogTag
|
import net.sergeych.mp_logger.LogTag
|
||||||
import net.sergeych.mp_logger.exception
|
import net.sergeych.mp_logger.exception
|
||||||
import net.sergeych.mp_logger.info
|
import net.sergeych.mp_logger.info
|
||||||
@ -42,28 +40,28 @@ class UdpDatagram(override val message: UByteArray, val inetAddress: InetAddress
|
|||||||
JvmNetworkAddress(inetAddress, port)
|
JvmNetworkAddress(inetAddress, port)
|
||||||
}
|
}
|
||||||
|
|
||||||
private val access = Mutex()
|
|
||||||
|
|
||||||
private var socket: DatagramSocket? = null
|
|
||||||
override suspend fun respondWith(message: UByteArray) {
|
override suspend fun respondWith(message: UByteArray) {
|
||||||
withContext(Dispatchers.IO) {
|
withContext(Dispatchers.IO) {
|
||||||
access.withLock {
|
// We expect datagram is most often respond once or not responded at all, and
|
||||||
if (socket == null) socket = DatagramSocket()
|
// we create and close a socket to send
|
||||||
|
DatagramSocket().use { socket ->
|
||||||
val packet = DatagramPacket(
|
val packet = DatagramPacket(
|
||||||
message.toByteArray(),
|
message.toByteArray(),
|
||||||
message.size,
|
message.size,
|
||||||
inetAddress,
|
inetAddress,
|
||||||
port
|
port
|
||||||
)
|
)
|
||||||
socket!!.send(packet)
|
socket.send(packet)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@OptIn(DelicateCoroutinesApi::class)
|
@OptIn(DelicateCoroutinesApi::class)
|
||||||
class UdpServer(val port: Int) :
|
class UdpServer(val port: Int) :
|
||||||
DatagramReceiver, LogTag("UDPS:${counter.incrementAndGet()}") {
|
DatagramConnector, LogTag("UDPS:${counter.incrementAndGet()}") {
|
||||||
private var isClosed = false
|
private var isClosed = false
|
||||||
|
|
||||||
|
|
||||||
|
Loading…
x
Reference in New Issue
Block a user