fixed automatic connection reestablishing for web and tcp

This commit is contained in:
Sergey Chernov 2024-06-18 16:11:13 +07:00
parent 825c0bd5f7
commit 38fbca955c
6 changed files with 45 additions and 41 deletions

1
.idea/misc.xml generated
View File

@ -1,4 +1,3 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4"> <project version="4">
<component name="ExternalStorageConfigurationManager" enabled="true" /> <component name="ExternalStorageConfigurationManager" enabled="true" />
<component name="FrameworkDetectionExcludesConfiguration"> <component name="FrameworkDetectionExcludesConfiguration">

View File

@ -35,7 +35,7 @@ class KiloClient<S>(
* to authenticate a client on connection restore, for example. * to authenticate a client on connection restore, for example.
*/ */
@Suppress("unused") @Suppress("unused")
val state = _state.asStateFlow() val connectedStateFlow = _state.asStateFlow()
private var deferredClient = CompletableDeferred<KiloClientConnection<S>>() private var deferredClient = CompletableDeferred<KiloClientConnection<S>>()

View File

@ -17,8 +17,9 @@ open class ProxyDevice(
override val input: ReceiveChannel<UByteArray> = inputChannel override val input: ReceiveChannel<UByteArray> = inputChannel
override val output: SendChannel<UByteArray> = outputChannel override val output: SendChannel<UByteArray> = outputChannel
override suspend fun close() { override suspend fun close() {
doClose?.invoke() kotlin.runCatching { doClose?.invoke() }
runCatching { inputChannel.close() } runCatching { inputChannel.close() }
runCatching { outputChannel.close() } runCatching { outputChannel.close() }
} }

View File

@ -6,14 +6,11 @@ import io.ktor.http.*
import io.ktor.websocket.* import io.ktor.websocket.*
import kotlinx.coroutines.CancellationException import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.CompletableDeferred import kotlinx.coroutines.CompletableDeferred
import kotlinx.coroutines.cancel
import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.ClosedReceiveChannelException import kotlinx.coroutines.channels.ClosedReceiveChannelException
import kotlinx.coroutines.channels.ClosedSendChannelException import kotlinx.coroutines.channels.ClosedSendChannelException
import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.coroutines.launch import kotlinx.coroutines.launch
import net.sergeych.crypto2.SigningKey import net.sergeych.crypto2.SigningKey
import net.sergeych.crypto2.toDump
import net.sergeych.kiloparsec.KiloClient import net.sergeych.kiloparsec.KiloClient
import net.sergeych.kiloparsec.KiloConnectionData import net.sergeych.kiloparsec.KiloConnectionData
import net.sergeych.kiloparsec.KiloInterface import net.sergeych.kiloparsec.KiloInterface
@ -46,7 +43,8 @@ fun <S> websocketClient(
return KiloClient(clientInterface, secretKey) { return KiloClient(clientInterface, secretKey) {
val input = Channel<UByteArray>() val input = Channel<UByteArray>()
val output = Channel<UByteArray>() val output = Channel<UByteArray>()
val job = globalLaunch { val closeHandle = CompletableDeferred<Boolean>()
globalLaunch {
val log = LogTag("KC:${counter.incrementAndGet()}") val log = LogTag("KC:${counter.incrementAndGet()}")
client.webSocket({ client.webSocket({
url.protocol = u.protocol url.protocol = u.protocol
@ -56,7 +54,6 @@ fun <S> websocketClient(
url.parameters.appendAll(u.parameters) url.parameters.appendAll(u.parameters)
log.info { "kiloparsec server URL: $url" } log.info { "kiloparsec server URL: $url" }
}) { }) {
val closeHandle = CompletableDeferred<Boolean>()
log.info { "connected to the server" } log.info { "connected to the server" }
// println("SENDING!!!") // println("SENDING!!!")
// send("Helluva") // send("Helluva")
@ -70,6 +67,7 @@ fun <S> websocketClient(
} catch (_: ClosedSendChannelException) { } catch (_: ClosedSendChannelException) {
log.info { "send channel closed" } log.info { "send channel closed" }
} }
catch(_: CancellationException) {}
catch(t: Throwable) { catch(t: Throwable) {
log.info { "unexpected exception in websock sender: ${t.stackTraceToString()}" } log.info { "unexpected exception in websock sender: ${t.stackTraceToString()}" }
closeHandle.completeExceptionally(t) closeHandle.completeExceptionally(t)
@ -80,9 +78,7 @@ fun <S> websocketClient(
try { try {
for (f in incoming) { for (f in incoming) {
if (f is Frame.Binary) { if (f is Frame.Binary) {
input.send(f.readBytes().toUByteArray().also { input.send(f.readBytes().toUByteArray())
println("incoming\n${it.toDump()}")
})
} else { } else {
log.warning { "ignoring unexpected frame of type ${f.frameType}" } log.warning { "ignoring unexpected frame of type ${f.frameType}" }
} }
@ -102,14 +98,16 @@ fun <S> websocketClient(
log.warning { "Client is closing with error" } log.warning { "Client is closing with error" }
throw RemoteInterface.ClosedException() throw RemoteInterface.ClosedException()
} }
output.close()
input.close()
} }
log.info { "closing connection" } log.info { "closing connection" }
} }
val device = ProxyDevice(input, output) { val device = ProxyDevice(input, output) {
input.close()
// we need to explicitly close the coroutine job, or it can hang for a long time // we need to explicitly close the coroutine job, or it can hang for a long time
// leaking resources. // leaking resources.
job.cancel() closeHandle.complete(true)
// job.cancel()
} }
KiloConnectionData(device, sessionMaker()) KiloConnectionData(device, sessionMaker())
} }

View File

@ -7,7 +7,6 @@ import io.ktor.websocket.*
import kotlinx.coroutines.cancel import kotlinx.coroutines.cancel
import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.ClosedReceiveChannelException import kotlinx.coroutines.channels.ClosedReceiveChannelException
import kotlinx.coroutines.channels.ClosedSendChannelException
import kotlinx.coroutines.isActive import kotlinx.coroutines.isActive
import kotlinx.coroutines.launch import kotlinx.coroutines.launch
import net.sergeych.crypto2.SigningKey import net.sergeych.crypto2.SigningKey
@ -36,7 +35,6 @@ fun <S> Application.setupWebsocketServer(
val counter = AtomicCounter() val counter = AtomicCounter()
routing { routing {
webSocket(path) { webSocket(path) {
println("--------------------------------------------")
val log = LogTag("KWS:${counter.incrementAndGet()}") val log = LogTag("KWS:${counter.incrementAndGet()}")
log.debug { "opening the connection" } log.debug { "opening the connection" }
val input = Channel<UByteArray>(256) val input = Channel<UByteArray>(256)
@ -44,7 +42,13 @@ fun <S> Application.setupWebsocketServer(
launch { launch {
log.debug { "starting output pump" } log.debug { "starting output pump" }
while (isActive) { while (isActive) {
send(output.receive().toByteArray()) try {
send(output.receive().toByteArray())
}
catch(_: ClosedReceiveChannelException) {
log.debug { "closing output pump as output channel is closed" }
break
}
} }
log.debug { "closing output pump" } log.debug { "closing output pump" }
} }
@ -54,7 +58,10 @@ fun <S> Application.setupWebsocketServer(
createSession(), createSession(),
serverKey serverKey
) )
launch { server.run() } launch {
server.run()
close()
}
log.debug { "KSC started, looking for incoming frames" } log.debug { "KSC started, looking for incoming frames" }
for (f in incoming) { for (f in incoming) {
log.debug { "incoming frame: ${f.frameType}" } log.debug { "incoming frame: ${f.frameType}" }
@ -64,7 +71,7 @@ fun <S> Application.setupWebsocketServer(
log.debug { "in frame\n${it.toDump()}" } log.debug { "in frame\n${it.toDump()}" }
}) })
} catch (_: RemoteInterface.ClosedException) { } catch (_: RemoteInterface.ClosedException) {
log.info { "caught local closed exception, closing" } log.warning { "caught local closed exception (strange!), closing" }
break break
} catch (_: ClosedReceiveChannelException) { } catch (_: ClosedReceiveChannelException) {
log.info { "receive channel is closed, closing connection" } log.info { "receive channel is closed, closing connection" }
@ -77,9 +84,9 @@ fun <S> Application.setupWebsocketServer(
log.warning { "unknown frame type ${f.frameType}, ignoring" } log.warning { "unknown frame type ${f.frameType}, ignoring" }
} }
log.debug { "closing the server" } log.debug { "closing the server" }
println("****************prec") close()
cancel() cancel()
println("****************postc") log.debug { "server wbesock processing done" }
} }
} }
} }

View File

@ -12,10 +12,7 @@ import net.sergeych.kiloparsec.adapter.setupWebsocketServer
import net.sergeych.kiloparsec.adapter.websocketClient import net.sergeych.kiloparsec.adapter.websocketClient
import net.sergeych.mp_logger.Log import net.sergeych.mp_logger.Log
import java.net.InetAddress import java.net.InetAddress
import kotlin.test.Test import kotlin.test.*
import kotlin.test.assertEquals
import kotlin.test.assertIs
import kotlin.test.assertTrue
class ClientTest { class ClientTest {
@ -102,7 +99,7 @@ class ClientTest {
on(cmdSetFoo) { session.foo = it } on(cmdSetFoo) { session.foo = it }
on(cmdCheckConnected) { connectedCalled } on(cmdCheckConnected) { connectedCalled }
on(cmdClose) { on(cmdClose) {
throw RemoteInterface.ClosedException() throw LocalInterface.BreakConnectionException()
// if( closeCounter < 2 ) { // if( closeCounter < 2 ) {
// println("-------------------------- call close!") // println("-------------------------- call close!")
// throw RemoteInterface.ClosedException() // throw RemoteInterface.ClosedException()
@ -120,33 +117,35 @@ class ClientTest {
val client = websocketClient<Unit>("ws://localhost:8080/kp") val client = websocketClient<Unit>("ws://localhost:8080/kp")
val states = mutableListOf<Boolean>() val states = mutableListOf<Boolean>()
val collector = launch { val collector = launch {
client.state.collect { client.connectedStateFlow.collect {
println("got: $closeCounter/$it") println("got: $closeCounter/$it")
states += it states += it
if( !it) { closeCounter++ } if( !it) { closeCounter++ }
} }
} }
println(1)
assertEquals(true, client.call(cmdCheckConnected)) assertEquals(true, client.call(cmdCheckConnected))
assertTrue { client.state.value } assertTrue { client.connectedStateFlow.value }
println(2)
assertEquals("not set", client.call(cmdGetFoo)) assertEquals("not set", client.call(cmdGetFoo))
println(3)
client.call(cmdSetFoo, "foo") client.call(cmdSetFoo, "foo")
println(4)
assertEquals("foo", client.call(cmdGetFoo)) assertEquals("foo", client.call(cmdGetFoo))
println(5)
// assertThrows<RemoteInterface.ClosedException> { assertTrue { client.connectedStateFlow.value }
// client.call(cmdClose) assertThrows<RemoteInterface.ClosedException> {
// } client.call(cmdClose)
println("0------------------------------------------------------------------------------connection should be closed") }
// assertFalse { client.state.value }
// assertEquals("foo", client.call(cmdGetFoo)) // connection should now be closed
assertFalse { client.connectedStateFlow.value }
// this should be run on automatically reopen connection
client.call(cmdSetFoo,"superbar")
assertTrue { client.connectedStateFlow.value }
assertEquals("superbar", client.call(cmdGetFoo))
client.close() client.close()
ns.stop() ns.stop()
collector.cancel() collector.cancel()
println("----= states: $states") // println("----= states: $states")
println("stopped server") // println("stopped server")
println("closed client") // println("closed client")
} }
} }