diff --git a/build.gradle.kts b/build.gradle.kts index 157162c..b9fc605 100644 --- a/build.gradle.kts +++ b/build.gradle.kts @@ -6,7 +6,7 @@ plugins { } group = "net.sergeych" -version = "0.2.4" +version = "0.2.5-SNAPSHOT" repositories { mavenCentral() @@ -21,19 +21,6 @@ kotlin { browser { } } -// val hostOs = System.getProperty("os.name") -// val isArm64 = System.getProperty("os.arch") == "aarch64" -// val isMingwX64 = hostOs.startsWith("Windows") -// @Suppress("UNUSED_VARIABLE") -// val nativeTarget = when { -// hostOs == "Mac OS X" && isArm64 -> macosArm64("native") -// hostOs == "Mac OS X" && !isArm64 -> macosX64("native") -// hostOs == "Linux" && isArm64 -> linuxArm64("native") -// hostOs == "Linux" && !isArm64 -> linuxX64("native") -// isMingwX64 -> mingwX64("native") -// else -> throw GradleException("Host OS is not supported in Kotlin/Native.") -// } - macosArm64() iosX64() iosArm64() @@ -55,18 +42,16 @@ kotlin { dependencies { implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:1.7.3") implementation("org.jetbrains.kotlinx:kotlinx-serialization-json:1.7.1") - -// api("com.ionspin.kotlin:bignum:0.3.9") api("io.ktor:ktor-client-core:$ktor_version") api("net.sergeych:crypto2:0.4.2") } } -// val ktorSocketMain by creating { -// dependsOn(commonMain) -// dependencies { -// implementation("io.ktor:ktor-network:$ktor_version") -// } -// } + val ktorSocketMain by creating { + dependsOn(commonMain) + dependencies { + implementation("io.ktor:ktor-network:$ktor_version") + } + } val commonTest by getting { dependencies { implementation(kotlin("test")) @@ -82,7 +67,7 @@ kotlin { implementation("io.ktor:ktor-server-netty:$ktor_version") api("io.ktor:ktor-client-cio:$ktor_version") } -// dependsOn(ktorSocketMain) + dependsOn(ktorSocketMain) } val jvmTest by getting val jsMain by getting { @@ -92,8 +77,8 @@ kotlin { } val jsTest by getting -// for (pm in listOf(linuxMain, macosMain, iosMain, mingwMain)) -// pm { dependsOn(ktorSocketMain) } + for (pm in listOf(linuxMain, macosMain, iosMain, mingwMain)) + pm { dependsOn(ktorSocketMain) } } diff --git a/src/jvmMain/kotlin/net/sergeych/kiloparsec/adapter/NetworkProvider.jvm.kt b/src/jvmMain/kotlin/net/sergeych/kiloparsec/adapter/NetworkProvider.jvm.kt index ccc682c..d8b3a46 100644 --- a/src/jvmMain/kotlin/net/sergeych/kiloparsec/adapter/NetworkProvider.jvm.kt +++ b/src/jvmMain/kotlin/net/sergeych/kiloparsec/adapter/NetworkProvider.jvm.kt @@ -1,49 +1,39 @@ package net.sergeych.kiloparsec.adapter -import kotlinx.coroutines.Dispatchers -import kotlinx.coroutines.flow.Flow -import kotlinx.coroutines.flow.flow -import kotlinx.coroutines.suspendCancellableCoroutine -import kotlinx.coroutines.withContext -import java.net.InetAddress -import java.net.InetSocketAddress -import java.nio.channels.AsynchronousServerSocketChannel -import java.nio.channels.AsynchronousSocketChannel -import kotlin.coroutines.suspendCoroutine - -actual fun NetworkAddress(host: String, port: Int): NetworkAddress = - JvmNetworkAddress(InetAddress.getByName(host), port) - -actual fun acceptTcpDevice(port: Int): Flow { - return flow { - val socket = withContext(Dispatchers.IO) { - AsynchronousServerSocketChannel.open().also { - it.bind(InetSocketAddress(port)) - } - } - while (true) { - println("0 --- $port") - val connectedSocket = suspendCancellableCoroutine { continuation -> - continuation.invokeOnCancellation { - socket.close() - } - socket.accept(continuation, ContinuationHandler()) - } - println("1 ---") - emit(asyncSocketToDevice(connectedSocket)) - } - } -} - -@Suppress("unused") -suspend fun connectTcpDevice(host: String, port: Int) = connectTcpDevice(NetworkAddress(host,port)) -actual suspend fun connectTcpDevice(address: NetworkAddress): InetTransportDevice { - address as JvmNetworkAddress - val socket = withContext(Dispatchers.IO) { - AsynchronousSocketChannel.open() - } - suspendCoroutine { cont -> - socket.connect(address.socketAddress, cont, VoidCompletionHandler) - } - return asyncSocketToDevice(socket) -} +// +//actual fun NetworkAddress(host: String, port: Int): NetworkAddress = +// JvmNetworkAddress(InetAddress.getByName(host), port) +// +//actual fun acceptTcpDevice(port: Int): Flow { +// return flow { +// val socket = withContext(Dispatchers.IO) { +// AsynchronousServerSocketChannel.open().also { +// it.bind(InetSocketAddress(port)) +// } +// } +// while (true) { +// println("0 --- $port") +// val connectedSocket = suspendCancellableCoroutine { continuation -> +// continuation.invokeOnCancellation { +// socket.close() +// } +// socket.accept(continuation, ContinuationHandler()) +// } +// println("1 ---") +// emit(asyncSocketToDevice(connectedSocket)) +// } +// } +//} +// +//@Suppress("unused") +//suspend fun connectTcpDevice(host: String, port: Int) = connectTcpDevice(NetworkAddress(host,port)) +//actual suspend fun connectTcpDevice(address: NetworkAddress): InetTransportDevice { +// address as JvmNetworkAddress +// val socket = withContext(Dispatchers.IO) { +// AsynchronousSocketChannel.open() +// } +// suspendCoroutine { cont -> +// socket.connect(address.socketAddress, cont, VoidCompletionHandler) +// } +// return asyncSocketToDevice(socket) +//} diff --git a/src/jvmTest/kotlin/net/sergeych/kiloparsec/ClientTest.kt b/src/jvmTest/kotlin/net/sergeych/kiloparsec/ClientTest.kt index e66ea1a..1bbcdbd 100644 --- a/src/jvmTest/kotlin/net/sergeych/kiloparsec/ClientTest.kt +++ b/src/jvmTest/kotlin/net/sergeych/kiloparsec/ClientTest.kt @@ -3,6 +3,7 @@ package net.sergeych.kiloparsec import assertThrows import io.ktor.server.engine.* import io.ktor.server.netty.* +import kotlinx.coroutines.delay import kotlinx.coroutines.launch import kotlinx.coroutines.test.runTest import net.sergeych.crypto2.initCrypto @@ -52,20 +53,21 @@ class ClientTest { throw LocalInterface.BreakConnectionException() } } - val server = KiloServer(cli, acceptTcpDevice(17101)) { + val server = KiloServer(cli, acceptTcpDevice(27101)) { Session("unknown") } val client = KiloClient() { addErrors(cli) - connect { connectTcpDevice("localhost:17101") } + connect { connectTcpDevice("localhost:27101") } } + delay(500) println(client.call(cmdLoad)) assertEquals("start", client.call(cmdLoad)) client.call(cmdSave, "foobar") assertEquals("foobar", client.call(cmdLoad)) - +// val res = kotlin.runCatching { client.call(cmdException) } println(res.exceptionOrNull()) assertIs(res.exceptionOrNull()) diff --git a/src/ktorSocketMain/kotlin/adapter/socketClient.kt b/src/ktorSocketMain/kotlin/adapter/socketClient.kt index 48214fc..bcbb2d2 100644 --- a/src/ktorSocketMain/kotlin/adapter/socketClient.kt +++ b/src/ktorSocketMain/kotlin/adapter/socketClient.kt @@ -3,41 +3,131 @@ package net.sergeych.kiloparsec.adapter import io.ktor.network.selector.* import io.ktor.network.sockets.* import io.ktor.util.network.* -import io.ktor.utils.io.* -import io.ktor.utils.io.core.* -import kotlinx.coroutines.* +import kotlinx.coroutines.CancellationException +import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.channels.Channel +import kotlinx.coroutines.channels.ClosedReceiveChannelException import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.flow +import kotlinx.coroutines.isActive +import kotlinx.coroutines.launch import net.sergeych.kiloparsec.AsyncVarint -import net.sergeych.mp_logger.LogTag -import kotlin.io.use +import net.sergeych.kiloparsec.LocalInterface +import net.sergeych.mp_logger.* +import net.sergeych.tools.AtomicCounter -//class SocketNetworkAddress(override val host: String, override val port: Int) : NetworkAddress -// -//actual fun NetworkAddress(host: String, port: Int): NetworkAddress = SocketNetworkAddress(host, port) -// -//fun acceptTcpSocketDevice(port: Int): Flow { -// val selectorManager = SelectorManager(Dispatchers.IO) -// val serverSocket = aSocket(selectorManager).tcp().bind("127.0.0.1", port) -// val log = LogTag("TCPS${}") -// return flow { -// serverSocket.accept().use { sock -> -// val closed = CompletableDeferred() -// val scope = coroutineScope { -// val networkAddress = sock.remoteAddress.toJavaAddress().let { NetworkAddress(it.hostname, it.port) } -// val inputBlocks = Channel(4096) -// sock.launch { -// val sockInput = sock.openReadChannel() -// while (isActive && sock.isActive) { -// try { -// val size = AsyncVarint.decodeUnsigned(sockInput) -// } catch (e: Exception) { -// -// } -// } -// } -// } -// } -// } -//} +class SocketNetworkAddress(override val host: String, override val port: Int) : NetworkAddress { + override fun toString(): String { + return "$host:$port" + } +} + +actual fun NetworkAddress(host: String, port: Int): NetworkAddress = SocketNetworkAddress(host, port) + +private val logCounter = AtomicCounter(0) + +class ProtocolException(text: String, cause: Throwable? = null) : RuntimeException(text, cause) + +const val MAX_TCP_BLOCK_SIZE = 16776216 + +actual fun acceptTcpDevice(port: Int): Flow { + val selectorManager = SelectorManager(Dispatchers.IO) + val serverSocket = aSocket(selectorManager).tcp().bind("127.0.0.1", port) + val log = LogTag("TCPS${logCounter.incrementAndGet()}") + return flow { + while(true) { + log.info { "Accepting incoming connections on $port" } + serverSocket.accept().let { sock -> + log.info { "Emitting transport device" } + emit(inetTransportDevice(sock, "srv")) + } + } + } +} + +actual suspend fun connectTcpDevice(address: NetworkAddress): InetTransportDevice { + val selectorManager = SelectorManager(Dispatchers.IO) + val socket = aSocket(selectorManager).tcp().connect(address.host, address.port) + println("Connected to ${address.host}:${address.port}") + return inetTransportDevice(socket) +} + + +private fun inetTransportDevice( + sock: Socket, + suffix: String = "cli", +): InetTransportDevice { + val networkAddress = sock.remoteAddress.toJavaAddress().let { NetworkAddress(it.hostname, it.port) } + val inputBlocks = Channel(4096) + val outputBlocks = Channel(4096) + + val log = LogTag("TCPT${logCounter.incrementAndGet()}:$suffix:$networkAddress") + + fun stop() { + log.info { "stopping" } + runCatching { inputBlocks.close()} + runCatching { outputBlocks.close()} + if( !sock.isClosed ) runCatching { sock.close()} + } + + sock.launch { + log.debug { "opening read channel" } + val sockInput = runCatching { sock.openReadChannel() }.getOrElse { + log.warning { "failed to open read channel $it" } + sock.close() + throw IllegalStateException("failed to open read channel") + } + while (isActive && sock.isActive) { + try { + val size = AsyncVarint.decodeUnsigned(sockInput).toInt() + if (size > MAX_TCP_BLOCK_SIZE) // 16M is a max command block + throw ProtocolException("Illegal block size: $size should be < $MAX_TCP_BLOCK_SIZE") + log.info { "read size: $size" } + val data = ByteArray(size) + log.info { "data ready" } + sockInput.readFully(data, 0, size) + inputBlocks.send(data.toUByteArray()) + } catch (e: ClosedReceiveChannelException) { + stop() + break + } catch (_: CancellationException) { + } catch (e: Exception) { + log.exception { "unexpected exception in TCP socket read" to e } + stop() + break + } + } + } + sock.launch { + val sockOutput = sock.openWriteChannel() + while (isActive && sock.isActive) { + try { + val block = outputBlocks.receive() + AsyncVarint.encodeUnsigned(block.size.toULong(), sockOutput) + sockOutput.writeFully(block.toByteArray(), 0, block.size) + log.info { "Client sock output: ${block.size}" } + sockOutput.flush() + } catch (_: CancellationException) { + log.info { "Caught cancellation, closing transport" } + } catch (_: LocalInterface.BreakConnectionException) { + log.info { "requested connection break" } + stop() + break + } catch (_: ClosedReceiveChannelException) { + log.info { "receive block channel closed, closing the socket" } + stop() + break + } catch (e: Exception) { + log.exception { "unexpected exception. closing." to e } + stop() + break + } + } + } + val device = InetTransportDevice(inputBlocks, outputBlocks, networkAddress, { + log.info { "Close has been called" } + stop() + }) + log.info { "Transport ready" } + return device +}