fixed bug that prevented kilo client to restart if the channel was closed while channel send operation was suspended (featurebug of channels, one of)
This commit is contained in:
		
							parent
							
								
									146878629e
								
							
						
					
					
						commit
						7e1f7ec4aa
					
				| @ -1,4 +1,4 @@ | ||||
| /* | ||||
|     /* | ||||
|  * Copyright (c) 2025. Sergey S. Chernov - All Rights Reserved | ||||
|  * | ||||
|  *  You may use, distribute and modify this code under the | ||||
|  | ||||
| @ -91,9 +91,11 @@ class KiloClient<S>( | ||||
|                     debug { "get device and session" } | ||||
|                     val client = KiloClientConnection(localInterface, kc, secretKey) | ||||
|                     deferredClient.complete(client) | ||||
|                     client.run { | ||||
|                     debug { "starting client run"} | ||||
|                     val r = runCatching {  client.run { | ||||
|                         _state.value = it | ||||
|                     } | ||||
|                     } } | ||||
|                     debug { "----------- client run finished: $r" } | ||||
|                     resetDeferredClient() | ||||
|                     debug { "client run finished" } | ||||
|                 } catch (_: RemoteInterface.ClosedException) { | ||||
| @ -109,7 +111,7 @@ class KiloClient<S>( | ||||
|                 _state.value = false | ||||
|                 resetDeferredClient() | ||||
|                 // reconnection timeout | ||||
|                 delay(100) | ||||
|                 delay(700) | ||||
|             } | ||||
|         } | ||||
| 
 | ||||
|  | ||||
| @ -49,7 +49,6 @@ class KiloClientConnection<S>( | ||||
|             try { | ||||
|                 // in parallel: keys and connection | ||||
|                 val deferredKeyPair = async { SafeKeyExchange() } | ||||
|                 debug { "opening device" } | ||||
|                 debug { "got a transport device $device" } | ||||
| 
 | ||||
| 
 | ||||
| @ -62,10 +61,11 @@ class KiloClientConnection<S>( | ||||
|                 debug { "transport started" } | ||||
| 
 | ||||
|                 val pair = deferredKeyPair.await() | ||||
|                 debug { "keypair ready" } | ||||
|                 debug { "keypair ready (1)" } | ||||
| 
 | ||||
|                 val serverHe = transport.call(L0Request, Handshake(1u, pair.publicKey)) | ||||
| 
 | ||||
|                 debug { "got server HE (2)" } | ||||
|                 val sk = pair.clientSessionKey(serverHe.publicKey) | ||||
|                 var params = KiloParams(false, transport, sk, session, null, this@KiloClientConnection) | ||||
| 
 | ||||
| @ -97,8 +97,7 @@ class KiloClientConnection<S>( | ||||
|             } catch (x: CancellationException) { | ||||
|                 info { "client is cancelled" } | ||||
|             } catch (x: RemoteInterface.ClosedException) { | ||||
|                 x.printStackTrace() | ||||
|                 info { "connection closed by remote" } | ||||
|                 debug { "connection closed/refused by remote" } | ||||
|             } finally { | ||||
|                 onConnectedStateChanged?.invoke(false) | ||||
|                 job?.cancel() | ||||
|  | ||||
| @ -134,7 +134,13 @@ class Transport<S>( | ||||
|         } | ||||
| 
 | ||||
|         // now we have mutex freed so we can call: | ||||
|         val r = runCatching { device.output.send(pack(b)) } | ||||
|         val r = runCatching { | ||||
|             do { | ||||
|                 val cr = device.output.trySend(pack(b)) | ||||
|                 if( cr.isClosed ) throw ClosedSendChannelException("can't send block: channel is closed") | ||||
|                 delay(100) | ||||
|             } while(!cr.isSuccess) | ||||
|         } | ||||
|         if (!r.isSuccess) { | ||||
|             r.exceptionOrNull()?.let { | ||||
|                 exception { "failed to send output block" to it } | ||||
| @ -271,7 +277,7 @@ class Transport<S>( | ||||
|             } | ||||
|             debug { "no more active: $isActive / ${calls.size}" } | ||||
|         } | ||||
|         info { "exiting transport loop" } | ||||
|         debug { "exiting transport loop" } | ||||
|     } | ||||
| 
 | ||||
|     private suspend fun send(block: Block) { | ||||
|  | ||||
| @ -20,12 +20,10 @@ import kotlinx.coroutines.channels.Channel | ||||
| import kotlinx.coroutines.channels.ClosedReceiveChannelException | ||||
| import kotlinx.coroutines.channels.ClosedSendChannelException | ||||
| import kotlinx.coroutines.launch | ||||
| import kotlinx.io.IOException | ||||
| import net.sergeych.crypto2.SigningKey | ||||
| import net.sergeych.kiloparsec.* | ||||
| import net.sergeych.mp_logger.LogTag | ||||
| import net.sergeych.mp_logger.exception | ||||
| import net.sergeych.mp_logger.info | ||||
| import net.sergeych.mp_logger.warning | ||||
| import net.sergeych.mp_logger.* | ||||
| import net.sergeych.mp_tools.globalLaunch | ||||
| import net.sergeych.tools.AtomicCounter | ||||
| 
 | ||||
| @ -67,70 +65,85 @@ fun websocketTransportDevice( | ||||
|     val input = Channel<UByteArray>() | ||||
|     val output = Channel<UByteArray>() | ||||
|     val closeHandle = CompletableDeferred<Boolean>() | ||||
|     val readyHandle = CompletableDeferred<Unit>() | ||||
| 
 | ||||
|     globalLaunch { | ||||
|         val log = LogTag("KC:${counter.incrementAndGet()}") | ||||
|         client.webSocket({ | ||||
|             url.protocol = u.protocol | ||||
|             url.host = u.host | ||||
|             url.port = u.port | ||||
|             url.encodedPath = u.encodedPath | ||||
|             url.parameters.appendAll(u.parameters) | ||||
|             log.info { "kiloparsec server URL: $url" } | ||||
|         }) { | ||||
|             log.info { "connected to the server" } | ||||
|         try { | ||||
|             client.webSocket({ | ||||
|                 url.protocol = u.protocol | ||||
|                 url.host = u.host | ||||
|                 url.port = u.port | ||||
|                 url.encodedPath = u.encodedPath | ||||
|                 url.parameters.appendAll(u.parameters) | ||||
|                 log.info { "kiloparsec server URL: $url" } | ||||
|             }) { | ||||
|                 log.info { "connected to the server" } | ||||
| //                println("SENDING!!!") | ||||
| //                send("Helluva") | ||||
|             launch { | ||||
|                 try { | ||||
|                     for (block in output) { | ||||
|                         send(block.toByteArray()) | ||||
|                     } | ||||
|                     log.info { "input is closed, closing the websocket" } | ||||
|                     if (closeHandle.isActive) closeHandle.complete(true) | ||||
|                 } catch (_: ClosedSendChannelException) { | ||||
|                     log.info { "send channel closed" } | ||||
|                 } catch (_: CancellationException) { | ||||
|                 } catch (t: Throwable) { | ||||
|                     log.info { "unexpected exception in websock sender: ${t.stackTraceToString()}" } | ||||
|                     closeHandle.completeExceptionally(t) | ||||
|                 } | ||||
|                 if (closeHandle.isActive) closeHandle.complete(false) | ||||
|             } | ||||
|             launch { | ||||
|                 try { | ||||
|                     for (f in incoming) { | ||||
|                         if (f is Frame.Binary) { | ||||
|                             input.send(f.readBytes().toUByteArray()) | ||||
|                         } else { | ||||
|                             log.warning { "ignoring unexpected frame of type ${f.frameType}" } | ||||
|                 readyHandle.complete(Unit) | ||||
|                 launch { | ||||
|                     try { | ||||
|                         for (block in output) { | ||||
|                             send(block.toByteArray()) | ||||
|                         } | ||||
|                         log.info { "input is closed, closing the websocket" } | ||||
|                         if (closeHandle.isActive) closeHandle.complete(true) | ||||
|                     } catch (_: ClosedSendChannelException) { | ||||
|                         log.info { "send channel closed" } | ||||
|                     } catch (_: CancellationException) { | ||||
|                     } catch (t: Throwable) { | ||||
|                         log.info { "unexpected exception in websock sender: ${t.stackTraceToString()}" } | ||||
|                         closeHandle.completeExceptionally(t) | ||||
|                     } | ||||
|                     if (closeHandle.isActive) closeHandle.complete(true) | ||||
|                 } catch (_: CancellationException) { | ||||
|                     if (closeHandle.isActive) closeHandle.complete(false) | ||||
|                 } catch (_: ClosedReceiveChannelException) { | ||||
|                     log.warning { "receive channel closed unexpectedly" } | ||||
|                     if (closeHandle.isActive) closeHandle.complete(false) | ||||
|                 } catch (t: Throwable) { | ||||
|                     log.exception { "unexpected error" to t } | ||||
|                     if (closeHandle.isActive) closeHandle.complete(false) | ||||
|                 } | ||||
|                 launch { | ||||
|                     try { | ||||
|                         for (f in incoming) { | ||||
|                             if (f is Frame.Binary) { | ||||
|                                 input.send(f.readBytes().toUByteArray()) | ||||
|                             } else { | ||||
|                                 log.warning { "ignoring unexpected frame of type ${f.frameType}" } | ||||
|                             } | ||||
|                         } | ||||
|                         if (closeHandle.isActive) closeHandle.complete(true) | ||||
|                     } catch (_: CancellationException) { | ||||
|                         if (closeHandle.isActive) closeHandle.complete(false) | ||||
|                     } catch (_: ClosedReceiveChannelException) { | ||||
|                         log.warning { "receive channel closed unexpectedly" } | ||||
|                         if (closeHandle.isActive) closeHandle.complete(false) | ||||
|                     } catch (t: Throwable) { | ||||
|                         log.exception { "unexpected error" to t } | ||||
|                         if (closeHandle.isActive) closeHandle.complete(false) | ||||
|                     } | ||||
|                 } | ||||
|                 if (!closeHandle.await()) { | ||||
|                     log.warning { "Client is closing with error" } | ||||
|                     throw RemoteInterface.ClosedException() | ||||
|                 } | ||||
|                 runCatching { output.close() } | ||||
|                 runCatching { input.close() } | ||||
|                 runCatching { close() } | ||||
|             } | ||||
|             if (!closeHandle.await()) { | ||||
|                 log.warning { "Client is closing with error" } | ||||
|                 throw RemoteInterface.ClosedException() | ||||
|             } | ||||
|             output.close() | ||||
|             input.close() | ||||
|         } | ||||
|         catch(x: IOException) { | ||||
|             if( "refused" in x.toString()) log.debug { "connection refused" } | ||||
|             else log.warning { "unexpected IO error $x" } | ||||
|             runCatching { output.close() } | ||||
|             runCatching { input.close() } | ||||
|         } | ||||
|         log.info { "closing connection" } | ||||
|     } | ||||
|     val device = ProxyDevice(input, output) { | ||||
|     // Wait for connection be established or failed | ||||
|     val device = ProxyDevice(input, output, doClose = { | ||||
|         // we need to explicitly close the coroutine job, or it can hang for a long time | ||||
|         // leaking resources. | ||||
|         runCatching { output.close() } | ||||
|         runCatching { input.close() } | ||||
|         closeHandle.complete(true) | ||||
| //            job.cancel() | ||||
|     } | ||||
|     }) | ||||
|     return device | ||||
| } | ||||
| 
 | ||||
|  | ||||
| @ -15,10 +15,13 @@ import io.ktor.server.engine.* | ||||
| import io.ktor.server.netty.* | ||||
| import kotlinx.coroutines.delay | ||||
| import kotlinx.coroutines.launch | ||||
| import kotlinx.coroutines.runBlocking | ||||
| import kotlinx.coroutines.test.runTest | ||||
| import net.sergeych.crypto2.SigningSecretKey | ||||
| import net.sergeych.crypto2.initCrypto | ||||
| import net.sergeych.kiloparsec.adapter.setupWebsocketServer | ||||
| import net.sergeych.kiloparsec.adapter.websocketClient | ||||
| import net.sergeych.kiloparsec.adapter.websocketTransportDevice | ||||
| import net.sergeych.mp_logger.Log | ||||
| import java.net.InetAddress | ||||
| import kotlin.random.Random | ||||
| @ -42,14 +45,15 @@ class ClientTest { | ||||
|     fun webSocketTest() = runTest { | ||||
|         initCrypto() | ||||
| //        fun Application. | ||||
|         val cmdClose by command<Unit,Unit>() | ||||
|         val cmdGetFoo by command<Unit,String>() | ||||
|         val cmdSetFoo by command<String,Unit>() | ||||
|         val cmdCheckConnected by command<Unit,Boolean>() | ||||
|         val cmdClose by command<Unit, Unit>() | ||||
|         val cmdGetFoo by command<Unit, String>() | ||||
|         val cmdSetFoo by command<String, Unit>() | ||||
|         val cmdCheckConnected by command<Unit, Boolean>() | ||||
| 
 | ||||
|         Log.connectConsole(Log.Level.DEBUG) | ||||
| 
 | ||||
|         data class Session(var foo: String="not set") | ||||
|         data class Session(var foo: String = "not set") | ||||
| 
 | ||||
|         var closeCounter = 0 | ||||
|         val serverInterface = KiloInterface<Session>().apply { | ||||
|             var connectedCalled = false | ||||
| @ -62,7 +66,7 @@ class ClientTest { | ||||
|             } | ||||
|         } | ||||
| 
 | ||||
|         val port = Random.nextInt(8080,9090) | ||||
|         val port = Random.nextInt(8080, 9090) | ||||
|         val ns = embeddedServer(Netty, port = port, host = "0.0.0.0", module = { | ||||
|             setupWebsocketServer(serverInterface) { Session() } | ||||
|         }).start(wait = false) | ||||
| @ -73,7 +77,9 @@ class ClientTest { | ||||
|             client.connectedStateFlow.collect { | ||||
|                 println("got: $closeCounter/$it") | ||||
|                 states += it | ||||
|                 if( !it) { closeCounter++ } | ||||
|                 if (!it) { | ||||
|                     closeCounter++ | ||||
|                 } | ||||
|             } | ||||
|         } | ||||
|         assertEquals(true, client.call(cmdCheckConnected)) | ||||
| @ -94,7 +100,7 @@ class ClientTest { | ||||
|         assertFalse { client.connectedStateFlow.value } | ||||
| 
 | ||||
|         // this should be run on automatically reopen connection | ||||
|         client.call(cmdSetFoo,"superbar") | ||||
|         client.call(cmdSetFoo, "superbar") | ||||
|         assertTrue { client.connectedStateFlow.value } | ||||
|         assertEquals("superbar", client.call(cmdGetFoo)) | ||||
|         client.close() | ||||
| @ -104,4 +110,26 @@ class ClientTest { | ||||
| //        println("stopped server") | ||||
| //        println("closed client") | ||||
|     } | ||||
| 
 | ||||
|     @Test | ||||
|     fun webSocketWaitForConnectTest() = runBlocking { | ||||
|         initCrypto() | ||||
| //        fun Application. | ||||
|         Log.connectConsole(Log.Level.DEBUG) | ||||
| 
 | ||||
|         val clientInterface = KiloInterface<Unit>().apply {} | ||||
| 
 | ||||
|         val port = Random.nextInt(8080, 9090) | ||||
|         var clientConnectCalls = 0 | ||||
|         // It should repeatedly reconnect, and we will count: | ||||
|         KiloClient(clientInterface, SigningSecretKey.new()) { | ||||
|             clientConnectCalls++ | ||||
|             KiloConnectionData(websocketTransportDevice("ws://localhost:$port/kp"), Unit) | ||||
|         } | ||||
|         delay(1200) | ||||
|         // and check: | ||||
| //        println("connection attemtps: $clientConnectCalls") | ||||
|         assertTrue { clientConnectCalls > 1 } | ||||
|     } | ||||
| 
 | ||||
| } | ||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user