fix #48 flows
This commit is contained in:
parent
1a90b25b1e
commit
9aae33d564
@ -160,23 +160,32 @@ Great: the generator is not executed until collected bu the `f.take()` call, whi
|
|||||||
|
|
||||||
Important difference from the channels or like, every time you collect the flow, you collect it anew:
|
Important difference from the channels or like, every time you collect the flow, you collect it anew:
|
||||||
|
|
||||||
|
var isStarted = false
|
||||||
val f = flow {
|
val f = flow {
|
||||||
emit("start")
|
emit("start")
|
||||||
|
isStarted = true
|
||||||
(1..4).forEach { emit(it) }
|
(1..4).forEach { emit(it) }
|
||||||
}
|
}
|
||||||
|
// flow is not yet started, e.g. not got execited,
|
||||||
|
// that is called 'cold':
|
||||||
|
assertEquals( false, isStarted )
|
||||||
|
|
||||||
// let's collect flow:
|
// let's collect flow:
|
||||||
val result = []
|
val result = []
|
||||||
for( x in f ) result += x
|
for( x in f ) result += x
|
||||||
println(result)
|
println(result)
|
||||||
|
|
||||||
// let's collect it once again:
|
assertEquals( true, isStarted)
|
||||||
|
|
||||||
|
// let's collect it once again, it should be the same:
|
||||||
println(f.toList())
|
println(f.toList())
|
||||||
|
|
||||||
// and again:
|
// and again:
|
||||||
//assertEquals( result, f.toList() )
|
assertEquals( result, f.toList() )
|
||||||
|
|
||||||
>>> ["start", 1, 2, 3, 4]
|
>>> ["start", 1, 2, 3, 4]
|
||||||
>>> ["start", 1, 2, 3, 4]
|
>>> ["start", 1, 2, 3, 4]
|
||||||
>>> void
|
>>> void
|
||||||
|
|
||||||
1
|
Notice that flow's lambda is not called until actual collection is started. Cold flows are
|
||||||
|
better in terms of resource consumption.
|
||||||
|
@ -23,10 +23,7 @@ class ObjFlowBuilder(val output: SendChannel<Obj>) : Obj() {
|
|||||||
val type = object : ObjClass("FlowBuilder") {}.apply {
|
val type = object : ObjClass("FlowBuilder") {}.apply {
|
||||||
addFn("emit") {
|
addFn("emit") {
|
||||||
val data = requireOnlyArg<Obj>()
|
val data = requireOnlyArg<Obj>()
|
||||||
println("well well $data")
|
|
||||||
try {
|
try {
|
||||||
println("builder ${thisAs<ObjFlowBuilder>().hashCode()}")
|
|
||||||
println("channel ${thisAs<ObjFlowBuilder>().output.hashCode()}")
|
|
||||||
val channel = thisAs<ObjFlowBuilder>().output
|
val channel = thisAs<ObjFlowBuilder>().output
|
||||||
if( !channel.isClosedForSend )
|
if( !channel.isClosedForSend )
|
||||||
channel.send(data)
|
channel.send(data)
|
||||||
@ -74,7 +71,6 @@ class ObjFlow(val producer: Statement) : Obj() {
|
|||||||
}
|
}
|
||||||
}.apply {
|
}.apply {
|
||||||
addFn("iterator") {
|
addFn("iterator") {
|
||||||
println("called iterator!")
|
|
||||||
ObjFlowIterator(thisAs<ObjFlow>().producer)
|
ObjFlowIterator(thisAs<ObjFlow>().producer)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -90,28 +90,20 @@ class TestCoroutines {
|
|||||||
fun testFlow2() = runTest {
|
fun testFlow2() = runTest {
|
||||||
eval("""
|
eval("""
|
||||||
val f = flow {
|
val f = flow {
|
||||||
println("Starting generator")
|
|
||||||
emit("start")
|
emit("start")
|
||||||
emit("start2")
|
emit("start2")
|
||||||
println("Emitting")
|
|
||||||
(1..4).forEach {
|
(1..4).forEach {
|
||||||
// println("you hoo "+it)
|
|
||||||
emit(it)
|
emit(it)
|
||||||
}
|
}
|
||||||
println("Done emitting")
|
|
||||||
}
|
}
|
||||||
// let's collect flow:
|
// let's collect flow:
|
||||||
val result = []
|
val result = []
|
||||||
// for( x in f ) result += x
|
for( x in f ) result += x
|
||||||
println(result)
|
println(result)
|
||||||
|
|
||||||
// let's collect it once again:
|
// let's collect it once again:
|
||||||
println(f.toList())
|
assertEquals( result, f.toList())
|
||||||
println(f.toList())
|
assertEquals( result, f.toList())
|
||||||
// for( x in f ) println(x)
|
|
||||||
// for( x in f ) println(x)
|
|
||||||
|
|
||||||
//assertEquals( result, f.toList() )
|
|
||||||
""".trimIndent())
|
""".trimIndent())
|
||||||
}
|
}
|
||||||
}
|
}
|
Loading…
x
Reference in New Issue
Block a user