WS layer with tests
This commit is contained in:
parent
3a56e67c24
commit
192f7e135f
@ -7,6 +7,9 @@ import kotlin.reflect.KProperty
|
|||||||
/**
|
/**
|
||||||
* delegate returning function that creates a [Command] in the current context which by default has the name of
|
* delegate returning function that creates a [Command] in the current context which by default has the name of
|
||||||
* the property.
|
* the property.
|
||||||
|
*
|
||||||
|
* The Default name is a property name except the "cmd" prefix if present, which will be
|
||||||
|
* removed automatically.
|
||||||
*/
|
*/
|
||||||
inline fun <reified A, reified R> command(overrideName: String? = null): CommandDelegate<A, R> {
|
inline fun <reified A, reified R> command(overrideName: String? = null): CommandDelegate<A, R> {
|
||||||
return CommandDelegate(
|
return CommandDelegate(
|
||||||
|
@ -63,11 +63,13 @@ class KiloClient<S>(
|
|||||||
_state.value = false
|
_state.value = false
|
||||||
if (deferredClient.isActive)
|
if (deferredClient.isActive)
|
||||||
deferredClient = CompletableDeferred()
|
deferredClient = CompletableDeferred()
|
||||||
|
delay(1000)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fun close() {
|
fun close() {
|
||||||
job.cancel()
|
job.cancel()
|
||||||
|
debug { "client is closed" }
|
||||||
}
|
}
|
||||||
|
|
||||||
override suspend fun <A, R> call(cmd: Command<A, R>, args: A): R = deferredClient.await().call(cmd, args)
|
override suspend fun <A, R> call(cmd: Command<A, R>, args: A): R = deferredClient.await().call(cmd, args)
|
||||||
|
@ -86,6 +86,7 @@ class KiloClientConnection<S>(
|
|||||||
} catch (x: CancellationException) {
|
} catch (x: CancellationException) {
|
||||||
info { "client is cancelled" }
|
info { "client is cancelled" }
|
||||||
} catch (x: RemoteInterface.ClosedException) {
|
} catch (x: RemoteInterface.ClosedException) {
|
||||||
|
x.printStackTrace()
|
||||||
info { "connection closed by remote" }
|
info { "connection closed by remote" }
|
||||||
} finally {
|
} finally {
|
||||||
onConnectedStateChanged?.invoke(false)
|
onConnectedStateChanged?.invoke(false)
|
||||||
|
@ -112,8 +112,16 @@ class Transport<S>(
|
|||||||
}
|
}
|
||||||
|
|
||||||
// now we have mutex freed so we can call:
|
// now we have mutex freed so we can call:
|
||||||
val r = device.output.trySend(pack(b).also { debug { ">>>\n${it.toDump()}" } })
|
val r = runCatching { device.output.send(pack(b).also { debug { ">>>\n${it.toDump()}" } }) }
|
||||||
if (!r.isSuccess) deferred.completeExceptionally(RemoteInterface.ClosedException())
|
if (!r.isSuccess) {
|
||||||
|
r.exceptionOrNull()?.let {
|
||||||
|
exception { "failed to send output block" to it }
|
||||||
|
} ?: run {
|
||||||
|
error { "It should not happen: empty exception on block send failure" }
|
||||||
|
throw RuntimeException("unexpected failure in sending block")
|
||||||
|
}
|
||||||
|
deferred.completeExceptionally(RemoteInterface.ClosedException())
|
||||||
|
}
|
||||||
|
|
||||||
// it returns packed result or throws a proper error:
|
// it returns packed result or throws a proper error:
|
||||||
return deferred.await()
|
return deferred.await()
|
||||||
|
@ -4,7 +4,9 @@ import kotlinx.coroutines.channels.Channel
|
|||||||
import kotlinx.coroutines.channels.ReceiveChannel
|
import kotlinx.coroutines.channels.ReceiveChannel
|
||||||
import kotlinx.coroutines.channels.SendChannel
|
import kotlinx.coroutines.channels.SendChannel
|
||||||
import net.sergeych.kiloparsec.Transport
|
import net.sergeych.kiloparsec.Transport
|
||||||
|
import net.sergeych.tools.AtomicCounter
|
||||||
|
|
||||||
|
private val counter = AtomicCounter()
|
||||||
open class ProxyDevice(
|
open class ProxyDevice(
|
||||||
inputChannel: Channel<UByteArray>,
|
inputChannel: Channel<UByteArray>,
|
||||||
outputChannel: Channel<UByteArray>,
|
outputChannel: Channel<UByteArray>,
|
||||||
@ -15,4 +17,8 @@ open class ProxyDevice(
|
|||||||
override suspend fun close() {
|
override suspend fun close() {
|
||||||
onClose()
|
onClose()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private val id = counter.incrementAndGet()
|
||||||
|
|
||||||
|
override fun toString(): String = "PX$id"
|
||||||
}
|
}
|
@ -26,7 +26,10 @@ fun <S>websocketClient(
|
|||||||
clientInterface: KiloInterface<S> = KiloInterface(),
|
clientInterface: KiloInterface<S> = KiloInterface(),
|
||||||
client: HttpClient = HttpClient { install(WebSockets) },
|
client: HttpClient = HttpClient { install(WebSockets) },
|
||||||
secretKey: SigningKey.Secret? = null,
|
secretKey: SigningKey.Secret? = null,
|
||||||
sessionMaker: () -> S,
|
sessionMaker: () -> S = {
|
||||||
|
@Suppress("UNCHECKED_CAST")
|
||||||
|
Unit as S
|
||||||
|
},
|
||||||
): KiloClient<S> {
|
): KiloClient<S> {
|
||||||
var u = Url(path)
|
var u = Url(path)
|
||||||
if (u.encodedPath.length <= 1)
|
if (u.encodedPath.length <= 1)
|
||||||
@ -37,16 +40,20 @@ fun <S>websocketClient(
|
|||||||
return KiloClient(clientInterface, secretKey) {
|
return KiloClient(clientInterface, secretKey) {
|
||||||
val input = Channel<UByteArray>()
|
val input = Channel<UByteArray>()
|
||||||
val output = Channel<UByteArray>()
|
val output = Channel<UByteArray>()
|
||||||
globalLaunch {
|
val job = globalLaunch {
|
||||||
val log = LogTag("KC:${counter.incrementAndGet()}:$u")
|
val log = LogTag("KC:${counter.incrementAndGet()}")
|
||||||
client.webSocket({
|
client.webSocket({
|
||||||
url.protocol = u.protocol
|
url.protocol = u.protocol
|
||||||
url.host = u.host
|
url.host = u.host
|
||||||
|
url.port = u.port
|
||||||
url.encodedPath = u.encodedPath
|
url.encodedPath = u.encodedPath
|
||||||
url.parameters.appendAll(u.parameters)
|
url.parameters.appendAll(u.parameters)
|
||||||
|
log.info { "kiloparsec server URL: $url" }
|
||||||
}) {
|
}) {
|
||||||
try {
|
try {
|
||||||
log.info { "connected to server" }
|
log.info { "connected to the server" }
|
||||||
|
println("SENDING!!!")
|
||||||
|
send("Helluva")
|
||||||
launch {
|
launch {
|
||||||
for (block in output) {
|
for (block in output) {
|
||||||
send(block.toByteArray())
|
send(block.toByteArray())
|
||||||
@ -70,7 +77,12 @@ fun <S>websocketClient(
|
|||||||
log.info { "closing connection" }
|
log.info { "closing connection" }
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
val device = ProxyDevice(input,output) { input.close() }
|
val device = ProxyDevice(input,output) {
|
||||||
|
input.close()
|
||||||
|
// we need to explicitly close the coroutine job or it can hang active
|
||||||
|
// forever and leak resources:
|
||||||
|
job.cancel()
|
||||||
|
}
|
||||||
KiloConnectionData(device, sessionMaker())
|
KiloConnectionData(device, sessionMaker())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -9,6 +9,7 @@ import kotlinx.coroutines.channels.Channel
|
|||||||
import kotlinx.coroutines.isActive
|
import kotlinx.coroutines.isActive
|
||||||
import kotlinx.coroutines.launch
|
import kotlinx.coroutines.launch
|
||||||
import net.sergeych.crypto2.SigningKey
|
import net.sergeych.crypto2.SigningKey
|
||||||
|
import net.sergeych.crypto2.toDump
|
||||||
import net.sergeych.kiloparsec.KiloInterface
|
import net.sergeych.kiloparsec.KiloInterface
|
||||||
import net.sergeych.kiloparsec.KiloServerConnection
|
import net.sergeych.kiloparsec.KiloServerConnection
|
||||||
import net.sergeych.mp_logger.LogTag
|
import net.sergeych.mp_logger.LogTag
|
||||||
@ -34,14 +35,17 @@ fun <S> Application.setupWebsocketServer(
|
|||||||
val counter = AtomicCounter()
|
val counter = AtomicCounter()
|
||||||
routing {
|
routing {
|
||||||
webSocket(path) {
|
webSocket(path) {
|
||||||
|
println("--------------------------------------------")
|
||||||
val log = LogTag("KWS:${counter.incrementAndGet()}")
|
val log = LogTag("KWS:${counter.incrementAndGet()}")
|
||||||
log.debug { "opening the connection" }
|
log.debug { "opening the connection" }
|
||||||
val input = Channel<UByteArray>(256)
|
val input = Channel<UByteArray>(256)
|
||||||
val output = Channel<UByteArray>(256)
|
val output = Channel<UByteArray>(256)
|
||||||
launch {
|
launch {
|
||||||
|
log.debug { "starting output pump" }
|
||||||
while (isActive) {
|
while (isActive) {
|
||||||
send(output.receive().toByteArray())
|
send(output.receive().toByteArray())
|
||||||
}
|
}
|
||||||
|
log.debug { "closing output pump" }
|
||||||
}
|
}
|
||||||
val server = KiloServerConnection(
|
val server = KiloServerConnection(
|
||||||
localInterface,
|
localInterface,
|
||||||
@ -50,12 +54,15 @@ fun <S> Application.setupWebsocketServer(
|
|||||||
serverKey
|
serverKey
|
||||||
)
|
)
|
||||||
launch { server.run() }
|
launch { server.run() }
|
||||||
|
log.debug { "KSC started, looking for incoming frames" }
|
||||||
for( f in incoming) {
|
for( f in incoming) {
|
||||||
|
log.debug { "incoming frame: ${f.frameType}" }
|
||||||
if (f is Frame.Binary)
|
if (f is Frame.Binary)
|
||||||
input.send(f.readBytes().toUByteArray())
|
input.send(f.readBytes().toUByteArray().also {
|
||||||
|
log.debug { "in frame\n${it.toDump()}" }
|
||||||
|
})
|
||||||
else
|
else
|
||||||
log.warning { "unknown frame type ${f.frameType}, ignoring" }
|
log.warning { "unknown frame type ${f.frameType}, ignoring" }
|
||||||
|
|
||||||
}
|
}
|
||||||
log.debug { "closing the server" }
|
log.debug { "closing the server" }
|
||||||
cancel()
|
cancel()
|
||||||
|
@ -1,9 +1,13 @@
|
|||||||
package net.sergeych.kiloparsec
|
package net.sergeych.kiloparsec
|
||||||
|
|
||||||
|
import io.ktor.server.engine.*
|
||||||
|
import io.ktor.server.netty.*
|
||||||
import kotlinx.coroutines.test.runTest
|
import kotlinx.coroutines.test.runTest
|
||||||
import net.sergeych.crypto2.initCrypto
|
import net.sergeych.crypto2.initCrypto
|
||||||
import net.sergeych.kiloparsec.adapter.acceptTcpDevice
|
import net.sergeych.kiloparsec.adapter.acceptTcpDevice
|
||||||
import net.sergeych.kiloparsec.adapter.connectTcpDevice
|
import net.sergeych.kiloparsec.adapter.connectTcpDevice
|
||||||
|
import net.sergeych.kiloparsec.adapter.setupWebsocketServer
|
||||||
|
import net.sergeych.kiloparsec.adapter.websocketClient
|
||||||
import net.sergeych.mp_logger.Log
|
import net.sergeych.mp_logger.Log
|
||||||
import java.net.InetAddress
|
import java.net.InetAddress
|
||||||
import kotlin.test.Test
|
import kotlin.test.Test
|
||||||
@ -46,12 +50,45 @@ class ClientTest {
|
|||||||
client.call(cmdSave, "foobar")
|
client.call(cmdSave, "foobar")
|
||||||
assertEquals("foobar", client.call(cmdLoad))
|
assertEquals("foobar", client.call(cmdLoad))
|
||||||
server.close()
|
server.close()
|
||||||
// client.close()
|
|
||||||
// Todo
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
fun webSocketTest() = runTest {
|
fun webSocketTest() = runTest {
|
||||||
|
initCrypto()
|
||||||
|
// fun Application.
|
||||||
|
val cmdGetFoo by command<Unit,String>()
|
||||||
|
val cmdSetFoo by command<String,Unit>()
|
||||||
|
val cmdCheckConnected by command<Unit,Boolean>()
|
||||||
|
|
||||||
|
Log.connectConsole(Log.Level.DEBUG)
|
||||||
|
|
||||||
|
data class Session(var foo: String="not set")
|
||||||
|
val serverInterface = KiloInterface<Session>().apply {
|
||||||
|
var connectedCalled = false
|
||||||
|
onConnected { connectedCalled = true }
|
||||||
|
on(cmdGetFoo) { session.foo }
|
||||||
|
on(cmdSetFoo) { session.foo = it }
|
||||||
|
on(cmdCheckConnected) { connectedCalled }
|
||||||
|
}
|
||||||
// val server = setupWebsoketServer()
|
// val server = setupWebsoketServer()
|
||||||
|
val ns: NettyApplicationEngine = embeddedServer(Netty, port = 8080, host = "0.0.0.0", module = {
|
||||||
|
setupWebsocketServer(serverInterface) { Session() }
|
||||||
|
}).start(wait = false)
|
||||||
|
|
||||||
|
val client = websocketClient<Unit>("ws://localhost:8080/kp")
|
||||||
|
println(1)
|
||||||
|
assertEquals(true, client.call(cmdCheckConnected))
|
||||||
|
println(2)
|
||||||
|
assertEquals("not set", client.call(cmdGetFoo))
|
||||||
|
println(3)
|
||||||
|
client.call(cmdSetFoo, "foo")
|
||||||
|
println(4)
|
||||||
|
assertEquals("foo", client.call(cmdGetFoo))
|
||||||
|
println(5)
|
||||||
|
|
||||||
|
client.close()
|
||||||
|
ns.stop()
|
||||||
|
println("stopped server")
|
||||||
|
println("closed client")
|
||||||
}
|
}
|
||||||
}
|
}
|
Loading…
x
Reference in New Issue
Block a user