+exception registry could be extended from inside adapter and transport

+if websock is already initialized in the server, ignore an error
This commit is contained in:
Sergey Chernov 2022-12-10 14:07:10 +01:00
parent 07f9e720a1
commit 19b91c0470
12 changed files with 191 additions and 26 deletions

View File

@ -2,6 +2,8 @@
> v0.1.*+ __are incompatible with 0.0.* versions due to binary protocol optimization.
> v.0.2.* is a for of 0.1 build for JVM 1.8 for better inline compatibility
This is a connection-agnostic, full-duplex RPC type binary protocol, effective to work with binary data, such as encrypted data, keys, multimedia, etc. Its key points are:
- simple and practical transport RPC layer, which is a primary choice when, for exaple, `wss://` level by TSL is enough, e.g. when there is no sensitive data being transmitted (games, etc).

View File

@ -10,7 +10,7 @@ plugins {
}
group = "net.sergeych"
version = "0.1.1-SNAPSHOT"
version = "0.3.3-SNAPSHOT"
repositories {
mavenCentral()
@ -20,11 +20,11 @@ repositories {
kotlin {
jvmToolchain {
languageVersion.set(JavaLanguageVersion.of("11"))
languageVersion.set(JavaLanguageVersion.of("8"))
}
jvm {
compilations.all {
kotlinOptions.jvmTarget = "11"
kotlinOptions.jvmTarget = "1.8"
}
withJava()
testRuns["test"].executionTask.configure {
@ -41,16 +41,13 @@ kotlin {
sourceSets {
val commonMain by getting {
dependencies {
implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:1.6.3")
implementation("io.ktor:ktor-client-core:$ktor_version")
implementation("io.ktor:ktor-client-websockets:$ktor_version")
api("net.sergeych:unikrypto:1.2.0-SNAPSHOT")
implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:1.6.3")
implementation("io.ktor:ktor-client-core:$ktor_version")
implementation("io.ktor:ktor-client-websockets:$ktor_version")
api("org.jetbrains.kotlinx:kotlinx-datetime:0.4.0")
api("net.sergeych:boss-serialization-mp:0.1.4-SNAPSHOT")
api("net.sergeych:mp_stools:1.2.3-SNAPSHOT")
api("net.sergeych:boss-serialization-mp:0.2.4-SNAPSHOT")
api("net.sergeych:unikrypto:1.2.2-SNAPSHOT")
api("net.sergeych:mp_stools:1.3.2-SNAPSHOT")
}
}
@ -65,6 +62,7 @@ kotlin {
implementation("org.mapdb:mapdb:3.0.8")
implementation("io.ktor:ktor-client-cio-jvm:$ktor_version")
implementation("io.ktor:ktor-server-websockets:$ktor_version")
implementation("io.ktor:ktor-server-websockets-jvm:$ktor_version")
}
}
val jvmTest by getting {

25
notes/p3-2layout_ideas.md Normal file
View File

@ -0,0 +1,25 @@
# Принцип
Поверх незащищенного уровня вешаем DH, инициализируем сразу.
Адаптеру второго уровня передаем некую затравку, из нее формируем __ключ токена__, который никому не отдаем (выводим из затравки). Когда ДХ сессия установлена, мы передаем клиенту __токен сессии__ - зашифрованный на ключ клиента его сессионный ключ и использованием EtA.
Когда клиент пытается быстро восстановиться он присылает нам токен сессии. Мы его пытаемся расшифровать ключеом токена. Если он не изменился (а такое вполне себе возможно) или неверный, то EtA в любом случае это поймает. Если же EtA расшифровал, мы знаем что ключ нормальный, и его используем его для сессии.
Таким образом, сервер сессии не хранит :) по моему прикольно!
# Реализация
## Сервер
### Начальное подключение
После получения ключа сессии из DH, шифрует его на _ключ токена_ и полученный _токен сессии_ отдает клиенту. Клиент сохраняет и ключ сессии, и токен.
### Восстановление по токену
Клиент при начальном соединении вспоминает ключ сессии и токен, и отправляет последний на сервер, а для проверки владения отправляет также зашифрованный на сессионный ключ тот случайный мусор (EtA достаточная проверка, контент проверять не требуется).
Сервер при коннекте получает пару (токен, зашифрованная контрольная посылка). Он расшифровывает токен, получает ключ сессии. Если расшифровка успешна (EtA опять), то он знает что ключ верный. Дальше он расшифровывает им контрольную посылку, и если EtA не выдает ошибки, считает что сессия восстановлена, и отсылает новый токен сессии (ключ токена должен потихоньку ротироваться). После чего считаем что сессия восстановлена.

View File

@ -1,3 +1,5 @@
@file:Suppress("OPT_IN_USAGE")
package net.sergeych.parsec3
import kotlinx.coroutines.*
@ -116,6 +118,14 @@ open class Adapter<T: WithAdapter>(
scope.cancel()
}
/**
* merge exceptions registry with current (existing entries will be overwritten)
*/
@Suppress("unused")
fun registerErrors(otherRegistry: ExceptionsRegistry) {
exceptionRegistry.putAll(otherRegistry)
}
private suspend fun processIncomingPackage(pe: Package) {
when (pe) {
is Package.Command -> {

View File

@ -9,6 +9,7 @@ open class WithAdapter {
val adapter: Adapter<*> get() = _adapter ?: throw IllegalStateException("adapter is not yet initialized")
}
@Suppress("UNCHECKED_CAST")
class AdapterBuilder<S : WithAdapter, H : CommandHost<S>>(
val api: H,
val exceptionRegistry: ExceptionsRegistry = ExceptionsRegistry(),
@ -30,10 +31,16 @@ class AdapterBuilder<S : WithAdapter, H : CommandHost<S>>(
api.on(ca, block)
}
@Suppress("unused")
inline fun <reified T : Throwable> addError(code: String, noinline handler: (String?) -> T) {
exceptionRegistry.register(code, handler)
}
@Suppress("unused")
fun addErrors(otherRegistry: ExceptionsRegistry) {
exceptionRegistry.putAll(otherRegistry)
}
suspend fun createWith(input: Flow<ByteArray>, f: suspend (ByteArray)->Unit ): Adapter<S> {
val s = sessionProducer()
return Adapter<S>(

View File

@ -40,10 +40,19 @@ open class ExceptionsRegistry {
classCodes[T::class] = _code
}
/**
* Put all registere exception from another registry overriding existing ones if any.
*/
fun putAll(other: ExceptionsRegistry) {
classCodes.putAll(other.classCodes)
handlers.putAll(other.handlers)
}
/**
* raise the exception using the proper handler. Throws [UnknownCodeException] of there is no handler
* for a given code.
*/
@Suppress("unused")
internal fun raise(code: String, message: String?): Nothing {
throw getException(code, message)
}
@ -54,6 +63,11 @@ open class ExceptionsRegistry {
internal fun getException(code: String, message: String?): Throwable =
handlers[code]?.let { it(message) } ?: UnknownCodeException(code, message)
init {
register("illegal state") { IllegalStateException(it) }
register("illegal argument") { IllegalArgumentException(it) }
}
companion object {
val commandNotFoundCode = "_COMMAND_NOT_FOUND"
val unknownErrorCode = "_UNKNOWN_ERROR"

View File

@ -0,0 +1,7 @@
package net.sergeych.parsec3
/**
* Parsec3 secure adapter.
* @param transport a parsec3 transport to establish connection with, for example [Parsec3WSClient].
*/
class Parsec3SecureClient<S : WithAdapter>(transport: Parsec3Transport<WithAdapter>)

View File

@ -0,0 +1,22 @@
package net.sergeych.parsec3
import net.sergeych.unikrypto.SymmetricKey
open class Parsec3SecureServerSession(
val sessionKey: SymmetricKey? = null
) : WithAdapter()
///**
// * Parsec3 secure adapter.
// * @param transport a parsec3 transport to establish connection with, for example [Parsec3WSClient].
// * @param sessionSalt a secret random string (should not long enough) used to maintain permanent session on client side.
// * if this string will be changed, all stored session will be forced to reconnect. Using a random string will
// * cause each connection to dance with Diffie-Hellman after every restart.
// */
//fun Parsec3SecureServer<S : Parsec3SecureServerSession, H : CommandHost<S>>(transport: Parsec3Transport<WithAdapter>, sessionSalt: String = randomId(107)) {
//
// private val sessionTokenKey = SymmetricKeys.create(
// HashAlgorithm.SHA3_256.digest(sessionSalt),
// BytesId(HashAlgorithm.SHA3_256.digest(sessionSalt + "_keyid"))
// )
//}

View File

@ -14,4 +14,10 @@ interface Parsec3Transport<S: WithAdapter> {
fun reconnect()
suspend fun adapter(): Adapter<S>
val exceptionsRegistry: ExceptionsRegistry
fun registerExceptinos(otherRegistry: ExceptionsRegistry) {
exceptionsRegistry.putAll(otherRegistry)
}
}

View File

@ -11,14 +11,25 @@ import net.sergeych.mp_logger.LogTag
import net.sergeych.mp_logger.info
import net.sergeych.mp_tools.globalLaunch
class Parsec3WSClient<S: WithAdapter, H : CommandHost<S>>(
/**
* Construct websocket-based client with client-side API (called _from server_). This form is universal
* and basically is needed when client is accepting synchronous data calls, e.g. pushes, from the server.
* There is a simpler constructor when it is not needed.
*
* @param url server url to connect to
* @param api client api to implement in the builder
* @param exceptionsRegistry the registry of supported exceptions that can be safely transmitted over the network
* @param builder client side api builder, called over api instance. Here client can _implement_ the commands that server
* could call.
*/
class Parsec3WSClient<S : WithAdapter, H : CommandHost<S>>(
val url: String,
val api: H,
val exceptionsRegistry: ExceptionsRegistry = ExceptionsRegistry(),
f: AdapterBuilder<S, H>.() -> Unit,
api: H,
override val exceptionsRegistry: ExceptionsRegistry = ExceptionsRegistry(),
builder: AdapterBuilder<S, H>.() -> Unit,
) : LogTag("P3WSC"), Parsec3Transport<S> {
val builder = AdapterBuilder(api, exceptionsRegistry, f)
val builder = AdapterBuilder(api, exceptionsRegistry, builder)
private val _connectionFlow = MutableStateFlow(false)
private val closeFlow = MutableStateFlow(false)
@ -31,7 +42,7 @@ class Parsec3WSClient<S: WithAdapter, H : CommandHost<S>>(
}
override fun close() {
if( closeFlow.value == false ) closeFlow.value = true
if (closeFlow.value == false) closeFlow.value = true
}
override fun reconnect() {
@ -39,13 +50,13 @@ class Parsec3WSClient<S: WithAdapter, H : CommandHost<S>>(
}
var deferredAdapter = CompletableDeferred<Adapter<S>>()
private set
private set
override suspend fun adapter(): Adapter<S> = deferredAdapter.await()
fun start() {
globalLaunch {
while(closeFlow.value != true) {
while (closeFlow.value != true) {
reconnectFlow.value = false
client.webSocket(url) {
info { "Connected to $url" }
@ -71,5 +82,22 @@ class Parsec3WSClient<S: WithAdapter, H : CommandHost<S>>(
// Configure WebSockets
}
}
/**
* Simplified client constructor for the case when client does not receives commands (e.g. pushes)
* from the server.
* @param url server url
* @param exceptionsRegistry converter of exceptions that can be received from the remote.
*/
operator fun invoke(url: String, exceptionsRegistry: ExceptionsRegistry = ExceptionsRegistry())
: Parsec3WSClient<WithAdapter, CommandHost<WithAdapter>> {
return Parsec3WSClient(url, CommandHost<WithAdapter>(), exceptionsRegistry) {}
}
@Suppress("unused")
fun <S: WithAdapter>withSession(url: String): Parsec3WSClient<S, CommandHost<S>> {
return Parsec3WSClient(url, CommandHost(), ExceptionsRegistry()) {}
}
}
}

View File

@ -0,0 +1,23 @@
package net.sergeych.parsec3
import kotlinx.serialization.Serializable
@Serializable
class RestoreSessionArgs(
val sessionToken: ByteArray,
val controlMessage: ByteArray
)
class SecureServerApi<S: WithAdapter>: CommandHost<S>() {
val restoreSession by command<RestoreSessionArgs,ByteArray>()
// reuqest session: server returns exchange, client provides one-time hello token, low security app token
// or an empty array to identify self.
val requestSession by command<ByteArray,ByteArray>()
// create session: client sends its exchange, client returns encrypted
val createSession by command<ByteArray,ByteArray>()
}

View File

@ -6,6 +6,7 @@ import io.ktor.server.websocket.*
import io.ktor.websocket.*
import net.sergeych.mp_logger.LogTag
import net.sergeych.mp_logger.warning
import net.sergeych.unikrypto.DiffieHellman
import java.time.Duration
import java.util.concurrent.atomic.AtomicInteger
import java.util.concurrent.atomic.AtomicLong
@ -20,20 +21,25 @@ fun <S: WithAdapter, H : CommandHost<S>>Application.parsec3TransportServer(
path: String = "/api/p3",
f: AdapterBuilder<S, H>.() -> Unit,
) {
val log = LogTag("P3WSS")
install(WebSockets) {
pingPeriod = Duration.ofSeconds(45)
timeout = Duration.ofSeconds(15)
maxFrameSize = Long.MAX_VALUE
masking = false
val log = LogTag("P3WST")
try {
install(WebSockets) {
pingPeriod = Duration.ofSeconds(45)
timeout = Duration.ofSeconds(15)
maxFrameSize = Long.MAX_VALUE
masking = false
}
}
catch(_: DuplicatePluginException) {
// webspkects are already initialized, it is OK
}
val builder = AdapterBuilder(api, exceptionsRegistry, f)
routing {
webSocket(path) { // websocketSession
webSocket(path) {
val adapter = builder.create { outgoing.send(Frame.Binary(true, it)) }
for (frame in incoming) {
for (frame in (this@webSocket).incoming) {
when (frame) {
is Frame.Binary -> {
adapter.receiveFrame(frame.readBytes())
@ -49,3 +55,20 @@ fun <S: WithAdapter, H : CommandHost<S>>Application.parsec3TransportServer(
}
}
fun <S: WithAdapter, H : CommandHost<S>>Application.parsec3SecureServer(
api: H,
exceptionsRegistry: ExceptionsRegistry = ExceptionsRegistry(),
path: String = "/api/p3",
f: AdapterBuilder<S, H>.() -> Unit,
) {
val log = LogTag("P3WSS")
// parsec3TransportServer(api, ex)
// install(WebSockets) {
// pingPeriod = Duration.ofSeconds(45)
// timeout = Duration.ofSeconds(15)
// maxFrameSize = Long.MAX_VALUE
// masking = false
// }
val dh = DiffieHellman()
}