为何flow的first无效?


#1

再将多个flow合并然后使用first获取第一个发射值,发现会等待所有flow发射完才能接收到。求教原因。
测试代码:

 @JvmStatic
    fun main(args: Array<String>) = runBlocking {
        val result = listOf(1, 2, 3, 4).map {
            flow {
                emit(deal(it, System.currentTimeMillis()).also {
                    println("emit " + "  time coast " + (System.currentTimeMillis() - it))
                })
            }.flowOn(Dispatchers.IO)
        }.merge().buffer(4)

        result.first().also {
            println("collect result" + "   " + (System.currentTimeMillis() - it))
        }
        println("end")
    }

    fun deal(value: Int, time: Long): Long {
        println("FLOW " + Thread.currentThread().name)
        sleep(value * 1000L)
        return time
    }

#2

打印结果是:

FLOW DefaultDispatcher-worker-3
FLOW DefaultDispatcher-worker-2
FLOW DefaultDispatcher-worker-4
FLOW DefaultDispatcher-worker-1
emit   time coast 1001
emit   time coast 2004
emit   time coast 3001
emit   time coast 4001
collect result   4002
end

但实际预期应该是第一个发射完就应该结束掉

FLOW DefaultDispatcher-worker-3
FLOW DefaultDispatcher-worker-2
FLOW DefaultDispatcher-worker-4
FLOW DefaultDispatcher-worker-1
emit   time coast 1001
collect result   1002
end

#3

不要用sleep,用delay


#4

这里疑惑的是在不同线程为何还会阻塞。 sleep不是只阻塞当前线程吗 ? 而且接收线程和发射线程不是同一个,为何第一个emit之后不能立即收到? 你用任何挂起函数都不会发生这样的问题,但还是没能解释first要等所有emit结束的原因。


#5

我说一说我的看法吧
我在代码里面加了一些输出内容,输出是这样的:

Start 3
FLOW Thread[DefaultDispatcher-worker-3,5,main]
Start 4
FLOW Thread[DefaultDispatcher-worker-4,5,main]
Start 2
FLOW Thread[DefaultDispatcher-worker-2,5,main]
Start 1
FLOW Thread[DefaultDispatcher-worker-1,5,main]
1 emit   time coast 1005
1 value sent
2 emit   time coast 2008
3 emit   time coast 3006
4 emit   time coast 4013
collect result   4013
end

感觉是first()收集完之后想取消flow,但是flow里面没挂起点,所以只能等到flow调用emit时候才能取消成功并且结束

如果把deal换成suspend函数,并且用delay的话,增加了挂起点,所以就可以成功取消了
使用delay的输出是这样的:

Start 1
Start 3
Start 4
Start 2
FLOW Thread[DefaultDispatcher-worker-1,5,main]
FLOW Thread[DefaultDispatcher-worker-2,5,main]
FLOW Thread[DefaultDispatcher-worker-4,5,main]
FLOW Thread[DefaultDispatcher-worker-3,5,main]
1 emit   time coast 1004
1 value sent
collect result   1078
end

#6

感觉说的不对,merge最后是channelFlow,内部使用Channel进行发送数据。而first是说在接受到第二个数据抛异常来停止接收。

public suspend fun <T> Flow<T>.first(): T {
    var result: Any? = NULL
    try {
        collect { value ->
            result = value
            throw AbortFlowException(NopCollector)
        }
    } catch (e: AbortFlowException) {
        // Do nothing
    }

    if (result === NULL) throw NoSuchElementException("Expected at least one element")
    return result as T
}

但实际情况是 一直接收不到数据,直到所有flow发射完。


京ICP备16022265号-2 Kotlin China 2017 - 2018