(11条消息) Flink DataStream 转换

一.简介

基本转换做一个概述,基于时间算子(窗口,水位线)以及其他一些特殊转换会在后面文章介绍。

DataStream API的转换分为四大类:

  • 作用于单个事件的基本转换。
  • 针对相同键值事件的KeyedStream转换。
  • 将多条数据流合并为一条或将一条数据流拆分成多条流转换。
  • 对流中的事件进行重新组织的分发转换。

二.基本转换

map

用一个数据元生成一个数据元。一个map函数,它将输入流的值加倍:

DataStream<Integer> dataStream = //...
dataStream.map(new MapFunction<Integer, Integer>() {
    @Override
    public Integer map(Integer value) throws Exception {
        return 2 * value;
    }
});

FlatMap

采用一个数据元并生成零个,一个或多个数据元。将句子分割为单词的flatmap函数:

dataStream.flatMap(new FlatMapFunction<String, String>() {
    @Override
    public void flatMap(String value, Collector<String> out)
        throws Exception {
        for(String word: value.split(" ")){
            out.collect(word);
        }
    }
});

计算每个数据元的布尔函数,并保存函数返回true的数据元。过滤掉零值的过滤器:

dataStream.filter(new FilterFunction<Integer>() {
    @Override
    public boolean filter(Integer value) throws Exception {
        return value != 0;
    }
});

三.基于KeyedStream转换

很多应用需要将事件按照某个属性分组后再进行处理。KeyedStream抽象可以从逻辑上将事件按照键值分配到多条独立子流中。

KeyBy

逻辑上将流分区为不相交的分区。具有相同Keys的所有记录都分配给同一分区。在内部,keyBy()是使用散列分区实现的。指定键有不同的方法。

此转换返回KeyedStream,其中包括使用被Keys化状态所需的KeyedStream。

dataStream.keyBy("someKey") // Key by field "someKey"
dataStream.keyBy(0) // Key by the first element of a Tuple

注意
如果出现以下情况,则类型不能成为key:

  • 它是POJO类型但不覆盖hashCode()方法并依赖于Object.hashCode()实现。
  • 任何类型的数组。

滚动聚合

滚动聚合转换作用于KeyedStream上,它将生成一个包含聚合结果(求和、最大值、最小值)DataSream。

sum()

滚动计算输入流中指定的字段和。

min()

滚动计算输入流中指定字段最小值。

max()

滚动计算输入流中最大值。

minBy

滚动计算输入流中迄今为止最小值,返回该值所在事件。

maxBy

滚动计算输入流中迄今为止最大值,返回该值所在事件。

val inputStream = env.fromElements((1,2,3),(2,3,1),(2,3,5),(1,5,3))
val resultStream:DataStream[(Int,Int,Int)] = inputStream
.keyBy(0) //以元组第一个字段为键值进行分区
.sum(1) //滚动计算每个分区内元组第二字段的总和

Reduce

将当前数据元与最后一个Reduce的值组合并发出新值。

例如:reduce函数,用于创建部分和的流:

keyedStream.reduce(new ReduceFunction<Integer>() {
    @Override
    public Integer reduce(Integer value1, Integer value2)
    throws Exception {
        return value1 + value2;
    }
});

Fold

具有初始值的被Keys化数据流上的“滚动”折叠。将当前数据元与最后折叠的值组合并发出新值。

折叠函数,当应用于序列(1,2,3,4,5)时,发出序列“start-1”,“start-1-2”,“start-1-2-3”,. …

DataStream<String> result =
  keyedStream.fold("start", new FoldFunction<Integer, String>() {
    @Override
    public String fold(String current, Integer value) {
        return current + "-" + value;
    }
  });

四.多流转换

Union

DataStream.union()方法可以合并两条或多条类型相同的DataStream。

union执行过程中,来自两条流的事件会以FIFO(先进先出)的方式合并,其顺序无法得到任务保证。

Connect,coMap,coFlatMap

合并类型不同的数据流。

Connect后使用CoProcessFunction、CoMap、CoFlatMap、KeyedCoProcessFunction等API 对两个流分别处理。

CoMap:

val warning = high.map( sensorData => (sensorData.id, sensorData.temperature) )
val connected = warning.connect(low)
val coMap = connected.map(
warningData => (warningData._1, warningData._2, "warning"),
lowData => (lowData.id, "healthy")
)

多个流:

// first stream
val first: DataStream[(Int, Long)] = ...
// second stream
val second: DataStream[(Int, String)] = ...
// connect streams with keyBy
val keyedConnect: ConnectedStreams[(Int, Long), (Int, String)] = first
  .connect(second)
  .keyBy(0, 0) // key both input streams on first attribute
// connect streams with broadcast
val keyedConnect: ConnectedStreams[(Int, Long), (Int, String)] = first
  .connect(second.broadcast()) // broadcast second input stream

Split[DataStream => SplitStream], Select[SplitStream => DataStream]

Split 方法和 union 相反。对于一个输入数据流,可以被分为 0 个或多个输出数据流,因此 split 也可以被当做 filter 使用。

DataStream.split() 方法接收一个 OutputSelector 参数,OutputSelector 定义了一个 select() 方法,用于定义一个数据流元素是怎么被分配到输出的,返回一个 Iterable[String],这些 String 代表输出的名称。

split() 方法返回一个 SplitStream,SplitStream 提供了一个 select 方法,接收刚才说的, String 用于选择一个或者多个输出名称。

val inputStream: DataStream[(Int, String)] = ...
val splitted: SplitStream[(Int, String)] = inputStream
  .split(t => if (t._1 > 1000) Seq("large") else Seq("small"))
val large: DataStream[(Int, String)] = splitted.select("large")
val small: DataStream[(Int, String)] = splitted.select("small")
val all: DataStream[(Int, String)] = splitted.select("small", "large")

五.分发转换

随机

根据均匀分布随机分配数据元。

dataStream.shuffle();

rebalance

将输入流中事件以轮流方式均匀分配给后继任务。

分区数据元循环,每个分区创建相等的负载。在存在数据倾斜时用于性能优化。

dataStream.rebalance();

rescale

如果上游 算子操作具有并行性2并且下游算子操作具有并行性6,则一个上游 算子操作将分配元件到三个下游算子操作,而另一个上游算子操作将分配到其他三个下游 算子操作。另一方面,如果下游算子操作具有并行性2而上游 算子操作具有并行性6,则三个上游 算子操作将分配到一个下游算子操作,而其他三个上游算子操作将分配到另一个下游算子操作。

在不同并行度不是彼此的倍数的情况下,一个或多个下游 算子操作将具有来自上游 算子操作的不同数量的输入。

rebalance 和 rescale区别:

生成连接方式不同。

rebalance 会在所有发送任务和接收任务之间建立通信通道。

rescale每个发送任务只会和下游算子的部分任务建立通道。

dataStream.rescale();

广播

dataStream.broadcast();

将输入流中事件复制并发往所有下游算子的并行任务。

全局

global() 方法会将输入流中所有事件发往下游算子的第一个并行任务。(对性能有影响)

自定义

如果所有预定义分区策略都不合适,可以使用partitionCustom()方法自定义分区策略。

val numbers: DataStream[(Int)] = ...
numbers.partitionCustom(myPartitioner, 0)
object myPartitioner extends Partitioner[Int] {
  val r = scala.util.Random
  override def partition(key: Int, numPartitions: Int): Int = {
    if (key < 0) 0 else r.nextInt(numPartitions)
  }
}

参考

https://blog.kyleliu.cn/2020/04/10/15.%E5%A4%A7%E6%95%B0%E6%8D%AE/Flink/7-Flink%20DataStreamAPI/

《Stream Processing with Apache Flink》

公众号

名称:大数据计算
微信号:bigdata_limeng

(0)

相关推荐