【原创】用EMR建设实时数仓

建设实时数仓的目的和意义

实时数仓目的

数仓概念:数据尽可能多,保存时间尽可能久

图片描述

实时概念:数据流式,处理及时、瞬时、短时、事件或者微批响应

图片描述

数仓跟实时从概念上就有冲突,所以本质上不太适合处理广泛的问题,比如,对一个月,甚至是一年的数据进行统计计算。

图片描述

所以,实时数仓应该目前作为离线数仓的一种补充,解决因离线数仓实时性低而无法解决的问题,具体点说就是处理离线两个周期间隔的数据问题,不适合解决大批量数据聚合问题、业务性太强的以及对实时性要求很高问题。

实时数仓的意义

图片描述

实时数仓从概念上讲还是要靠近数仓的概念,数据分层,面向主题,数据尽可能集成,结构相对稳定,不易发生变化。

图片描述

对于实时数仓来讲,数据量不需要保存像离线那么久,上一节我们提到,实时数仓处理两个离线周期间隔的数据即可,如上图,以时报为例,实时数仓补充中间数据即可,以天为例,实时数仓最多只需要保留3~5天数据即可,能够支持一段时间的数据追溯和重导就可以了。

实时数仓可以解决哪类问题

图片描述

利用EMR建设实时数仓

实时数仓对比离线数仓

图片描述

实时数仓架构

图片描述

从图中可以看到,

1、ODS并不是实时数仓的一部分,是依赖外部数据源,比如binlog,流量日志,系统日志,或者是其他消息队列

2、应用层也不是实时数仓的一部分,对于数据的使用,通过实时数仓暴露Topic来使用

3、实时数仓要求层次要少,因为需要尽可能降低延迟

用EMR搭建实时数仓

图片描述

1、底层数据源可以接企业内部binlog、日志或者消息队列

2、从ODS层经过与维表轻度扩展,形成明细层明细表,明细表用一个Ckafka topic表示,计算采用Oceanus或者EMR FlinkSql 关联查询,维表采用EMR Hbase存储

3、从明细层经过进一步汇总计算,形成汇总层,此时数据已经是面向主题的汇总数据,就是传统意义上的大宽表,一个主题是一系列Ckafka topic,计算采用Oceanus或者EMR FlinkSql 关联查询以及汇总计算

实时数仓各层搭建

ODS层搭建

图片描述

1、 之所以没有把ODS层放在实时数仓的一部分,是因为实时数仓的ODS并不像离线数仓ODS是采集过来的原始数据,现在一般企业都已经具备了如上图的底层数据源

2、 Binlog,是数据库日志,通过Binlog可以自数据库主从间同步,可以同步关系型数据库数据,目前企业线上数据库都采用Mysql这样的数据库,可以通过抓取Mysql binlog 获取数据库变更信息,数仓中重要的业务数据,支付相关,用户相关,管理相关数据一般都从这种数据源获得

3、 Log日志,服务器日志,像服务器系统日志采集,都是通过这种形式进行采集

Ckafka,企业通过消息队列提供数据源服务,比如,点击流服务,会把用户点击事件通过上报服务器上报到Ckafka,为后续分析提供原始数据

该层搭建的注意点:
1、 业务选择数据源,尽量跟离线保持一致,比如某个业务,数据源即可以通过Binlog,也可以通过Log日志采集,如果离线数仓业务是通过Binlog,那么实时数仓也取Binlog,否则后续产生数据不一致,非常难以定位

2、数据源要求一致性,对于Ckafka和Binlog 需要进行分区一致性保证,解决数据乱序问题

明细层搭建

图片描述

建设标准与离线数仓目标一致,解决原始数据存在噪音,不完整,形式不统一等问题

图片描述

数据解析,业务整合,数据清洗,解决噪音,不完整,数据不一致问题;模型规范化(提前指定号规则,尽量跟离线保持一致),形成数据规范,规范尽可能跟离线保持一致,命名,比如,指标命名等;

与离线数仓不同之处在于,离线调度是有周期的,时报一小时,天报周期为一天,如果修改数据表字段,只要任务没开始,就可以修改,而实时是流式,7X24小时不间断运行的,想要修改流中的字段或者格式,对下游影响是不可预估的

实时数仓如果修改字段不像离线,在间隔期间通知下游把作业都改了就没事了,但是实时不一样,实时你改掉了字段,下游作业必须可以认识你修改的内容才行,kafka不是结构化存储,没有元数据的概念,不像Hive,如果表名不规范,找一个统一时间,把catolog改规范,然后把脚本一改就就解决了。

明细建设关键,我们会在每一条数据上增加一些额外字段到数仓里

图片描述

举例说明这些额外字段的意义

事件主键:对于上游数据重复问题,我们会根据一些数据内的字段来判断上有数据的唯一性,比如binlog,<集群id_><库id_><表id_>数据id_数据生成时间。

数据主键:唯一标识数据表的一行记录,可以使用数据库主键,主要用来解决分区一致性及分区有序。

数据元数据版本:上面介绍了,流式计算是7X24小时不间断的运算,当修改了数据结构,增加,删除了字段,对下游的影响是不可预估的,因此元数据变更需修改该字段,保持数据流中新老版本数据双跑,下游选择合适的时机进行数据切换。

数据批次:跟元数据用途相似,当明细层逻辑发现问题,需要重跑数据,为了对下游任务不产生影响,调整了明细层逻辑后,需要回倒位点重跑数据,同时需要跟老逻辑任务双跑,待下游业务都切换到新的逻辑后,老逻辑任务才可以停止。

还有一个思路,可以直接把明细层数据,也可以直接写到druid 直接用于分析。

维度层搭建

图片描述

维度数据处理:

图片描述

如上图,对于变化频率低,地理,节假日,代码转换,直接同步加载到缓存里,或者是新增数据,但是增加进来就不变了,通过数据接口,访问最新数据,然后通过本公司数据服务对外提供数据

图片描述

如上图,对于变化频率高,比如商品价格,也是需要监听变化消息,然后实时更新维度拉链表。对于比如像最近一个月没有消费用户这样的衍生维度,是需要根据变化消息,通过计算得到的衍生维度拉链。

因为维度数据也在发生变化,为了能够让源表数据匹配到维表,我们会给维表增加多版本minversion,然后通过TIMERANGE => [1303668804,
1303668904]筛选出源数据指定的维表版本数据。

这里有些同学可能觉得如果版本一致保存下去,会不会非常大,是的,我们响应的需要配置TTL保证维表数据量可控,上文我们介绍过,实时数仓解决是离线数仓两个间隔的问题,那么像这种变化频繁的数据我们TTL设置一周足够了。

关于源表与维表如果进行join,Flink原生sql以及Oceanus都是采用UDTF函数以及Lateral
Table 进行联合使用,其中UDTF我们可以实现查询数据服务获取维表数据的能力,Oceanus请参考相关材料。

汇总层搭建

图片描述

汇总层加工其实跟离线数仓是一致的,对共性指标进行加工,比如,pv,uv,订单优惠金额,交易额等,会在汇总层进行统一计算。

Flink提供了丰富的窗口计算,这使得我们可以做更细力度的聚合运算,例如,我们可以算最近5分钟,10分钟的数据聚合,根据时间窗口的间隔,也需要调整相应的TTL,保障内存高效实用。

Flink提供了丰富的聚合计算,数据都是要存在内存中的,因此需要注意设置state的TTL,例如,做Count(Distinct
x)。或者在进行PV,UV计算时候,都会使用大量的内存,这一块,当处理的基数比较大的时候,推荐使用一些非高精度去重算法,Bloom过滤器,Hyper LogLog等。

汇总层也需要在每一条数据上增加一些额外字段到数仓里,这块与明细层一致,就不在单独讲解了。

数据质量保证

图片描述

对于实时数仓数据质量的管理,我们通常由三步操作组成

第一步,数据与离线数据进行对比

首先,将汇总层数据Topic通过平台接入任务接入到离线仓库,然后通过数据质量任务,定时对实时数仓和离线数仓数据进行对比,并配置报警,数据差异,数据波动等。

第二步,配置报警,我们会在明细层以及汇总层,Topic配置生产监控,与以往数据波动,上游数据延迟或者积压,都需要进行报警。

第三部,构建实时血缘, Flink 在读取数据时候,会把信息读到flink catalog 这样就知道这个任务读取了哪个表,在解析客户DDL代码时,可以获得目标表信息,同步到我们的元数据服务。

参考文献:

美团实时数仓搭建:https://tech.meituan.com/2018/10/18/meishi-data-flink.html

菜鸟实时数仓:https://mp.weixin.qq.com/s/9ZRG76-vCM7AlRZNFCLrqA

hive 响应慢问题定位

情景描述:

大数据集群,目前有两套hiveserver2和metastore的集群,通过nginx指向进行流量互切,发现流量打到哪个metastore集群,哪个集群就特别卡顿。

那么先来回顾一下hive整个调用流程和框架

image-20201227123352163

Hive 提供的另外一个shell 客户端,也就是我们常用的hive 命令的客户端它的设计是直接启动了一个org.apache.hadoop.hive.cli.CliDriver的进程,这个进程其实主要包含了两块内容一个是提供给我们交互的cli ,另外一个就是我们的Driver 驱动引擎,这样的设计导致如果我们有多个客户端的情况下,我们就需要有多个Driver

image-20201226204741803

但是我们通过HiveServer2连接的时候我们就可以共享Driver,一方面可以简化客户端的设计降低资源损耗,另外一方面还能降低对MetaStore 的压力,减少连接的个数。

原因分析:

目前来看,变慢的原因应该是出现在hs2服务,metastore服务,具体业务,网络,服务器等原因。

1、先从简单的硬件分析入手,验证网络和服务器,这块省略验证过程

2、验证hs2服务,利用排除的方式,通过hive cli进行多次验证,发现也有缓慢的情况,正常应该1秒内返回,所以先不定位hs2的情况

3、验证metastore服务,还是先从直观简单的分析,看能否找出些现象,先看日志:

3.1 既然通过cli和hs2访问都会慢,先从简单的cli发起请求,可以通过加debug参数,发起查询

beeline –verbose=true –showNestedErrs=true –debug=true 看一下客户端是否有明显异常

3.2 cd /var/log/hive 查看一下是否有大量客户端访问

cat hadoop-cmf-hive-HIVEMETASTORE-data-hadoop-16-2.192.168.0.1.log.out | grep audit | grep -v “ugi=hue” | awk -F “ip=” ‘{print $2}’ | awk ‘{print $1}’ | sort | uniq -c | sort -nr | head

3.3 查看服务GC情况 jstat -gcutil pid interval(ms)

3.4 查看服务端日志

Error: Error while compiling statement: [Error 10308]: Attempt to acquire compile lock timed out. (state=,code=10308)
org.apache.hive.service.cli.HiveSQLException: Error while compiling statement: [Error 10308]: Attempt to acquire compile lock timed out.
at org.apache.hive.jdbc.Utils.verifySuccess(Utils.java:241)
at org.apache.hive.jdbc.Utils.verifySuccessWithInfo(Utils.java:227)
at org.apache.hive.jdbc.HiveStatement.execute(HiveStatement.java:255)
at org.apache.hive.beeline.Commands.executeInternal(Commands.java:989)
at org.apache.hive.beeline.Commands.execute(Commands.java:1180)
at org.apache.hive.beeline.Commands.sql(Commands.java:1094)
at org.apache.hive.beeline.BeeLine.dispatch(BeeLine.java:1180)
at org.apache.hive.beeline.BeeLine.execute(BeeLine.java:1013)
at org.apache.hive.beeline.BeeLine.begin(BeeLine.java:922)
at org.apache.hive.beeline.BeeLine.mainWithInputRedirection(BeeLine.java:518)
at org.apache.hive.beeline.BeeLine.main(BeeLine.java:501)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.hadoop.util.RunJar.run(RunJar.java:226)
at org.apache.hadoop.util.RunJar.main(RunJar.java:141)
Caused by: org.apache.hive.service.cli.HiveSQLException: Error while compiling statement: [Error 10308]: Attempt to acquire compile lock timed out.
at org.apache.hive.service.cli.operation.Operation.toSQLException(Operation.java:400)
at org.apache.hive.service.cli.operation.SQLOperation.prepare(SQLOperation.java:187)
at org.apache.hive.service.cli.operation.SQLOperation.runInternal(SQLOperation.java:271)
at org.apache.hive.service.cli.operation.Operation.run(Operation.java:337)
at org.apache.hive.service.cli.session.HiveSessionImpl.executeStatementInternal(HiveSessionImpl.java:439)
at org.apache.hive.service.cli.session.HiveSessionImpl.executeStatementAsync(HiveSessionImpl.java:416)
at org.apache.hive.service.cli.CLIService.executeStatementAsync(CLIService.java:282)
at org.apache.hive.service.cli.thrift.ThriftCLIService.ExecuteStatement(ThriftCLIService.java:503)
at org.apache.hive.service.cli.thrift.TCLIService$Processor$ExecuteStatement.getResult(TCLIService.java:1313)
at org.apache.hive.service.cli.thrift.TCLIService$Processor$ExecuteStatement.getResult(TCLIService.java:1298)
at org.apache.thrift.ProcessFunction.process(ProcessFunction.java:39)
at org.apache.thrift.TBaseProcessor.process(TBaseProcessor.java:39)
at org.apache.hive.service.auth.TSetIpAddressProcessor.process(TSetIpAddressProcessor.java:56)
at org.apache.thrift.server.TThreadPoolServer$WorkerProcess.run(TThreadPoolServer.java:286)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)

这里显示有获取metastore连接超时的异常,关键的日志是compiling statement: [Error 10308]: Attempt to acquire compile lock timed out. 编译时候获取编译索失败

顺着这个思路,查看一下代码:

关键字:Completed compiling command(queryId,参考https://github.com/apache/hive/blob/master/ql/src/java/org/apache/hadoop/hive/ql/lock/CompileLock.java

分析结论如下:

并且hivethrift每次访问都会初始化metastore 重新初始化元数据,HIVESEVER2提交SQL阻塞tryAcquireCompileLock原因:

a、HIVE1.1 只支持串行编译SQL,hiveserver2并发接受到SQL请求后,在complile阶段变为串行执行。当compilie编译慢时,引起阻塞SQL的提交。

b、compile的慢的原因:complile阶段,会通过hivemetastore访问mysql。目前是hiveserver2的请求打入到同一个metastore,流量上来后,hivemetastore访问mysql速度下降。

解决方案:hiveserver2的请求分摊所有hivemetastore上

Clickhouse 性能问题分析(一)zk报连接异常,性能低下

原因分析:

table read only 跟 zk 也有直接关系。现在能肯定 zk 集群的性能跟不上么 ?在这个压力下只要错误日志里有zk相关的超时,就是ZK性能更不上。

调优策略:

小批次,大批量写入,降低对ZK的QPS,减少的压力,然后横向扩容解决并发问题,不要怕条数多,她设计的原则就是 要你条数多,才能达到最佳性能,1000/s这种 已经触达了ZK的极限了

  1. 如果zk内存使用率较高,zkEnv.sh 修改zk jvm 为8192m,在zoo.cfg 中修改MaxSessionTimeout=120000
  2. 超时时间socket_timeout 80000

3.Spark insert 的-batch 调大到30000,就是每批次取3万行数据,参数可能是任务的参数


4. metrika.xml的zookeepr配置里调整下参数,调整的ch zk客户端超时,这个值是默认的30s

例如

<zookeeper>

    <node>

        <host>example1</host>

        <port>2181</port>

    </node>

    <node>

        <host>example2</host>

        <port>2181</port>

    </node>

    <session_timeout_ms>30000</session_timeout_ms>  –客户端会话的最大超时(以毫秒为单元)

    <operation_timeout_ms>10000</operation_timeout_ms>

</zookeeper>

  1. 重启生效

现在开始写入了。是先写入buffer engine 然后buffer engine 刷新 底层表

写入的数据库和表是:

xxx_dis

buffer的元数据不在zookeeper上,但是跟其他表的读写需要zk支持,就等于是 buffer 表刷底层表 是要走zookeeper. 的。

看看ZK机器的snapshot文件大小,如下图,也不大

Buffer表的建表语句方便发下吗? ENGINE = Buffer(‘blower_data_prod’, ‘xxx_info’, 18, 60, 100, 5000000, 6000000, 80000000, 1000000000) 主要就是后面这个吧。前面字段无所谓。

hive任务结束了,但是hive终端或者命令行没退出

问题描述:从一个100多亿条记录的hive表里查数据,查出数据后,写入一张新hive表里。mapreduce执行完load data环节后,一直不结束,哪位大神知道怎么更进一步地定位问题?

原因分析:文件比较多,最后一个movetask在移动文件和搜集文件统计信息。hive是迭代式计算,最后会有一个movetask把最终数据文件移动到hive的location下,这个过程有两个地方比较耗时,一个是rename,另外一个是list文件读文件元数据,更新元数据库的统计信息,如果数据在对象存储,rename是copy+delete,这个过程会比较慢,movetask是finaltask,不会起mr,任务没完成前,新表里的数据,不是全量的,只是部分。只有任务退出,才算完全完成

解决方案:

set hive.stats.autogather=false;开启收集线程,可以减少最后收集的时间

mapred.dfsclient.parallelism.max 20 增加并行rename的能力

【原创】几句话说清楚《HBASE Snapshot 恢复 restore 原理》

首先snapshot恢复 理论上是建立在数据文件(region,hfile,storefile)都已经导入到集群中了,只是把元数据更新成snapshot的时刻
其实特别简单
Hmaster处理meta元数据表,跟当前元数据的region比较,分为三种情况:
1、snapshot 有,当前也有:说明这部分数据后来有可能被更新过,宁可错杀100,绝不放过一个,所以,直接先从meta信息删除,然后再从snapshot恢复
2、snapshot 有,当前没有:说明这部分数据被归档了,当前已经不用了,重新将snapshot这部分写入meta信息
3、snapshot 没有,当前有:说明是snapshot后面加入的数据,直接从meta删除

Hmaster恢复Hfile数据文件,分为三种情况:
1、snapshot 有,当前也有:同名的,不做任何改动,因为Hfile一旦落盘,很少会发生改变
2、snapshot 有,当前没有:对于缺少的文件,直接对exportsnapshot拷贝到archive中的Hfile文件做引用
3、snapshot 没有,当前有:说明是snapshot后面加入的数据,转移到archive归档

Hmaster恢复Wal文件:
写入recovered.edits文件夹的相应region中,每个region一个线程,只写入跟snapshot相关的表region信息,RS会从这里进行恢复,几句话说清楚《hbase snapshot 原理》也提到了,恢复只会从snapshot的flushid进行恢复,之前的就不会重复操作了

对于regionserver

参考:https://cloud.tencent.com/developer/article/1047967

【原创】几句话说清楚《HBASE Snapshot 原理》

首先snapshot 理论上是用于做备份恢复的
1、恢复哪些内容?
答:snapshot 指定表,在线region和split之前的region

2、谁参与了snapshot
答:
hmastr,regionserver

3、如何snapshot
hmaster 主要负责管控snapshot,
创建snapshot相关信息,比如,表信息,关联region的信息,以及region对应的server信息
hmaster会通过zk管控任务,通过三种目录,acquired、reached、abort;abort下有Regionserver(RS)信息,说明RS有执行snapshot失败的,直接退出snapshot
acquired,hmaster 创建snapshot信息(表,region信息),然后RS进行确认,确认成功,在该目录下创建自己的名字
reached,hmaster 确认acquired下RS都确认了,通知RS可以进行干活,每个RS干完活,会在该目录下创建自己的名字
abort,RS如果执行遇到问题,则在该路径下留下名字,然后Hmaster 取消整个snapshot流程

regionserver 负责具体在线region的信息采集
对memstore和WAL进行flush到相应的Hfile和Hlog中,其中,WAL Hlog要记录本次的flushID,restore_snapshot后,即便发生Hbase集群恢复,也只从该flushID恢复,更早的不会再恢复了
把Hfile 创建相应的映射到snapshot相应目录中,只有映射(空文件),这也是snapshot快的原因
把Hlog 文件放入该snapshot相应文件下

hmaster还负责split之前的region(已经不用的,父region)相关信息的snapshot,其实也是把相关信息留到snapshot下的目录下

hmaster 负责校验snapshot结果,确认snapshot目录信息,表信息是否正确,最主要的要确认需要snapshot的region信息是否还在

总结一下:

1

snapshot流程主要涉及3个步骤:

1. 加一把全局锁,此时不允许任何的数据写入更新以及删除

2. 将Memstore中的缓存数据flush到文件中(可选)

3. 为所有HFile文件分别新建引用指针,这些指针元数据就是snapshot

扩展思考:LSM类系统确实比较容易理解,那其他非LSM系统原地更新的存储系统如何实现snapshot呢?

参考:https://yq.aliyun.com/articles/60455?spm=a2c4e.11163080.searchblog.18.151a2ec1uNNVP8

Mac 安装Hadoop

随着运维工作的进一步深入,后期可能涉及到对源码的修改,需要多次重新编译。因此,这一关是绕不过的。

如有疑问,请联系作者,微信 17180081757

版本声明

  • 源码:Apache Hadoop 2.7.3
  • 系统:macOS 10.14.5
  • 依赖:
    • oracle jdk 1.8.0_231
    • Apache Maven 3.6.0
    • libprotoc 2.5.0

编译

核心命令

如果使用命令行:package -Pdist,native -DskipTests -Dtar
如果使用IDEA 开发工具,右上角,选择“Run/Debug Configuration”,如下图,,点击“+”新增一个,maven的run configuration,Name随便起,主要是Command Line:package -Pdist,native -DskipTests -Dtar

编译Hadoop源码时间比较长

Hadoop源码量巨大、依赖众多,编译时间比较长。

下载jar包和编译protoc是两个大头。编译protoc用了1小时左右,下载jar包+编译Hadoop用了2个多小时。除去这些时间,也需要1小时左右才能编译成功。

还好上半年为了看Yarn的状态机编译过一回,虽然是不完全编译,但也下载了大部分依赖的jar包,并编译安装了protoc(强烈建议编译安装,忘记当时有什么坑来着)。这次只需要继续踩上次剩下的坑了。

不过,鉴于第一次编译时,大部分人都会重复多次才能编译成功,单次编译的时间也没什么意义了。喝杯茶,慢慢来吧。

JDK版本

[ERROR] Failed to execute goal org.apache.maven.plugins:maven-compiler-plugin:2.5.1:compile (default-compile) on project hadoop-annotations: Compilation failure: Compilation failure:[ERROR] /Volumes/Extended/Users/msh/IdealProjects/Git-Study/hadoop/hadoop-common-project/hadoop-annotations/src/main/java/org/apache/hadoop/classification/tools/ExcludePrivateAnnotationsJDiffDoclet.java:[20,22] 错误: 程序包com.sun.javadoc不存在

不明白为啥这个包会不存在,可能是JDK版本问题。google一番,参考解决Mac OS 下编译Hadoop Annotations 程序包com.sun.javadoc找不到问题解决。

验证

在所有的pom.xml里面找设置1.7 jdk的地方:

 12345678910111213141516 find . -name pom.xml > tmp/tmp.txt
while read filedocnt=0grep ‘1.7’ $file -C2 | while read line; doif [ -n “$line” ]; thenif [ $cnt -eq 0 ]; thenecho “+++file: $file”ficnt=$((cnt+1))echo $linefidonecnt=0done < tmp/tmp.txt

输出:

 12345678910111213141516171819 +++file: ./hadoop-common-project/hadoop-annotations/pom.xml</profile><profile><id>jdk1.7</id><activation>–<activation><jdk>1.7</jdk></activation><dependencies>—-<groupId>jdk.tools</groupId><artifactId>jdk.tools</artifactId><version>1.7</version><scope>system</scope><systemPath>${java.home}/../lib/tools.jar</systemPath>+++file: ./hadoop-project/pom.xml…(略)

确实./hadoop-common-project/hadoop-annotations/pom.xml中限制了jdk版本。

解决

Mac上的默认JDK是oracle jdk1.8.0_102的,翻了下jdk源码也有这个包。说明不是因为该包实际不存在。

可以尝试修改pom里限制的jdk版本;不过,为了防止使用了deprecated方法等麻烦,这里直接切jdk 1.7,不改pom。

openssl引起编译报错

 12 [ERROR] Failed to execute goal org.apache.maven.plugins:maven-antrun-plugin:1.7:run (make) on project hadoop-pipes: An Ant BuildException has occured: exec returned: 1[ERROR] around Ant part …<exec dir=”/Volumes/Extended/Users/msh/IdealProjects/Git-Study/hadoop/hadoop-tools/hadoop-pipes/target/native” executable=”cmake” failonerror=”true”>… @ 5:153 in /Volumes/Extended/Users/msh/IdealProjects/Git-Study/hadoop/hadoop-tools/hadoop-pipes/target/antrun/build-main.xml

猜测是ant版本问题,重装了jdk1.7适配的ant。

Ant BuildException也是够迷惑的。而且之前猴子电脑配置的jdk1.8,切到1.7之后ant就不能用了(brew安装的ant用1.8jdk编译的,1.7无法解析class文件),重装了适配1.7的ant版本后,ant可以正常使用了,却还是报这个错。。。

结果还是报这个错,打开build-main.xml看,发现是一个cmake命令的配置,copy到终端执行:

 1 cmake /Volumes/Extended/Users/msh/IdealProjects/Git-Study/hadoop/hadoop-tools/hadoop-pipes/src/ -DJVM_ARCH_DATA_MODEL=64

输出:

 123456789101112131415 …(略)CommandLineTools/usr/bin/c++ — works– Detecting CXX compiler ABI info– Detecting CXX compiler ABI info – done– Detecting CXX compile features– Detecting CXX compile features – doneCMake Error at /usr/local/Cellar/cmake/3.6.2/share/cmake/Modules/FindPackageHandleStandardArgs.cmake:148 (message):Could NOT find OpenSSL, try to set the path to OpenSSL root folder in thesystem variable OPENSSL_ROOT_DIR (missing: OPENSSL_INCLUDE_DIR)Call Stack (most recent call first):/usr/local/Cellar/cmake/3.6.2/share/cmake/Modules/FindPackageHandleStandardArgs.cmake:388 (_FPHSA_FAILURE_MESSAGE)/usr/local/Cellar/cmake/3.6.2/share/cmake/Modules/FindOpenSSL.cmake:380 (find_package_handle_standard_args)CMakeLists.txt:20 (find_package)
…(略)

OPENSSL_ROOT_DIROPENSSL_INCLUDE_DIR没有设置。echo一下确实没有设置。

解决

Mac自带OpenSSL,然而猴子并不知道哪里算是root,哪里算是include;另外,据说mac计划移除默认的openssl。干脆自己重新安装:

 1 brew install openssl

然后配置环境变量:

 12export OPENSSL_ROOT_DIR=/usr/local/Cellar/openssl/1.0.2nexport OPENSSL_INCLUDE_DIR=$OPENSSL_ROOT_DIR/include
如下是我的配置,请参考:
cat ~/.bash_profile
export PROTOC=”/usr/local/bin/protoc”
export M2_HOME=/Users/lynchgao/apache-maven-3.6.3
export PATH=$PATH:$M2_HOME/bin
JAVA_HOME=/Library/Java/JavaVirtualMachines/jdk1.8.0_231.jdk/Contents/Home
PATH=$JAVA_HOME/bin:$PATH:.
CLASSPATH=$JAVA_HOME/lib/tools.jar:$JAVA_HOME/lib/dt.jar:.
export JAVA_HOME
export PATH
export CLASSPATH
export OPENSSL_ROOT_DIR=/usr/local/Cellar/openssl1.0.2k
export PATH=OPENSSL_ROOT_DIR/bin:$PATH
export OPENSSL_INCLUDE_DIR=$OPENSSL_ROOT_DIR/include
export OPENSSL_LIB_DIR=$OPENSSL_ROOT_DIR/lib

继续编译,发现编译hadoop-pipes时候,报错“around Ant part …<exec dir=”/Users/lynchgao/IdeaProjects/hadoop/hadoop-tools/hadoop-pipes/target/native” executable=”make”

按照上面的方法,进入指定目录执行make

cd /Users/lynchgao/IdeaProjects/hadoop/hadoop-tools/hadoop-pipes/target/native

执行make

报错“/usr/local/opt/openssl@1.1/include/openssl/ossl_typ.h:102:16: note: forward
declaration of ‘hmac_ctx_st’”

网上搜了一下,说是版本冲突,查了一下hadoop 2.7.3 用的是openssl1.0.2k,

从git上拉取代码https://github.com/openssl/openssl/tree/OpenSSL_1_0_2k

编译mac 64位版本版本需要注意,darwin64-x86_64-cc 是编译64位

sudo ./Configure darwin64-x86_64-cc –prefix=/usr/local/Cellar/openssl1.0.2k

make

make install

然后替换brew 默认安装的路径,替换brew默认路径,是因为hadoop-pipeline c代码 include时候需要找openssl

mv /usr/local/opt/openssl@1.1/ /usr/local/opt/openssl@1.1_bak/

cp -rf /usr/local/Cellar/openssl1.0.2k /usr/local/opt/openssl@1.1

如果从命令行执行 openssl version还是高版本,请联系我

maven仓库不稳定

 1 [ERROR] Failed to execute goal on project hadoop-aws: Could not resolve dependencies for project org.apache.hadoop:hadoop-aws:jar:2.6.0: Could not transfer artifact com.amazonaws:aws-java-sdk:jar:1.7.4 from/to central (https://repo.maven.apache.org/maven2): GET request of: com/amazonaws/aws-java-sdk/1.7.4/aws-java-sdk-1.7.4.jar from central failed: SSL peer shut down incorrectly -> [Help 1]

出现类似“Could not resolve dependencies”、“SSL peer shut down incorrectly”等语句,一般是maven不稳定,换个稳定的maven源,或者重新编译多试几次。

更换旧的Openssl办法

不过,我们还有最后一步,那就是当我们使用openssl时,使用的是我们用homebrew新下载的openssl。为了达到这个目的,我们有两种方法。

将homebrew下载的openssl软链接/usr/bin/openssl目录下。这里,我们先将它保存一份老的,然后再软链接新下载的。


$ mv /usr/bin/openssl /usr/bin/openssl_old
mv: rename /usr/bin/openssl to /usr/bin/openssl_old: Operation not permitted
$ ln -s /usr/local/Cellar/openssl/1.0.2p/bin/openssl /usr/bin/openssl
ln: /usr/bin/openssl: Operation not permitted

Operation not permitted提示没有权限操作,对/usr/bin目录下的东西,我已经遇到过几次这个问题了,于是继续google,在stackoverflow上找到了Operation Not Permitted when on root El capitan (rootless disabled)

重启系统,当启动的时候我们同时按下cmd+r进入Recovery模式,之后选择实用工具 => 终端,在终端输入如下命令,接口文件系统的锁定,并且重启电脑(cmd+r后,会进入另外一个选择系统启动的界面,在这个界面里面不要马上重新启动,先找到终端,在終端中输入csrutil disable):

$ csrutil disable
$ reboot

最后,我们执行前面两个命令,查看版本。

$ sudo mv /usr/bin/openssl /usr/bin/openssl_old
$ sudo ln -s /usr/local/Cellar/openssl/1.0.2p/bin/openssl /usr/bin/openssl
$ openssl version
OpenSSL 1.0.2p  14 Aug 2018

➜ which openssl
/usr/local/opt/openssl/bin/openssl

这样,我们的openssl升级成功了。不过,为了安全起见,我还是重新启动电脑,然后重新开启了csrutil。

csrutil enable
reboot

其他

历史遗留坑

上次编译有个小坑,是Hadoop源码里的历史遗留问题。

编译过程中会在$JAVA_HOME/Classes下找一个并不存在的jar包classes.jar,实际上需要的是$JAVA_HOME/lib/tools.jar,加个软链就好(注意mac加软链时与linux的区别)。

因此上次编译猴子已经修复了这个问题,这里就不复现了。具体可以看这篇mac下编译Hadoop

没有SKIPTESTS

没有skipTests的话,至少会在测试过程中以下错误:

 1234567891011 [ERROR] Failed to execute goal org.apache.maven.plugins:maven-surefire-plugin:2.16:test (default-test) on project hadoop-auth: There are test failures.[ERROR][ERROR] Please refer to /Volumes/Extended/Users/msh/IdealProjects/Git-Study/hadoop/hadoop-common-project/hadoop-auth/target/surefire-reports for the individual test results.
Results :
Tests in error:TestKerberosAuthenticator.testAuthenticationHttpClientPost:157 » ClientProtocolTestKerberosAuthenticator.testAuthenticationHttpClientPost:157 » ClientProtocol
Tests run: 92, Failures: 0, Errors: 2, Skipped: 0

可以暂时忽略这些相关错误,skipTests跳过测试,能追踪源码了解主要过程即可。

启动伪分布式“集群”

编译成功后,在hadoop-dist模块的target目录下,生成了各种发行版。选择hadoop-2.6.0.tar.gz,找个地方解压。

配置SSH 无密码链接

如果没有安装SSH,执行下面命令安装

# Install ssh
$ apt install ssh
# Check 22 port
$ netstat –nat

回到用户目录
即 /home/ubuntu (ubuntu 是当前用户的主目录)

$ cd ~

执行 ssh-keygen 命令, 一直回车。

$ ssh-keygen -t rsa

在当前用户目录下有个隐藏目录 .ssh 目录 ,进入该目录

$ cd .ssh

里面有 id_rsa.pub 文件, 将其赋值到 authorized_keys 文件

$ cp id_rsa.pub authorized_keys

然后再测试 SSH登录

当你尝试连接本机的时候就可以直接链接不需要登录。
如果你想直接链接其他VM, 只需要将其他机器上的 id_rsa.pub 添加到authorized_keys, 这样就可以直接ssh 链接过去而不需要输入密码。 这个在后面启动hadoop 时候就很有用,启动服务就不用输入密码。

配置IP

$ sudo vim /etc/hosts
# 通过此命令配置IP映射

第一个VM的 mster

10.0.1.6 slave1
10.0.1.10 slave2
10.0.1.12 master

第二个VM的 slave1

10.0.1.6 slave1
10.0.1.10 slave2
10.0.1.12 master

第三个VM的 slave2

10.0.1.6 slave1
10.0.1.10 slave2
10.0.1.12 master

配置Hadoop

首先解压 hadoop 文件

$ tar -xvf hadoop-2.7.3.tar.gz
咱们这个案例是把hadoop编译好的包拷出来使用
cd /Users/lynchgao/IdeaProjects/hadoop/hadoop-dist/target

解压完成之后进入 配置文件所在目录即 hadoop-2.7.3 目录下 etc/hadoop 内

cd /data/install/apache/hadoop-2.7.3/etc/hadoop/

接下来要配置以下几个文件:
core-site.xml、hdfs-site.xml、yarn-site.xml、mapred-site.xml、slaves、hadoop-env.sh、yarn-env.sh

hadoop-env.sh和yarn-env.sh 配置 jdk 环境

# The java implementation to use.
#export JAVA_HOME=${JAVA_HOME}
export JAVA_HOME=/home/ubuntu/developer/jdk1.8.0_121
core-site.xml
<!-- Put site-specific property overrides in this file. -->
<configuration>
    <property>
        <name>fs.default.name</name>
        <value>hdfs://master:9000</value>
    </property>
    <property>
        <name>hadoop.tmp.dir</name>
        <value>file:/home/ubuntu/developer/hadoop-2.7.3/tmp</value>
    </property>
    <property>
        <name>io.file.buffer.size</name>
        <value>131702</value>
    </property>
</configuration>
hdfs-site.xml

<configuration>
    <property>
        <name>dfs.namenode.name.dir</name>
        <value>file:/home/ubuntu/developer/hadoop-2.7.3/hdfs/name</value>
    </property>
    <property>
        <name>dfs.datanode.data.dir</name>
        <value>file:/home/ubuntu/developer/hadoop-2.7.3/hdfs/data</value>
    </property>
    <property>
        <name>dfs.replication</name>
        <value>2</value>
    </property>
    <property>
        <name>dfs.namenode.secondary.http-address</name>
        <value>master:9001</value>
    </property>
    <property>
        <name>dfs.webhdfs.enabled</name>
        <value>true</value>
    </property>
    <property>
        <name>dfs.namenode.datanode.registration.ip-hostname-check</name>
        <value>false</value>
    </property>
    <property>
        <name>dfs.permissions</name>
        <value>false</value>
    </property> 
</configuration>
yarn-site.xml
<configuration>

  <property>
    <name>yarn.nodemanager.aux-services</name>
    <value>mapreduce_shuffle</value>
  </property>
  <property>
    <name>yarn.nodemanager.auxservices.mapreduce.shuffle.class</name>
    <value>org.apache.hadoop.mapred.ShuffleHandler</value>
  </property>
  <property>
    <name>yarn.resourcemanager.address</name>
    <value>master:8032</value>
  </property>
  <property>
    <name>yarn.resourcemanager.scheduler.address</name>
    <value>master:8030</value>
  </property>
  <property>
    <name>yarn.resourcemanager.resource-tracker.address</name>
    <value>master:8031</value>
  </property>
  <property>
    <name>yarn.resourcemanager.admin.address</name>
    <value>master:8033</value>
  </property>
  <property>
    <name>yarn.resourcemanager.webapp.address</name>
    <value>master:8088</value>
  </property>
</configuration>
mapred-site.xml

默认没有这个文件 但是提供了个模板 mapred-site.xml.template
通过这个模板复制一个

$ cp etc/hadoop/mapred-site.xml.template etc/hadoop/mapred-site.xml

<configuration>
    <property>
        <name>mapreduce.framework.name</name>
        <value>yarn</value>
    </property>
    <property>
        <name>mapreduce.jobhistory.address</name>
        <value>master:10020</value>
    </property>
    <property>
        <name>mapreduce.jobhistory.webapp.address</name>
        <value>master:19888</value>
    </property>
</configuration>

slaves
slave1
slave2
将配置好的hadoop 文件夹复制给其他节点(slave1 和slave2)
scp -r /home/ubuntu/developer/hadoop-2.7.3 ubuntu@slave1:/home/ubuntu/developer/hadoop-2.7.3 
scp -r /home/ubuntu/developer/hadoop-2.7.3 ubuntu@slave2:/home/ubuntu/developer/hadoop-2.7.3 
运行启动Hadoop

1- 初始化hadoop(清空hdfs数据):

rm -rf /home/ubuntu/developer/hadoop-2.7.3/hdfs/*
rm -rf /home/ubuntu/developer/hadoop-2.7.3/tmp/*
/home/ubuntu/developer/hadoop-2.7.3/bin/hdfs namenode -format

2- 启动hdfs,yarn

/home/ubuntu/developer/hadoop-2.7.3/sbin/start-dfs.sh
/home/ubuntu/developer/hadoop-2.7.3/sbin/start-yarn.sh

3- 停止hdfs,yarn

/home/ubuntu/developer/hadoop-2.7.3/sbin/stop-dfs.sh
/home/ubuntu/developer/hadoop-2.7.3/sbin/stop-yarn.sh

4- 检查是否成功
在 master 终端敲 jps 命令

master-jps.png

在 slave 终端敲 jps 命令

slave-jps.png

或者在master 节点看 report

$ bin/hdfs dfsadmin -report

到此, hadoop 可以正常启动。

一些常用命令
#列出HDFS下的文件
hdfs dfs -ls 
#列出HDFS下某个文档中的文件
hdfs dfs -ls in 
#上传文件到指定目录并且重新命名,只有所有的DataNode都接收完数据才算成功
hdfs dfs -put test1.txt test2.txt 
#从HDFS获取文件并且重新命名为getin,
同put一样可操作文件也可操作目录
hdfs dfs -get in getin 
#删除指定文件从HDFS上
hdfs dfs -rmr out 
#查看HDFS上in目录的内容
hdfs dfs -cat in/* 
#查看HDFS的基本统计信息
hdfs dfsadmin -report 
#退出安全模式
hdfs dfsadmin -safemode leave 
#进入安全模式
hdfs dfsadmin -safemode enter 

运行WordCount官方例子

  1. 在 /home/ubuntu 下建立一个文件夹input, 并放几个txt文件在内
  2. 切换到 hadoop-2.7.3目录内
  3. 给hadoop创建一个 wc_input文件夹
$  bin/hdfs dfs -mkdir /wc_input
  1. 将 /home/ubuntu/input 内的文件传到hadoop /wc_input 内
$ bin/hdfs dfs –put /home/ubuntu/input*   /wc_input
$ bin/hadoop jar share/hadoop/mapreduce/hadoop-mapreduce-examples-2.7.3.jar wordcount /wc_input /wc_oput
  1. 查看结果
$ bin/hdfs dfs -ls /wc_output
$ bin/hdfs dfs -ls /wc_output/part-r-00000
  1. 在浏览器上查看
    http://your-floating-ip:50070/dfshealth.html
    但是在此之前可能需要开通端口,为了简便我在OpenStack上将所有端口开通。

tcp-ports.png

http://your-floating-ip:8088/cluster/scheduler

Flink 零基础实战教程:如何计算实时热门商品

From:http://wuchong.me/blog/2018/11/07/use-flink-calculate-hot-items/

上一篇入门教程中,我们已经能够快速构建一个基础的 Flink 程序了。本文会一步步地带领你实现一个更复杂的 Flink 应用程序:实时热门商品。在开始本文前我们建议你先实践一遍上篇文章,因为本文会沿用上文的my-flink-project项目框架。

通过本文你将学到:

  1. 如何基于 EventTime 处理,如何指定 Watermark
  2. 如何使用 Flink 灵活的 Window API
  3. 何时需要用到 State,以及如何使用
  4. 如何使用 ProcessFunction 实现 TopN 功能

实战案例介绍

本案例将实现一个“实时热门商品”的需求,我们可以将“实时热门商品”翻译成程序员更好理解的需求:每隔5分钟输出最近一小时内点击量最多的前 N 个商品。将这个需求进行分解我们大概要做这么几件事情:

  • 抽取出业务时间戳,告诉 Flink 框架基于业务时间做窗口
  • 过滤出点击行为数据
  • 按一小时的窗口大小,每5分钟统计一次,做滑动窗口聚合(Sliding Window)
  • 按每个窗口聚合,输出每个窗口中点击量前N名的商品

数据准备

这里我们准备了一份淘宝用户行为数据集(来自阿里云天池公开数据集,特别感谢)。本数据集包含了淘宝上某一天随机一百万用户的所有行为(包括点击、购买、加购、收藏)。数据集的组织形式和MovieLens-20M类似,即数据集的每一行表示一条用户行为,由用户ID、商品ID、商品类目ID、行为类型和时间戳组成,并以逗号分隔。关于数据集中每一列的详细描述如下:

列名称说明
用户ID整数类型,加密后的用户ID
商品ID整数类型,加密后的商品ID
商品类目ID整数类型,加密后的商品所属类目ID
行为类型字符串,枚举类型,包括(‘pv’, ‘buy’, ‘cart’, ‘fav’)
时间戳行为发生的时间戳,单位秒

你可以通过下面的命令下载数据集到项目的 resources 目录下:

$ cd my-flink-project/src/main/resources
$ curl https://raw.githubusercontent.com/wuchong/my-flink-project/master/src/main/resources/UserBehavior.csv > UserBehavior.csv

这里是否使用 curl 命令下载数据并不重要,你也可以使用 wget 命令或者直接访问链接下载数据。关键是,将数据文件保存到项目的 resources 目录下,方便应用程序访问。

编写程序

在 src/main/java/myflink 下创建 HotItems.java 文件:

package myflink;

public class HotItems {

public static void main(String[] args) throws Exception {

}
}

与上文一样,我们会一步步往里面填充代码。第一步仍然是创建一个 StreamExecutionEnvironment,我们把它添加到 main 函数中。

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 为了打印到控制台的结果不乱序,我们配置全局的并发为1,这里改变并发对结果正确性没有影响
env.setParallelism(1);

创建模拟数据源

在数据准备章节,我们已经将测试的数据集下载到本地了。由于是一个csv文件,我们将使用 CsvInputFormat 创建模拟数据源。

注:虽然一个流式应用应该是一个一直运行着的程序,需要消费一个无限数据源。但是在本案例教程中,为了省去构建真实数据源的繁琐,我们使用了文件来模拟真实数据源,这并不影响下文要介绍的知识点。这也是一种本地验证 Flink 应用程序正确性的常用方式。

我们先创建一个 UserBehavior 的 POJO 类(所有成员变量声明成public便是POJO类),强类型化后能方便后续的处理。

/** 用户行为数据结构 **/
public static class UserBehavior {
public long userId; // 用户ID
public long itemId; // 商品ID
public int categoryId; // 商品类目ID
public String behavior; // 用户行为, 包括(“pv”, “buy”, “cart”, “fav”)
public long timestamp; // 行为发生的时间戳,单位秒
}

接下来我们就可以创建一个 PojoCsvInputFormat 了, 这是一个读取 csv 文件并将每一行转成指定 POJO
类型(在我们案例中是 UserBehavior)的输入器。

// UserBehavior.csv 的本地文件路径
URL fileUrl = HotItems2.class.getClassLoader().getResource(“UserBehavior.csv”);
Path filePath = Path.fromLocalFile(new File(fileUrl.toURI()));
// 抽取 UserBehavior 的 TypeInformation,是一个 PojoTypeInfo
PojoTypeInfo<UserBehavior> pojoType = (PojoTypeInfo<UserBehavior>) TypeExtractor.createTypeInfo(UserBehavior.class);
// 由于 Java 反射抽取出的字段顺序是不确定的,需要显式指定下文件中字段的顺序
String[] fieldOrder = new String[]{“userId”, “itemId”, “categoryId”, “behavior”, “timestamp”};
// 创建 PojoCsvInputFormat
PojoCsvInputFormat<UserBehavior> csvInput = new PojoCsvInputFormat<>(filePath, pojoType, fieldOrder);

下一步我们用 PojoCsvInputFormat 创建输入源。

DataStream<UserBehavior> dataSource = env.createInput(csvInput, pojoType);

这就创建了一个 UserBehavior 类型的 DataStream

EventTime 与 Watermark

当我们说“统计过去一小时内点击量”,这里的“一小时”是指什么呢? 在 Flink 中它可以是指 ProcessingTime ,也可以是 EventTime,由用户决定。

  • ProcessingTime:事件被处理的时间。也就是由机器的系统时间来决定。
  • EventTime:事件发生的时间。一般就是数据本身携带的时间。

在本案例中,我们需要统计业务时间上的每小时的点击量,所以要基于 EventTime 来处理。那么如果让 Flink 按照我们想要的业务时间来处理呢?这里主要有两件事情要做。

第一件是告诉 Flink 我们现在按照 EventTime 模式进行处理,Flink 默认使用 ProcessingTime 处理,所以我们要显式设置下。

env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

第二件事情是指定如何获得业务时间,以及生成 Watermark。Watermark 是用来追踪业务事件的概念,可以理解成 EventTime 世界中的时钟,用来指示当前处理到什么时刻的数据了。由于我们的数据源的数据已经经过整理,没有乱序,即事件的时间戳是单调递增的,所以可以将每条数据的业务时间就当做 Watermark。这里我们用 AscendingTimestampExtractor 来实现时间戳的抽取和 Watermark 的生成。

注:真实业务场景一般都是存在乱序的,所以一般使用 BoundedOutOfOrdernessTimestampExtractor

DataStream<UserBehavior> timedData = dataSource
.assignTimestampsAndWatermarks(new AscendingTimestampExtractor<UserBehavior>() {
@Override
public long extractAscendingTimestamp(UserBehavior userBehavior) {
// 原始数据单位秒,将其转成毫秒
return userBehavior.timestamp * 1000;
}
});

这样我们就得到了一个带有时间标记的数据流了,后面就能做一些窗口的操作。

过滤出点击事件

在开始窗口操作之前,先回顾下需求“每隔5分钟输出过去一小时内点击量最多的前 N 个商品”。由于原始数据中存在点击、加购、购买、收藏各种行为的数据,但是我们只需要统计点击量,所以先使用 FilterFunction 将点击行为数据过滤出来。

DataStream<UserBehavior> pvData = timedData
.filter(new FilterFunction<UserBehavior>() {
@Override
public boolean filter(UserBehavior userBehavior) throws Exception {
// 过滤出只有点击的数据
return userBehavior.behavior.equals(“pv”);
}
});

窗口统计点击量

由于要每隔5分钟统计一次最近一小时每个商品的点击量,所以窗口大小是一小时,每隔5分钟滑动一次。即分别要统计 [09:00, 10:00), [09:05, 10:05), [09:10, 10:10)… 等窗口的商品点击量。是一个常见的滑动窗口需求(Sliding Window)。

DataStream<ItemViewCount> windowedData = pvData
.keyBy(“itemId”)
.timeWindow(Time.minutes(60), Time.minutes(5))
.aggregate(new CountAgg(), new WindowResultFunction());

我们使用.keyBy("itemId")对商品进行分组,使用.timeWindow(Time size, Time slide)对每个商品做滑动窗口(1小时窗口,5分钟滑动一次)。然后我们使用 .aggregate(AggregateFunction af, WindowFunction wf) 做增量的聚合操作,它能使用AggregateFunction提前聚合掉数据,减少 state 的存储压力。较之.apply(WindowFunction wf)会将窗口中的数据都存储下来,最后一起计算要高效地多。aggregate()方法的第一个参数用于

这里的CountAgg实现了AggregateFunction接口,功能是统计窗口中的条数,即遇到一条数据就加一。

/** COUNT 统计的聚合函数实现,每出现一条记录加一 */
public static class CountAgg implements AggregateFunction<UserBehavior, Long, Long> {

@Override
public Long createAccumulator() {
return 0L;
}

@Override
public Long add(UserBehavior userBehavior, Long acc) {
return acc + 1;
}

@Override
public Long getResult(Long acc) {
return acc;
}

@Override
public Long merge(Long acc1, Long acc2) {
return acc1 + acc2;
}
}

.aggregate(AggregateFunction af, WindowFunction wf) 的第二个参数WindowFunction将每个 key每个窗口聚合后的结果带上其他信息进行输出。我们这里实现的WindowResultFunction将主键商品ID,窗口,点击量封装成了ItemViewCount进行输出。

/** 用于输出窗口的结果 */
public static class WindowResultFunction implements WindowFunction<Long, ItemViewCount, Tuple, TimeWindow> {

@Override
public void apply(
Tuple key, // 窗口的主键,即 itemId
TimeWindow window, // 窗口
Iterable<Long> aggregateResult, // 聚合函数的结果,即 count 值
Collector<ItemViewCount> collector // 输出类型为 ItemViewCount
) throws Exception {
Long itemId = ((Tuple1<Long>) key).f0;
Long count = aggregateResult.iterator().next();
collector.collect(ItemViewCount.of(itemId, window.getEnd(), count));
}
}

/** 商品点击量(窗口操作的输出类型) */
public static class ItemViewCount {
public long itemId; // 商品ID
public long windowEnd; // 窗口结束时间戳
public long viewCount; // 商品的点击量

public static ItemViewCount of(long itemId, long windowEnd, long viewCount) {
ItemViewCount result = new ItemViewCount();
result.itemId = itemId;
result.windowEnd = windowEnd;
result.viewCount = viewCount;
return result;
}
}

现在我们得到了每个商品在每个窗口的点击量的数据流。

TopN 计算最热门商品

为了统计每个窗口下最热门的商品,我们需要再次按窗口进行分组,这里根据ItemViewCount中的windowEnd进行keyBy()操作。然后使用 ProcessFunction 实现一个自定义的 TopN 函数 TopNHotItems 来计算点击量排名前3名的商品,并将排名结果格式化成字符串,便于后续输出。

DataStream<String> topItems = windowedData
.keyBy(“windowEnd”)
.process(new TopNHotItems(3)); // 求点击量前3名的商品

ProcessFunction 是 Flink 提供的一个 low-level API,用于实现更高级的功能。它主要提供了定时器 timer 的功能(支持EventTime或ProcessingTime)。本案例中我们将利用 timer 来判断何时收齐了某个 window 下所有商品的点击量数据。由于 Watermark 的进度是全局的,

在 processElement 方法中,每当收到一条数据(ItemViewCount),我们就注册一个 windowEnd+1 的定时器(Flink 框架会自动忽略同一时间的重复注册)。windowEnd+1 的定时器被触发时,意味着收到了windowEnd+1的 Watermark,即收齐了该windowEnd下的所有商品窗口统计值。我们在 onTimer() 中处理将收集的所有商品及点击量进行排序,选出 TopN,并将排名信息格式化成字符串后进行输出。

这里我们还使用了 ListState<ItemViewCount> 来存储收到的每条 ItemViewCount 消息,保证在发生故障时,状态数据的不丢失和一致性。ListState 是 Flink 提供的类似 Java List 接口的 State API,它集成了框架的 checkpoint 机制,自动做到了 exactly-once 的语义保证。

/** 求某个窗口中前 N 名的热门点击商品,key 为窗口时间戳,输出为 TopN 的结果字符串 */
public static class TopNHotItems extends KeyedProcessFunction<Tuple, ItemViewCount, String> {

private final int topSize;

public TopNHotItems(int topSize) {
this.topSize = topSize;
}

// 用于存储商品与点击数的状态,待收齐同一个窗口的数据后,再触发 TopN 计算
private ListState<ItemViewCount> itemState;

@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
// 状态的注册
ListStateDescriptor<ItemViewCount> itemsStateDesc = new ListStateDescriptor<>(
“itemState-state”,
ItemViewCount.class);
itemState = getRuntimeContext().getListState(itemsStateDesc);
}

@Override
public void processElement(
ItemViewCount input,
Context context,
Collector<String> collector) throws Exception {

// 每条数据都保存到状态中
itemState.add(input);
// 注册 windowEnd+1 的 EventTime Timer, 当触发时,说明收齐了属于windowEnd窗口的所有商品数据
context.timerService().registerEventTimeTimer(input.windowEnd + 1);
}

@Override
public void onTimer(
long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
// 获取收到的所有商品点击量
List<ItemViewCount> allItems = new ArrayList<>();
for (ItemViewCount item : itemState.get()) {
allItems.add(item);
}
// 提前清除状态中的数据,释放空间
itemState.clear();
// 按照点击量从大到小排序
allItems.sort(new Comparator<ItemViewCount>() {
@Override
public int compare(ItemViewCount o1, ItemViewCount o2) {
return (int) (o2.viewCount – o1.viewCount);
}
});
// 将排名信息格式化成 String, 便于打印
StringBuilder result = new StringBuilder();
result.append(“====================================\n”);
result.append(“时间: “).append(new Timestamp(timestamp-1)).append(“\n”);
for (int i=0;i<topSize;i++) {
ItemViewCount currentItem = allItems.get(i);
// No1: 商品ID=12224 浏览量=2413
result.append(“No”).append(i).append(“:”)
.append(” 商品ID=”).append(currentItem.itemId)
.append(” 浏览量=”).append(currentItem.viewCount)
.append(“\n”);
}
result.append(“====================================\n\n”);

out.collect(result.toString());
}
}

打印输出

最后一步我们将结果打印输出到控制台,并调用env.execute执行任务。

topItems.print();
env.execute(“Hot Items Job”);

运行程序

直接运行 main 函数,就能看到不断输出的每个时间点的热门商品ID。

总结

本文的完整代码可以通过 GitHub 访问到。本文通过实现一个“实时热门商品”的案例,学习和实践了 Flink 的多个核心概念和 API 用法。包括 EventTime、Watermark 的使用,State 的使用,Window API 的使用,以及 TopN 的实现。希望本文能加深大家对 Flink 的理解,帮助大家解决实战上遇到的问题。

如何从小白成长为 Apache Committer?

from:http://wuchong.me/blog/2019/02/12/how-to-become-apache-committer/

过去三年,我一直在为 Apache Flink 开源项目贡献,也在两年前成为了 Flink Committer。我在 Flink 社区成长的过程中受到过社区大神的很多指导,如今也有很多人在向我咨询如何能参与到开源社区中,如何能成为 Committer。这也是本文写作的初衷,希望能帮助更多人参与到开源社区中。

本文将以 Apache Flink 为例,介绍如何参与社区贡献,如何成为 Apache Committer。

我们先来了解下一个小白在 Apache 社区中的成长路线是什么样的。

Apache 社区的成长路线

Apache 软件基金会(Apache Software Foundation,ASF)在开源软件界大名鼎鼎。ASF 能保证旗下 200 多个项目的社区活动运转良好,得益于其独特的组织架构和良好的制度。

用户 (User): 通过使用社区的项目构建自己的业务架构的开发者都是Apache的用户。

贡献者 (Contributor): 帮助解答用户的问题,贡献代码或文档,在邮件列表中参与讨论设计和方案的都是 Contributor。

提交者 (Committer): 贡献多了以后,就有可能经过 PMC 的提议和投票,邀请你成为 Committer。成为 Committer 也就意味着正式加入 Apache了,不但拥有相应项目的写入权限还有 apache.org 的专属邮箱。成为 Committer 的一个福利是可以免费使用 JetBrains 家的全套付费产品,包括全宇宙最好用的 IntelliJ IDEA (这是笔者当初成为 Committer 的最大动力之一)。

PMC: Committer 再往上走就是 PMC,这个必须由现有 PMC 成员提名。PMC 主要负责保证开源项目的社区活动都能运转良好,包括 Roadmap 的制定,版本的发布,Committer 的提拔。

ASF Member 相当于是基金会的“股东”,有董事会选举的投票权,也可以参与董事会竞选。ASF Member 也有权利决定是否接受一个新项目,主要关注 Apache 基金会本身的发展。ASF Member 通常要从 Contributor, Committer 等这些角色起步,逐步通过行动证明自己后,才可能被接受成为ASF Member。

Apache 社区的成员分类,权限由低到高,像极了我们在公司的晋升路线,一步步往上走。

如何成为 Committer

成为 Apache Committer 并没有一个确切的标准,但是 Committer 的候选人一般都是长期活跃的贡献者。成为 Committer 并没有要求必须有巨大的架构改进贡献,或者多少行的代码贡献。贡献文档、参与邮件列表的讨论、帮助回答问题都是很重要的增加贡献,提升影响力的方式。

所以如何成为 Committer 的问题归根结底还是如何参与贡献,以及如何开始贡献的问题。

成为 Committer 的关键在于持之以恒。不同项目,项目所处的不同阶段,成为 Committer 的难度都不太一样,笔者之前也持续贡献了近一年才有幸成为了 Committer。但是只要能坚持,保持活跃,持续贡献,为项目做的贡献被大家认可后,成为 Committer 也只是时间问题了。

如何参与贡献

参与贡献 Apache 项目有许多途径,包括提Bug,提需求,参与讨论,贡献代码和文档等等。

  1. 订阅开发者邮件列表:dev@flink.apache.org。关注社区动向,参与设计和方案的讨论,大胆地提出你的想法!
  2. 订阅用户邮件列表:user@flink.apache.orguser-zh@flink.apache.org。帮助解答用户问题。
  3. 提Bug和提需求:Flink 使用 JIRA 来管理issue。打开 Flink JIRA 并登录,点击菜单栏中的红色 “Create“ 按钮,创建一个issue。
  4. 贡献代码:可以在 Flink JIRA 中寻找自己感兴趣的 issue,并提交一个 Pull Request(下文会介绍提交一个 PR 的全过程)。如果是新手,建议从 “starter” 标记的 issue 入手。笔者在 Flink 项目的第一个 issue 就是修复了打印日志中的错别字,非常适合于熟悉贡献流程,而且当天就 merge 了,成就感满满。当熟悉了流程之后,建议专注贡献某个模块(如 SQL, DataStream, Runtime),有利于积累影响力。
  5. 贡献文档:文档是一个项目很重要的部分,可以在 JIRA 中寻找并解决文档类的 issue。熟悉中英文的同学可以参与贡献中文翻译,可以搜索 “chinese-translation” 的 issue
  6. 代码审查:Flink 每天都会在 GitHub 上收到很多 Pull Request 。帮助 review 代码也是对社区很重要的贡献。
  7. 还有很多参与贡献的方式,比如帮助测试RC版本,写Flink相关的博客等等。

如何提交第一个 Pull Request

1. 订阅 dev 邮件列表

  1. 用自己的邮箱给 dev-subscribe@flink.apache.org 发送任意邮件。
  2. 收到官方确认邮件。
  3. 回复该邮件,内容随意,表示确认即可。
  4. 确认后,会收到一封欢迎邮件,表示订阅成功。

2. 在 dev 邮件列表中申请 JIRA Contributor 权限

Apache 项目都使用 JIRA 来管理 issue,认领 issue 需要 Contributor 权限。申请权限之前,先在 JIRA 注册一个账号。然后发邮件到 dev 邮件列表(dev@flink.apache.org)申请 Contributor 权限,邮件内容要带上 JIRA 账号 id。例如:

Hi,

I want to contribute to Apache Flink.
Would you please give me the contributor permission?
My JIRA ID is xxxx.

社区的PMC大佬们看到后会第一时间给你权限的。

3. 在 JIRA 中挑选 issue

申请到权限后,就可以在 JIRA 中认领 issue了,推荐从简单的开始做起。例如中文翻译的issue。认领的方式非常简单,在 issue 页面右侧 Assignee 处点击 “Assign to me”,如下图所示。

Tip: 如果感兴趣的 issue 已经被别人认领,但是 Assignee 迟迟没有开始开发。那么可以在 issue 下面友好地询问下是否有时间开发,是否介意重新认领该 issue。

4. 本地开发代码

认领了 issue 后建议尽快开始开发,本地的开发环境建议使用 IntelliJ IDEA。在开发过程中有几个注意点:

  • 分支开发。 从最新的 master 分支切出一个开发分支用于 issue 开发。
  • 单 PR 单改动。 不要在 PR 中混入不相关的改动,不做无关的代码优化,不做无关的代码格式化。如果真有必要,可以另开 JIRA 解决。
  • 保证新代码能被单元测试覆盖到。如果原本的测试用例,无法覆盖到,则需要自己编写对应的单元测试。

5. 创建 pull request

在提交之前,先更新 master 分支,并通过 git rebase -i master 命令,将自己的提交置顶(也可以通过 IDEA > VCS > Git > Rebase 可视化界面来做 rebase)。同时保证自己的提交信息中只有一个 commit,commit message 遵循规范格式。Commit 格式是 “[FLINK-XXX] [YYY] ZZZ”,其中 XXX 是 JIRA ID,YYY 是 component 名字,ZZZ 是 JIRA title。例如 [FLINK-5385] [core] Add a helper method to create Row object

要创建一个 pull request,需要将这个开发分支推到自己 fork 的 Flink 仓库中。并在 fork 仓库页面(https://github.com/<your-user-name>/flink)点击 “Compare & pull request” 或者 “New pull request” 按钮,开始创建一个 PR。确保 base 是 apache/flink master,head 是刚刚的开发分支。另外在编辑框中按提示提供尽可能丰富的PR描述,然后点击 “Create pull request”。

6. 解决 code review 反馈的问题和建议

提交 PR 后会收到修改建议,只需要为这些修改 追加commit 就行,commit message 随意。注意不要 rebase/squash commits。追加 commit 能方便地看出距离上次的改动,而 rebase/squash 会导致 reviewer 不得不从头到尾重新看一遍 diff。

7. Committer merge PR

当 PR 获得 Committer 的 +1 认可后,就可以等待被 merge 到主干分支了。merge 的工作会由 Committer 来完成,Committer 会将你的分支再次 rebase 到最新的master 之上,并将多个 commits 合并成一个,完善 commit 信息,做最后的测试检查,最后会 merge 到 master 。

此时在 Flink 仓库的 commit 历史中就能看到自己的提交信息了。恭喜你成为了 code contributor!

总结

在我看来,成为 Apache Committer 的小窍门有几点:

  1. 把项目看成自己的事情,自发地,有激情地去做贡献
  2. 保持活跃,持续贡献,耐心和平常心都很重要
  3. 专注一个模块,吃透该模块的源码和原理,成为某个模块的专家
  4. 提升个人的代码品位和质量,让他人信任你的代码
  5. 勇敢地在邮件列表中参与讨论

希望通过本文能让大家了解到,成为 Contributor 并没有想象中那么难,成为 Committer 也不是不可能,只要怀有开源的热情,找到自己感兴趣的项目,在开源贡献中成长,持之以恒,付出总会有回报的。

成为 Apache Committer 不仅仅是一种光环和荣誉,更多的是一种责任,代表着社区的信任,期盼着你能为社区做更多的贡献。所以成为 Committer 远不是终点,而是一个更高起点,毕竟 Committer 之上还有 PMC 呢 ;-)。

关注微信公众号 爱解决,更多大牛在线帮你解决问题

十分钟带你了解 Apache Flink 核心技术

Apache Flink 介绍

Apache Flink 是近年来越来越流行的一款开源大数据计算引擎,它同时支持了批处理和流处理,也能用来做一些基于事件的应用。使用官网的一句话来介绍 Flink 就是 “Stateful Computations Over Streams”

首先 Flink 是一个纯流式的计算引擎,它的基本数据模型是数据流。流可以是无边界的无限流,即一般意义上的流处理。也可以是有边界的有限流,这样就是批处理。因此 Flink 用一套架构同时支持了流处理和批处理。其次,Flink 的一个优势是支持有状态的计算。如果处理一个事件(或一条数据)的结果只跟事件本身的内容有关,称为无状态处理;反之结果还和之前处理过的事件有关,称为有状态处理。稍微复杂一点的数据处理,比如说基本的聚合,数据流之间的关联都是有状态处理。

Apache Flink 之所以能越来越受欢迎,我们认为离不开它最重要的四个基石:Checkpoint、State、Time、Window。

首先是Checkpoint机制,这是 Flink 最重要的一个特性。Flink 基于 Chandy-Lamport 算法实现了分布式一致性的快照,从而提供了 exactly-once 的语义。在 Flink 之前的流计算系统(如 Strom,Samza)都没有很好地解决 exactly-once 的问题。提供了一致性的语义之后,Flink 为了让用户在编程时能够更轻松、更容易地去管理状态,引入了托管状态(managed state)并提供了 API 接口,让用户使用起来感觉就像在用 Java 的集合类一样。除此之外,Flink 还实现了 watermark 的机制,解决了基于事件时间处理时的数据乱序和数据迟到的问题。最后,流计算中的计算一般都会基于窗口来计算,所以 Flink 提供了一套开箱即用的窗口操作,包括滚动窗口、滑动窗口、会话窗口,还支持非常灵活的自定义窗口以满足特殊业务的需求。

在 Flink 1.0.0 时期,加入了 State API,即 ValueState、ReducingState、ListState 等等。State API 可以认为是 Flink 里程碑式的创新,它能够让用户像使用 Java 集合一样地使用 Flink State,却能够自动享受到状态的一致性保证,不会因为故障而丢失状态。包括后来 Apache Beam 的 State API 也从中借鉴了很多。

在 Flink 1.1.0 时期,支持了 Session Window 以及迟到数据容忍的功能。

在 Flink 1.2.0 时期,提供了 ProcessFunction,这是一个 Lower-level 的API,用于实现更高级更复杂的功能。它除了能够注册各种类型的 State 外,还支持注册定时器(支持 EventTime 和 ProcessingTime),常用于开发一些基于事件、基于时间的应用程序。

在 Flink 1.3.0 时期,提供了 Side Output 功能。算子的输出一般只有一种输出类型,但是有些时候可能需要输出另外的类型,比如除了输出主流外,还希望把一些异常数据、迟到数据以侧边流的形式进行输出,并分别交给下游不同节点进行处理。简而言之,Side Output 支持了多路输出的功能。

在 Flink 1.5.0 时期,加入了BroadcastState。BroadcastState是对 State API 的一个扩展。它用来存储上游被广播过来的数据,这个 operator 的每个并发上存的BroadcastState里面的数据都是一模一样的,因为它是从上游广播而来的。基于这种State可以比较好地去解决 CEP 中的动态规则的功能,以及 SQL 中不等值Join的场景。

在 Flink 1.6.0 时期,提供了State TTL功能、DataStream Interval Join功能。State TTL实现了在申请某个State时候可以在指定一个生命周期参数(TTL),指定该state过了多久之后需要被系统自动清除。在这个版本之前,如果用户想要实现这种状态清理操作需要使用ProcessFunction注册一个Timer,然后利用Timer的回调手动把这个State清除。从该版本开始,Flink框架可以基于TTL原生地解决这件事情。另外 DataStream Interval Join 功能也叫做 区间Join。例如左流的每一条数据去Join右流前后5分钟之内的数据,这种就是5分钟的区间Join。

在 Flink 1.0.0 时期,Table API (结构化数据处理API)和 CEP(复杂事件处理API)这两个框架被首次加入到仓库中。Table API 是一种结构化的高级 API,支持 Java 语言和 Scala 语言,类似于 Spark 的 DataFrame API。但是当时社区对于 SQL 的需求很大,而 SQL 和 Table API 非常相近,他们都是一种处理结构化数据的语言,实现上可以共用很多内容。所以在 Flink 1.1.0里面,社区基于Apache Calcite对整个 Table 模块做了重构,使得同时支持了 Table API 和 SQL 并共用了大部分代码。

在 Flink 1.2.0 时期,社区在Table API和SQL上支持丰富的内置窗口操作,包括Tumbling Window、Sliding Window、Session Window。

在 Flink 1.3.0 时期,社区首次提出了Dynamic Table这个概念,借助Dynamic Table,流和批之间可以相互进行转换。流可以是一张表,表也可以是一张流,这是流批统一的基础之一。其中Retraction机制是实现Dynamic Table的基础,基于Retraction才能够正确地实现多级Aggregate、多级Join,才能够保证流式 SQL 的语义与结果的正确性。另外,在该版本中还支持了 CEP 算子的可伸缩容(即改变并发)。

在 Flink 1.5.0 时期,在 Table API 和 SQL 上支持了Join操作,包括无限流的 Join 和带窗口的 Join。还添加了 SQL CLI 支持。SQL CLI 提供了一个类似Shell命令的对话框,可以交互式执行查询。

Checkpoint机制在Flink很早期的时候就已经支持,是Flink一个很核心的功能,Flink 社区也一直努力提升 Checkpoint 和 Recovery 的效率。

在 Flink 1.0.0 时期,提供了 RocksDB 状态后端的支持,在这个版本之前所有的状态数据只能存在进程的内存里面,JVM 内存是固定大小的,随着数据越来越多总会发生 FullGC 和 OOM 的问题,所以在生产环境中很难应用起来。如果想要存更多数据、更大的State就要用到 RocksDB。RocksDB是一款基于文件的嵌入式数据库,它会把数据存到磁盘,同时又提供高效的读写性能。所以使用RocksDB不会发生OOM这种事情。

在 Flink 1.1.0 时期,支持了 RocksDB Snapshot 的异步化。在之前的版本,RocksDB 的 Snapshot 过程是同步的,它会阻塞主数据流的处理,很影响吞吐量。在支持异步化之后,吞吐量得到了极大的提升。

在 Flink 1.2.0 时期,通过引入KeyGroup的机制,支持了 KeyedState 和 OperatorState 的可扩缩容。也就是支持了对带状态的流计算任务改变并发的功能。

在 Flink 1.3.0 时期,支持了 Incremental Checkpoint (增量检查点)机制。Incemental Checkpoint 的支持标志着 Flink 流计算任务正式达到了生产就绪状态。增量检查点是每次只将本次 checkpoint 期间新增的状态快照并持久化存储起来。一般流计算任务,GB 级别的状态,甚至 TB 级别的状态是非常常见的,如果每次都把全量的状态都刷到分布式存储中,这个效率和网络代价是很大的。如果每次只刷新增的数据,效率就会高很多。在这个版本里面还引入了细粒度的recovery的功能,细粒度的recovery在做恢复的时候,只需要恢复失败节点的联通子图,不用对整个 Job 进行恢复,这样便能够提高恢复效率。

在 Flink 1.5.0 时期,引入了本地状态恢复的机制。因为基于checkpoint机制,会把State持久化地存储到某个分布式存储,比如HDFS,当发生 failover 的时候需要重新把数据从远程HDFS再下载下来,如果这个状态特别大那么下载耗时就会较长,failover 恢复所花的时间也会拉长。本地状态恢复机制会提前将状态文件在本地也备份一份,当Job发生failover之后,恢复时可以在本地直接恢复,不需从远程HDFS重新下载状态文件,从而提升了恢复的效率。

在 Flink 1.2.0 时期,提供了Async I/O功能。Async I/O 是阿里巴巴贡献给社区的一个呼声非常高的特性,主要目的是为了解决与外部系统交互时网络延迟成为了系统瓶颈的问题。例如,为了关联某些字段需要查询外部 HBase 表,同步的方式是每次查询的操作都是阻塞的,数据流会被频繁的I/O请求卡住。当使用异步I/O之后就可以同时地发起N个异步查询的请求,不会阻塞主数据流,这样便提升了整个job的吞吐量,提升CPU利用率。

在 Flink 1.3.0 时期,引入了HistoryServer的模块。HistoryServer主要功能是当job结束以后,会把job的状态以及信息都进行归档,方便后续开发人员做一些深入排查。

在 Flink 1.4.0 时期,提供了端到端的 exactly-once 的语义保证。Exactly-once 是指每条输入的数据只会作用在最终结果上有且只有一次,即使发生软件或硬件的故障,不会有丢数据或者重复计算发生。而在该版本之前,exactly-once 保证的范围只是 Flink 应用本身,并不包括输出给外部系统的部分。在 failover 时,这就有可能写了重复的数据到外部系统,所以一般会使用幂等的外部系统来解决这个问题。在 Flink 1.4 的版本中,Flink 基于两阶段提交协议,实现了端到端的 exactly-once 语义保证。内置支持了 Kafka 的端到端保证,并提供了 TwoPhaseCommitSinkFunction 供用于实现自定义外部存储的端到端 exactly-once 保证。

在 Flink 1.5.0 时期,Flink 发布了新的部署模型和处理模型(FLIP6)。新部署模型的开发工作已经持续了很久,该模型的实现对Flink核心代码改动特别大,可以说是自 Flink 项目创建以来,Runtime 改动最大的一次。简而言之,新的模型可以在YARN, MESOS调度系统上更好地动态分配资源、动态释放资源,并实现更高的资源利用率,还有提供更好的作业之间的隔离。

除了 FLIP6 的改进,在该版本中,还对网站栈做了重构。重构的原因是在老版本中,上下游多个 task 之间的通信会共享同一个 TCP connection,导致某一个 task 发生反压时,所有共享该连接的 task 都会被阻塞,反压的粒度是 TCP connection 级别的。为了改进反压机制,Flink应用了在解决网络拥塞时一种经典的流控方法——基于Credit的流量控制。使得流控的粒度精细到具体某个 task 级别,有效缓解了反压对吞吐量的影响。

总结

Flink 同时支持了流处理和批处理,目前流计算的模型已经相对比较成熟和领先,也经历了各个公司大规模生产的验证。社区在接下来将继续加强流计算方面的性能和功能,包括对 Flink SQL 扩展更丰富的功能和引入更多的优化。另一方面也将加大力量提升批处理、机器学习等生态上的能力。

关注微信公众号 爱解决,更多技术大牛帮你解决问题!