(11条消息) Flink之状态后端(StateBackends)
目录
- (1)状态后端的分类
- (2)配置状态后端
每传入一条数据,有状态的算子任务都会 读取和更新状态 。由于有效的状态访问对于处 理数据的低延迟至关重要,因此每个并行任务(子任务)都会在本地维护其状态,以确保快速 的状态访问。
状态的存储、访问以及维护,由一个 可插入 的组件决定,这个组件就叫做 状态后端 (state backend)
状态后端主要负责两件事:
- 本地的状态管理
- 将检查点(checkpoint)状态写入远程存储
(1)状态后端的分类
状态后端作为一个可插入的组件, 没有固定的配置, 我们可以根据需要选择一个合适 的状态后端。
Flink 提供了 3 中状态后端:
MemoryStateBackend
内存级别的状态后端
- 构造方法
env.setStateBackend(new MemoryStateBackend( "file://"+
baseCheckpointPath, null).configure(conf, classLoader))
- 数据存储
- State 数据存储在TaskManager 内存中
- Checkpoint 数据数据存储在jobManager 内存
- 容量限制
- 单词State maxStateSize默认为5M
- maxStateSize <= akka.framesize默认10M
- 总大小不能超过JobMananger的内存
- 默认后端状态管理器
- 推荐场景:
- 本地测试
- 状态比较少的作业
- 不推荐生产环境中使用
- 特点:快速, 低延迟, 但不稳定
FsStateBackend
- 构造方法
env.setStateBackend(new FsStateBackend(tmpPath))
- 数据存储:
- 状态数据:TaskManager 内存
- Checkpoint:外部文件系统(本地或HDFS)
- 容量限制:
- 单个TaskManager上State总量不能超过TM内存
- 总数据大小不超过文件系统容量
- 推荐场景:
- 常规状态作业
- 窗口时间比较长,如分钟级别窗口聚合,Join等
- 需要开启HA的作业
- 可在生产环境中使用
- 特点:拥有内存级别的本地访问速度, 和更好的容错保证
RocksDBStateBackend
将所有的状态序列化之后, 存入本地的 RocksDB 数据库中.(一种 NoSql 数 据库, KV 形式存储)
- 创建方法
env.setStateBackend(new RocksDBStateBackend("file://"+
basecheckpointPath).configure(conf,classLoader))
- 数据存储
- State: TaskManager 中的KV数据库(实际使用内存+磁盘)
- Checkpoint:外部文件系统(本地或HDFS)
- 容量限制:
- 单TaskManager 上 State总量不超过其内存+磁盘大小
- 单 Key最大容量2G
- 总大小不超过配置的文件系统容量
- 推荐场景:
- 超大状态作业
- 需要开启HA的作业
- 对状态读写性能要求不高的作业
- 生产环境可用
(2)配置状态后端
全局配置状态后端
在 flink-conf.yaml 文件中设置默认的全局后端
在代码中配置状态后端
可以在代码中单独为这个 Job 设置状态后端.
env.setStateBackend(new MemoryStateBackend());
env.setStateBackend(new FsStateBackend("hdfs://hadoop102:8020/flink/checkpoints/fs"));
如何要使用 RocksDBBackend, 需要先引入依赖:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-statebackend-rocksdb_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
env.setStateBackend(newRocksDBStateBackend("hdfs://hadoop102:8020/flink/checkpoints/rocksdb"));
以上内容仅供参考学习,如有侵权请联系我删除!
如果这篇文章对您有帮助,左下角的大拇指就是对博主最大的鼓励。
您的鼓励就是博主最大的动力!
赞 (0)