+implemented automatic relogin on connection restore and better connection tracking.

This commit is contained in:
Sergey Chernov 2022-12-18 21:48:39 +01:00
parent 0a23e02119
commit f5cd7a3819
6 changed files with 179 additions and 67 deletions

View File

@ -26,7 +26,6 @@ fun Application.testServerModule() {
superloginServer(TestApiServer<TestSession>(), { TestSession() }) { superloginServer(TestApiServer<TestSession>(), { TestSession() }) {
// This is a sample of your porvate API implementation: // This is a sample of your porvate API implementation:
on(api.loginName) { on(api.loginName) {
println("login name called. now we have $currentLoginName : $superloginData")
currentLoginName currentLoginName
} }
} }

View File

@ -9,7 +9,7 @@ val logback_version="1.2.10"
group = "net.sergeych" group = "net.sergeych"
version = "0.0.1-SNAPSHOT" version = "0.0.2-SNAPSHOT"
repositories { repositories {
mavenCentral() mavenCentral()
@ -49,7 +49,7 @@ kotlin {
dependencies { dependencies {
implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:1.6.3") implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:1.6.3")
api("net.sergeych:unikrypto:1.2.2-SNAPSHOT") api("net.sergeych:unikrypto:1.2.2-SNAPSHOT")
api("net.sergeych:parsec3:0.3.3-SNAPSHOT") api("net.sergeych:parsec3:0.4.0-SNAPSHOT")
api("net.sergeych:boss-serialization-mp:0.2.4-SNAPSHOT") api("net.sergeych:boss-serialization-mp:0.2.4-SNAPSHOT")
api("net.sergeych:unikrypto:1.2.2-SNAPSHOT") api("net.sergeych:unikrypto:1.2.2-SNAPSHOT")
} }
@ -76,7 +76,7 @@ kotlin {
val jsMain by getting val jsMain by getting
val jsTest by getting val jsTest by getting
} }
+ publishing { publishing {
repositories { repositories {
maven { maven {
url = uri("https://maven.universablockchain.com/") url = uri("https://maven.universablockchain.com/")

View File

@ -0,0 +1,39 @@
package net.sergeych.superlogin
import kotlinx.coroutines.cancel
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.flow.StateFlow
import kotlinx.coroutines.launch
/**
* Wait for stateflow value to be accepted by the predicate that should
* return true. Does not wait if the predicate returns true for the
* current state value.
*/
suspend fun <T> StateFlow<T>.waitUntil(predicate: (T) -> Boolean) {
// Speed optimization:
if( predicate(value)) return
// we have to wait here
coroutineScope {
// first we watch the state change to avoid RCs:
val job = launch {
collect {
if (predicate(value)) cancel()
}
}
// now the value can be changed while we were starting up the
// job so another check is necessary before waiting for a job
if (!predicate(value)) job.join()
// created job should be cancelled anyway
if (job.isActive) job.cancel()
}
}
/**
* Wait for state flow to be equal to the expected value. Does not wait if it
* already so.
*/
suspend fun <T> StateFlow<T>.waitFor(state: T) {
waitUntil { it == state }
}

View File

@ -1,15 +1,16 @@
package net.sergeych.superlogin.client package net.sergeych.superlogin.client
import kotlinx.coroutines.* import kotlinx.coroutines.Job
import kotlinx.coroutines.async
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.MutableStateFlow import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.StateFlow import kotlinx.coroutines.flow.StateFlow
import kotlinx.coroutines.flow.asStateFlow
import kotlinx.serialization.Serializable import kotlinx.serialization.Serializable
import net.sergeych.boss_serialization.BossDecoder import net.sergeych.boss_serialization.BossDecoder
import net.sergeych.boss_serialization_mp.BossEncoder import net.sergeych.boss_serialization_mp.BossEncoder
import net.sergeych.mp_logger.LogTag import net.sergeych.mp_logger.*
import net.sergeych.mp_logger.Loggable
import net.sergeych.mp_logger.exception
import net.sergeych.mp_logger.warning
import net.sergeych.mp_tools.globalLaunch import net.sergeych.mp_tools.globalLaunch
import net.sergeych.parsec3.* import net.sergeych.parsec3.*
import net.sergeych.superlogin.* import net.sergeych.superlogin.*
@ -62,7 +63,6 @@ class SuperloginClient<D, S : WithAdapter>(
val v = _state.value val v = _state.value
if (v !is LoginState.LoggedIn<*> || v.loginData != value) { if (v !is LoginState.LoggedIn<*> || v.loginData != value) {
_state.value = LoginState.LoggedIn(value) _state.value = LoginState.LoggedIn(value)
if (!adapterReady.isCompleted) adapterReady.complete(Unit)
} }
} }
} }
@ -71,20 +71,34 @@ class SuperloginClient<D, S : WithAdapter>(
val applicationData: D? val applicationData: D?
get() = (state.value as? LoginState.LoggedIn<D>)?.loginData?.data get() = (state.value as? LoginState.LoggedIn<D>)?.loginData?.data
private var adapterReady = CompletableDeferred<Unit>() private var adapterReady = MutableStateFlow<Boolean>(false)
override suspend fun adapter(): Adapter<S> = transport.adapter() /**
// do { * The flow that tracks readiness state of the connetion adapter. In other works,
// try { * when its value is false, [adapter] deferred is not completed and [call] method
// adapterReady.await() * will wait until it is ready.
// return transport.adapter() *
// } catch (x: Throwable) { * The reason for it is as follows: when connetion drops,
// exception { "failed to get adapter" to x } * superlogin client awaits its automatic restore (by parsec3) and then tries to re-login.
// } * Until this login restore will finish either successful or not, calling parsec3 commands
// } while (true) * may produce unpredictable results, so it is automatically postponed until login state
// } * is restored. This is completely transparent to the caller, and this state flow allows
* client to be notified on actual connection state.
*/
val connectionReady = adapterReady.asStateFlow()
override suspend fun adapter(): Adapter<S> {
adapterReady.waitFor(true)
return transport.adapter()
}
/**
* Call client API commands with it (uses [adapter] under the hood)
*/
suspend fun <A, R> call(ca: CommandDescriptor<A, R>, args: A): R = adapter().invokeCommand(ca, args) suspend fun <A, R> call(ca: CommandDescriptor<A, R>, args: A): R = adapter().invokeCommand(ca, args)
/**
* Call client API commands with it (uses [adapter] under the hood)
*/
suspend fun <R> call(ca: CommandDescriptor<Unit, R>): R = adapter().invokeCommand(ca) suspend fun <R> call(ca: CommandDescriptor<Unit, R>): R = adapter().invokeCommand(ca)
private suspend fun <A, R> invoke(ca: CommandDescriptor<A, R>, args: A): R = private suspend fun <A, R> invoke(ca: CommandDescriptor<A, R>, args: A): R =
@ -98,20 +112,26 @@ class SuperloginClient<D, S : WithAdapter>(
private suspend fun tryRestoreLogin() { private suspend fun tryRestoreLogin() {
slData?.loginToken?.let { token -> slData?.loginToken?.let { token ->
debug { "trying to restore login with a token" }
while( true ) {
try { try {
val ar = transport.adapter().invokeCommand(serverApi.slLoginByToken, token) val ar = transport.adapter().invokeCommand(serverApi.slLoginByToken, token)
slData = if (ar is AuthenticationResult.Success) { slData = if (ar is AuthenticationResult.Success) {
val data: D? = ar.applicationData?.let { BossDecoder.decodeFrom(dataType, it) } val data: D? = ar.applicationData?.let { BossDecoder.decodeFrom(dataType, it) }
debug { "login restored by the token: ${ar.loginName}" }
SuperloginData(ar.loginName, ar.loginToken, data) SuperloginData(ar.loginName, ar.loginToken, data)
} else { } else {
debug { "failed to restore login by the token: $ar" }
null null
} }
break
} catch (t: Throwable) { } catch (t: Throwable) {
exception { "failed to restore login by token, will retry" to t } exception { "failed to restore login by token, will retry" to t }
delay(1500) delay(1500)
tryRestoreLogin() }
} }
} ?: warning { "tryRestoreLogin is ignored as slData is now null" } } ?: warning { "tryRestoreLogin is ignored as slData is now null" }
adapterReady.value = true
} }
init { init {
@ -120,6 +140,7 @@ class SuperloginClient<D, S : WithAdapter>(
transport.connectedFlow.collect { on -> transport.connectedFlow.collect { on ->
if (on) tryRestoreLogin() if (on) tryRestoreLogin()
else { else {
adapterReady.value = false
_cflow.value = false _cflow.value = false
} }
} }
@ -131,16 +152,24 @@ class SuperloginClient<D, S : WithAdapter>(
transport.close() transport.close()
} }
/**
* Force dropping and re-establish underlying parsec3 connection and restore
* login state to the current.
*/
override fun reconnect() { override fun reconnect() {
adapterReady.value = false
transport.reconnect() transport.reconnect()
if (!adapterReady.isActive) {
adapterReady.cancel()
adapterReady = CompletableDeferred()
}
} }
private var registration: Registration? = null private var registration: Registration? = null
/**
* Whether the client is supposed to be logged in. Note that it is also true when
* there is no ready connection (means also offline), if there is information about staved
* loged in state. It can change at aby time as server may drop login state too. Use
* [state] flow to track the state changes and [adapterReady] flow to track connection state
* that are in fact independent to some degree.
*/
val isLoggedIn get() = state.value.isLoggedIn val isLoggedIn get() = state.value.isLoggedIn
/** /**

View File

@ -77,7 +77,6 @@ inline fun <reified D, S : SLServerSession<D>,A: CommandHost<S>> Application.sup
* *
* // Sample service-specifiv api (above login api): * // Sample service-specifiv api (above login api):
* on(api.loginName) { * on(api.loginName) {
* println("login name called. now we have $currentLoginName : $superloginData")
* currentLoginName * currentLoginName
* } * }
* } * }

View File

@ -5,6 +5,11 @@ import io.ktor.server.engine.*
import io.ktor.server.netty.* import io.ktor.server.netty.*
import kotlinx.coroutines.runBlocking import kotlinx.coroutines.runBlocking
import kotlinx.serialization.Serializable import kotlinx.serialization.Serializable
import net.sergeych.mp_logger.Log
import net.sergeych.mp_logger.LogTag
import net.sergeych.mp_logger.info
import net.sergeych.mp_tools.encodeToBase64Compact
import net.sergeych.parsec3.Adapter
import net.sergeych.parsec3.CommandHost import net.sergeych.parsec3.CommandHost
import net.sergeych.parsec3.Parsec3WSClient import net.sergeych.parsec3.Parsec3WSClient
import net.sergeych.parsec3.WithAdapter import net.sergeych.parsec3.WithAdapter
@ -19,56 +24,66 @@ import superlogin.assertThrowsAsync
import kotlin.random.Random import kotlin.random.Random
import kotlin.test.* import kotlin.test.*
data class TestSession(var buzz: String = "BuZZ") : SLServerSession<TestData>() { class TestStorage(
val byLogin = mutableMapOf<String, RegistrationArgs>() val byLogin: MutableMap<String, RegistrationArgs> = mutableMapOf<String, RegistrationArgs>(),
val byLoginId = mutableMapOf<List<Byte>, RegistrationArgs>() val byLoginId: MutableMap<List<Byte>, RegistrationArgs> = mutableMapOf<List<Byte>, RegistrationArgs>(),
val byRestoreId = mutableMapOf<List<Byte>, RegistrationArgs>() val byRestoreId: MutableMap<List<Byte>, RegistrationArgs> = mutableMapOf<List<Byte>, RegistrationArgs>(),
val byToken = mutableMapOf<List<Byte>, RegistrationArgs>() val byToken: MutableMap<List<Byte>, RegistrationArgs> = mutableMapOf<List<Byte>, RegistrationArgs>(),
val tokens = mutableMapOf<String, ByteArray>() val tokens: MutableMap<String, ByteArray> = mutableMapOf<String, ByteArray>(),
)
data class TestSession(val s: TestStorage) : SLServerSession<TestData>() {
var buzz: String = "BuZZ"
override suspend fun register(ra: RegistrationArgs): AuthenticationResult { override suspend fun register(ra: RegistrationArgs): AuthenticationResult {
println("ra: ${ra.loginName} : $currentLoginName : $superloginData") println("ra: ${ra.loginName} : $currentLoginName : $superloginData")
return when { return when {
ra.loginName in byLogin -> { ra.loginName in s.byLogin -> {
AuthenticationResult.LoginUnavailable AuthenticationResult.LoginUnavailable
} }
ra.loginId.toList() in byLoginId -> AuthenticationResult.LoginIdUnavailable ra.loginId.toList() in s.byLoginId -> AuthenticationResult.LoginIdUnavailable
ra.restoreId.toList() in byRestoreId -> AuthenticationResult.RestoreIdUnavailable ra.restoreId.toList() in s.byRestoreId -> AuthenticationResult.RestoreIdUnavailable
else -> { else -> {
byLogin[ra.loginName] = ra s.byLogin[ra.loginName] = ra
byRestoreId[ra.restoreId.toList()] = ra s.byRestoreId[ra.restoreId.toList()] = ra
byLoginId[ra.loginId.toList()] = ra s.byLoginId[ra.loginId.toList()] = ra
val token = Random.Default.nextBytes(32) val token = Random.Default.nextBytes(32)
byToken[token.toList()] = ra s.byToken[token.toList()] = ra
tokens[ra.loginName] = token s.tokens[ra.loginName] = token
println("registered with token ${token.encodeToBase64Compact()}")
println(" ${s.byToken[token.toList()]}")
AuthenticationResult.Success(ra.loginName, token, ra.extraData) AuthenticationResult.Success(ra.loginName, token, ra.extraData)
} }
} }
} }
override suspend fun loginByToken(token: ByteArray): AuthenticationResult { override suspend fun loginByToken(token: ByteArray): AuthenticationResult {
return byToken[token.toList()]?.let { println("requested login by tokeb ${token.encodeToBase64Compact()}")
println(" ${s.byToken[token.toList()]}")
println(" ${s.byToken.size} / ${s.byLoginId.size}")
return s.byToken[token.toList()]?.let {
AuthenticationResult.Success(it.loginName, token, it.extraData) AuthenticationResult.Success(it.loginName, token, it.extraData)
} }
?: AuthenticationResult.LoginUnavailable ?: AuthenticationResult.LoginUnavailable
} }
override suspend fun requestDerivationParams(loginName: String): PasswordDerivationParams? = override suspend fun requestDerivationParams(loginName: String): PasswordDerivationParams? =
byLogin[loginName]?.derivationParams s.byLogin[loginName]?.derivationParams
override suspend fun requestACOByLoginName(loginName: String, loginId: ByteArray): ByteArray? { override suspend fun requestACOByLoginName(loginName: String, loginId: ByteArray): ByteArray? {
return byLogin[loginName]?.packedACO return s.byLogin[loginName]?.packedACO
} }
override suspend fun requestACOByRestoreId(restoreId: ByteArray): ByteArray? { override suspend fun requestACOByRestoreId(restoreId: ByteArray): ByteArray? {
return byRestoreId[restoreId.toList()]?.packedACO return s.byRestoreId[restoreId.toList()]?.packedACO
} }
override suspend fun loginByKey(loginName: String, publicKey: PublicKey): AuthenticationResult { override suspend fun loginByKey(loginName: String, publicKey: PublicKey): AuthenticationResult {
val ra = byLogin[loginName] val ra = s.byLogin[loginName]
return if (ra != null && ra.loginPublicKey.id == publicKey.id) return if (ra != null && ra.loginPublicKey.id == publicKey.id)
AuthenticationResult.Success(ra.loginName, tokens[loginName]!!, ra.extraData) AuthenticationResult.Success(ra.loginName, s.tokens[loginName]!!, ra.extraData)
else AuthenticationResult.LoginUnavailable else AuthenticationResult.LoginUnavailable
} }
@ -79,8 +94,8 @@ data class TestSession(var buzz: String = "BuZZ") : SLServerSession<TestData>()
newLoginKey: PublicKey, newLoginKey: PublicKey,
newLoginId: ByteArray newLoginId: ByteArray
) { ) {
val r = byLogin[loginName]?.also { val r = s.byLogin[loginName]?.also {
byLoginId.remove(it.loginId.toList()) s.byLoginId.remove(it.loginId.toList())
}?.copy( }?.copy(
packedACO = packedACO, packedACO = packedACO,
derivationParams = passwordDerivationParams, derivationParams = passwordDerivationParams,
@ -88,10 +103,10 @@ data class TestSession(var buzz: String = "BuZZ") : SLServerSession<TestData>()
loginId = newLoginId loginId = newLoginId
) )
?: throw RuntimeException("login not found") ?: throw RuntimeException("login not found")
byLogin[loginName] = r s.byLogin[loginName] = r
byLoginId[newLoginId.toList()] = r s.byLoginId[newLoginId.toList()] = r
byToken[currentLoginToken!!.toList()] = r s.byToken[currentLoginToken!!.toList()] = r
byRestoreId[r.restoreId.toList()] = r s.byRestoreId[r.restoreId.toList()] = r
} }
} }
@ -99,6 +114,7 @@ data class TestSession(var buzz: String = "BuZZ") : SLServerSession<TestData>()
class TestApiServer<T : WithAdapter> : CommandHost<T>() { class TestApiServer<T : WithAdapter> : CommandHost<T>() {
val loginName by command<Unit, String?>() val loginName by command<Unit, String?>()
val dropConnection by command<Unit,Unit>()
} }
@ -113,9 +129,9 @@ internal class WsServerKtTest {
@Test @Test
fun testWsServer() { fun testWsServer() {
embeddedServer(Netty, port = 8080, module = Application::testServerModule).start(wait = false) embeddedServer(Netty, port = 8085, module = Application::testServerModule).start(wait = false)
val client = Parsec3WSClient("ws://localhost:8080/api/p3") val client = Parsec3WSClient("ws://localhost:8085/api/p3")
runBlocking { runBlocking {
val api = TestApiServer<WithAdapter>() val api = TestApiServer<WithAdapter>()
@ -226,6 +242,7 @@ internal class WsServerKtTest {
embeddedServer(Netty, port = 8082, module = Application::testServerModule).start(wait = false) embeddedServer(Netty, port = 8082, module = Application::testServerModule).start(wait = false)
val client = Parsec3WSClient("ws://localhost:8082/api/p3") val client = Parsec3WSClient("ws://localhost:8082/api/p3")
runBlocking { runBlocking {
Log.connectConsole(Log.Level.DEBUG)
val slc = SuperloginClient<TestData, WithAdapter>(client) val slc = SuperloginClient<TestData, WithAdapter>(client)
val serverApi = SuperloginServerApi<WithAdapter>() val serverApi = SuperloginServerApi<WithAdapter>()
assertThrowsAsync<SLInternalException> { assertThrowsAsync<SLInternalException> {
@ -235,13 +252,42 @@ internal class WsServerKtTest {
} }
@Test
fun testDroppedConnection() {
embeddedServer(Netty, port = 8089, module = Application::testServerModule).start(wait = false)
val client = Parsec3WSClient("ws://localhost:8089/api/p3")
runBlocking {
Log.connectConsole(Log.Level.DEBUG)
val l = LogTag("Test")
val api = TestApiServer<WithAdapter>()
val slc = SuperloginClient<TestData, WithAdapter>(client)
val serverApi = SuperloginServerApi<WithAdapter>()
var rt = slc.register("foo", "passwd", TestData("bar!"))
assertIs<Registration.Result.Success>(rt)
assertEquals("foo", slc.call(api.loginName))
l.info { "---- breaking the connection ----------" }
assertThrowsAsync<Adapter.CloseError> { slc.call(api.dropConnection) }
assertTrue { slc.isLoggedIn }
assertEquals("foo", slc.call(api.loginName))
}
}
} }
fun Application.testServerModule() { fun Application.testServerModule() {
superloginServer(TestApiServer<TestSession>(), { TestSession() }) { val s = TestStorage()
superloginServer(TestApiServer<TestSession>(), { TestSession(s) }) {
on(api.loginName) { on(api.loginName) {
println("login name called. now we have $currentLoginName : $superloginData")
currentLoginName currentLoginName
} }
on(api.dropConnection) {
adapter.cancel()
}
} }
} }