Kotlin 数据流:冷热Flow 全面解析
1、Flow 概述
Flow 是 Kotlin 协程中的数据流处理工具,它提供了一种响应式编程模型,用于处理异步数据流。Flow 基于协程构建,具有挂起函数的特性,可以轻松处理异步操作而不会阻塞主线程。
核心特点:
- 异步处理:基于协程,支持挂起操作
- 响应式:支持数据流的转换和处理
- 背压处理:自动处理生产者和消费者之间的速度差异
- 操作符丰富:提供大量操作符进行数据流转换
2、冷流(Cold Flow)
2.1、定义与特性
冷流是指只有在被收集(collect)时才开始执行的流。每个收集者都会触发流的重新执行,每个收集者都有自己独立的数据流。
核心特性:
- 惰性执行:不收集不执行
- 每次收集都会重新执行
- 多个收集者相互独立
- 生产者和消费者一对一
2.2、代码示例
fun simpleFlow(): Flow<Int> = flow {
println("Flow started")
for (i in 1..3) {
delay(100)
emit(i)
}
}
// 第一次收集
println("First collect")
simpleFlow().collect { value ->
println("Collected: $value")
}
// 第二次收集
println("Second collect")
simpleFlow().collect { value ->
println("Collected: $value")
}
// 输出:
// First collect
// Flow started
// Collected: 1
// Collected: 2
// Collected: 3
// Second collect
// Flow started
// Collected: 1
// Collected: 2
// Collected: 3
2.3、适用场景
冷流适用于以下场景:
- 一次性数据获取(如网络请求)
- 每次需要重新计算的数据
- 每个消费者需要独立数据源的场景
3、热流(Hot Flow)
3.1、定义与特性
热流是指无论是否被收集,都会持续产生数据的流。多个收集者可以共享同一个数据流,接收相同的数据。
核心特性:
- 主动执行:不依赖收集者
- 多个收集者共享数据
- 数据产生与收集分离
- 可以有多个订阅者
3.2、SharedFlow 与 StateFlow
Kotlin 提供了两种热流实现:
- SharedFlow:通用热流,支持多个订阅者
- StateFlow:特殊的 SharedFlow,始终保持最新状态
3.3、代码示例
// SharedFlow 示例
val sharedFlow = MutableSharedFlow<Int>()
// 生产者
scope.launch {
for (i in 1..5) {
delay(100)
sharedFlow.emit(i)
println("Emitted: $i")
}
}
// 消费者 1
scope.launch {
sharedFlow.collect {
println("Consumer 1 received: $it")
}
}
// 消费者 2
scope.launch {
delay(200) // 延迟订阅
sharedFlow.collect {
println("Consumer 2 received: $it")
}
}
3.4、适用场景
热流适用于以下场景:
- 事件总线
- 状态管理
- 实时数据更新
- 多个组件需要共享同一数据源的场景
4、创建流
4.1、使用 flow 构建器
fun createFlow(): Flow<Int> = flow {
for (i in 1..5) {
delay(100)
emit(i)
}
}
4.2、从集合创建
val flowFromList = listOf(1, 2, 3).asFlow()
val flowFromSequence = sequenceOf(1, 2, 3).asFlow()
4.3、使用 flowOf
val simpleFlow = flowOf(1, 2, 3, 4, 5)
4.4、使用 channelFlow
fun createChannelFlow(): Flow<Int> = channelFlow {
send(1)
delay(100)
send(2)
delay(100)
send(3)
}
4.5、从回调转换
fun callbackToFlow(): Flow<Int> = callbackFlow {
val callback = object : SomeCallback {
override fun onData(data: Int) {
trySend(data)
}
override fun onError(error: Throwable) {
close(error)
}
}
registerCallback(callback)
awaitClose { unregisterCallback(callback) }
}
5、流的基本操作
5.1、转换操作
- map:转换每个元素
flow.map { it * 2 }- filter:过滤元素
flow.filter { it > 5 }- transform:更灵活的转换
flow.transform {
emit(it)
emit(it * 2)
}
5.2、组合操作
- zip:组合两个流
flow1.zip(flow2) { a, b -> a + b }- combine:合并两个流的最新值
flow1.combine(flow2) { a, b -> a + b }
5.3、扁平化操作
- flattenMerge:扁平化流的流
flowOf(flow1, flow2).flattenMerge()
- flatMapConcat:顺序扁平化
flow.flatMapConcat { createFlow(it) }- flatMapMerge:并行扁平化
flow.flatMapMerge { createFlow(it) }
5.4、终端操作
- collect:收集所有元素
flow.collect { println(it) }- toList / toSet:转换为集合
val list = flow.toList()
- first / last:获取第一个或最后一个元素
val first = flow.first()
- reduce / fold:累积计算
val sum = flow.reduce { acc, value -> acc + value }
5.5、背压处理
- buffer:缓冲元素
flow.buffer(10)
- conflate:只处理最新元素
flow.conflate()
- collectLatest:取消旧的处理
flow.collectLatest { process(it) }
通过合理使用 Flow,可以使异步代码更加简洁、可读,同时保持高性能和可靠性。