separate BreakConnectionException and more correct processing for ClosedException.

This commit is contained in:
Sergey Chernov 2024-06-17 13:01:09 +07:00
parent 4f6bc3c77e
commit 0d3a8ae95c
7 changed files with 84 additions and 16 deletions

View File

@ -14,8 +14,9 @@ class KiloInterface<S> : LocalInterface<KiloScope<S>>() {
init { init {
registerError { RemoteInterface.UnknownCommand() } registerError { RemoteInterface.UnknownCommand() }
registerError { RemoteInterface.InternalError(it) }
registerError { RemoteInterface.ClosedException(it) } registerError { RemoteInterface.ClosedException(it) }
registerError { RemoteInterface.SecurityException(it) } // registerError { RemoteInterface.SecurityException(it) }
registerError { RemoteInterface.InvalidDataException(it) } registerError { RemoteInterface.InvalidDataException(it) }
registerError { RemoteInterface.RemoteException(it) } registerError { RemoteInterface.RemoteException(it) }
registerError { IllegalStateException() } registerError { IllegalStateException() }

View File

@ -33,7 +33,7 @@ internal class KiloL0Interface<T>(
0u, 0u,
clientInterface.execute(params.scope, call.name, call.serializedArgs) clientInterface.execute(params.scope, call.name, call.serializedArgs)
) )
} catch(t: RemoteInterface.ClosedException) { } catch(t: BreakConnectionException) {
throw t throw t
} }
catch (t: Throwable) { catch (t: Throwable) {

View File

@ -32,8 +32,8 @@ class KiloServer<S>(
} }
catch(_: CancellationException) { catch(_: CancellationException) {
} }
catch(_: RemoteInterface.ClosedException) { catch(cce: LocalInterface.BreakConnectionException) {
info { "Closed exception caught, closing" } info { "Closed exception caught, closing (${cce.flushSendQueue}" }
} }
catch (t: Throwable) { catch (t: Throwable) {
exception { "unexpected while creating kiloclient" to t } exception { "unexpected while creating kiloclient" to t }

View File

@ -2,6 +2,7 @@ package net.sergeych.kiloparsec
import net.sergeych.mp_logger.LogTag import net.sergeych.mp_logger.LogTag
import net.sergeych.mp_logger.Loggable import net.sergeych.mp_logger.Loggable
import net.sergeych.mp_logger.exception
import net.sergeych.mp_logger.info import net.sergeych.mp_logger.info
import net.sergeych.tools.AtomicCounter import net.sergeych.tools.AtomicCounter
import net.sergeych.utools.firstNonNull import net.sergeych.utools.firstNonNull
@ -15,6 +16,26 @@ open class LocalInterface<S> : Loggable by LogTag("LocalInterface${idCounter.inc
private val commands = mutableMapOf<String, RawCommandHandler<S>>() private val commands = mutableMapOf<String, RawCommandHandler<S>>()
/**
* Instruct the transport to immediately break the connection.
* This exception is not passed to the remote end, instead, transport device breaks
* connection to remote when receiving it.
*
* Remote interface will throw [RemoteInterface.ClosedException] as the break will be detected. As reaction time
* it depends on the transport in use, we recommend sending some registered exception first if you need
* to pass important data, or implement special commands on both sides.
*
* __Important note:__ _it is not allowed to throw [RemoteInterface.ClosedException] directly!_
* This exception is processed internally and can't be sent over the network.
*/
open class BreakConnectionException(
text: String = "break connection request",
val flushSendQueue: Boolean = true,
) : RuntimeException(text) {
override val message: String?
get() = super.message + " (flush=$flushSendQueue)"
}
/** /**
* New session creator. Rarely needed directlym it can be used for delegation * New session creator. Rarely needed directlym it can be used for delegation
* of local interfaces. * of local interfaces.
@ -77,8 +98,13 @@ open class LocalInterface<S> : Loggable by LogTag("LocalInterface${idCounter.inc
errorByClass[t::class] ?: errorProviders.firstNonNull { it.getErrorCode(t) } errorByClass[t::class] ?: errorProviders.firstNonNull { it.getErrorCode(t) }
fun encodeError(forId: UInt, t: Throwable): Transport.Block.Error = fun encodeError(forId: UInt, t: Throwable): Transport.Block.Error =
getErrorCode(t)?.let { Transport.Block.Error(forId, it, t.message) } if (t is RemoteInterface.ClosedException) {
?: Transport.Block.Error(forId, "UnknownError", "${t::class.simpleName}: ${t.message}") exception { "Illegal attempt to send ClosedException" to t }
encodeError(forId, RemoteInterface.InternalError("TCE"))
}
else
getErrorCode(t)?.let { Transport.Block.Error(forId, it, t.message) }
?: Transport.Block.Error(forId, "UnknownError", "${t::class.simpleName}: ${t.message}")
open fun getErrorBuilder(code: String): ((String, UByteArray?) -> Throwable)? = open fun getErrorBuilder(code: String): ((String, UByteArray?) -> Throwable)? =
errorBuilder[code] ?: errorProviders.firstNonNull { it.getErrorBuilder(code) } errorBuilder[code] ?: errorProviders.firstNonNull { it.getErrorBuilder(code) }

View File

@ -15,11 +15,12 @@ interface RemoteInterface {
/** /**
* Is thrown when the channel is closed, in an attempt to execute a command, also to all pending * Is thrown when the channel is closed, in an attempt to execute a command, also to all pending
* calls (see [call]). * calls (see [call]). Client code should never throw it. If command handler needs to break connection
* it should throw [LocalInterface.BreakConnectionException]
*/ */
open class ClosedException(t: String = "connection is closed") : Exception(t) open class ClosedException(t: String = "connection is closed") : Exception(t)
open class SecurityException(t: String = "invalid remote id and signature") : ClosedException(t) open class SecurityException(t: String = "invalid remote id and signature") : LocalInterface.BreakConnectionException(t)
open class InvalidDataException(msg: String="invalid data, can't unpack") : Exception(msg) open class InvalidDataException(msg: String="invalid data, can't unpack") : Exception(msg)
@ -41,6 +42,8 @@ interface RemoteInterface {
*/ */
class UnknownCommand : RemoteException("UnknownCommand") class UnknownCommand : RemoteException("UnknownCommand")
open class InternalError(code: String="0"): RemoteException("Internal error: $code")
suspend fun <R> call(cmd: Command<Unit, R>): R = call(cmd, Unit) suspend fun <R> call(cmd: Command<Unit, R>): R = call(cmd, Unit)
/** /**

View File

@ -180,11 +180,11 @@ class Transport<S>(
localInterface.execute(commandContext, b.name, b.packedArgs) localInterface.execute(commandContext, b.name, b.packedArgs)
) )
) )
} catch (x: RemoteInterface.ClosedException) { } catch (x: LocalInterface.BreakConnectionException) {
// handler forced close // handler forced close
warning { "handler requested closing of the connection"} warning { "handler requested closing of the connection (${x.flushSendQueue}"}
isClosed = true isClosed = true
throw x device.close()
} catch (x: RemoteInterface.RemoteException) { } catch (x: RemoteInterface.RemoteException) {
send(Block.Error(b.id, x.code, x.text, x.extra)) send(Block.Error(b.id, x.code, x.text, x.extra))
} catch (t: Throwable) { } catch (t: Throwable) {
@ -203,6 +203,10 @@ class Transport<S>(
info { "closed receive channel"} info { "closed receive channel"}
isClosed = true isClosed = true
} }
catch(cce: LocalInterface.BreakConnectionException) {
info { "closing connection by local request ($cce)"}
device.close()
}
catch (_: CancellationException) { catch (_: CancellationException) {
info { "loop is cancelled with CancellationException" } info { "loop is cancelled with CancellationException" }
isClosed = true isClosed = true

View File

@ -7,7 +7,10 @@ import net.sergeych.crypto2.SigningKey
import net.sergeych.crypto2.initCrypto import net.sergeych.crypto2.initCrypto
import net.sergeych.kiloparsec.* import net.sergeych.kiloparsec.*
import net.sergeych.mp_logger.Log import net.sergeych.mp_logger.Log
import kotlin.test.* import kotlin.test.Test
import kotlin.test.assertContentEquals
import kotlin.test.assertEquals
import kotlin.test.fail
private var dcnt = 0 private var dcnt = 0
fun createTestDevice(): Pair<Transport.Device, Transport.Device> { fun createTestDevice(): Pair<Transport.Device, Transport.Device> {
@ -154,6 +157,8 @@ class TransportTest {
d2.close() d2.close()
} }
class TestException(text: String) : Exception(text)
@Test @Test
fun testClient() = runTest { fun testClient() = runTest {
initCrypto() initCrypto()
@ -173,6 +178,10 @@ class TransportTest {
val serverId = SigningKey.pair() val serverId = SigningKey.pair()
val clientId = SigningKey.pair() val clientId = SigningKey.pair()
val cmdException by command<Unit, Unit>()
val cmdRemoteExceptionTest by command<Unit, String>()
val cmdBreak by command<Unit, Unit>()
val serverInterface = KiloInterface<String>().apply { val serverInterface = KiloInterface<String>().apply {
on(cmdPing) { on(cmdPing) {
"pong! [$it]" "pong! [$it]"
@ -189,15 +198,26 @@ class TransportTest {
on(cmdChainCallServer2) { on(cmdChainCallServer2) {
remote.call(cmdChainCallClient2, "$it-s2") remote.call(cmdChainCallClient2, "$it-s2")
} }
registerError { IllegalStateException() } on(cmdException) { throw TestException("te1") }
registerError { IllegalArgumentException(it) } on(cmdRemoteExceptionTest) {
try {
remote.call(cmdException)
"error!"
} catch (e: TestException) {
"ok: ${e.message}"
}
}
on(cmdBreak) { throw LocalInterface.BreakConnectionException() }
registerError { TestException(it) }
} }
val kiloServerConnection = KiloServerConnection(serverInterface, d1, "server session", serverId.secretKey val kiloServerConnection = KiloServerConnection(
serverInterface, d1, "server session", serverId.secretKey
) )
launch { kiloServerConnection.run() } launch { kiloServerConnection.run() }
var cnt = 0 var cnt = 0
val client = KiloClient { val client = KiloClient {
addErrors(serverInterface)
session { "client session!" } session { "client session!" }
secretIdKey = clientId.secretKey secretIdKey = clientId.secretKey
local { local {
@ -208,11 +228,13 @@ class TransportTest {
"client pong: $it" "client pong: $it"
} }
on(cmdChainCallClient1) { on(cmdChainCallClient1) {
remote.call(cmdChainCallServer2,"$it-c1") remote.call(cmdChainCallServer2, "$it-c1")
} }
on(cmdChainCallClient2) { "$it-c2" } on(cmdChainCallClient2) { "$it-c2" }
on(cmdException) { throw TestException("te-local") }
} }
connect { connect {
println("Called connect: $cnt")
if (cnt++ > 0) { if (cnt++ > 0) {
cancel() cancel()
fail("connect called once again") fail("connect called once again")
@ -226,6 +248,18 @@ class TransportTest {
assertEquals("server push: bar", kiloServerConnection.call(cmdPush, "bar")) assertEquals("server push: bar", kiloServerConnection.call(cmdPush, "bar"))
assertEquals("**-s1-c1-s2-c2", client.call(cmdChainCallServer1, "**")) assertEquals("**-s1-c1-s2-c2", client.call(cmdChainCallServer1, "**"))
assertThrows<TestException> { client.call(cmdException) }
assertEquals("ok: te-local", client.call(cmdRemoteExceptionTest))
assertThrows<RemoteInterface.ClosedException> {
client.call(cmdBreak)
}
// Note that current transport test is too simple,
// therefore, we can't test reconnecting, also we need server and client instances
// not connections, so that's all
d1.close() d1.close()
d2.close() d2.close()
client.close() client.close()