From 1814572a046d3b76dcb5619726b569a59bb5d1ba Mon Sep 17 00:00:00 2001 From: sergeych Date: Tue, 14 Nov 2023 02:27:10 +0300 Subject: [PATCH] working suspended TCP block device on JVM (server and client) --- .../kotlin/net/sergeych/crypto/tools.kt | 13 ++++--- .../kiloparsec/adapter/NetworkProvider.kt | 2 +- .../kiloparsec/adapter/NetworkProvider.js.kt | 2 +- .../kiloparsec/adapter/NetworkProvider.jvm.kt | 24 ++++++++++--- .../kiloparsec/adapter/asyncSocketToDevice.kt | 16 +++++---- .../kiloparsec/adapters/NetworkTest.kt | 34 ++++++++++++++++--- .../adapter/NetworkProvider.native.kt | 2 +- 7 files changed, 71 insertions(+), 22 deletions(-) diff --git a/src/commonMain/kotlin/net/sergeych/crypto/tools.kt b/src/commonMain/kotlin/net/sergeych/crypto/tools.kt index 3d2be30..9de4723 100644 --- a/src/commonMain/kotlin/net/sergeych/crypto/tools.kt +++ b/src/commonMain/kotlin/net/sergeych/crypto/tools.kt @@ -7,6 +7,7 @@ import com.ionspin.kotlin.crypto.secretbox.crypto_secretbox_NONCEBYTES import com.ionspin.kotlin.crypto.util.LibsodiumRandom import kotlinx.coroutines.channels.ReceiveChannel import kotlinx.serialization.Serializable +import net.sergeych.bintools.encodeToHex import net.sergeych.bintools.toDataSource import net.sergeych.bipack.BipackDecoder import net.sergeych.bipack.BipackEncoder @@ -30,13 +31,17 @@ data class WithFill( suspend fun readVarUnsigned(input: ReceiveChannel): UInt { var result = 0u var cnt = 0 + println("----start") while(true) { val b = input.receive().toUInt() - result = (result shr 7) or (b and 0x7fu) - if( (b and 0x80u) != 0u ) break - if( ++cnt > 4 ) throw IllegalArgumentException("overflow while decoding varuint") + println("RVU: ${b.encodeToHex()} / ${b and 0x80u}") + result = (result shl 7) or (b and 0x7fu) + if( (b and 0x80u) != 0u ) { + println("---! $result") + return result + } + if( ++cnt > 5 ) throw IllegalArgumentException("overflow while decoding varuint") } - return result } fun encodeVarUnsigned(value: UInt): UByteArray { diff --git a/src/commonMain/kotlin/net/sergeych/kiloparsec/adapter/NetworkProvider.kt b/src/commonMain/kotlin/net/sergeych/kiloparsec/adapter/NetworkProvider.kt index f4177f9..1de4cca 100644 --- a/src/commonMain/kotlin/net/sergeych/kiloparsec/adapter/NetworkProvider.kt +++ b/src/commonMain/kotlin/net/sergeych/kiloparsec/adapter/NetworkProvider.kt @@ -51,6 +51,6 @@ fun CharSequence.toNetworkAddress() : NetworkAddress { } -expect fun acceptTcpDevice(pord: Int): Flow +expect fun acceptTcpDevice(port: Int): Flow expect suspend fun connectTcpDevice(address: NetworkAddress): Transport.Device \ No newline at end of file diff --git a/src/jsMain/kotlin/net/sergeych/kiloparsec/adapter/NetworkProvider.js.kt b/src/jsMain/kotlin/net/sergeych/kiloparsec/adapter/NetworkProvider.js.kt index 5e08d7d..7f7d68c 100644 --- a/src/jsMain/kotlin/net/sergeych/kiloparsec/adapter/NetworkProvider.js.kt +++ b/src/jsMain/kotlin/net/sergeych/kiloparsec/adapter/NetworkProvider.js.kt @@ -7,7 +7,7 @@ actual fun NetworkAddress(host: String, port: Int): NetworkAddress { TODO("Not yet implemented") } -actual fun acceptTcpDevice(pord: Int): Flow { +actual fun acceptTcpDevice(port: Int): Flow { TODO("Not yet implemented") } diff --git a/src/jvmMain/kotlin/net/sergeych/kiloparsec/adapter/NetworkProvider.jvm.kt b/src/jvmMain/kotlin/net/sergeych/kiloparsec/adapter/NetworkProvider.jvm.kt index 8de27a3..f492512 100644 --- a/src/jvmMain/kotlin/net/sergeych/kiloparsec/adapter/NetworkProvider.jvm.kt +++ b/src/jvmMain/kotlin/net/sergeych/kiloparsec/adapter/NetworkProvider.jvm.kt @@ -4,19 +4,35 @@ import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.channels.SendChannel import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.flow +import kotlinx.coroutines.suspendCancellableCoroutine import kotlinx.coroutines.withContext import net.sergeych.kiloparsec.Transport import java.net.InetAddress +import java.net.InetSocketAddress +import java.nio.channels.AsynchronousServerSocketChannel import java.nio.channels.AsynchronousSocketChannel import kotlin.coroutines.suspendCoroutine actual fun NetworkAddress(host: String, port: Int): NetworkAddress = JvmNetworkAddress(InetAddress.getByName(host), port) -actual fun acceptTcpDevice(pord: Int): Flow { +actual fun acceptTcpDevice(port: Int): Flow { return flow { - - TODO("Not yet implemented") + println("start generating tcp accept flow") + val socket = withContext(Dispatchers.IO) { + AsynchronousServerSocketChannel.open().also { + it.bind(InetSocketAddress(InetAddress.getLocalHost(), port)) + } + } + println("Server socket ready $socket") + val connectedSocket = suspendCancellableCoroutine { continuation -> + continuation.invokeOnCancellation { + socket.close() + } + socket.accept(continuation, ContinuationHandler()) + } + println("incoming connection") + emit(asyncSocketToDevice(connectedSocket)) } } @@ -33,5 +49,5 @@ actual suspend fun connectTcpDevice(address: NetworkAddress): Transport.Device { } suspend fun SendChannel.sendAll(bytes: Collection) { - for( b in bytes) send(b) + for (b in bytes) send(b) } \ No newline at end of file diff --git a/src/jvmMain/kotlin/net/sergeych/kiloparsec/adapter/asyncSocketToDevice.kt b/src/jvmMain/kotlin/net/sergeych/kiloparsec/adapter/asyncSocketToDevice.kt index ac8daf4..7d3fdb7 100644 --- a/src/jvmMain/kotlin/net/sergeych/kiloparsec/adapter/asyncSocketToDevice.kt +++ b/src/jvmMain/kotlin/net/sergeych/kiloparsec/adapter/asyncSocketToDevice.kt @@ -9,6 +9,7 @@ import kotlinx.coroutines.isActive import kotlinx.coroutines.launch import net.sergeych.crypto.encodeVarUnsigned import net.sergeych.crypto.readVarUnsigned +import net.sergeych.crypto.toDump import net.sergeych.kiloparsec.Transport import net.sergeych.mp_tools.globalLaunch import java.nio.ByteBuffer @@ -38,21 +39,22 @@ suspend fun asyncSocketToDevice(socket: AsynchronousSocketChannel): Transport.De val size: Int = suspendCoroutine { continuation -> socket.read(inb, continuation, IntCompletionHandler) } + println("--------- read chunk $size") if (size < 0) stop() else for (i in 0..(1024) try { while (isActive) { - var count = 0 - outb.put(count++, output.receive().toByte()) - while (!output.isEmpty && count < outb.capacity()) - outb.put(count++, output.receive().toByte()) + outBuff.clear() + outBuff.add(output.receive().toByte()) + while (!output.isEmpty) + outBuff.add(output.receive().toByte()) suspendCoroutine { continuation -> - socket.write(outb, continuation, IntCompletionHandler) + socket.write(ByteBuffer.wrap(outBuff.toByteArray()), continuation, IntCompletionHandler) } } } catch (_: ClosedReceiveChannelException) { @@ -65,12 +67,14 @@ suspend fun asyncSocketToDevice(socket: AsynchronousSocketChannel): Transport.De try { while (isActive) { val size = readVarUnsigned(input) + println("expected size $size") if (size == 0u) println("*** zero size block is ignored!") else { val block = UByteArray(size.toInt()) for (i in 0.. + println("collected input: $device") + device.output.send("Hello, world!".encodeToUByteArray()) + device.output.send("Great".encodeToUByteArray()) + while(true) { + val x = device.input.receive()!!.decodeFromUByteArray() + if( x== "Goodbye" ) break + println("ignoring unexpected input: $x") + } + } + } + yield() + val s = connectTcpDevice("127.0.1.1:17171".toNetworkAddress()) + assertEquals("Hello, world!", s.input.receive()!!.decodeFromUByteArray()) + assertEquals("Great", s.input.receive()!!.decodeFromUByteArray()) + s.output.send("Goodbye".encodeToUByteArray()) + } } } \ No newline at end of file diff --git a/src/nativeMain/kotlin/net/sergeych/kiloparsec/adapter/NetworkProvider.native.kt b/src/nativeMain/kotlin/net/sergeych/kiloparsec/adapter/NetworkProvider.native.kt index 5e08d7d..7f7d68c 100644 --- a/src/nativeMain/kotlin/net/sergeych/kiloparsec/adapter/NetworkProvider.native.kt +++ b/src/nativeMain/kotlin/net/sergeych/kiloparsec/adapter/NetworkProvider.native.kt @@ -7,7 +7,7 @@ actual fun NetworkAddress(host: String, port: Int): NetworkAddress { TODO("Not yet implemented") } -actual fun acceptTcpDevice(pord: Int): Flow { +actual fun acceptTcpDevice(port: Int): Flow { TODO("Not yet implemented") }