diff --git a/settings.gradle.kts b/settings.gradle.kts index 961a200..617937d 100644 --- a/settings.gradle.kts +++ b/settings.gradle.kts @@ -1,4 +1,4 @@ -/* + /* * Copyright (c) 2025. Sergey S. Chernov - All Rights Reserved * * You may use, distribute and modify this code under the diff --git a/src/commonMain/kotlin/net/sergeych/kiloparsec/KiloClient.kt b/src/commonMain/kotlin/net/sergeych/kiloparsec/KiloClient.kt index 474abae..c6a712d 100644 --- a/src/commonMain/kotlin/net/sergeych/kiloparsec/KiloClient.kt +++ b/src/commonMain/kotlin/net/sergeych/kiloparsec/KiloClient.kt @@ -91,9 +91,11 @@ class KiloClient( debug { "get device and session" } val client = KiloClientConnection(localInterface, kc, secretKey) deferredClient.complete(client) - client.run { + debug { "starting client run"} + val r = runCatching { client.run { _state.value = it - } + } } + debug { "----------- client run finished: $r" } resetDeferredClient() debug { "client run finished" } } catch (_: RemoteInterface.ClosedException) { @@ -109,7 +111,7 @@ class KiloClient( _state.value = false resetDeferredClient() // reconnection timeout - delay(100) + delay(700) } } diff --git a/src/commonMain/kotlin/net/sergeych/kiloparsec/KiloClientConnection.kt b/src/commonMain/kotlin/net/sergeych/kiloparsec/KiloClientConnection.kt index 2e1c130..60df23a 100644 --- a/src/commonMain/kotlin/net/sergeych/kiloparsec/KiloClientConnection.kt +++ b/src/commonMain/kotlin/net/sergeych/kiloparsec/KiloClientConnection.kt @@ -49,7 +49,6 @@ class KiloClientConnection( try { // in parallel: keys and connection val deferredKeyPair = async { SafeKeyExchange() } - debug { "opening device" } debug { "got a transport device $device" } @@ -62,10 +61,11 @@ class KiloClientConnection( debug { "transport started" } val pair = deferredKeyPair.await() - debug { "keypair ready" } + debug { "keypair ready (1)" } val serverHe = transport.call(L0Request, Handshake(1u, pair.publicKey)) + debug { "got server HE (2)" } val sk = pair.clientSessionKey(serverHe.publicKey) var params = KiloParams(false, transport, sk, session, null, this@KiloClientConnection) @@ -97,8 +97,7 @@ class KiloClientConnection( } catch (x: CancellationException) { info { "client is cancelled" } } catch (x: RemoteInterface.ClosedException) { - x.printStackTrace() - info { "connection closed by remote" } + debug { "connection closed/refused by remote" } } finally { onConnectedStateChanged?.invoke(false) job?.cancel() diff --git a/src/commonMain/kotlin/net/sergeych/kiloparsec/Transport.kt b/src/commonMain/kotlin/net/sergeych/kiloparsec/Transport.kt index a9be57f..79ad293 100644 --- a/src/commonMain/kotlin/net/sergeych/kiloparsec/Transport.kt +++ b/src/commonMain/kotlin/net/sergeych/kiloparsec/Transport.kt @@ -134,7 +134,13 @@ class Transport( } // now we have mutex freed so we can call: - val r = runCatching { device.output.send(pack(b)) } + val r = runCatching { + do { + val cr = device.output.trySend(pack(b)) + if( cr.isClosed ) throw ClosedSendChannelException("can't send block: channel is closed") + delay(100) + } while(!cr.isSuccess) + } if (!r.isSuccess) { r.exceptionOrNull()?.let { exception { "failed to send output block" to it } @@ -271,7 +277,7 @@ class Transport( } debug { "no more active: $isActive / ${calls.size}" } } - info { "exiting transport loop" } + debug { "exiting transport loop" } } private suspend fun send(block: Block) { diff --git a/src/commonMain/kotlin/net/sergeych/kiloparsec/adapter/websocketClient.kt b/src/commonMain/kotlin/net/sergeych/kiloparsec/adapter/websocketClient.kt index 81923ad..74a968a 100644 --- a/src/commonMain/kotlin/net/sergeych/kiloparsec/adapter/websocketClient.kt +++ b/src/commonMain/kotlin/net/sergeych/kiloparsec/adapter/websocketClient.kt @@ -20,12 +20,10 @@ import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.channels.ClosedReceiveChannelException import kotlinx.coroutines.channels.ClosedSendChannelException import kotlinx.coroutines.launch +import kotlinx.io.IOException import net.sergeych.crypto2.SigningKey import net.sergeych.kiloparsec.* -import net.sergeych.mp_logger.LogTag -import net.sergeych.mp_logger.exception -import net.sergeych.mp_logger.info -import net.sergeych.mp_logger.warning +import net.sergeych.mp_logger.* import net.sergeych.mp_tools.globalLaunch import net.sergeych.tools.AtomicCounter @@ -67,70 +65,85 @@ fun websocketTransportDevice( val input = Channel() val output = Channel() val closeHandle = CompletableDeferred() + val readyHandle = CompletableDeferred() + globalLaunch { val log = LogTag("KC:${counter.incrementAndGet()}") - client.webSocket({ - url.protocol = u.protocol - url.host = u.host - url.port = u.port - url.encodedPath = u.encodedPath - url.parameters.appendAll(u.parameters) - log.info { "kiloparsec server URL: $url" } - }) { - log.info { "connected to the server" } + try { + client.webSocket({ + url.protocol = u.protocol + url.host = u.host + url.port = u.port + url.encodedPath = u.encodedPath + url.parameters.appendAll(u.parameters) + log.info { "kiloparsec server URL: $url" } + }) { + log.info { "connected to the server" } // println("SENDING!!!") // send("Helluva") - launch { - try { - for (block in output) { - send(block.toByteArray()) - } - log.info { "input is closed, closing the websocket" } - if (closeHandle.isActive) closeHandle.complete(true) - } catch (_: ClosedSendChannelException) { - log.info { "send channel closed" } - } catch (_: CancellationException) { - } catch (t: Throwable) { - log.info { "unexpected exception in websock sender: ${t.stackTraceToString()}" } - closeHandle.completeExceptionally(t) - } - if (closeHandle.isActive) closeHandle.complete(false) - } - launch { - try { - for (f in incoming) { - if (f is Frame.Binary) { - input.send(f.readBytes().toUByteArray()) - } else { - log.warning { "ignoring unexpected frame of type ${f.frameType}" } + readyHandle.complete(Unit) + launch { + try { + for (block in output) { + send(block.toByteArray()) } + log.info { "input is closed, closing the websocket" } + if (closeHandle.isActive) closeHandle.complete(true) + } catch (_: ClosedSendChannelException) { + log.info { "send channel closed" } + } catch (_: CancellationException) { + } catch (t: Throwable) { + log.info { "unexpected exception in websock sender: ${t.stackTraceToString()}" } + closeHandle.completeExceptionally(t) } - if (closeHandle.isActive) closeHandle.complete(true) - } catch (_: CancellationException) { - if (closeHandle.isActive) closeHandle.complete(false) - } catch (_: ClosedReceiveChannelException) { - log.warning { "receive channel closed unexpectedly" } - if (closeHandle.isActive) closeHandle.complete(false) - } catch (t: Throwable) { - log.exception { "unexpected error" to t } if (closeHandle.isActive) closeHandle.complete(false) } + launch { + try { + for (f in incoming) { + if (f is Frame.Binary) { + input.send(f.readBytes().toUByteArray()) + } else { + log.warning { "ignoring unexpected frame of type ${f.frameType}" } + } + } + if (closeHandle.isActive) closeHandle.complete(true) + } catch (_: CancellationException) { + if (closeHandle.isActive) closeHandle.complete(false) + } catch (_: ClosedReceiveChannelException) { + log.warning { "receive channel closed unexpectedly" } + if (closeHandle.isActive) closeHandle.complete(false) + } catch (t: Throwable) { + log.exception { "unexpected error" to t } + if (closeHandle.isActive) closeHandle.complete(false) + } + } + if (!closeHandle.await()) { + log.warning { "Client is closing with error" } + throw RemoteInterface.ClosedException() + } + runCatching { output.close() } + runCatching { input.close() } + runCatching { close() } } - if (!closeHandle.await()) { - log.warning { "Client is closing with error" } - throw RemoteInterface.ClosedException() - } - output.close() - input.close() + } + catch(x: IOException) { + if( "refused" in x.toString()) log.debug { "connection refused" } + else log.warning { "unexpected IO error $x" } + runCatching { output.close() } + runCatching { input.close() } } log.info { "closing connection" } } - val device = ProxyDevice(input, output) { + // Wait for connection be established or failed + val device = ProxyDevice(input, output, doClose = { // we need to explicitly close the coroutine job, or it can hang for a long time // leaking resources. + runCatching { output.close() } + runCatching { input.close() } closeHandle.complete(true) // job.cancel() - } + }) return device } diff --git a/src/jvmTest/kotlin/net/sergeych/kiloparsec/ClientTest.kt b/src/jvmTest/kotlin/net/sergeych/kiloparsec/ClientTest.kt index 550d1b3..238ff17 100644 --- a/src/jvmTest/kotlin/net/sergeych/kiloparsec/ClientTest.kt +++ b/src/jvmTest/kotlin/net/sergeych/kiloparsec/ClientTest.kt @@ -15,10 +15,13 @@ import io.ktor.server.engine.* import io.ktor.server.netty.* import kotlinx.coroutines.delay import kotlinx.coroutines.launch +import kotlinx.coroutines.runBlocking import kotlinx.coroutines.test.runTest +import net.sergeych.crypto2.SigningSecretKey import net.sergeych.crypto2.initCrypto import net.sergeych.kiloparsec.adapter.setupWebsocketServer import net.sergeych.kiloparsec.adapter.websocketClient +import net.sergeych.kiloparsec.adapter.websocketTransportDevice import net.sergeych.mp_logger.Log import java.net.InetAddress import kotlin.random.Random @@ -42,14 +45,15 @@ class ClientTest { fun webSocketTest() = runTest { initCrypto() // fun Application. - val cmdClose by command() - val cmdGetFoo by command() - val cmdSetFoo by command() - val cmdCheckConnected by command() + val cmdClose by command() + val cmdGetFoo by command() + val cmdSetFoo by command() + val cmdCheckConnected by command() Log.connectConsole(Log.Level.DEBUG) - data class Session(var foo: String="not set") + data class Session(var foo: String = "not set") + var closeCounter = 0 val serverInterface = KiloInterface().apply { var connectedCalled = false @@ -62,7 +66,7 @@ class ClientTest { } } - val port = Random.nextInt(8080,9090) + val port = Random.nextInt(8080, 9090) val ns = embeddedServer(Netty, port = port, host = "0.0.0.0", module = { setupWebsocketServer(serverInterface) { Session() } }).start(wait = false) @@ -73,7 +77,9 @@ class ClientTest { client.connectedStateFlow.collect { println("got: $closeCounter/$it") states += it - if( !it) { closeCounter++ } + if (!it) { + closeCounter++ + } } } assertEquals(true, client.call(cmdCheckConnected)) @@ -94,7 +100,7 @@ class ClientTest { assertFalse { client.connectedStateFlow.value } // this should be run on automatically reopen connection - client.call(cmdSetFoo,"superbar") + client.call(cmdSetFoo, "superbar") assertTrue { client.connectedStateFlow.value } assertEquals("superbar", client.call(cmdGetFoo)) client.close() @@ -104,4 +110,26 @@ class ClientTest { // println("stopped server") // println("closed client") } + + @Test + fun webSocketWaitForConnectTest() = runBlocking { + initCrypto() +// fun Application. + Log.connectConsole(Log.Level.DEBUG) + + val clientInterface = KiloInterface().apply {} + + val port = Random.nextInt(8080, 9090) + var clientConnectCalls = 0 + // It should repeatedly reconnect, and we will count: + KiloClient(clientInterface, SigningSecretKey.new()) { + clientConnectCalls++ + KiloConnectionData(websocketTransportDevice("ws://localhost:$port/kp"), Unit) + } + delay(1200) + // and check: +// println("connection attemtps: $clientConnectCalls") + assertTrue { clientConnectCalls > 1 } + } + } \ No newline at end of file