关于我们 | 联系我们

亚博yabo888vip官网|yabo888vip

成功案例
当前位置:主页 > 成功案例 >

趣头条基于Flink+ClickHouse的实时数据分析平台

本文摘要:趣头条一直致力于使用大数据分析指导业务生长。现在在实时化领域主要使用 Flink+ClickHouse 解决方案,笼罩场景包罗实时数据报表、Adhoc 即时查询、事件分析、漏斗分析、留存分析等精致化运营计谋,整体响应 80% 在 1 秒内完成,大大提升了用户实时取数体验,推动业务更快迭代生长。本次分享主要内容:业务场景与现状分析Flink to Hive 的小时级场景Flink to ClickHouse 的秒级场景未来计划趣头条的查询页面,分为离线查询和实时查询。

亚博yabo

趣头条一直致力于使用大数据分析指导业务生长。现在在实时化领域主要使用 Flink+ClickHouse 解决方案,笼罩场景包罗实时数据报表、Adhoc 即时查询、事件分析、漏斗分析、留存分析等精致化运营计谋,整体响应 80% 在 1 秒内完成,大大提升了用户实时取数体验,推动业务更快迭代生长。本次分享主要内容:业务场景与现状分析Flink to Hive 的小时级场景Flink to ClickHouse 的秒级场景未来计划趣头条的查询页面,分为离线查询和实时查询。离线查询有 presto,spark,hive 等,实时查询则引入了 ClickHouse 盘算引擎。

上图为实时数据报表,左边为数据指标的曲线图,右边为详细数据指标,现在数据指标的收罗和盘算,每五分钟一个时间窗口,固然也会有三分钟或者一分钟的特殊情况。数据都是从 Kafka 实时导入 ClickHouse 举行盘算的。

1. 小时级实现架构图Flink-to-Hive 小时级实现架构图如图所示,架构实现的思路如下:Database 中的 Binlog 抽数据到 Kafka,同时 Log server 数据也会上报到 Kafka,所有的实时数据落地到 Kafka 之后,通过 Flink 抽取到 HDFS 上。HDFS 到 Hive 之间有条虚线,即 Flink 落地到 HDFS 后,通历程序监控,Flink 在消费完成时,数据落地到 Hive 中可能是小时级的或者是半小时级的,甚至是分钟级的,此时需要知道数据的 Event time 已经到了什么时间,然后再去触发好比 alert table、add partition、 add location 等,把分区写进 Hive 中。这时还需要看一下当前的 Flink 任务的数据时间消费到了什么时间,如9点的数据要落地时,需要看一下 Kafka 里 Flink 数据消费是否到了9点,然后在 Hive 中触发分区写入。

2. 实现原理这块的实现原理主要是使用 Flink 高阶版本的特性 StreamingFileSink。StreamingFileSink 的主要功效如下:forBulkFormat 支持 avro、parquet 花样,也就是支持链式的存储花样withBucketAssigner 自界说按数据时间分桶,支持数据时间的分桶,上图用到该功效的地方界说了一个 EventtimeBucket,根据数据的时间落地到离线中OnCheckpointRollingPolicy,会凭据 CheckPoint 时间来举行数据的落地,此处可以明白为根据数据的时间,好比根据一定的 CheckPoint 时间内举行数据落地、回滚,数据落地计谋还可以根据数据巨细落地Exactly-Once 语义实现,Flink 中自带的 StreamingFileSink 是用 Exactly-Once 语义来实现的。Flink 中有两个 Exactly-Once 的实现,第一个是 Kafka 的 Exactly-Once,第二个是 StreamingFileSink 实现了 Exactly-Once 语义,像上图中 CheckpointRollingPolicy 设置的是十分钟落地一次到 HDFS 文件中下面来详细说一下 Exactly-Once 是如何实现的。

① Exactly-Once详细实现 Exactly-Once 的方式,如上图所示,左侧是一个二阶段的模型,Coordinator 发一个 perpare,所有的到场者或者执行者开始触发 ack 行动,Coordinator 收到所有人的 ack 行动后,就开始执行 commit,所有的执行者就把左右的数据举行落地。到了 Flink 这块,Source 收到了 checkpoint barrier 流的时候,开始触发 snapshorState 发送到 Job Manager,Job Manager 把所有的 CheckPoint 都完成以后,会发送一个 notifyCheckpointComplete,Flink 这块跟上图左边的二阶段提交协议是一致的,Flink 也是可以实现二阶段提交协议的。② 如何使用 Flink 实现二阶段提交协议首先 StramingFileSink 实现了两个接口,划分是 CheckpointedFunction 和 CheckpointListener。

CheckpointedFunction 实现了 initialzeState 和 snaoshotState 这两个函数;CheckpointListener 是 notifyCheckPoint Complete 的方法实现。所以这两个接口可以实现二阶段提交的语义,initialzeState 算子刚启动的时候,它会启动三个行动 commitpendingFile、restoreInProgressFile、truncate。第一步 commitpedingFile,也就是实时的数据落地到 HDFS 的时候,有三个状态,第一个状态是 in-progress,即正在举行中的一个状态,第二个状态是 pending 的状态,第三个状态是 finish 的状态。

在实时的写入时,如果 CheckPoint 还没有在这之间乐成的时候,法式出问题了,那接下来启动的时候就会触发 initialzeState,会把曾经 pending 的 file 举行 commit,然后把写了一半的文件好比 in-progress 文件重置或者截断,举行重置或者截断是使用的是 Hadoop 的2.7版本的 turncate 方式。也就是数据在一直写入,可是写入没有到达一个 CheckPoint 周期,也就是说中间数据断开了,下一次启动的时候,要么把之前没有写完整的数据截断掉,之前 CheckPoint 触发已经写好的数据直接 commit。第二步 invoke 就是数据实时的写入第三步 snapshotState 在触发 CheckPoint 的时候会把 in-progress 文件转成 pending state 文件,也就是开始提交文件,同时记载 length 长度。

记载长度是因为前边的步骤需要 truncate 来截断多长,snapshot 时,是没有真正的写入到 HDFS,其实是写入到 ListState,等所有的 CheckPoint 算子都完成了,就把 ListState 中的数据都刷到 HDFS 中,只要数据存在 Flink 自带的 state 中,不停把数据乐成的刷到 HDFS 中就行了。第四步 notifyCheckPoint Complete 会触发 pending 行动到 finished 状态的数据写入,实现的方式直接使用 rename,Streaming 会不停的写入 HDFS 中的暂时文件,等到 notifyCheckPoint 竣事之后,直接做一个 rename 行动,写成正式文件。3. 跨集群多 nameservices趣头条的实时集群跟离线集群是独立的,实时集群现在是一套,离线集群是有多套。

通过实时集群要写入到离线集群,这样就会遇到一个问题,HDFS nameservices 问题,如果在实时集群中把所有的离线集群的 nameservice 用 namenode HA 的方式全部打入到实时集群,是不太合适的。所以使用 Flink 任务中 resource 下边把 HDFS 中的 xml 文件中间加 final 标签,设置为 true。此处的 value 标签中,stream 是一个实时集群,date 是一个离线集群,这样把两个 HA 设置在 value 标签,从而到达实时集群是实时集群,离线集群是离线集群,中间的 HDFS 中 set 不需要相互修改,直接在客户端时间就行了。

4. 多用户写入权限针对多用户权限写入的问题,实时写入离线 HDFS 中的时候,会涉及到用户权限。遇到用户权限时,也会有一个问题,Flink 实时提交的用户,是界说好的,所有的法式里用户是同一个,可是离线是多个用户,Flink 现在对于这块用户的权限做的还不够好,所以我们自己革新了一下,在 API 中添加了 withBucketUser,上边已经设置好了 nameServices,然后通过该参数来设置详细是谁人用户来写入 HDFS 中,这是 API 层级的。API 层级的利益是一个 Flink 法式可以写多个,可以指定差别的 HDFS 的差别的用户就可以。详细实现就是在 Hadoop file system 中加一个 ugi.do as,署理用户。

以上是趣头条用 Flink 在实时数据同步到 Hive 做的一些事情。其中会有一些小文件的问题,针对小文件,我们通事后台法式定期的 merge,如果 CheckPoint 的时间很短,就会泛起大量的小文件的问题。

1. 秒级实现架构图首先来解释一下趣头条使用 Flink+ClickHouse 的场景,最开始展示的许多实时指标,可能是每五分钟盘算一次,也可能是每三分钟盘算一次。如果每一个实时指标用一个 Flink 任务,纵然是 FlinkSQL 来写,好比消费一个 Kafka Topic,盘算它的日活、新增、流程等,当用户提出一个新的需求,那这个 Flink 任务是需要修改还是再启动一个 Flink 任务来消费这个 Topic,这样的话就会泛起 Flink 任务在不停的修改或者不停的启动新的 Flink 新的任务。为相识决这个问题,就让 Flink 后边接一个套 ClickHouse 实现整体的 OLAP。上图为秒级实现架构图,从 Kafka 到 Flink 到 Hive 然后再到 ClickHouse 集群,对接外部 Horizon ( 实时报表 )、QE ( 实时 adhoc 查询 )、千寻 ( 数据分析 )、用户画像 ( 实时的用户画像 )。

2. Why Flink+ClickHouse详细来说为什么要用 Flink+ClickHouse,主要有以下几点:指标实现支持 sql 形貌,以前的方案使用是 storm 的法式,通过 stormsql 实现,包罗 flinksql,这些内容对于 UDF 支持相对有限,可是现在这套 Flink+ClickHouse 基本上可以把分析师提的指标通过 sql 实现。指标的上下线互不影响,这个主要是解决上边提到的关于 Flink 任务消费了 topic 以后,如果用户提出新的指标的时候,是启动新任务还是要不停修改的问题。数据可回溯,利便异常排查,这个就类似上边提到的如果我的日活掉了,需要知道哪些指标的口径的逻辑掉了、哪个上报的数据掉了,如 cmd 掉了还是数据流 kafka 掉了还是用户上报的时候指标没有上报导致的日活掉了。如果单纯的 flink 的话,只是会盘算出谁人指标掉了,是没措施回溯的。

盘算快,一个周期内完成所有的指标盘算,现在的 horizon 曲线可能是几百上千,需要在五分钟之内或者十分钟之内,把所有分时、累时、以及维度下降的指标全部盘算出来。支持实时流,分部署部署,运维简朴。现在趣头条 Flink 集群有 100+ 台 32 核 128 G 3.5T SSD,日数据量 2000+ 亿,日查询量 21w+ 次,80% 查询在 1s 内完成。上图为单表测试效果。

ClickHouse 单表测试速度快。但受制于架构,ClickHouse 的 Join 较弱。

亚博yabo

上图是处置惩罚相对较为庞大的 SQL,count+group by+order by,ClickHouse 在 3.6s内完成 26 亿数据盘算。3. Why ClickHouse so Fast接下来说一下为什么 ClickHouse 这么快,主要是有以下几点:列式存储+LZ4、ZSTD 数据压缩:列式存储基本是通用的。盘算存储当地化+向量化执行:盘算存储当地化,ClickHouse 跟 presto 纷歧样,presto 数据可能存在 Hadoop 集群里边或者 HDFS 中,需要把数据拉过来,然后举行实时的盘算;而 ClickHouse 是每一台盘算机械需要的数据存储在当地的 ssd 盘,只要盘算当地的数据就可以了,好比求 count 之类的,盘算完成后把其他的节点举行合并就可以了。

LSM merge tree+Index:LSM merge tree,他会不停的使用 batch 的形式把数据写入到 ClickHouse 之后,在后台做了一个线程把数据举行 merge,做一个 index 索引,也就是给这张数据表建设许多索引,类如常见的 DT 的时间索引、小时级的数据索引来提高查询性能或者速度。SIMD+LLVM 优化:SIMD 就是一个单指令多数据集,LLVM 是一个 C++ 的编译器SQL 语法、UDF 完善:在这块有很大的需求,好比数据分析以及维度下坠,通例的 horizon 数据报表可能就是 count、sum、以及 group by、order by 等,可是在一些维度下坠或者是数据分析领域,可能会有一个窗口期的观点,在一段窗口期内的留存,所以要用到一些更高的特性,类如时间窗口的功效。上图是 MergeTree 的运行原理图解,最上边的第一层是数据一个 batch 一个 batch 的实时写入,后台会做每一个层级的数据 merge,这块跟 HBase 差不多的实现,merge 的时候会举行数据的排序,然后做一个数据索引。

上图是 ClickHouse Connector,ClickHouse 有两个观点,local table 和 distribute table。local table 是用来写的,固然 distribute table 也可以写入,可是会泛起很大的 io 问题,所以只管不要写 distribute table。可是可以读 distribute table。

5-10w 一个 batch 举行数据写入,正常的情况下,是5秒一个周期。RoundRobinClickHouse DataSource 这块是趣头条自己实现的;ClickHouse 官方 API 使用:BalancedClickHouseDataSource 实现的。

上图是 ClickHouse 官方 API 使用:BalancedClickHouseDataSource里边有一个问题,好比 mysql 设置一个 ip 和端口号就可以把数据写入了,可是这块要写入 local table 的,所以必须要知道这一个集群到底有几多 local table,每一个 local table 的 ip 和端口号,如果有100台机械,就必须要把这100台机械的 ip 和端口号设置好,然后举行写入。官方的 api 中有两个 schedule:一个是 scheduleActualization另一个是 scheduleConnectionsCleaning第一个是指100台机械设置了100个 ip 或者端口号,可能会有一些机械泛起 ping 不通或者服务无响应,这块是定时的做一个 Actualiza 来发现这些机械哪些无法毗连,触发一个下限来把这些 ip 删除掉。第二个 scheduleConnectionsCleaning,因为 ClickHouse 是 http 的方式,定期的会把一些没用的 http 的请求清理掉。

亚博yabo888vip官网

针对于官方提供的 API,趣头条对这方面做了一个增强,开发了一个 RoundRobinClickHouseDataSource,实现了三个语义,划分是 testOnBorrow、testOnReturn、testWhileldle。第一个 testOnBorrow 取链接的时候,设置 为true,然后去 ping 一下这个链接能不能拿到,ClickHouse 写入的时候,使用的 batch,所以只管就是拿链接的时候要拿到乐成的链接;第二个 testOnReturn 设置为 false,testWhileldle 设置为 true,把上边官方的两个 schedule 功效集成进去了。为什么要实现 RoundRobin,主要是因为如果有100台机械,ClickHouse 相对于 Hadoop 来说,还是需要好好维护一下,如果是 insert 的话,后台是不停 merge 的历程,insert 速度大于 merge 速度时候,会导致 merge 速度永远跟不上,所以就写完这台机械接下来写此外机械,以及5秒一个距离的写,使 merge 的速度只管跟上 insert 的速度,这块是整个部门最需要注意的地方。

4. Backfill趣头条针对集群容错做了一些优化,主要包罗两点:第一点是 Flink 任务小时级容错第二点是 ClickHouse 集群小时级容错Flink 导入数据到 ClickHouse,来实现数据的查询、报表展示,会遇到一些问题。如 Flink 任务泛起故障、报错、数据反压、network 的一些问题;或者 ClickHouse 集群泛起了一些不行响应、ZK 跟不上等 ZK 问题;或者集群的负载问题;或者是上边提到的 insert 太快的问题;会导致整个任务都有问题。

如果数据量突然暴涨,把 Flink 启动,就会泛起一段时间内不停的追数据,可能就需要调大它的并行度之类的,让 Flink 任务把数据追上。可是数据已经积压了,Flink 又要加大它的并发度来处置惩罚数据,可是 ClickHouse 那块又限制了 insert 速度不能太快,所以就做了另外一个机制,也就是 Flink 故障了或者 ClickHouse 故障了,等到 ClickHouse 集群恢复之后,Flink 任务还是从最新的开始消费,已往的一段数据不再去追了,通过 Hive 来把数据导入到 ClickHouse。用 Hive 是因为数据通过 Kafka 已经实时落地到 Hive,通过 waterdrop 把数据写入到 ClickHouse,ClickHouse 是有分区的,只要把上一个小时的数据删除,再把 Hive 一个小时的数据导入进来,这样就可以继续提供数据查询操作了。

最后是对未来的生长与思考。1. Connector SQL对于未来的生长,首先是 Connectors SQL,也就是把 Connector 举行 SQL 化,现在是 Flink-to-Hive 以及 Flink-to-ClickHouse,相对来讲,都是比力固化的一些场景,所以是可以举行 sql 化,除了把 HDFS 的路径指定以及用户指定,其他的一些历程都是可以 SQL 化形貌出来的。2. Delta lakeFlink 是流批一体盘算引擎,可是没有流批一体的存储。

趣头条会用 HBase、Kudu、Redis 等能够与 Flink 实时交互的 KV 存储举行数据盘算。如盘算新增问题,现在趣头条的方案是需要将 Hive 历史用户刷到 Redis 或 HBase 中,与 Flink 举行实时交互判断用户是否新增。但因为 Hive 中的数据和 Redis 中的数据是存储为两份数据。

其次 Binlog 抽取数据会涉及 delete 行动,Hbase,Kudu 支持数据修改,定期回到 Hive 中。带来的问题是 HBase,Kudu 中存在数据,Hive 又生存了一份数据,多出一份或多份数据。

如果有流批一体的存储支持上述场景,当 Flink 任务过来,可以与离线数据举行实时交互,包罗实时查询 Hive 数据等,可以实时判断用户是否新增,对数据举行实时修改、更新或 delete,也能支持 Hive 的批的行动存储。未来,趣头条思量对 Flink 做流批的存储,使 Flink 生态统一为流批联合。作者:王金海 趣头条数据平台卖力人,10 年互联网历练,先后在唯品会卖力用户画像系统,提供人群的个性化营销服务;饿了么担任架构师,卖力大数据任务调理、元数据开发、任务画像等事情;现为趣头条数据中心平台卖力人,卖力大数据基础盘算层 ( spark、presto、flink、clickhouse )、平台服务层 ( libra 实时盘算、kepler 离线调理 )、数据产物层 ( qe即时查询、horizon 数据报表、metadata 元数据、数据权限等 )、以及团队建设。


本文关键词:趣,头条,亚博yabo888vip官网,基于,Flink+ClickHouse,的,实时,数据分析

本文来源:yabo888vip-www.mochamcn.com

Copyright © 2009-2021 www.mochamcn.com. yabo888vip科技 版权所有 备案号:ICP备51175110号-9