added reconnection/connection waiting logic

added way to close cannection from the adapter if the protocol implementation allows it (adapter.onCancel/adapter.cancel())
This commit is contained in:
Sergey Chernov 2022-12-18 03:30:11 +01:00
parent 953907b197
commit 16b2d1780b
6 changed files with 138 additions and 23 deletions

View File

@ -10,6 +10,8 @@ Please note about versioning
Current stable version is __0.3.3__. Current stable version is __0.3.3__.
- v.0.4.* - improved cancellation & fixed auto-reconnection logic.
This is a connection-agnostic, full-duplex RPC type binary protocol, effective to work with binary data, such as encrypted data, keys, multimedia, etc. Its key points are: This is a connection-agnostic, full-duplex RPC type binary protocol, effective to work with binary data, such as encrypted data, keys, multimedia, etc. Its key points are:
- simple and practical transport RPC layer, which is a primary choice when, for exaple, `wss://` level by TSL is enough, e.g. when there is no sensitive data being transmitted (games, etc). - simple and practical transport RPC layer, which is a primary choice when, for exaple, `wss://` level by TSL is enough, e.g. when there is no sensitive data being transmitted (games, etc).

View File

@ -10,7 +10,7 @@ plugins {
} }
group = "net.sergeych" group = "net.sergeych"
version = "0.3.3" version = "0.4.0-SNAPSHOT"
repositories { repositories {
mavenCentral() mavenCentral()

View File

@ -2,6 +2,7 @@
package net.sergeych.parsec3 package net.sergeych.parsec3
import io.ktor.utils.io.errors.*
import kotlinx.coroutines.* import kotlinx.coroutines.*
import kotlinx.coroutines.sync.Mutex import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock import kotlinx.coroutines.sync.withLock
@ -77,6 +78,12 @@ open class Adapter<T: WithAdapter>(
val scope = CoroutineScope(GlobalScope.coroutineContext) val scope = CoroutineScope(GlobalScope.coroutineContext)
/**
* If you plaan to cancel the adapter from outside its context (e.g. from API command or like)
* provide specific close code that frees resource for this adapter (like closing websocket connection).
*/
var onCancel: suspend ()->Unit = { throw NotImplementedError("this adapted has no onCancel implementation, provide it")}
private val completions = mutableMapOf<Int, CompletableDeferred<ByteArray>>() private val completions = mutableMapOf<Int, CompletableDeferred<ByteArray>>()
private var lastId = 1 private var lastId = 1
private val access = Mutex() private val access = Mutex()
@ -92,13 +99,13 @@ open class Adapter<T: WithAdapter>(
*/ */
@Suppress("UNCHECKED_CAST") @Suppress("UNCHECKED_CAST")
suspend fun <A, R> invokeCommand(ca: CommandDescriptor<A, R>, args: A = Unit as A): R { suspend fun <A, R> invokeCommand(ca: CommandDescriptor<A, R>, args: A = Unit as A): R {
var myId = -1 // var myId = -1
return CompletableDeferred<ByteArray>().also { dr -> return CompletableDeferred<ByteArray>().also { dr ->
sendPackage( sendPackage(
access.withLock { access.withLock {
// debug { "calling $lastId:${ca.name}($args)" } // debug { "calling $lastId:${ca.name}($args)" }
completions[lastId] = dr completions[lastId] = dr
myId = lastId // myId = lastId
Package.Command(lastId++, ca.name, BossEncoder.encode(ca.ass, args)) Package.Command(lastId++, ca.name, BossEncoder.encode(ca.ass, args))
} }
) )
@ -114,8 +121,9 @@ open class Adapter<T: WithAdapter>(
* *
* Not calling it might cause unknown number of pending command processing coroutines to remain active. * Not calling it might cause unknown number of pending command processing coroutines to remain active.
*/ */
fun cancel() { suspend fun cancel() {
scope.cancel() scope.cancel()
onCancel.invoke()
} }
/** /**
@ -136,6 +144,8 @@ open class Adapter<T: WithAdapter>(
sendPackage( sendPackage(
Package.Response(pe.id, result) Package.Response(pe.id, result)
) )
} catch(_: CancellationException) {
// just ignore it
} catch (ae: ParsecException) { } catch (ae: ParsecException) {
sendPackage(Package.Response(pe.id, null, ae.code, ae.text)) sendPackage(Package.Response(pe.id, null, ae.code, ae.text))
} catch (ex: Throwable) { } catch (ex: Throwable) {
@ -167,7 +177,7 @@ open class Adapter<T: WithAdapter>(
} }
private suspend fun sendPackage(pe: Package) { private suspend fun sendPackage(pe: Package) {
sendEncoded(BossEncoder.encode(pe)) sendEncoded(BossEncoder.encode(pe))
} }
/** /**
@ -182,6 +192,19 @@ open class Adapter<T: WithAdapter>(
} }
} }
class CloseError : IOException("adapter is closed")
/**
* Frees any allocater resources, for example, pending commands.
* Any protocol implementatino MUST call it when connection is closed.
*/
fun close() {
val error = CloseError()
completions.forEach {
it.value.completeExceptionally(error)
}
}
companion object { companion object {
val format = Json { prettyPrint = true } val format = Json { prettyPrint = true }
} }

View File

@ -3,11 +3,12 @@ package net.sergeych.parsec3
import io.ktor.client.* import io.ktor.client.*
import io.ktor.client.plugins.websocket.* import io.ktor.client.plugins.websocket.*
import io.ktor.websocket.* import io.ktor.websocket.*
import kotlinx.coroutines.CompletableDeferred import kotlinx.coroutines.*
import kotlinx.coroutines.flow.MutableStateFlow import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.StateFlow import kotlinx.coroutines.flow.StateFlow
import kotlinx.coroutines.launch
import net.sergeych.mp_logger.LogTag import net.sergeych.mp_logger.LogTag
import net.sergeych.mp_logger.debug
import net.sergeych.mp_logger.exception
import net.sergeych.mp_logger.info import net.sergeych.mp_logger.info
import net.sergeych.mp_tools.globalLaunch import net.sergeych.mp_tools.globalLaunch
@ -29,10 +30,13 @@ class Parsec3WSClient<S : WithAdapter, H : CommandHost<S>>(
builder: AdapterBuilder<S, H>.() -> Unit, builder: AdapterBuilder<S, H>.() -> Unit,
) : LogTag("P3WSC"), Parsec3Transport<S> { ) : LogTag("P3WSC"), Parsec3Transport<S> {
private lateinit var connectionJob: Job
val builder = AdapterBuilder(api, exceptionsRegistry, builder) val builder = AdapterBuilder(api, exceptionsRegistry, builder)
private val _connectionFlow = MutableStateFlow(false) private val _connectionFlow = MutableStateFlow(false)
private val closeFlow = MutableStateFlow(false) private val closeFlow = MutableStateFlow(false)
// This one is used to send reconnection signal
private val reconnectFlow = MutableStateFlow(false) private val reconnectFlow = MutableStateFlow(false)
override val connectedFlow: StateFlow<Boolean> = _connectionFlow override val connectedFlow: StateFlow<Boolean> = _connectionFlow
@ -55,24 +59,42 @@ class Parsec3WSClient<S : WithAdapter, H : CommandHost<S>>(
override suspend fun adapter(): Adapter<S> = deferredAdapter.await() override suspend fun adapter(): Adapter<S> = deferredAdapter.await()
fun start() { fun start() {
globalLaunch { connectionJob = globalLaunch {
while (closeFlow.value != true) { while (closeFlow.value != true) {
reconnectFlow.value = false try {
client.webSocket(url) { reconnectFlow.value = false
info { "Connected to $url" } debug { "trying to connect to $url" }
val a = builder.create { send(Frame.Binary(true, it)) } client.webSocket(url) {
_connectionFlow.value = true info { "Connected to $url" }
launch { closeFlow.collect { if (it == true) close() } } val a = builder.create { send(Frame.Binary(true, it)) }
launch { reconnectFlow.collect { if (it == true) close() } } a.onCancel = { close() }
deferredAdapter.complete(a) _connectionFlow.value = true
for (f in incoming) launch { closeFlow.collect { if (it == true) close() } }
if (f is Frame.Binary) a.receiveFrame(f.data) launch { reconnectFlow.collect { if (it == true) close() } }
// when we leave connection will be closed. So far we do not close the adapter, deferredAdapter.complete(a)
// should we? for (f in incoming)
if (f is Frame.Binary) a.receiveFrame(f.data)
debug { "disconnecting $url" }
_connectionFlow.value = false
deferredAdapter = CompletableDeferred()
a.close()
cancel()
}
info { "connection to $url is closed normally" }
}
catch(x: CancellationException) {
info { "parsec3 connector job cancelled" }
return@globalLaunch
}
catch(t: Throwable) {
exception { "connection process failed, will try to reconnect" to t }
_connectionFlow.value = false _connectionFlow.value = false
deferredAdapter = CompletableDeferred() if( !deferredAdapter.isActive)
deferredAdapter = CompletableDeferred()
delay(200)
} }
} }
debug { "exiting ws connection loop" }
} }
} }

View File

@ -39,6 +39,7 @@ fun <S: WithAdapter, H : CommandHost<S>>Application.parsec3TransportServer(
routing { routing {
webSocket(path) { webSocket(path) {
val adapter = builder.create { outgoing.send(Frame.Binary(true, it)) } val adapter = builder.create { outgoing.send(Frame.Binary(true, it)) }
adapter.onCancel = {close()}
for (frame in (this@webSocket).incoming) { for (frame in (this@webSocket).incoming) {
when (frame) { when (frame) {
is Frame.Binary -> { is Frame.Binary -> {
@ -49,6 +50,7 @@ fun <S: WithAdapter, H : CommandHost<S>>Application.parsec3TransportServer(
} }
} }
} }
adapter.close()
} }
var totalConnections = AtomicLong(0) var totalConnections = AtomicLong(0)
var activeConnections = AtomicInteger(0) var activeConnections = AtomicInteger(0)

View File

@ -1,8 +1,10 @@
package net.sergeych.parsec3 package net.sergeych.parsec3
import assertThrows
import io.ktor.server.engine.* import io.ktor.server.engine.*
import io.ktor.server.netty.* import io.ktor.server.netty.*
import kotlinx.coroutines.runBlocking import kotlinx.coroutines.runBlocking
import net.sergeych.mp_logger.Log
import kotlin.test.Test import kotlin.test.Test
import kotlin.test.assertEquals import kotlin.test.assertEquals
@ -12,6 +14,7 @@ internal class WsServerKtTest {
object TestApiServer: CommandHost<TestSession>() { object TestApiServer: CommandHost<TestSession>() {
val foo by command<String,String>() val foo by command<String,String>()
val ping by command<Unit,String>()
} }
object TestApiClient: CommandHost<WithAdapter>() { object TestApiClient: CommandHost<WithAdapter>() {
val bar by command<String,String>() val bar by command<String,String>()
@ -20,7 +23,7 @@ internal class WsServerKtTest {
@Test @Test
fun testWsServer() { fun testWsServer() {
embeddedServer(Netty, port = 8080) { embeddedServer(Netty, port = 8081) {
parsec3TransportServer( parsec3TransportServer(
TestApiServer, TestApiServer,
) { ) {
@ -31,7 +34,7 @@ internal class WsServerKtTest {
} }
}.start(wait = false) }.start(wait = false)
val client = Parsec3WSClient("ws://localhost:8080/api/p3", TestApiClient) { val client = Parsec3WSClient("ws://localhost:8081/api/p3", TestApiClient) {
on(api.bar) { on(api.bar) {
"bar:$it" "bar:$it"
} }
@ -43,4 +46,67 @@ internal class WsServerKtTest {
} }
} }
@Test
fun testWsServerReconnect() {
embeddedServer(Netty, port = 8080) {
parsec3TransportServer(
TestApiServer,
) {
newSession { TestSession() }
var count = 0
on(api.foo) {
adapter.cancel()
"cancelled:${count++}"
}
on(api.ping) { "pong!"}
}
}.start(wait = false)
Log.connectConsole(Log.Level.DEBUG)
val client = Parsec3WSClient("ws://localhost:8080/api/p3", TestApiClient) {
on(api.bar) {
"bar:$it"
}
}
runBlocking {
assertEquals("pong!", TestApiServer.ping.invoke(client.adapter()))
assertThrows<Adapter.CloseError> { TestApiServer.foo.invoke(client.adapter(), "*great*") }
assertEquals("pong!", TestApiServer.ping.invoke(client.adapter()))
assertThrows<Adapter.CloseError> { TestApiServer.foo.invoke(client.adapter(), "*great*") }
}
}
@Test
fun testWsServerWaitForConnect() {
val client = Parsec3WSClient("ws://localhost:8084/api/p3", TestApiClient) {
on(api.bar) {
"bar:$it"
}
}
println("---1")
embeddedServer(Netty, port = 8084) {
parsec3TransportServer(
TestApiServer,
) {
newSession { TestSession() }
on(api.foo) {
it + buzz + "-foo"
}
}
}.start(wait = false)
runBlocking {
println("----2")
val x = TestApiServer.foo.invoke(client.adapter(), "*great*")
println(">> $x")
assertEquals("*great*BuZZ-foo", x)
client.close()
}
}
} }