Flink之Watermarks

1、代码案例

package window

import com.yangwj.api.SensorReading
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment, _}
import org.apache.flink.streaming.api.windowing.time.Time

import scala.util.Random

/**
 * Demo of event-time windowing driven by bounded-out-of-orderness watermarks,
 * with allowed lateness and a side output for records that arrive too late.
 *
 * @author yangwj
 * @date 2021/1/7 21:45
 * @version 1.0
 */
object WaterMarksTime {

  /**
   * Builds and runs the job.
   *
   * Windows come in two families: time windows and count windows. This job uses a
   * 15-second tumbling event-time window; the alternatives are left commented out
   * for comparison.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    // Window on the timestamp carried inside each record, not on arrival time.
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    val dataStream: DataStream[SensorReading] = env.addSource(new MySensorSource)
      // For strictly ascending timestamps this simpler extractor would suffice:
      // .assignAscendingTimestamps(_.timestamp)
      // Watermarks trail the highest timestamp seen by 3 seconds, so records arriving
      // up to 3 s out of order still land in their window. This gives approximately
      // correct results. The extracted timestamp is in milliseconds.
      .assignTimestampsAndWatermarks(
        new BoundedOutOfOrdernessTimestampExtractor[SensorReading](Time.seconds(3)) {
          override def extractTimestamp(t: SensorReading): Long = t.timestamp
        })

    // Records arriving after watermark + allowed lateness are diverted to this tag.
    val laterTag = new OutputTag[(String, Double, Long)]("later")

    // Minimum temperature per sensor, computed once per 15-second window.
    val value: DataStream[(String, Double, Long)] = dataStream
      .map(data => (data.id, data.temperature, data.timestamp))
      .keyBy(_._1) // group by the first tuple element (sensor id)
      // .window(TumblingEventTimeWindows.of(Time.seconds(15)))                 // tumbling window
      // .window(SlidingEventTimeWindows.of(Time.minutes(5), Time.seconds(5)))  // sliding window
      // .window(EventTimeSessionWindows.withGap(Time.seconds(5)))             // session window
      .timeWindow(Time.seconds(15)) // tumbling time window
      // .timeWindow(Time.minutes(5), Time.seconds(5)) // sliding window
      // .countWindow(5)                                // count window
      // .minBy(1)
      // Keep window state for one more minute after the watermark passes the window
      // end. Stragglers the watermark missed can then still update the result.
      .allowedLateness(Time.minutes(1))
      .sideOutputLateData(laterTag)
      // Keep the lowest temperature so far, plus the timestamp of the record most
      // recently folded in.
      .reduce((curs, newd) => (curs._1, curs._2.min(newd._2), newd._3))

    value.print("windows")
    value.getSideOutput(laterTag).print("outSide Stream")

    env.execute("reduce test")
  }
}

/**
 * Custom source that emits simulated temperature readings for sensor1..sensor4
 * every 3 seconds until it is cancelled.
 */
class MySensorSource extends SourceFunction[SensorReading] {

  // Keeps the emit loop alive. cancel() writes it and run() reads it, and Flink
  // calls those from different threads, so it must be @volatile for the stop
  // signal to be seen.
  @volatile var running: Boolean = true

  /**
   * Emits readings through `sourceContext` in an endless loop until `cancel()` is called.
   *
   * @param sourceContext Flink context used to emit records downstream
   */
  override def run(sourceContext: SourceFunction.SourceContext[SensorReading]): Unit = {
    val rand = new Random()
    // Initial temperatures: uniform in [0, 100).
    var curTemp = 1.to(4).map(i => ("sensor" + i, rand.nextDouble() * 100))
    while (running) {
      // Random walk: nudge each temperature by a standard-normal step.
      curTemp = curTemp.map(data => (data._1, data._2 + rand.nextGaussian()))
      val curTime = System.currentTimeMillis()
      println("输入值:" + curTemp + ",时间为:" + curTime)
      curTemp.foreach(data => sourceContext.collect(SensorReading(data._1, curTime, data._2)))
      Thread.sleep(3000)
    }
  }

  /**
   * Stops the emit loop. The previous body evaluated `false` and discarded it,
   * so the source never terminated.
   */
  override def cancel(): Unit = {
    running = false
  }
}

来源:https://www.icode9.com/content-4-818051.html

(0)

相关推荐