network transport with remote address, universal server

This commit is contained in:
Sergey Chernov 2023-11-15 02:25:57 +03:00
parent 96edbb2040
commit f92431a281
8 changed files with 39 additions and 33 deletions

View File

@ -0,0 +1,27 @@
package net.sergeych.kiloparsec
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.launch
import net.sergeych.crypto.Key
import net.sergeych.mp_tools.globalLaunch
@Suppress("unused")
class KiloServer<S>(
private val clientInterface: KiloInterface<S>,
private val connections: Flow<Transport.Device>,
private val serverSigningKey: Key.Signing? = null,
private val sessionBuilder: ()->S,
) {
private val job = globalLaunch {
connections.collect { device ->
launch {
KiloServerConnection(clientInterface,device,sessionBuilder(), serverSigningKey).run()
}
}
}
fun close() {
job.cancel()
}
}

View File

@ -3,7 +3,7 @@ package net.sergeych.kiloparsec.adapter
import kotlinx.coroutines.channels.Channel
@Suppress("unused")
class InetProxyDevice(
class InetTransportDevice(
inputChannel: Channel<UByteArray?>,
outputChannel: Channel<UByteArray>,
val remoteAddress: NetworkAddress,

View File

@ -2,7 +2,6 @@ package net.sergeych.kiloparsec.adapter
import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.coroutines.flow.Flow
import net.sergeych.kiloparsec.Transport
/**
* Multiplatform implementation of an internet address.
@ -51,6 +50,6 @@ fun CharSequence.toNetworkAddress() : NetworkAddress {
}
expect fun acceptTcpDevice(port: Int): Flow<Transport.Device>
expect fun acceptTcpDevice(port: Int): Flow<InetTransportDevice>
expect suspend fun connectTcpDevice(address: NetworkAddress): Transport.Device
expect suspend fun connectTcpDevice(address: NetworkAddress): InetTransportDevice

View File

@ -1,16 +1,15 @@
package net.sergeych.kiloparsec.adapter
import kotlinx.coroutines.flow.Flow
import net.sergeych.kiloparsec.Transport
actual fun NetworkAddress(host: String, port: Int): NetworkAddress {
TODO("Not yet implemented")
}
actual fun acceptTcpDevice(port: Int): Flow<Transport.Device> {
actual fun acceptTcpDevice(port: Int): Flow<InetTransportDevice> {
TODO("Not yet implemented")
}
actual suspend fun connectTcpDevice(address: NetworkAddress): Transport.Device {
actual suspend fun connectTcpDevice(address: NetworkAddress): InetTransportDevice {
TODO("Not yet implemented")
}

View File

@ -1,12 +1,10 @@
package net.sergeych.kiloparsec.adapter
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.channels.SendChannel
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.suspendCancellableCoroutine
import kotlinx.coroutines.withContext
import net.sergeych.kiloparsec.Transport
import java.net.InetAddress
import java.net.InetSocketAddress
import java.nio.channels.AsynchronousServerSocketChannel
@ -16,7 +14,7 @@ import kotlin.coroutines.suspendCoroutine
actual fun NetworkAddress(host: String, port: Int): NetworkAddress =
JvmNetworkAddress(InetAddress.getByName(host), port)
actual fun acceptTcpDevice(port: Int): Flow<Transport.Device> {
actual fun acceptTcpDevice(port: Int): Flow<InetTransportDevice> {
return flow {
val socket = withContext(Dispatchers.IO) {
AsynchronousServerSocketChannel.open().also {
@ -35,7 +33,7 @@ actual fun acceptTcpDevice(port: Int): Flow<Transport.Device> {
}
}
actual suspend fun connectTcpDevice(address: NetworkAddress): Transport.Device {
actual suspend fun connectTcpDevice(address: NetworkAddress): InetTransportDevice {
address as JvmNetworkAddress
val socket = withContext(Dispatchers.IO) {
AsynchronousSocketChannel.open()
@ -45,7 +43,3 @@ actual suspend fun connectTcpDevice(address: NetworkAddress): Transport.Device {
}
return asyncSocketToDevice(socket)
}
suspend fun SendChannel<UByte>.sendAll(bytes: Collection<UByte>) {
for (b in bytes) send(b)
}

View File

@ -31,8 +31,8 @@ private fun encode(block: UByteArray): ByteArray
* to the socket in a global scope.These are closed when transport is closed
* or the socket is closed, for example, by network failure.
*/
suspend fun asyncSocketToDevice(socket: AsynchronousSocketChannel): InetProxyDevice {
val deferredDevice = CompletableDeferred<InetProxyDevice>()
suspend fun asyncSocketToDevice(socket: AsynchronousSocketChannel): InetTransportDevice {
val deferredDevice = CompletableDeferred<InetTransportDevice>()
globalLaunch {
coroutineScope {
fun stop() {
@ -109,7 +109,7 @@ suspend fun asyncSocketToDevice(socket: AsynchronousSocketChannel): InetProxyDev
// SocketAddress.
val addr = socket.remoteAddress as InetSocketAddress
deferredDevice.complete(
InetProxyDevice(inputBlocks, outputBlocks, JvmNetworkAddress(addr.address,addr.port)) { stop() }
InetTransportDevice(inputBlocks, outputBlocks, JvmNetworkAddress(addr.address,addr.port)) { stop() }
)
}
globalLaunch { socket.close() }

View File

@ -38,7 +38,6 @@ class NetworkTest {
val j = launch {
serverFlow.collect { device ->
launch {
println("connected!")
device.output.send("Hello, world!".encodeToUByteArray())
device.output.send("Great".encodeToUByteArray())
while (true) {
@ -56,29 +55,18 @@ class NetworkTest {
}
yield()
run {
println("x0")
val s = connectTcpDevice("127.0.1.1:17171".toNetworkAddress())
println("x1")
assertEquals("Hello, world!", s.input.receive()!!.decodeFromUByteArray())
println("x2")
assertEquals("Great", s.input.receive()!!.decodeFromUByteArray())
println("x3")
s.output.send("Goodbye".encodeToUByteArray())
println("pre1")
s.close()
println("pre2")
}
val s1 = connectTcpDevice("127.0.1.1:17171".toNetworkAddress())
println("conn-0-1")
assertEquals("Hello, world!", s1.input.receive()!!.decodeFromUByteArray())
println("conn-0-2")
assertEquals("Great", s1.input.receive()!!.decodeFromUByteArray())
println("1")
s1.output.send("die".encodeToUByteArray())
println("2")
delay(200)
s1.close()
println("3 -- the -- end")
j.cancelAndJoin()
}
}

View File

@ -1,16 +1,15 @@
package net.sergeych.kiloparsec.adapter
import kotlinx.coroutines.flow.Flow
import net.sergeych.kiloparsec.Transport
actual fun NetworkAddress(host: String, port: Int): NetworkAddress {
TODO("Not yet implemented")
}
actual fun acceptTcpDevice(port: Int): Flow<Transport.Device> {
actual fun acceptTcpDevice(port: Int): Flow<InetTransportDevice> {
TODO("Not yet implemented")
}
actual suspend fun connectTcpDevice(address: NetworkAddress): Transport.Device {
actual suspend fun connectTcpDevice(address: NetworkAddress): InetTransportDevice {
TODO("Not yet implemented")
}