回顾·基于HBase实时数仓探索实践

本文根据上海久耶大数据研发工程师武基鹏在中国HBase技术社区第四届MeetUp上海站中分享的《基于HBase实时数仓探索实践》编辑整理而成。

今天从六个方面介绍,首先是久耶第一代离线数仓以及第二代实时数仓。接下来介绍下公司业务场景和业务开发,基于HBase的开发流程,然后介绍下公司CDH集群,介绍下CDH集群调优监控。最后分享两个生产案例。

第一代离线数仓是在去年三月份上线,主要是基于OMS和WMS,由于分库分表,大约有十几个库。前期通过SQOOP进行数据抽取,后来由于SQOOP的一些问题采用了阿里开源的DataX,时间粒度使用脚本调度实现T+1模式抽取到Hive。作分析时采用Apache的Kylin,将数据直接存入HBase中,最初数仓建立用于数表查询,用的是Superset,后面也用了Saiku。六个月就被淘汰,因为离线数仓有个问题,业务部分要查询数据,但是你的时间粒度是昨天的,业务部分需要看当天一段时间内的数据,离线是无法满足。

接下来在另一个集群构建实时数仓,其选型主要有两个,第一个是实时,实时采集利用Maxwell,直接采集公司数据库MySQL,将数据直接以json格式发送到kafka,数仓存储选型是HBase。

为什么选择Maxwell呢,第一个它能够使用“select * from Table”进行bootstrapping初始化数据,在大数据构建时可以利用Maxwell进行全表扫描,这句SQL会自动触发Maxwell某个线程进行数据拉取。第二个Maxwell支持断点还原功能,大数据平台架构不光考虑到高可靠、高性能,也要保证数据零丢失,它支持记录MySQL的post日志进行数据还原,这是当初选择最重要的原因。第三个Maxwell将数据从MySQL发送到Kafka,Kafka是分区的,如何保证全局有序是个问题。它能保证这个特性,支持database, table, primary key, or column的拼接,将数据发送到某个分区;比如一条业务数据在业务系统先在insert再做update再做delete,Kafka会将这三者发送到三个分区,key值为空不会记录,在销毁时用sparkstreaming可能会以delete、update、insert顺序,会造成数据紊乱。我们希望将这些特征数据发送到Kafka一个分区,而Kafka单分区是有序的。第四个Maxwell也会将这些数据发送到后端,当业务数据的表需要升级,如加索引、加字段,可以通过alert语句解析捕获,进行同步更新到HBase中。因此基于这四点要求选择了Maxwell,没有选择当前其他开源产品。

接下来讲一下为什么选择HBase而不选择pudu等产品。第一个是HBase是分布式、可缩的。第二个是随机的读和写,第三个HBase支持百万列。第三个介绍下为什么要选择Phoenix,首先原因是支持SQL,利用原生HBase进行查询、代码分析比较吃力。第二个我们构建的表是盐表,能够解决热点问题,避免一个节点很繁忙另一个节点很闲。第三Phoenix支持二级索引,由于表是盐表(分区),索引也是分区的。第四个支持Spark,可以直接将表传入Phoenix而不用通过HBase,有利于传统开发人员转型,而不用专注于底层HBase。

基于CDH HBase版本构建Phoenix版本历程,phoenix-for-cloudera-4.9、HBase-1.2、cdh5.9,这个存在问题,然后采用apache-phoenix-4.11.0、HBase-1.2,最后采用phoenix-for-cloudera-4.10、HBase-1.2、cdh5.12。cdh5.11的邮件配置存在bug。

进行编译的原因是去年Phoenix官方是不支持CDH版本,目前是支持的。编译时将pom文件,改为CDH支持,然后改生产需要的Spark版本。修复SYSTEM.MUTEX表在分布式的计算时,多次创建错误。QueryServicesOptions.java文件修改参数DEFAULT_IS_NAMESPACE_MAPPING_ENABLED=true。Phoenix存在一个问题就是时区,比如一条上午十点的业务数据在Phoenix周转下,时间数据会减一个8小时。修改DateUtil.java文件timezone为”Asia/Shanghai”,但是读写两种只解决了一种,而业务代码开发需要经过Phoenix架构JDBC,数据还是会出错,上面只解决了查询,后来采用下面改动,然后编译。

上图是实时数仓架构图,主要的存储层还是以HBase为主。第一层业务系统数据库在阿里云平台上,有OMS、WMS,Report DB是OMS和WMS的重复,将里面的数据全部同步于一台机器,使用的就是Maxwell,其支持白名单和黑名单。业务平台的表可能有两三百个,大数据平台的计算可能只需要100多个,可以添加白名单,有些表的数据就可以不用过来了。这些数据通过Json发送到Kafka,然后通过Spark streaming去消费Kafka,通过JDBC写入HBase。表不是通过Phoenix语句创建,不关心底层HBase,只需要通过Phoenix像MYSQL一样查询即可。同时会将计算结果存储到Redis,久耶慧策应用也会将数据写入ES里面。中间一层就是常见应用开发,如Spark Streaming、Spark SQl,也用Python和R语言。调度平台起先用的是Azkaban,然后是Airflow,最后用的是Oozie。上图蓝色是实时大屏,红色是全球仓库,大约有四十几个,数据绑定用的是saiKU,将Phoenix架包集成进去,saiKU分上卷和下卷,业务人员依据自己的需求去拿行和列数据,saiKU通过Phoenix组装SQL语句查询结果数据。也用到zeppelin,这是Spark交互式开发必须用到的。

接下来讲一下数据仓库,首先是模型建设,第一层是基础表,在Phoenix中建立与MySQL一样的表。在基础表的基础上构建事实表(订单实时发生的表)和维度表(如中国有多少省多少市等更新不是很大的表),依据事实表和维度表进行代码开发,构建领域表,就是依据业务需求得出的结果存到领域表。数据校验是通过数据量比对,起先是在重库时做触发器,但是MySQL重库触发器支持不友好。通过改造Phoenix代码将数据写入Redis,增加加一删除减一,MySQL数据和HBase数据是一天一查一对比,当不相等直接调用shell脚本进行全表扫描。当前只采用OMS、WMS的库,QPS处于2000,1条数据: 平均60列 495b。

业务场景开始是业务报表开发,有客诉妥投、ABC订单、商业季度等。也提供一个BI自助分析,第三个就是双十一大屏和龙虎榜,同时使用了BMS系统,是一个商业结算系统。第五个是今年做的领导层和客户层的慧策,商业决策分析。

业务开发套路就是依据业务需求将数据存在那些表里面,需要将构建表的语句提取出来构建Phoenix Table,然后Kafka+Spark Streaming+Phoenix进行数据的插入。接着就是Spark开发读和写,我们还利用了DBeaver。我们建表使用了联合组件,由于公司集群规模不是很高,regionServer是38台,COMPRESSION 是使用SNAPPY,这是依据压缩比、解压性能。

接下来是一个经典开发案例Kafka+Spark Streaming+Phoenix,Phoenix可以理解为MySQL架包的JDBC。我们并没有使用Phoenix的Pool池,官方也推荐使用正常JDBC文件,因为JDBC已经支持长连接,foreachPartition拿到Phoenix的JDBC,中间进行常见数据处理,Kafka接收过来数据是Json格式,如何将其转化为Phoenix的upset语法和delete语法,完成后就将连接关闭。

数据流入Phoenix大数据平台是通过bootstream的全表扫描,其增量数据也是实时进入。业务代码开发首先将架包导入pom文件,如何找维度是将Phoenix的Apache下载到IDEA,在测试类里面查找。Phoenix+Spark读取有好几种,选择以上写法原因有:首先其支持列裁剪,第二支持where条件,configuration指的是Spark的HDFS的conf。

业务开发是多张表,Spark表是df,接下来就和Phoenix和HBase无关。接下来就是对接Spark业务开发逻辑处理,最后结果集会回写到HBase中。还是通过Phoenix写入,有追加、overwrite。HBase没有很好地可视化工具,利用DBeaver,支持MYSQL、Oracle等所有数据库类型,也支持二次开发借助于接口实现。

接下来介绍下集群调优参数,分为六个方面:(1)Linux parameters、(2)HDFS parameters、(3)HBase parameters、(4)GC parameters、(5)Monitor、(6)Bug。句柄数、文件数、线程数这些都是要调,因为regionserver在操作时需要open file,处理时需要用到一些线程,一些系统都是架设在Linux上,因此集群调优都需要调它。需要注意的是改完后需要检查是否生效,立即生效是sysctt-p。Spark开发需要将数据频繁的写入HBase中,HBase底层是HDFS,在写入时就会出现问题,最后发现Linux系统参数没有调。

在正常的HBase节点机器上,swap是设置为0,这并不是禁用swap而是其惰性是最大的。由于我们公司由于业务系统较多,吃的内存比较紧,因此设为10,这样可以使job慢一点但是不能挂,但是如果做实时就需要设置为0。这个最终设置取决于你们自身业务环境,选择自己需要的就好。如果做CBH的平台部署必须要关闭大页面。

接下来分享一个有意思的参数HDFS Parameters,正常调优是CBH界面打开、HBase的xml文件打开。主要调优是timeout和handler参数,将其几倍放大,socket.timeout在HBase的xml文件一定要部署,否则无法支持高并发操作。

当一个本机线程无法创建一个本机线程,这段代码打在HDFS的dataload,当时dataload的内存配置是8G,实际只使用1G,这个时候就休要加上echo "kernel.threads-max=196605"->/etc/sysctl.conf,echo"kernel.pid_max=196605"->/etc/sysctl.conf,echo "vm.max_map_count=393210"-> /etc/sysctl.conf三个参数,这其实是底层Linux抛出的错误。提醒一点socket.timeout参数不仅在HDFS中需要配置,在HBase中也需要配置。

GC是regionserver配置,但是配置是CDH配置,GC默认垃圾选择器是CMS,需要将其改为GE,如果需要配置可以去尝试下,小米以前分享过。可以对参数进行调试进行压错调优,尤其大数据平台开发尤其如此。

项目上线需要做监控,第一个就是HBase的读和写,绿色是写,但是读存在两个波峰,因为我们的调度平台以一个小时将所有job调度完。图中Y轴是每秒的请求量,如果写的量上来了或者读的波峰没有规律,就有可能是集群宕了。

第二个监控的指标是FDS,就是Regionserver的文件句柄数,如果请求很多,句柄数会很高,因为其底层依赖于Linux,如果超过Linux设置值机器容易夯住下线,导致CPU不正常,这时需要后台强制机器下线。然后需要监控Zookeeper,监控的是Zookeeper Open Connections,因为HBase进行操作需要打开的连接,当业务场景为长服务,如Spark streaming一直运行,先前尝试用SparkSQL+Phoenix做一个长服务,因为调度都是通过shell脚本调度,在资源紧张时需要抢资源,在submit时需要申请资源(大约30S),线上是不允许的。最后采用Spark streaming+Spark SQL+Phoenix JDBC,Spark streaming是实时的每隔一小时判断进行数据处理,这个时候Zookeeper Open Connections就随着递增趋势上涨,当到Connection数(默认500)CDH会杀掉。后来改为水平,利用PHOENIX-4319:Zookeeper connection should be closed immediately解决问题。

接下来讲一下Kafka如何做监控,其实只需要上面一幅图,上图绿色指标读,Received是蓝色线,相当于生产者写到Kafka里面,绿色是Spark streaming进行消费,相当于Fetched。这幅图相当于实时同步架构,消息没有做积压。但是为什么波峰会比它高,原因是数据通过Maxwell发送到Kafka时是一个Json数据,但是Kafka消费时需要额外加一些东西(来自哪个topic、offset是什么等),如果两条线没问题就是没出问题。

Bug方面,PHOENIX-4056:java.lang.IllegalArgumentException: Can not create a Path from an empty string,先前有问题采用降版本,目前已经解决,方案在社区里有。SPARK-22968:java.lang.IllegalStateException: No current assignment for partition kssh-2,这个是Sparkstreaming读Kafka时抛出的错误,这个在Spark2.4.0有新的补丁。

接下来分享两个案例,分为两种,一种是3次RIT,园区断电机器挂掉出现RIT。HBase有个WAL,数据基本不会丢,只需要将机器重启。重启过程会有一些RIT操作,如果regionserver挂了申请维护时间,尝试重启regionserver节点,如果不行重启HBase集群,这个时候需要看HBase的master的active的log日志。还有一次是高并发内存不够用,regionserver挂掉,重启后在CDH的HBase运行正常,但是在监控页面HBase还是异常,这时候只需要将CMS的serviceMonitor重启就OK。第三次RIT事故regionserver挂掉,尝试使用HBCK命令修复问题还是很多。最后通过日志分析发现Hlog有问题,通过HDFS命令将文件移到某个地方,重启就OK了。丢失的数据通过Maxwell恢复,预估事故发生点通过全表扫描进行恢复。

接下来分享一个三支烟的故事,数据来源于阿里云,自建机房需要通过***将数据拉倒本地机房。双十一所有仓库都在运作,MySQL机器扛不住导致延迟比较大,延迟约半个小时。需要在T2将数据完全恢复,解决方案直接将Maxwell架设到阿里云进行实时同步,数据进行全表扫描,只需要扫描大屏显示需要的数据,将T1到T2的数据进行SparkSQL,将计算结果写到redis里面,Sparkstreaming进行现场改,只判断T2流进的数据才会将T2的基础值进行累积计算,实时Job跑了15分钟数据就实时过来了。

作者介绍:

武基鹏,上海久耶供应链管理有限公司大数据研发工程师。主要从事大数据平台产品的技术工作;负责设计、构建和优化基于HDFS/HBase的存储平台架构;负责提升Hadoop/HBase等集群的高可用性、高性能、高扩展特性;负责基于Spark开发及性能调优。


(0)

相关推荐

  • 大数据平台常用工具集介绍

    提起大数据,不得不提由IBM提出的关于大数据的5V特点:Volume(大量).Velocity(高速).Variety(多样).Value(低价值密度).Veracity(真实性),而对于大数据领域的 ...

  • 好未来基于DorisDB的全新实时数仓实践

    好未来(NYSE:TAL)是一家以智慧教育和开放平台为主体,以素质教育和课外辅导为载体,在全球范围内服务公办教育,助力民办教育,探索未来教育新模式的科技教育公司.截至2020年11月底,好未来在102 ...

  • 基于Flink构建实时数仓实践

    导读 随着公司用户增长业务快速发展,陆续孵化出 部落.同镇.C 端会员.游戏等非常多的业务板块.与此同时产品及运营对实时数据需求逐渐增多,帮助他们更快的做出决策,更好的进行产品迭代,实时数仓的建设变得 ...

  • 基于Kafka Flink平台化设计,实时数仓还能这样建

    背景 Flink + Kafka 平台化设计 Kafka 在实时数仓中的应用 问题 & 改进 一.背景介绍 1.流平台通用框架 目前流平台通用的架构一般来说包括消息队列.计算引擎和存储三部分, ...

  • 基于 Flink ClickHouse 打造轻量级点击流实时数仓

    Flink 和 ClickHouse 分别是实时计算和(近实时)OLAP 领域的翘楚,也是近些年非常火爆的开源框架,很多大厂都在将两者结合使用来构建各种用途的实时平台,效果很好.关于两者的优点就不再赘 ...

  • 腾讯看点王展雄:实时数仓与多维实时分析系统搭建

    近几年,数字驱动的口号越喊越响,在这样一个用数据说话的时代,数据在一定程度上决定企业的业务和决策.而从数据驱动的方面考虑,多维实时数据分析系统的重要性也不言而喻.但是当数据量巨大的情况下,企业对数据技 ...

  • 菜鸟实时数仓技术架构演进

    『平时多流汗,战时少流血』 分享嘉宾:贾元乔 菜鸟 高级数据技术专家 编辑整理:夏飞飞 内容来源:Flink Forward ASIA 出品平台:DataFunTalk 注:转载请在公众号后台回复&q ...

  • 当 TiDB 与 Flink 相结合:高效、易用的实时数仓

    随着互联网飞速发展,企业业务种类会越来越多,业务数据量会越来越大,当发展到一定规模时,传统的数据存储结构逐渐无法满足企业需求,实时数据仓库就变成了一个必要的基础服务.以维表 Join 为例,数据在业务 ...

  • 生产队的驴永不懈怠 | 尚硅谷Flink实时数仓视频教程

    摘要:这世间青山灼灼星光杳杳,秋雨淅淅晚风慢慢,也抵不过公子眉目间的星辰和大数据视频. 我趁老婆洗澡,看了一眼她手机, 发现她和丈母娘的语音聊天. 老婆说:今天胸口闷得慌, 老婆说:待会儿把他揍一顿出 ...

  • 校长专栏丨江志明:基于生本的美丽学校实践探索

    嘉宾简介:江志明,杭州下沙中学.景苑中学校长.(以下内容根据江志明校长于2018年第三届基础教育美丽学校建设研讨会上的演讲整理.) 杭州市下沙中学于1965年建校,2008年加入杭四中教育集团:200 ...