From f92431a281e342552990a5d730b97a8843e34082 Mon Sep 17 00:00:00 2001 From: sergeych Date: Wed, 15 Nov 2023 02:25:57 +0300 Subject: [PATCH] network transport with remote address, universal server --- .../net/sergeych/kiloparsec/KiloServer.kt | 27 +++++++++++++++++++ ...tProxyDevice.kt => InetTransportDevice.kt} | 2 +- .../kiloparsec/adapter/NetworkProvider.kt | 5 ++-- .../kiloparsec/adapter/NetworkProvider.js.kt | 5 ++-- .../kiloparsec/adapter/NetworkProvider.jvm.kt | 10 ++----- .../kiloparsec/adapter/asyncSocketToDevice.kt | 6 ++--- .../kiloparsec/adapters/NetworkTest.kt | 12 --------- .../adapter/NetworkProvider.native.kt | 5 ++-- 8 files changed, 39 insertions(+), 33 deletions(-) create mode 100644 src/commonMain/kotlin/net/sergeych/kiloparsec/KiloServer.kt rename src/commonMain/kotlin/net/sergeych/kiloparsec/adapter/{InetProxyDevice.kt => InetTransportDevice.kt} (75%) diff --git a/src/commonMain/kotlin/net/sergeych/kiloparsec/KiloServer.kt b/src/commonMain/kotlin/net/sergeych/kiloparsec/KiloServer.kt new file mode 100644 index 0000000..78c6080 --- /dev/null +++ b/src/commonMain/kotlin/net/sergeych/kiloparsec/KiloServer.kt @@ -0,0 +1,27 @@ +package net.sergeych.kiloparsec + +import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.launch +import net.sergeych.crypto.Key +import net.sergeych.mp_tools.globalLaunch + +@Suppress("unused") +class KiloServer( + private val clientInterface: KiloInterface, + private val connections: Flow, + private val serverSigningKey: Key.Signing? = null, + private val sessionBuilder: ()->S, + ) { + + private val job = globalLaunch { + connections.collect { device -> + launch { + KiloServerConnection(clientInterface,device,sessionBuilder(), serverSigningKey).run() + } + } + } + + fun close() { + job.cancel() + } +} \ No newline at end of file diff --git a/src/commonMain/kotlin/net/sergeych/kiloparsec/adapter/InetProxyDevice.kt b/src/commonMain/kotlin/net/sergeych/kiloparsec/adapter/InetTransportDevice.kt similarity index 75% rename from src/commonMain/kotlin/net/sergeych/kiloparsec/adapter/InetProxyDevice.kt rename to src/commonMain/kotlin/net/sergeych/kiloparsec/adapter/InetTransportDevice.kt index 524f96d..0ee855c 100644 --- a/src/commonMain/kotlin/net/sergeych/kiloparsec/adapter/InetProxyDevice.kt +++ b/src/commonMain/kotlin/net/sergeych/kiloparsec/adapter/InetTransportDevice.kt @@ -3,7 +3,7 @@ package net.sergeych.kiloparsec.adapter import kotlinx.coroutines.channels.Channel @Suppress("unused") -class InetProxyDevice( +class InetTransportDevice( inputChannel: Channel, outputChannel: Channel, val remoteAddress: NetworkAddress, diff --git a/src/commonMain/kotlin/net/sergeych/kiloparsec/adapter/NetworkProvider.kt b/src/commonMain/kotlin/net/sergeych/kiloparsec/adapter/NetworkProvider.kt index 1de4cca..2b9bf95 100644 --- a/src/commonMain/kotlin/net/sergeych/kiloparsec/adapter/NetworkProvider.kt +++ b/src/commonMain/kotlin/net/sergeych/kiloparsec/adapter/NetworkProvider.kt @@ -2,7 +2,6 @@ package net.sergeych.kiloparsec.adapter import kotlinx.coroutines.channels.ReceiveChannel import kotlinx.coroutines.flow.Flow -import net.sergeych.kiloparsec.Transport /** * Multiplatform implementation of an internet address. @@ -51,6 +50,6 @@ fun CharSequence.toNetworkAddress() : NetworkAddress { } -expect fun acceptTcpDevice(port: Int): Flow +expect fun acceptTcpDevice(port: Int): Flow -expect suspend fun connectTcpDevice(address: NetworkAddress): Transport.Device \ No newline at end of file +expect suspend fun connectTcpDevice(address: NetworkAddress): InetTransportDevice \ No newline at end of file diff --git a/src/jsMain/kotlin/net/sergeych/kiloparsec/adapter/NetworkProvider.js.kt b/src/jsMain/kotlin/net/sergeych/kiloparsec/adapter/NetworkProvider.js.kt index 7f7d68c..bc5044d 100644 --- a/src/jsMain/kotlin/net/sergeych/kiloparsec/adapter/NetworkProvider.js.kt +++ b/src/jsMain/kotlin/net/sergeych/kiloparsec/adapter/NetworkProvider.js.kt @@ -1,16 +1,15 @@ package net.sergeych.kiloparsec.adapter import kotlinx.coroutines.flow.Flow -import net.sergeych.kiloparsec.Transport actual fun NetworkAddress(host: String, port: Int): NetworkAddress { TODO("Not yet implemented") } -actual fun acceptTcpDevice(port: Int): Flow { +actual fun acceptTcpDevice(port: Int): Flow { TODO("Not yet implemented") } -actual suspend fun connectTcpDevice(address: NetworkAddress): Transport.Device { +actual suspend fun connectTcpDevice(address: NetworkAddress): InetTransportDevice { TODO("Not yet implemented") } \ No newline at end of file diff --git a/src/jvmMain/kotlin/net/sergeych/kiloparsec/adapter/NetworkProvider.jvm.kt b/src/jvmMain/kotlin/net/sergeych/kiloparsec/adapter/NetworkProvider.jvm.kt index 3bdad24..ac2d830 100644 --- a/src/jvmMain/kotlin/net/sergeych/kiloparsec/adapter/NetworkProvider.jvm.kt +++ b/src/jvmMain/kotlin/net/sergeych/kiloparsec/adapter/NetworkProvider.jvm.kt @@ -1,12 +1,10 @@ package net.sergeych.kiloparsec.adapter import kotlinx.coroutines.Dispatchers -import kotlinx.coroutines.channels.SendChannel import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.flow import kotlinx.coroutines.suspendCancellableCoroutine import kotlinx.coroutines.withContext -import net.sergeych.kiloparsec.Transport import java.net.InetAddress import java.net.InetSocketAddress import java.nio.channels.AsynchronousServerSocketChannel @@ -16,7 +14,7 @@ import kotlin.coroutines.suspendCoroutine actual fun NetworkAddress(host: String, port: Int): NetworkAddress = JvmNetworkAddress(InetAddress.getByName(host), port) -actual fun acceptTcpDevice(port: Int): Flow { +actual fun acceptTcpDevice(port: Int): Flow { return flow { val socket = withContext(Dispatchers.IO) { AsynchronousServerSocketChannel.open().also { @@ -35,7 +33,7 @@ actual fun acceptTcpDevice(port: Int): Flow { } } -actual suspend fun connectTcpDevice(address: NetworkAddress): Transport.Device { +actual suspend fun connectTcpDevice(address: NetworkAddress): InetTransportDevice { address as JvmNetworkAddress val socket = withContext(Dispatchers.IO) { AsynchronousSocketChannel.open() @@ -45,7 +43,3 @@ actual suspend fun connectTcpDevice(address: NetworkAddress): Transport.Device { } return asyncSocketToDevice(socket) } - -suspend fun SendChannel.sendAll(bytes: Collection) { - for (b in bytes) send(b) -} \ No newline at end of file diff --git a/src/jvmMain/kotlin/net/sergeych/kiloparsec/adapter/asyncSocketToDevice.kt b/src/jvmMain/kotlin/net/sergeych/kiloparsec/adapter/asyncSocketToDevice.kt index 1963e05..17903e4 100644 --- a/src/jvmMain/kotlin/net/sergeych/kiloparsec/adapter/asyncSocketToDevice.kt +++ b/src/jvmMain/kotlin/net/sergeych/kiloparsec/adapter/asyncSocketToDevice.kt @@ -31,8 +31,8 @@ private fun encode(block: UByteArray): ByteArray * to the socket in a global scope.These are closed when transport is closed * or the socket is closed, for example, by network failure. */ -suspend fun asyncSocketToDevice(socket: AsynchronousSocketChannel): InetProxyDevice { - val deferredDevice = CompletableDeferred() +suspend fun asyncSocketToDevice(socket: AsynchronousSocketChannel): InetTransportDevice { + val deferredDevice = CompletableDeferred() globalLaunch { coroutineScope { fun stop() { @@ -109,7 +109,7 @@ suspend fun asyncSocketToDevice(socket: AsynchronousSocketChannel): InetProxyDev // SocketAddress. val addr = socket.remoteAddress as InetSocketAddress deferredDevice.complete( - InetProxyDevice(inputBlocks, outputBlocks, JvmNetworkAddress(addr.address,addr.port)) { stop() } + InetTransportDevice(inputBlocks, outputBlocks, JvmNetworkAddress(addr.address,addr.port)) { stop() } ) } globalLaunch { socket.close() } diff --git a/src/jvmTest/kotlin/net/sergeych/kiloparsec/adapters/NetworkTest.kt b/src/jvmTest/kotlin/net/sergeych/kiloparsec/adapters/NetworkTest.kt index ce2e751..a2183e4 100644 --- a/src/jvmTest/kotlin/net/sergeych/kiloparsec/adapters/NetworkTest.kt +++ b/src/jvmTest/kotlin/net/sergeych/kiloparsec/adapters/NetworkTest.kt @@ -38,7 +38,6 @@ class NetworkTest { val j = launch { serverFlow.collect { device -> launch { - println("connected!") device.output.send("Hello, world!".encodeToUByteArray()) device.output.send("Great".encodeToUByteArray()) while (true) { @@ -56,29 +55,18 @@ class NetworkTest { } yield() run { - println("x0") val s = connectTcpDevice("127.0.1.1:17171".toNetworkAddress()) - println("x1") assertEquals("Hello, world!", s.input.receive()!!.decodeFromUByteArray()) - println("x2") assertEquals("Great", s.input.receive()!!.decodeFromUByteArray()) - println("x3") s.output.send("Goodbye".encodeToUByteArray()) - println("pre1") s.close() - println("pre2") } val s1 = connectTcpDevice("127.0.1.1:17171".toNetworkAddress()) - println("conn-0-1") assertEquals("Hello, world!", s1.input.receive()!!.decodeFromUByteArray()) - println("conn-0-2") assertEquals("Great", s1.input.receive()!!.decodeFromUByteArray()) - println("1") s1.output.send("die".encodeToUByteArray()) - println("2") delay(200) s1.close() - println("3 -- the -- end") j.cancelAndJoin() } } diff --git a/src/nativeMain/kotlin/net/sergeych/kiloparsec/adapter/NetworkProvider.native.kt b/src/nativeMain/kotlin/net/sergeych/kiloparsec/adapter/NetworkProvider.native.kt index 7f7d68c..bc5044d 100644 --- a/src/nativeMain/kotlin/net/sergeych/kiloparsec/adapter/NetworkProvider.native.kt +++ b/src/nativeMain/kotlin/net/sergeych/kiloparsec/adapter/NetworkProvider.native.kt @@ -1,16 +1,15 @@ package net.sergeych.kiloparsec.adapter import kotlinx.coroutines.flow.Flow -import net.sergeych.kiloparsec.Transport actual fun NetworkAddress(host: String, port: Int): NetworkAddress { TODO("Not yet implemented") } -actual fun acceptTcpDevice(port: Int): Flow { +actual fun acceptTcpDevice(port: Int): Flow { TODO("Not yet implemented") } -actual suspend fun connectTcpDevice(address: NetworkAddress): Transport.Device { +actual suspend fun connectTcpDevice(address: NetworkAddress): InetTransportDevice { TODO("Not yet implemented") } \ No newline at end of file