fixes, tcp/ip optimizations

This commit is contained in:
Sergey Chernov 2023-11-15 02:11:52 +03:00
parent bcf0140edb
commit 96edbb2040
6 changed files with 97 additions and 50 deletions

6
.idea/GitLink.xml generated Normal file
View File

@ -0,0 +1,6 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="uk.co.ben_gibson.git.link.SettingsState">
<option name="host" value="e0f86390-1091-4871-8aeb-f534fbc99cf0" />
</component>
</project>

1
.idea/gradle.xml generated
View File

@ -1,5 +1,6 @@
<?xml version="1.0" encoding="UTF-8"?> <?xml version="1.0" encoding="UTF-8"?>
<project version="4"> <project version="4">
<component name="GradleMigrationSettings" migrationVersion="1" />
<component name="GradleSettings"> <component name="GradleSettings">
<option name="linkedExternalProjectsSettings"> <option name="linkedExternalProjectsSettings">
<GradleProjectSettings> <GradleProjectSettings>

View File

@ -0,0 +1,11 @@
package net.sergeych.kiloparsec.adapter
import kotlinx.coroutines.channels.Channel
@Suppress("unused")
class InetProxyDevice(
inputChannel: Channel<UByteArray?>,
outputChannel: Channel<UByteArray>,
val remoteAddress: NetworkAddress,
onclose: ()->Unit = {}
) : ProxyDevice(inputChannel, outputChannel, onclose)

View File

@ -5,7 +5,7 @@ import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.coroutines.channels.SendChannel import kotlinx.coroutines.channels.SendChannel
import net.sergeych.kiloparsec.Transport import net.sergeych.kiloparsec.Transport
class ProxyDevice( open class ProxyDevice(
inputChannel: Channel<UByteArray?>, inputChannel: Channel<UByteArray?>,
outputChannel: Channel<UByteArray>, outputChannel: Channel<UByteArray>,
private val onClose: ()->Unit = {}): Transport.Device { private val onClose: ()->Unit = {}): Transport.Device {

View File

@ -3,13 +3,14 @@ package net.sergeych.kiloparsec.adapter
import kotlinx.coroutines.* import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.ClosedReceiveChannelException import kotlinx.coroutines.channels.ClosedReceiveChannelException
import kotlinx.coroutines.channels.ClosedSendChannelException import net.sergeych.bintools.toDump
import net.sergeych.crypto.encodeVarUnsigned import net.sergeych.crypto.encodeVarUnsigned
import net.sergeych.crypto.readVarUnsigned import net.sergeych.crypto.readVarUnsigned
import net.sergeych.kiloparsec.Transport import net.sergeych.kiloparsec.Transport
import net.sergeych.mp_logger.LogTag import net.sergeych.mp_logger.LogTag
import net.sergeych.mp_logger.warning import net.sergeych.mp_logger.warning
import net.sergeych.mp_tools.globalLaunch import net.sergeych.mp_tools.globalLaunch
import java.net.InetSocketAddress
import java.net.StandardSocketOptions.TCP_NODELAY import java.net.StandardSocketOptions.TCP_NODELAY
import java.nio.ByteBuffer import java.nio.ByteBuffer
import java.nio.channels.AsynchronousSocketChannel import java.nio.channels.AsynchronousSocketChannel
@ -18,52 +19,73 @@ import kotlin.coroutines.suspendCoroutine
private val log = LogTag("ASTD") private val log = LogTag("ASTD")
/**
* Prepend block with its size, varint-encoded
*/
private fun encode(block: UByteArray): ByteArray
= (encodeVarUnsigned(block.size.toUInt()) + block).toByteArray()
/** /**
* Convert asynchronous socket to a [Transport.Device] using non-blocking nio, * Convert asynchronous socket to a [Transport.Device] using non-blocking nio,
* in a coroutine-effective manner. Note that it runs coroutines to read/write * in a coroutine-effective manner. Note that it runs coroutines to read/write
* to the socket in a global scope.These are closed when transport is closed * to the socket in a global scope.These are closed when transport is closed
* or the socket is closed, for example, by network failure. * or the socket is closed, for example, by network failure.
*/ */
suspend fun asyncSocketToDevice(socket: AsynchronousSocketChannel): Transport.Device { suspend fun asyncSocketToDevice(socket: AsynchronousSocketChannel): InetProxyDevice {
val deferredDevice = CompletableDeferred<Transport.Device>() val deferredDevice = CompletableDeferred<InetProxyDevice>()
globalLaunch { globalLaunch {
coroutineScope { coroutineScope {
fun stop() { fun stop() {
cancel() cancel()
} }
socket.setOption(TCP_NODELAY, true) socket.setOption(TCP_NODELAY, true)
// socket input is to be parsed for blocks, so we receive bytes
// and decode them to blocks
val input = Channel<UByte>(1024) val input = Channel<UByte>(1024)
val output = Channel<UByte>(1024) // copy from socket to input:
// copy from socket to input
launch { launch {
val inb = ByteBuffer.allocate(1024) val data = ByteArray(1024)
val inb = ByteBuffer.wrap(data)
while (isActive) { while (isActive) {
inb.position(0)
val size: Int = suspendCoroutine { continuation -> val size: Int = suspendCoroutine { continuation ->
socket.read(inb, continuation, IntCompletionHandler) socket.read(inb, continuation, IntCompletionHandler)
} }
if (size < 0) stop() if (size < 0) stop()
else for (i in 0..<size) input.send(inb[i].toUByte().also { print(it.toInt().toChar()) }) else {
println("recvd:\n${data.sliceArray(0..<size).toDump()}\n------------------")
for (i in 0..<size) input.send(data[i].toUByte())
}
} }
} }
// copy from output tp socket // output is blocks, so we sent blocks:
val outputBlocks = Channel<UByteArray>()
// copy from output to socket:
launch { launch {
val outBuff = ArrayList<Byte>(1024)
try { try {
while (isActive) { while (isActive) {
outBuff.clear() // wait for the first block to send
outBuff.add(output.receive().toByte()) var data = encode(outputBlocks.receive())
while (!output.isEmpty) // if there are more, take them all (NO_DELAY optimization)
outBuff.add(output.receive().toByte()) while (!outputBlocks.isEmpty)
suspendCoroutine { continuation -> data += encode(outputBlocks.receive())
socket.write(ByteBuffer.wrap(outBuff.toByteArray()), continuation, IntCompletionHandler) // now send the aggregate:
val outBuff = ByteBuffer.wrap(data)
val cnt = suspendCoroutine { continuation ->
socket.write(outBuff, continuation, IntCompletionHandler)
}
// be sure it was all sent
if( outBuff.position() != data.size || cnt != data.size) {
throw RuntimeException("PArtial write!")
} }
} }
} catch (_: ClosedReceiveChannelException) { } catch (_: ClosedReceiveChannelException) {
stop() stop()
} }
} }
// pump blocks from socket output to device input // transport device copes with blocks:
val inputBlocks = Channel<UByteArray?>() val inputBlocks = Channel<UByteArray?>()
// decode blocks from a byte channel read from the socket:
launch { launch {
try { try {
while (isActive) { while (isActive) {
@ -84,20 +106,10 @@ suspend fun asyncSocketToDevice(socket: AsynchronousSocketChannel): Transport.De
stop() stop()
} }
} }
val outputBlocks = Channel<UByteArray>() // SocketAddress.
launch { val addr = socket.remoteAddress as InetSocketAddress
try {
while (isActive) {
val block = outputBlocks.receive()
output.sendAll(encodeVarUnsigned(block.size.toUInt()))
output.sendAll(block)
}
} catch (_: ClosedSendChannelException) {
stop()
}
}
deferredDevice.complete( deferredDevice.complete(
ProxyDevice(inputBlocks, outputBlocks) { stop() } InetProxyDevice(inputBlocks, outputBlocks, JvmNetworkAddress(addr.address,addr.port)) { stop() }
) )
} }
globalLaunch { socket.close() } globalLaunch { socket.close() }

View File

@ -37,31 +37,48 @@ class NetworkTest {
val serverFlow = acceptTcpDevice(17171) val serverFlow = acceptTcpDevice(17171)
val j = launch { val j = launch {
serverFlow.collect { device -> serverFlow.collect { device ->
device.output.send("Hello, world!".encodeToUByteArray()) launch {
device.output.send("Great".encodeToUByteArray()) println("connected!")
while(true) { device.output.send("Hello, world!".encodeToUByteArray())
val x = device.input.receive()?.decodeFromUByteArray() ?: break device.output.send("Great".encodeToUByteArray())
if( x== "Goodbye" ) break while (true) {
if( x == "die") { val x = device.input.receive()?.decodeFromUByteArray() ?: break
cancel() if (x == "Goodbye") break
break if (x == "die") {
println("collector get poisoned pill")
cancel()
break
}
println("ignoring unexpected input: $x")
} }
println("ignoring unexpected input: $x")
} }
} }
} }
yield() yield()
var s = connectTcpDevice("127.0.1.1:17171".toNetworkAddress()) run {
assertEquals("Hello, world!", s.input.receive()!!.decodeFromUByteArray()) println("x0")
assertEquals("Great", s.input.receive()!!.decodeFromUByteArray()) val s = connectTcpDevice("127.0.1.1:17171".toNetworkAddress())
s.output.send("Goodbye".encodeToUByteArray()) println("x1")
s.close() assertEquals("Hello, world!", s.input.receive()!!.decodeFromUByteArray())
println("x2")
s = connectTcpDevice("127.0.1.1:17171".toNetworkAddress()) assertEquals("Great", s.input.receive()!!.decodeFromUByteArray())
assertEquals("Hello, world!", s.input.receive()!!.decodeFromUByteArray()) println("x3")
assertEquals("Great", s.input.receive()!!.decodeFromUByteArray()) s.output.send("Goodbye".encodeToUByteArray())
// s.output.send("die".encodeToUByteArray()) println("pre1")
s.close() s.close()
println("pre2")
}
val s1 = connectTcpDevice("127.0.1.1:17171".toNetworkAddress())
println("conn-0-1")
assertEquals("Hello, world!", s1.input.receive()!!.decodeFromUByteArray())
println("conn-0-2")
assertEquals("Great", s1.input.receive()!!.decodeFromUByteArray())
println("1")
s1.output.send("die".encodeToUByteArray())
println("2")
delay(200)
s1.close()
println("3 -- the -- end")
j.cancelAndJoin() j.cancelAndJoin()
} }
} }