From f5cd7a3819101faace4015bcbf7f9dff592b8eb5 Mon Sep 17 00:00:00 2001 From: sergeych Date: Sun, 18 Dec 2022 21:48:39 +0100 Subject: [PATCH] +implemented automatic relogin on connection restore and better connection tracking. --- README.md | 1 - build.gradle.kts | 6 +- .../net.sergeych.superlogin/WaitForState.kt | 39 +++++++ .../client/SuperloginClient.kt | 93 +++++++++------ .../superlogin/server/SuperloginServer.kt | 1 - .../kotlin/net/sergeych/WsServerKtTest.kt | 106 +++++++++++++----- 6 files changed, 179 insertions(+), 67 deletions(-) create mode 100644 src/commonMain/kotlin/net.sergeych.superlogin/WaitForState.kt diff --git a/README.md b/README.md index bf5a1ce..9b5cbb1 100644 --- a/README.md +++ b/README.md @@ -26,7 +26,6 @@ fun Application.testServerModule() { superloginServer(TestApiServer(), { TestSession() }) { // This is a sample of your porvate API implementation: on(api.loginName) { - println("login name called. now we have $currentLoginName : $superloginData") currentLoginName } } diff --git a/build.gradle.kts b/build.gradle.kts index 78a7968..e7c2ff2 100644 --- a/build.gradle.kts +++ b/build.gradle.kts @@ -9,7 +9,7 @@ val logback_version="1.2.10" group = "net.sergeych" -version = "0.0.1-SNAPSHOT" +version = "0.0.2-SNAPSHOT" repositories { mavenCentral() @@ -49,7 +49,7 @@ kotlin { dependencies { implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:1.6.3") api("net.sergeych:unikrypto:1.2.2-SNAPSHOT") - api("net.sergeych:parsec3:0.3.3-SNAPSHOT") + api("net.sergeych:parsec3:0.4.0-SNAPSHOT") api("net.sergeych:boss-serialization-mp:0.2.4-SNAPSHOT") api("net.sergeych:unikrypto:1.2.2-SNAPSHOT") } @@ -76,7 +76,7 @@ kotlin { val jsMain by getting val jsTest by getting } -+ publishing { + publishing { repositories { maven { url = uri("https://maven.universablockchain.com/") diff --git a/src/commonMain/kotlin/net.sergeych.superlogin/WaitForState.kt b/src/commonMain/kotlin/net.sergeych.superlogin/WaitForState.kt new file mode 100644 index 0000000..e82e82d --- /dev/null +++ b/src/commonMain/kotlin/net.sergeych.superlogin/WaitForState.kt @@ -0,0 +1,39 @@ +package net.sergeych.superlogin + +import kotlinx.coroutines.cancel +import kotlinx.coroutines.coroutineScope +import kotlinx.coroutines.flow.StateFlow +import kotlinx.coroutines.launch + + +/** + * Wait for stateflow value to be accepted by the predicate that should + * return true. Does not wait if the predicate returns true for the + * current state value. + */ +suspend fun StateFlow.waitUntil(predicate: (T) -> Boolean) { + // Speed optimization: + if( predicate(value)) return + // we have to wait here + coroutineScope { + // first we watch the state change to avoid RCs: + val job = launch { + collect { + if (predicate(value)) cancel() + } + } + // now the value can be changed while we were starting up the + // job so another check is necessary before waiting for a job + if (!predicate(value)) job.join() + // created job should be cancelled anyway + if (job.isActive) job.cancel() + } +} + +/** + * Wait for state flow to be equal to the expected value. Does not wait if it + * already so. + */ +suspend fun StateFlow.waitFor(state: T) { + waitUntil { it == state } +} diff --git a/src/commonMain/kotlin/net.sergeych.superlogin/client/SuperloginClient.kt b/src/commonMain/kotlin/net.sergeych.superlogin/client/SuperloginClient.kt index 5e84450..3cc4b9b 100644 --- a/src/commonMain/kotlin/net.sergeych.superlogin/client/SuperloginClient.kt +++ b/src/commonMain/kotlin/net.sergeych.superlogin/client/SuperloginClient.kt @@ -1,15 +1,16 @@ package net.sergeych.superlogin.client -import kotlinx.coroutines.* +import kotlinx.coroutines.Job +import kotlinx.coroutines.async +import kotlinx.coroutines.coroutineScope +import kotlinx.coroutines.delay import kotlinx.coroutines.flow.MutableStateFlow import kotlinx.coroutines.flow.StateFlow +import kotlinx.coroutines.flow.asStateFlow import kotlinx.serialization.Serializable import net.sergeych.boss_serialization.BossDecoder import net.sergeych.boss_serialization_mp.BossEncoder -import net.sergeych.mp_logger.LogTag -import net.sergeych.mp_logger.Loggable -import net.sergeych.mp_logger.exception -import net.sergeych.mp_logger.warning +import net.sergeych.mp_logger.* import net.sergeych.mp_tools.globalLaunch import net.sergeych.parsec3.* import net.sergeych.superlogin.* @@ -62,7 +63,6 @@ class SuperloginClient( val v = _state.value if (v !is LoginState.LoggedIn<*> || v.loginData != value) { _state.value = LoginState.LoggedIn(value) - if (!adapterReady.isCompleted) adapterReady.complete(Unit) } } } @@ -71,20 +71,34 @@ class SuperloginClient( val applicationData: D? get() = (state.value as? LoginState.LoggedIn)?.loginData?.data - private var adapterReady = CompletableDeferred() + private var adapterReady = MutableStateFlow(false) - override suspend fun adapter(): Adapter = transport.adapter() -// do { -// try { -// adapterReady.await() -// return transport.adapter() -// } catch (x: Throwable) { -// exception { "failed to get adapter" to x } -// } -// } while (true) -// } + /** + * The flow that tracks readiness state of the connetion adapter. In other works, + * when its value is false, [adapter] deferred is not completed and [call] method + * will wait until it is ready. + * + * The reason for it is as follows: when connetion drops, + * superlogin client awaits its automatic restore (by parsec3) and then tries to re-login. + * Until this login restore will finish either successful or not, calling parsec3 commands + * may produce unpredictable results, so it is automatically postponed until login state + * is restored. This is completely transparent to the caller, and this state flow allows + * client to be notified on actual connection state. + */ + val connectionReady = adapterReady.asStateFlow() + override suspend fun adapter(): Adapter { + adapterReady.waitFor(true) + return transport.adapter() + } + + /** + * Call client API commands with it (uses [adapter] under the hood) + */ suspend fun call(ca: CommandDescriptor, args: A): R = adapter().invokeCommand(ca, args) + /** + * Call client API commands with it (uses [adapter] under the hood) + */ suspend fun call(ca: CommandDescriptor): R = adapter().invokeCommand(ca) private suspend fun invoke(ca: CommandDescriptor, args: A): R = @@ -98,20 +112,26 @@ class SuperloginClient( private suspend fun tryRestoreLogin() { slData?.loginToken?.let { token -> - try { - val ar = transport.adapter().invokeCommand(serverApi.slLoginByToken, token) - slData = if (ar is AuthenticationResult.Success) { - val data: D? = ar.applicationData?.let { BossDecoder.decodeFrom(dataType, it) } - SuperloginData(ar.loginName, ar.loginToken, data) - } else { - null + debug { "trying to restore login with a token" } + while( true ) { + try { + val ar = transport.adapter().invokeCommand(serverApi.slLoginByToken, token) + slData = if (ar is AuthenticationResult.Success) { + val data: D? = ar.applicationData?.let { BossDecoder.decodeFrom(dataType, it) } + debug { "login restored by the token: ${ar.loginName}" } + SuperloginData(ar.loginName, ar.loginToken, data) + } else { + debug { "failed to restore login by the token: $ar" } + null + } + break + } catch (t: Throwable) { + exception { "failed to restore login by token, will retry" to t } + delay(1500) } - } catch (t: Throwable) { - exception { "failed to restore login by token, will retry" to t } - delay(1500) - tryRestoreLogin() } } ?: warning { "tryRestoreLogin is ignored as slData is now null" } + adapterReady.value = true } init { @@ -120,6 +140,7 @@ class SuperloginClient( transport.connectedFlow.collect { on -> if (on) tryRestoreLogin() else { + adapterReady.value = false _cflow.value = false } } @@ -131,16 +152,24 @@ class SuperloginClient( transport.close() } + /** + * Force dropping and re-establish underlying parsec3 connection and restore + * login state to the current. + */ override fun reconnect() { + adapterReady.value = false transport.reconnect() - if (!adapterReady.isActive) { - adapterReady.cancel() - adapterReady = CompletableDeferred() - } } private var registration: Registration? = null + /** + * Whether the client is supposed to be logged in. Note that it is also true when + * there is no ready connection (means also offline), if there is information about staved + * loged in state. It can change at aby time as server may drop login state too. Use + * [state] flow to track the state changes and [adapterReady] flow to track connection state + * that are in fact independent to some degree. + */ val isLoggedIn get() = state.value.isLoggedIn /** diff --git a/src/jvmMain/kotlin/net/sergeych/superlogin/server/SuperloginServer.kt b/src/jvmMain/kotlin/net/sergeych/superlogin/server/SuperloginServer.kt index a060bb2..29ddba0 100644 --- a/src/jvmMain/kotlin/net/sergeych/superlogin/server/SuperloginServer.kt +++ b/src/jvmMain/kotlin/net/sergeych/superlogin/server/SuperloginServer.kt @@ -77,7 +77,6 @@ inline fun ,A: CommandHost> Application.sup * * // Sample service-specifiv api (above login api): * on(api.loginName) { - * println("login name called. now we have $currentLoginName : $superloginData") * currentLoginName * } * } diff --git a/src/jvmTest/kotlin/net/sergeych/WsServerKtTest.kt b/src/jvmTest/kotlin/net/sergeych/WsServerKtTest.kt index d4e0237..fa99a67 100644 --- a/src/jvmTest/kotlin/net/sergeych/WsServerKtTest.kt +++ b/src/jvmTest/kotlin/net/sergeych/WsServerKtTest.kt @@ -5,6 +5,11 @@ import io.ktor.server.engine.* import io.ktor.server.netty.* import kotlinx.coroutines.runBlocking import kotlinx.serialization.Serializable +import net.sergeych.mp_logger.Log +import net.sergeych.mp_logger.LogTag +import net.sergeych.mp_logger.info +import net.sergeych.mp_tools.encodeToBase64Compact +import net.sergeych.parsec3.Adapter import net.sergeych.parsec3.CommandHost import net.sergeych.parsec3.Parsec3WSClient import net.sergeych.parsec3.WithAdapter @@ -19,56 +24,66 @@ import superlogin.assertThrowsAsync import kotlin.random.Random import kotlin.test.* -data class TestSession(var buzz: String = "BuZZ") : SLServerSession() { - val byLogin = mutableMapOf() - val byLoginId = mutableMapOf, RegistrationArgs>() - val byRestoreId = mutableMapOf, RegistrationArgs>() - val byToken = mutableMapOf, RegistrationArgs>() - val tokens = mutableMapOf() +class TestStorage( + val byLogin: MutableMap = mutableMapOf(), + val byLoginId: MutableMap, RegistrationArgs> = mutableMapOf, RegistrationArgs>(), + val byRestoreId: MutableMap, RegistrationArgs> = mutableMapOf, RegistrationArgs>(), + val byToken: MutableMap, RegistrationArgs> = mutableMapOf, RegistrationArgs>(), + val tokens: MutableMap = mutableMapOf(), +) +data class TestSession(val s: TestStorage) : SLServerSession() { + + var buzz: String = "BuZZ" override suspend fun register(ra: RegistrationArgs): AuthenticationResult { println("ra: ${ra.loginName} : $currentLoginName : $superloginData") return when { - ra.loginName in byLogin -> { + ra.loginName in s.byLogin -> { AuthenticationResult.LoginUnavailable } - ra.loginId.toList() in byLoginId -> AuthenticationResult.LoginIdUnavailable - ra.restoreId.toList() in byRestoreId -> AuthenticationResult.RestoreIdUnavailable + ra.loginId.toList() in s.byLoginId -> AuthenticationResult.LoginIdUnavailable + ra.restoreId.toList() in s.byRestoreId -> AuthenticationResult.RestoreIdUnavailable else -> { - byLogin[ra.loginName] = ra - byRestoreId[ra.restoreId.toList()] = ra - byLoginId[ra.loginId.toList()] = ra + s.byLogin[ra.loginName] = ra + s.byRestoreId[ra.restoreId.toList()] = ra + s.byLoginId[ra.loginId.toList()] = ra val token = Random.Default.nextBytes(32) - byToken[token.toList()] = ra - tokens[ra.loginName] = token + s.byToken[token.toList()] = ra + s.tokens[ra.loginName] = token + println("registered with token ${token.encodeToBase64Compact()}") + println(" ${s.byToken[token.toList()]}") AuthenticationResult.Success(ra.loginName, token, ra.extraData) } } } override suspend fun loginByToken(token: ByteArray): AuthenticationResult { - return byToken[token.toList()]?.let { + println("requested login by tokeb ${token.encodeToBase64Compact()}") + println(" ${s.byToken[token.toList()]}") + println(" ${s.byToken.size} / ${s.byLoginId.size}") + + return s.byToken[token.toList()]?.let { AuthenticationResult.Success(it.loginName, token, it.extraData) } ?: AuthenticationResult.LoginUnavailable } override suspend fun requestDerivationParams(loginName: String): PasswordDerivationParams? = - byLogin[loginName]?.derivationParams + s.byLogin[loginName]?.derivationParams override suspend fun requestACOByLoginName(loginName: String, loginId: ByteArray): ByteArray? { - return byLogin[loginName]?.packedACO + return s.byLogin[loginName]?.packedACO } override suspend fun requestACOByRestoreId(restoreId: ByteArray): ByteArray? { - return byRestoreId[restoreId.toList()]?.packedACO + return s.byRestoreId[restoreId.toList()]?.packedACO } override suspend fun loginByKey(loginName: String, publicKey: PublicKey): AuthenticationResult { - val ra = byLogin[loginName] + val ra = s.byLogin[loginName] return if (ra != null && ra.loginPublicKey.id == publicKey.id) - AuthenticationResult.Success(ra.loginName, tokens[loginName]!!, ra.extraData) + AuthenticationResult.Success(ra.loginName, s.tokens[loginName]!!, ra.extraData) else AuthenticationResult.LoginUnavailable } @@ -79,8 +94,8 @@ data class TestSession(var buzz: String = "BuZZ") : SLServerSession() newLoginKey: PublicKey, newLoginId: ByteArray ) { - val r = byLogin[loginName]?.also { - byLoginId.remove(it.loginId.toList()) + val r = s.byLogin[loginName]?.also { + s.byLoginId.remove(it.loginId.toList()) }?.copy( packedACO = packedACO, derivationParams = passwordDerivationParams, @@ -88,10 +103,10 @@ data class TestSession(var buzz: String = "BuZZ") : SLServerSession() loginId = newLoginId ) ?: throw RuntimeException("login not found") - byLogin[loginName] = r - byLoginId[newLoginId.toList()] = r - byToken[currentLoginToken!!.toList()] = r - byRestoreId[r.restoreId.toList()] = r + s.byLogin[loginName] = r + s.byLoginId[newLoginId.toList()] = r + s.byToken[currentLoginToken!!.toList()] = r + s.byRestoreId[r.restoreId.toList()] = r } } @@ -99,6 +114,7 @@ data class TestSession(var buzz: String = "BuZZ") : SLServerSession() class TestApiServer : CommandHost() { val loginName by command() + val dropConnection by command() } @@ -113,9 +129,9 @@ internal class WsServerKtTest { @Test fun testWsServer() { - embeddedServer(Netty, port = 8080, module = Application::testServerModule).start(wait = false) + embeddedServer(Netty, port = 8085, module = Application::testServerModule).start(wait = false) - val client = Parsec3WSClient("ws://localhost:8080/api/p3") + val client = Parsec3WSClient("ws://localhost:8085/api/p3") runBlocking { val api = TestApiServer() @@ -226,6 +242,7 @@ internal class WsServerKtTest { embeddedServer(Netty, port = 8082, module = Application::testServerModule).start(wait = false) val client = Parsec3WSClient("ws://localhost:8082/api/p3") runBlocking { + Log.connectConsole(Log.Level.DEBUG) val slc = SuperloginClient(client) val serverApi = SuperloginServerApi() assertThrowsAsync { @@ -235,13 +252,42 @@ internal class WsServerKtTest { } + @Test + fun testDroppedConnection() { + embeddedServer(Netty, port = 8089, module = Application::testServerModule).start(wait = false) + val client = Parsec3WSClient("ws://localhost:8089/api/p3") + runBlocking { + Log.connectConsole(Log.Level.DEBUG) + val l = LogTag("Test") + val api = TestApiServer() + val slc = SuperloginClient(client) + val serverApi = SuperloginServerApi() + + var rt = slc.register("foo", "passwd", TestData("bar!")) + assertIs(rt) + + assertEquals("foo", slc.call(api.loginName)) + + l.info { "---- breaking the connection ----------" } + assertThrowsAsync { slc.call(api.dropConnection) } + + assertTrue { slc.isLoggedIn } + assertEquals("foo", slc.call(api.loginName)) + } + + } + + } fun Application.testServerModule() { - superloginServer(TestApiServer(), { TestSession() }) { + val s = TestStorage() + superloginServer(TestApiServer(), { TestSession(s) }) { on(api.loginName) { - println("login name called. now we have $currentLoginName : $superloginData") currentLoginName } + on(api.dropConnection) { + adapter.cancel() + } } } \ No newline at end of file