basic tcp connect async inmplementation for JVM/NIO

This commit is contained in:
Sergey Chernov 2023-11-14 01:47:30 +03:00
parent 8fc24567f0
commit fe29bec1b0
13 changed files with 249 additions and 48 deletions

View File

@ -5,6 +5,7 @@ package net.sergeych.crypto
import com.ionspin.kotlin.crypto.secretbox.SecretBox
import com.ionspin.kotlin.crypto.secretbox.crypto_secretbox_NONCEBYTES
import com.ionspin.kotlin.crypto.util.LibsodiumRandom
import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.serialization.Serializable
import net.sergeych.bintools.toDataSource
import net.sergeych.bipack.BipackDecoder
@ -26,6 +27,30 @@ data class WithFill(
constructor(data: UByteArray, fillSize: Int) : this(data, randomBytes(fillSize))
}
suspend fun readVarUnsigned(input: ReceiveChannel<UByte>): UInt {
var result = 0u
var cnt = 0
while(true) {
val b = input.receive().toUInt()
result = (result shr 7) or (b and 0x7fu)
if( (b and 0x80u) != 0u ) break
if( ++cnt > 4 ) throw IllegalArgumentException("overflow while decoding varuint")
}
return result
}
fun encodeVarUnsigned(value: UInt): UByteArray {
val result = mutableListOf<UByte>()
var rest = value
do {
val mask = if( rest <= 0x7fu ) 0x80u else 0u
result.add( (mask or (rest and 0x7fu)).toUByte() )
rest = rest shr 7
} while(rest != 0u)
return result.toUByteArray()
}
fun randomBytes(n: Int): UByteArray = if (n > 0) LibsodiumRandom.buf(n) else ubyteArrayOf()
fun randomBytes(n: UInt): UByteArray = if (n > 0u) LibsodiumRandom.buf(n.toInt()) else ubyteArrayOf()

View File

@ -1,6 +1,8 @@
package net.sergeych.kiloparsec.adapter
import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.coroutines.flow.Flow
import net.sergeych.kiloparsec.Transport
/**
* Multiplatform implementation of an internet address.
@ -24,12 +26,6 @@ interface Datagram {
* Address from where the message was sent
*/
val address: NetworkAddress
/**
* Send a datagram in response, e.g., to the [address].
* This method is optimized per single per-datagram use. If you need to send many datagram, use [DatagramConnector].
*/
suspend fun respondWith(message: UByteArray)
}
@OptIn(ExperimentalStdlibApi::class)
@ -39,7 +35,7 @@ interface DatagramConnector: AutoCloseable {
suspend fun send(message: UByteArray, networkAddress: NetworkAddress)
@Suppress("unused")
suspend fun send(message: UByteArray, datagramAddress: String) {
send(message, networkAddressOf(datagramAddress))
send(message, datagramAddress.toNetworkAddress())
}
suspend fun send(message: UByteArray,host: String,port: Int) =
@ -47,5 +43,14 @@ interface DatagramConnector: AutoCloseable {
override fun close()
}
expect fun networkAddressOf(address: String): NetworkAddress
expect fun NetworkAddress(host: String,port: Int): NetworkAddress
fun CharSequence.toNetworkAddress() : NetworkAddress {
val (host, port) = this.split(":").map { it.trim()}
return NetworkAddress(host, port.toInt())
}
expect fun acceptTcpDevice(pord: Int): Flow<Transport.Device>
expect suspend fun connectTcpDevice(address: NetworkAddress): Transport.Device

View File

@ -0,0 +1,18 @@
package net.sergeych.kiloparsec.adapter
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.coroutines.channels.SendChannel
import net.sergeych.kiloparsec.Transport
class ProxyDevice(
inputChannel: Channel<UByteArray?>,
outputChannel: Channel<UByteArray>,
private val onClose: ()->Unit = {}): Transport.Device {
override val input: ReceiveChannel<UByteArray?> = inputChannel
override val output: SendChannel<UByteArray> = outputChannel
override suspend fun close() {
onClose()
}
}

View File

@ -1,5 +1,2 @@
package net.sergeych.kiloparsec.adapter
actual fun networkAddressOf(address: String): NetworkAddress {
TODO("Not yet implemented")
}

View File

@ -1,5 +1,16 @@
package net.sergeych.kiloparsec.adapter
import kotlinx.coroutines.flow.Flow
import net.sergeych.kiloparsec.Transport
actual fun NetworkAddress(host: String, port: Int): NetworkAddress {
TODO("Not yet implemented")
}
actual fun acceptTcpDevice(pord: Int): Flow<Transport.Device> {
TODO("Not yet implemented")
}
actual suspend fun connectTcpDevice(address: NetworkAddress): Transport.Device {
TODO("Not yet implemented")
}

View File

@ -0,0 +1,22 @@
package net.sergeych.kiloparsec.adapter
import java.nio.channels.CompletionHandler
import kotlin.coroutines.Continuation
import kotlin.coroutines.resume
import kotlin.coroutines.resumeWithException
open class ContinuationHandler<T> : CompletionHandler<T, Continuation<T>> {
override fun completed(result: T, attachment: Continuation<T>) {
println("completed $result")
attachment.resume(result)
}
override fun failed(exc: Throwable, attachment: Continuation<T>) {
println("failed $exc")
attachment.resumeWithException(exc)
}
}
object VoidCompletionHandler: ContinuationHandler<Void>()
object IntCompletionHandler: ContinuationHandler<Int>()

View File

@ -1,8 +0,0 @@
package net.sergeych.kiloparsec.adapter
import java.net.InetAddress
actual fun networkAddressOf(address: String): NetworkAddress {
val (host,port) = address.split(":")
return JvmNetworkAddress(InetAddress.getByName(host), port.toInt())
}

View File

@ -1,6 +1,37 @@
package net.sergeych.kiloparsec.adapter
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.channels.SendChannel
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.withContext
import net.sergeych.kiloparsec.Transport
import java.net.InetAddress
import java.nio.channels.AsynchronousSocketChannel
import kotlin.coroutines.suspendCoroutine
actual fun NetworkAddress(host: String, port: Int): NetworkAddress =
JvmNetworkAddress(InetAddress.getByName(host), port)
actual fun acceptTcpDevice(pord: Int): Flow<Transport.Device> {
return flow {
TODO("Not yet implemented")
}
}
actual suspend fun connectTcpDevice(address: NetworkAddress): Transport.Device {
address as JvmNetworkAddress
val socket = withContext(Dispatchers.IO) {
AsynchronousSocketChannel.open()
}
suspendCoroutine { cont ->
socket.connect(address.socketAddress, cont, VoidCompletionHandler)
}
println("connected")
return asyncSocketToDevice(socket)
}
suspend fun SendChannel<UByte>.sendAll(bytes: Collection<UByte>) {
for( b in bytes) send(b)
}

View File

@ -3,15 +3,11 @@ package net.sergeych.kiloparsec.adapter
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.BufferOverflow
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
import net.sergeych.mp_logger.LogTag
import net.sergeych.mp_logger.exception
import net.sergeych.mp_logger.info
import net.sergeych.mp_logger.warning
import java.net.DatagramPacket
import java.net.DatagramSocket
import java.net.InetAddress
import java.net.*
import java.util.concurrent.atomic.AtomicInteger
private val counter = AtomicInteger(0)
@ -28,6 +24,8 @@ class JvmNetworkAddress(val inetAddress: InetAddress, override val port: Int) :
return true
}
val socketAddress: SocketAddress by lazy { InetSocketAddress(inetAddress,port) }
override fun hashCode(): Int {
var result = inetAddress.hashCode()
result = 31 * result + port
@ -42,28 +40,12 @@ class UdpDatagram(override val message: UByteArray, val inetAddress: InetAddress
JvmNetworkAddress(inetAddress, port)
}
private val access = Mutex()
private var socket: DatagramSocket? = null
override suspend fun respondWith(message: UByteArray) {
withContext(Dispatchers.IO) {
access.withLock {
if (socket == null) socket = DatagramSocket()
val packet = DatagramPacket(
message.toByteArray(),
message.size,
inetAddress,
port
)
socket!!.send(packet)
}
}
}
}
@OptIn(DelicateCoroutinesApi::class)
class UdpServer(val port: Int) :
DatagramReceiver, LogTag("UDPS:${counter.incrementAndGet()}") {
DatagramConnector, LogTag("UDPS:${counter.incrementAndGet()}") {
private var isClosed = false

View File

@ -0,0 +1,101 @@
package net.sergeych.kiloparsec.adapter
import kotlinx.coroutines.CompletableDeferred
import kotlinx.coroutines.cancel
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.ClosedReceiveChannelException
import kotlinx.coroutines.channels.ClosedSendChannelException
import kotlinx.coroutines.isActive
import kotlinx.coroutines.launch
import net.sergeych.crypto.encodeVarUnsigned
import net.sergeych.crypto.readVarUnsigned
import net.sergeych.kiloparsec.Transport
import net.sergeych.mp_tools.globalLaunch
import java.nio.ByteBuffer
import java.nio.channels.AsynchronousSocketChannel
import kotlin.coroutines.cancellation.CancellationException
import kotlin.coroutines.suspendCoroutine
/**
* Convert asynchronous socket to a [Transport.Device] using non-blocking nio,
* in a coroutine-effective manner. Note that it runs coroutines to read/write
* to the socket in a global scope.These are closed when transport is closed
* or the socket is closed, for example, by network failure.
*/
suspend fun asyncSocketToDevice(socket: AsynchronousSocketChannel): Transport.Device {
val deferredDevice = CompletableDeferred<Transport.Device>()
globalLaunch {
fun stop() {
cancel()
runCatching { socket.close() }
}
val input = Channel<UByte>(1024)
val output = Channel<UByte>(1024)
// copy from socket to input
launch {
val inb = ByteBuffer.allocate(1024)
while (isActive) {
val size: Int = suspendCoroutine { continuation ->
socket.read(inb, continuation, IntCompletionHandler)
}
if (size < 0) stop()
else for (i in 0..<size) input.send(inb[i].toUByte().also { print(it.toInt().toChar())})
}
}
// copy from output tp socket
launch {
val outb = ByteBuffer.allocate(1024)
try {
while (isActive) {
var count = 0
outb.put(count++, output.receive().toByte())
while (!output.isEmpty && count < outb.capacity())
outb.put(count++, output.receive().toByte())
suspendCoroutine { continuation ->
socket.write(outb, continuation, IntCompletionHandler)
}
}
} catch (_: ClosedReceiveChannelException) {
stop()
}
}
// pump blocks from socket output to device input
val inputBlocks = Channel<UByteArray?>()
launch {
try {
while (isActive) {
val size = readVarUnsigned(input)
if (size == 0u) println("*** zero size block is ignored!")
else {
val block = UByteArray(size.toInt())
for (i in 0..<size.toInt()) {
block[i] = input.receive()
}
inputBlocks.send(block)
}
}
} catch (_: CancellationException) {
inputBlocks.send(null)
} catch (_: ClosedReceiveChannelException) {
inputBlocks.send(null)
stop()
}
}
val outputBlocks = Channel<UByteArray>()
launch {
try {
while (isActive) {
val block = outputBlocks.receive()
output.sendAll(encodeVarUnsigned(block.size.toUInt()))
output.sendAll(block)
}
} catch (_: ClosedSendChannelException) {
stop()
}
}
deferredDevice.complete(
ProxyDevice(inputBlocks, outputBlocks) { stop() }
)
}
return deferredDevice.await()
}

View File

@ -3,14 +3,16 @@ package net.sergeych.kiloparsec.adapters
import com.ionspin.kotlin.crypto.util.encodeToUByteArray
import kotlinx.coroutines.test.runTest
import net.sergeych.kiloparsec.adapter.UdpServer
import net.sergeych.kiloparsec.adapter.connectTcpDevice
import net.sergeych.kiloparsec.adapter.toNetworkAddress
import net.sergeych.mp_logger.Log
import org.junit.jupiter.api.Assertions.assertEquals
import kotlin.test.Test
class UServerTest {
class NetworkTest {
@Test
fun udpProvider() = runTest {
fun udpProviderTest() = runTest {
Log.connectConsole(Log.Level.DEBUG)
val s1 = UdpServer(17120)
val s2 = UdpServer(17121)
@ -18,9 +20,16 @@ class UServerTest {
val d1 = s2.incoming.receive()
assertEquals(d1.address.port, 17120)
assertEquals("Hello", d1.message.toByteArray().decodeToString())
d1.respondWith("world".encodeToUByteArray())
s1.send("world".encodeToUByteArray(),d1.address)
assertEquals("world", s1.incoming.receive().message.toByteArray().decodeToString())
// println("s1: ${s1.bindAddress()}")
}
@Test
fun tcpAsyncConnectionTest() = runTest {
Log.connectConsole(Log.Level.DEBUG)
val s = connectTcpDevice("sergeych.net:80".toNetworkAddress())
s.input.receive()
}
}

View File

@ -1,5 +1,2 @@
package net.sergeych.kiloparsec.adapter
actual fun networkAddressOf(address: String): NetworkAddress {
TODO("Not yet implemented")
}

View File

@ -1,5 +1,16 @@
package net.sergeych.kiloparsec.adapter
import kotlinx.coroutines.flow.Flow
import net.sergeych.kiloparsec.Transport
actual fun NetworkAddress(host: String, port: Int): NetworkAddress {
TODO("Not yet implemented")
}
actual fun acceptTcpDevice(pord: Int): Flow<Transport.Device> {
TODO("Not yet implemented")
}
actual suspend fun connectTcpDevice(address: NetworkAddress): Transport.Device {
TODO("Not yet implemented")
}