(11条消息) Flink之状态后端(StateBackends)

目录

  • (1)状态后端的分类
  • (2)配置状态后端

每传入一条数据,有状态的算子任务都会 读取和更新状态 。由于有效的状态访问对于处 理数据的低延迟至关重要,因此每个并行任务(子任务)都会在本地维护其状态,以确保快速 的状态访问。

状态的存储、访问以及维护,由一个 可插入 的组件决定,这个组件就叫做 状态后端 (state backend)

状态后端主要负责两件事:

  • 本地的状态管理
  • 将检查点(checkpoint)状态写入远程存储

(1)状态后端的分类

状态后端作为一个可插入的组件, 没有固定的配置, 我们可以根据需要选择一个合适 的状态后端。

Flink 提供了 3 中状态后端:

MemoryStateBackend

内存级别的状态后端

  • 构造方法
env.setStateBackend(new MemoryStateBackend( "file://"+
baseCheckpointPath, null).configure(conf, classLoader))
  • 数据存储

    • State 数据存储在TaskManager 内存中
    • Checkpoint 数据数据存储在jobManager 内存
  • 容量限制
    • 单词State maxStateSize默认为5M
    • maxStateSize <= akka.framesize默认10M
    • 总大小不能超过JobMananger的内存
  • 默认后端状态管理器
  • 推荐场景:
    • 本地测试
    • 状态比较少的作业
  • 不推荐生产环境中使用
    • 特点:快速, 低延迟, 但不稳定

FsStateBackend

  • 构造方法
env.setStateBackend(new FsStateBackend(tmpPath))
  • 数据存储:

    • 状态数据:TaskManager 内存
    • Checkpoint:外部文件系统(本地或HDFS)
  • 容量限制:
    • 单个TaskManager上State总量不能超过TM内存
    • 总数据大小不超过文件系统容量
  • 推荐场景:
    • 常规状态作业
    • 窗口时间比较长,如分钟级别窗口聚合,Join等
    • 需要开启HA的作业
  • 可在生产环境中使用
  • 特点:拥有内存级别的本地访问速度, 和更好的容错保证

RocksDBStateBackend

将所有的状态序列化之后, 存入本地的 RocksDB 数据库中.(一种 NoSql 数 据库, KV 形式存储)

  • 创建方法
env.setStateBackend(new RocksDBStateBackend("file://"+
 basecheckpointPath).configure(conf,classLoader))
  • 数据存储

    • State: TaskManager 中的KV数据库(实际使用内存+磁盘)
    • Checkpoint:外部文件系统(本地或HDFS)
  • 容量限制:
    • 单TaskManager 上 State总量不超过其内存+磁盘大小
    • 单 Key最大容量2G
    • 总大小不超过配置的文件系统容量
  • 推荐场景:
    • 超大状态作业
    • 需要开启HA的作业
    • 对状态读写性能要求不高的作业
  • 生产环境可用

(2)配置状态后端

全局配置状态后端

在 flink-conf.yaml 文件中设置默认的全局后端

在代码中配置状态后端
可以在代码中单独为这个 Job 设置状态后端.

env.setStateBackend(new MemoryStateBackend());

env.setStateBackend(new FsStateBackend("hdfs://hadoop102:8020/flink/checkpoints/fs"));

如何要使用 RocksDBBackend, 需要先引入依赖:

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-statebackend-rocksdb_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
env.setStateBackend(newRocksDBStateBackend("hdfs://hadoop102:8020/flink/checkpoints/rocksdb"));

以上内容仅供参考学习,如有侵权请联系我删除!
如果这篇文章对您有帮助,左下角的大拇指就是对博主最大的鼓励。
您的鼓励就是博主最大的动力!

(0)

相关推荐