0.2.5-snapshot native support for tcp client/server

This commit is contained in:
Sergey Chernov 2024-08-03 15:36:04 +02:00
parent 26d1f3522f
commit ffcdcf7350
16 changed files with 524 additions and 580 deletions

1
.idea/misc.xml generated
View File

@ -1,4 +1,3 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="ExternalStorageConfigurationManager" enabled="true" />
<component name="FrameworkDetectionExcludesConfiguration">

View File

@ -1,31 +1,36 @@
# Kiloparsec
The new generation of __PARanoid SECurity__ protocol, advanced, faster, more secure. It also allows connecting any "block device" transport to the same local interface. Out if the box it
The new generation of __PARanoid SECurity__ protocol, advanced, faster, more secure. It also allows connecting any "
block device" transport to the same local interface. Out if the box it
provides the following transports:
| name | JVM | JS | native |
|----------------|-----|----|--------|
| TCP/IP server | * | | |
| TCP/IP client | * | | |
| Websock server | * | | |
| Websock client | * | * | * |
At the moment we're working on supporting TCP/IP on most native targets. This feature is planned to rach public beta in August and production in early september 2024.
| name | JVM | JS | native |
|----------------|-----|----|-------------------|
| TCP/IP server | ✓ | | β @0.2.5-SNAPSHOT |
| TCP/IP client | ✓ | | β @0.2.5-SNAPSHOT |
| Websock server | ✓ | | |
| Websock client | ✓ | ✓ | ✓ |
At the moment we're working on supporting TCP/IP on most native targets. This feature is planned to rach public beta in
August and production in early september 2024.
## TCP/IP transport
It is the fastest. JVM implementation uses nio2 async sockets and optimizes TCP socket to play
well with blocks (smart NO_DELAY mode). It is multiplatform, nut lacks of async TCP/IP support
on natvic targetm this is where I need help having little time. I'd prefer to use something asyn like UV on native targets.
on natvic targetm this is where I need help having little time. I'd prefer to use something asyn like UV on native
targets.
I know no existing way to implement it in KotlinJS for the modern browsers.
## Websock server
While it is much slower than pure TCP, it is still faster than any http-based transport. It uses binary frames based on the Ktor server framework to easily integrate with web services. We recommend using it instead of a classic HTTP API as it beats it in terms of speed and server load even with HTTP/2.
While it is much slower than pure TCP, it is still faster than any http-based transport. It uses binary frames based on
the Ktor server framework to easily integrate with web services. We recommend using it instead of a classic HTTP API as
it beats it in terms of speed and server load even with HTTP/2.
We recommend to create the `KiloInterface<S>` instance and connect it to the websock and tcp servers in real applications to get easy access from anywhere.
We recommend to create the `KiloInterface<S>` instance and connect it to the websock and tcp servers in real
applications to get easy access from anywhere.
# Usage
@ -64,16 +69,16 @@ and functions available, like:
// Api.kt
@Serializable
class FooArgs(val text: String,val number: Int = 42)
class FooArgs(val text: String, val number: Int = 42)
// Server-side interface
val cmdSetFoo by command<FooArgs,Unit>()
val cmdGetFoo by command<Unit,FooArgs>()
val cmdPing by command<String,String>()
val cmdCheckConnected by command<Unit,Boolean>()
val cmdSetFoo by command<FooArgs, Unit>()
val cmdGetFoo by command<Unit, FooArgs>()
val cmdPing by command<String, String>()
val cmdCheckConnected by command<Unit, Boolean>()
// client-side interface (called from the server)
val cmdPushClient by command<String,Unit>()
val cmdPushClient by command<String, Unit>()
```
## Call it from the client:
@ -93,7 +98,7 @@ val client = websocketClient<Unit>("wss://your.host.com/kp") {
// If we want to collect connected state changes (this is optional)
launch {
client.connectedStateFlow.collect {
if( it )
if (it)
println("I am connected")
else
println("trying to connect...")
@ -113,13 +118,13 @@ the protocol. With KILOPARSEC it is rather basic operation:\
~~~kotlin
// Our session just keeps Foo for cmd{Get|Set}Foo:
data class Session(var fooState: FooArgs?=null)
data class Session(var fooState: FooArgs? = null)
// Let's now provide interface we export, it will be used on each connection automatically:
// Note server interface uses Session:
val serverInterface = KiloInterface<Session>().apply {
onConnected {
onConnected {
// Do some initialization
session.fooState = null
}
@ -136,6 +141,7 @@ val ns: NettyApplicationEngine = embeddedServer(Netty, port = 8080, host = "0.0.
~~~
# Details
It is not compatible with parsec family and no more based on an Universa crypto library. To better fit
@ -144,12 +150,14 @@ and every connection (while parsec caches session keys to avoid time-consuming k
keys cryptography for session is shifted to use ed25519 curves which are supposed to provide agreeable strength with
enough speed to protect every connection with a unique new keys. Also, we completely get rid of SHA2.
Kiloparsec also uses a denser binary format [bipack](https://gitea.sergeych.net/SergeychWorks/mp_bintools), no more key-values,
which reveals much less on the inner data structure, providing advanced
typed RPC interfaces with kotlinx.serialization. There is also Rust implementation [bipack_ru](https://gitea.sergeych.net/DiWAN/bipack_ru).
Kiloparsec also uses a denser binary format [bipack](https://gitea.sergeych.net/SergeychWorks/mp_bintools), no more
key-values,
which reveals much less on the inner data structure, providing advanced
typed RPC interfaces with kotlinx.serialization. There is also Rust
implementation [bipack_ru](https://gitea.sergeych.net/DiWAN/bipack_ru).
The architecture allows connecting same functional interfaces to several various type channels at once.
Also, the difference from parsecs is that there are no more unencrypted layer commands available to users.
Also, the difference from parsecs is that there are no more unencrypted layer commands available to users.
All RPC is performed over the encrypted connection.
# Technical description
@ -163,7 +171,9 @@ Integrated tools to prevent MITM attacks include also non-transferred independen
independently on the ends and is never transferred with the network. Comparing it somehow (visually, with QR code, etc)
could add a very robust guarantee of the connection safety and ingenuity.
Kiloparsec has built-in completely asynchronous (coroutine based top-down) transport layer based on TCP (JVM only as for now) and the same async Websocket-based transport based on KTOR. Websocket client is multiplatform, though the server is JVM only insofar.
Kiloparsec has built-in completely asynchronous (coroutine based top-down) transport layer based on TCP (JVM only as for
now) and the same async Websocket-based transport based on KTOR. Websocket client is multiplatform, though the server is
JVM only insofar.
# Licensing

View File

@ -1,4 +1,3 @@
plugins {
kotlin("multiplatform") version "2.0.0"
id("org.jetbrains.kotlin.plugin.serialization") version "2.0.0"
@ -59,6 +58,9 @@ kotlin {
implementation("org.jetbrains.kotlinx:kotlinx-coroutines-test:1.8.1")
}
}
val ktorSocketTest by creating {
dependsOn(commonTest)
}
val jvmMain by getting {
dependencies {
implementation("io.ktor:ktor-server-core:$ktor_version")
@ -69,16 +71,52 @@ kotlin {
}
dependsOn(ktorSocketMain)
}
val jvmTest by getting
val jvmTest by getting {
dependsOn(ktorSocketTest)
}
val jsMain by getting {
dependencies {
implementation("io.ktor:ktor-client-js:$ktor_version")
}
}
val jsTest by getting
val macosArm64Main by getting {
dependsOn(ktorSocketMain)
}
val macosArm64Test by getting {
dependsOn(ktorSocketTest)
}
val macosX64Main by getting {
dependsOn(ktorSocketMain)
}
val iosX64Main by getting {
dependsOn(ktorSocketMain)
}
val iosX64Test by getting {
dependsOn(ktorSocketTest)
}
val iosArm64Main by getting {
dependsOn(ktorSocketMain)
}
val iosArm64Test by getting {
dependsOn(ktorSocketTest)
}
val linuxArm64Main by getting {
dependsOn(ktorSocketMain)
}
val linuxArm64Test by getting {
dependsOn(ktorSocketTest)
}
val linuxX64Main by getting {
dependsOn(ktorSocketMain)
}
val linuxX64Test by getting {
dependsOn(ktorSocketTest)
}
for (pm in listOf(linuxMain, macosMain, iosMain, mingwMain))
pm { dependsOn(ktorSocketMain) }
// for (pm: NamedDomainObjectProvider<KotlinSourceSet> in listOf(macosMain,linuxMain, iosMain, mingwMain))
// pm.get().dependsOn(ktorSocketMain)
}

View File

@ -1 +1,2 @@
kotlin.code.style=official
kotlin.mpp.applyDefaultHierarchyTemplate=false

View File

@ -43,6 +43,8 @@ class KiloServer<S>(
}
fun close() {
println("PRREEEC")
job.cancel()
println("POOOSTC")
}
}

View File

@ -1,56 +1,56 @@
package net.sergeych.kiloparsec.adapter
import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.coroutines.flow.Flow
/**
* Multiplatform implementation of an internet address.
* Notice to implementors. It must provide correct and effective [equals] and [hashCode].
* Multiplatform internet address.
*/
interface NetworkAddress {
val host: String
data class NetworkAddress(
val host: String,
val port: Int
}
/**
* Multiplatform datagram abstraction
*/
interface Datagram {
/**
* Received message
*/
val message: UByteArray
/**
* Address from where the message was sent
*/
val address: NetworkAddress
}
@OptIn(ExperimentalStdlibApi::class)
interface DatagramConnector: AutoCloseable {
val incoming: ReceiveChannel<Datagram>
suspend fun send(message: UByteArray, networkAddress: NetworkAddress)
@Suppress("unused")
suspend fun send(message: UByteArray, datagramAddress: String) {
send(message, datagramAddress.toNetworkAddress())
) {
override fun toString(): String {
return "$host:$port"
}
suspend fun send(message: UByteArray,host: String,port: Int) =
send(message, NetworkAddress(host,port))
override fun close()
}
expect fun NetworkAddress(host: String,port: Int): NetworkAddress
fun String.toNetworkAddress() : NetworkAddress {
val (host, port) = this.split(":").map { it.trim()}
return NetworkAddress(host, port.toInt())
}
expect fun acceptTcpDevice(port: Int): Flow<InetTransportDevice>
expect suspend fun connectTcpDevice(address: NetworkAddress): InetTransportDevice
suspend fun connectTcpDevice(address: String) = connectTcpDevice(address.toNetworkAddress())
//
///**
// * Multiplatform datagram abstraction
// */
//interface Datagram {
// /**
// * Received message
// */
// val message: UByteArray
//
// /**
// * Address from where the message was sent
// */
// val address: NetworkAddress
//}
//
//@OptIn(ExperimentalStdlibApi::class)
//interface DatagramConnector: AutoCloseable {
//
// val incoming: ReceiveChannel<Datagram>
// suspend fun send(message: UByteArray, networkAddress: NetworkAddress)
// @Suppress("unused")
// suspend fun send(message: UByteArray, datagramAddress: String) {
// send(message, datagramAddress.toNetworkAddress())
// }
//
// suspend fun send(message: UByteArray,host: String,port: Int) =
// send(message, NetworkAddress(host,port))
// override fun close()
//}
//
//expect fun NetworkAddress(host: String,port: Int): NetworkAddress
//
//fun String.toNetworkAddress() : NetworkAddress {
// val (host, port) = this.split(":").map { it.trim()}
// return NetworkAddress(host, port.toInt())
//}
//
//expect fun acceptTcpDevice(port: Int): Flow<InetTransportDevice>
//
//expect suspend fun connectTcpDevice(address: NetworkAddress): InetTransportDevice
//
//suspend fun connectTcpDevice(address: String) = connectTcpDevice(address.toNetworkAddress())

View File

@ -1,2 +0,0 @@
package net.sergeych.kiloparsec.adapter

View File

@ -1,15 +0,0 @@
package net.sergeych.kiloparsec.adapter
import kotlinx.coroutines.flow.Flow
actual fun NetworkAddress(host: String, port: Int): NetworkAddress {
TODO("Not yet implemented")
}
actual fun acceptTcpDevice(port: Int): Flow<InetTransportDevice> {
TODO("Not yet implemented")
}
actual suspend fun connectTcpDevice(address: NetworkAddress): InetTransportDevice {
TODO("Not yet implemented")
}

View File

@ -1,118 +1,118 @@
package net.sergeych.kiloparsec.adapter
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.BufferOverflow
import kotlinx.coroutines.channels.Channel
import net.sergeych.mp_logger.LogTag
import net.sergeych.mp_logger.exception
import net.sergeych.mp_logger.info
import net.sergeych.mp_logger.warning
import java.net.*
import java.util.concurrent.atomic.AtomicInteger
private val counter = AtomicInteger(0)
class JvmNetworkAddress(val inetAddress: InetAddress, override val port: Int) : NetworkAddress {
override val host: String by lazy { inetAddress.canonicalHostName }
override fun equals(other: Any?): Boolean {
if (this === other) return true
if (other !is JvmNetworkAddress) return false
if (inetAddress != other.inetAddress) return false
if (port != other.port) return false
return true
}
val socketAddress: SocketAddress by lazy { InetSocketAddress(inetAddress,port) }
override fun hashCode(): Int {
var result = inetAddress.hashCode()
result = 31 * result + port
return result
}
override fun toString(): String = "$host:$port"
}
class UdpDatagram(override val message: UByteArray, val inetAddress: InetAddress, val port: Int) : Datagram {
override val address: NetworkAddress by lazy {
JvmNetworkAddress(inetAddress, port)
}
}
@OptIn(DelicateCoroutinesApi::class)
class UdpServer(val port: Int) :
DatagramConnector, LogTag("UDPS:${counter.incrementAndGet()}") {
private var isClosed = false
private val deferredSocket = CompletableDeferred<DatagramSocket>()
private var job: Job? = null
private suspend fun start() = try {
coroutineScope {
val socket = DatagramSocket(port)
val buffer = ByteArray(16384)
val packet = DatagramPacket(buffer, buffer.size)
deferredSocket.complete(socket)
while (isActive && !isClosed) {
try {
socket.receive(packet)
val data = packet.data.sliceArray(0..<packet.length)
val datagram = UdpDatagram(data.toUByteArray(), packet.address, packet.port)
if (!channel.trySend(datagram).isSuccess) {
warning { "packet lost!" }
// and we cause overflow that overwrites the oldest
channel.send(datagram)
}
} catch (e: Exception) {
if (!isClosed)
e.printStackTrace()
throw e
}
}
info { "closing socket and reception loop" }
}
} catch (_: CancellationException) {
info { "server is closed" }
} catch (t: Throwable) {
exception { "unexpected end of server" to t }
}
init {
job = GlobalScope.launch(Dispatchers.IO) {
start()
}
}
override fun close() {
if (!isClosed) {
if (deferredSocket.isCompleted) {
runCatching {
deferredSocket.getCompleted().close()
}
}
isClosed = true
job?.cancel(); job = null
}
}
private val channel = Channel<Datagram>(2048, BufferOverflow.DROP_OLDEST)
override val incoming = channel
override suspend fun send(message: UByteArray, networkAddress: NetworkAddress) {
networkAddress as JvmNetworkAddress
withContext(Dispatchers.IO) {
val packet = DatagramPacket(
message.toByteArray(), message.size,
networkAddress.inetAddress, networkAddress.port
)
deferredSocket.await().send(packet)
}
}
}
//package net.sergeych.kiloparsec.adapter
//
//import kotlinx.coroutines.*
//import kotlinx.coroutines.channels.BufferOverflow
//import kotlinx.coroutines.channels.Channel
//import net.sergeych.mp_logger.LogTag
//import net.sergeych.mp_logger.exception
//import net.sergeych.mp_logger.info
//import net.sergeych.mp_logger.warning
//import java.net.*
//import java.util.concurrent.atomic.AtomicInteger
//
//private val counter = AtomicInteger(0)
//
//class JvmNetworkAddress(val inetAddress: InetAddress, override val port: Int) : NetworkAddress {
// override val host: String by lazy { inetAddress.canonicalHostName }
// override fun equals(other: Any?): Boolean {
// if (this === other) return true
// if (other !is JvmNetworkAddress) return false
//
// if (inetAddress != other.inetAddress) return false
// if (port != other.port) return false
//
// return true
// }
//
// val socketAddress: SocketAddress by lazy { InetSocketAddress(inetAddress,port) }
//
// override fun hashCode(): Int {
// var result = inetAddress.hashCode()
// result = 31 * result + port
// return result
// }
//
// override fun toString(): String = "$host:$port"
//}
//
//class UdpDatagram(override val message: UByteArray, val inetAddress: InetAddress, val port: Int) : Datagram {
//
// override val address: NetworkAddress by lazy {
// JvmNetworkAddress(inetAddress, port)
// }
//
//}
//
//
//@OptIn(DelicateCoroutinesApi::class)
//class UdpServer(val port: Int) :
// DatagramConnector, LogTag("UDPS:${counter.incrementAndGet()}") {
// private var isClosed = false
//
//
// private val deferredSocket = CompletableDeferred<DatagramSocket>()
// private var job: Job? = null
//
// private suspend fun start() = try {
// coroutineScope {
// val socket = DatagramSocket(port)
// val buffer = ByteArray(16384)
// val packet = DatagramPacket(buffer, buffer.size)
// deferredSocket.complete(socket)
// while (isActive && !isClosed) {
// try {
// socket.receive(packet)
// val data = packet.data.sliceArray(0..<packet.length)
// val datagram = UdpDatagram(data.toUByteArray(), packet.address, packet.port)
// if (!channel.trySend(datagram).isSuccess) {
// warning { "packet lost!" }
// // and we cause overflow that overwrites the oldest
// channel.send(datagram)
// }
// } catch (e: Exception) {
// if (!isClosed)
// e.printStackTrace()
// throw e
// }
// }
// info { "closing socket and reception loop" }
//
// }
// } catch (_: CancellationException) {
// info { "server is closed" }
// } catch (t: Throwable) {
// exception { "unexpected end of server" to t }
// }
//
// init {
// job = GlobalScope.launch(Dispatchers.IO) {
// start()
// }
// }
//
// override fun close() {
// if (!isClosed) {
// if (deferredSocket.isCompleted) {
// runCatching {
// deferredSocket.getCompleted().close()
// }
// }
// isClosed = true
// job?.cancel(); job = null
// }
// }
//
// private val channel = Channel<Datagram>(2048, BufferOverflow.DROP_OLDEST)
// override val incoming = channel
//
// override suspend fun send(message: UByteArray, networkAddress: NetworkAddress) {
// networkAddress as JvmNetworkAddress
// withContext(Dispatchers.IO) {
// val packet = DatagramPacket(
// message.toByteArray(), message.size,
// networkAddress.inetAddress, networkAddress.port
// )
// deferredSocket.await().send(packet)
// }
// }
//}

View File

@ -1,155 +1,155 @@
package net.sergeych.kiloparsec.adapter
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.ClosedReceiveChannelException
import kotlinx.coroutines.flow.MutableStateFlow
import net.sergeych.crypto2.Contrail
import net.sergeych.crypto2.encodeVarUnsigned
import net.sergeych.crypto2.readVarUnsigned
import net.sergeych.kiloparsec.RemoteInterface
import net.sergeych.kiloparsec.Transport
import net.sergeych.mp_logger.LogTag
import net.sergeych.mp_logger.warning
import net.sergeych.mp_tools.globalLaunch
import net.sergeych.tools.waitFor
import java.net.InetSocketAddress
import java.net.StandardSocketOptions.TCP_NODELAY
import java.nio.ByteBuffer
import java.nio.channels.AsynchronousSocketChannel
import kotlin.coroutines.cancellation.CancellationException
import kotlin.coroutines.suspendCoroutine
private val log = LogTag("ASTD")
/**
* Prepend block with its size, varint-encoded
*/
private fun encode(block: UByteArray): ByteArray {
val c = Contrail.create(block)
return (encodeVarUnsigned(c.size.toUInt()) + c).toByteArray()
}
/**
* Convert asynchronous socket to a [Transport.Device] using non-blocking nio,
* in a coroutine-effective manner. Note that it runs coroutines to read/write
* to the socket in a global scope.These are closed when transport is closed
* or the socket is closed, for example, by network failure.
*/
suspend fun asyncSocketToDevice(socket: AsynchronousSocketChannel): InetTransportDevice {
val deferredDevice = CompletableDeferred<InetTransportDevice>()
globalLaunch {
coroutineScope {
val sendQueueEmpty = MutableStateFlow(true)
val receiving = MutableStateFlow(false)
// We're in block mode, every block we send worth immediate sending, we do not
// send partial blocks, so:
socket.setOption(TCP_NODELAY, true)
// socket input is to be parsed for blocks, so we receive bytes
// and decode them to blocks
val input = Channel<UByte>(1024)
val inputBlocks = Channel<UByteArray>()
// output is blocks, so we sent transformed, framed blocks:
val outputBlocks = Channel<UByteArray>()
fun stop() {
kotlin.runCatching { inputBlocks.close(RemoteInterface.ClosedException()) }
kotlin.runCatching { outputBlocks.close() }
socket.close()
cancel()
}
// copy incoming data from the socket to input channel:
launch {
val data = ByteArray(1024)
val inb = ByteBuffer.wrap(data)
kotlin.runCatching {
while (isActive) {
inb.position(0)
val size: Int = suspendCoroutine { continuation ->
socket.read(inb, continuation, IntCompletionHandler)
}
if (size < 0) stop()
else {
// println("recvd:\n${data.sliceArray(0..<size).toDump()}\n------------------")
for (i in 0..<size) input.send(data[i].toUByte())
}
}
}
}
// copy from output to socket:
launch {
try {
while (isActive) {
// wait for the first block to send
sendQueueEmpty.value = outputBlocks.isEmpty
var data = encode(outputBlocks.receive())
// now we're sending, so queue state is sending:
sendQueueEmpty.value = false
// if there are more, take them all (NO_DELAY optimization)
while (!outputBlocks.isEmpty)
data += encode(outputBlocks.receive())
// now send it all together:
val outBuff = ByteBuffer.wrap(data)
val cnt = suspendCoroutine { continuation ->
socket.write(outBuff, continuation, IntCompletionHandler)
}
// be sure it was all sent
if (outBuff.position() != data.size || cnt != data.size) {
throw RuntimeException("unexpected partial write")
}
}
// in the case of just breaking out of the loop:
sendQueueEmpty.value = true
} catch (_: ClosedReceiveChannelException) {
stop()
}
}
// transport device copes with blocks:
// decode blocks from a byte channel read from the socket:
launch {
try {
while (isActive) {
receiving.value = !input.isEmpty
val size = readVarUnsigned(input)
receiving.value = true
if (size == 0u) log.warning { "zero size block is ignored!" }
else {
val block = UByteArray(size.toInt())
for (i in 0..<size.toInt()) {
block[i] = input.receive()
}
Contrail.unpack(block)?.let { inputBlocks.send(it) }
?: log.warning { "skipping bad block ${block.size} bytes" }
}
}
} catch (_: CancellationException) {
} catch (_: ClosedReceiveChannelException) {
stop()
}
receiving.value = false
}
val addr = socket.remoteAddress as InetSocketAddress
deferredDevice.complete(
InetTransportDevice(inputBlocks, outputBlocks, JvmNetworkAddress(addr.address, addr.port), {
yield()
// wait until all received data are parsed, but not too long
withTimeoutOrNull(500) {
receiving.waitFor { !it }
}
// then stop it
stop()
})
)
}
globalLaunch { socket.close() }
}
return deferredDevice.await()
}
//package net.sergeych.kiloparsec.adapter
//
//import kotlinx.coroutines.*
//import kotlinx.coroutines.channels.Channel
//import kotlinx.coroutines.channels.ClosedReceiveChannelException
//import kotlinx.coroutines.flow.MutableStateFlow
//import net.sergeych.crypto2.Contrail
//import net.sergeych.crypto2.encodeVarUnsigned
//import net.sergeych.crypto2.readVarUnsigned
//import net.sergeych.kiloparsec.RemoteInterface
//import net.sergeych.kiloparsec.Transport
//import net.sergeych.mp_logger.LogTag
//import net.sergeych.mp_logger.warning
//import net.sergeych.mp_tools.globalLaunch
//import net.sergeych.tools.waitFor
//import java.net.InetSocketAddress
//import java.net.StandardSocketOptions.TCP_NODELAY
//import java.nio.ByteBuffer
//import java.nio.channels.AsynchronousSocketChannel
//import kotlin.coroutines.cancellation.CancellationException
//import kotlin.coroutines.suspendCoroutine
//
//private val log = LogTag("ASTD")
//
///**
// * Prepend block with its size, varint-encoded
// */
//private fun encode(block: UByteArray): ByteArray {
// val c = Contrail.create(block)
// return (encodeVarUnsigned(c.size.toUInt()) + c).toByteArray()
//}
//
///**
// * Convert asynchronous socket to a [Transport.Device] using non-blocking nio,
// * in a coroutine-effective manner. Note that it runs coroutines to read/write
// * to the socket in a global scope.These are closed when transport is closed
// * or the socket is closed, for example, by network failure.
// */
//suspend fun asyncSocketToDevice(socket: AsynchronousSocketChannel): InetTransportDevice {
// val deferredDevice = CompletableDeferred<InetTransportDevice>()
// globalLaunch {
// coroutineScope {
// val sendQueueEmpty = MutableStateFlow(true)
// val receiving = MutableStateFlow(false)
// // We're in block mode, every block we send worth immediate sending, we do not
// // send partial blocks, so:
// socket.setOption(TCP_NODELAY, true)
//
// // socket input is to be parsed for blocks, so we receive bytes
// // and decode them to blocks
// val input = Channel<UByte>(1024)
// val inputBlocks = Channel<UByteArray>()
// // output is blocks, so we sent transformed, framed blocks:
// val outputBlocks = Channel<UByteArray>()
//
// fun stop() {
// kotlin.runCatching { inputBlocks.close(RemoteInterface.ClosedException()) }
// kotlin.runCatching { outputBlocks.close() }
// socket.close()
// cancel()
// }
//
//
// // copy incoming data from the socket to input channel:
// launch {
// val data = ByteArray(1024)
// val inb = ByteBuffer.wrap(data)
// kotlin.runCatching {
// while (isActive) {
// inb.position(0)
// val size: Int = suspendCoroutine { continuation ->
// socket.read(inb, continuation, IntCompletionHandler)
// }
// if (size < 0) stop()
// else {
//// println("recvd:\n${data.sliceArray(0..<size).toDump()}\n------------------")
// for (i in 0..<size) input.send(data[i].toUByte())
// }
// }
// }
// }
//
// // copy from output to socket:
// launch {
// try {
// while (isActive) {
// // wait for the first block to send
// sendQueueEmpty.value = outputBlocks.isEmpty
// var data = encode(outputBlocks.receive())
//
// // now we're sending, so queue state is sending:
// sendQueueEmpty.value = false
//
// // if there are more, take them all (NO_DELAY optimization)
// while (!outputBlocks.isEmpty)
// data += encode(outputBlocks.receive())
//
// // now send it all together:
// val outBuff = ByteBuffer.wrap(data)
// val cnt = suspendCoroutine { continuation ->
// socket.write(outBuff, continuation, IntCompletionHandler)
// }
// // be sure it was all sent
// if (outBuff.position() != data.size || cnt != data.size) {
// throw RuntimeException("unexpected partial write")
// }
// }
// // in the case of just breaking out of the loop:
// sendQueueEmpty.value = true
// } catch (_: ClosedReceiveChannelException) {
// stop()
// }
// }
// // transport device copes with blocks:
// // decode blocks from a byte channel read from the socket:
// launch {
// try {
// while (isActive) {
// receiving.value = !input.isEmpty
// val size = readVarUnsigned(input)
// receiving.value = true
// if (size == 0u) log.warning { "zero size block is ignored!" }
// else {
// val block = UByteArray(size.toInt())
// for (i in 0..<size.toInt()) {
// block[i] = input.receive()
// }
// Contrail.unpack(block)?.let { inputBlocks.send(it) }
// ?: log.warning { "skipping bad block ${block.size} bytes" }
// }
// }
// } catch (_: CancellationException) {
// } catch (_: ClosedReceiveChannelException) {
// stop()
// }
// receiving.value = false
// }
//
// val addr = socket.remoteAddress as InetSocketAddress
// deferredDevice.complete(
// InetTransportDevice(inputBlocks, outputBlocks, JvmNetworkAddress(addr.address, addr.port), {
// yield()
// // wait until all received data are parsed, but not too long
// withTimeoutOrNull(500) {
// receiving.waitFor { !it }
// }
// // then stop it
// stop()
// })
// )
// }
// globalLaunch { socket.close() }
// }
// return deferredDevice.await()
//}

View File

@ -3,22 +3,20 @@ package net.sergeych.kiloparsec
import assertThrows
import io.ktor.server.engine.*
import io.ktor.server.netty.*
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.test.runTest
import net.sergeych.crypto2.initCrypto
import net.sergeych.kiloparsec.adapter.acceptTcpDevice
import net.sergeych.kiloparsec.adapter.connectTcpDevice
import net.sergeych.kiloparsec.adapter.setupWebsocketServer
import net.sergeych.kiloparsec.adapter.websocketClient
import net.sergeych.mp_logger.Log
import java.net.InetAddress
import kotlin.test.*
import kotlin.test.Test
import kotlin.test.assertEquals
import kotlin.test.assertFalse
import kotlin.test.assertTrue
class ClientTest {
class TestException : Exception("test1")
@Test
fun testAddresses() {
println(InetAddress.getLocalHost())
@ -26,60 +24,7 @@ class ClientTest {
println(InetAddress.getByName("127.0.0.1"))
println(InetAddress.getByName("mail.ru"))
}
@Test
fun testClient() = runTest {
initCrypto()
Log.connectConsole(Log.Level.DEBUG)
data class Session(
var data: String
)
val cmdSave by command<String,Unit>()
val cmdLoad by command<Unit,String>()
val cmdDrop by command<Unit,Unit>()
val cmdException by command<Unit,Unit>()
val cli = KiloInterface<Session>().apply {
registerError { TestException() }
onConnected { session.data = "start" }
on(cmdSave) { session.data = it }
on(cmdLoad) {
session.data
}
on(cmdException) {
throw TestException()
}
on(cmdDrop) {
throw LocalInterface.BreakConnectionException()
}
}
val server = KiloServer(cli, acceptTcpDevice(27101)) {
Session("unknown")
}
val client = KiloClient<Unit>() {
addErrors(cli)
connect { connectTcpDevice("localhost:27101") }
}
delay(500)
println(client.call(cmdLoad))
assertEquals("start", client.call(cmdLoad))
client.call(cmdSave, "foobar")
assertEquals("foobar", client.call(cmdLoad))
//
val res = kotlin.runCatching { client.call(cmdException) }
println(res.exceptionOrNull())
assertIs<TestException>(res.exceptionOrNull())
assertEquals("foobar", client.call(cmdLoad))
assertThrows<RemoteInterface.ClosedException> { client.call(cmdDrop) }
// reconnect?
assertEquals("start", client.call(cmdLoad))
server.close()
}
@Test
fun webSocketTest() = runTest {

View File

@ -1,100 +0,0 @@
package net.sergeych.kiloparsec.adapters
import kotlinx.coroutines.*
import kotlinx.coroutines.test.runTest
import net.sergeych.crypto2.initCrypto
import net.sergeych.kiloparsec.adapter.UdpServer
import net.sergeych.kiloparsec.adapter.acceptTcpDevice
import net.sergeych.kiloparsec.adapter.connectTcpDevice
import net.sergeych.kiloparsec.adapter.toNetworkAddress
import net.sergeych.kiloparsec.decodeFromUByteArray
import net.sergeych.kiloparsec.encodeToUByteArray
import net.sergeych.mp_logger.Log
import net.sergeych.mp_logger.LogTag
import net.sergeych.synctools.ProtectedOp
import net.sergeych.synctools.invoke
import kotlin.test.Test
import kotlin.test.assertContains
import kotlin.test.assertEquals
class NetworkTest {
@Test
fun udpProviderTest() = runTest {
Log.connectConsole(Log.Level.DEBUG)
val s1 = UdpServer(17120)
val s2 = UdpServer(17121)
s1.send("Hello".encodeToUByteArray(), "localhost", 17121)
val d1 = s2.incoming.receive()
assertEquals(d1.address.port, 17120)
assertEquals("Hello", d1.message.toByteArray().decodeToString())
s1.send("world".encodeToUByteArray(), d1.address)
assertEquals("world", s1.incoming.receive().message.toByteArray().decodeToString())
// println("s1: ${s1.bindAddress()}")
}
@Test
fun tcpAsyncConnectionTest() = runTest {
initCrypto()
Log.connectConsole(Log.Level.DEBUG)
coroutineScope {
val serverFlow = acceptTcpDevice(17171)
val op = ProtectedOp()
var pills = setOf<String>()
val j = launch {
println("serf")
serverFlow.collect { device ->
println("serf 0")
launch {
println("serf 1")
device.output.send("Hello, world!".encodeToUByteArray())
device.output.send("Great".encodeToUByteArray())
while (true) {
val x = device.input.receive().decodeFromUByteArray()
if (x.startsWith("die")) {
op.invoke {
pills += x
}
cancel()
}
else
println("ignoring unexpected input: $x")
}
}
}
}
yield()
run {
try {
println("pre-con")
val s = connectTcpDevice("127.0.1.1:17171".toNetworkAddress())
println("2")
assertEquals("Hello, world!", s.input.receive().decodeFromUByteArray())
assertEquals("Great", s.input.receive().decodeFromUByteArray())
s.output.send("Goodbye".encodeToUByteArray())
s.output.send("die1".encodeToUByteArray())
s.close()
}
catch(t: Throwable) {
t.printStackTrace()
throw t
}
}
println("pre-con2")
val s1 = connectTcpDevice("127.0.1.1:17171".toNetworkAddress())
assertEquals("Hello, world!", s1.input.receive().decodeFromUByteArray())
assertEquals("Great", s1.input.receive().decodeFromUByteArray())
s1.output.send("die2".encodeToUByteArray())
s1.close()
// check that channels were flushed prior to closed:
assertContains(pills, "die1")
assertContains(pills, "die2")
// Check that server jobs are closed
j.cancelAndJoin()
}
}
}

View File

@ -2,27 +2,16 @@ package net.sergeych.kiloparsec.adapter
import io.ktor.network.selector.*
import io.ktor.network.sockets.*
import io.ktor.util.network.*
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.ClosedReceiveChannelException
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.isActive
import kotlinx.coroutines.launch
import net.sergeych.kiloparsec.AsyncVarint
import net.sergeych.kiloparsec.LocalInterface
import net.sergeych.mp_logger.*
import net.sergeych.tools.AtomicCounter
class SocketNetworkAddress(override val host: String, override val port: Int) : NetworkAddress {
override fun toString(): String {
return "$host:$port"
}
}
actual fun NetworkAddress(host: String, port: Int): NetworkAddress = SocketNetworkAddress(host, port)
import net.sergeych.tools.AtomicValue
private val logCounter = AtomicCounter(0)
@ -30,51 +19,66 @@ class ProtocolException(text: String, cause: Throwable? = null) : RuntimeExcepti
const val MAX_TCP_BLOCK_SIZE = 16776216
actual fun acceptTcpDevice(port: Int): Flow<InetTransportDevice> {
fun acceptTcpDevice(port: Int): Flow<InetTransportDevice> {
val selectorManager = SelectorManager(Dispatchers.IO)
val serverSocket = aSocket(selectorManager).tcp().bind("127.0.0.1", port)
val log = LogTag("TCPS${logCounter.incrementAndGet()}")
return flow {
while(true) {
log.info { "Accepting incoming connections on $port" }
while (true) {
serverSocket.accept().let { sock ->
log.info { "Emitting transport device" }
emit(inetTransportDevice(sock, "srv"))
}
}
}
}
actual suspend fun connectTcpDevice(address: NetworkAddress): InetTransportDevice {
suspend fun connectTcpDevice(address: String) = connectTcpDevice(address.toNetworkAddress())
suspend fun connectTcpDevice(address: NetworkAddress): InetTransportDevice {
val selectorManager = SelectorManager(Dispatchers.IO)
val socket = aSocket(selectorManager).tcp().connect(address.host, address.port)
println("Connected to ${address.host}:${address.port}")
return inetTransportDevice(socket)
}
fun String.toNetworkAddress(): NetworkAddress {
val (host, port) = this.split(":").map { it.trim() }
return NetworkAddress(host, port.toInt())
}
private fun inetTransportDevice(
sock: Socket,
suffix: String = "cli",
): InetTransportDevice {
val networkAddress = sock.remoteAddress.toJavaAddress().let { NetworkAddress(it.hostname, it.port) }
val networkAddress = (sock.remoteAddress as InetSocketAddress).let { NetworkAddress(it.hostname, it.port) }
val inputBlocks = Channel<UByteArray>(4096)
val outputBlocks = Channel<UByteArray>(4096)
val log = LogTag("TCPT${logCounter.incrementAndGet()}:$suffix:$networkAddress")
val stopCalled = AtomicValue(false)
fun stop() {
log.info { "stopping" }
runCatching { inputBlocks.close()}
runCatching { outputBlocks.close()}
if( !sock.isClosed ) runCatching { sock.close()}
stopCalled.mutate {
if (!it) {
log.debug { "stopping" }
runCatching { inputBlocks.close() }
runCatching { outputBlocks.close() }
if (!sock.isClosed)
runCatching {
log.debug { "closing socket by stop" }
sock.close()
}
else
log.debug { "socket is already closed when stop is called" }
} else
log.debug { "already stopped" }
true
}
}
sock.launch {
log.debug { "opening read channel" }
val sockInput = runCatching { sock.openReadChannel() }.getOrElse {
log.warning { "failed to open read channel $it" }
sock.close()
stop()
throw IllegalStateException("failed to open read channel")
}
while (isActive && sock.isActive) {
@ -82,15 +86,16 @@ private fun inetTransportDevice(
val size = AsyncVarint.decodeUnsigned(sockInput).toInt()
if (size > MAX_TCP_BLOCK_SIZE) // 16M is a max command block
throw ProtocolException("Illegal block size: $size should be < $MAX_TCP_BLOCK_SIZE")
log.info { "read size: $size" }
val data = ByteArray(size)
log.info { "data ready" }
sockInput.readFully(data, 0, size)
inputBlocks.send(data.toUByteArray())
} catch (e: ClosedReceiveChannelException) {
log.error { "closed receive channel " }
stop()
break
} catch (_: CancellationException) {
log.error { "cancellation exception " }
break
} catch (e: Exception) {
log.exception { "unexpected exception in TCP socket read" to e }
stop()
@ -105,16 +110,17 @@ private fun inetTransportDevice(
val block = outputBlocks.receive()
AsyncVarint.encodeUnsigned(block.size.toULong(), sockOutput)
sockOutput.writeFully(block.toByteArray(), 0, block.size)
log.info { "Client sock output: ${block.size}" }
sockOutput.flush()
} catch (_: CancellationException) {
log.info { "Caught cancellation, closing transport" }
log.debug { "cancellation exception on output" }
stop()
break
} catch (_: LocalInterface.BreakConnectionException) {
log.info { "requested connection break" }
log.debug { "requested connection break" }
stop()
break
} catch (_: ClosedReceiveChannelException) {
log.info { "receive block channel closed, closing the socket" }
log.debug { "receive block channel closed, closing the socket" }
stop()
break
} catch (e: Exception) {
@ -124,10 +130,10 @@ private fun inetTransportDevice(
}
}
}
val device = InetTransportDevice(inputBlocks, outputBlocks, networkAddress, {
log.info { "Close has been called" }
stop()
})
log.info { "Transport ready" }
log.debug { "Transport ready" }
return device
}

View File

@ -0,0 +1,77 @@
import kotlinx.coroutines.delay
import kotlinx.coroutines.test.runTest
import net.sergeych.crypto2.initCrypto
import net.sergeych.kiloparsec.*
import net.sergeych.kiloparsec.adapter.acceptTcpDevice
import net.sergeych.kiloparsec.adapter.connectTcpDevice
import net.sergeych.mp_logger.Log
import kotlin.random.Random
import kotlin.test.Test
import kotlin.test.assertEquals
import kotlin.test.assertIs
class TcpTest {
class TestException : Exception("test1")
@Test
fun tcpTest() = runTest {
initCrypto()
Log.connectConsole(Log.Level.DEBUG)
data class Session(
var data: String
)
val port = 27170 + Random.nextInt(1, 200)
val cmdSave by command<String, Unit>()
val cmdLoad by command<Unit, String>()
val cmdDrop by command<Unit, Unit>()
val cmdException by command<Unit, Unit>()
val cli = KiloInterface<Session>().apply {
registerError { TestException() }
onConnected { session.data = "start" }
on(cmdSave) { session.data = it }
on(cmdLoad) {
session.data
}
on(cmdException) {
throw TestException()
}
on(cmdDrop) {
throw LocalInterface.BreakConnectionException()
}
}
val server = KiloServer(cli, acceptTcpDevice(port)) {
Session("unknown")
}
val client = KiloClient<Unit>() {
addErrors(cli)
connect { connectTcpDevice("localhost:$port") }
}
delay(500)
assertEquals("start", client.call(cmdLoad))
client.call(cmdSave, "foobar")
assertEquals("foobar", client.call(cmdLoad))
val res = kotlin.runCatching { client.call(cmdException) }
println(res.exceptionOrNull())
assertIs<TestException>(res.exceptionOrNull())
assertEquals("foobar", client.call(cmdLoad))
println("----------------------------------- pre drops")
assertThrows<RemoteInterface.ClosedException> { client.call(cmdDrop) }
println("----------------------------------- DROPPED")
// reconnect?
assertEquals("start", client.call(cmdLoad))
println("------------------------------=---- RECONNECTED")
server.close()
println("****************************************************************")
}
}

View File

@ -1,2 +0,0 @@
package net.sergeych.kiloparsec.adapter

View File

@ -1,15 +0,0 @@
package net.sergeych.kiloparsec.adapter
import kotlinx.coroutines.flow.Flow
actual fun NetworkAddress(host: String, port: Int): NetworkAddress {
TODO("Not yet implemented")
}
actual fun acceptTcpDevice(port: Int): Flow<InetTransportDevice> {
TODO("Not yet implemented")
}
actual suspend fun connectTcpDevice(address: NetworkAddress): InetTransportDevice {
TODO("Not yet implemented")
}