!refactored session to WithAdapter based to propery run many connections on the server

This commit is contained in:
Sergey Chernov 2022-09-28 00:19:07 +03:00
parent f455f2b955
commit 07f9e720a1
10 changed files with 33 additions and 30 deletions

View File

@ -10,7 +10,7 @@ plugins {
}
group = "net.sergeych"
version = "0.1.0-SNAPSHOT"
version = "0.1.1-SNAPSHOT"
repositories {
mavenCentral()

View File

@ -8,7 +8,6 @@ import net.sergeych.boss_serialization.BossDecoder
import net.sergeych.boss_serialization_mp.BossEncoder
import net.sergeych.boss_serialization_mp.decodeBoss
import net.sergeych.mp_logger.LogTag
import net.sergeych.mp_logger.debug
import net.sergeych.mp_logger.exception
import net.sergeych.mp_logger.warning
import net.sergeych.mptools.toDump
@ -67,7 +66,7 @@ import net.sergeych.mptools.toDump
* parsec3 built-in exceptions.
* @param sendEncoded a method that performs actual sending of the packed binary frame to the remote side
*/
open class Adapter<T>(
open class Adapter<T: WithAdapter>(
private val instance: T,
private val commandHost: CommandHost<T>,
private val exceptionRegistry: ExceptionsRegistry = ExceptionsRegistry(),
@ -95,14 +94,14 @@ open class Adapter<T>(
return CompletableDeferred<ByteArray>().also { dr ->
sendPackage(
access.withLock {
debug { "calling $lastId:${ca.name}($args)" }
// debug { "calling $lastId:${ca.name}($args)" }
completions[lastId] = dr
myId = lastId
Package.Command(lastId++, ca.name, BossEncoder.encode(ca.ass, args))
}
)
}.await().let {
debug { "result $myId:$it" }
// debug { "result $myId:$it" }
BossDecoder.decodeFrom(ca.rss, it)
}
}

View File

@ -3,13 +3,19 @@ package net.sergeych.parsec3
import kotlinx.coroutines.flow.Flow
import net.sergeych.mp_tools.globalLaunch
class AdapterBuilder<S, H : CommandHost<S>>(
open class WithAdapter {
internal var _adapter: Adapter<*>? = null
val adapter: Adapter<*> get() = _adapter ?: throw IllegalStateException("adapter is not yet initialized")
}
class AdapterBuilder<S : WithAdapter, H : CommandHost<S>>(
val api: H,
val exceptionRegistry: ExceptionsRegistry = ExceptionsRegistry(),
f: AdapterBuilder<S, H>.() -> Unit,
) {
internal var sessionProducer: (suspend () -> S) = { Unit as S}
internal var sessionProducer: (suspend () -> S) = { WithAdapter() as S}
private set
@ -17,10 +23,6 @@ class AdapterBuilder<S, H : CommandHost<S>>(
sessionProducer = f
}
private var _adapter: Adapter<S>? = null
val adapter: Adapter<S> get() = _adapter ?: throw IllegalStateException("adapter is not yet initialized")
/**
* Register command implementation
*/
@ -33,15 +35,18 @@ class AdapterBuilder<S, H : CommandHost<S>>(
}
suspend fun createWith(input: Flow<ByteArray>, f: suspend (ByteArray)->Unit ): Adapter<S> {
return Adapter<S>(sessionProducer(), api, exceptionRegistry) { f(it) }
val s = sessionProducer()
return Adapter<S>(
s, api, exceptionRegistry) { f(it) }
.also { a->
s._adapter = a
globalLaunch { input.collect { a.receiveFrame(it)} }
_adapter = a
}
}
suspend fun create(f: suspend (ByteArray) -> Unit): Adapter<S> {
return Adapter(sessionProducer(), api, exceptionRegistry, f)
val s = sessionProducer()
return Adapter(s, api, exceptionRegistry, f).also { s._adapter = it }
}
init {

View File

@ -13,7 +13,7 @@ class CommandDescriptor<A, R>(
@Suppress("UNCHECKED_CAST")
suspend operator fun invoke(adapter: Adapter<*>): R = adapter.invokeCommand(this,Unit as A)
operator fun <I>invoke(commandHost: CommandHost<I>, block: suspend I.(A)->R) {
operator fun <I: WithAdapter>invoke(commandHost: CommandHost<I>, block: suspend I.(A)->R) {
commandHost.on(this, block)
}
}

View File

@ -30,7 +30,7 @@ import kotlin.reflect.typeOf
*
* @param T the type of the `state` instance used to hold state, use `Unit` for stateless interfaces
*/
open class CommandHost<T> {
open class CommandHost<T: WithAdapter> {
private val handlers = mutableMapOf<String, suspend T.(ByteArray) -> ByteArray>()
/**

View File

@ -7,7 +7,7 @@ import kotlinx.coroutines.flow.StateFlow
* asynchronous (push capable) calls, but normally it should be used as a transport for parsec 3.1 secure
* protocol which uses this transport interface to run.
*/
interface Parsec3Transport<S> {
interface Parsec3Transport<S: WithAdapter> {
val connectedFlow: StateFlow<Boolean>
fun close()

View File

@ -11,7 +11,7 @@ import net.sergeych.mp_logger.LogTag
import net.sergeych.mp_logger.info
import net.sergeych.mp_tools.globalLaunch
class Parsec3WSClient<S, H : CommandHost<S>>(
class Parsec3WSClient<S: WithAdapter, H : CommandHost<S>>(
val url: String,
val api: H,
val exceptionsRegistry: ExceptionsRegistry = ExceptionsRegistry(),

View File

@ -11,14 +11,14 @@ import kotlin.test.assertEquals
internal class AdapterTest {
object Api1 : CommandHost<Unit>() {
object Api1 : CommandHost<WithAdapter>() {
// create command `foo` that takes a string argument and
// returns a string:
val foo by command<String, String>()
}
object Api2 : CommandHost<Unit>() {
object Api2 : CommandHost<WithAdapter>() {
val bar by command<String, String>()
}
@ -36,8 +36,8 @@ internal class AdapterTest {
it + "bar"
}
val a1 = Adapter(Unit, api1) { ch12.send(it) }
val a2 = Adapter(Unit, api2) { ch21.send(it) }
val a1 = Adapter(WithAdapter(), api1) { ch12.send(it) }
val a2 = Adapter(WithAdapter(), api2) { ch21.send(it) }
launch { for (b in ch12) a2.receiveFrame(b) }
launch { for (b in ch21) a1.receiveFrame(b) }
@ -49,7 +49,7 @@ internal class AdapterTest {
}
data class TestSession(var buzz: String)
data class TestSession(var buzz: String) : WithAdapter()
object ApiS1 : CommandHost<TestSession>() {
// create command `foo` that takes a string argument and
@ -58,7 +58,7 @@ internal class AdapterTest {
val ex by command<String,Unit>()
val ex2 by command<String,Unit>()
}
class ApiS2<T> : CommandHost<T>() {
class ApiS2<T: WithAdapter> : CommandHost<T>() {
// create command `foo` that takes a string argument and
// returns a string:
val bar by command<String, String>()
@ -86,8 +86,7 @@ internal class AdapterTest {
throw IllegalArgumentException()
}
}
val b2 = AdapterBuilder(ApiS2<Unit>(), er) {
newSession { }
val b2 = AdapterBuilder(ApiS2(), er) {
on(api.bar) {
it + "bar"
}
@ -111,7 +110,7 @@ internal class AdapterTest {
}
// assertEquals("123bar", a1.invokeCommand(ApiS2<Unit>().bar, "123"))
assertEquals("%% loop-42foo %%", a1.invokeCommand(ApiS2<Unit>().loopCall, "123"))
assertEquals("%% loop-42foo %%", a1.invokeCommand(ApiS2<WithAdapter>().loopCall, "123"))
assertEquals("32142foo", a2.invokeCommand(ApiS1.foo, "321"))
assertEquals("---42foo", ApiS1.foo.invoke(a2, "---"))

View File

@ -14,7 +14,7 @@ import java.util.concurrent.atomic.AtomicLong
* Creates a ktor server initialization module capable to perform p3 transport layer (not secure).
* It could be used as is or as transport for p3.1
*/
fun <S, H : CommandHost<S>>Application.parsec3TransportServer(
fun <S: WithAdapter, H : CommandHost<S>>Application.parsec3TransportServer(
api: H,
exceptionsRegistry: ExceptionsRegistry = ExceptionsRegistry(),
path: String = "/api/p3",

View File

@ -8,12 +8,12 @@ import kotlin.test.assertEquals
internal class WsServerKtTest {
data class TestSession(var buzz: String = "BuZZ")
data class TestSession(var buzz: String = "BuZZ"): WithAdapter()
object TestApiServer: CommandHost<TestSession>() {
val foo by command<String,String>()
}
object TestApiClient: CommandHost<Unit>() {
object TestApiClient: CommandHost<WithAdapter>() {
val bar by command<String,String>()
}