【Kotlin】Flow简介

1 前言

        Flow 是 Kotlin 标准库中的一个新的异步流处理框架,旨在简化异步数据流的操作和处理,它提供了一种声明式的方式来处理数据流。

        Flow 中一些接口调用有些类似 Sequence(详见 → Sequence简介),协程的使用详见 → 协程。

        Flow 有以下特性和概念。

  1. 异步流(Asynchronous Streams):Flow 允许以一种非阻塞的方式处理一系列的值或事件,这使得在处理大量数据或涉及 IO 操作时能够更加高效。

  2. 冷流(Cold Flow):只有在收集器(collector)订阅(或启动)了之后才会开始发射(emit)数据。

  3. 热流(Hot Flow):在创建后就立即开始发射(emit)数据,不管是否有收集器(collector),这会导致收集器可能只接收到部分数据。

  4. 声明式 API:Flow 提供了一套简洁清晰的操作符,允许以声明式的方式对流进行处理,如 map、filter、reduce 等。

  5. 协程集成:Flow 构建在协程之上,因此可以与协程一起使用,并且可以利用协程的优势,比如轻量级、顺序性等。

  6. 取消支持:Flow 支持与协程一样的取消操作,从而释放资源和避免内存泄漏。

  7. 背压支持:Flow 提供了背压支持,可以通过各种操作符来控制数据的生产和消费速率,防止生产者速度过快导致消费者无法跟上。

        Flow 有中间操作和终端操作,如下。

  • 中间操作:每次操作返回一个新的 Flow 对象(主要操作有:flowOn、catch、buffer、conflate、filter、distinctUntilChanged、drop、take、map 等)。
  • 终端操作:每次操作返回一个值或集合,每个 Flow 只能进行一次终端操作(主要操作有:first、last、count、reduce、collect、toCollection、toSet、toList 等)。

2 创建 Flow

2.1 emptyFlow

public fun <T> emptyFlow(): Flow<T> = EmptyFlow

2.2 flow

        1)源码

public fun <T> flow(block: suspend FlowCollector<T>.() -> Unit): Flow<T> = SafeFlow(block)

        2)应用

var coldFlow = flow<String> {  
    emit("A")
    emit("B")
}

2.3 MutableSharedFlow

        1)源码

public fun <T> MutableSharedFlow(
    replay: Int = 0,
    extraBufferCapacity: Int = 0,
    onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND
): MutableSharedFlow<T>

        2)应用

var hotFlow = MutableSharedFlow<String>()
CoroutineScope(Dispatchers.Default).launch {
    hotFlow.emit("A")
    hotFlow.emit("B")
}

2.4 flowOf

        1)源码

public fun <T> flowOf(value: T): Flow<T> = flow {
    emit(value)
}

public fun <T> flowOf(vararg elements: T): Flow<T> = flow {
    for (element in elements) {
        emit(element)
    }
}

        2)应用

var flow1 = flowOf("A")
var flow2 = flowOf("A", "B", "C")

2.5 asFlow

2.5.1 () -> T

        1)源码

public fun <T> (() -> T).asFlow(): Flow<T> = flow {
    emit(invoke())
}

        2)应用

fun main() {
    var fun2 = { fun1() }.asFlow()
}

fun fun1(): String {
    return "xxx"
}

2.5.2 Iterator

        1)源码

public fun <T> Iterator<T>.asFlow(): Flow<T> = flow {
    forEach { value ->
        emit(value)
    }
}

        2)应用

var array = intArrayOf(1, 2, 3)
var iterator = array.iterator()
var flow = iterator.asFlow()

2.5.3 Sequence

        1)源码

public fun <T> Sequence<T>.asFlow(): Flow<T> = flow {
    forEach { value ->
        emit(value)
    }
}

        2)应用

var array = intArrayOf(1, 2, 3)
var sequence = array.asSequence()
var flow = sequence.asFlow()

2.5.4 Array

        1)源码

public fun <T> Array<T>.asFlow(): Flow<T> = flow {
    forEach { value ->
        emit(value)
    }
}

        2)应用

var array = arrayOf(1, 2, 3)
var flow = array.asFlow()

2.5.5 Range

        1)源码

public fun IntRange.asFlow(): Flow<Int> = flow {
    forEach { value ->
        emit(value)
    }
}

        2)应用

var range = 1..3
var flow = range.asFlow()

3 Flow 冷流和热流

3.1 冷流

import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.MainScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.launch

fun main() {
    val coldFlow = emitFlow()
    CoroutineScope(Dispatchers.Default).launch {
        coldFlow.collect { value ->
            println("CoroutineScope, $value")
        }
    }
    MainScope().launch(Dispatchers.IO) {
        coldFlow.collect { value ->
            println("MainScope, $value")
        }
    }
    Thread.sleep(1000)
}

fun emitFlow(): Flow<String> = flow {
    for (i in 1..2) {
        emit("emit-$i")
        delay(100)
    }
}

        打印如下。

CoroutineScope, emit-1
MainScope, emit-1
CoroutineScope, emit-2
MainScope, emit-2

        说明:可以看到每一个订阅者都可以收到所有消息。

3.2 热流

import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.MainScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.launch

fun main() {
    var hotFlow = emitFlow()
    CoroutineScope(Dispatchers.Default).launch {
        hotFlow.collect { value ->
            println("CoroutineScope, $value")
        }
    }
    MainScope().launch(Dispatchers.IO) {
        hotFlow.collect { value ->
            println("MainScope, $value")
        }
    }
    Thread.sleep(1000)
}

fun emitFlow(): MutableSharedFlow<String> {
    var hotFlow = MutableSharedFlow<String>()
    CoroutineScope(Dispatchers.Default).launch {
        for (i in 1..2) {
            hotFlow.emit("emit-$i")
            delay(100)
        }
    }
    return hotFlow
}

        打印如下。

MainScope, emit-2
CoroutineScope, emit-2

        说明:可以看到每一个订阅者都只收到部分消息。

4 Flow 的中间操作

4.1 源码

// 切换线程
public fun <T> Flow<T>.flowOn(context: CoroutineContext): Flow<T>
// 捕获异常
public fun <T> Flow<T>.catch(action: suspend FlowCollector<T>.(Throwable) -> Unit): Flow<T>
// 在数据流中使用一个缓冲区来存储数据, 当数据产生速率超过消费速率时, 数据会暂时存储在缓冲区中, 直到有足够的空间将其传递给订阅者。这可以确保数据不会丢失,但可能会占用更多的内存。
public fun <T> Flow<T>.buffer(capacity: Int = BUFFERED, onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND): Flow<T>
// 当数据产生速率超过消费速率时, 跳过一些数据, 只保留最新的数据。这样可以减少内存占用,但会丢失一部分数据。
public fun <T> Flow<T>.conflate(): Flow<T> = buffer(CONFLATED)
// 过滤
public inline fun <T> Flow<T>.filter(crossinline predicate: suspend (T) -> Boolean): Flow<T>
// 去除相邻的重复元素
public fun <T> Flow<T>.distinctUntilChanged(): Flow<T>
// 丢弃前 n 个元素
public fun <T> Flow<T>.drop(count: Int): Flow<T>
// 截取前 n 个元素
public fun <T> Flow<T>.take(count: Int): Flow<T>
// 映射(T -> R)
public inline fun <T, R> Flow<T>.map(crossinline transform: suspend (value: T) -> R): Flow<R>

4.2 应用

fun main() {
    var array = arrayOf(4, 9, 1, 8, 5, 7, 7, 5, 3, 6, 2)
    var flow = array.asFlow()
    CoroutineScope(Dispatchers.Default).launch {
        flow.flowOn(Dispatchers.Default)
            .catch {
                println(it.message)
            }.buffer()
            .filter { it in 3..7 } // 4, 5, 7, 7, 5, 3, 6
            .distinctUntilChanged() // 4, 5, 7, 5, 3, 6
            .drop(1) // 5, 7, 5, 3, 6
            .take(4) // 5, 7, 5, 3
            .map { it * it } // 25, 49, 25, 9
            .collect {
                println(it)
            }
    }
    Thread.sleep(1000)
}

5 Flow 的终端操作

5.1 first、last、count

        1)源码

// 首元素
public suspend fun <T> Flow<T>.first(): T
// 尾元素
public suspend fun <T> Flow<T>.last(): T

        2)应用

fun main() {
    var array = arrayOf(3, 5, 7, 6)
    var flow = array.asFlow()
    CoroutineScope(Dispatchers.Default).launch {
        println(flow.first()) // 3
        println(flow.last()) // 6
        println(flow.count()) // 4
    }
    Thread.sleep(1000)
}

5.2 collect

        1)源码

public suspend fun collect(collector: FlowCollector<T>)

        2)应用

fun main() {
    var array = arrayOf(1, 3, 5, 7)
    var flow = array.asFlow()
    CoroutineScope(Dispatchers.Default).launch {
        flow.collect {
                println(it) // 1, 3, 5, 7
            }
    }
    Thread.sleep(1000)
}

5.3 reduce

        1)源码

// 规约运算,定义运算 o, result = ((((e1 o e2)) o e3) o e4) o ...
public suspend fun <S, T : S> Flow<T>.reduce(operation: suspend (accumulator: S, value: T) -> S): S

        2)应用

fun main() {
    var array = arrayOf(1, 3, 5)
    var flow = array.asFlow()
    CoroutineScope(Dispatchers.Default).launch {
        var sum = flow.reduce(Integer::sum)
        println(sum) // 9
        // 1*1-3*3=-8, (-8)*(-8)-5*5=39
        var res = flow.reduce { e1, e2 ->
            e1 * e1 - e2 * e2
        }
        println(res) // 39
    }
    Thread.sleep(1000)
}

5.4 集合转换

        1)源码

public suspend fun <T, C : MutableCollection<in T>> Flow<T>.toCollection(destination: C): C
public suspend fun <T> Flow<T>.toSet(destination: MutableSet<T> = LinkedHashSet()): Set<T>
public suspend fun <T> Flow<T>.toList(destination: MutableList<T> = ArrayList()): List<T>

        2)应用

fun main() {
    var array = arrayOf(1, 3, 5)
    var flow = array.asFlow()
    CoroutineScope(Dispatchers.Default).launch {
        var set1 = flow.toCollection(mutableSetOf()) // [1, 3, 5]
        var list1 = flow.toCollection(mutableListOf()) // [1, 3, 5]
        var set2 = flow.toSet() // [1, 3, 5]
        var list2 = flow.toList() // [1, 3, 5]
    }
    Thread.sleep(1000)
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/568553.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

SpringBoot xxl-job 任务调度

首先官网下载xxl-job的源代码&#xff0c;然后切换到jdk8&#xff0c;等Maven下载依赖 执行mysql的脚本&#xff0c;修改连接配置&#xff0c;启动admin站点 默认地址 http://localhost:8080/xxl-job-admin/ 先新增一个任务执行器&#xff0c;指向未来任务代码的站点 然后在…

[论文笔记] EcomGPT:COT扩充数据的电商大模型

社区供稿 | EcomGPT:基于任务链数据的电商大模型(附魔搭推理实践) - 知乎 https://arxiv.org/pdf/2312.15696.pdf EcomInstruct指令数据集构建 数据集组成 COT方式构造垂域训练数据:把原本的垂域任务分解成了原子任务,构造了基于解决原子任务的数据。这样能用类似…

OpenTelemetry-2.Go接入Jaeger(grpc,gin-http)

目录 1.什么是OpenTelemetry 2.搭建jaeger 3.链路追踪 本地调用 远程调用 GRPC proto server端 client端 Gin-HTTP 调用流程 api1 api2 grpc 4.完整代码 1.什么是OpenTelemetry 参考&#xff1a;OpenTelemetry-1.介绍-CSDN博客 2.搭建jaeger 参考&#xff1a;…

Rest微服务案例

Rest 父工程构建microservicecloud-api公共子模块Modulemicroservicecloud-provider-dept-8001部门微服务提供者Modulemicroservicecloud-consumer-dept-80部门微服务消费者Module 以Dept部门模块做一个微服务通用案例 Consumer消费者&#xff08;Client&#xff09;通过REST调…

阿里开源黑白图片上色算法DDColor的部署与测试并将模型转onnx后用c++推理

阿里开源黑白图片上色算法DDColor的部署与测试并将模型转onnx后用c推理 文章目录 阿里开源黑白图片上色算法DDColor的部署与测试并将模型转onnx后用c推理简介环境部署下载源码安装环境下载模型 测试一下看看效果模型转onnx测试一下生成的onnx模型看看效果C 推理 简介 DDColor是…

代码随想录算法训练营DAY32|C++贪心算法Part.2|122.买卖股票的最佳时机II、55.跳跃游戏、45.跳跃游戏II

文章目录 122.买卖股票的最佳时机II思路CPP代码 55.跳跃游戏思路CPP代码 45.跳跃游戏II思路方法一代码改善 CPP代码 122.买卖股票的最佳时机II 力扣题目链接 文章讲解&#xff1a;122.买卖股票的最佳时机II 视频讲解&#xff1a; 状态&#xff1a;本题可以用动态规划&#xff0…

Linux进程地址空间及其页表

文章目录 一、引言二、Linux进程地址空间的概念1、进程地址空间定义2、进程地址空间的组成3、进程地址空间与物理内存的关系 三、页表与内存映射1、页表的定义及作用2、页表的缺页中断 三、进程的写时拷贝 一、引言 在Linux中&#xff0c;进程管理是其核心功能之一&#xff0c…

OpenCV如何实现拉普拉斯算子的离散模拟

返回:OpenCV系列文章目录&#xff08;持续更新中......&#xff09; 上一篇&#xff1a;OpenCV的Sobel 衍生品 下一篇 &#xff1a;OpenCV 如何实现边缘检测器 目标 在本教程中&#xff0c;您将学习如何&#xff1a; 使用 OpenCV 函数 Laplacian&#xff08;&#xff09; 实…

Docker基本管理和虚拟化

一、docker的发展历史 https://www.cnblogs.com/rongba/articles/14782624.htmlhttps://www.cnblogs.com/rongba/articles/14782624.html 二、docker的概述 Docker是一个开源的应用容器引擎&#xff0c;基于go语言开发并遵循了apache2.0协议开源。 Docker是在Linux容器里运行…

Amazon云计算AWS之[2]弹性计算云EC2

文章目录 说明EC2基本架构Amazon机器映象&#xff08;AMI&#xff09;实例&#xff08;Instance&#xff09;弹性块存储&#xff08;EBS&#xff09; EC2关键技术地理区域和可用区域EC2通信机制弹性负载均衡监控服务自动缩放服务管理控制台 EC2安全及容错机制EC2弹性IP地址 说明…

开曼群岛:Web3企业的乐园

开曼群岛&#xff1a;Web3企业的理想之地 开曼群岛&#xff0c;在数字革命中大放异彩。近年来&#xff0c;该地区成立的Web3企业数量显著增加&#xff0c;如果保持目前的发展速度&#xff0c;并持续优化立法&#xff0c;那么扩展的速度将无可限量。本文将探讨推动这一增长的关…

使用Pycharm运行spark实例时没有pyspark包(ModuleNotFoundError: No module named ‘py4j‘)

一、问题描述 在安装并配置pyspark&#xff0c;下载并打开Pycharm&#xff08;专业版&#xff09;后进行spark实例操作&#xff08;笔者以统计文件中的行数为例&#xff09;时&#xff0c;运行程序后提示ModuleNotFoundError: No module named py4j&#xff1a; 二、解决办法 …

小程序线多点路图绘制

需求 当接口返回一连串地图坐标&#xff0c;需要根据这些坐标串联起来&#xff0c;形成一个线路图&#xff08;本次使用步行导航线路图&#xff09;。 思路 首先优先想到使用小程序Map组件的polyline属性去进行展示。但是我们发现直接使用该属性进行坐标绘制画出来的数据都是…

恶补《操作系统》2_1——王道学习笔记

2操作系统-进程 2.1_1 进程的定义、组成、组织方式、特征 组成&#xff1a;PCB&#xff08;进程存在唯一的标志&#xff09;&#xff0c;程序段&#xff0c;数据段 组织方式&#xff1a;链接方式&#xff0c;指针指向不同的队列&#xff1b;索引方式&#xff0c;索引表 特征…

fnm:Rust开发的高效Node版本管理工具

简介 fnm 是一个基于 Rust 开发的 Node 版本管理工具&#xff0c;它的目标是提供一个快速、简单且可靠的方式来管理 Node.js 的不同版本。同时&#xff0c;它是跨平台的&#xff0c;支持 macOS、Linux、Windows。&#x1f680; Fast and simple Node.js version manager, buil…

ISP比普通的静态代理相比有什么优势?

ISP&#xff08;Internet Service Provider&#xff09;&#xff0c;即互联网服务提供商&#xff0c;是向广大用户综合提供互联网接入业务、信息业务、增值业务的电信运营商。而静态代理则是一个固定不变的代理IP地址&#xff0c;具有稳定性强、兼容性好和管理方便等特点。当我…

QT中基于TCP的网络通信

QT中基于TCP的网络通信 QTcpServer公共成员函数信号 QTcpSocket公共成员函数信号 通信流程服务器端通信流程代码 客户端通信流程代码 使用Qt提供的类进行基于TCP的套接字通信需要用到两个类&#xff1a; QTcpServer&#xff1a;服务器类&#xff0c;用于监听客户端连接以及和客…

再谈C语言——理解指针(二)

指针变量类型的意义 指针变量的⼤⼩和类型⽆关&#xff0c;只要是指针变量&#xff0c;在同⼀个平台下&#xff0c;⼤⼩都是⼀样的&#xff0c;为什么还要有各种各样的指针类型呢&#xff1f; 其实指针类型是有特殊意义的&#xff0c;我们接下来继续学习。 指针的解引⽤ 对⽐…

Linux下的UDEV机制/守护进程

一. Udev机制概念引入 ( 需要在 etc/udev/rules.d/ 下创建设备的相关规则&#xff0c;不然有可能udev机制生成的设备文件不具备可读可写的权限&#xff0c;adb无法成功通过该设备文件访问设备 ) a. 创建文件夹 sudo vim Xiaomi-audroid.rules b. 添加规则 …

Laravel 6 - 第十四章 响应

​ 文章目录 Laravel 6 - 第一章 简介 Laravel 6 - 第二章 项目搭建 Laravel 6 - 第三章 文件夹结构 Laravel 6 - 第四章 生命周期 Laravel 6 - 第五章 控制反转和依赖注入 Laravel 6 - 第六章 服务容器 Laravel 6 - 第七章 服务提供者 Laravel 6 - 第八章 门面 Laravel 6 - …
最新文章