added simple realtime server and client code (over ws0

This commit is contained in:
Sergey Chernov 2022-09-07 22:38:21 +03:00
parent 0cfe95d4f4
commit b35ca92e48
9 changed files with 202 additions and 13 deletions

View File

@ -1,5 +1,6 @@
val ktor_version: String by project
val kotlin_version: String by project
val logback_version: String by project
plugins {
kotlin("multiplatform") version "1.7.10"
@ -58,9 +59,18 @@ kotlin {
val jvmMain by getting {
dependencies {
implementation("org.mapdb:mapdb:3.0.8")
implementation("io.ktor:ktor-client-cio-jvm:$ktor_version")
implementation("io.ktor:ktor-server-websockets:$ktor_version")
}
}
val jvmTest by getting {
dependencies {
implementation("io.ktor:ktor-server-core:$ktor_version")
implementation("io.ktor:ktor-server-netty:$ktor_version")
implementation("ch.qos.logback:logback-classic:$logback_version")
}
}
val jvmTest by getting
val jsMain by getting
val jsTest by getting
}

View File

@ -2,4 +2,5 @@ kotlin.code.style=official
kotlin.mpp.enableGranularSourceSetsMetadata=true
kotlin.native.enableDependencyPropagation=false
kotlin.js.generate.executable.default=false
ktor_version=2.1.0
ktor_version=2.1.1
logback_version=1.2.10

View File

@ -117,10 +117,14 @@ open class Adapter<T>(
} catch (ae: ParsecException) {
sendPackage(Package.Response(pe.id, null, ae.code, ae.text))
} catch (ex: Throwable) {
exceptionRegistry.classCodes[ex::class]?.let { code ->
sendPackage(Package.Response(pe.id, null, code, ex.toString()))
} ?: run {
ex.printStackTrace()
sendPackage(Package.Response(pe.id, null, "UNKNOWN_ERROR", ex.toString()))
}
}
}
is Package.Response -> {
val dr = access.withLock { completions.remove(pe.toId) }

View File

@ -5,11 +5,11 @@ import net.sergeych.mp_tools.globalLaunch
class AdapterBuilder<S, H : CommandHost<S>>(
val api: H,
private val exceptionRegistry: ExceptionsRegistry = ExceptionsRegistry(),
val exceptionRegistry: ExceptionsRegistry = ExceptionsRegistry(),
f: AdapterBuilder<S, H>.() -> Unit,
) {
internal var sessionProducer: (suspend () -> S)? = null
internal var sessionProducer: (suspend () -> S) = { Unit as S}
private set
@ -24,15 +24,19 @@ class AdapterBuilder<S, H : CommandHost<S>>(
api.on(ca, block)
}
fun <T : Throwable> addError(code: String, handler: (String?) -> T) {
inline fun <reified T : Throwable> addError(code: String, noinline handler: (String?) -> T) {
exceptionRegistry.register(code, handler)
}
suspend fun createWith(input: Flow<ByteArray>, f: suspend (ByteArray)->Unit ): Adapter<S> {
return Adapter<S>(sessionProducer!!(), api, exceptionRegistry) { f(it) }
return Adapter<S>(sessionProducer(), api, exceptionRegistry) { f(it) }
.also { a-> globalLaunch { input.collect { a.receiveFrame(it)} } }
}
suspend fun create(f: suspend (ByteArray) -> Unit): Adapter<S> {
return Adapter(sessionProducer(), api, exceptionRegistry, f)
}
init {
f(this)
}

View File

@ -1,5 +1,7 @@
package net.sergeych.parsec3
import kotlin.reflect.KClass
/**
* Registry to restore exceptions from parsec block data. Serializing exceptions is dangerous: being a OS-bound
* objects, exceptions can carry too much sensitive or useless information (e.g. call stack), and serializng
@ -15,18 +17,21 @@ package net.sergeych.parsec3
*/
open class ExceptionsRegistry {
private val handlers = mutableMapOf<String,(String?)->Throwable>().also {
val handlers = mutableMapOf<String,(String?)->Throwable>().also {
// predefined exceptions:
it[commandNotFoundCode] = { CommandNotFoundException(it ?: "???") }
it[unknownErrorCode] = { UnknownException(it ?: "???") }
}
val classCodes = mutableMapOf<KClass<*>,String>()
/**
* Register an exception with a code with a handler that creates its instance. Note that the
* handler _should not throw anything_ but rather create an instance of the exception.
*/
fun <T: Throwable>register(code: String, block: (message: String?) -> T) {
inline fun <reified T: Throwable>register(code: String, noinline block: (message: String?) -> T) {
handlers[code] = block
classCodes[T::class] = code
}
/**

View File

@ -0,0 +1,66 @@
package net.sergeych.parsec3
import io.ktor.client.*
import io.ktor.client.plugins.websocket.*
import io.ktor.websocket.*
import kotlinx.coroutines.CompletableDeferred
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.StateFlow
import kotlinx.coroutines.launch
import net.sergeych.mp_logger.LogTag
import net.sergeych.mp_logger.info
import net.sergeych.mp_tools.globalLaunch
class Parsec3WSClient<S, H : CommandHost<S>>(
val url: String,
val api: H,
val exceptionsRegistry: ExceptionsRegistry = ExceptionsRegistry(),
f: AdapterBuilder<S, H>.() -> Unit,
) : LogTag("P3WSC") {
val builder = AdapterBuilder(api, exceptionsRegistry, f)
private val _connectionFlow = MutableStateFlow(false)
private val closeFlow = MutableStateFlow(false)
val connectedFlow: StateFlow<Boolean> = _connectionFlow
init {
start()
}
fun close() {
if( closeFlow.value == false ) closeFlow.value = true
}
var deferredAdapter = CompletableDeferred<Adapter<S>>()
private set
suspend fun adapter(): Adapter<S> = deferredAdapter.await()
fun start() {
globalLaunch {
client.webSocket(url) {
info { "Connected to $url" }
val a = builder.create { send(Frame.Binary(true, it)) }
_connectionFlow.value = true
launch { closeFlow.collect { if( it == true ) close() } }
deferredAdapter.complete(a)
for (f in incoming)
if (f is Frame.Binary) a.receiveFrame(f.data)
// when we leave connection will be closed. So far we do not close the adapter,
// should we?
_connectionFlow.value = false
deferredAdapter = CompletableDeferred()
}
}
}
companion object {
private val client = HttpClient {
install(WebSockets) {
// Configure WebSockets
}
}
}
}

View File

@ -56,6 +56,7 @@ internal class AdapterTest {
// returns a string:
val foo by command<String, String>()
val ex by command<String,Unit>()
val ex2 by command<String,Unit>()
}
class ApiS2<T> : CommandHost<T>() {
// create command `foo` that takes a string argument and
@ -69,7 +70,7 @@ internal class AdapterTest {
val ch21 = Channel<ByteArray>()
val er = ExceptionsRegistry().also {
it.register("foo_x") { IllegalArgumentException("foo_x") }
it.register("foo_x") { IllegalArgumentException(it) }
}
val b1 = AdapterBuilder(ApiS1, er) {
@ -78,7 +79,10 @@ internal class AdapterTest {
it + buzz + "foo"
}
on(ApiS1.ex) {
throw ParsecException("foo_x")
throw ParsecException("foo_x", it)
}
on(ApiS1.ex2) {
throw IllegalArgumentException()
}
}
val b2 = AdapterBuilder(ApiS2<Unit>(), er) {
@ -95,7 +99,9 @@ internal class AdapterTest {
assertEquals("32142foo", a2.invokeCommand(ApiS1.foo, "321"))
assertEquals("---42foo", ApiS1.foo.invoke(a2, "---"))
assertThrows<IllegalArgumentException> { ApiS1.ex.invoke(a2, "foobar") }
val x = assertThrows<IllegalArgumentException> { ApiS1.ex.invoke(a2, "foobar") }
assertEquals("foobar", x.message)
assertThrows<IllegalArgumentException> { ApiS1.ex2.invoke(a2, "foobar") }
ch12.cancel()
ch21.cancel()

View File

@ -0,0 +1,47 @@
package net.sergeych.parsec3
import io.ktor.server.application.*
import io.ktor.server.routing.*
import io.ktor.server.websocket.*
import io.ktor.websocket.*
import net.sergeych.mp_logger.LogTag
import net.sergeych.mp_logger.warning
import java.time.Duration
import java.util.concurrent.atomic.AtomicInteger
import java.util.concurrent.atomic.AtomicLong
fun <S, H : CommandHost<S>>Application.parsec3Server(
api: H,
exceptionsRegistry: ExceptionsRegistry = ExceptionsRegistry(),
path: String = "/api/p3",
f: AdapterBuilder<S, H>.() -> Unit,
) {
val log = LogTag("P3WSS")
install(WebSockets) {
pingPeriod = Duration.ofSeconds(45)
timeout = Duration.ofSeconds(15)
maxFrameSize = Long.MAX_VALUE
masking = false
}
val builder = AdapterBuilder(api, exceptionsRegistry, f)
routing {
webSocket(path) { // websocketSession
val adapter = builder.create { outgoing.send(Frame.Binary(true, it)) }
for (frame in incoming) {
when (frame) {
is Frame.Binary -> {
adapter.receiveFrame(frame.readBytes())
}
else -> {
log.warning { "unsupported frame type: $frame" }
}
}
}
}
var totalConnections = AtomicLong(0)
var activeConnections = AtomicInteger(0)
}
}

View File

@ -0,0 +1,46 @@
package net.sergeych.parsec3
import io.ktor.server.engine.*
import io.ktor.server.netty.*
import kotlinx.coroutines.runBlocking
import kotlin.test.Test
import kotlin.test.assertEquals
internal class WsServerKtTest {
data class TestSession(var buzz: String = "BuZZ")
object TestApiServer: CommandHost<TestSession>() {
val foo by command<String,String>()
}
object TestApiClient: CommandHost<Unit>() {
val bar by command<String,String>()
}
@Test
fun testWsServer() {
embeddedServer(Netty, port = 8080) {
parsec3Server(
TestApiServer,
) {
newSession { TestSession() }
on(api.foo) {
it + buzz + "-foo"
}
}
}.start(wait = false)
val client = Parsec3WSClient("ws://localhost:8080/api/p3", TestApiClient) {
on(api.bar) {
"bar:$it"
}
}
runBlocking {
val x = TestApiServer.foo.invoke(client.adapter(), "*great*")
assertEquals("*great*BuZZ-foo", x)
client.close()
}
}
}