removed nullable array from transport device, as channels close() is more than enough

This commit is contained in:
Sergey Chernov 2023-11-20 15:49:29 +03:00
parent e7abbe6d1d
commit e619b45485
6 changed files with 30 additions and 17 deletions

View File

@ -37,6 +37,7 @@ kotlin {
val hostOs = System.getProperty("os.name")
val isArm64 = System.getProperty("os.arch") == "aarch64"
val isMingwX64 = hostOs.startsWith("Windows")
@Suppress("UNUSED_VARIABLE")
val nativeTarget = when {
hostOs == "Mac OS X" && isArm64 -> macosArm64("native")
hostOs == "Mac OS X" && !isArm64 -> macosX64("native")
@ -46,6 +47,7 @@ kotlin {
else -> throw GradleException("Host OS is not supported in Kotlin/Native.")
}
val ktor_version = "2.3.6"
sourceSets {
all {
@ -61,6 +63,7 @@ kotlin {
implementation("com.ionspin.kotlin:multiplatform-crypto-libsodium-bindings:0.9.0")
api("com.ionspin.kotlin:bignum:0.3.8")
implementation("io.ktor:ktor-client-core:$ktor_version")
api("net.sergeych:mp_bintools:0.0.6-SNAPSHOT")
api("net.sergeych:mp_stools:1.4.1")
@ -73,11 +76,27 @@ kotlin {
implementation("org.jetbrains.kotlinx:kotlinx-coroutines-test:1.7.3")
}
}
val jvmMain by getting
val jvmMain by getting {
dependencies {
implementation("io.ktor:ktor-server-core:$ktor_version")
implementation("io.ktor:ktor-server-core-jvm:$ktor_version")
implementation("io.ktor:ktor-server-websockets-jvm:$ktor_version")
implementation("io.ktor:ktor-server-netty:$ktor_version")
implementation("io.ktor:ktor-client-netty:$ktor_version")
}
}
val jvmTest by getting
val jsMain by getting
val jsMain by getting {
dependencies {
implementation("io.ktor:ktor-client-js:$ktor_version")
}
}
val jsTest by getting
val nativeMain by getting
val nativeMain by getting {
dependencies {
implementation("io.ktor:ktor-client-cio:$ktor_version")
}
}
val nativeTest by getting
}
}

View File

@ -41,7 +41,7 @@ class Transport<S>(
* Input blocks. When the device is disconnected, it should send one null to this channel
* to notify the owner. When [close] is called, the channel should be closed.
*/
val input: ReceiveChannel<UByteArray?>
val input: ReceiveChannel<UByteArray>
/**
* Send a binary block to a remote party where it should be received and put into [input]
@ -139,7 +139,7 @@ class Transport<S>(
debug { "awaiting incoming blocks" }
while (isActive && !isClosed) {
try {
device.input.receive()?.let { packed ->
device.input.receive().let { packed ->
debug { "<<<\n${packed.toDump()}" }
val b = unpack<Block>(packed)
debug { "<<$ $b" }
@ -181,9 +181,6 @@ class Transport<S>(
.also { debug { "command executed: ${b.name}" } }
}
}
} ?: run {
debug { "remote channel close received" }
isClosed = true
}
} catch (_: CancellationException) {
info { "loop is cancelled" }

View File

@ -7,7 +7,7 @@ import kotlinx.coroutines.channels.Channel
*/
@Suppress("unused")
class InetTransportDevice(
inputChannel: Channel<UByteArray?>,
inputChannel: Channel<UByteArray>,
outputChannel: Channel<UByteArray>,
val remoteAddress: NetworkAddress,
val flush: suspend ()->Unit = {},

View File

@ -6,11 +6,11 @@ import kotlinx.coroutines.channels.SendChannel
import net.sergeych.kiloparsec.Transport
open class ProxyDevice(
inputChannel: Channel<UByteArray?>,
inputChannel: Channel<UByteArray>,
outputChannel: Channel<UByteArray>,
private val onClose: suspend ()->Unit = {}): Transport.Device {
override val input: ReceiveChannel<UByteArray?> = inputChannel
override val input: ReceiveChannel<UByteArray> = inputChannel
override val output: SendChannel<UByteArray> = outputChannel
override suspend fun close() {
onClose()

View File

@ -6,7 +6,6 @@ import kotlinx.coroutines.channels.SendChannel
import kotlinx.coroutines.test.runTest
import net.sergeych.crypto.Key
import net.sergeych.crypto.initCrypto
import net.sergeych.kiloparsec.command
import net.sergeych.kiloparsec.*
import net.sergeych.mp_logger.Log
import kotlin.test.*
@ -17,7 +16,7 @@ fun createTestDevice(): Pair<Transport.Device, Transport.Device> {
val p2 = Channel<UByteArray>(256)
val id = ++dcnt
val d1 = object : Transport.Device {
override val input: ReceiveChannel<UByteArray?> = p1
override val input: ReceiveChannel<UByteArray> = p1
override val output: SendChannel<UByteArray> = p2
override suspend fun close() {
p2.close()
@ -28,7 +27,7 @@ fun createTestDevice(): Pair<Transport.Device, Transport.Device> {
}
}
val d2 = object : Transport.Device {
override val input: ReceiveChannel<UByteArray?> = p2
override val input: ReceiveChannel<UByteArray> = p2
override val output: SendChannel<UByteArray> = p1
override suspend fun close() {
p1.close()

View File

@ -98,7 +98,7 @@ suspend fun asyncSocketToDevice(socket: AsynchronousSocketChannel): InetTranspor
}
}
// transport device copes with blocks:
val inputBlocks = Channel<UByteArray?>()
val inputBlocks = Channel<UByteArray>()
// decode blocks from a byte channel read from the socket:
launch {
try {
@ -116,9 +116,7 @@ suspend fun asyncSocketToDevice(socket: AsynchronousSocketChannel): InetTranspor
}
}
} catch (_: CancellationException) {
inputBlocks.send(null)
} catch (_: ClosedReceiveChannelException) {
inputBlocks.send(null)
stop()
}
receiving.value = false