一 Flow使用注意事项
多个Flow
不能放到一个lifecycleScope.launch
里去collect{}
,因为进入collect{}
相当于一个死循环,下一行代码永远不会执行;如果就想写到一个lifecycleScope.launch{}
里去,可以在内部再开启launch{}
子协程去执行。
示例,下面是错误写法:
//NOTE: 下面的示例是错误写法
lifecycleScope.launch {
mFlowModel.caseFlow1
.flowWithLifecycle(lifecycle, Lifecycle.State.STARTED)
.collect {}
mFlowModel.caseFlow2
.flowWithLifecycle(lifecycle, Lifecycle.State.STARTED)
.collect {}
}
正确写法:
lifecycleScope.launch {
launch {
mFlowModel.caseFlow1
.flowWithLifecycle(lifecycle, Lifecycle.State.STARTED)
.collect {}
}
launch {
mFlowModel.caseFlow2
.flowWithLifecycle(lifecycle, Lifecycle.State.STARTED)
.collect {}
}
}
当然,直接启动多个 lifecycleScope.launch
也是可以的。
二 几种使用场景
2.1、处理复杂、耗时逻辑
一般在处理复杂逻辑、耗时操作时,我们会将其放到子线程中去处理,避免在主线程中处理导致卡顿。而Flow
可以方便地进行线程切换,所以处理复杂逻辑、耗时操作时,可以考虑使用Flow
来进行处理,下面来看一个例子:
假设我们想读取本地Assets
目录下的person.json
文件,并将其解析出来,json
文件中的内容
// assets目录下person.json
{
"name": "小马快跑",
"age": 18,
"interest": "money! lots of money!"
}
下面通过Flow
的方式实现在IO
线程中读取json
文件,并最终在主线程中输出结果:
/**
* 通过Flow方式,获取本地文件
*/
private fun getFileInfo() {
lifecycleScope.launch {
flow {
//解析本地json文件,并生成对应字符串
val configStr = getAssetJsonInfo(requireContext(), "person.json")
//最后将得到的实体类发送到下游
emit(configStr)
}
.map { json ->
Gson().fromJson(json, PersonModel::class.java) //通过Gson将字符串转为实体类
}
.flowOn(Dispatchers.IO) //在flowOn之上的所有操作都是在IO线程中进行的
.onStart { log("onStart") }
.filterNotNull()
.onCompletion { log("onCompletion") }
.catch { ex -> log("catch:${ex.message}") }
.collect {
log("collect parse result:$it")
}
}
}
/**
* 读取Assets下的json文件
*/
private fun getAssetJsonInfo(context: Context, fileName: String): String {
val strBuilder = StringBuilder()
var input: InputStream? = null
var inputReader: InputStreamReader? = null
var reader: BufferedReader? = null
try {
input = context.assets.open(fileName, AssetManager.ACCESS_BUFFER)
inputReader = InputStreamReader(input, StandardCharsets.UTF_8)
reader = BufferedReader(inputReader)
var line: String?
while ((reader.readLine().also { line = it }) != null) {
strBuilder.append(line)
}
} catch (ex: Exception) {
ex.printStackTrace()
} finally {
try {
input?.close()
inputReader?.close()
reader?.close()
} catch (e: IOException) {
e.printStackTrace()
}
}
return strBuilder.toString()
}
执行结果:
11:11:32.178 E onStart
11:11:32.197 E collect parse result:PersonModel(name=小马快跑, age=18, interest=money! lots of money!)
11:11:32.198 E onCompletion
可以看到在collect{}
中得到了正确的数据,这里注意一下flowOn()
的作用域是在自身之上的操作,上述例子中flowOn(Dispatchers.IO)
意味着在flowOn
之上的所有操作都是在IO
线程中进行的。
2.2、存在依赖关系的接口请求
如果最终展示依赖多个接口且接口之间是有依赖关系的,之前我们可能会在第一个接口请求成功的回调里继续调用第二个接口,以此类推,这样虽然能实现,但是会导致回调层级很深,也就是所谓的回调地狱;此时可以使用Flow
的flatMapConcat
将多个接口串联起来。
lifecycleScope.launch {
lifecycle.repeatOnLifecycle(Lifecycle.State.STARTED) {
//将两个flow串联起来 先搜索目的地,然后到达目的地
mFlowModel.getSearchFlow()
.flatMapConcat {
//第二个flow依赖第一个的结果
mFlowModel.goDestinationFlow(it)
}.collect {
mTvCallbackFlow.text = it ?: "error"
}
}
}
2.3、组合多个接口的数据
有这样一种场景:数据的最终展示依赖多个接口请求到的数据,有两种实现方式:
- 一个个串行去请求接口,拿到数据后最终拼到一起;
- 所有接口并行去请求,拿到数据后最终拼到一起。
串行请求虽然可以,但是效率并不高;更优的方式是采用接口并行,可以使用Flow
的zip
操作符,如下要获取电费、水费、网费的总花销,对应的花费需要各自请求自己的接口,最终把数据进行合并统计:
//ViewModel中
//分别请求电费、水费、网费,Flow之间是并行关系
suspend fun requestElectricCost(): Flow =
flow {
delay(500)
emit(ExpendModel("电费", 10f, 500))
}.flowOn(Dispatchers.IO)
suspend fun requestWaterCost(): Flow =
flow {
delay(1000)
emit(ExpendModel("水费", 20f, 1000))
}.flowOn(Dispatchers.IO)
suspend fun requestInternetCost(): Flow =
flow {
delay(2000)
emit(ExpendModel("网费", 30f, 2000))
}.flowOn(Dispatchers.IO)
data class ExpendModel(val type: String, val cost: Float, val apiTime: Int) {
fun info(): String {
return "${type}: ${cost}, 接口请求耗时约$apiTime ms"
}
}
//UI层
mBtnZip.setOnClickListener {
lifecycleScope.launch {
val electricFlow = mFlowModel.requestElectricCost()
val waterFlow = mFlowModel.requestWaterCost()
val internetFlow = mFlowModel.requestInternetCost()
val builder = StringBuilder()
var totalCost = 0f
val startTime = System.currentTimeMillis()
//NOTE:注意这里可以多个zip操作符来合并Flow,且多个Flow之间是并行关系
electricFlow.zip(waterFlow) { electric, water ->
totalCost = electric.cost + water.cost
builder.append("${electric.info()},\n").append("${water.info()},\n")
}.zip(internetFlow) { two, internet ->
totalCost += internet.cost
two.append(internet.info()).append(",\n\n总花费:$totalCost")
}.collect {
mTvZipResult.text = it.append(",总耗时:${System.currentTimeMillis() - startTime} ms")
}
}
}
执行结果:
电费: 10.0, 接口请求耗时约500 ms,
水费: 20.0, 接口请求耗时约1000 ms,
网费: 30.0, 接口请求耗时约2000 ms,
总花费:60.0,总耗时:2012 ms
可以看到不但得到了所有接口的数据,而且总耗时基本等于耗时最长的接口的时间,说明zip
操作符合并的多个Flow
内部接口请求是并行的。
推荐阅