diff --git a/src/jvmMain/kotlin/net/sergeych/kiloparsec/adapter/NetworkProvider.jvm.kt b/src/jvmMain/kotlin/net/sergeych/kiloparsec/adapter/NetworkProvider.jvm.kt index f492512..6321930 100644 --- a/src/jvmMain/kotlin/net/sergeych/kiloparsec/adapter/NetworkProvider.jvm.kt +++ b/src/jvmMain/kotlin/net/sergeych/kiloparsec/adapter/NetworkProvider.jvm.kt @@ -24,15 +24,17 @@ actual fun acceptTcpDevice(port: Int): Flow { it.bind(InetSocketAddress(InetAddress.getLocalHost(), port)) } } - println("Server socket ready $socket") - val connectedSocket = suspendCancellableCoroutine { continuation -> - continuation.invokeOnCancellation { - socket.close() + while (true) { + println("Server socket ready $socket") + val connectedSocket = suspendCancellableCoroutine { continuation -> + continuation.invokeOnCancellation { + socket.close() + } + socket.accept(continuation, ContinuationHandler()) } - socket.accept(continuation, ContinuationHandler()) + println("incoming connection") + emit(asyncSocketToDevice(connectedSocket)) } - println("incoming connection") - emit(asyncSocketToDevice(connectedSocket)) } } diff --git a/src/jvmMain/kotlin/net/sergeych/kiloparsec/adapter/asyncSocketToDevice.kt b/src/jvmMain/kotlin/net/sergeych/kiloparsec/adapter/asyncSocketToDevice.kt index 7d3fdb7..d6084d5 100644 --- a/src/jvmMain/kotlin/net/sergeych/kiloparsec/adapter/asyncSocketToDevice.kt +++ b/src/jvmMain/kotlin/net/sergeych/kiloparsec/adapter/asyncSocketToDevice.kt @@ -1,12 +1,9 @@ package net.sergeych.kiloparsec.adapter -import kotlinx.coroutines.CompletableDeferred -import kotlinx.coroutines.cancel +import kotlinx.coroutines.* import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.channels.ClosedReceiveChannelException import kotlinx.coroutines.channels.ClosedSendChannelException -import kotlinx.coroutines.isActive -import kotlinx.coroutines.launch import net.sergeych.crypto.encodeVarUnsigned import net.sergeych.crypto.readVarUnsigned import net.sergeych.crypto.toDump @@ -28,78 +25,81 @@ suspend fun asyncSocketToDevice(socket: AsynchronousSocketChannel): Transport.De globalLaunch { fun stop() { cancel() - runCatching { socket.close() } } + val input = Channel(1024) val output = Channel(1024) // copy from socket to input - launch { - val inb = ByteBuffer.allocate(1024) - while (isActive) { - val size: Int = suspendCoroutine { continuation -> - socket.read(inb, continuation, IntCompletionHandler) - } - println("--------- read chunk $size") - if (size < 0) stop() - else for (i in 0..(1024) - try { + coroutineScope { + launch { + val inb = ByteBuffer.allocate(1024) while (isActive) { - outBuff.clear() - outBuff.add(output.receive().toByte()) - while (!output.isEmpty) + val size: Int = suspendCoroutine { continuation -> + socket.read(inb, continuation, IntCompletionHandler) + } + println("--------- read chunk $size") + if (size < 0) stop() + else for (i in 0..(1024) + try { + while (isActive) { + outBuff.clear() outBuff.add(output.receive().toByte()) - suspendCoroutine { continuation -> - socket.write(ByteBuffer.wrap(outBuff.toByteArray()), continuation, IntCompletionHandler) - } - } - } catch (_: ClosedReceiveChannelException) { - stop() - } - } - // pump blocks from socket output to device input - val inputBlocks = Channel() - launch { - try { - while (isActive) { - val size = readVarUnsigned(input) - println("expected size $size") - if (size == 0u) println("*** zero size block is ignored!") - else { - val block = UByteArray(size.toInt()) - for (i in 0.. + socket.write(ByteBuffer.wrap(outBuff.toByteArray()), continuation, IntCompletionHandler) } - println("ready block:\n${block.toDump()}") - inputBlocks.send(block) } + } catch (_: ClosedReceiveChannelException) { + stop() } - } catch (_: CancellationException) { - inputBlocks.send(null) - } catch (_: ClosedReceiveChannelException) { - inputBlocks.send(null) - stop() } - } - val outputBlocks = Channel() - launch { - try { - while (isActive) { - val block = outputBlocks.receive() - output.sendAll(encodeVarUnsigned(block.size.toUInt())) - output.sendAll(block) + // pump blocks from socket output to device input + val inputBlocks = Channel() + launch { + try { + while (isActive) { + val size = readVarUnsigned(input) + println("expected size $size") + if (size == 0u) println("*** zero size block is ignored!") + else { + val block = UByteArray(size.toInt()) + for (i in 0..() + launch { + try { + while (isActive) { + val block = outputBlocks.receive() + output.sendAll(encodeVarUnsigned(block.size.toUInt())) + output.sendAll(block) + } + } catch (_: ClosedSendChannelException) { + stop() + } + } + deferredDevice.complete( + ProxyDevice(inputBlocks, outputBlocks) { stop() } + ) } - deferredDevice.complete( - ProxyDevice(inputBlocks, outputBlocks) { stop() } - ) + globalLaunch { socket.close() } } return deferredDevice.await() } diff --git a/src/jvmTest/kotlin/net/sergeych/kiloparsec/adapters/NetworkTest.kt b/src/jvmTest/kotlin/net/sergeych/kiloparsec/adapters/NetworkTest.kt index c96a0c2..97f71de 100644 --- a/src/jvmTest/kotlin/net/sergeych/kiloparsec/adapters/NetworkTest.kt +++ b/src/jvmTest/kotlin/net/sergeych/kiloparsec/adapters/NetworkTest.kt @@ -2,7 +2,9 @@ package net.sergeych.kiloparsec.adapters import com.ionspin.kotlin.crypto.util.decodeFromUByteArray import com.ionspin.kotlin.crypto.util.encodeToUByteArray +import kotlinx.coroutines.cancel import kotlinx.coroutines.coroutineScope +import kotlinx.coroutines.flow.cancellable import kotlinx.coroutines.launch import kotlinx.coroutines.test.runTest import kotlinx.coroutines.yield @@ -36,24 +38,40 @@ class NetworkTest { Log.connectConsole(Log.Level.DEBUG) coroutineScope { - val serverFlow = acceptTcpDevice(17171) + val serverFlow = acceptTcpDevice(17171).cancellable() launch { serverFlow.collect { device -> println("collected input: $device") device.output.send("Hello, world!".encodeToUByteArray()) device.output.send("Great".encodeToUByteArray()) while(true) { - val x = device.input.receive()!!.decodeFromUByteArray() + val x = device.input.receive()?.decodeFromUByteArray() ?: break + println("!****************** $x") if( x== "Goodbye" ) break + if( x == "die") { + println("request death") + cancel() + break + } println("ignoring unexpected input: $x") } } } yield() - val s = connectTcpDevice("127.0.1.1:17171".toNetworkAddress()) + var s = connectTcpDevice("127.0.1.1:17171".toNetworkAddress()) assertEquals("Hello, world!", s.input.receive()!!.decodeFromUByteArray()) assertEquals("Great", s.input.receive()!!.decodeFromUByteArray()) s.output.send("Goodbye".encodeToUByteArray()) + s.close() + + println("connecting- -----------------------------------------------------------------") + s = connectTcpDevice("127.0.1.1:17171".toNetworkAddress()) + assertEquals("Hello, world!", s.input.receive()!!.decodeFromUByteArray()) + println("--1") + assertEquals("Great", s.input.receive()!!.decodeFromUByteArray()) + println("--2") + s.output.send("die".encodeToUByteArray()) +// s.close() } } } \ No newline at end of file