async socket closing logic

This commit is contained in:
Sergey Chernov 2023-11-14 02:50:42 +03:00
parent 1814572a04
commit 75e91f8092
3 changed files with 93 additions and 73 deletions

View File

@ -24,15 +24,17 @@ actual fun acceptTcpDevice(port: Int): Flow<Transport.Device> {
it.bind(InetSocketAddress(InetAddress.getLocalHost(), port)) it.bind(InetSocketAddress(InetAddress.getLocalHost(), port))
} }
} }
println("Server socket ready $socket") while (true) {
val connectedSocket = suspendCancellableCoroutine { continuation -> println("Server socket ready $socket")
continuation.invokeOnCancellation { val connectedSocket = suspendCancellableCoroutine { continuation ->
socket.close() continuation.invokeOnCancellation {
socket.close()
}
socket.accept(continuation, ContinuationHandler())
} }
socket.accept(continuation, ContinuationHandler()) println("incoming connection")
emit(asyncSocketToDevice(connectedSocket))
} }
println("incoming connection")
emit(asyncSocketToDevice(connectedSocket))
} }
} }

View File

@ -1,12 +1,9 @@
package net.sergeych.kiloparsec.adapter package net.sergeych.kiloparsec.adapter
import kotlinx.coroutines.CompletableDeferred import kotlinx.coroutines.*
import kotlinx.coroutines.cancel
import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.ClosedReceiveChannelException import kotlinx.coroutines.channels.ClosedReceiveChannelException
import kotlinx.coroutines.channels.ClosedSendChannelException import kotlinx.coroutines.channels.ClosedSendChannelException
import kotlinx.coroutines.isActive
import kotlinx.coroutines.launch
import net.sergeych.crypto.encodeVarUnsigned import net.sergeych.crypto.encodeVarUnsigned
import net.sergeych.crypto.readVarUnsigned import net.sergeych.crypto.readVarUnsigned
import net.sergeych.crypto.toDump import net.sergeych.crypto.toDump
@ -28,78 +25,81 @@ suspend fun asyncSocketToDevice(socket: AsynchronousSocketChannel): Transport.De
globalLaunch { globalLaunch {
fun stop() { fun stop() {
cancel() cancel()
runCatching { socket.close() }
} }
val input = Channel<UByte>(1024) val input = Channel<UByte>(1024)
val output = Channel<UByte>(1024) val output = Channel<UByte>(1024)
// copy from socket to input // copy from socket to input
launch { coroutineScope {
val inb = ByteBuffer.allocate(1024) launch {
while (isActive) { val inb = ByteBuffer.allocate(1024)
val size: Int = suspendCoroutine { continuation ->
socket.read(inb, continuation, IntCompletionHandler)
}
println("--------- read chunk $size")
if (size < 0) stop()
else for (i in 0..<size) input.send(inb[i].toUByte().also { print(it.toInt().toChar())})
}
}
// copy from output tp socket
launch {
val outBuff = ArrayList<Byte>(1024)
try {
while (isActive) { while (isActive) {
outBuff.clear() val size: Int = suspendCoroutine { continuation ->
outBuff.add(output.receive().toByte()) socket.read(inb, continuation, IntCompletionHandler)
while (!output.isEmpty) }
println("--------- read chunk $size")
if (size < 0) stop()
else for (i in 0..<size) input.send(inb[i].toUByte().also { print(it.toInt().toChar()) })
}
}
// copy from output tp socket
launch {
val outBuff = ArrayList<Byte>(1024)
try {
while (isActive) {
outBuff.clear()
outBuff.add(output.receive().toByte()) outBuff.add(output.receive().toByte())
suspendCoroutine { continuation -> while (!output.isEmpty)
socket.write(ByteBuffer.wrap(outBuff.toByteArray()), continuation, IntCompletionHandler) outBuff.add(output.receive().toByte())
} suspendCoroutine { continuation ->
} socket.write(ByteBuffer.wrap(outBuff.toByteArray()), continuation, IntCompletionHandler)
} catch (_: ClosedReceiveChannelException) {
stop()
}
}
// pump blocks from socket output to device input
val inputBlocks = Channel<UByteArray?>()
launch {
try {
while (isActive) {
val size = readVarUnsigned(input)
println("expected size $size")
if (size == 0u) println("*** zero size block is ignored!")
else {
val block = UByteArray(size.toInt())
for (i in 0..<size.toInt()) {
block[i] = input.receive()
} }
println("ready block:\n${block.toDump()}")
inputBlocks.send(block)
} }
} catch (_: ClosedReceiveChannelException) {
stop()
} }
} catch (_: CancellationException) {
inputBlocks.send(null)
} catch (_: ClosedReceiveChannelException) {
inputBlocks.send(null)
stop()
} }
} // pump blocks from socket output to device input
val outputBlocks = Channel<UByteArray>() val inputBlocks = Channel<UByteArray?>()
launch { launch {
try { try {
while (isActive) { while (isActive) {
val block = outputBlocks.receive() val size = readVarUnsigned(input)
output.sendAll(encodeVarUnsigned(block.size.toUInt())) println("expected size $size")
output.sendAll(block) if (size == 0u) println("*** zero size block is ignored!")
else {
val block = UByteArray(size.toInt())
for (i in 0..<size.toInt()) {
block[i] = input.receive()
}
println("ready block:\n${block.toDump()}")
inputBlocks.send(block)
}
}
} catch (_: CancellationException) {
inputBlocks.send(null)
} catch (_: ClosedReceiveChannelException) {
inputBlocks.send(null)
stop()
} }
} catch (_: ClosedSendChannelException) {
stop()
} }
val outputBlocks = Channel<UByteArray>()
launch {
try {
while (isActive) {
val block = outputBlocks.receive()
output.sendAll(encodeVarUnsigned(block.size.toUInt()))
output.sendAll(block)
}
} catch (_: ClosedSendChannelException) {
stop()
}
}
deferredDevice.complete(
ProxyDevice(inputBlocks, outputBlocks) { stop() }
)
} }
deferredDevice.complete( globalLaunch { socket.close() }
ProxyDevice(inputBlocks, outputBlocks) { stop() }
)
} }
return deferredDevice.await() return deferredDevice.await()
} }

View File

@ -2,7 +2,9 @@ package net.sergeych.kiloparsec.adapters
import com.ionspin.kotlin.crypto.util.decodeFromUByteArray import com.ionspin.kotlin.crypto.util.decodeFromUByteArray
import com.ionspin.kotlin.crypto.util.encodeToUByteArray import com.ionspin.kotlin.crypto.util.encodeToUByteArray
import kotlinx.coroutines.cancel
import kotlinx.coroutines.coroutineScope import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.flow.cancellable
import kotlinx.coroutines.launch import kotlinx.coroutines.launch
import kotlinx.coroutines.test.runTest import kotlinx.coroutines.test.runTest
import kotlinx.coroutines.yield import kotlinx.coroutines.yield
@ -36,24 +38,40 @@ class NetworkTest {
Log.connectConsole(Log.Level.DEBUG) Log.connectConsole(Log.Level.DEBUG)
coroutineScope { coroutineScope {
val serverFlow = acceptTcpDevice(17171) val serverFlow = acceptTcpDevice(17171).cancellable()
launch { launch {
serverFlow.collect { device -> serverFlow.collect { device ->
println("collected input: $device") println("collected input: $device")
device.output.send("Hello, world!".encodeToUByteArray()) device.output.send("Hello, world!".encodeToUByteArray())
device.output.send("Great".encodeToUByteArray()) device.output.send("Great".encodeToUByteArray())
while(true) { while(true) {
val x = device.input.receive()!!.decodeFromUByteArray() val x = device.input.receive()?.decodeFromUByteArray() ?: break
println("!****************** $x")
if( x== "Goodbye" ) break if( x== "Goodbye" ) break
if( x == "die") {
println("request death")
cancel()
break
}
println("ignoring unexpected input: $x") println("ignoring unexpected input: $x")
} }
} }
} }
yield() yield()
val s = connectTcpDevice("127.0.1.1:17171".toNetworkAddress()) var s = connectTcpDevice("127.0.1.1:17171".toNetworkAddress())
assertEquals("Hello, world!", s.input.receive()!!.decodeFromUByteArray()) assertEquals("Hello, world!", s.input.receive()!!.decodeFromUByteArray())
assertEquals("Great", s.input.receive()!!.decodeFromUByteArray()) assertEquals("Great", s.input.receive()!!.decodeFromUByteArray())
s.output.send("Goodbye".encodeToUByteArray()) s.output.send("Goodbye".encodeToUByteArray())
s.close()
println("connecting- -----------------------------------------------------------------")
s = connectTcpDevice("127.0.1.1:17171".toNetworkAddress())
assertEquals("Hello, world!", s.input.receive()!!.decodeFromUByteArray())
println("--1")
assertEquals("Great", s.input.receive()!!.decodeFromUByteArray())
println("--2")
s.output.send("die".encodeToUByteArray())
// s.close()
} }
} }
} }