From 825c0bd5f7aebacabe4ef87ec63f6376112f8d33 Mon Sep 17 00:00:00 2001 From: sergeych Date: Mon, 17 Jun 2024 17:42:16 +0700 Subject: [PATCH] fixed automatic reconnection in TCP client/server --- .../net/sergeych/kiloparsec/KiloClient.kt | 34 ++++++++--- .../net/sergeych/kiloparsec/Transport.kt | 8 +++ .../kiloparsec/adapter/InetTransportDevice.kt | 6 +- .../kiloparsec/adapter/ProxyDevice.kt | 24 ++++++-- src/commonTest/kotlin/TransportTest.kt | 1 + .../kiloparsec/adapter/asyncSocketToDevice.kt | 61 ++++++++++--------- .../net/sergeych/kiloparsec/ClientTest.kt | 10 ++- 7 files changed, 95 insertions(+), 49 deletions(-) diff --git a/src/commonMain/kotlin/net/sergeych/kiloparsec/KiloClient.kt b/src/commonMain/kotlin/net/sergeych/kiloparsec/KiloClient.kt index 6b82ae2..3c4f66c 100644 --- a/src/commonMain/kotlin/net/sergeych/kiloparsec/KiloClient.kt +++ b/src/commonMain/kotlin/net/sergeych/kiloparsec/KiloClient.kt @@ -6,6 +6,8 @@ import kotlinx.coroutines.delay import kotlinx.coroutines.flow.MutableStateFlow import kotlinx.coroutines.flow.asStateFlow import kotlinx.coroutines.isActive +import kotlinx.coroutines.sync.Mutex +import kotlinx.coroutines.sync.withLock import net.sergeych.crypto2.SigningKey import net.sergeych.mp_logger.LogTag import net.sergeych.mp_logger.Loggable @@ -45,11 +47,12 @@ class KiloClient( debug { "getting connection" } val kc = connectionDataFactory() debug { "get device and session" } - val client = KiloClientConnection(localInterface, kc,secretKey) + val client = KiloClientConnection(localInterface, kc, secretKey) deferredClient.complete(client) client.run { _state.value = it } + resetDeferredClient() debug { "client run finished" } } catch (_: RemoteInterface.ClosedException) { debug { "remote closed" } @@ -62,9 +65,8 @@ class KiloClient( delay(1000) } _state.value = false - if (deferredClient.isActive) - deferredClient = CompletableDeferred() - delay(1000) + resetDeferredClient() + delay(100) } } @@ -73,7 +75,23 @@ class KiloClient( debug { "client is closed" } } - override suspend fun call(cmd: Command, args: A): R = deferredClient.await().call(cmd, args) + private val defMutex = Mutex() + private suspend fun resetDeferredClient() { + defMutex.withLock { + if (!deferredClient.isActive) { + deferredClient = CompletableDeferred() + } + + } + } + + override suspend fun call(cmd: Command, args: A): R = + try { + deferredClient.await().call(cmd, args) + } catch (t: RemoteInterface.ClosedException) { + resetDeferredClient() + throw t + } /** * Current session token. This is a per-connection unique random value same on the client and server part so @@ -142,11 +160,11 @@ class KiloClient( internal fun build(): KiloClient { val i = KiloInterface() - for(ep in errorProviders) i.addErrorProvider(ep) + for (ep in errorProviders) i.addErrorProvider(ep) interfaceBuilder?.let { i.it() } val connector = connectionBuilder ?: throw IllegalArgumentException("connect handler was not set") - return KiloClient(i,secretIdKey) { - KiloConnectionData(connector(),sessionBuilder()) + return KiloClient(i, secretIdKey) { + KiloConnectionData(connector(), sessionBuilder()) } } } diff --git a/src/commonMain/kotlin/net/sergeych/kiloparsec/Transport.kt b/src/commonMain/kotlin/net/sergeych/kiloparsec/Transport.kt index db15959..f5ec4d8 100644 --- a/src/commonMain/kotlin/net/sergeych/kiloparsec/Transport.kt +++ b/src/commonMain/kotlin/net/sergeych/kiloparsec/Transport.kt @@ -56,6 +56,8 @@ class Transport( * possible. This method must not throw exceptions. */ suspend fun close() + + suspend fun flush() {} } @Serializable(TransportBlockSerializer::class) @@ -184,6 +186,7 @@ class Transport( // handler forced close warning { "handler requested closing of the connection (${x.flushSendQueue}"} isClosed = true + if( x.flushSendQueue ) device.flush() device.close() } catch (x: RemoteInterface.RemoteException) { send(Block.Error(b.id, x.code, x.text, x.extra)) @@ -207,6 +210,11 @@ class Transport( info { "closing connection by local request ($cce)"} device.close() } + catch(t: RemoteInterface.ClosedException) { + // it is ok: we just exit the coroutine normally + // and mark we're closing + isClosed = true + } catch (_: CancellationException) { info { "loop is cancelled with CancellationException" } isClosed = true diff --git a/src/commonMain/kotlin/net/sergeych/kiloparsec/adapter/InetTransportDevice.kt b/src/commonMain/kotlin/net/sergeych/kiloparsec/adapter/InetTransportDevice.kt index ee99b3e..759d8b5 100644 --- a/src/commonMain/kotlin/net/sergeych/kiloparsec/adapter/InetTransportDevice.kt +++ b/src/commonMain/kotlin/net/sergeych/kiloparsec/adapter/InetTransportDevice.kt @@ -10,8 +10,8 @@ class InetTransportDevice( inputChannel: Channel, outputChannel: Channel, val remoteAddress: NetworkAddress, - val flush: suspend ()->Unit = {}, - doClose: suspend ()->Unit = {} -) : ProxyDevice(inputChannel, outputChannel, doClose) { + doClose: (suspend ()->Unit)? = null, + doFlush: (suspend ()->Unit)? = null, +) : ProxyDevice(inputChannel, outputChannel, doClose, doFlush) { override fun toString(): String = "@$remoteAddress" } \ No newline at end of file diff --git a/src/commonMain/kotlin/net/sergeych/kiloparsec/adapter/ProxyDevice.kt b/src/commonMain/kotlin/net/sergeych/kiloparsec/adapter/ProxyDevice.kt index 00264c7..2d4f7b9 100644 --- a/src/commonMain/kotlin/net/sergeych/kiloparsec/adapter/ProxyDevice.kt +++ b/src/commonMain/kotlin/net/sergeych/kiloparsec/adapter/ProxyDevice.kt @@ -3,20 +3,34 @@ package net.sergeych.kiloparsec.adapter import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.channels.ReceiveChannel import kotlinx.coroutines.channels.SendChannel -import net.sergeych.kiloparsec.RemoteInterface +import kotlinx.coroutines.delay import net.sergeych.kiloparsec.Transport import net.sergeych.tools.AtomicCounter private val counter = AtomicCounter() open class ProxyDevice( - inputChannel: Channel, - outputChannel: Channel, - private val onClose: suspend ()->Unit = { throw RemoteInterface.ClosedException() }): Transport.Device { + private val inputChannel: Channel, + private val outputChannel: Channel, + private val doClose: (suspend ()->Unit)? = null, + private val doFlush: (suspend ()->Unit)? = null, +): Transport.Device { override val input: ReceiveChannel = inputChannel override val output: SendChannel = outputChannel override suspend fun close() { - onClose() + doClose?.invoke() + runCatching { inputChannel.close() } + runCatching { outputChannel.close() } + } + + override suspend fun flush() { + doFlush?.invoke() + var cnt = 10 + while(!outputChannel.isEmpty) { + if (cnt-- < 0) break + delay(50) + } + super.flush() } private val id = counter.incrementAndGet() diff --git a/src/commonTest/kotlin/TransportTest.kt b/src/commonTest/kotlin/TransportTest.kt index 5359c4b..3892b29 100644 --- a/src/commonTest/kotlin/TransportTest.kt +++ b/src/commonTest/kotlin/TransportTest.kt @@ -20,6 +20,7 @@ fun createTestDevice(): Pair { val d1 = object : Transport.Device { override val input: ReceiveChannel = p1 override val output: SendChannel = p2 + override suspend fun close() { p2.close() } diff --git a/src/jvmMain/kotlin/net/sergeych/kiloparsec/adapter/asyncSocketToDevice.kt b/src/jvmMain/kotlin/net/sergeych/kiloparsec/adapter/asyncSocketToDevice.kt index 70793bf..c3839c4 100644 --- a/src/jvmMain/kotlin/net/sergeych/kiloparsec/adapter/asyncSocketToDevice.kt +++ b/src/jvmMain/kotlin/net/sergeych/kiloparsec/adapter/asyncSocketToDevice.kt @@ -6,8 +6,10 @@ import kotlinx.coroutines.channels.ClosedReceiveChannelException import kotlinx.coroutines.flow.MutableStateFlow import net.sergeych.crypto2.encodeVarUnsigned import net.sergeych.crypto2.readVarUnsigned +import net.sergeych.kiloparsec.RemoteInterface import net.sergeych.kiloparsec.Transport import net.sergeych.mp_logger.LogTag +import net.sergeych.mp_logger.info import net.sergeych.mp_logger.warning import net.sergeych.mp_tools.globalLaunch import net.sergeych.tools.waitFor @@ -37,9 +39,6 @@ suspend fun asyncSocketToDevice(socket: AsynchronousSocketChannel): InetTranspor coroutineScope { val sendQueueEmpty = MutableStateFlow(true) val receiving = MutableStateFlow(false) - fun stop() { - cancel() - } // We're in block mode, every block we send worth immediate sending, we do not // send partial blocks, so: socket.setOption(TCP_NODELAY, true) @@ -47,25 +46,37 @@ suspend fun asyncSocketToDevice(socket: AsynchronousSocketChannel): InetTranspor // socket input is to be parsed for blocks, so we receive bytes // and decode them to blocks val input = Channel(1024) + val inputBlocks = Channel() + // output is blocks, so we sent transformed, framed blocks: + val outputBlocks = Channel() + + fun stop() { + kotlin.runCatching { inputBlocks.close(RemoteInterface.ClosedException()) } + kotlin.runCatching { outputBlocks.close() } + socket.close() + cancel() + } + + // copy incoming data from the socket to input channel: launch { val data = ByteArray(1024) val inb = ByteBuffer.wrap(data) - while (isActive) { - inb.position(0) - val size: Int = suspendCoroutine { continuation -> - socket.read(inb, continuation, IntCompletionHandler) - } - if (size < 0) stop() - else { + kotlin.runCatching { + while (isActive) { + inb.position(0) + val size: Int = suspendCoroutine { continuation -> + socket.read(inb, continuation, IntCompletionHandler) + } + if (size < 0) stop() + else { // println("recvd:\n${data.sliceArray(0..() // copy from output to socket: launch { try { @@ -98,7 +109,6 @@ suspend fun asyncSocketToDevice(socket: AsynchronousSocketChannel): InetTranspor } } // transport device copes with blocks: - val inputBlocks = Channel() // decode blocks from a byte channel read from the socket: launch { try { @@ -122,30 +132,21 @@ suspend fun asyncSocketToDevice(socket: AsynchronousSocketChannel): InetTranspor receiving.value = false } - // wait until send queue is empty - suspend fun flush() { - yield() - // do not slow down with collect if it is ok by now: - if (!sendQueueEmpty.value || !outputBlocks.isEmpty) - // wait until all output is sent - sendQueueEmpty.waitFor { it && outputBlocks.isEmpty } - } - val addr = socket.remoteAddress as InetSocketAddress deferredDevice.complete( - InetTransportDevice(inputBlocks, outputBlocks, JvmNetworkAddress(addr.address, addr.port), - { flush() } - ) { + InetTransportDevice(inputBlocks, outputBlocks, JvmNetworkAddress(addr.address, addr.port), { + val log = LogTag("S:${addr.address}:${addr.port}") + log.info { "ASTD is waitig to close" } yield() // wait until all received data are parsed, but not too long - withTimeoutOrNull( 1000 ) { + withTimeoutOrNull(500) { receiving.waitFor { !it } } - // graceful close: flush output - flush() // then stop it + log.info { "ASTd is calling STOP" } stop() - } + log.info { "STopped" } + }) ) } globalLaunch { socket.close() } diff --git a/src/jvmTest/kotlin/net/sergeych/kiloparsec/ClientTest.kt b/src/jvmTest/kotlin/net/sergeych/kiloparsec/ClientTest.kt index f0cc812..f56ab43 100644 --- a/src/jvmTest/kotlin/net/sergeych/kiloparsec/ClientTest.kt +++ b/src/jvmTest/kotlin/net/sergeych/kiloparsec/ClientTest.kt @@ -1,5 +1,6 @@ package net.sergeych.kiloparsec +import assertThrows import io.ktor.server.engine.* import io.ktor.server.netty.* import kotlinx.coroutines.launch @@ -45,14 +46,13 @@ class ClientTest { onConnected { session.data = "start" } on(cmdSave) { session.data = it } on(cmdLoad) { - println("load!") session.data } on(cmdException) { throw TestException() } on(cmdDrop) { - throw RemoteInterface.ClosedException() + throw LocalInterface.BreakConnectionException() } } val server = KiloServer(cli, acceptTcpDevice(17101)) { @@ -69,12 +69,16 @@ class ClientTest { client.call(cmdSave, "foobar") assertEquals("foobar", client.call(cmdLoad)) -// client.call(cmdException) val res = kotlin.runCatching { client.call(cmdException) } println(res.exceptionOrNull()) assertIs(res.exceptionOrNull()) assertEquals("foobar", client.call(cmdLoad)) + assertThrows { client.call(cmdDrop) } + + // reconnect? + assertEquals("start", client.call(cmdLoad)) + server.close() }