working suspended TCP block device on JVM (server and client)
This commit is contained in:
		
							parent
							
								
									fe29bec1b0
								
							
						
					
					
						commit
						1814572a04
					
				| @ -7,6 +7,7 @@ import com.ionspin.kotlin.crypto.secretbox.crypto_secretbox_NONCEBYTES | |||||||
| import com.ionspin.kotlin.crypto.util.LibsodiumRandom | import com.ionspin.kotlin.crypto.util.LibsodiumRandom | ||||||
| import kotlinx.coroutines.channels.ReceiveChannel | import kotlinx.coroutines.channels.ReceiveChannel | ||||||
| import kotlinx.serialization.Serializable | import kotlinx.serialization.Serializable | ||||||
|  | import net.sergeych.bintools.encodeToHex | ||||||
| import net.sergeych.bintools.toDataSource | import net.sergeych.bintools.toDataSource | ||||||
| import net.sergeych.bipack.BipackDecoder | import net.sergeych.bipack.BipackDecoder | ||||||
| import net.sergeych.bipack.BipackEncoder | import net.sergeych.bipack.BipackEncoder | ||||||
| @ -30,13 +31,17 @@ data class WithFill( | |||||||
| suspend fun readVarUnsigned(input: ReceiveChannel<UByte>): UInt { | suspend fun readVarUnsigned(input: ReceiveChannel<UByte>): UInt { | ||||||
|     var result = 0u |     var result = 0u | ||||||
|     var cnt = 0 |     var cnt = 0 | ||||||
|  |     println("----start") | ||||||
|     while(true) { |     while(true) { | ||||||
|         val b = input.receive().toUInt() |         val b = input.receive().toUInt() | ||||||
|         result = (result shr 7) or (b and 0x7fu) |         println("RVU: ${b.encodeToHex()} / ${b and 0x80u}") | ||||||
|         if( (b and 0x80u) != 0u ) break |         result = (result shl 7) or (b and 0x7fu) | ||||||
|         if( ++cnt > 4 ) throw IllegalArgumentException("overflow while decoding varuint") |         if( (b and 0x80u) != 0u ) { | ||||||
|  |             println("---! $result") | ||||||
|  |             return result | ||||||
|  |         } | ||||||
|  |         if( ++cnt > 5 ) throw IllegalArgumentException("overflow while decoding varuint") | ||||||
|     } |     } | ||||||
|     return result |  | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| fun encodeVarUnsigned(value: UInt): UByteArray { | fun encodeVarUnsigned(value: UInt): UByteArray { | ||||||
|  | |||||||
| @ -51,6 +51,6 @@ fun CharSequence.toNetworkAddress() : NetworkAddress { | |||||||
| } | } | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
| expect fun acceptTcpDevice(pord: Int): Flow<Transport.Device> | expect fun acceptTcpDevice(port: Int): Flow<Transport.Device> | ||||||
| 
 | 
 | ||||||
| expect suspend fun connectTcpDevice(address: NetworkAddress): Transport.Device | expect suspend fun connectTcpDevice(address: NetworkAddress): Transport.Device | ||||||
| @ -7,7 +7,7 @@ actual fun NetworkAddress(host: String, port: Int): NetworkAddress { | |||||||
|     TODO("Not yet implemented") |     TODO("Not yet implemented") | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| actual fun acceptTcpDevice(pord: Int): Flow<Transport.Device> { | actual fun acceptTcpDevice(port: Int): Flow<Transport.Device> { | ||||||
|     TODO("Not yet implemented") |     TODO("Not yet implemented") | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
|  | |||||||
| @ -4,19 +4,35 @@ import kotlinx.coroutines.Dispatchers | |||||||
| import kotlinx.coroutines.channels.SendChannel | import kotlinx.coroutines.channels.SendChannel | ||||||
| import kotlinx.coroutines.flow.Flow | import kotlinx.coroutines.flow.Flow | ||||||
| import kotlinx.coroutines.flow.flow | import kotlinx.coroutines.flow.flow | ||||||
|  | import kotlinx.coroutines.suspendCancellableCoroutine | ||||||
| import kotlinx.coroutines.withContext | import kotlinx.coroutines.withContext | ||||||
| import net.sergeych.kiloparsec.Transport | import net.sergeych.kiloparsec.Transport | ||||||
| import java.net.InetAddress | import java.net.InetAddress | ||||||
|  | import java.net.InetSocketAddress | ||||||
|  | import java.nio.channels.AsynchronousServerSocketChannel | ||||||
| import java.nio.channels.AsynchronousSocketChannel | import java.nio.channels.AsynchronousSocketChannel | ||||||
| import kotlin.coroutines.suspendCoroutine | import kotlin.coroutines.suspendCoroutine | ||||||
| 
 | 
 | ||||||
| actual fun NetworkAddress(host: String, port: Int): NetworkAddress = | actual fun NetworkAddress(host: String, port: Int): NetworkAddress = | ||||||
|     JvmNetworkAddress(InetAddress.getByName(host), port) |     JvmNetworkAddress(InetAddress.getByName(host), port) | ||||||
| 
 | 
 | ||||||
| actual fun acceptTcpDevice(pord: Int): Flow<Transport.Device> { | actual fun acceptTcpDevice(port: Int): Flow<Transport.Device> { | ||||||
|     return flow { |     return flow { | ||||||
| 
 |         println("start generating tcp accept flow") | ||||||
|         TODO("Not yet implemented") |         val socket = withContext(Dispatchers.IO) { | ||||||
|  |             AsynchronousServerSocketChannel.open().also { | ||||||
|  |                 it.bind(InetSocketAddress(InetAddress.getLocalHost(), port)) | ||||||
|  |             } | ||||||
|  |         } | ||||||
|  |         println("Server socket ready $socket") | ||||||
|  |         val connectedSocket = suspendCancellableCoroutine { continuation -> | ||||||
|  |             continuation.invokeOnCancellation { | ||||||
|  |                 socket.close() | ||||||
|  |             } | ||||||
|  |             socket.accept(continuation, ContinuationHandler()) | ||||||
|  |         } | ||||||
|  |         println("incoming connection") | ||||||
|  |         emit(asyncSocketToDevice(connectedSocket)) | ||||||
|     } |     } | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| @ -33,5 +49,5 @@ actual suspend fun connectTcpDevice(address: NetworkAddress): Transport.Device { | |||||||
| } | } | ||||||
| 
 | 
 | ||||||
| suspend fun SendChannel<UByte>.sendAll(bytes: Collection<UByte>) { | suspend fun SendChannel<UByte>.sendAll(bytes: Collection<UByte>) { | ||||||
|     for( b in bytes) send(b) |     for (b in bytes) send(b) | ||||||
| } | } | ||||||
| @ -9,6 +9,7 @@ import kotlinx.coroutines.isActive | |||||||
| import kotlinx.coroutines.launch | import kotlinx.coroutines.launch | ||||||
| import net.sergeych.crypto.encodeVarUnsigned | import net.sergeych.crypto.encodeVarUnsigned | ||||||
| import net.sergeych.crypto.readVarUnsigned | import net.sergeych.crypto.readVarUnsigned | ||||||
|  | import net.sergeych.crypto.toDump | ||||||
| import net.sergeych.kiloparsec.Transport | import net.sergeych.kiloparsec.Transport | ||||||
| import net.sergeych.mp_tools.globalLaunch | import net.sergeych.mp_tools.globalLaunch | ||||||
| import java.nio.ByteBuffer | import java.nio.ByteBuffer | ||||||
| @ -38,21 +39,22 @@ suspend fun asyncSocketToDevice(socket: AsynchronousSocketChannel): Transport.De | |||||||
|                 val size: Int = suspendCoroutine { continuation -> |                 val size: Int = suspendCoroutine { continuation -> | ||||||
|                     socket.read(inb, continuation, IntCompletionHandler) |                     socket.read(inb, continuation, IntCompletionHandler) | ||||||
|                 } |                 } | ||||||
|  |                 println("--------- read chunk $size") | ||||||
|                 if (size < 0) stop() |                 if (size < 0) stop() | ||||||
|                 else for (i in 0..<size) input.send(inb[i].toUByte().also { print(it.toInt().toChar())}) |                 else for (i in 0..<size) input.send(inb[i].toUByte().also { print(it.toInt().toChar())}) | ||||||
|             } |             } | ||||||
|         } |         } | ||||||
|         // copy from output tp socket |         // copy from output tp socket | ||||||
|         launch { |         launch { | ||||||
|             val outb = ByteBuffer.allocate(1024) |             val outBuff = ArrayList<Byte>(1024) | ||||||
|             try { |             try { | ||||||
|                 while (isActive) { |                 while (isActive) { | ||||||
|                     var count = 0 |                     outBuff.clear() | ||||||
|                     outb.put(count++, output.receive().toByte()) |                     outBuff.add(output.receive().toByte()) | ||||||
|                     while (!output.isEmpty && count < outb.capacity()) |                     while (!output.isEmpty) | ||||||
|                         outb.put(count++, output.receive().toByte()) |                         outBuff.add(output.receive().toByte()) | ||||||
|                     suspendCoroutine { continuation -> |                     suspendCoroutine { continuation -> | ||||||
|                         socket.write(outb, continuation, IntCompletionHandler) |                         socket.write(ByteBuffer.wrap(outBuff.toByteArray()), continuation, IntCompletionHandler) | ||||||
|                     } |                     } | ||||||
|                 } |                 } | ||||||
|             } catch (_: ClosedReceiveChannelException) { |             } catch (_: ClosedReceiveChannelException) { | ||||||
| @ -65,12 +67,14 @@ suspend fun asyncSocketToDevice(socket: AsynchronousSocketChannel): Transport.De | |||||||
|             try { |             try { | ||||||
|                 while (isActive) { |                 while (isActive) { | ||||||
|                     val size = readVarUnsigned(input) |                     val size = readVarUnsigned(input) | ||||||
|  |                     println("expected size $size") | ||||||
|                     if (size == 0u) println("*** zero size block is ignored!") |                     if (size == 0u) println("*** zero size block is ignored!") | ||||||
|                     else { |                     else { | ||||||
|                         val block = UByteArray(size.toInt()) |                         val block = UByteArray(size.toInt()) | ||||||
|                         for (i in 0..<size.toInt()) { |                         for (i in 0..<size.toInt()) { | ||||||
|                             block[i] = input.receive() |                             block[i] = input.receive() | ||||||
|                         } |                         } | ||||||
|  |                         println("ready block:\n${block.toDump()}") | ||||||
|                         inputBlocks.send(block) |                         inputBlocks.send(block) | ||||||
|                     } |                     } | ||||||
|                 } |                 } | ||||||
|  | |||||||
| @ -1,8 +1,13 @@ | |||||||
| package net.sergeych.kiloparsec.adapters | package net.sergeych.kiloparsec.adapters | ||||||
| 
 | 
 | ||||||
|  | import com.ionspin.kotlin.crypto.util.decodeFromUByteArray | ||||||
| import com.ionspin.kotlin.crypto.util.encodeToUByteArray | import com.ionspin.kotlin.crypto.util.encodeToUByteArray | ||||||
|  | import kotlinx.coroutines.coroutineScope | ||||||
|  | import kotlinx.coroutines.launch | ||||||
| import kotlinx.coroutines.test.runTest | import kotlinx.coroutines.test.runTest | ||||||
|  | import kotlinx.coroutines.yield | ||||||
| import net.sergeych.kiloparsec.adapter.UdpServer | import net.sergeych.kiloparsec.adapter.UdpServer | ||||||
|  | import net.sergeych.kiloparsec.adapter.acceptTcpDevice | ||||||
| import net.sergeych.kiloparsec.adapter.connectTcpDevice | import net.sergeych.kiloparsec.adapter.connectTcpDevice | ||||||
| import net.sergeych.kiloparsec.adapter.toNetworkAddress | import net.sergeych.kiloparsec.adapter.toNetworkAddress | ||||||
| import net.sergeych.mp_logger.Log | import net.sergeych.mp_logger.Log | ||||||
| @ -16,20 +21,39 @@ class NetworkTest { | |||||||
|         Log.connectConsole(Log.Level.DEBUG) |         Log.connectConsole(Log.Level.DEBUG) | ||||||
|         val s1 = UdpServer(17120) |         val s1 = UdpServer(17120) | ||||||
|         val s2 = UdpServer(17121) |         val s2 = UdpServer(17121) | ||||||
|         s1.send("Hello".encodeToUByteArray(), "localhost",17121) |         s1.send("Hello".encodeToUByteArray(), "localhost", 17121) | ||||||
|         val d1 = s2.incoming.receive() |         val d1 = s2.incoming.receive() | ||||||
|         assertEquals(d1.address.port, 17120) |         assertEquals(d1.address.port, 17120) | ||||||
|         assertEquals("Hello", d1.message.toByteArray().decodeToString()) |         assertEquals("Hello", d1.message.toByteArray().decodeToString()) | ||||||
|         s1.send("world".encodeToUByteArray(),d1.address) |         s1.send("world".encodeToUByteArray(), d1.address) | ||||||
|         assertEquals("world", s1.incoming.receive().message.toByteArray().decodeToString()) |         assertEquals("world", s1.incoming.receive().message.toByteArray().decodeToString()) | ||||||
|     //        println("s1: ${s1.bindAddress()}") |         //        println("s1: ${s1.bindAddress()}") | ||||||
| 
 | 
 | ||||||
|     } |     } | ||||||
| 
 | 
 | ||||||
|     @Test |     @Test | ||||||
|     fun tcpAsyncConnectionTest() = runTest { |     fun tcpAsyncConnectionTest() = runTest { | ||||||
|         Log.connectConsole(Log.Level.DEBUG) |         Log.connectConsole(Log.Level.DEBUG) | ||||||
|         val s = connectTcpDevice("sergeych.net:80".toNetworkAddress()) | 
 | ||||||
|         s.input.receive() |         coroutineScope { | ||||||
|  |             val serverFlow = acceptTcpDevice(17171) | ||||||
|  |             launch { | ||||||
|  |                 serverFlow.collect { device -> | ||||||
|  |                     println("collected input: $device") | ||||||
|  |                     device.output.send("Hello, world!".encodeToUByteArray()) | ||||||
|  |                     device.output.send("Great".encodeToUByteArray()) | ||||||
|  |                     while(true) { | ||||||
|  |                         val x = device.input.receive()!!.decodeFromUByteArray() | ||||||
|  |                         if( x== "Goodbye" ) break | ||||||
|  |                         println("ignoring unexpected input: $x") | ||||||
|  |                     } | ||||||
|  |                 } | ||||||
|  |             } | ||||||
|  |             yield() | ||||||
|  |             val s = connectTcpDevice("127.0.1.1:17171".toNetworkAddress()) | ||||||
|  |             assertEquals("Hello, world!", s.input.receive()!!.decodeFromUByteArray()) | ||||||
|  |             assertEquals("Great", s.input.receive()!!.decodeFromUByteArray()) | ||||||
|  |             s.output.send("Goodbye".encodeToUByteArray()) | ||||||
|  |         } | ||||||
|     } |     } | ||||||
| } | } | ||||||
| @ -7,7 +7,7 @@ actual fun NetworkAddress(host: String, port: Int): NetworkAddress { | |||||||
|     TODO("Not yet implemented") |     TODO("Not yet implemented") | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| actual fun acceptTcpDevice(pord: Int): Flow<Transport.Device> { | actual fun acceptTcpDevice(port: Int): Flow<Transport.Device> { | ||||||
|     TODO("Not yet implemented") |     TODO("Not yet implemented") | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
|  | |||||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user