From fe29bec1b0a41d31b95a83fce6036d69d3785bfa Mon Sep 17 00:00:00 2001 From: sergeych Date: Tue, 14 Nov 2023 01:47:30 +0300 Subject: [PATCH] basic tcp connect async inmplementation for JVM/NIO --- .../kotlin/net/sergeych/crypto/tools.kt | 25 +++++ .../kiloparsec/adapter/NetworkProvider.kt | 21 ++-- .../kiloparsec/adapter/ProxyDevice.kt | 18 ++++ .../kiloparsec/adapter/DatagramProvider.js.kt | 3 - .../kiloparsec/adapter/NetworkProvider.js.kt | 11 ++ .../kiloparsec/adapter/ContinuationHandler.kt | 22 ++++ .../adapter/DatagramProvider.jvm.kt | 8 -- .../kiloparsec/adapter/NetworkProvider.jvm.kt | 31 ++++++ .../sergeych/kiloparsec/adapter/UdpServer.kt | 28 +---- .../kiloparsec/adapter/asyncSocketToDevice.kt | 101 ++++++++++++++++++ .../{UServerTest.kt => NetworkTest.kt} | 15 ++- .../adapter/DatagramProvider.native.kt | 3 - .../adapter/NetworkProvider.native.kt | 11 ++ 13 files changed, 249 insertions(+), 48 deletions(-) create mode 100644 src/commonMain/kotlin/net/sergeych/kiloparsec/adapter/ProxyDevice.kt create mode 100644 src/jvmMain/kotlin/net/sergeych/kiloparsec/adapter/ContinuationHandler.kt delete mode 100644 src/jvmMain/kotlin/net/sergeych/kiloparsec/adapter/DatagramProvider.jvm.kt create mode 100644 src/jvmMain/kotlin/net/sergeych/kiloparsec/adapter/asyncSocketToDevice.kt rename src/jvmTest/kotlin/net/sergeych/kiloparsec/adapters/{UServerTest.kt => NetworkTest.kt} (64%) diff --git a/src/commonMain/kotlin/net/sergeych/crypto/tools.kt b/src/commonMain/kotlin/net/sergeych/crypto/tools.kt index 5391bb7..3d2be30 100644 --- a/src/commonMain/kotlin/net/sergeych/crypto/tools.kt +++ b/src/commonMain/kotlin/net/sergeych/crypto/tools.kt @@ -5,6 +5,7 @@ package net.sergeych.crypto import com.ionspin.kotlin.crypto.secretbox.SecretBox import com.ionspin.kotlin.crypto.secretbox.crypto_secretbox_NONCEBYTES import com.ionspin.kotlin.crypto.util.LibsodiumRandom +import kotlinx.coroutines.channels.ReceiveChannel import kotlinx.serialization.Serializable import net.sergeych.bintools.toDataSource import net.sergeych.bipack.BipackDecoder @@ -26,6 +27,30 @@ data class WithFill( constructor(data: UByteArray, fillSize: Int) : this(data, randomBytes(fillSize)) } +suspend fun readVarUnsigned(input: ReceiveChannel): UInt { + var result = 0u + var cnt = 0 + while(true) { + val b = input.receive().toUInt() + result = (result shr 7) or (b and 0x7fu) + if( (b and 0x80u) != 0u ) break + if( ++cnt > 4 ) throw IllegalArgumentException("overflow while decoding varuint") + } + return result +} + +fun encodeVarUnsigned(value: UInt): UByteArray { + val result = mutableListOf() + var rest = value + do { + val mask = if( rest <= 0x7fu ) 0x80u else 0u + result.add( (mask or (rest and 0x7fu)).toUByte() ) + rest = rest shr 7 + } while(rest != 0u) + return result.toUByteArray() +} + + fun randomBytes(n: Int): UByteArray = if (n > 0) LibsodiumRandom.buf(n) else ubyteArrayOf() fun randomBytes(n: UInt): UByteArray = if (n > 0u) LibsodiumRandom.buf(n.toInt()) else ubyteArrayOf() diff --git a/src/commonMain/kotlin/net/sergeych/kiloparsec/adapter/NetworkProvider.kt b/src/commonMain/kotlin/net/sergeych/kiloparsec/adapter/NetworkProvider.kt index 345d1fa..f4177f9 100644 --- a/src/commonMain/kotlin/net/sergeych/kiloparsec/adapter/NetworkProvider.kt +++ b/src/commonMain/kotlin/net/sergeych/kiloparsec/adapter/NetworkProvider.kt @@ -1,6 +1,8 @@ package net.sergeych.kiloparsec.adapter import kotlinx.coroutines.channels.ReceiveChannel +import kotlinx.coroutines.flow.Flow +import net.sergeych.kiloparsec.Transport /** * Multiplatform implementation of an internet address. @@ -24,12 +26,6 @@ interface Datagram { * Address from where the message was sent */ val address: NetworkAddress - - /** - * Send a datagram in response, e.g., to the [address]. - * This method is optimized per single per-datagram use. If you need to send many datagram, use [DatagramConnector]. - */ - suspend fun respondWith(message: UByteArray) } @OptIn(ExperimentalStdlibApi::class) @@ -39,7 +35,7 @@ interface DatagramConnector: AutoCloseable { suspend fun send(message: UByteArray, networkAddress: NetworkAddress) @Suppress("unused") suspend fun send(message: UByteArray, datagramAddress: String) { - send(message, networkAddressOf(datagramAddress)) + send(message, datagramAddress.toNetworkAddress()) } suspend fun send(message: UByteArray,host: String,port: Int) = @@ -47,5 +43,14 @@ interface DatagramConnector: AutoCloseable { override fun close() } -expect fun networkAddressOf(address: String): NetworkAddress expect fun NetworkAddress(host: String,port: Int): NetworkAddress + +fun CharSequence.toNetworkAddress() : NetworkAddress { + val (host, port) = this.split(":").map { it.trim()} + return NetworkAddress(host, port.toInt()) +} + + +expect fun acceptTcpDevice(pord: Int): Flow + +expect suspend fun connectTcpDevice(address: NetworkAddress): Transport.Device \ No newline at end of file diff --git a/src/commonMain/kotlin/net/sergeych/kiloparsec/adapter/ProxyDevice.kt b/src/commonMain/kotlin/net/sergeych/kiloparsec/adapter/ProxyDevice.kt new file mode 100644 index 0000000..914bc11 --- /dev/null +++ b/src/commonMain/kotlin/net/sergeych/kiloparsec/adapter/ProxyDevice.kt @@ -0,0 +1,18 @@ +package net.sergeych.kiloparsec.adapter + +import kotlinx.coroutines.channels.Channel +import kotlinx.coroutines.channels.ReceiveChannel +import kotlinx.coroutines.channels.SendChannel +import net.sergeych.kiloparsec.Transport + +class ProxyDevice( + inputChannel: Channel, + outputChannel: Channel, + private val onClose: ()->Unit = {}): Transport.Device { + + override val input: ReceiveChannel = inputChannel + override val output: SendChannel = outputChannel + override suspend fun close() { + onClose() + } +} \ No newline at end of file diff --git a/src/jsMain/kotlin/net/sergeych/kiloparsec/adapter/DatagramProvider.js.kt b/src/jsMain/kotlin/net/sergeych/kiloparsec/adapter/DatagramProvider.js.kt index ff55b73..64d0da0 100644 --- a/src/jsMain/kotlin/net/sergeych/kiloparsec/adapter/DatagramProvider.js.kt +++ b/src/jsMain/kotlin/net/sergeych/kiloparsec/adapter/DatagramProvider.js.kt @@ -1,5 +1,2 @@ package net.sergeych.kiloparsec.adapter -actual fun networkAddressOf(address: String): NetworkAddress { - TODO("Not yet implemented") -} \ No newline at end of file diff --git a/src/jsMain/kotlin/net/sergeych/kiloparsec/adapter/NetworkProvider.js.kt b/src/jsMain/kotlin/net/sergeych/kiloparsec/adapter/NetworkProvider.js.kt index d4dc4ab..5e08d7d 100644 --- a/src/jsMain/kotlin/net/sergeych/kiloparsec/adapter/NetworkProvider.js.kt +++ b/src/jsMain/kotlin/net/sergeych/kiloparsec/adapter/NetworkProvider.js.kt @@ -1,5 +1,16 @@ package net.sergeych.kiloparsec.adapter +import kotlinx.coroutines.flow.Flow +import net.sergeych.kiloparsec.Transport + actual fun NetworkAddress(host: String, port: Int): NetworkAddress { TODO("Not yet implemented") +} + +actual fun acceptTcpDevice(pord: Int): Flow { + TODO("Not yet implemented") +} + +actual suspend fun connectTcpDevice(address: NetworkAddress): Transport.Device { + TODO("Not yet implemented") } \ No newline at end of file diff --git a/src/jvmMain/kotlin/net/sergeych/kiloparsec/adapter/ContinuationHandler.kt b/src/jvmMain/kotlin/net/sergeych/kiloparsec/adapter/ContinuationHandler.kt new file mode 100644 index 0000000..582c1be --- /dev/null +++ b/src/jvmMain/kotlin/net/sergeych/kiloparsec/adapter/ContinuationHandler.kt @@ -0,0 +1,22 @@ +package net.sergeych.kiloparsec.adapter + +import java.nio.channels.CompletionHandler +import kotlin.coroutines.Continuation +import kotlin.coroutines.resume +import kotlin.coroutines.resumeWithException + +open class ContinuationHandler : CompletionHandler> { + override fun completed(result: T, attachment: Continuation) { + println("completed $result") + attachment.resume(result) + } + + override fun failed(exc: Throwable, attachment: Continuation) { + println("failed $exc") + attachment.resumeWithException(exc) + } +} + +object VoidCompletionHandler: ContinuationHandler() + +object IntCompletionHandler: ContinuationHandler() \ No newline at end of file diff --git a/src/jvmMain/kotlin/net/sergeych/kiloparsec/adapter/DatagramProvider.jvm.kt b/src/jvmMain/kotlin/net/sergeych/kiloparsec/adapter/DatagramProvider.jvm.kt deleted file mode 100644 index 7a8cea4..0000000 --- a/src/jvmMain/kotlin/net/sergeych/kiloparsec/adapter/DatagramProvider.jvm.kt +++ /dev/null @@ -1,8 +0,0 @@ -package net.sergeych.kiloparsec.adapter - -import java.net.InetAddress - -actual fun networkAddressOf(address: String): NetworkAddress { - val (host,port) = address.split(":") - return JvmNetworkAddress(InetAddress.getByName(host), port.toInt()) -} \ No newline at end of file diff --git a/src/jvmMain/kotlin/net/sergeych/kiloparsec/adapter/NetworkProvider.jvm.kt b/src/jvmMain/kotlin/net/sergeych/kiloparsec/adapter/NetworkProvider.jvm.kt index 6de79e8..8de27a3 100644 --- a/src/jvmMain/kotlin/net/sergeych/kiloparsec/adapter/NetworkProvider.jvm.kt +++ b/src/jvmMain/kotlin/net/sergeych/kiloparsec/adapter/NetworkProvider.jvm.kt @@ -1,6 +1,37 @@ package net.sergeych.kiloparsec.adapter +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.channels.SendChannel +import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.flow.flow +import kotlinx.coroutines.withContext +import net.sergeych.kiloparsec.Transport import java.net.InetAddress +import java.nio.channels.AsynchronousSocketChannel +import kotlin.coroutines.suspendCoroutine actual fun NetworkAddress(host: String, port: Int): NetworkAddress = JvmNetworkAddress(InetAddress.getByName(host), port) + +actual fun acceptTcpDevice(pord: Int): Flow { + return flow { + + TODO("Not yet implemented") + } +} + +actual suspend fun connectTcpDevice(address: NetworkAddress): Transport.Device { + address as JvmNetworkAddress + val socket = withContext(Dispatchers.IO) { + AsynchronousSocketChannel.open() + } + suspendCoroutine { cont -> + socket.connect(address.socketAddress, cont, VoidCompletionHandler) + } + println("connected") + return asyncSocketToDevice(socket) +} + +suspend fun SendChannel.sendAll(bytes: Collection) { + for( b in bytes) send(b) +} \ No newline at end of file diff --git a/src/jvmMain/kotlin/net/sergeych/kiloparsec/adapter/UdpServer.kt b/src/jvmMain/kotlin/net/sergeych/kiloparsec/adapter/UdpServer.kt index 4766d0f..2a3ce76 100644 --- a/src/jvmMain/kotlin/net/sergeych/kiloparsec/adapter/UdpServer.kt +++ b/src/jvmMain/kotlin/net/sergeych/kiloparsec/adapter/UdpServer.kt @@ -3,15 +3,11 @@ package net.sergeych.kiloparsec.adapter import kotlinx.coroutines.* import kotlinx.coroutines.channels.BufferOverflow import kotlinx.coroutines.channels.Channel -import kotlinx.coroutines.sync.Mutex -import kotlinx.coroutines.sync.withLock import net.sergeych.mp_logger.LogTag import net.sergeych.mp_logger.exception import net.sergeych.mp_logger.info import net.sergeych.mp_logger.warning -import java.net.DatagramPacket -import java.net.DatagramSocket -import java.net.InetAddress +import java.net.* import java.util.concurrent.atomic.AtomicInteger private val counter = AtomicInteger(0) @@ -28,6 +24,8 @@ class JvmNetworkAddress(val inetAddress: InetAddress, override val port: Int) : return true } + val socketAddress: SocketAddress by lazy { InetSocketAddress(inetAddress,port) } + override fun hashCode(): Int { var result = inetAddress.hashCode() result = 31 * result + port @@ -42,28 +40,12 @@ class UdpDatagram(override val message: UByteArray, val inetAddress: InetAddress JvmNetworkAddress(inetAddress, port) } - private val access = Mutex() - - private var socket: DatagramSocket? = null - override suspend fun respondWith(message: UByteArray) { - withContext(Dispatchers.IO) { - access.withLock { - if (socket == null) socket = DatagramSocket() - val packet = DatagramPacket( - message.toByteArray(), - message.size, - inetAddress, - port - ) - socket!!.send(packet) - } - } - } } + @OptIn(DelicateCoroutinesApi::class) class UdpServer(val port: Int) : - DatagramReceiver, LogTag("UDPS:${counter.incrementAndGet()}") { + DatagramConnector, LogTag("UDPS:${counter.incrementAndGet()}") { private var isClosed = false diff --git a/src/jvmMain/kotlin/net/sergeych/kiloparsec/adapter/asyncSocketToDevice.kt b/src/jvmMain/kotlin/net/sergeych/kiloparsec/adapter/asyncSocketToDevice.kt new file mode 100644 index 0000000..ac8daf4 --- /dev/null +++ b/src/jvmMain/kotlin/net/sergeych/kiloparsec/adapter/asyncSocketToDevice.kt @@ -0,0 +1,101 @@ +package net.sergeych.kiloparsec.adapter + +import kotlinx.coroutines.CompletableDeferred +import kotlinx.coroutines.cancel +import kotlinx.coroutines.channels.Channel +import kotlinx.coroutines.channels.ClosedReceiveChannelException +import kotlinx.coroutines.channels.ClosedSendChannelException +import kotlinx.coroutines.isActive +import kotlinx.coroutines.launch +import net.sergeych.crypto.encodeVarUnsigned +import net.sergeych.crypto.readVarUnsigned +import net.sergeych.kiloparsec.Transport +import net.sergeych.mp_tools.globalLaunch +import java.nio.ByteBuffer +import java.nio.channels.AsynchronousSocketChannel +import kotlin.coroutines.cancellation.CancellationException +import kotlin.coroutines.suspendCoroutine + +/** + * Convert asynchronous socket to a [Transport.Device] using non-blocking nio, + * in a coroutine-effective manner. Note that it runs coroutines to read/write + * to the socket in a global scope.These are closed when transport is closed + * or the socket is closed, for example, by network failure. + */ +suspend fun asyncSocketToDevice(socket: AsynchronousSocketChannel): Transport.Device { + val deferredDevice = CompletableDeferred() + globalLaunch { + fun stop() { + cancel() + runCatching { socket.close() } + } + val input = Channel(1024) + val output = Channel(1024) + // copy from socket to input + launch { + val inb = ByteBuffer.allocate(1024) + while (isActive) { + val size: Int = suspendCoroutine { continuation -> + socket.read(inb, continuation, IntCompletionHandler) + } + if (size < 0) stop() + else for (i in 0.. + socket.write(outb, continuation, IntCompletionHandler) + } + } + } catch (_: ClosedReceiveChannelException) { + stop() + } + } + // pump blocks from socket output to device input + val inputBlocks = Channel() + launch { + try { + while (isActive) { + val size = readVarUnsigned(input) + if (size == 0u) println("*** zero size block is ignored!") + else { + val block = UByteArray(size.toInt()) + for (i in 0..() + launch { + try { + while (isActive) { + val block = outputBlocks.receive() + output.sendAll(encodeVarUnsigned(block.size.toUInt())) + output.sendAll(block) + } + } catch (_: ClosedSendChannelException) { + stop() + } + } + deferredDevice.complete( + ProxyDevice(inputBlocks, outputBlocks) { stop() } + ) + } + return deferredDevice.await() +} diff --git a/src/jvmTest/kotlin/net/sergeych/kiloparsec/adapters/UServerTest.kt b/src/jvmTest/kotlin/net/sergeych/kiloparsec/adapters/NetworkTest.kt similarity index 64% rename from src/jvmTest/kotlin/net/sergeych/kiloparsec/adapters/UServerTest.kt rename to src/jvmTest/kotlin/net/sergeych/kiloparsec/adapters/NetworkTest.kt index 102b58b..c301d58 100644 --- a/src/jvmTest/kotlin/net/sergeych/kiloparsec/adapters/UServerTest.kt +++ b/src/jvmTest/kotlin/net/sergeych/kiloparsec/adapters/NetworkTest.kt @@ -3,14 +3,16 @@ package net.sergeych.kiloparsec.adapters import com.ionspin.kotlin.crypto.util.encodeToUByteArray import kotlinx.coroutines.test.runTest import net.sergeych.kiloparsec.adapter.UdpServer +import net.sergeych.kiloparsec.adapter.connectTcpDevice +import net.sergeych.kiloparsec.adapter.toNetworkAddress import net.sergeych.mp_logger.Log import org.junit.jupiter.api.Assertions.assertEquals import kotlin.test.Test -class UServerTest { +class NetworkTest { @Test - fun udpProvider() = runTest { + fun udpProviderTest() = runTest { Log.connectConsole(Log.Level.DEBUG) val s1 = UdpServer(17120) val s2 = UdpServer(17121) @@ -18,9 +20,16 @@ class UServerTest { val d1 = s2.incoming.receive() assertEquals(d1.address.port, 17120) assertEquals("Hello", d1.message.toByteArray().decodeToString()) - d1.respondWith("world".encodeToUByteArray()) + s1.send("world".encodeToUByteArray(),d1.address) assertEquals("world", s1.incoming.receive().message.toByteArray().decodeToString()) // println("s1: ${s1.bindAddress()}") } + + @Test + fun tcpAsyncConnectionTest() = runTest { + Log.connectConsole(Log.Level.DEBUG) + val s = connectTcpDevice("sergeych.net:80".toNetworkAddress()) + s.input.receive() + } } \ No newline at end of file diff --git a/src/nativeMain/kotlin/net/sergeych/kiloparsec/adapter/DatagramProvider.native.kt b/src/nativeMain/kotlin/net/sergeych/kiloparsec/adapter/DatagramProvider.native.kt index ff55b73..64d0da0 100644 --- a/src/nativeMain/kotlin/net/sergeych/kiloparsec/adapter/DatagramProvider.native.kt +++ b/src/nativeMain/kotlin/net/sergeych/kiloparsec/adapter/DatagramProvider.native.kt @@ -1,5 +1,2 @@ package net.sergeych.kiloparsec.adapter -actual fun networkAddressOf(address: String): NetworkAddress { - TODO("Not yet implemented") -} \ No newline at end of file diff --git a/src/nativeMain/kotlin/net/sergeych/kiloparsec/adapter/NetworkProvider.native.kt b/src/nativeMain/kotlin/net/sergeych/kiloparsec/adapter/NetworkProvider.native.kt index d4dc4ab..5e08d7d 100644 --- a/src/nativeMain/kotlin/net/sergeych/kiloparsec/adapter/NetworkProvider.native.kt +++ b/src/nativeMain/kotlin/net/sergeych/kiloparsec/adapter/NetworkProvider.native.kt @@ -1,5 +1,16 @@ package net.sergeych.kiloparsec.adapter +import kotlinx.coroutines.flow.Flow +import net.sergeych.kiloparsec.Transport + actual fun NetworkAddress(host: String, port: Int): NetworkAddress { TODO("Not yet implemented") +} + +actual fun acceptTcpDevice(pord: Int): Flow { + TODO("Not yet implemented") +} + +actual suspend fun connectTcpDevice(address: NetworkAddress): Transport.Device { + TODO("Not yet implemented") } \ No newline at end of file