diff --git a/.idea/misc.xml b/.idea/misc.xml index 76d6398..efbe595 100644 --- a/.idea/misc.xml +++ b/.idea/misc.xml @@ -1,4 +1,3 @@ - diff --git a/README.md b/README.md index e6f1486..6236000 100644 --- a/README.md +++ b/README.md @@ -1,31 +1,36 @@ # Kiloparsec -The new generation of __PARanoid SECurity__ protocol, advanced, faster, more secure. It also allows connecting any "block device" transport to the same local interface. Out if the box it +The new generation of __PARanoid SECurity__ protocol, advanced, faster, more secure. It also allows connecting any " +block device" transport to the same local interface. Out if the box it provides the following transports: -| name | JVM | JS | native | -|----------------|-----|----|--------| -| TCP/IP server | * | | | -| TCP/IP client | * | | | -| Websock server | * | | | -| Websock client | * | * | * | - -At the moment we're working on supporting TCP/IP on most native targets. This feature is planned to rach public beta in August and production in early september 2024. +| name | JVM | JS | native | +|----------------|-----|----|-------------------| +| TCP/IP server | ✓ | | β @0.2.5-SNAPSHOT | +| TCP/IP client | ✓ | | β @0.2.5-SNAPSHOT | +| Websock server | ✓ | | | +| Websock client | ✓ | ✓ | ✓ | +At the moment we're working on supporting TCP/IP on most native targets. This feature is planned to rach public beta in +August and production in early september 2024. ## TCP/IP transport It is the fastest. JVM implementation uses nio2 async sockets and optimizes TCP socket to play well with blocks (smart NO_DELAY mode). It is multiplatform, nut lacks of async TCP/IP support -on natvic targetm this is where I need help having little time. I'd prefer to use something asyn like UV on native targets. +on natvic targetm this is where I need help having little time. I'd prefer to use something asyn like UV on native +targets. I know no existing way to implement it in KotlinJS for the modern browsers. ## Websock server -While it is much slower than pure TCP, it is still faster than any http-based transport. It uses binary frames based on the Ktor server framework to easily integrate with web services. We recommend using it instead of a classic HTTP API as it beats it in terms of speed and server load even with HTTP/2. +While it is much slower than pure TCP, it is still faster than any http-based transport. It uses binary frames based on +the Ktor server framework to easily integrate with web services. We recommend using it instead of a classic HTTP API as +it beats it in terms of speed and server load even with HTTP/2. -We recommend to create the `KiloInterface` instance and connect it to the websock and tcp servers in real applications to get easy access from anywhere. +We recommend to create the `KiloInterface` instance and connect it to the websock and tcp servers in real +applications to get easy access from anywhere. # Usage @@ -64,16 +69,16 @@ and functions available, like: // Api.kt @Serializable -class FooArgs(val text: String,val number: Int = 42) +class FooArgs(val text: String, val number: Int = 42) // Server-side interface -val cmdSetFoo by command() -val cmdGetFoo by command() -val cmdPing by command() -val cmdCheckConnected by command() +val cmdSetFoo by command() +val cmdGetFoo by command() +val cmdPing by command() +val cmdCheckConnected by command() // client-side interface (called from the server) -val cmdPushClient by command() +val cmdPushClient by command() ``` ## Call it from the client: @@ -93,7 +98,7 @@ val client = websocketClient("wss://your.host.com/kp") { // If we want to collect connected state changes (this is optional) launch { client.connectedStateFlow.collect { - if( it ) + if (it) println("I am connected") else println("trying to connect...") @@ -113,13 +118,13 @@ the protocol. With KILOPARSEC it is rather basic operation:\ ~~~kotlin // Our session just keeps Foo for cmd{Get|Set}Foo: -data class Session(var fooState: FooArgs?=null) +data class Session(var fooState: FooArgs? = null) // Let's now provide interface we export, it will be used on each connection automatically: // Note server interface uses Session: val serverInterface = KiloInterface().apply { - onConnected { + onConnected { // Do some initialization session.fooState = null } @@ -136,6 +141,7 @@ val ns: NettyApplicationEngine = embeddedServer(Netty, port = 8080, host = "0.0. ~~~ + # Details It is not compatible with parsec family and no more based on an Universa crypto library. To better fit @@ -144,12 +150,14 @@ and every connection (while parsec caches session keys to avoid time-consuming k keys cryptography for session is shifted to use ed25519 curves which are supposed to provide agreeable strength with enough speed to protect every connection with a unique new keys. Also, we completely get rid of SHA2. -Kiloparsec also uses a denser binary format [bipack](https://gitea.sergeych.net/SergeychWorks/mp_bintools), no more key-values, -which reveals much less on the inner data structure, providing advanced -typed RPC interfaces with kotlinx.serialization. There is also Rust implementation [bipack_ru](https://gitea.sergeych.net/DiWAN/bipack_ru). +Kiloparsec also uses a denser binary format [bipack](https://gitea.sergeych.net/SergeychWorks/mp_bintools), no more +key-values, +which reveals much less on the inner data structure, providing advanced +typed RPC interfaces with kotlinx.serialization. There is also Rust +implementation [bipack_ru](https://gitea.sergeych.net/DiWAN/bipack_ru). The architecture allows connecting same functional interfaces to several various type channels at once. -Also, the difference from parsecs is that there are no more unencrypted layer commands available to users. +Also, the difference from parsecs is that there are no more unencrypted layer commands available to users. All RPC is performed over the encrypted connection. # Technical description @@ -163,7 +171,9 @@ Integrated tools to prevent MITM attacks include also non-transferred independen independently on the ends and is never transferred with the network. Comparing it somehow (visually, with QR code, etc) could add a very robust guarantee of the connection safety and ingenuity. -Kiloparsec has built-in completely asynchronous (coroutine based top-down) transport layer based on TCP (JVM only as for now) and the same async Websocket-based transport based on KTOR. Websocket client is multiplatform, though the server is JVM only insofar. +Kiloparsec has built-in completely asynchronous (coroutine based top-down) transport layer based on TCP (JVM only as for +now) and the same async Websocket-based transport based on KTOR. Websocket client is multiplatform, though the server is +JVM only insofar. # Licensing diff --git a/build.gradle.kts b/build.gradle.kts index b9fc605..e74ac88 100644 --- a/build.gradle.kts +++ b/build.gradle.kts @@ -1,4 +1,3 @@ - plugins { kotlin("multiplatform") version "2.0.0" id("org.jetbrains.kotlin.plugin.serialization") version "2.0.0" @@ -59,6 +58,9 @@ kotlin { implementation("org.jetbrains.kotlinx:kotlinx-coroutines-test:1.8.1") } } + val ktorSocketTest by creating { + dependsOn(commonTest) + } val jvmMain by getting { dependencies { implementation("io.ktor:ktor-server-core:$ktor_version") @@ -69,16 +71,52 @@ kotlin { } dependsOn(ktorSocketMain) } - val jvmTest by getting + val jvmTest by getting { + dependsOn(ktorSocketTest) + } + val jsMain by getting { dependencies { implementation("io.ktor:ktor-client-js:$ktor_version") } } val jsTest by getting + val macosArm64Main by getting { + dependsOn(ktorSocketMain) + } + val macosArm64Test by getting { + dependsOn(ktorSocketTest) + } + val macosX64Main by getting { + dependsOn(ktorSocketMain) + } + val iosX64Main by getting { + dependsOn(ktorSocketMain) + } + val iosX64Test by getting { + dependsOn(ktorSocketTest) + } + val iosArm64Main by getting { + dependsOn(ktorSocketMain) + } + val iosArm64Test by getting { + dependsOn(ktorSocketTest) + } + val linuxArm64Main by getting { + dependsOn(ktorSocketMain) + } + val linuxArm64Test by getting { + dependsOn(ktorSocketTest) + } + val linuxX64Main by getting { + dependsOn(ktorSocketMain) + } + val linuxX64Test by getting { + dependsOn(ktorSocketTest) + } - for (pm in listOf(linuxMain, macosMain, iosMain, mingwMain)) - pm { dependsOn(ktorSocketMain) } +// for (pm: NamedDomainObjectProvider in listOf(macosMain,linuxMain, iosMain, mingwMain)) +// pm.get().dependsOn(ktorSocketMain) } diff --git a/gradle.properties b/gradle.properties index 7fc6f1f..c484ef9 100644 --- a/gradle.properties +++ b/gradle.properties @@ -1 +1,2 @@ kotlin.code.style=official +kotlin.mpp.applyDefaultHierarchyTemplate=false \ No newline at end of file diff --git a/src/commonMain/kotlin/net/sergeych/kiloparsec/KiloServer.kt b/src/commonMain/kotlin/net/sergeych/kiloparsec/KiloServer.kt index f889bb4..8fbc383 100644 --- a/src/commonMain/kotlin/net/sergeych/kiloparsec/KiloServer.kt +++ b/src/commonMain/kotlin/net/sergeych/kiloparsec/KiloServer.kt @@ -43,6 +43,8 @@ class KiloServer( } fun close() { + println("PRREEEC") job.cancel() + println("POOOSTC") } } \ No newline at end of file diff --git a/src/commonMain/kotlin/net/sergeych/kiloparsec/adapter/NetworkProvider.kt b/src/commonMain/kotlin/net/sergeych/kiloparsec/adapter/NetworkProvider.kt index 8c96031..4e2e790 100644 --- a/src/commonMain/kotlin/net/sergeych/kiloparsec/adapter/NetworkProvider.kt +++ b/src/commonMain/kotlin/net/sergeych/kiloparsec/adapter/NetworkProvider.kt @@ -1,56 +1,56 @@ package net.sergeych.kiloparsec.adapter -import kotlinx.coroutines.channels.ReceiveChannel -import kotlinx.coroutines.flow.Flow - /** - * Multiplatform implementation of an internet address. - * Notice to implementors. It must provide correct and effective [equals] and [hashCode]. + * Multiplatform internet address. */ -interface NetworkAddress { - val host: String +data class NetworkAddress( + val host: String, val port: Int -} - -/** - * Multiplatform datagram abstraction - */ -interface Datagram { - /** - * Received message - */ - val message: UByteArray - - /** - * Address from where the message was sent - */ - val address: NetworkAddress -} - -@OptIn(ExperimentalStdlibApi::class) -interface DatagramConnector: AutoCloseable { - - val incoming: ReceiveChannel - suspend fun send(message: UByteArray, networkAddress: NetworkAddress) - @Suppress("unused") - suspend fun send(message: UByteArray, datagramAddress: String) { - send(message, datagramAddress.toNetworkAddress()) +) { + override fun toString(): String { + return "$host:$port" } - - suspend fun send(message: UByteArray,host: String,port: Int) = - send(message, NetworkAddress(host,port)) - override fun close() } - -expect fun NetworkAddress(host: String,port: Int): NetworkAddress - -fun String.toNetworkAddress() : NetworkAddress { - val (host, port) = this.split(":").map { it.trim()} - return NetworkAddress(host, port.toInt()) -} - -expect fun acceptTcpDevice(port: Int): Flow - -expect suspend fun connectTcpDevice(address: NetworkAddress): InetTransportDevice - -suspend fun connectTcpDevice(address: String) = connectTcpDevice(address.toNetworkAddress()) +// +///** +// * Multiplatform datagram abstraction +// */ +//interface Datagram { +// /** +// * Received message +// */ +// val message: UByteArray +// +// /** +// * Address from where the message was sent +// */ +// val address: NetworkAddress +//} +// +//@OptIn(ExperimentalStdlibApi::class) +//interface DatagramConnector: AutoCloseable { +// +// val incoming: ReceiveChannel +// suspend fun send(message: UByteArray, networkAddress: NetworkAddress) +// @Suppress("unused") +// suspend fun send(message: UByteArray, datagramAddress: String) { +// send(message, datagramAddress.toNetworkAddress()) +// } +// +// suspend fun send(message: UByteArray,host: String,port: Int) = +// send(message, NetworkAddress(host,port)) +// override fun close() +//} +// +//expect fun NetworkAddress(host: String,port: Int): NetworkAddress +// +//fun String.toNetworkAddress() : NetworkAddress { +// val (host, port) = this.split(":").map { it.trim()} +// return NetworkAddress(host, port.toInt()) +//} +// +//expect fun acceptTcpDevice(port: Int): Flow +// +//expect suspend fun connectTcpDevice(address: NetworkAddress): InetTransportDevice +// +//suspend fun connectTcpDevice(address: String) = connectTcpDevice(address.toNetworkAddress()) diff --git a/src/jsMain/kotlin/net/sergeych/kiloparsec/adapter/DatagramProvider.js.kt b/src/jsMain/kotlin/net/sergeych/kiloparsec/adapter/DatagramProvider.js.kt deleted file mode 100644 index 64d0da0..0000000 --- a/src/jsMain/kotlin/net/sergeych/kiloparsec/adapter/DatagramProvider.js.kt +++ /dev/null @@ -1,2 +0,0 @@ -package net.sergeych.kiloparsec.adapter - diff --git a/src/jsMain/kotlin/net/sergeych/kiloparsec/adapter/NetworkProvider.js.kt b/src/jsMain/kotlin/net/sergeych/kiloparsec/adapter/NetworkProvider.js.kt deleted file mode 100644 index bc5044d..0000000 --- a/src/jsMain/kotlin/net/sergeych/kiloparsec/adapter/NetworkProvider.js.kt +++ /dev/null @@ -1,15 +0,0 @@ -package net.sergeych.kiloparsec.adapter - -import kotlinx.coroutines.flow.Flow - -actual fun NetworkAddress(host: String, port: Int): NetworkAddress { - TODO("Not yet implemented") -} - -actual fun acceptTcpDevice(port: Int): Flow { - TODO("Not yet implemented") -} - -actual suspend fun connectTcpDevice(address: NetworkAddress): InetTransportDevice { - TODO("Not yet implemented") -} \ No newline at end of file diff --git a/src/jvmMain/kotlin/net/sergeych/kiloparsec/adapter/UdpServer.kt b/src/jvmMain/kotlin/net/sergeych/kiloparsec/adapter/UdpServer.kt index 60f5b07..c375668 100644 --- a/src/jvmMain/kotlin/net/sergeych/kiloparsec/adapter/UdpServer.kt +++ b/src/jvmMain/kotlin/net/sergeych/kiloparsec/adapter/UdpServer.kt @@ -1,118 +1,118 @@ -package net.sergeych.kiloparsec.adapter - -import kotlinx.coroutines.* -import kotlinx.coroutines.channels.BufferOverflow -import kotlinx.coroutines.channels.Channel -import net.sergeych.mp_logger.LogTag -import net.sergeych.mp_logger.exception -import net.sergeych.mp_logger.info -import net.sergeych.mp_logger.warning -import java.net.* -import java.util.concurrent.atomic.AtomicInteger - -private val counter = AtomicInteger(0) - -class JvmNetworkAddress(val inetAddress: InetAddress, override val port: Int) : NetworkAddress { - override val host: String by lazy { inetAddress.canonicalHostName } - override fun equals(other: Any?): Boolean { - if (this === other) return true - if (other !is JvmNetworkAddress) return false - - if (inetAddress != other.inetAddress) return false - if (port != other.port) return false - - return true - } - - val socketAddress: SocketAddress by lazy { InetSocketAddress(inetAddress,port) } - - override fun hashCode(): Int { - var result = inetAddress.hashCode() - result = 31 * result + port - return result - } - - override fun toString(): String = "$host:$port" -} - -class UdpDatagram(override val message: UByteArray, val inetAddress: InetAddress, val port: Int) : Datagram { - - override val address: NetworkAddress by lazy { - JvmNetworkAddress(inetAddress, port) - } - -} - - -@OptIn(DelicateCoroutinesApi::class) -class UdpServer(val port: Int) : - DatagramConnector, LogTag("UDPS:${counter.incrementAndGet()}") { - private var isClosed = false - - - private val deferredSocket = CompletableDeferred() - private var job: Job? = null - - private suspend fun start() = try { - coroutineScope { - val socket = DatagramSocket(port) - val buffer = ByteArray(16384) - val packet = DatagramPacket(buffer, buffer.size) - deferredSocket.complete(socket) - while (isActive && !isClosed) { - try { - socket.receive(packet) - val data = packet.data.sliceArray(0..(2048, BufferOverflow.DROP_OLDEST) - override val incoming = channel - - override suspend fun send(message: UByteArray, networkAddress: NetworkAddress) { - networkAddress as JvmNetworkAddress - withContext(Dispatchers.IO) { - val packet = DatagramPacket( - message.toByteArray(), message.size, - networkAddress.inetAddress, networkAddress.port - ) - deferredSocket.await().send(packet) - } - } -} \ No newline at end of file +//package net.sergeych.kiloparsec.adapter +// +//import kotlinx.coroutines.* +//import kotlinx.coroutines.channels.BufferOverflow +//import kotlinx.coroutines.channels.Channel +//import net.sergeych.mp_logger.LogTag +//import net.sergeych.mp_logger.exception +//import net.sergeych.mp_logger.info +//import net.sergeych.mp_logger.warning +//import java.net.* +//import java.util.concurrent.atomic.AtomicInteger +// +//private val counter = AtomicInteger(0) +// +//class JvmNetworkAddress(val inetAddress: InetAddress, override val port: Int) : NetworkAddress { +// override val host: String by lazy { inetAddress.canonicalHostName } +// override fun equals(other: Any?): Boolean { +// if (this === other) return true +// if (other !is JvmNetworkAddress) return false +// +// if (inetAddress != other.inetAddress) return false +// if (port != other.port) return false +// +// return true +// } +// +// val socketAddress: SocketAddress by lazy { InetSocketAddress(inetAddress,port) } +// +// override fun hashCode(): Int { +// var result = inetAddress.hashCode() +// result = 31 * result + port +// return result +// } +// +// override fun toString(): String = "$host:$port" +//} +// +//class UdpDatagram(override val message: UByteArray, val inetAddress: InetAddress, val port: Int) : Datagram { +// +// override val address: NetworkAddress by lazy { +// JvmNetworkAddress(inetAddress, port) +// } +// +//} +// +// +//@OptIn(DelicateCoroutinesApi::class) +//class UdpServer(val port: Int) : +// DatagramConnector, LogTag("UDPS:${counter.incrementAndGet()}") { +// private var isClosed = false +// +// +// private val deferredSocket = CompletableDeferred() +// private var job: Job? = null +// +// private suspend fun start() = try { +// coroutineScope { +// val socket = DatagramSocket(port) +// val buffer = ByteArray(16384) +// val packet = DatagramPacket(buffer, buffer.size) +// deferredSocket.complete(socket) +// while (isActive && !isClosed) { +// try { +// socket.receive(packet) +// val data = packet.data.sliceArray(0..(2048, BufferOverflow.DROP_OLDEST) +// override val incoming = channel +// +// override suspend fun send(message: UByteArray, networkAddress: NetworkAddress) { +// networkAddress as JvmNetworkAddress +// withContext(Dispatchers.IO) { +// val packet = DatagramPacket( +// message.toByteArray(), message.size, +// networkAddress.inetAddress, networkAddress.port +// ) +// deferredSocket.await().send(packet) +// } +// } +//} \ No newline at end of file diff --git a/src/jvmMain/kotlin/net/sergeych/kiloparsec/adapter/asyncSocketToDevice.kt b/src/jvmMain/kotlin/net/sergeych/kiloparsec/adapter/asyncSocketToDevice.kt index cfc2101..f610225 100644 --- a/src/jvmMain/kotlin/net/sergeych/kiloparsec/adapter/asyncSocketToDevice.kt +++ b/src/jvmMain/kotlin/net/sergeych/kiloparsec/adapter/asyncSocketToDevice.kt @@ -1,155 +1,155 @@ -package net.sergeych.kiloparsec.adapter - -import kotlinx.coroutines.* -import kotlinx.coroutines.channels.Channel -import kotlinx.coroutines.channels.ClosedReceiveChannelException -import kotlinx.coroutines.flow.MutableStateFlow -import net.sergeych.crypto2.Contrail -import net.sergeych.crypto2.encodeVarUnsigned -import net.sergeych.crypto2.readVarUnsigned -import net.sergeych.kiloparsec.RemoteInterface -import net.sergeych.kiloparsec.Transport -import net.sergeych.mp_logger.LogTag -import net.sergeych.mp_logger.warning -import net.sergeych.mp_tools.globalLaunch -import net.sergeych.tools.waitFor -import java.net.InetSocketAddress -import java.net.StandardSocketOptions.TCP_NODELAY -import java.nio.ByteBuffer -import java.nio.channels.AsynchronousSocketChannel -import kotlin.coroutines.cancellation.CancellationException -import kotlin.coroutines.suspendCoroutine - -private val log = LogTag("ASTD") - -/** - * Prepend block with its size, varint-encoded - */ -private fun encode(block: UByteArray): ByteArray { - val c = Contrail.create(block) - return (encodeVarUnsigned(c.size.toUInt()) + c).toByteArray() -} - -/** - * Convert asynchronous socket to a [Transport.Device] using non-blocking nio, - * in a coroutine-effective manner. Note that it runs coroutines to read/write - * to the socket in a global scope.These are closed when transport is closed - * or the socket is closed, for example, by network failure. - */ -suspend fun asyncSocketToDevice(socket: AsynchronousSocketChannel): InetTransportDevice { - val deferredDevice = CompletableDeferred() - globalLaunch { - coroutineScope { - val sendQueueEmpty = MutableStateFlow(true) - val receiving = MutableStateFlow(false) - // We're in block mode, every block we send worth immediate sending, we do not - // send partial blocks, so: - socket.setOption(TCP_NODELAY, true) - - // socket input is to be parsed for blocks, so we receive bytes - // and decode them to blocks - val input = Channel(1024) - val inputBlocks = Channel() - // output is blocks, so we sent transformed, framed blocks: - val outputBlocks = Channel() - - fun stop() { - kotlin.runCatching { inputBlocks.close(RemoteInterface.ClosedException()) } - kotlin.runCatching { outputBlocks.close() } - socket.close() - cancel() - } - - - // copy incoming data from the socket to input channel: - launch { - val data = ByteArray(1024) - val inb = ByteBuffer.wrap(data) - kotlin.runCatching { - while (isActive) { - inb.position(0) - val size: Int = suspendCoroutine { continuation -> - socket.read(inb, continuation, IntCompletionHandler) - } - if (size < 0) stop() - else { -// println("recvd:\n${data.sliceArray(0.. - socket.write(outBuff, continuation, IntCompletionHandler) - } - // be sure it was all sent - if (outBuff.position() != data.size || cnt != data.size) { - throw RuntimeException("unexpected partial write") - } - } - // in the case of just breaking out of the loop: - sendQueueEmpty.value = true - } catch (_: ClosedReceiveChannelException) { - stop() - } - } - // transport device copes with blocks: - // decode blocks from a byte channel read from the socket: - launch { - try { - while (isActive) { - receiving.value = !input.isEmpty - val size = readVarUnsigned(input) - receiving.value = true - if (size == 0u) log.warning { "zero size block is ignored!" } - else { - val block = UByteArray(size.toInt()) - for (i in 0..() +// globalLaunch { +// coroutineScope { +// val sendQueueEmpty = MutableStateFlow(true) +// val receiving = MutableStateFlow(false) +// // We're in block mode, every block we send worth immediate sending, we do not +// // send partial blocks, so: +// socket.setOption(TCP_NODELAY, true) +// +// // socket input is to be parsed for blocks, so we receive bytes +// // and decode them to blocks +// val input = Channel(1024) +// val inputBlocks = Channel() +// // output is blocks, so we sent transformed, framed blocks: +// val outputBlocks = Channel() +// +// fun stop() { +// kotlin.runCatching { inputBlocks.close(RemoteInterface.ClosedException()) } +// kotlin.runCatching { outputBlocks.close() } +// socket.close() +// cancel() +// } +// +// +// // copy incoming data from the socket to input channel: +// launch { +// val data = ByteArray(1024) +// val inb = ByteBuffer.wrap(data) +// kotlin.runCatching { +// while (isActive) { +// inb.position(0) +// val size: Int = suspendCoroutine { continuation -> +// socket.read(inb, continuation, IntCompletionHandler) +// } +// if (size < 0) stop() +// else { +//// println("recvd:\n${data.sliceArray(0.. +// socket.write(outBuff, continuation, IntCompletionHandler) +// } +// // be sure it was all sent +// if (outBuff.position() != data.size || cnt != data.size) { +// throw RuntimeException("unexpected partial write") +// } +// } +// // in the case of just breaking out of the loop: +// sendQueueEmpty.value = true +// } catch (_: ClosedReceiveChannelException) { +// stop() +// } +// } +// // transport device copes with blocks: +// // decode blocks from a byte channel read from the socket: +// launch { +// try { +// while (isActive) { +// receiving.value = !input.isEmpty +// val size = readVarUnsigned(input) +// receiving.value = true +// if (size == 0u) log.warning { "zero size block is ignored!" } +// else { +// val block = UByteArray(size.toInt()) +// for (i in 0..() - val cmdLoad by command() - val cmdDrop by command() - val cmdException by command() - - val cli = KiloInterface().apply { - registerError { TestException() } - onConnected { session.data = "start" } - on(cmdSave) { session.data = it } - on(cmdLoad) { - session.data - } - on(cmdException) { - throw TestException() - } - on(cmdDrop) { - throw LocalInterface.BreakConnectionException() - } - } - val server = KiloServer(cli, acceptTcpDevice(27101)) { - Session("unknown") - } - - val client = KiloClient() { - addErrors(cli) - connect { connectTcpDevice("localhost:27101") } - } - delay(500) - println(client.call(cmdLoad)) - - assertEquals("start", client.call(cmdLoad)) - client.call(cmdSave, "foobar") - assertEquals("foobar", client.call(cmdLoad)) -// - val res = kotlin.runCatching { client.call(cmdException) } - println(res.exceptionOrNull()) - assertIs(res.exceptionOrNull()) - assertEquals("foobar", client.call(cmdLoad)) - - assertThrows { client.call(cmdDrop) } - - // reconnect? - assertEquals("start", client.call(cmdLoad)) - - server.close() - } @Test fun webSocketTest() = runTest { diff --git a/src/jvmTest/kotlin/net/sergeych/kiloparsec/adapters/NetworkTest.kt b/src/jvmTest/kotlin/net/sergeych/kiloparsec/adapters/NetworkTest.kt deleted file mode 100644 index 43dd531..0000000 --- a/src/jvmTest/kotlin/net/sergeych/kiloparsec/adapters/NetworkTest.kt +++ /dev/null @@ -1,100 +0,0 @@ -package net.sergeych.kiloparsec.adapters - -import kotlinx.coroutines.* -import kotlinx.coroutines.test.runTest -import net.sergeych.crypto2.initCrypto -import net.sergeych.kiloparsec.adapter.UdpServer -import net.sergeych.kiloparsec.adapter.acceptTcpDevice -import net.sergeych.kiloparsec.adapter.connectTcpDevice -import net.sergeych.kiloparsec.adapter.toNetworkAddress -import net.sergeych.kiloparsec.decodeFromUByteArray -import net.sergeych.kiloparsec.encodeToUByteArray -import net.sergeych.mp_logger.Log -import net.sergeych.mp_logger.LogTag -import net.sergeych.synctools.ProtectedOp -import net.sergeych.synctools.invoke -import kotlin.test.Test -import kotlin.test.assertContains -import kotlin.test.assertEquals - -class NetworkTest { - - @Test - fun udpProviderTest() = runTest { - Log.connectConsole(Log.Level.DEBUG) - val s1 = UdpServer(17120) - val s2 = UdpServer(17121) - s1.send("Hello".encodeToUByteArray(), "localhost", 17121) - val d1 = s2.incoming.receive() - assertEquals(d1.address.port, 17120) - assertEquals("Hello", d1.message.toByteArray().decodeToString()) - s1.send("world".encodeToUByteArray(), d1.address) - assertEquals("world", s1.incoming.receive().message.toByteArray().decodeToString()) - // println("s1: ${s1.bindAddress()}") - - } - - @Test - fun tcpAsyncConnectionTest() = runTest { - initCrypto() - Log.connectConsole(Log.Level.DEBUG) - - coroutineScope { - val serverFlow = acceptTcpDevice(17171) - val op = ProtectedOp() - var pills = setOf() - val j = launch { - println("serf") - serverFlow.collect { device -> - println("serf 0") - launch { - println("serf 1") - device.output.send("Hello, world!".encodeToUByteArray()) - device.output.send("Great".encodeToUByteArray()) - while (true) { - val x = device.input.receive().decodeFromUByteArray() - if (x.startsWith("die")) { - op.invoke { - pills += x - } - cancel() - } - else - println("ignoring unexpected input: $x") - } - } - } - } - yield() - run { - try { - println("pre-con") - val s = connectTcpDevice("127.0.1.1:17171".toNetworkAddress()) - println("2") - assertEquals("Hello, world!", s.input.receive().decodeFromUByteArray()) - assertEquals("Great", s.input.receive().decodeFromUByteArray()) - s.output.send("Goodbye".encodeToUByteArray()) - s.output.send("die1".encodeToUByteArray()) - s.close() - } - catch(t: Throwable) { - t.printStackTrace() - throw t - } - } - println("pre-con2") - val s1 = connectTcpDevice("127.0.1.1:17171".toNetworkAddress()) - assertEquals("Hello, world!", s1.input.receive().decodeFromUByteArray()) - assertEquals("Great", s1.input.receive().decodeFromUByteArray()) - s1.output.send("die2".encodeToUByteArray()) - s1.close() - - // check that channels were flushed prior to closed: - assertContains(pills, "die1") - assertContains(pills, "die2") - - // Check that server jobs are closed - j.cancelAndJoin() - } - } -} \ No newline at end of file diff --git a/src/ktorSocketMain/kotlin/adapter/socketClient.kt b/src/ktorSocketMain/kotlin/adapter/socketClient.kt index bcbb2d2..9c3b461 100644 --- a/src/ktorSocketMain/kotlin/adapter/socketClient.kt +++ b/src/ktorSocketMain/kotlin/adapter/socketClient.kt @@ -2,27 +2,16 @@ package net.sergeych.kiloparsec.adapter import io.ktor.network.selector.* import io.ktor.network.sockets.* -import io.ktor.util.network.* -import kotlinx.coroutines.CancellationException -import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.* import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.channels.ClosedReceiveChannelException import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.flow -import kotlinx.coroutines.isActive -import kotlinx.coroutines.launch import net.sergeych.kiloparsec.AsyncVarint import net.sergeych.kiloparsec.LocalInterface import net.sergeych.mp_logger.* import net.sergeych.tools.AtomicCounter - -class SocketNetworkAddress(override val host: String, override val port: Int) : NetworkAddress { - override fun toString(): String { - return "$host:$port" - } -} - -actual fun NetworkAddress(host: String, port: Int): NetworkAddress = SocketNetworkAddress(host, port) +import net.sergeych.tools.AtomicValue private val logCounter = AtomicCounter(0) @@ -30,51 +19,66 @@ class ProtocolException(text: String, cause: Throwable? = null) : RuntimeExcepti const val MAX_TCP_BLOCK_SIZE = 16776216 -actual fun acceptTcpDevice(port: Int): Flow { +fun acceptTcpDevice(port: Int): Flow { val selectorManager = SelectorManager(Dispatchers.IO) val serverSocket = aSocket(selectorManager).tcp().bind("127.0.0.1", port) - val log = LogTag("TCPS${logCounter.incrementAndGet()}") return flow { - while(true) { - log.info { "Accepting incoming connections on $port" } + while (true) { serverSocket.accept().let { sock -> - log.info { "Emitting transport device" } emit(inetTransportDevice(sock, "srv")) } } } } -actual suspend fun connectTcpDevice(address: NetworkAddress): InetTransportDevice { +suspend fun connectTcpDevice(address: String) = connectTcpDevice(address.toNetworkAddress()) + +suspend fun connectTcpDevice(address: NetworkAddress): InetTransportDevice { val selectorManager = SelectorManager(Dispatchers.IO) val socket = aSocket(selectorManager).tcp().connect(address.host, address.port) - println("Connected to ${address.host}:${address.port}") return inetTransportDevice(socket) } +fun String.toNetworkAddress(): NetworkAddress { + val (host, port) = this.split(":").map { it.trim() } + return NetworkAddress(host, port.toInt()) +} private fun inetTransportDevice( sock: Socket, suffix: String = "cli", ): InetTransportDevice { - val networkAddress = sock.remoteAddress.toJavaAddress().let { NetworkAddress(it.hostname, it.port) } + val networkAddress = (sock.remoteAddress as InetSocketAddress).let { NetworkAddress(it.hostname, it.port) } val inputBlocks = Channel(4096) val outputBlocks = Channel(4096) val log = LogTag("TCPT${logCounter.incrementAndGet()}:$suffix:$networkAddress") + val stopCalled = AtomicValue(false) fun stop() { - log.info { "stopping" } - runCatching { inputBlocks.close()} - runCatching { outputBlocks.close()} - if( !sock.isClosed ) runCatching { sock.close()} + stopCalled.mutate { + if (!it) { + log.debug { "stopping" } + runCatching { inputBlocks.close() } + runCatching { outputBlocks.close() } + if (!sock.isClosed) + runCatching { + log.debug { "closing socket by stop" } + sock.close() + } + else + log.debug { "socket is already closed when stop is called" } + } else + log.debug { "already stopped" } + true + } } sock.launch { log.debug { "opening read channel" } val sockInput = runCatching { sock.openReadChannel() }.getOrElse { log.warning { "failed to open read channel $it" } - sock.close() + stop() throw IllegalStateException("failed to open read channel") } while (isActive && sock.isActive) { @@ -82,15 +86,16 @@ private fun inetTransportDevice( val size = AsyncVarint.decodeUnsigned(sockInput).toInt() if (size > MAX_TCP_BLOCK_SIZE) // 16M is a max command block throw ProtocolException("Illegal block size: $size should be < $MAX_TCP_BLOCK_SIZE") - log.info { "read size: $size" } val data = ByteArray(size) - log.info { "data ready" } sockInput.readFully(data, 0, size) inputBlocks.send(data.toUByteArray()) } catch (e: ClosedReceiveChannelException) { + log.error { "closed receive channel " } stop() break } catch (_: CancellationException) { + log.error { "cancellation exception " } + break } catch (e: Exception) { log.exception { "unexpected exception in TCP socket read" to e } stop() @@ -105,16 +110,17 @@ private fun inetTransportDevice( val block = outputBlocks.receive() AsyncVarint.encodeUnsigned(block.size.toULong(), sockOutput) sockOutput.writeFully(block.toByteArray(), 0, block.size) - log.info { "Client sock output: ${block.size}" } sockOutput.flush() } catch (_: CancellationException) { - log.info { "Caught cancellation, closing transport" } + log.debug { "cancellation exception on output" } + stop() + break } catch (_: LocalInterface.BreakConnectionException) { - log.info { "requested connection break" } + log.debug { "requested connection break" } stop() break } catch (_: ClosedReceiveChannelException) { - log.info { "receive block channel closed, closing the socket" } + log.debug { "receive block channel closed, closing the socket" } stop() break } catch (e: Exception) { @@ -124,10 +130,10 @@ private fun inetTransportDevice( } } } + val device = InetTransportDevice(inputBlocks, outputBlocks, networkAddress, { - log.info { "Close has been called" } stop() }) - log.info { "Transport ready" } + log.debug { "Transport ready" } return device } diff --git a/src/ktorSocketTest/kotlin/TcpTest.kt b/src/ktorSocketTest/kotlin/TcpTest.kt new file mode 100644 index 0000000..a398661 --- /dev/null +++ b/src/ktorSocketTest/kotlin/TcpTest.kt @@ -0,0 +1,77 @@ +import kotlinx.coroutines.delay +import kotlinx.coroutines.test.runTest +import net.sergeych.crypto2.initCrypto +import net.sergeych.kiloparsec.* +import net.sergeych.kiloparsec.adapter.acceptTcpDevice +import net.sergeych.kiloparsec.adapter.connectTcpDevice +import net.sergeych.mp_logger.Log +import kotlin.random.Random +import kotlin.test.Test +import kotlin.test.assertEquals +import kotlin.test.assertIs + +class TcpTest { + class TestException : Exception("test1") + + @Test + fun tcpTest() = runTest { + initCrypto() + Log.connectConsole(Log.Level.DEBUG) + data class Session( + var data: String + ) + + val port = 27170 + Random.nextInt(1, 200) + + val cmdSave by command() + val cmdLoad by command() + val cmdDrop by command() + val cmdException by command() + + val cli = KiloInterface().apply { + registerError { TestException() } + onConnected { session.data = "start" } + on(cmdSave) { session.data = it } + on(cmdLoad) { + session.data + } + on(cmdException) { + throw TestException() + } + on(cmdDrop) { + throw LocalInterface.BreakConnectionException() + } + } + val server = KiloServer(cli, acceptTcpDevice(port)) { + Session("unknown") + } + + val client = KiloClient() { + addErrors(cli) + connect { connectTcpDevice("localhost:$port") } + } + delay(500) + + assertEquals("start", client.call(cmdLoad)) + + client.call(cmdSave, "foobar") + assertEquals("foobar", client.call(cmdLoad)) + + val res = kotlin.runCatching { client.call(cmdException) } + println(res.exceptionOrNull()) + assertIs(res.exceptionOrNull()) + assertEquals("foobar", client.call(cmdLoad)) + + println("----------------------------------- pre drops") + assertThrows { client.call(cmdDrop) } + + println("----------------------------------- DROPPED") + + // reconnect? + assertEquals("start", client.call(cmdLoad)) + + println("------------------------------=---- RECONNECTED") + server.close() + println("****************************************************************") + } +} \ No newline at end of file diff --git a/src/nativeMain/kotlin/net/sergeych/kiloparsec/adapter/DatagramProvider.native.kt b/src/nativeMain/kotlin/net/sergeych/kiloparsec/adapter/DatagramProvider.native.kt deleted file mode 100644 index 64d0da0..0000000 --- a/src/nativeMain/kotlin/net/sergeych/kiloparsec/adapter/DatagramProvider.native.kt +++ /dev/null @@ -1,2 +0,0 @@ -package net.sergeych.kiloparsec.adapter - diff --git a/src/nativeMain/kotlin/net/sergeych/kiloparsec/adapter/NetworkProvider.native.kt b/src/nativeMain/kotlin/net/sergeych/kiloparsec/adapter/NetworkProvider.native.kt deleted file mode 100644 index bc5044d..0000000 --- a/src/nativeMain/kotlin/net/sergeych/kiloparsec/adapter/NetworkProvider.native.kt +++ /dev/null @@ -1,15 +0,0 @@ -package net.sergeych.kiloparsec.adapter - -import kotlinx.coroutines.flow.Flow - -actual fun NetworkAddress(host: String, port: Int): NetworkAddress { - TODO("Not yet implemented") -} - -actual fun acceptTcpDevice(port: Int): Flow { - TODO("Not yet implemented") -} - -actual suspend fun connectTcpDevice(address: NetworkAddress): InetTransportDevice { - TODO("Not yet implemented") -} \ No newline at end of file