fixed local host with accept and added tests
This commit is contained in:
parent
f02b390ed4
commit
c6ac6f5907
1
.idea/misc.xml
generated
1
.idea/misc.xml
generated
@ -1,4 +1,3 @@
|
|||||||
<?xml version="1.0" encoding="UTF-8"?>
|
|
||||||
<project version="4">
|
<project version="4">
|
||||||
<component name="ExternalStorageConfigurationManager" enabled="true" />
|
<component name="ExternalStorageConfigurationManager" enabled="true" />
|
||||||
<component name="FrameworkDetectionExcludesConfiguration">
|
<component name="FrameworkDetectionExcludesConfiguration">
|
||||||
|
@ -2,6 +2,7 @@ package net.sergeych.kiloparsec
|
|||||||
|
|
||||||
import kotlinx.coroutines.CancellationException
|
import kotlinx.coroutines.CancellationException
|
||||||
import kotlinx.coroutines.CompletableDeferred
|
import kotlinx.coroutines.CompletableDeferred
|
||||||
|
import kotlinx.coroutines.delay
|
||||||
import kotlinx.coroutines.flow.MutableStateFlow
|
import kotlinx.coroutines.flow.MutableStateFlow
|
||||||
import kotlinx.coroutines.flow.asStateFlow
|
import kotlinx.coroutines.flow.asStateFlow
|
||||||
import kotlinx.coroutines.isActive
|
import kotlinx.coroutines.isActive
|
||||||
@ -52,10 +53,12 @@ class KiloClient<S>(
|
|||||||
debug { "client run finished" }
|
debug { "client run finished" }
|
||||||
} catch (_: RemoteInterface.ClosedException) {
|
} catch (_: RemoteInterface.ClosedException) {
|
||||||
debug { "remote closed" }
|
debug { "remote closed" }
|
||||||
|
delay(1000)
|
||||||
} catch (_: CancellationException) {
|
} catch (_: CancellationException) {
|
||||||
debug { "cancelled" }
|
debug { "cancelled" }
|
||||||
} catch (t: Throwable) {
|
} catch (t: Throwable) {
|
||||||
exception { "unexpected exception" to t }
|
exception { "unexpected exception" to t }
|
||||||
|
delay(1000)
|
||||||
}
|
}
|
||||||
_state.value = false
|
_state.value = false
|
||||||
if (deferredClient.isActive)
|
if (deferredClient.isActive)
|
||||||
|
@ -1,22 +1,40 @@
|
|||||||
package net.sergeych.kiloparsec
|
package net.sergeych.kiloparsec
|
||||||
|
|
||||||
|
import kotlinx.coroutines.CancellationException
|
||||||
import kotlinx.coroutines.flow.Flow
|
import kotlinx.coroutines.flow.Flow
|
||||||
import kotlinx.coroutines.launch
|
import kotlinx.coroutines.launch
|
||||||
import net.sergeych.crypto.Key
|
import net.sergeych.crypto.Key
|
||||||
|
import net.sergeych.kiloparsec.adapter.InetTransportDevice
|
||||||
|
import net.sergeych.mp_logger.LogTag
|
||||||
|
import net.sergeych.mp_logger.debug
|
||||||
|
import net.sergeych.mp_logger.exception
|
||||||
|
import net.sergeych.mp_logger.info
|
||||||
import net.sergeych.mp_tools.globalLaunch
|
import net.sergeych.mp_tools.globalLaunch
|
||||||
|
import net.sergeych.tools.AtomicCounter
|
||||||
|
|
||||||
|
private val instances = AtomicCounter()
|
||||||
@Suppress("unused")
|
@Suppress("unused")
|
||||||
class KiloServer<S>(
|
class KiloServer<S>(
|
||||||
private val clientInterface: KiloInterface<S>,
|
private val clientInterface: KiloInterface<S>,
|
||||||
private val connections: Flow<Transport.Device>,
|
private val connections: Flow<InetTransportDevice>,
|
||||||
private val serverSigningKey: Key.Signing? = null,
|
private val serverSigningKey: Key.Signing? = null,
|
||||||
private val sessionBuilder: ()->S,
|
private val sessionBuilder: ()->S,
|
||||||
) {
|
): LogTag("KS:${instances.incrementAndGet()}") {
|
||||||
|
|
||||||
private val job = globalLaunch {
|
private val job = globalLaunch {
|
||||||
connections.collect { device ->
|
connections.collect { device ->
|
||||||
launch {
|
launch {
|
||||||
KiloServerConnection(clientInterface,device,sessionBuilder(), serverSigningKey).run()
|
try {
|
||||||
|
info { "connected ${device}" }
|
||||||
|
KiloServerConnection(clientInterface, device, sessionBuilder(), serverSigningKey)
|
||||||
|
.apply { debug { "server connection is ready" }}
|
||||||
|
.run()
|
||||||
|
}
|
||||||
|
catch(_: CancellationException) {
|
||||||
|
}
|
||||||
|
catch (t: Throwable) {
|
||||||
|
exception { "unexpected while creating kiloclient" to t }
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -89,7 +89,7 @@ class KiloServerConnection<S>(
|
|||||||
val transport = Transport(device, l0Interface, Unit)
|
val transport = Transport(device, l0Interface, Unit)
|
||||||
deferredTransport.complete(transport)
|
deferredTransport.complete(transport)
|
||||||
kiloRemoteInterface.complete(KiloRemoteInterface(deferredParams,clientInterface))
|
kiloRemoteInterface.complete(KiloRemoteInterface(deferredParams,clientInterface))
|
||||||
debug { "starintg the transport"}
|
debug { "starting the transport"}
|
||||||
transport.run()
|
transport.run()
|
||||||
debug { "server transport finished" }
|
debug { "server transport finished" }
|
||||||
}
|
}
|
||||||
|
@ -12,4 +12,6 @@ class InetTransportDevice(
|
|||||||
val remoteAddress: NetworkAddress,
|
val remoteAddress: NetworkAddress,
|
||||||
val flush: suspend ()->Unit = {},
|
val flush: suspend ()->Unit = {},
|
||||||
doClose: suspend ()->Unit = {}
|
doClose: suspend ()->Unit = {}
|
||||||
) : ProxyDevice(inputChannel, outputChannel, doClose)
|
) : ProxyDevice(inputChannel, outputChannel, doClose) {
|
||||||
|
override fun toString(): String = "@$remoteAddress"
|
||||||
|
}
|
@ -44,7 +44,7 @@ interface DatagramConnector: AutoCloseable {
|
|||||||
|
|
||||||
expect fun NetworkAddress(host: String,port: Int): NetworkAddress
|
expect fun NetworkAddress(host: String,port: Int): NetworkAddress
|
||||||
|
|
||||||
fun CharSequence.toNetworkAddress() : NetworkAddress {
|
fun String.toNetworkAddress() : NetworkAddress {
|
||||||
val (host, port) = this.split(":").map { it.trim()}
|
val (host, port) = this.split(":").map { it.trim()}
|
||||||
return NetworkAddress(host, port.toInt())
|
return NetworkAddress(host, port.toInt())
|
||||||
}
|
}
|
||||||
|
@ -0,0 +1,9 @@
|
|||||||
|
package net.sergeych.tools
|
||||||
|
|
||||||
|
class AtomicCounter(initialValue: Long = 0) {
|
||||||
|
private val op = ProtectedOp()
|
||||||
|
var value: Long = initialValue
|
||||||
|
private set
|
||||||
|
|
||||||
|
fun incrementAndGet(): Long = op { ++value }
|
||||||
|
}
|
@ -18,7 +18,7 @@ actual fun acceptTcpDevice(port: Int): Flow<InetTransportDevice> {
|
|||||||
return flow {
|
return flow {
|
||||||
val socket = withContext(Dispatchers.IO) {
|
val socket = withContext(Dispatchers.IO) {
|
||||||
AsynchronousServerSocketChannel.open().also {
|
AsynchronousServerSocketChannel.open().also {
|
||||||
it.bind(InetSocketAddress(InetAddress.getLocalHost(), port))
|
it.bind(InetSocketAddress(port))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
while (true) {
|
while (true) {
|
||||||
@ -33,6 +33,8 @@ actual fun acceptTcpDevice(port: Int): Flow<InetTransportDevice> {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
suspend fun connectTcpDevice(address: String) = connectTcpDevice(address.toNetworkAddress())
|
||||||
|
suspend fun connectTcpDevice(host: String, port: Int) = connectTcpDevice(NetworkAddress(host,port))
|
||||||
actual suspend fun connectTcpDevice(address: NetworkAddress): InetTransportDevice {
|
actual suspend fun connectTcpDevice(address: NetworkAddress): InetTransportDevice {
|
||||||
address as JvmNetworkAddress
|
address as JvmNetworkAddress
|
||||||
val socket = withContext(Dispatchers.IO) {
|
val socket = withContext(Dispatchers.IO) {
|
||||||
|
@ -13,7 +13,7 @@ import java.util.concurrent.atomic.AtomicInteger
|
|||||||
private val counter = AtomicInteger(0)
|
private val counter = AtomicInteger(0)
|
||||||
|
|
||||||
class JvmNetworkAddress(val inetAddress: InetAddress, override val port: Int) : NetworkAddress {
|
class JvmNetworkAddress(val inetAddress: InetAddress, override val port: Int) : NetworkAddress {
|
||||||
override val host: String by lazy { inetAddress.hostName }
|
override val host: String by lazy { inetAddress.canonicalHostName }
|
||||||
override fun equals(other: Any?): Boolean {
|
override fun equals(other: Any?): Boolean {
|
||||||
if (this === other) return true
|
if (this === other) return true
|
||||||
if (other !is JvmNetworkAddress) return false
|
if (other !is JvmNetworkAddress) return false
|
||||||
@ -32,6 +32,7 @@ class JvmNetworkAddress(val inetAddress: InetAddress, override val port: Int) :
|
|||||||
return result
|
return result
|
||||||
}
|
}
|
||||||
|
|
||||||
|
override fun toString(): String = "$host:$port"
|
||||||
}
|
}
|
||||||
|
|
||||||
class UdpDatagram(override val message: UByteArray, val inetAddress: InetAddress, val port: Int) : Datagram {
|
class UdpDatagram(override val message: UByteArray, val inetAddress: InetAddress, val port: Int) : Datagram {
|
||||||
|
@ -1,10 +1,51 @@
|
|||||||
package net.sergeych.kiloparsec
|
package net.sergeych.kiloparsec
|
||||||
|
|
||||||
|
import kotlinx.coroutines.test.runTest
|
||||||
|
import net.sergeych.crypto.initCrypto
|
||||||
|
import net.sergeych.kiloparsec.adapter.acceptTcpDevice
|
||||||
|
import net.sergeych.kiloparsec.adapter.connectTcpDevice
|
||||||
|
import net.sergeych.mp_logger.Log
|
||||||
|
import java.net.InetAddress
|
||||||
import kotlin.test.Test
|
import kotlin.test.Test
|
||||||
|
import kotlin.test.assertEquals
|
||||||
|
|
||||||
class ClientTest {
|
class ClientTest {
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
fun testClient() {
|
fun testAddresses() {
|
||||||
// Todo
|
println(InetAddress.getLocalHost())
|
||||||
|
println(InetAddress.getByName("localhost"))
|
||||||
|
println(InetAddress.getByName("127.0.0.1"))
|
||||||
|
println(InetAddress.getByName("mail.ru"))
|
||||||
|
}
|
||||||
|
@Test
|
||||||
|
fun testClient() = runTest {
|
||||||
|
initCrypto()
|
||||||
|
Log.connectConsole(Log.Level.DEBUG)
|
||||||
|
data class Session(
|
||||||
|
var data: String
|
||||||
|
)
|
||||||
|
|
||||||
|
val cmdSave by command<String,Unit>()
|
||||||
|
val cmdLoad by command<Unit,String>()
|
||||||
|
|
||||||
|
val cli = KiloInterface<Session>().apply {
|
||||||
|
on(cmdSave) { session.data = it }
|
||||||
|
on(cmdLoad) {
|
||||||
|
println("load!")
|
||||||
|
session.data }
|
||||||
|
}
|
||||||
|
val server = KiloServer(cli, acceptTcpDevice(17101)) { Session("unknown")}
|
||||||
|
val client = KiloClient<Unit> {
|
||||||
|
connect { connectTcpDevice("localhost:17101") }
|
||||||
|
}
|
||||||
|
println(client.call(cmdLoad))
|
||||||
|
|
||||||
|
assertEquals("unknown", client.call(cmdLoad))
|
||||||
|
client.call(cmdSave, "foobar")
|
||||||
|
assertEquals("foobar", client.call(cmdLoad))
|
||||||
|
server.close()
|
||||||
|
// client.close()
|
||||||
|
// Todo
|
||||||
}
|
}
|
||||||
}
|
}
|
Loading…
x
Reference in New Issue
Block a user