release 0.2.6: ktor-based async sockets for all native targets

This commit is contained in:
Sergey Chernov 2024-08-04 02:05:35 +02:00
parent ffcdcf7350
commit 439e229294
9 changed files with 141 additions and 423 deletions

View File

@ -5,23 +5,27 @@ block device" transport to the same local interface. Out if the box it
provides the following transports:
| name | JVM | JS | native |
|----------------|-----|----|-------------------|
| TCP/IP server | ✓ | | β @0.2.5-SNAPSHOT |
| TCP/IP client | ✓ | | β @0.2.5-SNAPSHOT |
|----------------|-----|----|----------|
| TCP/IP server | ✓ | | β @0.2.6 |
| TCP/IP client | ✓ | | β @0.2.6 |
| Websock server | ✓ | | |
| Websock client | ✓ | ✓ | ✓ |
At the moment we're working on supporting TCP/IP on most native targets. This feature is planned to rach public beta in
August and production in early september 2024.
### Supported native targets
- iosArm64, iosX64
- macosArm64, macosArm64
- linxArm64, linuxX64
### Non-native targets
- JS (browser and nodeJS)
- JVM (android, macos, windows, linx, everywhere where JRE is installed)
## TCP/IP transport
It is the fastest. JVM implementation uses nio2 async sockets and optimizes TCP socket to play
well with blocks (smart NO_DELAY mode). It is multiplatform, nut lacks of async TCP/IP support
on natvic targetm this is where I need help having little time. I'd prefer to use something asyn like UV on native
targets.
I know no existing way to implement it in KotlinJS for the modern browsers.
It is the fastest based on async socket implementation of ktor client. It works everywhere but JS target as
there is currently no widely adopted sockets for browser javascript.
## Websock server
@ -34,7 +38,7 @@ applications to get easy access from anywhere.
# Usage
Th elibrary should be used as maven dependency, not as source.
The library should be used as maven dependency, not as source.
## Adding dependency
@ -55,7 +59,7 @@ It could be, depending on your project structure, something like:
```kotlin
val commonMain by getting {
dependencies {
api("net.sergeych:kiloparsec:0.2.4")
api("net.sergeych:kiloparsec:0.2.6")
}
}
```

View File

@ -5,7 +5,7 @@ plugins {
}
group = "net.sergeych"
version = "0.2.5-SNAPSHOT"
version = "0.2.6"
repositories {
mavenCentral()
@ -19,6 +19,7 @@ kotlin {
js {
browser {
}
nodejs()
}
macosArm64()
iosX64()
@ -114,10 +115,6 @@ kotlin {
val linuxX64Test by getting {
dependsOn(ktorSocketTest)
}
// for (pm: NamedDomainObjectProvider<KotlinSourceSet> in listOf(macosMain,linuxMain, iosMain, mingwMain))
// pm.get().dependsOn(ktorSocketMain)
}
publishing {

View File

@ -43,8 +43,6 @@ class KiloServer<S>(
}
fun close() {
println("PRREEEC")
job.cancel()
println("POOOSTC")
}
}

View File

@ -1,20 +0,0 @@
package net.sergeych.kiloparsec.adapter
import java.nio.channels.CompletionHandler
import kotlin.coroutines.Continuation
import kotlin.coroutines.resume
import kotlin.coroutines.resumeWithException
open class ContinuationHandler<T> : CompletionHandler<T, Continuation<T>> {
override fun completed(result: T, attachment: Continuation<T>) {
attachment.resume(result)
}
override fun failed(exc: Throwable, attachment: Continuation<T>) {
attachment.resumeWithException(exc)
}
}
object VoidCompletionHandler: ContinuationHandler<Void>()
object IntCompletionHandler: ContinuationHandler<Int>()

View File

@ -1,39 +0,0 @@
package net.sergeych.kiloparsec.adapter
//
//actual fun NetworkAddress(host: String, port: Int): NetworkAddress =
// JvmNetworkAddress(InetAddress.getByName(host), port)
//
//actual fun acceptTcpDevice(port: Int): Flow<InetTransportDevice> {
// return flow {
// val socket = withContext(Dispatchers.IO) {
// AsynchronousServerSocketChannel.open().also {
// it.bind(InetSocketAddress(port))
// }
// }
// while (true) {
// println("0 --- $port")
// val connectedSocket = suspendCancellableCoroutine { continuation ->
// continuation.invokeOnCancellation {
// socket.close()
// }
// socket.accept(continuation, ContinuationHandler())
// }
// println("1 ---")
// emit(asyncSocketToDevice(connectedSocket))
// }
// }
//}
//
//@Suppress("unused")
//suspend fun connectTcpDevice(host: String, port: Int) = connectTcpDevice(NetworkAddress(host,port))
//actual suspend fun connectTcpDevice(address: NetworkAddress): InetTransportDevice {
// address as JvmNetworkAddress
// val socket = withContext(Dispatchers.IO) {
// AsynchronousSocketChannel.open()
// }
// suspendCoroutine { cont ->
// socket.connect(address.socketAddress, cont, VoidCompletionHandler)
// }
// return asyncSocketToDevice(socket)
//}

View File

@ -1,118 +0,0 @@
//package net.sergeych.kiloparsec.adapter
//
//import kotlinx.coroutines.*
//import kotlinx.coroutines.channels.BufferOverflow
//import kotlinx.coroutines.channels.Channel
//import net.sergeych.mp_logger.LogTag
//import net.sergeych.mp_logger.exception
//import net.sergeych.mp_logger.info
//import net.sergeych.mp_logger.warning
//import java.net.*
//import java.util.concurrent.atomic.AtomicInteger
//
//private val counter = AtomicInteger(0)
//
//class JvmNetworkAddress(val inetAddress: InetAddress, override val port: Int) : NetworkAddress {
// override val host: String by lazy { inetAddress.canonicalHostName }
// override fun equals(other: Any?): Boolean {
// if (this === other) return true
// if (other !is JvmNetworkAddress) return false
//
// if (inetAddress != other.inetAddress) return false
// if (port != other.port) return false
//
// return true
// }
//
// val socketAddress: SocketAddress by lazy { InetSocketAddress(inetAddress,port) }
//
// override fun hashCode(): Int {
// var result = inetAddress.hashCode()
// result = 31 * result + port
// return result
// }
//
// override fun toString(): String = "$host:$port"
//}
//
//class UdpDatagram(override val message: UByteArray, val inetAddress: InetAddress, val port: Int) : Datagram {
//
// override val address: NetworkAddress by lazy {
// JvmNetworkAddress(inetAddress, port)
// }
//
//}
//
//
//@OptIn(DelicateCoroutinesApi::class)
//class UdpServer(val port: Int) :
// DatagramConnector, LogTag("UDPS:${counter.incrementAndGet()}") {
// private var isClosed = false
//
//
// private val deferredSocket = CompletableDeferred<DatagramSocket>()
// private var job: Job? = null
//
// private suspend fun start() = try {
// coroutineScope {
// val socket = DatagramSocket(port)
// val buffer = ByteArray(16384)
// val packet = DatagramPacket(buffer, buffer.size)
// deferredSocket.complete(socket)
// while (isActive && !isClosed) {
// try {
// socket.receive(packet)
// val data = packet.data.sliceArray(0..<packet.length)
// val datagram = UdpDatagram(data.toUByteArray(), packet.address, packet.port)
// if (!channel.trySend(datagram).isSuccess) {
// warning { "packet lost!" }
// // and we cause overflow that overwrites the oldest
// channel.send(datagram)
// }
// } catch (e: Exception) {
// if (!isClosed)
// e.printStackTrace()
// throw e
// }
// }
// info { "closing socket and reception loop" }
//
// }
// } catch (_: CancellationException) {
// info { "server is closed" }
// } catch (t: Throwable) {
// exception { "unexpected end of server" to t }
// }
//
// init {
// job = GlobalScope.launch(Dispatchers.IO) {
// start()
// }
// }
//
// override fun close() {
// if (!isClosed) {
// if (deferredSocket.isCompleted) {
// runCatching {
// deferredSocket.getCompleted().close()
// }
// }
// isClosed = true
// job?.cancel(); job = null
// }
// }
//
// private val channel = Channel<Datagram>(2048, BufferOverflow.DROP_OLDEST)
// override val incoming = channel
//
// override suspend fun send(message: UByteArray, networkAddress: NetworkAddress) {
// networkAddress as JvmNetworkAddress
// withContext(Dispatchers.IO) {
// val packet = DatagramPacket(
// message.toByteArray(), message.size,
// networkAddress.inetAddress, networkAddress.port
// )
// deferredSocket.await().send(packet)
// }
// }
//}

View File

@ -1,155 +0,0 @@
//package net.sergeych.kiloparsec.adapter
//
//import kotlinx.coroutines.*
//import kotlinx.coroutines.channels.Channel
//import kotlinx.coroutines.channels.ClosedReceiveChannelException
//import kotlinx.coroutines.flow.MutableStateFlow
//import net.sergeych.crypto2.Contrail
//import net.sergeych.crypto2.encodeVarUnsigned
//import net.sergeych.crypto2.readVarUnsigned
//import net.sergeych.kiloparsec.RemoteInterface
//import net.sergeych.kiloparsec.Transport
//import net.sergeych.mp_logger.LogTag
//import net.sergeych.mp_logger.warning
//import net.sergeych.mp_tools.globalLaunch
//import net.sergeych.tools.waitFor
//import java.net.InetSocketAddress
//import java.net.StandardSocketOptions.TCP_NODELAY
//import java.nio.ByteBuffer
//import java.nio.channels.AsynchronousSocketChannel
//import kotlin.coroutines.cancellation.CancellationException
//import kotlin.coroutines.suspendCoroutine
//
//private val log = LogTag("ASTD")
//
///**
// * Prepend block with its size, varint-encoded
// */
//private fun encode(block: UByteArray): ByteArray {
// val c = Contrail.create(block)
// return (encodeVarUnsigned(c.size.toUInt()) + c).toByteArray()
//}
//
///**
// * Convert asynchronous socket to a [Transport.Device] using non-blocking nio,
// * in a coroutine-effective manner. Note that it runs coroutines to read/write
// * to the socket in a global scope.These are closed when transport is closed
// * or the socket is closed, for example, by network failure.
// */
//suspend fun asyncSocketToDevice(socket: AsynchronousSocketChannel): InetTransportDevice {
// val deferredDevice = CompletableDeferred<InetTransportDevice>()
// globalLaunch {
// coroutineScope {
// val sendQueueEmpty = MutableStateFlow(true)
// val receiving = MutableStateFlow(false)
// // We're in block mode, every block we send worth immediate sending, we do not
// // send partial blocks, so:
// socket.setOption(TCP_NODELAY, true)
//
// // socket input is to be parsed for blocks, so we receive bytes
// // and decode them to blocks
// val input = Channel<UByte>(1024)
// val inputBlocks = Channel<UByteArray>()
// // output is blocks, so we sent transformed, framed blocks:
// val outputBlocks = Channel<UByteArray>()
//
// fun stop() {
// kotlin.runCatching { inputBlocks.close(RemoteInterface.ClosedException()) }
// kotlin.runCatching { outputBlocks.close() }
// socket.close()
// cancel()
// }
//
//
// // copy incoming data from the socket to input channel:
// launch {
// val data = ByteArray(1024)
// val inb = ByteBuffer.wrap(data)
// kotlin.runCatching {
// while (isActive) {
// inb.position(0)
// val size: Int = suspendCoroutine { continuation ->
// socket.read(inb, continuation, IntCompletionHandler)
// }
// if (size < 0) stop()
// else {
//// println("recvd:\n${data.sliceArray(0..<size).toDump()}\n------------------")
// for (i in 0..<size) input.send(data[i].toUByte())
// }
// }
// }
// }
//
// // copy from output to socket:
// launch {
// try {
// while (isActive) {
// // wait for the first block to send
// sendQueueEmpty.value = outputBlocks.isEmpty
// var data = encode(outputBlocks.receive())
//
// // now we're sending, so queue state is sending:
// sendQueueEmpty.value = false
//
// // if there are more, take them all (NO_DELAY optimization)
// while (!outputBlocks.isEmpty)
// data += encode(outputBlocks.receive())
//
// // now send it all together:
// val outBuff = ByteBuffer.wrap(data)
// val cnt = suspendCoroutine { continuation ->
// socket.write(outBuff, continuation, IntCompletionHandler)
// }
// // be sure it was all sent
// if (outBuff.position() != data.size || cnt != data.size) {
// throw RuntimeException("unexpected partial write")
// }
// }
// // in the case of just breaking out of the loop:
// sendQueueEmpty.value = true
// } catch (_: ClosedReceiveChannelException) {
// stop()
// }
// }
// // transport device copes with blocks:
// // decode blocks from a byte channel read from the socket:
// launch {
// try {
// while (isActive) {
// receiving.value = !input.isEmpty
// val size = readVarUnsigned(input)
// receiving.value = true
// if (size == 0u) log.warning { "zero size block is ignored!" }
// else {
// val block = UByteArray(size.toInt())
// for (i in 0..<size.toInt()) {
// block[i] = input.receive()
// }
// Contrail.unpack(block)?.let { inputBlocks.send(it) }
// ?: log.warning { "skipping bad block ${block.size} bytes" }
// }
// }
// } catch (_: CancellationException) {
// } catch (_: ClosedReceiveChannelException) {
// stop()
// }
// receiving.value = false
// }
//
// val addr = socket.remoteAddress as InetSocketAddress
// deferredDevice.complete(
// InetTransportDevice(inputBlocks, outputBlocks, JvmNetworkAddress(addr.address, addr.port), {
// yield()
// // wait until all received data are parsed, but not too long
// withTimeoutOrNull(500) {
// receiving.waitFor { !it }
// }
// // then stop it
// stop()
// })
// )
// }
// globalLaunch { socket.close() }
// }
// return deferredDevice.await()
//}

View File

@ -2,22 +2,30 @@ package net.sergeych.kiloparsec.adapter
import io.ktor.network.selector.*
import io.ktor.network.sockets.*
import io.ktor.utils.io.*
import kotlinx.coroutines.*
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.ClosedReceiveChannelException
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
import kotlinx.datetime.Clock
import net.sergeych.kiloparsec.AsyncVarint
import net.sergeych.kiloparsec.LocalInterface
import net.sergeych.mp_logger.*
import net.sergeych.mp_tools.globalLaunch
import net.sergeych.tools.AtomicCounter
import net.sergeych.tools.AtomicValue
import kotlin.time.Duration.Companion.seconds
private val logCounter = AtomicCounter(0)
class ProtocolException(text: String, cause: Throwable? = null) : RuntimeException(text, cause)
const val MAX_TCP_BLOCK_SIZE = 16776216
val PING_INACTIVITY_TIME = 30.seconds
fun acceptTcpDevice(port: Int): Flow<InetTransportDevice> {
val selectorManager = SelectorManager(Dispatchers.IO)
@ -53,14 +61,24 @@ private fun inetTransportDevice(
val outputBlocks = Channel<UByteArray>(4096)
val log = LogTag("TCPT${logCounter.incrementAndGet()}:$suffix:$networkAddress")
val stopCalled = AtomicValue(false)
val job = AtomicValue<Job?>(null)
val sockOutput = sock.openWriteChannel()
val sockInput = runCatching { sock.openReadChannel() }.getOrElse {
log.warning { "failed to open read channel $it" }
throw IllegalStateException("failed to open read channel")
}
fun stop() {
stopCalled.mutate {
if (!it) {
job.mutate {
if ( it != null ) {
log.debug { "stopping" }
runCatching { inputBlocks.close() }
runCatching { outputBlocks.close() }
// The problem: on mac platofrms closing the socket does not close its input
// and output channels!
runCatching { sockInput.cancel() }
runCatching { sockOutput.close() }
if (!sock.isClosed)
runCatching {
log.debug { "closing socket by stop" }
@ -68,27 +86,33 @@ private fun inetTransportDevice(
}
else
log.debug { "socket is already closed when stop is called" }
it.cancel()
log.debug { "implementation job cancel called" }
} else
log.debug { "already stopped" }
true
null
}
}
sock.launch {
var lastActiveAt = Clock.System.now()
job.value = globalLaunch {
launch {
log.debug { "opening read channel" }
val sockInput = runCatching { sock.openReadChannel() }.getOrElse {
log.warning { "failed to open read channel $it" }
stop()
throw IllegalStateException("failed to open read channel")
}
while (isActive && sock.isActive) {
try {
val size = AsyncVarint.decodeUnsigned(sockInput).toInt()
if (size > MAX_TCP_BLOCK_SIZE) // 16M is a max command block
throw ProtocolException("Illegal block size: $size should be < $MAX_TCP_BLOCK_SIZE")
val data = ByteArray(size)
if (size == 0) {
log.debug { "ping received" }
lastActiveAt = Clock.System.now()
} else {
sockInput.readFully(data, 0, size)
inputBlocks.send(data.toUByteArray())
}
} catch (e: ClosedReceiveChannelException) {
log.error { "closed receive channel " }
stop()
@ -103,14 +127,48 @@ private fun inetTransportDevice(
}
}
}
sock.launch {
val sockOutput = sock.openWriteChannel()
launch {
val outAccess = Mutex()
var lastSentAt = Clock.System.now()
launch {
while (isActive && sock.isActive) {
delay(500)
val activityTime = if(lastSentAt > lastActiveAt) lastSentAt else lastActiveAt
if (Clock.System.now() - activityTime > PING_INACTIVITY_TIME) {
log.debug { "pinging for inactivity" }
val repeat = outAccess.withLock {
try {
sockOutput.writeByte(0)
sockOutput.flush()
lastSentAt = Clock.System.now()
true
} catch (e: ClosedReceiveChannelException) {
e.printStackTrace()
stop()
false
} catch (_: CancellationException) {
false
} catch (e: Throwable) {
e.printStackTrace()
stop()
false
}
}
if (!repeat) break
}
}
}
while (isActive && sock.isActive) {
try {
val block = outputBlocks.receive()
outAccess.withLock {
AsyncVarint.encodeUnsigned(block.size.toULong(), sockOutput)
sockOutput.writeFully(block.toByteArray(), 0, block.size)
sockOutput.flush()
lastSentAt = Clock.System.now()
}
} catch (_: CancellationException) {
log.debug { "cancellation exception on output" }
stop()
@ -130,7 +188,7 @@ private fun inetTransportDevice(
}
}
}
}
val device = InetTransportDevice(inputBlocks, outputBlocks, networkAddress, {
stop()
})

View File

@ -4,7 +4,6 @@ import net.sergeych.crypto2.initCrypto
import net.sergeych.kiloparsec.*
import net.sergeych.kiloparsec.adapter.acceptTcpDevice
import net.sergeych.kiloparsec.adapter.connectTcpDevice
import net.sergeych.mp_logger.Log
import kotlin.random.Random
import kotlin.test.Test
import kotlin.test.assertEquals
@ -16,7 +15,7 @@ class TcpTest {
@Test
fun tcpTest() = runTest {
initCrypto()
Log.connectConsole(Log.Level.DEBUG)
// Log.connectConsole(Log.Level.DEBUG)
data class Session(
var data: String
)
@ -58,20 +57,14 @@ class TcpTest {
assertEquals("foobar", client.call(cmdLoad))
val res = kotlin.runCatching { client.call(cmdException) }
println(res.exceptionOrNull())
assertIs<TestException>(res.exceptionOrNull())
assertEquals("foobar", client.call(cmdLoad))
println("----------------------------------- pre drops")
assertThrows<RemoteInterface.ClosedException> { client.call(cmdDrop) }
println("----------------------------------- DROPPED")
// reconnect?
assertEquals("start", client.call(cmdLoad))
println("------------------------------=---- RECONNECTED")
server.close()
println("****************************************************************")
}
}