0.6.* started:

kotlin upgrade to 2.1.0
ktor updgared to 3.1.6
less debug noise
yet no wasmJS
This commit is contained in:
Sergey Chernov 2025-02-18 11:24:47 +03:00
parent 0ff27e6de9
commit c1bd6f09a9
7 changed files with 108 additions and 57 deletions

View File

@ -6,13 +6,14 @@ plugins {
}
group = "net.sergeych"
version = "0.5.4-SNAPSHOT"
version = "0.6.1-SNAPSHOT"
repositories {
mavenCentral()
mavenLocal()
maven("https://maven.universablockchain.com/")
maven("https://gitea.sergeych.net/api/packages/SergeychWorks/maven")
maven("https://gitea.sergeych.net/api/packages/YoungBlood/maven")
}
kotlin {
@ -32,8 +33,10 @@ kotlin {
// macosX64()
// macosX64()
mingwX64()
// @OptIn(ExperimentalWasmDsl::class)
// wasmJs()
val ktor_version = "2.3.12"
val ktor_version = "3.1.0"
sourceSets {
all {

View File

@ -0,0 +1,42 @@
package net.sergeych.kiloparsec
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
/**
* Multiplatform atomically mutable value to be used in [kotlinx.coroutines],
* with suspending mutating operations, see [mutate].
*
* Actual value can be either changed in a block of [mutate] when
* new value _depends on the current value_ or with [reset].
*
* [value] getter is suspended because it waits until the mutation finishes
*/
open class AtomicAsyncValue<T>(initialValue: T) {
private var actualValue = initialValue
private val access = Mutex()
/**
* Change the value: get the current and set to the returned, all in the
* atomic suspend operation. All other mutating requests including assigning to [value]
* will be blocked and queued.
* @return result of the mutation. Note that immediate call to property [value]
* could already return modified bu some other thread value!
*/
suspend fun mutate(mutator: suspend (T) -> T): T = access.withLock {
actualValue = mutator(actualValue)
actualValue
}
/**
* Atomic get or set the value. Atomic get means if there is a [mutate] in progress
* it will wait until the mutation finishes and then return the correct result.
*/
suspend fun value() = access.withLock { actualValue }
/**
* Set the new value without checking it. Shortcut to
* ```mutate { value = newValue }```
*/
suspend fun reset(value: T) = mutate { value }
}

View File

@ -15,7 +15,7 @@ import net.sergeych.kiloparsec.KiloServerConnection
import net.sergeych.kiloparsec.RemoteInterface
import net.sergeych.mp_logger.*
import net.sergeych.tools.AtomicCounter
import java.time.Duration
import kotlin.time.Duration.Companion.seconds
/**
* Create a ktor-based websocket server.
@ -32,7 +32,6 @@ import java.time.Duration
* session could be transport specific.
*
* @param localInterface where the actual work is performed.
* @param timeout how long to wait for the connection to be established.
* @param path default http path to the websocket.
* @param serverKey optional key to authenticate the connection. If the client specify expected
* server key it should match of connection will not be established.
@ -45,8 +44,8 @@ fun <S> Application.setupWebsocketServer(
createSession: () -> S,
) {
install(WebSockets) {
pingPeriod = Duration.ofSeconds(15)
timeout = Duration.ofSeconds(15)
pingPeriod = 60.seconds //Duration.ofSeconds(15)
timeout = 45.seconds
maxFrameSize = Long.MAX_VALUE
masking = false
}

View File

@ -3,6 +3,7 @@ package net.sergeych.kiloparsec
import assertThrows
import io.ktor.server.engine.*
import io.ktor.server.netty.*
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.test.runTest
import net.sergeych.crypto2.initCrypto
@ -10,6 +11,7 @@ import net.sergeych.kiloparsec.adapter.setupWebsocketServer
import net.sergeych.kiloparsec.adapter.websocketClient
import net.sergeych.mp_logger.Log
import java.net.InetAddress
import kotlin.random.Random
import kotlin.test.Test
import kotlin.test.assertEquals
import kotlin.test.assertFalse
@ -50,11 +52,12 @@ class ClientTest {
}
}
val ns: NettyApplicationEngine = embeddedServer(Netty, port = 8080, host = "0.0.0.0", module = {
val port = Random.nextInt(8080,9090)
val ns = embeddedServer(Netty, port = port, host = "0.0.0.0", module = {
setupWebsocketServer(serverInterface) { Session() }
}).start(wait = false)
val client = websocketClient<Unit>("ws://localhost:8080/kp")
val client = websocketClient<Unit>("ws://localhost:$port/kp")
val states = mutableListOf<Boolean>()
val collector = launch {
client.connectedStateFlow.collect {
@ -75,6 +78,9 @@ class ClientTest {
}
// connection should now be closed
// the problem is: it needs some unspecified time to close
// as it is async process.
delay(100)
assertFalse { client.connectedStateFlow.value }
// this should be run on automatically reopen connection

View File

@ -3,6 +3,7 @@ package net.sergeych.kiloparsec.adapter
import io.ktor.network.selector.*
import io.ktor.network.sockets.*
import io.ktor.utils.io.*
import io.ktor.utils.io.writeByte
import kotlinx.coroutines.*
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.channels.Channel
@ -12,14 +13,10 @@ import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
import kotlinx.datetime.Clock
import net.sergeych.kiloparsec.AsyncVarint
import net.sergeych.kiloparsec.KiloClient
import net.sergeych.kiloparsec.KiloServer
import net.sergeych.kiloparsec.LocalInterface
import net.sergeych.kiloparsec.*
import net.sergeych.mp_logger.*
import net.sergeych.mp_tools.globalLaunch
import net.sergeych.tools.AtomicCounter
import net.sergeych.tools.AtomicValue
import kotlin.time.Duration.Companion.seconds
private val logCounter = AtomicCounter(0)
@ -33,10 +30,10 @@ internal val PING_INACTIVITY_TIME = 30.seconds
* Listen for incoming TCP/IP connections on all local interfaces and the specified [port]
* anc create flow of [InetTransportDevice] suitable for [KiloClient].
*/
fun acceptTcpDevice(port: Int,localInterface: String = "0.0.0.0"): Flow<InetTransportDevice> {
fun acceptTcpDevice(port: Int, localInterface: String = "0.0.0.0"): Flow<InetTransportDevice> {
val selectorManager = SelectorManager(Dispatchers.IO)
val serverSocket = aSocket(selectorManager).tcp().bind(localInterface, port)
return flow {
val serverSocket = aSocket(selectorManager).tcp().bind(localInterface, port)
while (true) {
serverSocket.accept().let { sock ->
emit(inetTransportDevice(sock, "srv"))
@ -74,7 +71,7 @@ private fun inetTransportDevice(
val outputBlocks = Channel<UByteArray>(4096)
val log = LogTag("TCPT${logCounter.incrementAndGet()}:$suffix:$networkAddress")
val job = AtomicValue<Job?>(null)
val job = AtomicAsyncValue<Job?>(null)
val sockOutput = sock.openWriteChannel()
val sockInput = runCatching { sock.openReadChannel() }.getOrElse {
@ -82,16 +79,16 @@ private fun inetTransportDevice(
throw IllegalStateException("failed to open read channel")
}
fun stop() {
suspend fun stop() {
job.mutate {
if ( it != null ) {
if (it != null) {
log.debug { "stopping" }
runCatching { inputBlocks.close() }
runCatching { outputBlocks.close() }
// The problem: on mac platofrms closing the socket does not close its input
// and output channels!
runCatching { sockInput.cancel() }
runCatching { sockOutput.close() }
runCatching { sockOutput.flushAndClose() }
if (!sock.isClosed)
runCatching {
log.debug { "closing socket by stop" }
@ -108,46 +105,47 @@ private fun inetTransportDevice(
}
var lastActiveAt = Clock.System.now()
job.value = globalLaunch {
launch {
globalLaunch {
job.reset(globalLaunch {
launch {
log.debug { "opening read channel" }
log.debug { "opening read channel" }
while (isActive && sock.isActive) {
try {
val size = AsyncVarint.decodeUnsigned(sockInput).toInt()
if (size > MAX_TCP_BLOCK_SIZE) // 16M is a max command block
throw ProtocolException("Illegal block size: $size should be < $MAX_TCP_BLOCK_SIZE")
val data = ByteArray(size)
if (size == 0) {
log.debug { "ping received" }
lastActiveAt = Clock.System.now()
} else {
sockInput.readFully(data, 0, size)
inputBlocks.send(data.toUByteArray())
while (isActive && sock.isActive) {
try {
val size = AsyncVarint.decodeUnsigned(sockInput).toInt()
if (size > MAX_TCP_BLOCK_SIZE) // 16M is a max command block
throw ProtocolException("Illegal block size: $size should be < $MAX_TCP_BLOCK_SIZE")
val data = ByteArray(size)
if (size == 0) {
log.debug { "ping received" }
lastActiveAt = Clock.System.now()
} else {
sockInput.readFully(data, 0, size)
inputBlocks.send(data.toUByteArray())
}
} catch (e: ClosedReceiveChannelException) {
log.error { "closed receive channel " }
stop()
break
} catch (_: CancellationException) {
log.error { "cancellation exception " }
break
} catch (e: Exception) {
log.exception { "unexpected exception in TCP socket read" to e }
stop()
break
}
} catch (e: ClosedReceiveChannelException) {
log.error { "closed receive channel " }
stop()
break
} catch (_: CancellationException) {
log.error { "cancellation exception " }
break
} catch (e: Exception) {
log.exception { "unexpected exception in TCP socket read" to e }
stop()
break
}
}
}
})
launch {
val outAccess = Mutex()
var lastSentAt = Clock.System.now()
launch {
while (isActive && sock.isActive) {
delay(500)
val activityTime = if(lastSentAt > lastActiveAt) lastSentAt else lastActiveAt
val activityTime = if (lastSentAt > lastActiveAt) lastSentAt else lastActiveAt
if (Clock.System.now() - activityTime > PING_INACTIVITY_TIME) {
log.debug { "pinging for inactivity" }
val repeat = outAccess.withLock {

View File

@ -87,7 +87,7 @@ fun acceptUdpDevice(
* the module automatically issues pings on inactivity when there is no data often enough
* to maintain the connection open.
*/
fun connectUdpDevice(
suspend fun connectUdpDevice(
hostPort: String,
maxInactivityTimeout: Duration = 2.minutes,
) = connectUdpDevice(hostPort.toNetworkAddress(), maxInactivityTimeout)
@ -107,16 +107,16 @@ fun connectUdpDevice(
* the module automatically issues pings on inactivity when there is no data often enough
* to maintain the connection open.
*/
fun connectUdpDevice(
suspend fun connectUdpDevice(
addr: NetworkAddress,
maxInactivityTimeout: Duration = 2.minutes,
): InetTransportDevice {
val selectorManager = SelectorManager(Dispatchers.IO)
val remoteAddress = InetSocketAddress(addr.host, addr.port)
val socket = aSocket(selectorManager).udp().connect(remoteAddress)
val done = CompletableDeferred<Unit>()
val socket = aSocket(selectorManager).udp().connect(remoteAddress)
val transport = UdpSocketTransport(object : UdpConnector {
override suspend fun sendBlock(block: UdpBlock, toAddress: SocketAddress) {
socket.send(block.toDatagram(remoteAddress))

View File

@ -14,6 +14,7 @@ import net.sergeych.mp_logger.LogTag
import net.sergeych.mp_logger.Loggable
import net.sergeych.mp_logger.debug
import net.sergeych.mp_logger.exception
import net.sergeych.mp_tools.globalDefer
import kotlin.time.Duration
import kotlin.time.Duration.Companion.minutes
@ -51,7 +52,9 @@ class UdpServer(val port: Int, localInterface: String = "0.0.0.0", maxInactivity
private val access = Mutex()
private val selectorManager = SelectorManager(Dispatchers.IO)
private val serverSocket = aSocket(selectorManager).udp().bind(InetSocketAddress(localInterface, port))
private val serverSocket = globalDefer {
aSocket(selectorManager).udp().bind(InetSocketAddress(localInterface, port))
}
override suspend fun disconnectClient(address: SocketAddress) {
access.withLock { sessions.remove(address) }
@ -65,7 +68,7 @@ class UdpServer(val port: Int, localInterface: String = "0.0.0.0", maxInactivity
flow {
while (true) {
try {
val datagram = serverSocket.receive()
val datagram = serverSocket.await().receive()
val block = UdpBlock.decode(datagram)
val remoteAddress = datagram.address
@ -97,10 +100,10 @@ class UdpServer(val port: Int, localInterface: String = "0.0.0.0", maxInactivity
}
override suspend fun sendBlock(block: UdpBlock, toAddress: SocketAddress) {
serverSocket.send(block.toDatagram(toAddress))
serverSocket.await().send(block.toDatagram(toAddress))
}
val isClosed: Boolean get() = serverSocket.isClosed
suspend fun isClosed(): Boolean = serverSocket.await().isClosed
/**
* Close the UDP server. Calling it will cause:
@ -113,8 +116,8 @@ class UdpServer(val port: Int, localInterface: String = "0.0.0.0", maxInactivity
*/
suspend fun close() {
access.withLock {
if (!isClosed) {
runCatching { serverSocket.close() }
if (!isClosed()) {
runCatching { serverSocket.await().close() }
}
}
while (sessions.isNotEmpty()) {