Compare commits
1 Commits
master
...
experiment
Author | SHA1 | Date | |
---|---|---|---|
d8f73e21b4 |
6
.idea/GitLink.xml
generated
Normal file
6
.idea/GitLink.xml
generated
Normal file
@ -0,0 +1,6 @@
|
||||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<project version="4">
|
||||
<component name="uk.co.ben_gibson.git.link.SettingsState">
|
||||
<option name="host" value="e0f86390-1091-4871-8aeb-f534fbc99cf0" />
|
||||
</component>
|
||||
</project>
|
1
.idea/gradle.xml
generated
1
.idea/gradle.xml
generated
@ -1,5 +1,6 @@
|
||||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<project version="4">
|
||||
<component name="GradleMigrationSettings" migrationVersion="1" />
|
||||
<component name="GradleSettings">
|
||||
<option name="linkedExternalProjectsSettings">
|
||||
<GradleProjectSettings>
|
||||
|
12
src/commonMain/kotlin/net/sergeych/kiloparsec/ProxyDevice.kt
Normal file
12
src/commonMain/kotlin/net/sergeych/kiloparsec/ProxyDevice.kt
Normal file
@ -0,0 +1,12 @@
|
||||
package net.sergeych.kiloparsec
|
||||
|
||||
import kotlinx.coroutines.channels.Channel
|
||||
|
||||
class ProxyDevice(capacity: Int=1024) : Transport.Device {
|
||||
override val input = Channel<UByteArray?>(capacity)
|
||||
override val output= Channel<UByteArray?>(capacity)
|
||||
override suspend fun close() {
|
||||
runCatching { input.close() }
|
||||
runCatching { output.close() }
|
||||
}
|
||||
}
|
@ -0,0 +1,6 @@
|
||||
package net.sergeych.kiloparsec.adapter
|
||||
|
||||
internal data class ConnectionId(
|
||||
val address: NetworkAddress,
|
||||
val id: UInt,
|
||||
)
|
@ -0,0 +1,117 @@
|
||||
package net.sergeych.kiloparsec.adapter
|
||||
|
||||
import kotlinx.coroutines.*
|
||||
import net.sergeych.crypto.Key
|
||||
import net.sergeych.kiloparsec.KiloClientConnection
|
||||
import net.sergeych.kiloparsec.KiloInterface
|
||||
import net.sergeych.kiloparsec.ProxyDevice
|
||||
import net.sergeych.mp_logger.*
|
||||
import net.sergeych.mp_tools.globalLaunch
|
||||
import net.sergeych.tools.AtomicCounter
|
||||
import net.sergeych.utools.pack
|
||||
import net.sergeych.utools.unpack
|
||||
|
||||
private val instanceCounter = AtomicCounter()
|
||||
|
||||
class DatagramClient<S>(
|
||||
private val connector: DatagramConnector,
|
||||
val serverAddress: NetworkAddress,
|
||||
private val localInterface: KiloInterface<S>,
|
||||
private val clientSigningKey: Key.Signing? = null,
|
||||
private val createSession: () -> S,
|
||||
) : LogTag("DGC:${instanceCounter.next()}") {
|
||||
|
||||
private var sessionId = 0u
|
||||
private var isClosed = false
|
||||
val job = globalLaunch {
|
||||
while (isActive && !isClosed) {
|
||||
openSession()
|
||||
pumpSession()
|
||||
debug { "closing session $sessionId" }
|
||||
sessionId = 0u
|
||||
}
|
||||
}
|
||||
|
||||
private suspend fun pumpSession() {
|
||||
try {
|
||||
val proxyDevice = ProxyDevice()
|
||||
val client = KiloClientConnection<S>(localInterface, proxyDevice, createSession(), clientSigningKey)
|
||||
coroutineScope {
|
||||
fun stop() {
|
||||
cancel()
|
||||
}
|
||||
launch {
|
||||
while (isActive) {
|
||||
if (sessionId == 0u) {
|
||||
error { "invalid zero sessionId in pump" }
|
||||
stop()
|
||||
} else {
|
||||
val data = proxyDevice.output.receive()
|
||||
if (data == null) {
|
||||
warning { "kiloconnection send empty frame: closing" }
|
||||
stop()
|
||||
} else {
|
||||
send(OuterPacket(sessionId, data))
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
while (isActive) {
|
||||
val packet: OuterPacket = unpack(connector.incoming.receive().message)
|
||||
if (packet.id == 0u) {
|
||||
val result: DatagramResult = unpack(packet.payload)
|
||||
if (result is DatagramResult.Error) {
|
||||
if (result.error == DatagramError.Closed) {
|
||||
info { "connection is closed on the remote part" }
|
||||
} else {
|
||||
warning { "unexpected error while pumping: ${result.error}" }
|
||||
}
|
||||
} else {
|
||||
warning { "unexpected result: $result" }
|
||||
}
|
||||
stop()
|
||||
} else {
|
||||
proxyDevice.input.send(packet.payload)
|
||||
}
|
||||
}
|
||||
}
|
||||
proxyDevice.close()
|
||||
} catch (x: CancellationException) {
|
||||
throw x
|
||||
} catch (x: Exception) {
|
||||
x.printStackTrace()
|
||||
}
|
||||
}
|
||||
|
||||
private suspend fun openSession() {
|
||||
while (!isClosed) {
|
||||
send(OuterPacket(0u, ubyteArrayOf()))
|
||||
val packet: OuterPacket = unpack(connector.incoming.receive().message)
|
||||
if (packet.id == 0u) {
|
||||
when (val result: DatagramResult = unpack(packet.payload)) {
|
||||
is DatagramResult.Error -> {
|
||||
warning { "error opening connection: ${result.error}" }
|
||||
}
|
||||
|
||||
is DatagramResult.Ok -> {
|
||||
debug { "connection established, our id is ${result.id}" }
|
||||
sessionId = result.id
|
||||
break
|
||||
}
|
||||
}
|
||||
} else {
|
||||
warning { "received unexpected openSession id: ${packet.id}" }
|
||||
}
|
||||
delay(500)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
private suspend fun send(packet: OuterPacket) {
|
||||
connector.send(pack(packet), serverAddress)
|
||||
}
|
||||
|
||||
fun close() {
|
||||
job.cancel()
|
||||
}
|
||||
}
|
@ -0,0 +1,7 @@
|
||||
package net.sergeych.kiloparsec.adapter
|
||||
|
||||
enum class DatagramError {
|
||||
Closed,
|
||||
NotExists,
|
||||
Busy
|
||||
}
|
@ -0,0 +1,15 @@
|
||||
package net.sergeych.kiloparsec.adapter
|
||||
|
||||
import kotlinx.serialization.SerialName
|
||||
import kotlinx.serialization.Serializable
|
||||
|
||||
@Serializable
|
||||
sealed class DatagramResult {
|
||||
@Serializable
|
||||
@SerialName("ok")
|
||||
data class Ok(val id: UInt) : DatagramResult()
|
||||
|
||||
@Serializable
|
||||
@SerialName("err")
|
||||
data class Error(val error: DatagramError) : DatagramResult()
|
||||
}
|
@ -0,0 +1,136 @@
|
||||
package net.sergeych.kiloparsec.adapter
|
||||
|
||||
import kotlinx.coroutines.channels.ClosedReceiveChannelException
|
||||
import kotlinx.coroutines.isActive
|
||||
import kotlinx.coroutines.sync.Mutex
|
||||
import kotlinx.coroutines.sync.withLock
|
||||
import kotlinx.datetime.Instant
|
||||
import net.sergeych.crypto.Key
|
||||
import net.sergeych.kiloparsec.KiloInterface
|
||||
import net.sergeych.kiloparsec.KiloServerConnection
|
||||
import net.sergeych.kiloparsec.ProxyDevice
|
||||
import net.sergeych.mp_logger.LogTag
|
||||
import net.sergeych.mp_logger.debug
|
||||
import net.sergeych.mp_tools.globalLaunch
|
||||
import net.sergeych.tools.AtomicCounter
|
||||
import net.sergeych.utools.now
|
||||
import net.sergeych.utools.pack
|
||||
import net.sergeych.utools.unpack
|
||||
|
||||
private val instanceCounter = AtomicCounter()
|
||||
|
||||
private class Connection<S>(
|
||||
val server: DatagramServer<S>,
|
||||
val connection: KiloServerConnection<S>,
|
||||
val device: ProxyDevice,
|
||||
val connectionId: ConnectionId,
|
||||
var lastActiveAt: Instant = now(),
|
||||
) {
|
||||
|
||||
suspend fun proceedFrame(frame: OuterPacket) {
|
||||
lastActiveAt = now()
|
||||
if (frame.payload.isEmpty()) {
|
||||
// ping
|
||||
server.sendData(connectionId,ubyteArrayOf())
|
||||
} else
|
||||
device.input.send(frame.payload)
|
||||
}
|
||||
|
||||
private val job = globalLaunch {
|
||||
while (isActive) {
|
||||
val x = device.output.receive() ?: break
|
||||
server.sendData(connectionId, x)
|
||||
}
|
||||
}
|
||||
|
||||
suspend fun close() {
|
||||
job.cancel()
|
||||
server.remove(connectionId)
|
||||
}
|
||||
}
|
||||
|
||||
class DatagramServer<S>(
|
||||
private val connector: DatagramConnector,
|
||||
private val localInterface: KiloInterface<S>,
|
||||
private val serverSigningKey: Key.Signing? = null,
|
||||
private val createSession: () -> S,
|
||||
) : LogTag("DGS:${instanceCounter.next()}") {
|
||||
|
||||
private val job = globalLaunch {
|
||||
while (isActive) {
|
||||
try {
|
||||
val datagram = connector.incoming.receive()
|
||||
val outer = unpack<OuterPacket>(datagram.message)
|
||||
if (outer.id == 0u)
|
||||
setupNewConnection(datagram.address, outer)
|
||||
else
|
||||
proceedExistingConnection(datagram.address, outer)
|
||||
} catch (_: ClosedReceiveChannelException) {
|
||||
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private val access = Mutex()
|
||||
private val connections = mutableMapOf<ConnectionId, Connection<S>>()
|
||||
|
||||
private fun findFreeId(address: NetworkAddress): UInt {
|
||||
val existingIds = connections.keys.filter { it.address == address }.map { it.id }.toSet()
|
||||
debug { "existing connections from $address: $existingIds" }
|
||||
var i = 0u
|
||||
while (i in existingIds) i++
|
||||
debug { "found the free id: $i" }
|
||||
return i
|
||||
}
|
||||
|
||||
private suspend fun setupNewConnection(address: NetworkAddress, packet: OuterPacket) {
|
||||
val id = access.withLock {
|
||||
val connectionId = ConnectionId(address, findFreeId(address))
|
||||
val proxy = ProxyDevice()
|
||||
connections[connectionId] = Connection(
|
||||
this,
|
||||
KiloServerConnection(localInterface, proxy, createSession(), serverSigningKey),
|
||||
proxy,
|
||||
connectionId,
|
||||
)
|
||||
connectionId
|
||||
}
|
||||
sendResult(id,DatagramResult.Ok(id.id))
|
||||
}
|
||||
|
||||
private suspend fun proceedExistingConnection(address: NetworkAddress, outerPacket: OuterPacket) {
|
||||
val connectionId = ConnectionId(address, outerPacket.id)
|
||||
val c = access.withLock { connections[connectionId] }
|
||||
if (c == null) {
|
||||
sendResult(connectionId, DatagramResult.Error(DatagramError.NotExists))
|
||||
} else {
|
||||
c.proceedFrame(outerPacket)
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Important. Closing the server also closes the connector which is supposed to be "owned"
|
||||
* exclusively by `DatagramServer`
|
||||
*/
|
||||
fun close() {
|
||||
job.cancel()
|
||||
runCatching {
|
||||
connector.close()
|
||||
}
|
||||
}
|
||||
|
||||
internal suspend fun remove(connectionId: ConnectionId) {
|
||||
access.withLock {
|
||||
connections.remove(connectionId)
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
internal suspend fun sendData(connectionId: ConnectionId,data: UByteArray) {
|
||||
connector.send(pack(OuterPacket(connectionId.id, data)), connectionId.address)
|
||||
}
|
||||
|
||||
internal suspend fun sendResult(connectionId: ConnectionId, result: DatagramResult) {
|
||||
connector.send(pack(OuterPacket(0u, pack(result))), connectionId.address)
|
||||
}
|
||||
}
|
@ -15,11 +15,25 @@ interface NetworkAddress {
|
||||
* Multiplatform datagram abstraction
|
||||
*/
|
||||
interface Datagram {
|
||||
/**
|
||||
* Received message
|
||||
*/
|
||||
val message: UByteArray
|
||||
|
||||
/**
|
||||
* Address from where the message was sent
|
||||
*/
|
||||
val address: NetworkAddress
|
||||
|
||||
/**
|
||||
* Send a datagram in response, e.g., to the [address].
|
||||
* This method is optimized per single per-datagram use. If you need to send many datagram, use [DatagramConnector].
|
||||
*/
|
||||
suspend fun respondWith(message: UByteArray)
|
||||
}
|
||||
interface DatagramReceiver {
|
||||
|
||||
@OptIn(ExperimentalStdlibApi::class)
|
||||
interface DatagramConnector: AutoCloseable {
|
||||
|
||||
val incoming: ReceiveChannel<Datagram>
|
||||
suspend fun send(message: UByteArray, networkAddress: NetworkAddress)
|
||||
@ -30,7 +44,7 @@ interface DatagramReceiver {
|
||||
|
||||
suspend fun send(message: UByteArray,host: String,port: Int) =
|
||||
send(message, NetworkAddress(host,port))
|
||||
fun close()
|
||||
override fun close()
|
||||
}
|
||||
|
||||
expect fun networkAddressOf(address: String): NetworkAddress
|
||||
|
@ -0,0 +1,9 @@
|
||||
package net.sergeych.kiloparsec.adapter
|
||||
|
||||
import kotlinx.serialization.Serializable
|
||||
|
||||
@Serializable
|
||||
internal data class OuterPacket(
|
||||
val id: UInt,
|
||||
val payload: UByteArray,
|
||||
)
|
@ -0,0 +1,8 @@
|
||||
package net.sergeych.tools
|
||||
|
||||
class AtomicCounter(startWith: Long=0) {
|
||||
private var op = ProtectedOp()
|
||||
private var counter = startWith
|
||||
|
||||
fun next(): Long = op { counter++ }
|
||||
}
|
@ -3,8 +3,6 @@ package net.sergeych.kiloparsec.adapter
|
||||
import kotlinx.coroutines.*
|
||||
import kotlinx.coroutines.channels.BufferOverflow
|
||||
import kotlinx.coroutines.channels.Channel
|
||||
import kotlinx.coroutines.sync.Mutex
|
||||
import kotlinx.coroutines.sync.withLock
|
||||
import net.sergeych.mp_logger.LogTag
|
||||
import net.sergeych.mp_logger.exception
|
||||
import net.sergeych.mp_logger.info
|
||||
@ -42,28 +40,28 @@ class UdpDatagram(override val message: UByteArray, val inetAddress: InetAddress
|
||||
JvmNetworkAddress(inetAddress, port)
|
||||
}
|
||||
|
||||
private val access = Mutex()
|
||||
|
||||
private var socket: DatagramSocket? = null
|
||||
override suspend fun respondWith(message: UByteArray) {
|
||||
withContext(Dispatchers.IO) {
|
||||
access.withLock {
|
||||
if (socket == null) socket = DatagramSocket()
|
||||
// We expect datagram is most often respond once or not responded at all, and
|
||||
// we create and close a socket to send
|
||||
DatagramSocket().use { socket ->
|
||||
val packet = DatagramPacket(
|
||||
message.toByteArray(),
|
||||
message.size,
|
||||
inetAddress,
|
||||
port
|
||||
)
|
||||
socket!!.send(packet)
|
||||
}
|
||||
socket.send(packet)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
|
||||
@OptIn(DelicateCoroutinesApi::class)
|
||||
class UdpServer(val port: Int) :
|
||||
DatagramReceiver, LogTag("UDPS:${counter.incrementAndGet()}") {
|
||||
DatagramConnector, LogTag("UDPS:${counter.incrementAndGet()}") {
|
||||
private var isClosed = false
|
||||
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user