From c1bd6f09a9ce2e30e5ef29d55a5f947e01eadbbf Mon Sep 17 00:00:00 2001 From: sergeych Date: Tue, 18 Feb 2025 11:24:47 +0300 Subject: [PATCH] 0.6.* started: kotlin upgrade to 2.1.0 ktor updgared to 3.1.6 less debug noise yet no wasmJS --- build.gradle.kts | 7 +- .../sergeych/kiloparsec/AtomicAsyncValue.kt | 42 ++++++++++ .../kiloparsec/adapter/WebsocketServer.kt | 7 +- .../net/sergeych/kiloparsec/ClientTest.kt | 10 ++- .../kiloparsec/adapter/TcpProvider.kt | 78 +++++++++---------- .../kiloparsec/adapter/UdpProvider.kt | 6 +- .../sergeych/kiloparsec/adapter/UdpServer.kt | 15 ++-- 7 files changed, 108 insertions(+), 57 deletions(-) create mode 100644 src/commonMain/kotlin/net/sergeych/kiloparsec/AtomicAsyncValue.kt diff --git a/build.gradle.kts b/build.gradle.kts index 376aa6c..71e6b69 100644 --- a/build.gradle.kts +++ b/build.gradle.kts @@ -6,13 +6,14 @@ plugins { } group = "net.sergeych" -version = "0.5.4-SNAPSHOT" +version = "0.6.1-SNAPSHOT" repositories { mavenCentral() mavenLocal() maven("https://maven.universablockchain.com/") maven("https://gitea.sergeych.net/api/packages/SergeychWorks/maven") + maven("https://gitea.sergeych.net/api/packages/YoungBlood/maven") } kotlin { @@ -32,8 +33,10 @@ kotlin { // macosX64() // macosX64() mingwX64() +// @OptIn(ExperimentalWasmDsl::class) +// wasmJs() - val ktor_version = "2.3.12" + val ktor_version = "3.1.0" sourceSets { all { diff --git a/src/commonMain/kotlin/net/sergeych/kiloparsec/AtomicAsyncValue.kt b/src/commonMain/kotlin/net/sergeych/kiloparsec/AtomicAsyncValue.kt new file mode 100644 index 0000000..cd15e65 --- /dev/null +++ b/src/commonMain/kotlin/net/sergeych/kiloparsec/AtomicAsyncValue.kt @@ -0,0 +1,42 @@ +package net.sergeych.kiloparsec + +import kotlinx.coroutines.sync.Mutex +import kotlinx.coroutines.sync.withLock + +/** + * Multiplatform atomically mutable value to be used in [kotlinx.coroutines], + * with suspending mutating operations, see [mutate]. + * + * Actual value can be either changed in a block of [mutate] when + * new value _depends on the current value_ or with [reset]. + * + * [value] getter is suspended because it waits until the mutation finishes + */ +open class AtomicAsyncValue(initialValue: T) { + private var actualValue = initialValue + private val access = Mutex() + + /** + * Change the value: get the current and set to the returned, all in the + * atomic suspend operation. All other mutating requests including assigning to [value] + * will be blocked and queued. + * @return result of the mutation. Note that immediate call to property [value] + * could already return modified bu some other thread value! + */ + suspend fun mutate(mutator: suspend (T) -> T): T = access.withLock { + actualValue = mutator(actualValue) + actualValue + } + + /** + * Atomic get or set the value. Atomic get means if there is a [mutate] in progress + * it will wait until the mutation finishes and then return the correct result. + */ + suspend fun value() = access.withLock { actualValue } + + /** + * Set the new value without checking it. Shortcut to + * ```mutate { value = newValue }``` + */ + suspend fun reset(value: T) = mutate { value } +} \ No newline at end of file diff --git a/src/jvmMain/kotlin/net/sergeych/kiloparsec/adapter/WebsocketServer.kt b/src/jvmMain/kotlin/net/sergeych/kiloparsec/adapter/WebsocketServer.kt index 4ff5324..55cce0c 100644 --- a/src/jvmMain/kotlin/net/sergeych/kiloparsec/adapter/WebsocketServer.kt +++ b/src/jvmMain/kotlin/net/sergeych/kiloparsec/adapter/WebsocketServer.kt @@ -15,7 +15,7 @@ import net.sergeych.kiloparsec.KiloServerConnection import net.sergeych.kiloparsec.RemoteInterface import net.sergeych.mp_logger.* import net.sergeych.tools.AtomicCounter -import java.time.Duration +import kotlin.time.Duration.Companion.seconds /** * Create a ktor-based websocket server. @@ -32,7 +32,6 @@ import java.time.Duration * session could be transport specific. * * @param localInterface where the actual work is performed. - * @param timeout how long to wait for the connection to be established. * @param path default http path to the websocket. * @param serverKey optional key to authenticate the connection. If the client specify expected * server key it should match of connection will not be established. @@ -45,8 +44,8 @@ fun Application.setupWebsocketServer( createSession: () -> S, ) { install(WebSockets) { - pingPeriod = Duration.ofSeconds(15) - timeout = Duration.ofSeconds(15) + pingPeriod = 60.seconds //Duration.ofSeconds(15) + timeout = 45.seconds maxFrameSize = Long.MAX_VALUE masking = false } diff --git a/src/jvmTest/kotlin/net/sergeych/kiloparsec/ClientTest.kt b/src/jvmTest/kotlin/net/sergeych/kiloparsec/ClientTest.kt index c690abe..8a336c9 100644 --- a/src/jvmTest/kotlin/net/sergeych/kiloparsec/ClientTest.kt +++ b/src/jvmTest/kotlin/net/sergeych/kiloparsec/ClientTest.kt @@ -3,6 +3,7 @@ package net.sergeych.kiloparsec import assertThrows import io.ktor.server.engine.* import io.ktor.server.netty.* +import kotlinx.coroutines.delay import kotlinx.coroutines.launch import kotlinx.coroutines.test.runTest import net.sergeych.crypto2.initCrypto @@ -10,6 +11,7 @@ import net.sergeych.kiloparsec.adapter.setupWebsocketServer import net.sergeych.kiloparsec.adapter.websocketClient import net.sergeych.mp_logger.Log import java.net.InetAddress +import kotlin.random.Random import kotlin.test.Test import kotlin.test.assertEquals import kotlin.test.assertFalse @@ -50,11 +52,12 @@ class ClientTest { } } - val ns: NettyApplicationEngine = embeddedServer(Netty, port = 8080, host = "0.0.0.0", module = { + val port = Random.nextInt(8080,9090) + val ns = embeddedServer(Netty, port = port, host = "0.0.0.0", module = { setupWebsocketServer(serverInterface) { Session() } }).start(wait = false) - val client = websocketClient("ws://localhost:8080/kp") + val client = websocketClient("ws://localhost:$port/kp") val states = mutableListOf() val collector = launch { client.connectedStateFlow.collect { @@ -75,6 +78,9 @@ class ClientTest { } // connection should now be closed + // the problem is: it needs some unspecified time to close + // as it is async process. + delay(100) assertFalse { client.connectedStateFlow.value } // this should be run on automatically reopen connection diff --git a/src/ktorSocketMain/kotlin/net/sergeych/kiloparsec/adapter/TcpProvider.kt b/src/ktorSocketMain/kotlin/net/sergeych/kiloparsec/adapter/TcpProvider.kt index 8c9f87a..39fb1e4 100644 --- a/src/ktorSocketMain/kotlin/net/sergeych/kiloparsec/adapter/TcpProvider.kt +++ b/src/ktorSocketMain/kotlin/net/sergeych/kiloparsec/adapter/TcpProvider.kt @@ -3,6 +3,7 @@ package net.sergeych.kiloparsec.adapter import io.ktor.network.selector.* import io.ktor.network.sockets.* import io.ktor.utils.io.* +import io.ktor.utils.io.writeByte import kotlinx.coroutines.* import kotlinx.coroutines.CancellationException import kotlinx.coroutines.channels.Channel @@ -12,14 +13,10 @@ import kotlinx.coroutines.flow.flow import kotlinx.coroutines.sync.Mutex import kotlinx.coroutines.sync.withLock import kotlinx.datetime.Clock -import net.sergeych.kiloparsec.AsyncVarint -import net.sergeych.kiloparsec.KiloClient -import net.sergeych.kiloparsec.KiloServer -import net.sergeych.kiloparsec.LocalInterface +import net.sergeych.kiloparsec.* import net.sergeych.mp_logger.* import net.sergeych.mp_tools.globalLaunch import net.sergeych.tools.AtomicCounter -import net.sergeych.tools.AtomicValue import kotlin.time.Duration.Companion.seconds private val logCounter = AtomicCounter(0) @@ -33,10 +30,10 @@ internal val PING_INACTIVITY_TIME = 30.seconds * Listen for incoming TCP/IP connections on all local interfaces and the specified [port] * anc create flow of [InetTransportDevice] suitable for [KiloClient]. */ -fun acceptTcpDevice(port: Int,localInterface: String = "0.0.0.0"): Flow { +fun acceptTcpDevice(port: Int, localInterface: String = "0.0.0.0"): Flow { val selectorManager = SelectorManager(Dispatchers.IO) - val serverSocket = aSocket(selectorManager).tcp().bind(localInterface, port) return flow { + val serverSocket = aSocket(selectorManager).tcp().bind(localInterface, port) while (true) { serverSocket.accept().let { sock -> emit(inetTransportDevice(sock, "srv")) @@ -74,7 +71,7 @@ private fun inetTransportDevice( val outputBlocks = Channel(4096) val log = LogTag("TCPT${logCounter.incrementAndGet()}:$suffix:$networkAddress") - val job = AtomicValue(null) + val job = AtomicAsyncValue(null) val sockOutput = sock.openWriteChannel() val sockInput = runCatching { sock.openReadChannel() }.getOrElse { @@ -82,16 +79,16 @@ private fun inetTransportDevice( throw IllegalStateException("failed to open read channel") } - fun stop() { + suspend fun stop() { job.mutate { - if ( it != null ) { + if (it != null) { log.debug { "stopping" } runCatching { inputBlocks.close() } runCatching { outputBlocks.close() } // The problem: on mac platofrms closing the socket does not close its input // and output channels! runCatching { sockInput.cancel() } - runCatching { sockOutput.close() } + runCatching { sockOutput.flushAndClose() } if (!sock.isClosed) runCatching { log.debug { "closing socket by stop" } @@ -108,46 +105,47 @@ private fun inetTransportDevice( } var lastActiveAt = Clock.System.now() - job.value = globalLaunch { - launch { + globalLaunch { + job.reset(globalLaunch { + launch { - log.debug { "opening read channel" } + log.debug { "opening read channel" } - while (isActive && sock.isActive) { - try { - val size = AsyncVarint.decodeUnsigned(sockInput).toInt() - if (size > MAX_TCP_BLOCK_SIZE) // 16M is a max command block - throw ProtocolException("Illegal block size: $size should be < $MAX_TCP_BLOCK_SIZE") - val data = ByteArray(size) - if (size == 0) { - log.debug { "ping received" } - lastActiveAt = Clock.System.now() - } else { - sockInput.readFully(data, 0, size) - inputBlocks.send(data.toUByteArray()) + while (isActive && sock.isActive) { + try { + val size = AsyncVarint.decodeUnsigned(sockInput).toInt() + if (size > MAX_TCP_BLOCK_SIZE) // 16M is a max command block + throw ProtocolException("Illegal block size: $size should be < $MAX_TCP_BLOCK_SIZE") + val data = ByteArray(size) + if (size == 0) { + log.debug { "ping received" } + lastActiveAt = Clock.System.now() + } else { + sockInput.readFully(data, 0, size) + inputBlocks.send(data.toUByteArray()) + } + } catch (e: ClosedReceiveChannelException) { + log.error { "closed receive channel " } + stop() + break + } catch (_: CancellationException) { + log.error { "cancellation exception " } + break + } catch (e: Exception) { + log.exception { "unexpected exception in TCP socket read" to e } + stop() + break } - } catch (e: ClosedReceiveChannelException) { - log.error { "closed receive channel " } - stop() - break - } catch (_: CancellationException) { - log.error { "cancellation exception " } - break - } catch (e: Exception) { - log.exception { "unexpected exception in TCP socket read" to e } - stop() - break } } - } - + }) launch { val outAccess = Mutex() var lastSentAt = Clock.System.now() launch { while (isActive && sock.isActive) { delay(500) - val activityTime = if(lastSentAt > lastActiveAt) lastSentAt else lastActiveAt + val activityTime = if (lastSentAt > lastActiveAt) lastSentAt else lastActiveAt if (Clock.System.now() - activityTime > PING_INACTIVITY_TIME) { log.debug { "pinging for inactivity" } val repeat = outAccess.withLock { diff --git a/src/ktorSocketMain/kotlin/net/sergeych/kiloparsec/adapter/UdpProvider.kt b/src/ktorSocketMain/kotlin/net/sergeych/kiloparsec/adapter/UdpProvider.kt index fa5e2b4..a969f4d 100644 --- a/src/ktorSocketMain/kotlin/net/sergeych/kiloparsec/adapter/UdpProvider.kt +++ b/src/ktorSocketMain/kotlin/net/sergeych/kiloparsec/adapter/UdpProvider.kt @@ -87,7 +87,7 @@ fun acceptUdpDevice( * the module automatically issues pings on inactivity when there is no data often enough * to maintain the connection open. */ -fun connectUdpDevice( +suspend fun connectUdpDevice( hostPort: String, maxInactivityTimeout: Duration = 2.minutes, ) = connectUdpDevice(hostPort.toNetworkAddress(), maxInactivityTimeout) @@ -107,16 +107,16 @@ fun connectUdpDevice( * the module automatically issues pings on inactivity when there is no data often enough * to maintain the connection open. */ -fun connectUdpDevice( +suspend fun connectUdpDevice( addr: NetworkAddress, maxInactivityTimeout: Duration = 2.minutes, ): InetTransportDevice { val selectorManager = SelectorManager(Dispatchers.IO) val remoteAddress = InetSocketAddress(addr.host, addr.port) - val socket = aSocket(selectorManager).udp().connect(remoteAddress) val done = CompletableDeferred() + val socket = aSocket(selectorManager).udp().connect(remoteAddress) val transport = UdpSocketTransport(object : UdpConnector { override suspend fun sendBlock(block: UdpBlock, toAddress: SocketAddress) { socket.send(block.toDatagram(remoteAddress)) diff --git a/src/ktorSocketMain/kotlin/net/sergeych/kiloparsec/adapter/UdpServer.kt b/src/ktorSocketMain/kotlin/net/sergeych/kiloparsec/adapter/UdpServer.kt index 378a252..0c9d982 100644 --- a/src/ktorSocketMain/kotlin/net/sergeych/kiloparsec/adapter/UdpServer.kt +++ b/src/ktorSocketMain/kotlin/net/sergeych/kiloparsec/adapter/UdpServer.kt @@ -14,6 +14,7 @@ import net.sergeych.mp_logger.LogTag import net.sergeych.mp_logger.Loggable import net.sergeych.mp_logger.debug import net.sergeych.mp_logger.exception +import net.sergeych.mp_tools.globalDefer import kotlin.time.Duration import kotlin.time.Duration.Companion.minutes @@ -51,7 +52,9 @@ class UdpServer(val port: Int, localInterface: String = "0.0.0.0", maxInactivity private val access = Mutex() private val selectorManager = SelectorManager(Dispatchers.IO) - private val serverSocket = aSocket(selectorManager).udp().bind(InetSocketAddress(localInterface, port)) + private val serverSocket = globalDefer { + aSocket(selectorManager).udp().bind(InetSocketAddress(localInterface, port)) + } override suspend fun disconnectClient(address: SocketAddress) { access.withLock { sessions.remove(address) } @@ -65,7 +68,7 @@ class UdpServer(val port: Int, localInterface: String = "0.0.0.0", maxInactivity flow { while (true) { try { - val datagram = serverSocket.receive() + val datagram = serverSocket.await().receive() val block = UdpBlock.decode(datagram) val remoteAddress = datagram.address @@ -97,10 +100,10 @@ class UdpServer(val port: Int, localInterface: String = "0.0.0.0", maxInactivity } override suspend fun sendBlock(block: UdpBlock, toAddress: SocketAddress) { - serverSocket.send(block.toDatagram(toAddress)) + serverSocket.await().send(block.toDatagram(toAddress)) } - val isClosed: Boolean get() = serverSocket.isClosed + suspend fun isClosed(): Boolean = serverSocket.await().isClosed /** * Close the UDP server. Calling it will cause: @@ -113,8 +116,8 @@ class UdpServer(val port: Int, localInterface: String = "0.0.0.0", maxInactivity */ suspend fun close() { access.withLock { - if (!isClosed) { - runCatching { serverSocket.close() } + if (!isClosed()) { + runCatching { serverSocket.await().close() } } } while (sessions.isNotEmpty()) {