started migration to ktor sockets (e.g. tcp on all platforms but JS)

This commit is contained in:
Sergey Chernov 2024-08-02 02:47:26 +02:00
parent 326b92142d
commit 26d1f3522f
4 changed files with 175 additions and 108 deletions

View File

@ -6,7 +6,7 @@ plugins {
}
group = "net.sergeych"
version = "0.2.4"
version = "0.2.5-SNAPSHOT"
repositories {
mavenCentral()
@ -21,19 +21,6 @@ kotlin {
browser {
}
}
// val hostOs = System.getProperty("os.name")
// val isArm64 = System.getProperty("os.arch") == "aarch64"
// val isMingwX64 = hostOs.startsWith("Windows")
// @Suppress("UNUSED_VARIABLE")
// val nativeTarget = when {
// hostOs == "Mac OS X" && isArm64 -> macosArm64("native")
// hostOs == "Mac OS X" && !isArm64 -> macosX64("native")
// hostOs == "Linux" && isArm64 -> linuxArm64("native")
// hostOs == "Linux" && !isArm64 -> linuxX64("native")
// isMingwX64 -> mingwX64("native")
// else -> throw GradleException("Host OS is not supported in Kotlin/Native.")
// }
macosArm64()
iosX64()
iosArm64()
@ -55,18 +42,16 @@ kotlin {
dependencies {
implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:1.7.3")
implementation("org.jetbrains.kotlinx:kotlinx-serialization-json:1.7.1")
// api("com.ionspin.kotlin:bignum:0.3.9")
api("io.ktor:ktor-client-core:$ktor_version")
api("net.sergeych:crypto2:0.4.2")
}
}
// val ktorSocketMain by creating {
// dependsOn(commonMain)
// dependencies {
// implementation("io.ktor:ktor-network:$ktor_version")
// }
// }
val ktorSocketMain by creating {
dependsOn(commonMain)
dependencies {
implementation("io.ktor:ktor-network:$ktor_version")
}
}
val commonTest by getting {
dependencies {
implementation(kotlin("test"))
@ -82,7 +67,7 @@ kotlin {
implementation("io.ktor:ktor-server-netty:$ktor_version")
api("io.ktor:ktor-client-cio:$ktor_version")
}
// dependsOn(ktorSocketMain)
dependsOn(ktorSocketMain)
}
val jvmTest by getting
val jsMain by getting {
@ -92,8 +77,8 @@ kotlin {
}
val jsTest by getting
// for (pm in listOf(linuxMain, macosMain, iosMain, mingwMain))
// pm { dependsOn(ktorSocketMain) }
for (pm in listOf(linuxMain, macosMain, iosMain, mingwMain))
pm { dependsOn(ktorSocketMain) }
}

View File

@ -1,49 +1,39 @@
package net.sergeych.kiloparsec.adapter
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.suspendCancellableCoroutine
import kotlinx.coroutines.withContext
import java.net.InetAddress
import java.net.InetSocketAddress
import java.nio.channels.AsynchronousServerSocketChannel
import java.nio.channels.AsynchronousSocketChannel
import kotlin.coroutines.suspendCoroutine
actual fun NetworkAddress(host: String, port: Int): NetworkAddress =
JvmNetworkAddress(InetAddress.getByName(host), port)
actual fun acceptTcpDevice(port: Int): Flow<InetTransportDevice> {
return flow {
val socket = withContext(Dispatchers.IO) {
AsynchronousServerSocketChannel.open().also {
it.bind(InetSocketAddress(port))
}
}
while (true) {
println("0 --- $port")
val connectedSocket = suspendCancellableCoroutine { continuation ->
continuation.invokeOnCancellation {
socket.close()
}
socket.accept(continuation, ContinuationHandler())
}
println("1 ---")
emit(asyncSocketToDevice(connectedSocket))
}
}
}
@Suppress("unused")
suspend fun connectTcpDevice(host: String, port: Int) = connectTcpDevice(NetworkAddress(host,port))
actual suspend fun connectTcpDevice(address: NetworkAddress): InetTransportDevice {
address as JvmNetworkAddress
val socket = withContext(Dispatchers.IO) {
AsynchronousSocketChannel.open()
}
suspendCoroutine { cont ->
socket.connect(address.socketAddress, cont, VoidCompletionHandler)
}
return asyncSocketToDevice(socket)
}
//
//actual fun NetworkAddress(host: String, port: Int): NetworkAddress =
// JvmNetworkAddress(InetAddress.getByName(host), port)
//
//actual fun acceptTcpDevice(port: Int): Flow<InetTransportDevice> {
// return flow {
// val socket = withContext(Dispatchers.IO) {
// AsynchronousServerSocketChannel.open().also {
// it.bind(InetSocketAddress(port))
// }
// }
// while (true) {
// println("0 --- $port")
// val connectedSocket = suspendCancellableCoroutine { continuation ->
// continuation.invokeOnCancellation {
// socket.close()
// }
// socket.accept(continuation, ContinuationHandler())
// }
// println("1 ---")
// emit(asyncSocketToDevice(connectedSocket))
// }
// }
//}
//
//@Suppress("unused")
//suspend fun connectTcpDevice(host: String, port: Int) = connectTcpDevice(NetworkAddress(host,port))
//actual suspend fun connectTcpDevice(address: NetworkAddress): InetTransportDevice {
// address as JvmNetworkAddress
// val socket = withContext(Dispatchers.IO) {
// AsynchronousSocketChannel.open()
// }
// suspendCoroutine { cont ->
// socket.connect(address.socketAddress, cont, VoidCompletionHandler)
// }
// return asyncSocketToDevice(socket)
//}

View File

@ -3,6 +3,7 @@ package net.sergeych.kiloparsec
import assertThrows
import io.ktor.server.engine.*
import io.ktor.server.netty.*
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.test.runTest
import net.sergeych.crypto2.initCrypto
@ -52,20 +53,21 @@ class ClientTest {
throw LocalInterface.BreakConnectionException()
}
}
val server = KiloServer(cli, acceptTcpDevice(17101)) {
val server = KiloServer(cli, acceptTcpDevice(27101)) {
Session("unknown")
}
val client = KiloClient<Unit>() {
addErrors(cli)
connect { connectTcpDevice("localhost:17101") }
connect { connectTcpDevice("localhost:27101") }
}
delay(500)
println(client.call(cmdLoad))
assertEquals("start", client.call(cmdLoad))
client.call(cmdSave, "foobar")
assertEquals("foobar", client.call(cmdLoad))
//
val res = kotlin.runCatching { client.call(cmdException) }
println(res.exceptionOrNull())
assertIs<TestException>(res.exceptionOrNull())

View File

@ -3,41 +3,131 @@ package net.sergeych.kiloparsec.adapter
import io.ktor.network.selector.*
import io.ktor.network.sockets.*
import io.ktor.util.network.*
import io.ktor.utils.io.*
import io.ktor.utils.io.core.*
import kotlinx.coroutines.*
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.ClosedReceiveChannelException
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.isActive
import kotlinx.coroutines.launch
import net.sergeych.kiloparsec.AsyncVarint
import net.sergeych.mp_logger.LogTag
import kotlin.io.use
import net.sergeych.kiloparsec.LocalInterface
import net.sergeych.mp_logger.*
import net.sergeych.tools.AtomicCounter
//class SocketNetworkAddress(override val host: String, override val port: Int) : NetworkAddress
//
//actual fun NetworkAddress(host: String, port: Int): NetworkAddress = SocketNetworkAddress(host, port)
//
//fun acceptTcpSocketDevice(port: Int): Flow<InetTransportDevice> {
// val selectorManager = SelectorManager(Dispatchers.IO)
// val serverSocket = aSocket(selectorManager).tcp().bind("127.0.0.1", port)
// val log = LogTag("TCPS${}")
// return flow {
// serverSocket.accept().use { sock ->
// val closed = CompletableDeferred<Boolean>()
// val scope = coroutineScope {
// val networkAddress = sock.remoteAddress.toJavaAddress().let { NetworkAddress(it.hostname, it.port) }
// val inputBlocks = Channel<UByteArray>(4096)
// sock.launch {
// val sockInput = sock.openReadChannel()
// while (isActive && sock.isActive) {
// try {
// val size = AsyncVarint.decodeUnsigned(sockInput)
// } catch (e: Exception) {
//
// }
// }
// }
// }
// }
// }
//}
class SocketNetworkAddress(override val host: String, override val port: Int) : NetworkAddress {
override fun toString(): String {
return "$host:$port"
}
}
actual fun NetworkAddress(host: String, port: Int): NetworkAddress = SocketNetworkAddress(host, port)
private val logCounter = AtomicCounter(0)
class ProtocolException(text: String, cause: Throwable? = null) : RuntimeException(text, cause)
const val MAX_TCP_BLOCK_SIZE = 16776216
actual fun acceptTcpDevice(port: Int): Flow<InetTransportDevice> {
val selectorManager = SelectorManager(Dispatchers.IO)
val serverSocket = aSocket(selectorManager).tcp().bind("127.0.0.1", port)
val log = LogTag("TCPS${logCounter.incrementAndGet()}")
return flow {
while(true) {
log.info { "Accepting incoming connections on $port" }
serverSocket.accept().let { sock ->
log.info { "Emitting transport device" }
emit(inetTransportDevice(sock, "srv"))
}
}
}
}
actual suspend fun connectTcpDevice(address: NetworkAddress): InetTransportDevice {
val selectorManager = SelectorManager(Dispatchers.IO)
val socket = aSocket(selectorManager).tcp().connect(address.host, address.port)
println("Connected to ${address.host}:${address.port}")
return inetTransportDevice(socket)
}
private fun inetTransportDevice(
sock: Socket,
suffix: String = "cli",
): InetTransportDevice {
val networkAddress = sock.remoteAddress.toJavaAddress().let { NetworkAddress(it.hostname, it.port) }
val inputBlocks = Channel<UByteArray>(4096)
val outputBlocks = Channel<UByteArray>(4096)
val log = LogTag("TCPT${logCounter.incrementAndGet()}:$suffix:$networkAddress")
fun stop() {
log.info { "stopping" }
runCatching { inputBlocks.close()}
runCatching { outputBlocks.close()}
if( !sock.isClosed ) runCatching { sock.close()}
}
sock.launch {
log.debug { "opening read channel" }
val sockInput = runCatching { sock.openReadChannel() }.getOrElse {
log.warning { "failed to open read channel $it" }
sock.close()
throw IllegalStateException("failed to open read channel")
}
while (isActive && sock.isActive) {
try {
val size = AsyncVarint.decodeUnsigned(sockInput).toInt()
if (size > MAX_TCP_BLOCK_SIZE) // 16M is a max command block
throw ProtocolException("Illegal block size: $size should be < $MAX_TCP_BLOCK_SIZE")
log.info { "read size: $size" }
val data = ByteArray(size)
log.info { "data ready" }
sockInput.readFully(data, 0, size)
inputBlocks.send(data.toUByteArray())
} catch (e: ClosedReceiveChannelException) {
stop()
break
} catch (_: CancellationException) {
} catch (e: Exception) {
log.exception { "unexpected exception in TCP socket read" to e }
stop()
break
}
}
}
sock.launch {
val sockOutput = sock.openWriteChannel()
while (isActive && sock.isActive) {
try {
val block = outputBlocks.receive()
AsyncVarint.encodeUnsigned(block.size.toULong(), sockOutput)
sockOutput.writeFully(block.toByteArray(), 0, block.size)
log.info { "Client sock output: ${block.size}" }
sockOutput.flush()
} catch (_: CancellationException) {
log.info { "Caught cancellation, closing transport" }
} catch (_: LocalInterface.BreakConnectionException) {
log.info { "requested connection break" }
stop()
break
} catch (_: ClosedReceiveChannelException) {
log.info { "receive block channel closed, closing the socket" }
stop()
break
} catch (e: Exception) {
log.exception { "unexpected exception. closing." to e }
stop()
break
}
}
}
val device = InetTransportDevice(inputBlocks, outputBlocks, networkAddress, {
log.info { "Close has been called" }
stop()
})
log.info { "Transport ready" }
return device
}