websocket client now includes transport device to use in higher order protocols

This commit is contained in:
Sergey Chernov 2024-11-23 11:47:56 +07:00
parent 4098358233
commit 9545ca28cf
2 changed files with 80 additions and 68 deletions

View File

@ -8,7 +8,7 @@ plugins {
}
group = "net.sergeych"
version = "0.4.5-SNAPSHOT"
version = "0.4.6-SNAPSHOT"
repositories {
mavenCentral()

View File

@ -11,10 +11,7 @@ import kotlinx.coroutines.channels.ClosedReceiveChannelException
import kotlinx.coroutines.channels.ClosedSendChannelException
import kotlinx.coroutines.launch
import net.sergeych.crypto2.SigningKey
import net.sergeych.kiloparsec.KiloClient
import net.sergeych.kiloparsec.KiloConnectionData
import net.sergeych.kiloparsec.KiloInterface
import net.sergeych.kiloparsec.RemoteInterface
import net.sergeych.kiloparsec.*
import net.sergeych.mp_logger.LogTag
import net.sergeych.mp_logger.exception
import net.sergeych.mp_logger.info
@ -24,91 +21,106 @@ import net.sergeych.tools.AtomicCounter
private val counter = AtomicCounter()
/**
* Shortcut to create websocket client. Use [webSocketTransportDevice] with [KiloClient]
* for fine-grained control.
*/
fun <S> websocketClient(
path: String,
clientInterface: KiloInterface<S> = KiloInterface(),
client: HttpClient = HttpClient { install(WebSockets) },
secretKey: SigningKey? = null,
sessionMaker: () -> S = {
@Suppress("UNCHECKED_CAST")
Unit as S
},
): KiloClient<S> {
return KiloClient(clientInterface, secretKey) {
KiloConnectionData(webSocketTransportDevice(path), sessionMaker())
}
}
/**
* Create kilopaarsec transport over websocket (ws or wss).
* @param path websocket path (must start with ws:// or wss:// and contain a path part)
* @client use default [HttpClient], it installs [WebSockets] plugin
*/
fun webSocketTransportDevice(
path: String,
client: HttpClient = HttpClient { install(WebSockets) },
): Transport.Device {
var u = Url(path)
if (u.encodedPath.length <= 1)
u = URLBuilder(u).apply {
encodedPath = "/kp"
}.build()
return KiloClient(clientInterface, secretKey) {
val input = Channel<UByteArray>()
val output = Channel<UByteArray>()
val closeHandle = CompletableDeferred<Boolean>()
globalLaunch {
val log = LogTag("KC:${counter.incrementAndGet()}")
client.webSocket({
url.protocol = u.protocol
url.host = u.host
url.port = u.port
url.encodedPath = u.encodedPath
url.parameters.appendAll(u.parameters)
log.info { "kiloparsec server URL: $url" }
}) {
log.info { "connected to the server" }
val input = Channel<UByteArray>()
val output = Channel<UByteArray>()
val closeHandle = CompletableDeferred<Boolean>()
globalLaunch {
val log = LogTag("KC:${counter.incrementAndGet()}")
client.webSocket({
url.protocol = u.protocol
url.host = u.host
url.port = u.port
url.encodedPath = u.encodedPath
url.parameters.appendAll(u.parameters)
log.info { "kiloparsec server URL: $url" }
}) {
log.info { "connected to the server" }
// println("SENDING!!!")
// send("Helluva")
launch {
try {
for (block in output) {
send(block.toByteArray())
launch {
try {
for (block in output) {
send(block.toByteArray())
}
log.info { "input is closed, closing the websocket" }
if (closeHandle.isActive) closeHandle.complete(true)
} catch (_: ClosedSendChannelException) {
log.info { "send channel closed" }
} catch (_: CancellationException) {
} catch (t: Throwable) {
log.info { "unexpected exception in websock sender: ${t.stackTraceToString()}" }
closeHandle.completeExceptionally(t)
}
if (closeHandle.isActive) closeHandle.complete(false)
}
launch {
try {
for (f in incoming) {
if (f is Frame.Binary) {
input.send(f.readBytes().toUByteArray())
} else {
log.warning { "ignoring unexpected frame of type ${f.frameType}" }
}
log.info { "input is closed, closing the websocket" }
if (closeHandle.isActive) closeHandle.complete(true)
} catch (_: ClosedSendChannelException) {
log.info { "send channel closed" }
}
catch(_: CancellationException) {}
catch(t: Throwable) {
log.info { "unexpected exception in websock sender: ${t.stackTraceToString()}" }
closeHandle.completeExceptionally(t)
}
if (closeHandle.isActive) closeHandle.complete(true)
} catch (_: CancellationException) {
if (closeHandle.isActive) closeHandle.complete(false)
} catch (_: ClosedReceiveChannelException) {
log.warning { "receive channel closed unexpectedly" }
if (closeHandle.isActive) closeHandle.complete(false)
} catch (t: Throwable) {
log.exception { "unexpected error" to t }
if (closeHandle.isActive) closeHandle.complete(false)
}
launch {
try {
for (f in incoming) {
if (f is Frame.Binary) {
input.send(f.readBytes().toUByteArray())
} else {
log.warning { "ignoring unexpected frame of type ${f.frameType}" }
}
}
if (closeHandle.isActive) closeHandle.complete(true)
} catch (_: CancellationException) {
if (closeHandle.isActive) closeHandle.complete(false)
} catch (_: ClosedReceiveChannelException) {
log.warning { "receive channel closed unexpectedly" }
if (closeHandle.isActive) closeHandle.complete(false)
} catch (t: Throwable) {
log.exception { "unexpected error" to t }
if (closeHandle.isActive) closeHandle.complete(false)
}
}
if(!closeHandle.await()) {
log.warning { "Client is closing with error" }
throw RemoteInterface.ClosedException()
}
output.close()
input.close()
}
log.info { "closing connection" }
if (!closeHandle.await()) {
log.warning { "Client is closing with error" }
throw RemoteInterface.ClosedException()
}
output.close()
input.close()
}
val device = ProxyDevice(input, output) {
// we need to explicitly close the coroutine job, or it can hang for a long time
// leaking resources.
closeHandle.complete(true)
// job.cancel()
}
KiloConnectionData(device, sessionMaker())
log.info { "closing connection" }
}
val device = ProxyDevice(input, output) {
// we need to explicitly close the coroutine job, or it can hang for a long time
// leaking resources.
closeHandle.complete(true)
// job.cancel()
}
return device
}