removed debug noise
This commit is contained in:
parent
0c05cce13b
commit
814263c11d
@ -7,7 +7,6 @@ import com.ionspin.kotlin.crypto.secretbox.crypto_secretbox_NONCEBYTES
|
||||
import com.ionspin.kotlin.crypto.util.LibsodiumRandom
|
||||
import kotlinx.coroutines.channels.ReceiveChannel
|
||||
import kotlinx.serialization.Serializable
|
||||
import net.sergeych.bintools.encodeToHex
|
||||
import net.sergeych.bintools.toDataSource
|
||||
import net.sergeych.bipack.BipackDecoder
|
||||
import net.sergeych.bipack.BipackEncoder
|
||||
@ -31,13 +30,10 @@ data class WithFill(
|
||||
suspend fun readVarUnsigned(input: ReceiveChannel<UByte>): UInt {
|
||||
var result = 0u
|
||||
var cnt = 0
|
||||
println("----start")
|
||||
while(true) {
|
||||
val b = input.receive().toUInt()
|
||||
println("RVU: ${b.encodeToHex()} / ${b and 0x80u}")
|
||||
result = (result shl 7) or (b and 0x7fu)
|
||||
if( (b and 0x80u) != 0u ) {
|
||||
println("---! $result")
|
||||
return result
|
||||
}
|
||||
if( ++cnt > 5 ) throw IllegalArgumentException("overflow while decoding varuint")
|
||||
|
@ -7,12 +7,10 @@ import kotlin.coroutines.resumeWithException
|
||||
|
||||
open class ContinuationHandler<T> : CompletionHandler<T, Continuation<T>> {
|
||||
override fun completed(result: T, attachment: Continuation<T>) {
|
||||
println("completed $result")
|
||||
attachment.resume(result)
|
||||
}
|
||||
|
||||
override fun failed(exc: Throwable, attachment: Continuation<T>) {
|
||||
println("failed $exc")
|
||||
attachment.resumeWithException(exc)
|
||||
}
|
||||
}
|
||||
|
@ -18,21 +18,18 @@ actual fun NetworkAddress(host: String, port: Int): NetworkAddress =
|
||||
|
||||
actual fun acceptTcpDevice(port: Int): Flow<Transport.Device> {
|
||||
return flow {
|
||||
println("start generating tcp accept flow")
|
||||
val socket = withContext(Dispatchers.IO) {
|
||||
AsynchronousServerSocketChannel.open().also {
|
||||
it.bind(InetSocketAddress(InetAddress.getLocalHost(), port))
|
||||
}
|
||||
}
|
||||
while (true) {
|
||||
println("Server socket ready $socket")
|
||||
val connectedSocket = suspendCancellableCoroutine { continuation ->
|
||||
continuation.invokeOnCancellation {
|
||||
socket.close()
|
||||
}
|
||||
socket.accept(continuation, ContinuationHandler())
|
||||
}
|
||||
println("incoming connection")
|
||||
emit(asyncSocketToDevice(connectedSocket))
|
||||
}
|
||||
}
|
||||
@ -46,7 +43,6 @@ actual suspend fun connectTcpDevice(address: NetworkAddress): Transport.Device {
|
||||
suspendCoroutine { cont ->
|
||||
socket.connect(address.socketAddress, cont, VoidCompletionHandler)
|
||||
}
|
||||
println("connected")
|
||||
return asyncSocketToDevice(socket)
|
||||
}
|
||||
|
||||
|
@ -6,14 +6,18 @@ import kotlinx.coroutines.channels.ClosedReceiveChannelException
|
||||
import kotlinx.coroutines.channels.ClosedSendChannelException
|
||||
import net.sergeych.crypto.encodeVarUnsigned
|
||||
import net.sergeych.crypto.readVarUnsigned
|
||||
import net.sergeych.crypto.toDump
|
||||
import net.sergeych.kiloparsec.Transport
|
||||
import net.sergeych.mp_logger.LogTag
|
||||
import net.sergeych.mp_logger.warning
|
||||
import net.sergeych.mp_tools.globalLaunch
|
||||
import java.net.StandardSocketOptions.TCP_NODELAY
|
||||
import java.nio.ByteBuffer
|
||||
import java.nio.channels.AsynchronousSocketChannel
|
||||
import kotlin.coroutines.cancellation.CancellationException
|
||||
import kotlin.coroutines.suspendCoroutine
|
||||
|
||||
private val log = LogTag("ASTD")
|
||||
|
||||
/**
|
||||
* Convert asynchronous socket to a [Transport.Device] using non-blocking nio,
|
||||
* in a coroutine-effective manner. Note that it runs coroutines to read/write
|
||||
@ -23,21 +27,20 @@ import kotlin.coroutines.suspendCoroutine
|
||||
suspend fun asyncSocketToDevice(socket: AsynchronousSocketChannel): Transport.Device {
|
||||
val deferredDevice = CompletableDeferred<Transport.Device>()
|
||||
globalLaunch {
|
||||
coroutineScope {
|
||||
fun stop() {
|
||||
cancel()
|
||||
}
|
||||
|
||||
socket.setOption(TCP_NODELAY, true)
|
||||
val input = Channel<UByte>(1024)
|
||||
val output = Channel<UByte>(1024)
|
||||
// copy from socket to input
|
||||
coroutineScope {
|
||||
launch {
|
||||
val inb = ByteBuffer.allocate(1024)
|
||||
while (isActive) {
|
||||
val size: Int = suspendCoroutine { continuation ->
|
||||
socket.read(inb, continuation, IntCompletionHandler)
|
||||
}
|
||||
println("--------- read chunk $size")
|
||||
if (size < 0) stop()
|
||||
else for (i in 0..<size) input.send(inb[i].toUByte().also { print(it.toInt().toChar()) })
|
||||
}
|
||||
@ -65,14 +68,12 @@ suspend fun asyncSocketToDevice(socket: AsynchronousSocketChannel): Transport.De
|
||||
try {
|
||||
while (isActive) {
|
||||
val size = readVarUnsigned(input)
|
||||
println("expected size $size")
|
||||
if (size == 0u) println("*** zero size block is ignored!")
|
||||
if (size == 0u) log.warning { "zero size block is ignored!" }
|
||||
else {
|
||||
val block = UByteArray(size.toInt())
|
||||
for (i in 0..<size.toInt()) {
|
||||
block[i] = input.receive()
|
||||
}
|
||||
println("ready block:\n${block.toDump()}")
|
||||
inputBlocks.send(block)
|
||||
}
|
||||
}
|
||||
|
@ -37,15 +37,12 @@ class NetworkTest {
|
||||
val serverFlow = acceptTcpDevice(17171)
|
||||
val j = launch {
|
||||
serverFlow.collect { device ->
|
||||
println("collected input: $device")
|
||||
device.output.send("Hello, world!".encodeToUByteArray())
|
||||
device.output.send("Great".encodeToUByteArray())
|
||||
while(true) {
|
||||
val x = device.input.receive()?.decodeFromUByteArray() ?: break
|
||||
println("!****************** $x")
|
||||
if( x== "Goodbye" ) break
|
||||
if( x == "die") {
|
||||
println("request death")
|
||||
cancel()
|
||||
break
|
||||
}
|
||||
@ -60,12 +57,9 @@ class NetworkTest {
|
||||
s.output.send("Goodbye".encodeToUByteArray())
|
||||
s.close()
|
||||
|
||||
println("connecting- -----------------------------------------------------------------")
|
||||
s = connectTcpDevice("127.0.1.1:17171".toNetworkAddress())
|
||||
assertEquals("Hello, world!", s.input.receive()!!.decodeFromUByteArray())
|
||||
println("--1")
|
||||
assertEquals("Great", s.input.receive()!!.decodeFromUByteArray())
|
||||
println("--2")
|
||||
// s.output.send("die".encodeToUByteArray())
|
||||
s.close()
|
||||
j.cancelAndJoin()
|
||||
|
Loading…
x
Reference in New Issue
Block a user