diff --git a/README.md b/README.md index 6236000..b1fbfde 100644 --- a/README.md +++ b/README.md @@ -4,24 +4,28 @@ The new generation of __PARanoid SECurity__ protocol, advanced, faster, more sec block device" transport to the same local interface. Out if the box it provides the following transports: -| name | JVM | JS | native | -|----------------|-----|----|-------------------| -| TCP/IP server | ✓ | | β @0.2.5-SNAPSHOT | -| TCP/IP client | ✓ | | β @0.2.5-SNAPSHOT | -| Websock server | ✓ | | | -| Websock client | ✓ | ✓ | ✓ | +| name | JVM | JS | native | +|----------------|-----|----|----------| +| TCP/IP server | ✓ | | β @0.2.6 | +| TCP/IP client | ✓ | | β @0.2.6 | +| Websock server | ✓ | | | +| Websock client | ✓ | ✓ | ✓ | -At the moment we're working on supporting TCP/IP on most native targets. This feature is planned to rach public beta in -August and production in early september 2024. +### Supported native targets + +- iosArm64, iosX64 +- macosArm64, macosArm64 +- linxArm64, linuxX64 + +### Non-native targets + +- JS (browser and nodeJS) +- JVM (android, macos, windows, linx, everywhere where JRE is installed) ## TCP/IP transport -It is the fastest. JVM implementation uses nio2 async sockets and optimizes TCP socket to play -well with blocks (smart NO_DELAY mode). It is multiplatform, nut lacks of async TCP/IP support -on natvic targetm this is where I need help having little time. I'd prefer to use something asyn like UV on native -targets. - -I know no existing way to implement it in KotlinJS for the modern browsers. +It is the fastest based on async socket implementation of ktor client. It works everywhere but JS target as +there is currently no widely adopted sockets for browser javascript. ## Websock server @@ -34,7 +38,7 @@ applications to get easy access from anywhere. # Usage -Th elibrary should be used as maven dependency, not as source. +The library should be used as maven dependency, not as source. ## Adding dependency @@ -55,7 +59,7 @@ It could be, depending on your project structure, something like: ```kotlin val commonMain by getting { dependencies { - api("net.sergeych:kiloparsec:0.2.4") + api("net.sergeych:kiloparsec:0.2.6") } } ``` diff --git a/build.gradle.kts b/build.gradle.kts index e74ac88..283612a 100644 --- a/build.gradle.kts +++ b/build.gradle.kts @@ -5,7 +5,7 @@ plugins { } group = "net.sergeych" -version = "0.2.5-SNAPSHOT" +version = "0.2.6" repositories { mavenCentral() @@ -19,6 +19,7 @@ kotlin { js { browser { } + nodejs() } macosArm64() iosX64() @@ -114,10 +115,6 @@ kotlin { val linuxX64Test by getting { dependsOn(ktorSocketTest) } - -// for (pm: NamedDomainObjectProvider in listOf(macosMain,linuxMain, iosMain, mingwMain)) -// pm.get().dependsOn(ktorSocketMain) - } publishing { diff --git a/src/commonMain/kotlin/net/sergeych/kiloparsec/KiloServer.kt b/src/commonMain/kotlin/net/sergeych/kiloparsec/KiloServer.kt index 8fbc383..f889bb4 100644 --- a/src/commonMain/kotlin/net/sergeych/kiloparsec/KiloServer.kt +++ b/src/commonMain/kotlin/net/sergeych/kiloparsec/KiloServer.kt @@ -43,8 +43,6 @@ class KiloServer( } fun close() { - println("PRREEEC") job.cancel() - println("POOOSTC") } } \ No newline at end of file diff --git a/src/jvmMain/kotlin/net/sergeych/kiloparsec/adapter/ContinuationHandler.kt b/src/jvmMain/kotlin/net/sergeych/kiloparsec/adapter/ContinuationHandler.kt deleted file mode 100644 index 3e8a55f..0000000 --- a/src/jvmMain/kotlin/net/sergeych/kiloparsec/adapter/ContinuationHandler.kt +++ /dev/null @@ -1,20 +0,0 @@ -package net.sergeych.kiloparsec.adapter - -import java.nio.channels.CompletionHandler -import kotlin.coroutines.Continuation -import kotlin.coroutines.resume -import kotlin.coroutines.resumeWithException - -open class ContinuationHandler : CompletionHandler> { - override fun completed(result: T, attachment: Continuation) { - attachment.resume(result) - } - - override fun failed(exc: Throwable, attachment: Continuation) { - attachment.resumeWithException(exc) - } -} - -object VoidCompletionHandler: ContinuationHandler() - -object IntCompletionHandler: ContinuationHandler() \ No newline at end of file diff --git a/src/jvmMain/kotlin/net/sergeych/kiloparsec/adapter/NetworkProvider.jvm.kt b/src/jvmMain/kotlin/net/sergeych/kiloparsec/adapter/NetworkProvider.jvm.kt deleted file mode 100644 index d8b3a46..0000000 --- a/src/jvmMain/kotlin/net/sergeych/kiloparsec/adapter/NetworkProvider.jvm.kt +++ /dev/null @@ -1,39 +0,0 @@ -package net.sergeych.kiloparsec.adapter - -// -//actual fun NetworkAddress(host: String, port: Int): NetworkAddress = -// JvmNetworkAddress(InetAddress.getByName(host), port) -// -//actual fun acceptTcpDevice(port: Int): Flow { -// return flow { -// val socket = withContext(Dispatchers.IO) { -// AsynchronousServerSocketChannel.open().also { -// it.bind(InetSocketAddress(port)) -// } -// } -// while (true) { -// println("0 --- $port") -// val connectedSocket = suspendCancellableCoroutine { continuation -> -// continuation.invokeOnCancellation { -// socket.close() -// } -// socket.accept(continuation, ContinuationHandler()) -// } -// println("1 ---") -// emit(asyncSocketToDevice(connectedSocket)) -// } -// } -//} -// -//@Suppress("unused") -//suspend fun connectTcpDevice(host: String, port: Int) = connectTcpDevice(NetworkAddress(host,port)) -//actual suspend fun connectTcpDevice(address: NetworkAddress): InetTransportDevice { -// address as JvmNetworkAddress -// val socket = withContext(Dispatchers.IO) { -// AsynchronousSocketChannel.open() -// } -// suspendCoroutine { cont -> -// socket.connect(address.socketAddress, cont, VoidCompletionHandler) -// } -// return asyncSocketToDevice(socket) -//} diff --git a/src/jvmMain/kotlin/net/sergeych/kiloparsec/adapter/UdpServer.kt b/src/jvmMain/kotlin/net/sergeych/kiloparsec/adapter/UdpServer.kt deleted file mode 100644 index c375668..0000000 --- a/src/jvmMain/kotlin/net/sergeych/kiloparsec/adapter/UdpServer.kt +++ /dev/null @@ -1,118 +0,0 @@ -//package net.sergeych.kiloparsec.adapter -// -//import kotlinx.coroutines.* -//import kotlinx.coroutines.channels.BufferOverflow -//import kotlinx.coroutines.channels.Channel -//import net.sergeych.mp_logger.LogTag -//import net.sergeych.mp_logger.exception -//import net.sergeych.mp_logger.info -//import net.sergeych.mp_logger.warning -//import java.net.* -//import java.util.concurrent.atomic.AtomicInteger -// -//private val counter = AtomicInteger(0) -// -//class JvmNetworkAddress(val inetAddress: InetAddress, override val port: Int) : NetworkAddress { -// override val host: String by lazy { inetAddress.canonicalHostName } -// override fun equals(other: Any?): Boolean { -// if (this === other) return true -// if (other !is JvmNetworkAddress) return false -// -// if (inetAddress != other.inetAddress) return false -// if (port != other.port) return false -// -// return true -// } -// -// val socketAddress: SocketAddress by lazy { InetSocketAddress(inetAddress,port) } -// -// override fun hashCode(): Int { -// var result = inetAddress.hashCode() -// result = 31 * result + port -// return result -// } -// -// override fun toString(): String = "$host:$port" -//} -// -//class UdpDatagram(override val message: UByteArray, val inetAddress: InetAddress, val port: Int) : Datagram { -// -// override val address: NetworkAddress by lazy { -// JvmNetworkAddress(inetAddress, port) -// } -// -//} -// -// -//@OptIn(DelicateCoroutinesApi::class) -//class UdpServer(val port: Int) : -// DatagramConnector, LogTag("UDPS:${counter.incrementAndGet()}") { -// private var isClosed = false -// -// -// private val deferredSocket = CompletableDeferred() -// private var job: Job? = null -// -// private suspend fun start() = try { -// coroutineScope { -// val socket = DatagramSocket(port) -// val buffer = ByteArray(16384) -// val packet = DatagramPacket(buffer, buffer.size) -// deferredSocket.complete(socket) -// while (isActive && !isClosed) { -// try { -// socket.receive(packet) -// val data = packet.data.sliceArray(0..(2048, BufferOverflow.DROP_OLDEST) -// override val incoming = channel -// -// override suspend fun send(message: UByteArray, networkAddress: NetworkAddress) { -// networkAddress as JvmNetworkAddress -// withContext(Dispatchers.IO) { -// val packet = DatagramPacket( -// message.toByteArray(), message.size, -// networkAddress.inetAddress, networkAddress.port -// ) -// deferredSocket.await().send(packet) -// } -// } -//} \ No newline at end of file diff --git a/src/jvmMain/kotlin/net/sergeych/kiloparsec/adapter/asyncSocketToDevice.kt b/src/jvmMain/kotlin/net/sergeych/kiloparsec/adapter/asyncSocketToDevice.kt deleted file mode 100644 index f610225..0000000 --- a/src/jvmMain/kotlin/net/sergeych/kiloparsec/adapter/asyncSocketToDevice.kt +++ /dev/null @@ -1,155 +0,0 @@ -//package net.sergeych.kiloparsec.adapter -// -//import kotlinx.coroutines.* -//import kotlinx.coroutines.channels.Channel -//import kotlinx.coroutines.channels.ClosedReceiveChannelException -//import kotlinx.coroutines.flow.MutableStateFlow -//import net.sergeych.crypto2.Contrail -//import net.sergeych.crypto2.encodeVarUnsigned -//import net.sergeych.crypto2.readVarUnsigned -//import net.sergeych.kiloparsec.RemoteInterface -//import net.sergeych.kiloparsec.Transport -//import net.sergeych.mp_logger.LogTag -//import net.sergeych.mp_logger.warning -//import net.sergeych.mp_tools.globalLaunch -//import net.sergeych.tools.waitFor -//import java.net.InetSocketAddress -//import java.net.StandardSocketOptions.TCP_NODELAY -//import java.nio.ByteBuffer -//import java.nio.channels.AsynchronousSocketChannel -//import kotlin.coroutines.cancellation.CancellationException -//import kotlin.coroutines.suspendCoroutine -// -//private val log = LogTag("ASTD") -// -///** -// * Prepend block with its size, varint-encoded -// */ -//private fun encode(block: UByteArray): ByteArray { -// val c = Contrail.create(block) -// return (encodeVarUnsigned(c.size.toUInt()) + c).toByteArray() -//} -// -///** -// * Convert asynchronous socket to a [Transport.Device] using non-blocking nio, -// * in a coroutine-effective manner. Note that it runs coroutines to read/write -// * to the socket in a global scope.These are closed when transport is closed -// * or the socket is closed, for example, by network failure. -// */ -//suspend fun asyncSocketToDevice(socket: AsynchronousSocketChannel): InetTransportDevice { -// val deferredDevice = CompletableDeferred() -// globalLaunch { -// coroutineScope { -// val sendQueueEmpty = MutableStateFlow(true) -// val receiving = MutableStateFlow(false) -// // We're in block mode, every block we send worth immediate sending, we do not -// // send partial blocks, so: -// socket.setOption(TCP_NODELAY, true) -// -// // socket input is to be parsed for blocks, so we receive bytes -// // and decode them to blocks -// val input = Channel(1024) -// val inputBlocks = Channel() -// // output is blocks, so we sent transformed, framed blocks: -// val outputBlocks = Channel() -// -// fun stop() { -// kotlin.runCatching { inputBlocks.close(RemoteInterface.ClosedException()) } -// kotlin.runCatching { outputBlocks.close() } -// socket.close() -// cancel() -// } -// -// -// // copy incoming data from the socket to input channel: -// launch { -// val data = ByteArray(1024) -// val inb = ByteBuffer.wrap(data) -// kotlin.runCatching { -// while (isActive) { -// inb.position(0) -// val size: Int = suspendCoroutine { continuation -> -// socket.read(inb, continuation, IntCompletionHandler) -// } -// if (size < 0) stop() -// else { -//// println("recvd:\n${data.sliceArray(0.. -// socket.write(outBuff, continuation, IntCompletionHandler) -// } -// // be sure it was all sent -// if (outBuff.position() != data.size || cnt != data.size) { -// throw RuntimeException("unexpected partial write") -// } -// } -// // in the case of just breaking out of the loop: -// sendQueueEmpty.value = true -// } catch (_: ClosedReceiveChannelException) { -// stop() -// } -// } -// // transport device copes with blocks: -// // decode blocks from a byte channel read from the socket: -// launch { -// try { -// while (isActive) { -// receiving.value = !input.isEmpty -// val size = readVarUnsigned(input) -// receiving.value = true -// if (size == 0u) log.warning { "zero size block is ignored!" } -// else { -// val block = UByteArray(size.toInt()) -// for (i in 0.. { val selectorManager = SelectorManager(Dispatchers.IO) @@ -53,14 +61,24 @@ private fun inetTransportDevice( val outputBlocks = Channel(4096) val log = LogTag("TCPT${logCounter.incrementAndGet()}:$suffix:$networkAddress") - val stopCalled = AtomicValue(false) + val job = AtomicValue(null) + + val sockOutput = sock.openWriteChannel() + val sockInput = runCatching { sock.openReadChannel() }.getOrElse { + log.warning { "failed to open read channel $it" } + throw IllegalStateException("failed to open read channel") + } fun stop() { - stopCalled.mutate { - if (!it) { + job.mutate { + if ( it != null ) { log.debug { "stopping" } runCatching { inputBlocks.close() } runCatching { outputBlocks.close() } + // The problem: on mac platofrms closing the socket does not close its input + // and output channels! + runCatching { sockInput.cancel() } + runCatching { sockOutput.close() } if (!sock.isClosed) runCatching { log.debug { "closing socket by stop" } @@ -68,69 +86,109 @@ private fun inetTransportDevice( } else log.debug { "socket is already closed when stop is called" } + it.cancel() + log.debug { "implementation job cancel called" } } else log.debug { "already stopped" } - true + null } } - sock.launch { - log.debug { "opening read channel" } - val sockInput = runCatching { sock.openReadChannel() }.getOrElse { - log.warning { "failed to open read channel $it" } - stop() - throw IllegalStateException("failed to open read channel") - } - while (isActive && sock.isActive) { - try { - val size = AsyncVarint.decodeUnsigned(sockInput).toInt() - if (size > MAX_TCP_BLOCK_SIZE) // 16M is a max command block - throw ProtocolException("Illegal block size: $size should be < $MAX_TCP_BLOCK_SIZE") - val data = ByteArray(size) - sockInput.readFully(data, 0, size) - inputBlocks.send(data.toUByteArray()) - } catch (e: ClosedReceiveChannelException) { - log.error { "closed receive channel " } - stop() - break - } catch (_: CancellationException) { - log.error { "cancellation exception " } - break - } catch (e: Exception) { - log.exception { "unexpected exception in TCP socket read" to e } - stop() - break - } - } - } - sock.launch { - val sockOutput = sock.openWriteChannel() - while (isActive && sock.isActive) { - try { - val block = outputBlocks.receive() - AsyncVarint.encodeUnsigned(block.size.toULong(), sockOutput) - sockOutput.writeFully(block.toByteArray(), 0, block.size) - sockOutput.flush() - } catch (_: CancellationException) { - log.debug { "cancellation exception on output" } - stop() - break - } catch (_: LocalInterface.BreakConnectionException) { - log.debug { "requested connection break" } - stop() - break - } catch (_: ClosedReceiveChannelException) { - log.debug { "receive block channel closed, closing the socket" } - stop() - break - } catch (e: Exception) { - log.exception { "unexpected exception. closing." to e } - stop() - break - } - } - } + var lastActiveAt = Clock.System.now() + job.value = globalLaunch { + launch { + log.debug { "opening read channel" } + + while (isActive && sock.isActive) { + try { + val size = AsyncVarint.decodeUnsigned(sockInput).toInt() + if (size > MAX_TCP_BLOCK_SIZE) // 16M is a max command block + throw ProtocolException("Illegal block size: $size should be < $MAX_TCP_BLOCK_SIZE") + val data = ByteArray(size) + if (size == 0) { + log.debug { "ping received" } + lastActiveAt = Clock.System.now() + } else { + sockInput.readFully(data, 0, size) + inputBlocks.send(data.toUByteArray()) + } + } catch (e: ClosedReceiveChannelException) { + log.error { "closed receive channel " } + stop() + break + } catch (_: CancellationException) { + log.error { "cancellation exception " } + break + } catch (e: Exception) { + log.exception { "unexpected exception in TCP socket read" to e } + stop() + break + } + } + } + + launch { + val outAccess = Mutex() + var lastSentAt = Clock.System.now() + launch { + while (isActive && sock.isActive) { + delay(500) + val activityTime = if(lastSentAt > lastActiveAt) lastSentAt else lastActiveAt + if (Clock.System.now() - activityTime > PING_INACTIVITY_TIME) { + log.debug { "pinging for inactivity" } + val repeat = outAccess.withLock { + try { + sockOutput.writeByte(0) + sockOutput.flush() + lastSentAt = Clock.System.now() + true + } catch (e: ClosedReceiveChannelException) { + e.printStackTrace() + stop() + false + } catch (_: CancellationException) { + false + } catch (e: Throwable) { + e.printStackTrace() + stop() + false + } + } + if (!repeat) break + } + } + } + + while (isActive && sock.isActive) { + try { + val block = outputBlocks.receive() + outAccess.withLock { + AsyncVarint.encodeUnsigned(block.size.toULong(), sockOutput) + sockOutput.writeFully(block.toByteArray(), 0, block.size) + sockOutput.flush() + lastSentAt = Clock.System.now() + } + } catch (_: CancellationException) { + log.debug { "cancellation exception on output" } + stop() + break + } catch (_: LocalInterface.BreakConnectionException) { + log.debug { "requested connection break" } + stop() + break + } catch (_: ClosedReceiveChannelException) { + log.debug { "receive block channel closed, closing the socket" } + stop() + break + } catch (e: Exception) { + log.exception { "unexpected exception. closing." to e } + stop() + break + } + } + } + } val device = InetTransportDevice(inputBlocks, outputBlocks, networkAddress, { stop() }) diff --git a/src/ktorSocketTest/kotlin/TcpTest.kt b/src/ktorSocketTest/kotlin/TcpTest.kt index a398661..3a995dc 100644 --- a/src/ktorSocketTest/kotlin/TcpTest.kt +++ b/src/ktorSocketTest/kotlin/TcpTest.kt @@ -4,7 +4,6 @@ import net.sergeych.crypto2.initCrypto import net.sergeych.kiloparsec.* import net.sergeych.kiloparsec.adapter.acceptTcpDevice import net.sergeych.kiloparsec.adapter.connectTcpDevice -import net.sergeych.mp_logger.Log import kotlin.random.Random import kotlin.test.Test import kotlin.test.assertEquals @@ -16,7 +15,7 @@ class TcpTest { @Test fun tcpTest() = runTest { initCrypto() - Log.connectConsole(Log.Level.DEBUG) +// Log.connectConsole(Log.Level.DEBUG) data class Session( var data: String ) @@ -58,20 +57,14 @@ class TcpTest { assertEquals("foobar", client.call(cmdLoad)) val res = kotlin.runCatching { client.call(cmdException) } - println(res.exceptionOrNull()) assertIs(res.exceptionOrNull()) assertEquals("foobar", client.call(cmdLoad)) - println("----------------------------------- pre drops") assertThrows { client.call(cmdDrop) } - println("----------------------------------- DROPPED") - // reconnect? assertEquals("start", client.call(cmdLoad)) - println("------------------------------=---- RECONNECTED") server.close() - println("****************************************************************") } } \ No newline at end of file