Compare commits

...

1 Commits

12 changed files with 340 additions and 11 deletions

6
.idea/GitLink.xml generated Normal file
View File

@ -0,0 +1,6 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="uk.co.ben_gibson.git.link.SettingsState">
<option name="host" value="e0f86390-1091-4871-8aeb-f534fbc99cf0" />
</component>
</project>

1
.idea/gradle.xml generated
View File

@ -1,5 +1,6 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="GradleMigrationSettings" migrationVersion="1" />
<component name="GradleSettings">
<option name="linkedExternalProjectsSettings">
<GradleProjectSettings>

View File

@ -0,0 +1,12 @@
package net.sergeych.kiloparsec
import kotlinx.coroutines.channels.Channel
class ProxyDevice(capacity: Int=1024) : Transport.Device {
override val input = Channel<UByteArray?>(capacity)
override val output= Channel<UByteArray?>(capacity)
override suspend fun close() {
runCatching { input.close() }
runCatching { output.close() }
}
}

View File

@ -0,0 +1,6 @@
package net.sergeych.kiloparsec.adapter
internal data class ConnectionId(
val address: NetworkAddress,
val id: UInt,
)

View File

@ -0,0 +1,117 @@
package net.sergeych.kiloparsec.adapter
import kotlinx.coroutines.*
import net.sergeych.crypto.Key
import net.sergeych.kiloparsec.KiloClientConnection
import net.sergeych.kiloparsec.KiloInterface
import net.sergeych.kiloparsec.ProxyDevice
import net.sergeych.mp_logger.*
import net.sergeych.mp_tools.globalLaunch
import net.sergeych.tools.AtomicCounter
import net.sergeych.utools.pack
import net.sergeych.utools.unpack
private val instanceCounter = AtomicCounter()
class DatagramClient<S>(
private val connector: DatagramConnector,
val serverAddress: NetworkAddress,
private val localInterface: KiloInterface<S>,
private val clientSigningKey: Key.Signing? = null,
private val createSession: () -> S,
) : LogTag("DGC:${instanceCounter.next()}") {
private var sessionId = 0u
private var isClosed = false
val job = globalLaunch {
while (isActive && !isClosed) {
openSession()
pumpSession()
debug { "closing session $sessionId" }
sessionId = 0u
}
}
private suspend fun pumpSession() {
try {
val proxyDevice = ProxyDevice()
val client = KiloClientConnection<S>(localInterface, proxyDevice, createSession(), clientSigningKey)
coroutineScope {
fun stop() {
cancel()
}
launch {
while (isActive) {
if (sessionId == 0u) {
error { "invalid zero sessionId in pump" }
stop()
} else {
val data = proxyDevice.output.receive()
if (data == null) {
warning { "kiloconnection send empty frame: closing" }
stop()
} else {
send(OuterPacket(sessionId, data))
}
}
}
}
while (isActive) {
val packet: OuterPacket = unpack(connector.incoming.receive().message)
if (packet.id == 0u) {
val result: DatagramResult = unpack(packet.payload)
if (result is DatagramResult.Error) {
if (result.error == DatagramError.Closed) {
info { "connection is closed on the remote part" }
} else {
warning { "unexpected error while pumping: ${result.error}" }
}
} else {
warning { "unexpected result: $result" }
}
stop()
} else {
proxyDevice.input.send(packet.payload)
}
}
}
proxyDevice.close()
} catch (x: CancellationException) {
throw x
} catch (x: Exception) {
x.printStackTrace()
}
}
private suspend fun openSession() {
while (!isClosed) {
send(OuterPacket(0u, ubyteArrayOf()))
val packet: OuterPacket = unpack(connector.incoming.receive().message)
if (packet.id == 0u) {
when (val result: DatagramResult = unpack(packet.payload)) {
is DatagramResult.Error -> {
warning { "error opening connection: ${result.error}" }
}
is DatagramResult.Ok -> {
debug { "connection established, our id is ${result.id}" }
sessionId = result.id
break
}
}
} else {
warning { "received unexpected openSession id: ${packet.id}" }
}
delay(500)
}
}
private suspend fun send(packet: OuterPacket) {
connector.send(pack(packet), serverAddress)
}
fun close() {
job.cancel()
}
}

View File

@ -0,0 +1,7 @@
package net.sergeych.kiloparsec.adapter
enum class DatagramError {
Closed,
NotExists,
Busy
}

View File

@ -0,0 +1,15 @@
package net.sergeych.kiloparsec.adapter
import kotlinx.serialization.SerialName
import kotlinx.serialization.Serializable
@Serializable
sealed class DatagramResult {
@Serializable
@SerialName("ok")
data class Ok(val id: UInt) : DatagramResult()
@Serializable
@SerialName("err")
data class Error(val error: DatagramError) : DatagramResult()
}

View File

@ -0,0 +1,136 @@
package net.sergeych.kiloparsec.adapter
import kotlinx.coroutines.channels.ClosedReceiveChannelException
import kotlinx.coroutines.isActive
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
import kotlinx.datetime.Instant
import net.sergeych.crypto.Key
import net.sergeych.kiloparsec.KiloInterface
import net.sergeych.kiloparsec.KiloServerConnection
import net.sergeych.kiloparsec.ProxyDevice
import net.sergeych.mp_logger.LogTag
import net.sergeych.mp_logger.debug
import net.sergeych.mp_tools.globalLaunch
import net.sergeych.tools.AtomicCounter
import net.sergeych.utools.now
import net.sergeych.utools.pack
import net.sergeych.utools.unpack
private val instanceCounter = AtomicCounter()
private class Connection<S>(
val server: DatagramServer<S>,
val connection: KiloServerConnection<S>,
val device: ProxyDevice,
val connectionId: ConnectionId,
var lastActiveAt: Instant = now(),
) {
suspend fun proceedFrame(frame: OuterPacket) {
lastActiveAt = now()
if (frame.payload.isEmpty()) {
// ping
server.sendData(connectionId,ubyteArrayOf())
} else
device.input.send(frame.payload)
}
private val job = globalLaunch {
while (isActive) {
val x = device.output.receive() ?: break
server.sendData(connectionId, x)
}
}
suspend fun close() {
job.cancel()
server.remove(connectionId)
}
}
class DatagramServer<S>(
private val connector: DatagramConnector,
private val localInterface: KiloInterface<S>,
private val serverSigningKey: Key.Signing? = null,
private val createSession: () -> S,
) : LogTag("DGS:${instanceCounter.next()}") {
private val job = globalLaunch {
while (isActive) {
try {
val datagram = connector.incoming.receive()
val outer = unpack<OuterPacket>(datagram.message)
if (outer.id == 0u)
setupNewConnection(datagram.address, outer)
else
proceedExistingConnection(datagram.address, outer)
} catch (_: ClosedReceiveChannelException) {
}
}
}
private val access = Mutex()
private val connections = mutableMapOf<ConnectionId, Connection<S>>()
private fun findFreeId(address: NetworkAddress): UInt {
val existingIds = connections.keys.filter { it.address == address }.map { it.id }.toSet()
debug { "existing connections from $address: $existingIds" }
var i = 0u
while (i in existingIds) i++
debug { "found the free id: $i" }
return i
}
private suspend fun setupNewConnection(address: NetworkAddress, packet: OuterPacket) {
val id = access.withLock {
val connectionId = ConnectionId(address, findFreeId(address))
val proxy = ProxyDevice()
connections[connectionId] = Connection(
this,
KiloServerConnection(localInterface, proxy, createSession(), serverSigningKey),
proxy,
connectionId,
)
connectionId
}
sendResult(id,DatagramResult.Ok(id.id))
}
private suspend fun proceedExistingConnection(address: NetworkAddress, outerPacket: OuterPacket) {
val connectionId = ConnectionId(address, outerPacket.id)
val c = access.withLock { connections[connectionId] }
if (c == null) {
sendResult(connectionId, DatagramResult.Error(DatagramError.NotExists))
} else {
c.proceedFrame(outerPacket)
}
}
/**
* Important. Closing the server also closes the connector which is supposed to be "owned"
* exclusively by `DatagramServer`
*/
fun close() {
job.cancel()
runCatching {
connector.close()
}
}
internal suspend fun remove(connectionId: ConnectionId) {
access.withLock {
connections.remove(connectionId)
}
}
internal suspend fun sendData(connectionId: ConnectionId,data: UByteArray) {
connector.send(pack(OuterPacket(connectionId.id, data)), connectionId.address)
}
internal suspend fun sendResult(connectionId: ConnectionId, result: DatagramResult) {
connector.send(pack(OuterPacket(0u, pack(result))), connectionId.address)
}
}

View File

@ -15,11 +15,25 @@ interface NetworkAddress {
* Multiplatform datagram abstraction
*/
interface Datagram {
/**
* Received message
*/
val message: UByteArray
/**
* Address from where the message was sent
*/
val address: NetworkAddress
/**
* Send a datagram in response, e.g., to the [address].
* This method is optimized per single per-datagram use. If you need to send many datagram, use [DatagramConnector].
*/
suspend fun respondWith(message: UByteArray)
}
interface DatagramReceiver {
@OptIn(ExperimentalStdlibApi::class)
interface DatagramConnector: AutoCloseable {
val incoming: ReceiveChannel<Datagram>
suspend fun send(message: UByteArray, networkAddress: NetworkAddress)
@ -30,7 +44,7 @@ interface DatagramReceiver {
suspend fun send(message: UByteArray,host: String,port: Int) =
send(message, NetworkAddress(host,port))
fun close()
override fun close()
}
expect fun networkAddressOf(address: String): NetworkAddress

View File

@ -0,0 +1,9 @@
package net.sergeych.kiloparsec.adapter
import kotlinx.serialization.Serializable
@Serializable
internal data class OuterPacket(
val id: UInt,
val payload: UByteArray,
)

View File

@ -0,0 +1,8 @@
package net.sergeych.tools
class AtomicCounter(startWith: Long=0) {
private var op = ProtectedOp()
private var counter = startWith
fun next(): Long = op { counter++ }
}

View File

@ -3,8 +3,6 @@ package net.sergeych.kiloparsec.adapter
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.BufferOverflow
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
import net.sergeych.mp_logger.LogTag
import net.sergeych.mp_logger.exception
import net.sergeych.mp_logger.info
@ -42,28 +40,28 @@ class UdpDatagram(override val message: UByteArray, val inetAddress: InetAddress
JvmNetworkAddress(inetAddress, port)
}
private val access = Mutex()
private var socket: DatagramSocket? = null
override suspend fun respondWith(message: UByteArray) {
withContext(Dispatchers.IO) {
access.withLock {
if (socket == null) socket = DatagramSocket()
// We expect datagram is most often respond once or not responded at all, and
// we create and close a socket to send
DatagramSocket().use { socket ->
val packet = DatagramPacket(
message.toByteArray(),
message.size,
inetAddress,
port
)
socket!!.send(packet)
socket.send(packet)
}
}
}
}
@OptIn(DelicateCoroutinesApi::class)
class UdpServer(val port: Int) :
DatagramReceiver, LogTag("UDPS:${counter.incrementAndGet()}") {
DatagramConnector, LogTag("UDPS:${counter.incrementAndGet()}") {
private var isClosed = false