removed unused code. fixed storage delegate, added adapter builder onCancel handler registration

This commit is contained in:
Sergey Chernov 2023-01-19 16:36:40 +01:00
parent 1fdbcf7913
commit 8175bacfb8
9 changed files with 91 additions and 43 deletions

View File

@ -2,7 +2,7 @@
[![License: MIT](https://img.shields.io/badge/License-MIT-yellow.svg)](https://opensource.org/licenses/MIT)
Current stable version is __0.4.1-SNAPSHOT__. The full-dpulex RPC over websock is ok, the security level is still disabled until the protocol part will be sabilized.
Current stable version is __0.4.2-SNAPSHOT__. The full-dpulex RPC over websock is ok, the security level is still disabled until the protocol part will be sabilized.
This is a connection-agnostic, kotlin multiplaftorm library providing full-duplex RPC type binary protocol, effective to work with binary data, such as encrypted data, keys, multimedia, etc. Default implementation uses websockets transport (binary frames) available on all supported platofrms (currently, JS and JVM).
@ -46,7 +46,7 @@ repsitories {
//...
dependencies {
api("net.sergeych:parsec3:0.3.3")
api("net.sergeych:parsec3:0.4.2-SNAPSHOT")
}
```

View File

@ -10,7 +10,7 @@ plugins {
}
group = "net.sergeych"
version = "0.4.1-SNAPSHOT"
version = "0.4.2-SNAPSHOT"
repositories {
mavenCentral()
@ -22,6 +22,7 @@ kotlin {
jvmToolchain {
languageVersion.set(JavaLanguageVersion.of("8"))
}
jvm {
compilations.all {
kotlinOptions.jvmTarget = "1.8"
@ -39,6 +40,10 @@ kotlin {
}
}
sourceSets {
all {
languageSettings.optIn("kotlinx.serialization.ExperimentalSerializationApi")
languageSettings.optIn("kotlinx.coroutines.ExperimentalCoroutinesApi")
}
val commonMain by getting {
dependencies {
implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:1.6.3")
@ -83,9 +88,11 @@ kotlin {
repositories {
maven {
url = uri("https://maven.universablockchain.com/")
val mavenUser: String by project
val mavenPassword: String by project
credentials {
username = System.getenv("maven_user")
password = System.getenv("maven_password")
username = mavenUser // System.getenv("maven_user")
password = mavenPassword // System.getenv("maven_password")
}
}
}

View File

@ -16,7 +16,7 @@ class AdapterBuilder<S : WithAdapter, H : CommandHost<S>>(
f: AdapterBuilder<S, H>.() -> Unit,
) {
internal var sessionProducer: (suspend () -> S) = { WithAdapter() as S}
internal var sessionProducer: (suspend () -> S) = { WithAdapter() as S }
private set
@ -31,6 +31,17 @@ class AdapterBuilder<S : WithAdapter, H : CommandHost<S>>(
api.on(ca, block)
}
val onCancelHandlers = mutableListOf<suspend () -> Unit>()
/**
* Set a onCancel handler that will be called upon adapter cancelling. Several adapters
* could be registered and will be called in order of registration
*/
fun onCancel(f: suspend () -> Unit) {
onCancelHandlers += f
}
@Suppress("unused")
inline fun <reified T : Throwable> addError(code: String, noinline handler: (String?) -> T) {
exceptionRegistry.register(code, handler)
@ -41,13 +52,23 @@ class AdapterBuilder<S : WithAdapter, H : CommandHost<S>>(
exceptionRegistry.putAll(otherRegistry)
}
suspend fun createWith(input: Flow<ByteArray>, f: suspend (ByteArray)->Unit ): Adapter<S> {
suspend fun createWith(input: Flow<ByteArray>, f: suspend (ByteArray) -> Unit): Adapter<S> {
val s = sessionProducer()
return Adapter<S>(
s, api, exceptionRegistry) { f(it) }
.also { a->
s, api, exceptionRegistry
) { f(it) }
.also { a ->
s._adapter = a
globalLaunch { input.collect { a.receiveFrame(it)} }
if (onCancelHandlers.isNotEmpty()) {
a.onCancel = {
for (h in onCancelHandlers) try {
h()
} catch (t: Throwable) {
t.printStackTrace()
}
}
}
globalLaunch { input.collect { a.receiveFrame(it) } }
}
}

View File

@ -51,7 +51,7 @@ class KVStorageDelegate<T>(
private var cachedValue: T = defaultValue
private var cacheReady = false
operator fun getValue(thisRef: Nothing?, property: KProperty<*>): T {
operator fun getValue(thisRef: Any?, property: KProperty<*>): T {
if (cacheReady) return cachedValue
val data = storage.get(name(property))
if (data == null)
@ -62,7 +62,7 @@ class KVStorageDelegate<T>(
return cachedValue
}
operator fun setValue(thisRef: Nothing?, property: KProperty<*>, value: T) {
operator fun setValue(thisRef: Any?, property: KProperty<*>, value: T) {
// if (!cacheReady || value != cachedValue) {
cachedValue = value
cacheReady = true
@ -83,6 +83,7 @@ class MemoryKVStorage(copyFrom: KVStorage? = null) : KVStorage {
// is used while underlying is null:
private val data = mutableMapOf<String, ByteArray>()
@Suppress("unused")
fun connectToStorage(other: KVStorage) {
other.addAll(this)
underlying = other
@ -110,10 +111,10 @@ class MemoryKVStorage(copyFrom: KVStorage? = null) : KVStorage {
}
override val keys: Collection<String>
get() = underlying?.let { it.keys } ?: data.keys
get() = underlying?.keys ?: data.keys
override fun clear() {
underlying?.let { it.clear() } ?: data.clear()
underlying?.clear() ?: data.clear()
}
init {

View File

@ -1,7 +1,7 @@
package net.sergeych.parsec3
/**
* Parsec3 secure adapter.
* @param transport a parsec3 transport to establish connection with, for example [Parsec3WSClient].
*/
class Parsec3SecureClient<S : WithAdapter>(transport: Parsec3Transport<WithAdapter>)
///**
// * Parsec3 secure adapter.
// * @param transport a parsec3 transport to establish connection with, for example [Parsec3WSClient].
// */
//class Parsec3SecureClient<S : WithAdapter>(transport: Parsec3Transport<WithAdapter>)

View File

@ -8,6 +8,7 @@ import kotlinx.coroutines.test.runTest
import net.sergeych.parsec3.*
import kotlin.test.Test
import kotlin.test.assertEquals
import kotlin.test.assertTrue
internal class AdapterTest {
@ -74,6 +75,8 @@ internal class AdapterTest {
it.register("foo_x") { IllegalArgumentException(it) }
}
var b1Cancelled = false
var b2Cancelled = false
val b1 = AdapterBuilder(ApiS1, er) {
newSession { TestSession("42") }
on(api.foo) {
@ -85,6 +88,9 @@ internal class AdapterTest {
on(ApiS1.ex2) {
throw IllegalArgumentException()
}
onCancel {
b1Cancelled = true
}
}
val b2 = AdapterBuilder(ApiS2(), er) {
on(api.bar) {
@ -100,6 +106,7 @@ internal class AdapterTest {
throw t
}
}
onCancel { b2Cancelled = true }
}
val a1 = b1.createWith(ch21.receiveAsFlow()) {
@ -122,6 +129,9 @@ internal class AdapterTest {
ch21.cancel()
a1.cancel()
a2.cancel()
assertTrue { b1Cancelled }
assertTrue { b2Cancelled }
}
}

View File

@ -1,12 +1,18 @@
package parsec3
import net.sergeych.mptools.toDump
import net.sergeych.parsec3.KVStorage
import net.sergeych.parsec3.MemoryKVStorage
import net.sergeych.parsec3.stored
import kotlin.test.Test
import kotlin.test.assertEquals
import kotlin.test.assertNull
class TextSt(kvs: KVStorage) {
var x: String? by kvs.stored(null)
}
@Suppress("UNUSED_VARIABLE")
internal class MemoryKVStorageTest {
@Test
@ -18,12 +24,20 @@ internal class MemoryKVStorageTest {
s["foo"] = byteArrayOf(1,2,3)
var bar: String? by s.stored(null)
val bal: String? by s.stored(null)
assertNull(bar)
bar = "foo"
println(s.get("bar")?.toDump())
assertEquals("foo", bar)
var foo: String by s.stored("", "bar")
val foo: String by s.stored("", "bar")
assertEquals("foo", foo)
val t1 = TextSt(s)
assertNull(t1.x)
t1.x = "bar"
assertEquals("bar",t1.x)
val t2 = TextSt(s)
assertEquals("bar",t2.x)
//
// var i: Int? by s(defaultValue = 17)
// i = 19

View File

@ -6,10 +6,7 @@ import io.ktor.server.websocket.*
import io.ktor.websocket.*
import net.sergeych.mp_logger.LogTag
import net.sergeych.mp_logger.warning
import net.sergeych.unikrypto.DiffieHellman
import java.time.Duration
import java.util.concurrent.atomic.AtomicInteger
import java.util.concurrent.atomic.AtomicLong
/**
* Creates a ktor server initialization module capable to perform p3 transport layer (not secure).
@ -52,25 +49,23 @@ fun <S: WithAdapter, H : CommandHost<S>>Application.parsec3TransportServer(
}
adapter.close()
}
var totalConnections = AtomicLong(0)
var activeConnections = AtomicInteger(0)
}
}
fun <S: WithAdapter, H : CommandHost<S>>Application.parsec3SecureServer(
api: H,
exceptionsRegistry: ExceptionsRegistry = ExceptionsRegistry(),
path: String = "/api/p3",
f: AdapterBuilder<S, H>.() -> Unit,
) {
val log = LogTag("P3WSS")
// parsec3TransportServer(api, ex)
// install(WebSockets) {
// pingPeriod = Duration.ofSeconds(45)
// timeout = Duration.ofSeconds(15)
// maxFrameSize = Long.MAX_VALUE
// masking = false
// }
val dh = DiffieHellman()
}
//fun <S: WithAdapter, H : CommandHost<S>>Application.parsec3SecureServer(
// api: H,
// exceptionsRegistry: ExceptionsRegistry = ExceptionsRegistry(),
// path: String = "/api/p3",
// f: AdapterBuilder<S, H>.() -> Unit,
//) {
// val log = LogTag("P3WSS")
//// parsec3TransportServer(api, ex)
//// install(WebSockets) {
//// pingPeriod = Duration.ofSeconds(45)
//// timeout = Duration.ofSeconds(15)
//// maxFrameSize = Long.MAX_VALUE
//// masking = false
//// }
// val dh = DiffieHellman()
//}

View File

@ -31,7 +31,7 @@ internal class WsServerKtTest {
@Test
fun testWsServer() {
embeddedServer(Netty, port = 8081) {
embeddedServer(Netty, port = 8089) {
parsec3TransportServer(
TestApiServer,
) {
@ -42,7 +42,7 @@ internal class WsServerKtTest {
}
}.start(wait = false)
val client = Parsec3WSClient("ws://localhost:8081/api/p3", TestApiClient) {
val client = Parsec3WSClient("ws://localhost:8089/api/p3", TestApiClient) {
on(api.bar) {
"bar:$it"
}