diff --git a/build.gradle.kts b/build.gradle.kts index 379eab8..7b17d71 100644 --- a/build.gradle.kts +++ b/build.gradle.kts @@ -8,7 +8,7 @@ plugins { } group = "net.sergeych" -version = "0.4.5-SNAPSHOT" +version = "0.4.6-SNAPSHOT" repositories { mavenCentral() diff --git a/src/commonMain/kotlin/net/sergeych/kiloparsec/adapter/websocketClient.kt b/src/commonMain/kotlin/net/sergeych/kiloparsec/adapter/websocketClient.kt index 5f93f0b..a86d1aa 100644 --- a/src/commonMain/kotlin/net/sergeych/kiloparsec/adapter/websocketClient.kt +++ b/src/commonMain/kotlin/net/sergeych/kiloparsec/adapter/websocketClient.kt @@ -11,10 +11,7 @@ import kotlinx.coroutines.channels.ClosedReceiveChannelException import kotlinx.coroutines.channels.ClosedSendChannelException import kotlinx.coroutines.launch import net.sergeych.crypto2.SigningKey -import net.sergeych.kiloparsec.KiloClient -import net.sergeych.kiloparsec.KiloConnectionData -import net.sergeych.kiloparsec.KiloInterface -import net.sergeych.kiloparsec.RemoteInterface +import net.sergeych.kiloparsec.* import net.sergeych.mp_logger.LogTag import net.sergeych.mp_logger.exception import net.sergeych.mp_logger.info @@ -24,91 +21,106 @@ import net.sergeych.tools.AtomicCounter private val counter = AtomicCounter() +/** + * Shortcut to create websocket client. Use [webSocketTransportDevice] with [KiloClient] + * for fine-grained control. + */ fun websocketClient( path: String, clientInterface: KiloInterface = KiloInterface(), - client: HttpClient = HttpClient { install(WebSockets) }, secretKey: SigningKey? = null, sessionMaker: () -> S = { @Suppress("UNCHECKED_CAST") Unit as S }, ): KiloClient { + return KiloClient(clientInterface, secretKey) { + KiloConnectionData(webSocketTransportDevice(path), sessionMaker()) + } +} + +/** + * Create kilopaarsec transport over websocket (ws or wss). + * @param path websocket path (must start with ws:// or wss:// and contain a path part) + * @client use default [HttpClient], it installs [WebSockets] plugin + */ +fun webSocketTransportDevice( + path: String, + client: HttpClient = HttpClient { install(WebSockets) }, +): Transport.Device { var u = Url(path) if (u.encodedPath.length <= 1) u = URLBuilder(u).apply { encodedPath = "/kp" }.build() - return KiloClient(clientInterface, secretKey) { - val input = Channel() - val output = Channel() - val closeHandle = CompletableDeferred() - globalLaunch { - val log = LogTag("KC:${counter.incrementAndGet()}") - client.webSocket({ - url.protocol = u.protocol - url.host = u.host - url.port = u.port - url.encodedPath = u.encodedPath - url.parameters.appendAll(u.parameters) - log.info { "kiloparsec server URL: $url" } - }) { - log.info { "connected to the server" } + val input = Channel() + val output = Channel() + val closeHandle = CompletableDeferred() + globalLaunch { + val log = LogTag("KC:${counter.incrementAndGet()}") + client.webSocket({ + url.protocol = u.protocol + url.host = u.host + url.port = u.port + url.encodedPath = u.encodedPath + url.parameters.appendAll(u.parameters) + log.info { "kiloparsec server URL: $url" } + }) { + log.info { "connected to the server" } // println("SENDING!!!") // send("Helluva") - launch { - try { - for (block in output) { - send(block.toByteArray()) + launch { + try { + for (block in output) { + send(block.toByteArray()) + } + log.info { "input is closed, closing the websocket" } + if (closeHandle.isActive) closeHandle.complete(true) + } catch (_: ClosedSendChannelException) { + log.info { "send channel closed" } + } catch (_: CancellationException) { + } catch (t: Throwable) { + log.info { "unexpected exception in websock sender: ${t.stackTraceToString()}" } + closeHandle.completeExceptionally(t) + } + if (closeHandle.isActive) closeHandle.complete(false) + } + launch { + try { + for (f in incoming) { + if (f is Frame.Binary) { + input.send(f.readBytes().toUByteArray()) + } else { + log.warning { "ignoring unexpected frame of type ${f.frameType}" } } - log.info { "input is closed, closing the websocket" } - if (closeHandle.isActive) closeHandle.complete(true) - } catch (_: ClosedSendChannelException) { - log.info { "send channel closed" } - } - catch(_: CancellationException) {} - catch(t: Throwable) { - log.info { "unexpected exception in websock sender: ${t.stackTraceToString()}" } - closeHandle.completeExceptionally(t) } + if (closeHandle.isActive) closeHandle.complete(true) + } catch (_: CancellationException) { + if (closeHandle.isActive) closeHandle.complete(false) + } catch (_: ClosedReceiveChannelException) { + log.warning { "receive channel closed unexpectedly" } + if (closeHandle.isActive) closeHandle.complete(false) + } catch (t: Throwable) { + log.exception { "unexpected error" to t } if (closeHandle.isActive) closeHandle.complete(false) } - launch { - try { - for (f in incoming) { - if (f is Frame.Binary) { - input.send(f.readBytes().toUByteArray()) - } else { - log.warning { "ignoring unexpected frame of type ${f.frameType}" } - } - } - if (closeHandle.isActive) closeHandle.complete(true) - } catch (_: CancellationException) { - if (closeHandle.isActive) closeHandle.complete(false) - } catch (_: ClosedReceiveChannelException) { - log.warning { "receive channel closed unexpectedly" } - if (closeHandle.isActive) closeHandle.complete(false) - } catch (t: Throwable) { - log.exception { "unexpected error" to t } - if (closeHandle.isActive) closeHandle.complete(false) - } - } - if(!closeHandle.await()) { - log.warning { "Client is closing with error" } - throw RemoteInterface.ClosedException() - } - output.close() - input.close() } - log.info { "closing connection" } + if (!closeHandle.await()) { + log.warning { "Client is closing with error" } + throw RemoteInterface.ClosedException() + } + output.close() + input.close() } - val device = ProxyDevice(input, output) { - // we need to explicitly close the coroutine job, or it can hang for a long time - // leaking resources. - closeHandle.complete(true) -// job.cancel() - } - KiloConnectionData(device, sessionMaker()) + log.info { "closing connection" } } + val device = ProxyDevice(input, output) { + // we need to explicitly close the coroutine job, or it can hang for a long time + // leaking resources. + closeHandle.complete(true) +// job.cancel() + } + return device } +