Flink 零基础实战教程:如何计算实时热门商品

From:http://wuchong.me/blog/2018/11/07/use-flink-calculate-hot-items/

上一篇入门教程中,我们已经能够快速构建一个基础的 Flink 程序了。本文会一步步地带领你实现一个更复杂的 Flink 应用程序:实时热门商品。在开始本文前我们建议你先实践一遍上篇文章,因为本文会沿用上文的my-flink-project项目框架。

通过本文你将学到:

  1. 如何基于 EventTime 处理,如何指定 Watermark
  2. 如何使用 Flink 灵活的 Window API
  3. 何时需要用到 State,以及如何使用
  4. 如何使用 ProcessFunction 实现 TopN 功能

实战案例介绍

本案例将实现一个“实时热门商品”的需求,我们可以将“实时热门商品”翻译成程序员更好理解的需求:每隔5分钟输出最近一小时内点击量最多的前 N 个商品。将这个需求进行分解我们大概要做这么几件事情:

  • 抽取出业务时间戳,告诉 Flink 框架基于业务时间做窗口
  • 过滤出点击行为数据
  • 按一小时的窗口大小,每5分钟统计一次,做滑动窗口聚合(Sliding Window)
  • 按每个窗口聚合,输出每个窗口中点击量前N名的商品

数据准备

这里我们准备了一份淘宝用户行为数据集(来自阿里云天池公开数据集,特别感谢)。本数据集包含了淘宝上某一天随机一百万用户的所有行为(包括点击、购买、加购、收藏)。数据集的组织形式和MovieLens-20M类似,即数据集的每一行表示一条用户行为,由用户ID、商品ID、商品类目ID、行为类型和时间戳组成,并以逗号分隔。关于数据集中每一列的详细描述如下:

列名称说明
用户ID整数类型,加密后的用户ID
商品ID整数类型,加密后的商品ID
商品类目ID整数类型,加密后的商品所属类目ID
行为类型字符串,枚举类型,包括(‘pv’, ‘buy’, ‘cart’, ‘fav’)
时间戳行为发生的时间戳,单位秒

你可以通过下面的命令下载数据集到项目的 resources 目录下:

$ cd my-flink-project/src/main/resources
$ curl https://raw.githubusercontent.com/wuchong/my-flink-project/master/src/main/resources/UserBehavior.csv > UserBehavior.csv

这里是否使用 curl 命令下载数据并不重要,你也可以使用 wget 命令或者直接访问链接下载数据。关键是,将数据文件保存到项目的 resources 目录下,方便应用程序访问。

编写程序

在 src/main/java/myflink 下创建 HotItems.java 文件:

package myflink;

public class HotItems {

public static void main(String[] args) throws Exception {

}
}

与上文一样,我们会一步步往里面填充代码。第一步仍然是创建一个 StreamExecutionEnvironment,我们把它添加到 main 函数中。

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 为了打印到控制台的结果不乱序,我们配置全局的并发为1,这里改变并发对结果正确性没有影响
env.setParallelism(1);

创建模拟数据源

在数据准备章节,我们已经将测试的数据集下载到本地了。由于是一个csv文件,我们将使用 CsvInputFormat 创建模拟数据源。

注:虽然一个流式应用应该是一个一直运行着的程序,需要消费一个无限数据源。但是在本案例教程中,为了省去构建真实数据源的繁琐,我们使用了文件来模拟真实数据源,这并不影响下文要介绍的知识点。这也是一种本地验证 Flink 应用程序正确性的常用方式。

我们先创建一个 UserBehavior 的 POJO 类(所有成员变量声明成public便是POJO类),强类型化后能方便后续的处理。

/** 用户行为数据结构 **/
public static class UserBehavior {
public long userId; // 用户ID
public long itemId; // 商品ID
public int categoryId; // 商品类目ID
public String behavior; // 用户行为, 包括(“pv”, “buy”, “cart”, “fav”)
public long timestamp; // 行为发生的时间戳,单位秒
}

接下来我们就可以创建一个 PojoCsvInputFormat 了, 这是一个读取 csv 文件并将每一行转成指定 POJO
类型(在我们案例中是 UserBehavior)的输入器。

// UserBehavior.csv 的本地文件路径
URL fileUrl = HotItems2.class.getClassLoader().getResource(“UserBehavior.csv”);
Path filePath = Path.fromLocalFile(new File(fileUrl.toURI()));
// 抽取 UserBehavior 的 TypeInformation,是一个 PojoTypeInfo
PojoTypeInfo<UserBehavior> pojoType = (PojoTypeInfo<UserBehavior>) TypeExtractor.createTypeInfo(UserBehavior.class);
// 由于 Java 反射抽取出的字段顺序是不确定的,需要显式指定下文件中字段的顺序
String[] fieldOrder = new String[]{“userId”, “itemId”, “categoryId”, “behavior”, “timestamp”};
// 创建 PojoCsvInputFormat
PojoCsvInputFormat<UserBehavior> csvInput = new PojoCsvInputFormat<>(filePath, pojoType, fieldOrder);

下一步我们用 PojoCsvInputFormat 创建输入源。

DataStream<UserBehavior> dataSource = env.createInput(csvInput, pojoType);

这就创建了一个 UserBehavior 类型的 DataStream

EventTime 与 Watermark

当我们说“统计过去一小时内点击量”,这里的“一小时”是指什么呢? 在 Flink 中它可以是指 ProcessingTime ,也可以是 EventTime,由用户决定。

  • ProcessingTime:事件被处理的时间。也就是由机器的系统时间来决定。
  • EventTime:事件发生的时间。一般就是数据本身携带的时间。

在本案例中,我们需要统计业务时间上的每小时的点击量,所以要基于 EventTime 来处理。那么如果让 Flink 按照我们想要的业务时间来处理呢?这里主要有两件事情要做。

第一件是告诉 Flink 我们现在按照 EventTime 模式进行处理,Flink 默认使用 ProcessingTime 处理,所以我们要显式设置下。

env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

第二件事情是指定如何获得业务时间,以及生成 Watermark。Watermark 是用来追踪业务事件的概念,可以理解成 EventTime 世界中的时钟,用来指示当前处理到什么时刻的数据了。由于我们的数据源的数据已经经过整理,没有乱序,即事件的时间戳是单调递增的,所以可以将每条数据的业务时间就当做 Watermark。这里我们用 AscendingTimestampExtractor 来实现时间戳的抽取和 Watermark 的生成。

注:真实业务场景一般都是存在乱序的,所以一般使用 BoundedOutOfOrdernessTimestampExtractor

DataStream<UserBehavior> timedData = dataSource
.assignTimestampsAndWatermarks(new AscendingTimestampExtractor<UserBehavior>() {
@Override
public long extractAscendingTimestamp(UserBehavior userBehavior) {
// 原始数据单位秒,将其转成毫秒
return userBehavior.timestamp * 1000;
}
});

这样我们就得到了一个带有时间标记的数据流了,后面就能做一些窗口的操作。

过滤出点击事件

在开始窗口操作之前,先回顾下需求“每隔5分钟输出过去一小时内点击量最多的前 N 个商品”。由于原始数据中存在点击、加购、购买、收藏各种行为的数据,但是我们只需要统计点击量,所以先使用 FilterFunction 将点击行为数据过滤出来。

DataStream<UserBehavior> pvData = timedData
.filter(new FilterFunction<UserBehavior>() {
@Override
public boolean filter(UserBehavior userBehavior) throws Exception {
// 过滤出只有点击的数据
return userBehavior.behavior.equals(“pv”);
}
});

窗口统计点击量

由于要每隔5分钟统计一次最近一小时每个商品的点击量,所以窗口大小是一小时,每隔5分钟滑动一次。即分别要统计 [09:00, 10:00), [09:05, 10:05), [09:10, 10:10)… 等窗口的商品点击量。是一个常见的滑动窗口需求(Sliding Window)。

DataStream<ItemViewCount> windowedData = pvData
.keyBy(“itemId”)
.timeWindow(Time.minutes(60), Time.minutes(5))
.aggregate(new CountAgg(), new WindowResultFunction());

我们使用.keyBy("itemId")对商品进行分组,使用.timeWindow(Time size, Time slide)对每个商品做滑动窗口(1小时窗口,5分钟滑动一次)。然后我们使用 .aggregate(AggregateFunction af, WindowFunction wf) 做增量的聚合操作,它能使用AggregateFunction提前聚合掉数据,减少 state 的存储压力。较之.apply(WindowFunction wf)会将窗口中的数据都存储下来,最后一起计算要高效地多。aggregate()方法的第一个参数用于

这里的CountAgg实现了AggregateFunction接口,功能是统计窗口中的条数,即遇到一条数据就加一。

/** COUNT 统计的聚合函数实现,每出现一条记录加一 */
public static class CountAgg implements AggregateFunction<UserBehavior, Long, Long> {

@Override
public Long createAccumulator() {
return 0L;
}

@Override
public Long add(UserBehavior userBehavior, Long acc) {
return acc + 1;
}

@Override
public Long getResult(Long acc) {
return acc;
}

@Override
public Long merge(Long acc1, Long acc2) {
return acc1 + acc2;
}
}

.aggregate(AggregateFunction af, WindowFunction wf) 的第二个参数WindowFunction将每个 key每个窗口聚合后的结果带上其他信息进行输出。我们这里实现的WindowResultFunction将主键商品ID,窗口,点击量封装成了ItemViewCount进行输出。

/** 用于输出窗口的结果 */
public static class WindowResultFunction implements WindowFunction<Long, ItemViewCount, Tuple, TimeWindow> {

@Override
public void apply(
Tuple key, // 窗口的主键,即 itemId
TimeWindow window, // 窗口
Iterable<Long> aggregateResult, // 聚合函数的结果,即 count 值
Collector<ItemViewCount> collector // 输出类型为 ItemViewCount
) throws Exception {
Long itemId = ((Tuple1<Long>) key).f0;
Long count = aggregateResult.iterator().next();
collector.collect(ItemViewCount.of(itemId, window.getEnd(), count));
}
}

/** 商品点击量(窗口操作的输出类型) */
public static class ItemViewCount {
public long itemId; // 商品ID
public long windowEnd; // 窗口结束时间戳
public long viewCount; // 商品的点击量

public static ItemViewCount of(long itemId, long windowEnd, long viewCount) {
ItemViewCount result = new ItemViewCount();
result.itemId = itemId;
result.windowEnd = windowEnd;
result.viewCount = viewCount;
return result;
}
}

现在我们得到了每个商品在每个窗口的点击量的数据流。

TopN 计算最热门商品

为了统计每个窗口下最热门的商品,我们需要再次按窗口进行分组,这里根据ItemViewCount中的windowEnd进行keyBy()操作。然后使用 ProcessFunction 实现一个自定义的 TopN 函数 TopNHotItems 来计算点击量排名前3名的商品,并将排名结果格式化成字符串,便于后续输出。

DataStream<String> topItems = windowedData
.keyBy(“windowEnd”)
.process(new TopNHotItems(3)); // 求点击量前3名的商品

ProcessFunction 是 Flink 提供的一个 low-level API,用于实现更高级的功能。它主要提供了定时器 timer 的功能(支持EventTime或ProcessingTime)。本案例中我们将利用 timer 来判断何时收齐了某个 window 下所有商品的点击量数据。由于 Watermark 的进度是全局的,

在 processElement 方法中,每当收到一条数据(ItemViewCount),我们就注册一个 windowEnd+1 的定时器(Flink 框架会自动忽略同一时间的重复注册)。windowEnd+1 的定时器被触发时,意味着收到了windowEnd+1的 Watermark,即收齐了该windowEnd下的所有商品窗口统计值。我们在 onTimer() 中处理将收集的所有商品及点击量进行排序,选出 TopN,并将排名信息格式化成字符串后进行输出。

这里我们还使用了 ListState<ItemViewCount> 来存储收到的每条 ItemViewCount 消息,保证在发生故障时,状态数据的不丢失和一致性。ListState 是 Flink 提供的类似 Java List 接口的 State API,它集成了框架的 checkpoint 机制,自动做到了 exactly-once 的语义保证。

/** 求某个窗口中前 N 名的热门点击商品,key 为窗口时间戳,输出为 TopN 的结果字符串 */
public static class TopNHotItems extends KeyedProcessFunction<Tuple, ItemViewCount, String> {

private final int topSize;

public TopNHotItems(int topSize) {
this.topSize = topSize;
}

// 用于存储商品与点击数的状态,待收齐同一个窗口的数据后,再触发 TopN 计算
private ListState<ItemViewCount> itemState;

@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
// 状态的注册
ListStateDescriptor<ItemViewCount> itemsStateDesc = new ListStateDescriptor<>(
“itemState-state”,
ItemViewCount.class);
itemState = getRuntimeContext().getListState(itemsStateDesc);
}

@Override
public void processElement(
ItemViewCount input,
Context context,
Collector<String> collector) throws Exception {

// 每条数据都保存到状态中
itemState.add(input);
// 注册 windowEnd+1 的 EventTime Timer, 当触发时,说明收齐了属于windowEnd窗口的所有商品数据
context.timerService().registerEventTimeTimer(input.windowEnd + 1);
}

@Override
public void onTimer(
long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
// 获取收到的所有商品点击量
List<ItemViewCount> allItems = new ArrayList<>();
for (ItemViewCount item : itemState.get()) {
allItems.add(item);
}
// 提前清除状态中的数据,释放空间
itemState.clear();
// 按照点击量从大到小排序
allItems.sort(new Comparator<ItemViewCount>() {
@Override
public int compare(ItemViewCount o1, ItemViewCount o2) {
return (int) (o2.viewCount – o1.viewCount);
}
});
// 将排名信息格式化成 String, 便于打印
StringBuilder result = new StringBuilder();
result.append(“====================================\n”);
result.append(“时间: “).append(new Timestamp(timestamp-1)).append(“\n”);
for (int i=0;i<topSize;i++) {
ItemViewCount currentItem = allItems.get(i);
// No1: 商品ID=12224 浏览量=2413
result.append(“No”).append(i).append(“:”)
.append(” 商品ID=”).append(currentItem.itemId)
.append(” 浏览量=”).append(currentItem.viewCount)
.append(“\n”);
}
result.append(“====================================\n\n”);

out.collect(result.toString());
}
}

打印输出

最后一步我们将结果打印输出到控制台,并调用env.execute执行任务。

topItems.print();
env.execute(“Hot Items Job”);

运行程序

直接运行 main 函数,就能看到不断输出的每个时间点的热门商品ID。

总结

本文的完整代码可以通过 GitHub 访问到。本文通过实现一个“实时热门商品”的案例,学习和实践了 Flink 的多个核心概念和 API 用法。包括 EventTime、Watermark 的使用,State 的使用,Window API 的使用,以及 TopN 的实现。希望本文能加深大家对 Flink 的理解,帮助大家解决实战上遇到的问题。

十分钟带你了解 Apache Flink 核心技术

Apache Flink 介绍

Apache Flink 是近年来越来越流行的一款开源大数据计算引擎,它同时支持了批处理和流处理,也能用来做一些基于事件的应用。使用官网的一句话来介绍 Flink 就是 “Stateful Computations Over Streams”

首先 Flink 是一个纯流式的计算引擎,它的基本数据模型是数据流。流可以是无边界的无限流,即一般意义上的流处理。也可以是有边界的有限流,这样就是批处理。因此 Flink 用一套架构同时支持了流处理和批处理。其次,Flink 的一个优势是支持有状态的计算。如果处理一个事件(或一条数据)的结果只跟事件本身的内容有关,称为无状态处理;反之结果还和之前处理过的事件有关,称为有状态处理。稍微复杂一点的数据处理,比如说基本的聚合,数据流之间的关联都是有状态处理。

Apache Flink 之所以能越来越受欢迎,我们认为离不开它最重要的四个基石:Checkpoint、State、Time、Window。

首先是Checkpoint机制,这是 Flink 最重要的一个特性。Flink 基于 Chandy-Lamport 算法实现了分布式一致性的快照,从而提供了 exactly-once 的语义。在 Flink 之前的流计算系统(如 Strom,Samza)都没有很好地解决 exactly-once 的问题。提供了一致性的语义之后,Flink 为了让用户在编程时能够更轻松、更容易地去管理状态,引入了托管状态(managed state)并提供了 API 接口,让用户使用起来感觉就像在用 Java 的集合类一样。除此之外,Flink 还实现了 watermark 的机制,解决了基于事件时间处理时的数据乱序和数据迟到的问题。最后,流计算中的计算一般都会基于窗口来计算,所以 Flink 提供了一套开箱即用的窗口操作,包括滚动窗口、滑动窗口、会话窗口,还支持非常灵活的自定义窗口以满足特殊业务的需求。

在 Flink 1.0.0 时期,加入了 State API,即 ValueState、ReducingState、ListState 等等。State API 可以认为是 Flink 里程碑式的创新,它能够让用户像使用 Java 集合一样地使用 Flink State,却能够自动享受到状态的一致性保证,不会因为故障而丢失状态。包括后来 Apache Beam 的 State API 也从中借鉴了很多。

在 Flink 1.1.0 时期,支持了 Session Window 以及迟到数据容忍的功能。

在 Flink 1.2.0 时期,提供了 ProcessFunction,这是一个 Lower-level 的API,用于实现更高级更复杂的功能。它除了能够注册各种类型的 State 外,还支持注册定时器(支持 EventTime 和 ProcessingTime),常用于开发一些基于事件、基于时间的应用程序。

在 Flink 1.3.0 时期,提供了 Side Output 功能。算子的输出一般只有一种输出类型,但是有些时候可能需要输出另外的类型,比如除了输出主流外,还希望把一些异常数据、迟到数据以侧边流的形式进行输出,并分别交给下游不同节点进行处理。简而言之,Side Output 支持了多路输出的功能。

在 Flink 1.5.0 时期,加入了BroadcastState。BroadcastState是对 State API 的一个扩展。它用来存储上游被广播过来的数据,这个 operator 的每个并发上存的BroadcastState里面的数据都是一模一样的,因为它是从上游广播而来的。基于这种State可以比较好地去解决 CEP 中的动态规则的功能,以及 SQL 中不等值Join的场景。

在 Flink 1.6.0 时期,提供了State TTL功能、DataStream Interval Join功能。State TTL实现了在申请某个State时候可以在指定一个生命周期参数(TTL),指定该state过了多久之后需要被系统自动清除。在这个版本之前,如果用户想要实现这种状态清理操作需要使用ProcessFunction注册一个Timer,然后利用Timer的回调手动把这个State清除。从该版本开始,Flink框架可以基于TTL原生地解决这件事情。另外 DataStream Interval Join 功能也叫做 区间Join。例如左流的每一条数据去Join右流前后5分钟之内的数据,这种就是5分钟的区间Join。

在 Flink 1.0.0 时期,Table API (结构化数据处理API)和 CEP(复杂事件处理API)这两个框架被首次加入到仓库中。Table API 是一种结构化的高级 API,支持 Java 语言和 Scala 语言,类似于 Spark 的 DataFrame API。但是当时社区对于 SQL 的需求很大,而 SQL 和 Table API 非常相近,他们都是一种处理结构化数据的语言,实现上可以共用很多内容。所以在 Flink 1.1.0里面,社区基于Apache Calcite对整个 Table 模块做了重构,使得同时支持了 Table API 和 SQL 并共用了大部分代码。

在 Flink 1.2.0 时期,社区在Table API和SQL上支持丰富的内置窗口操作,包括Tumbling Window、Sliding Window、Session Window。

在 Flink 1.3.0 时期,社区首次提出了Dynamic Table这个概念,借助Dynamic Table,流和批之间可以相互进行转换。流可以是一张表,表也可以是一张流,这是流批统一的基础之一。其中Retraction机制是实现Dynamic Table的基础,基于Retraction才能够正确地实现多级Aggregate、多级Join,才能够保证流式 SQL 的语义与结果的正确性。另外,在该版本中还支持了 CEP 算子的可伸缩容(即改变并发)。

在 Flink 1.5.0 时期,在 Table API 和 SQL 上支持了Join操作,包括无限流的 Join 和带窗口的 Join。还添加了 SQL CLI 支持。SQL CLI 提供了一个类似Shell命令的对话框,可以交互式执行查询。

Checkpoint机制在Flink很早期的时候就已经支持,是Flink一个很核心的功能,Flink 社区也一直努力提升 Checkpoint 和 Recovery 的效率。

在 Flink 1.0.0 时期,提供了 RocksDB 状态后端的支持,在这个版本之前所有的状态数据只能存在进程的内存里面,JVM 内存是固定大小的,随着数据越来越多总会发生 FullGC 和 OOM 的问题,所以在生产环境中很难应用起来。如果想要存更多数据、更大的State就要用到 RocksDB。RocksDB是一款基于文件的嵌入式数据库,它会把数据存到磁盘,同时又提供高效的读写性能。所以使用RocksDB不会发生OOM这种事情。

在 Flink 1.1.0 时期,支持了 RocksDB Snapshot 的异步化。在之前的版本,RocksDB 的 Snapshot 过程是同步的,它会阻塞主数据流的处理,很影响吞吐量。在支持异步化之后,吞吐量得到了极大的提升。

在 Flink 1.2.0 时期,通过引入KeyGroup的机制,支持了 KeyedState 和 OperatorState 的可扩缩容。也就是支持了对带状态的流计算任务改变并发的功能。

在 Flink 1.3.0 时期,支持了 Incremental Checkpoint (增量检查点)机制。Incemental Checkpoint 的支持标志着 Flink 流计算任务正式达到了生产就绪状态。增量检查点是每次只将本次 checkpoint 期间新增的状态快照并持久化存储起来。一般流计算任务,GB 级别的状态,甚至 TB 级别的状态是非常常见的,如果每次都把全量的状态都刷到分布式存储中,这个效率和网络代价是很大的。如果每次只刷新增的数据,效率就会高很多。在这个版本里面还引入了细粒度的recovery的功能,细粒度的recovery在做恢复的时候,只需要恢复失败节点的联通子图,不用对整个 Job 进行恢复,这样便能够提高恢复效率。

在 Flink 1.5.0 时期,引入了本地状态恢复的机制。因为基于checkpoint机制,会把State持久化地存储到某个分布式存储,比如HDFS,当发生 failover 的时候需要重新把数据从远程HDFS再下载下来,如果这个状态特别大那么下载耗时就会较长,failover 恢复所花的时间也会拉长。本地状态恢复机制会提前将状态文件在本地也备份一份,当Job发生failover之后,恢复时可以在本地直接恢复,不需从远程HDFS重新下载状态文件,从而提升了恢复的效率。

在 Flink 1.2.0 时期,提供了Async I/O功能。Async I/O 是阿里巴巴贡献给社区的一个呼声非常高的特性,主要目的是为了解决与外部系统交互时网络延迟成为了系统瓶颈的问题。例如,为了关联某些字段需要查询外部 HBase 表,同步的方式是每次查询的操作都是阻塞的,数据流会被频繁的I/O请求卡住。当使用异步I/O之后就可以同时地发起N个异步查询的请求,不会阻塞主数据流,这样便提升了整个job的吞吐量,提升CPU利用率。

在 Flink 1.3.0 时期,引入了HistoryServer的模块。HistoryServer主要功能是当job结束以后,会把job的状态以及信息都进行归档,方便后续开发人员做一些深入排查。

在 Flink 1.4.0 时期,提供了端到端的 exactly-once 的语义保证。Exactly-once 是指每条输入的数据只会作用在最终结果上有且只有一次,即使发生软件或硬件的故障,不会有丢数据或者重复计算发生。而在该版本之前,exactly-once 保证的范围只是 Flink 应用本身,并不包括输出给外部系统的部分。在 failover 时,这就有可能写了重复的数据到外部系统,所以一般会使用幂等的外部系统来解决这个问题。在 Flink 1.4 的版本中,Flink 基于两阶段提交协议,实现了端到端的 exactly-once 语义保证。内置支持了 Kafka 的端到端保证,并提供了 TwoPhaseCommitSinkFunction 供用于实现自定义外部存储的端到端 exactly-once 保证。

在 Flink 1.5.0 时期,Flink 发布了新的部署模型和处理模型(FLIP6)。新部署模型的开发工作已经持续了很久,该模型的实现对Flink核心代码改动特别大,可以说是自 Flink 项目创建以来,Runtime 改动最大的一次。简而言之,新的模型可以在YARN, MESOS调度系统上更好地动态分配资源、动态释放资源,并实现更高的资源利用率,还有提供更好的作业之间的隔离。

除了 FLIP6 的改进,在该版本中,还对网站栈做了重构。重构的原因是在老版本中,上下游多个 task 之间的通信会共享同一个 TCP connection,导致某一个 task 发生反压时,所有共享该连接的 task 都会被阻塞,反压的粒度是 TCP connection 级别的。为了改进反压机制,Flink应用了在解决网络拥塞时一种经典的流控方法——基于Credit的流量控制。使得流控的粒度精细到具体某个 task 级别,有效缓解了反压对吞吐量的影响。

总结

Flink 同时支持了流处理和批处理,目前流计算的模型已经相对比较成熟和领先,也经历了各个公司大规模生产的验证。社区在接下来将继续加强流计算方面的性能和功能,包括对 Flink SQL 扩展更丰富的功能和引入更多的优化。另一方面也将加大力量提升批处理、机器学习等生态上的能力。

关注微信公众号 爱解决,更多技术大牛帮你解决问题!

FLINK 官方文档解读-数据流编程模型

上来一个图,就是抽象模型,越靠近底部的越底层

  • 最底层的抽象是指提供了基本的状态流,被嵌入到DataStreamAPI通过处理方法访问,它允许用户随意的一个或者多个流处理事件,并且用一致的容错机制,并且,用户可以注册时间和处理的回调方法,允许程序处理更为复杂的计算
  • Not done,To be continue

Flink 官方文档解读-导读

该文章,写给想看开源项目,不知道怎么看,或者不会看的朋友

访问Flink官方地址:https://flink.apache.org/ ,可以通过Apache官方 www.apache.org 点击Project或者直接搜索“Flink”

我的一个阅读习惯,从Documentation开始,然后找到最新的一个Release版本,当前是1.7

访问页面,我们会看到上面的页面,其中,一些知识点,都被加上了超链接,后续我们单独介绍,先来看看这一页说了什么内容。

一上来,先说了一些文档的编辑时间。说了一下Flink是一个分布式流处理和批处理的开源平台,Flink核心是一个流式处理引擎,他提供了数据分发,通信,在分布式计算数据流上进行数据容错,在留引擎上构建了批处理,包含本地迭代支持,内存管理,编程优化。

第一步,介绍一些概念,数据流编程模型分布式运行环境,它会帮助你理解其他章节,包括安装和编程,所以强烈建议你读一下。

初识

编程指南,你需要阅读我们的手册,包括基本API概念和去学习一下,DataStreaming APIDataSet API,如何去写一个流式程序。

部署

在你准备发布一个产品时,请先阅读产品清单

其他资源

一些讲座可以关注flink官方网站,或者看 YouTube

培训教材

博客data Artisans官方博客