work in progress: restore connection

This commit is contained in:
Sergey Chernov 2024-04-23 21:49:15 +03:00
parent 93dc66acc5
commit 51873aa9b1
5 changed files with 59 additions and 47 deletions

2
.idea/misc.xml generated
View File

@ -3,7 +3,7 @@
<component name="FrameworkDetectionExcludesConfiguration"> <component name="FrameworkDetectionExcludesConfiguration">
<file type="web" url="file://$PROJECT_DIR$" /> <file type="web" url="file://$PROJECT_DIR$" />
</component> </component>
<component name="ProjectRootManager" version="2" languageLevel="JDK_1_8" project-jdk-name="17 (5)" project-jdk-type="JavaSDK"> <component name="ProjectRootManager" version="2" languageLevel="JDK_1_8" project-jdk-name="corretto-17" project-jdk-type="JavaSDK">
<output url="file://$PROJECT_DIR$/out" /> <output url="file://$PROJECT_DIR$/out" />
</component> </component>
</project> </project>

View File

@ -47,12 +47,12 @@ kotlin {
// else -> throw GradleException("Host OS is not supported in Kotlin/Native.") // else -> throw GradleException("Host OS is not supported in Kotlin/Native.")
// } // }
macosArm64() // macosArm64()
iosX64() // iosX64()
iosArm64() // iosArm64()
iosSimulatorArm64() // iosSimulatorArm64()
linuxX64() linuxX64()
macosX64() // macosX64()
val ktor_version = "2.3.6" val ktor_version = "2.3.6"

View File

@ -147,14 +147,9 @@ class Transport<S>(
suspend fun run() { suspend fun run() {
coroutineScope { coroutineScope {
debug { "awaiting incoming blocks" } debug { "awaiting incoming blocks" }
// todo: rewrite it to close the job with no exceptions at all, always
while (isActive && !isClosed) { while (isActive && !isClosed) {
try { try {
debug { "input step starting closed=$isClosed active=$isActive"}
if( isClosed ) {
info { "breaking transort loop on closed"}
break
}
device.input.receive().let { packed -> device.input.receive().let { packed ->
debug { "<<<\n${packed.toDump()}" } debug { "<<<\n${packed.toDump()}" }
val b = unpack<Block>(packed) val b = unpack<Block>(packed)
@ -189,7 +184,6 @@ class Transport<S>(
// handler forced close // handler forced close
warning { "handler requested closing of the connection"} warning { "handler requested closing of the connection"}
isClosed = true isClosed = true
runCatching { device.close() }
throw x throw x
} catch (x: RemoteInterface.RemoteException) { } catch (x: RemoteInterface.RemoteException) {
send(Block.Error(b.id, x.code, x.text, x.extra)) send(Block.Error(b.id, x.code, x.text, x.extra))
@ -210,10 +204,7 @@ class Transport<S>(
isClosed = true isClosed = true
} }
catch (_: CancellationException) { catch (_: CancellationException) {
info { "loop is cancelled" } info { "loop is cancelled with CancellationException" }
isClosed = true
} catch( _: RemoteInterface.ClosedException) {
debug { "git closed exception here, ignoring" }
isClosed = true isClosed = true
} catch (t: Throwable) { } catch (t: Throwable) {
exception { "channel closed on error" to t } exception { "channel closed on error" to t }
@ -225,10 +216,12 @@ class Transport<S>(
access.withLock { access.withLock {
debug { "access lock obtained"} debug { "access lock obtained"}
isClosed = true isClosed = true
debug { "closgin device $device" } debug { "closing device $device, calls in queue ${calls.size}" }
runCatching { device.close() } runCatching { device.close() }
for (c in calls.values) c.completeExceptionally(RemoteInterface.ClosedException()) for (c in calls.values)
c.completeExceptionally(RemoteInterface.ClosedException())
calls.clear() calls.clear()
debug { "calls clear has been called" }
} }
debug { "no more active: $isActive / ${calls.size}" } debug { "no more active: $isActive / ${calls.size}" }
} }

View File

@ -5,6 +5,7 @@ import io.ktor.client.plugins.websocket.*
import io.ktor.http.* import io.ktor.http.*
import io.ktor.websocket.* import io.ktor.websocket.*
import kotlinx.coroutines.CancellationException import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.CompletableDeferred
import kotlinx.coroutines.cancel import kotlinx.coroutines.cancel
import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.ClosedReceiveChannelException import kotlinx.coroutines.channels.ClosedReceiveChannelException
@ -16,6 +17,7 @@ import net.sergeych.crypto2.toDump
import net.sergeych.kiloparsec.KiloClient import net.sergeych.kiloparsec.KiloClient
import net.sergeych.kiloparsec.KiloConnectionData import net.sergeych.kiloparsec.KiloConnectionData
import net.sergeych.kiloparsec.KiloInterface import net.sergeych.kiloparsec.KiloInterface
import net.sergeych.kiloparsec.RemoteInterface
import net.sergeych.mp_logger.LogTag import net.sergeych.mp_logger.LogTag
import net.sergeych.mp_logger.exception import net.sergeych.mp_logger.exception
import net.sergeych.mp_logger.info import net.sergeych.mp_logger.info
@ -54,21 +56,28 @@ fun <S> websocketClient(
url.parameters.appendAll(u.parameters) url.parameters.appendAll(u.parameters)
log.info { "kiloparsec server URL: $url" } log.info { "kiloparsec server URL: $url" }
}) { }) {
try { val closeHandle = CompletableDeferred<Boolean>()
log.info { "connected to the server" } log.info { "connected to the server" }
println("SENDING!!!") // println("SENDING!!!")
send("Helluva") // send("Helluva")
launch { launch {
try { try {
for (block in output) { for (block in output) {
send(block.toByteArray()) send(block.toByteArray())
} }
log.info { "input is closed, closing the websocket" } log.info { "input is closed, closing the websocket" }
if (closeHandle.isActive) closeHandle.complete(true)
} catch (_: ClosedSendChannelException) { } catch (_: ClosedSendChannelException) {
log.info { "send channel closed" } log.info { "send channel closed" }
} }
cancel() catch(t: Throwable) {
log.info { "unexpected exception in websock sender: ${t.stackTraceToString()}" }
closeHandle.completeExceptionally(t)
} }
if (closeHandle.isActive) closeHandle.complete(false)
}
launch {
try {
for (f in incoming) { for (f in incoming) {
if (f is Frame.Binary) { if (f is Frame.Binary) {
input.send(f.readBytes().toUByteArray().also { input.send(f.readBytes().toUByteArray().also {
@ -78,15 +87,24 @@ fun <S> websocketClient(
log.warning { "ignoring unexpected frame of type ${f.frameType}" } log.warning { "ignoring unexpected frame of type ${f.frameType}" }
} }
} }
if (closeHandle.isActive) closeHandle.complete(true)
} catch (_: CancellationException) { } catch (_: CancellationException) {
} catch( _: ClosedReceiveChannelException) { if (closeHandle.isActive) closeHandle.complete(false)
} catch (_: ClosedReceiveChannelException) {
log.warning { "receive channel closed unexpectedly" } log.warning { "receive channel closed unexpectedly" }
if (closeHandle.isActive) closeHandle.complete(false)
} catch (t: Throwable) { } catch (t: Throwable) {
log.exception { "unexpected error" to t } log.exception { "unexpected error" to t }
if (closeHandle.isActive) closeHandle.complete(false)
}
}
if(!closeHandle.await()) {
log.warning { "Client is closing with error" }
throw RemoteInterface.ClosedException()
}
} }
log.info { "closing connection" } log.info { "closing connection" }
} }
}
val device = ProxyDevice(input, output) { val device = ProxyDevice(input, output) {
input.close() input.close()
// we need to explicitly close the coroutine job, or it can hang for a long time // we need to explicitly close the coroutine job, or it can hang for a long time

View File

@ -193,7 +193,8 @@ class TransportTest {
registerError { IllegalStateException() } registerError { IllegalStateException() }
registerError { IllegalArgumentException(it) } registerError { IllegalArgumentException(it) }
} }
val kiloServerConnection = KiloServerConnection(serverInterface, d1, "server session", serverId.secretKey) val kiloServerConnection = KiloServerConnection(serverInterface, d1, "server session", serverId.secretKey
)
launch { kiloServerConnection.run() } launch { kiloServerConnection.run() }
var cnt = 0 var cnt = 0