diff --git a/src/commonMain/kotlin/net/sergeych/crypto/tools.kt b/src/commonMain/kotlin/net/sergeych/crypto/tools.kt index 9de4723..3e970b2 100644 --- a/src/commonMain/kotlin/net/sergeych/crypto/tools.kt +++ b/src/commonMain/kotlin/net/sergeych/crypto/tools.kt @@ -7,7 +7,6 @@ import com.ionspin.kotlin.crypto.secretbox.crypto_secretbox_NONCEBYTES import com.ionspin.kotlin.crypto.util.LibsodiumRandom import kotlinx.coroutines.channels.ReceiveChannel import kotlinx.serialization.Serializable -import net.sergeych.bintools.encodeToHex import net.sergeych.bintools.toDataSource import net.sergeych.bipack.BipackDecoder import net.sergeych.bipack.BipackEncoder @@ -31,13 +30,10 @@ data class WithFill( suspend fun readVarUnsigned(input: ReceiveChannel): UInt { var result = 0u var cnt = 0 - println("----start") while(true) { val b = input.receive().toUInt() - println("RVU: ${b.encodeToHex()} / ${b and 0x80u}") result = (result shl 7) or (b and 0x7fu) if( (b and 0x80u) != 0u ) { - println("---! $result") return result } if( ++cnt > 5 ) throw IllegalArgumentException("overflow while decoding varuint") diff --git a/src/jvmMain/kotlin/net/sergeych/kiloparsec/adapter/ContinuationHandler.kt b/src/jvmMain/kotlin/net/sergeych/kiloparsec/adapter/ContinuationHandler.kt index 582c1be..3e8a55f 100644 --- a/src/jvmMain/kotlin/net/sergeych/kiloparsec/adapter/ContinuationHandler.kt +++ b/src/jvmMain/kotlin/net/sergeych/kiloparsec/adapter/ContinuationHandler.kt @@ -7,12 +7,10 @@ import kotlin.coroutines.resumeWithException open class ContinuationHandler : CompletionHandler> { override fun completed(result: T, attachment: Continuation) { - println("completed $result") attachment.resume(result) } override fun failed(exc: Throwable, attachment: Continuation) { - println("failed $exc") attachment.resumeWithException(exc) } } diff --git a/src/jvmMain/kotlin/net/sergeych/kiloparsec/adapter/NetworkProvider.jvm.kt b/src/jvmMain/kotlin/net/sergeych/kiloparsec/adapter/NetworkProvider.jvm.kt index 6321930..3bdad24 100644 --- a/src/jvmMain/kotlin/net/sergeych/kiloparsec/adapter/NetworkProvider.jvm.kt +++ b/src/jvmMain/kotlin/net/sergeych/kiloparsec/adapter/NetworkProvider.jvm.kt @@ -18,21 +18,18 @@ actual fun NetworkAddress(host: String, port: Int): NetworkAddress = actual fun acceptTcpDevice(port: Int): Flow { return flow { - println("start generating tcp accept flow") val socket = withContext(Dispatchers.IO) { AsynchronousServerSocketChannel.open().also { it.bind(InetSocketAddress(InetAddress.getLocalHost(), port)) } } while (true) { - println("Server socket ready $socket") val connectedSocket = suspendCancellableCoroutine { continuation -> continuation.invokeOnCancellation { socket.close() } socket.accept(continuation, ContinuationHandler()) } - println("incoming connection") emit(asyncSocketToDevice(connectedSocket)) } } @@ -46,7 +43,6 @@ actual suspend fun connectTcpDevice(address: NetworkAddress): Transport.Device { suspendCoroutine { cont -> socket.connect(address.socketAddress, cont, VoidCompletionHandler) } - println("connected") return asyncSocketToDevice(socket) } diff --git a/src/jvmMain/kotlin/net/sergeych/kiloparsec/adapter/asyncSocketToDevice.kt b/src/jvmMain/kotlin/net/sergeych/kiloparsec/adapter/asyncSocketToDevice.kt index d6084d5..b9c1e2a 100644 --- a/src/jvmMain/kotlin/net/sergeych/kiloparsec/adapter/asyncSocketToDevice.kt +++ b/src/jvmMain/kotlin/net/sergeych/kiloparsec/adapter/asyncSocketToDevice.kt @@ -6,14 +6,18 @@ import kotlinx.coroutines.channels.ClosedReceiveChannelException import kotlinx.coroutines.channels.ClosedSendChannelException import net.sergeych.crypto.encodeVarUnsigned import net.sergeych.crypto.readVarUnsigned -import net.sergeych.crypto.toDump import net.sergeych.kiloparsec.Transport +import net.sergeych.mp_logger.LogTag +import net.sergeych.mp_logger.warning import net.sergeych.mp_tools.globalLaunch +import java.net.StandardSocketOptions.TCP_NODELAY import java.nio.ByteBuffer import java.nio.channels.AsynchronousSocketChannel import kotlin.coroutines.cancellation.CancellationException import kotlin.coroutines.suspendCoroutine +private val log = LogTag("ASTD") + /** * Convert asynchronous socket to a [Transport.Device] using non-blocking nio, * in a coroutine-effective manner. Note that it runs coroutines to read/write @@ -23,21 +27,20 @@ import kotlin.coroutines.suspendCoroutine suspend fun asyncSocketToDevice(socket: AsynchronousSocketChannel): Transport.Device { val deferredDevice = CompletableDeferred() globalLaunch { - fun stop() { - cancel() - } - - val input = Channel(1024) - val output = Channel(1024) - // copy from socket to input coroutineScope { + fun stop() { + cancel() + } + socket.setOption(TCP_NODELAY, true) + val input = Channel(1024) + val output = Channel(1024) + // copy from socket to input launch { val inb = ByteBuffer.allocate(1024) while (isActive) { val size: Int = suspendCoroutine { continuation -> socket.read(inb, continuation, IntCompletionHandler) } - println("--------- read chunk $size") if (size < 0) stop() else for (i in 0.. - println("collected input: $device") device.output.send("Hello, world!".encodeToUByteArray()) device.output.send("Great".encodeToUByteArray()) while(true) { val x = device.input.receive()?.decodeFromUByteArray() ?: break - println("!****************** $x") if( x== "Goodbye" ) break if( x == "die") { - println("request death") cancel() break } @@ -60,12 +57,9 @@ class NetworkTest { s.output.send("Goodbye".encodeToUByteArray()) s.close() - println("connecting- -----------------------------------------------------------------") s = connectTcpDevice("127.0.1.1:17171".toNetworkAddress()) assertEquals("Hello, world!", s.input.receive()!!.decodeFromUByteArray()) - println("--1") assertEquals("Great", s.input.receive()!!.decodeFromUByteArray()) - println("--2") // s.output.send("die".encodeToUByteArray()) s.close() j.cancelAndJoin()