(11条消息) Flink DataStream 转换
一.简介
基本转换做一个概述,基于时间算子(窗口,水位线)以及其他一些特殊转换会在后面文章介绍。
DataStream API的转换分为四大类:
- 作用于单个事件的基本转换。
- 针对相同键值事件的KeyedStream转换。
- 将多条数据流合并为一条或将一条数据流拆分成多条流转换。
- 对流中的事件进行重新组织的分发转换。
二.基本转换
map
用一个数据元生成一个数据元。一个map函数,它将输入流的值加倍:
DataStream<Integer> dataStream = //...
dataStream.map(new MapFunction<Integer, Integer>() {
@Override
public Integer map(Integer value) throws Exception {
return 2 * value;
}
});
FlatMap
采用一个数据元并生成零个,一个或多个数据元。将句子分割为单词的flatmap函数:
dataStream.flatMap(new FlatMapFunction<String, String>() {
@Override
public void flatMap(String value, Collector<String> out)
throws Exception {
for(String word: value.split(" ")){
out.collect(word);
}
}
});
计算每个数据元的布尔函数,并保存函数返回true的数据元。过滤掉零值的过滤器:
dataStream.filter(new FilterFunction<Integer>() {
@Override
public boolean filter(Integer value) throws Exception {
return value != 0;
}
});
三.基于KeyedStream转换
很多应用需要将事件按照某个属性分组后再进行处理。KeyedStream抽象可以从逻辑上将事件按照键值分配到多条独立子流中。
KeyBy
逻辑上将流分区为不相交的分区。具有相同Keys的所有记录都分配给同一分区。在内部,keyBy()是使用散列分区实现的。指定键有不同的方法。
此转换返回KeyedStream,其中包括使用被Keys化状态所需的KeyedStream。
dataStream.keyBy("someKey") // Key by field "someKey"
dataStream.keyBy(0) // Key by the first element of a Tuple
注意
如果出现以下情况,则类型不能成为key:
- 它是POJO类型但不覆盖hashCode()方法并依赖于Object.hashCode()实现。
- 任何类型的数组。
滚动聚合
滚动聚合转换作用于KeyedStream上,它将生成一个包含聚合结果(求和、最大值、最小值)DataSream。
sum()
滚动计算输入流中指定的字段和。
min()
滚动计算输入流中指定字段最小值。
max()
滚动计算输入流中最大值。
minBy
滚动计算输入流中迄今为止最小值,返回该值所在事件。
maxBy
滚动计算输入流中迄今为止最大值,返回该值所在事件。
val inputStream = env.fromElements((1,2,3),(2,3,1),(2,3,5),(1,5,3))
val resultStream:DataStream[(Int,Int,Int)] = inputStream
.keyBy(0) //以元组第一个字段为键值进行分区
.sum(1) //滚动计算每个分区内元组第二字段的总和
Reduce
将当前数据元与最后一个Reduce的值组合并发出新值。
例如:reduce函数,用于创建部分和的流:
keyedStream.reduce(new ReduceFunction<Integer>() {
@Override
public Integer reduce(Integer value1, Integer value2)
throws Exception {
return value1 + value2;
}
});
Fold
具有初始值的被Keys化数据流上的“滚动”折叠。将当前数据元与最后折叠的值组合并发出新值。
折叠函数,当应用于序列(1,2,3,4,5)时,发出序列“start-1”,“start-1-2”,“start-1-2-3”,. …
DataStream<String> result =
keyedStream.fold("start", new FoldFunction<Integer, String>() {
@Override
public String fold(String current, Integer value) {
return current + "-" + value;
}
});
四.多流转换
Union
DataStream.union()方法可以合并两条或多条类型相同的DataStream。
union执行过程中,来自两条流的事件会以FIFO(先进先出)的方式合并,其顺序无法得到任务保证。
Connect,coMap,coFlatMap
合并类型不同的数据流。
Connect后使用CoProcessFunction、CoMap、CoFlatMap、KeyedCoProcessFunction等API 对两个流分别处理。
CoMap:
val warning = high.map( sensorData => (sensorData.id, sensorData.temperature) )
val connected = warning.connect(low)
val coMap = connected.map(
warningData => (warningData._1, warningData._2, "warning"),
lowData => (lowData.id, "healthy")
)
多个流:
// first stream
val first: DataStream[(Int, Long)] = ...
// second stream
val second: DataStream[(Int, String)] = ...
// connect streams with keyBy
val keyedConnect: ConnectedStreams[(Int, Long), (Int, String)] = first
.connect(second)
.keyBy(0, 0) // key both input streams on first attribute
// connect streams with broadcast
val keyedConnect: ConnectedStreams[(Int, Long), (Int, String)] = first
.connect(second.broadcast()) // broadcast second input stream
Split[DataStream => SplitStream], Select[SplitStream => DataStream]
Split 方法和 union 相反。对于一个输入数据流,可以被分为 0 个或多个输出数据流,因此 split 也可以被当做 filter 使用。
DataStream.split() 方法接收一个 OutputSelector 参数,OutputSelector 定义了一个 select() 方法,用于定义一个数据流元素是怎么被分配到输出的,返回一个 Iterable[String],这些 String 代表输出的名称。
split() 方法返回一个 SplitStream,SplitStream 提供了一个 select 方法,接收刚才说的, String 用于选择一个或者多个输出名称。
val inputStream: DataStream[(Int, String)] = ...
val splitted: SplitStream[(Int, String)] = inputStream
.split(t => if (t._1 > 1000) Seq("large") else Seq("small"))
val large: DataStream[(Int, String)] = splitted.select("large")
val small: DataStream[(Int, String)] = splitted.select("small")
val all: DataStream[(Int, String)] = splitted.select("small", "large")
五.分发转换
随机
根据均匀分布随机分配数据元。
dataStream.shuffle();
rebalance
将输入流中事件以轮流方式均匀分配给后继任务。
分区数据元循环,每个分区创建相等的负载。在存在数据倾斜时用于性能优化。
dataStream.rebalance();
rescale
如果上游 算子操作具有并行性2并且下游算子操作具有并行性6,则一个上游 算子操作将分配元件到三个下游算子操作,而另一个上游算子操作将分配到其他三个下游 算子操作。另一方面,如果下游算子操作具有并行性2而上游 算子操作具有并行性6,则三个上游 算子操作将分配到一个下游算子操作,而其他三个上游算子操作将分配到另一个下游算子操作。
在不同并行度不是彼此的倍数的情况下,一个或多个下游 算子操作将具有来自上游 算子操作的不同数量的输入。
rebalance 和 rescale区别:
生成连接方式不同。
rebalance 会在所有发送任务和接收任务之间建立通信通道。
rescale每个发送任务只会和下游算子的部分任务建立通道。
dataStream.rescale();
广播
dataStream.broadcast();
将输入流中事件复制并发往所有下游算子的并行任务。
全局
global() 方法会将输入流中所有事件发往下游算子的第一个并行任务。(对性能有影响)
自定义
如果所有预定义分区策略都不合适,可以使用partitionCustom()方法自定义分区策略。
val numbers: DataStream[(Int)] = ...
numbers.partitionCustom(myPartitioner, 0)
object myPartitioner extends Partitioner[Int] {
val r = scala.util.Random
override def partition(key: Int, numPartitions: Int): Int = {
if (key < 0) 0 else r.nextInt(numPartitions)
}
}
参考
https://blog.kyleliu.cn/2020/04/10/15.%E5%A4%A7%E6%95%B0%E6%8D%AE/Flink/7-Flink%20DataStreamAPI/
《Stream Processing with Apache Flink》
公众号
名称:大数据计算
微信号:bigdata_limeng