Flink之Watermarks
1、代码案例
package window

import com.yangwj.api.SensorReading
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment, _}
import org.apache.flink.streaming.api.windowing.time.Time

import scala.util.Random

/**
 * Event-time windowing example: bounded-out-of-orderness watermarks, allowed lateness,
 * and a side output for records that arrive too late.
 *
 * @author yangwj
 * @date 2021/1/7 21:45
 * @version 1.0
 */
object WaterMarksTime {

  /**
   * Builds and runs a job that reads random sensor readings and reports, per sensor id,
   * the minimum temperature in 15-second tumbling event-time windows.
   *
   * Windows are either time windows or count windows; this job uses a time window
   * (the alternatives are left commented out below for reference).
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    // Window on the timestamp carried by each record instead of on arrival time.
    // NOTE(review): setStreamTimeCharacteristic, BoundedOutOfOrdernessTimestampExtractor and
    // timeWindow are deprecated as of Flink 1.12 (WatermarkStrategy + explicit window assigners
    // replace them); kept so the example still builds against the version it was written for.
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    val dataStream: DataStream[SensorReading] = env
      .addSource(new MySensorSource)
      // For strictly ascending timestamps, .assignAscendingTimestamps(_.timestamp) would suffice.
      // The bounded-out-of-orderness extractor keeps the watermark 3 s behind the largest
      // timestamp seen so far, so most slightly-late records still land in their window and the
      // result is approximately correct.
      .assignTimestampsAndWatermarks(
        new BoundedOutOfOrdernessTimestampExtractor[SensorReading](Time.seconds(3)) {
          override def extractTimestamp(t: SensorReading): Long = t.timestamp
        })

    // Records that arrive after the allowed lateness has expired are routed here.
    val laterTag = new OutputTag[(String, Double, Long)]("later")

    // Every 15 seconds, compute the minimum temperature per sensor.
    val value: DataStream[(String, Double, Long)] = dataStream
      .map(data => (data.id, data.temperature, data.timestamp))
      .keyBy(_._1) // group by the first tuple element (sensor id)
      // .window(TumblingEventTimeWindows.of(Time.seconds(15)))                // tumbling window
      // .window(SlidingEventTimeWindows.of(Time.minutes(5), Time.seconds(5))) // sliding window
      // .window(EventTimeSessionWindows.withGap(Time.seconds(5)))             // session window
      .timeWindow(Time.seconds(15)) // 15-second tumbling event-time window
      // .timeWindow(Time.minutes(5), Time.seconds(5)) // sliding window
      // .countWindow(5)                               // count window
      // .minBy(1)
      // Keep window state for one extra minute after the watermark passes the window end, so
      // records that slipped past the watermark are still folded in (the window may fire again).
      .allowedLateness(Time.minutes(1))
      .sideOutputLateData(laterTag)
      // Keep the sensor id, the smaller temperature, and the timestamp of the newer record.
      .reduce((curs, newd) => (curs._1, curs._2.min(newd._2), newd._3))

    value.print("windows")
    value.getSideOutput(laterTag).print("outSide Stream")

    env.execute("reduce test")
  }
}

/** Custom source emitting a random-walk temperature for sensor1..sensor4 every 3 seconds. */
class MySensorSource extends SourceFunction[SensorReading] {

  // Whether the source should keep emitting. Cleared by cancel(), which Flink invokes from a
  // different thread than run(); @volatile makes the update visible to the emit loop.
  @volatile var running: Boolean = true

  /**
   * Emits one reading per sensor every 3 seconds until cancel() is called.
   *
   * @param sourceContext context used to send records downstream
   */
  override def run(sourceContext: SourceFunction.SourceContext[SensorReading]): Unit = {
    val rand = new Random()
    // Initial temperature per sensor, uniformly drawn from [0, 100).
    var curTemp = (1 to 4).map(i => (s"sensor$i", rand.nextDouble() * 100))

    // Loop until cancelled, producing data continuously.
    while (running) {
      // Random walk: perturb each temperature by a standard Gaussian step.
      curTemp = curTemp.map { case (id, temp) => (id, temp + rand.nextGaussian()) }
      val curTime = System.currentTimeMillis()
      println(s"输入值:$curTemp,时间为:$curTime")
      curTemp.foreach { case (id, temp) => sourceContext.collect(SensorReading(id, curTime, temp)) }
      Thread.sleep(3000)
    }
  }

  /** Stops the emit loop by clearing the running flag (a bare `false` body would be a no-op). */
  override def cancel(): Unit = running = false
}
赞 (0)