基于Clickhouse分析平台:设计篇

背景

在伴鱼,服务器每天收集的用户行为日志达到上亿条,我们希望能够充分利用这些日志,了解用户行为模式,回答以下问题:

  • 最近三个月,来自哪个渠道的用户注册量最高?
  • 最近一周,北京地区的,发生过绘本浏览行为的用户,按照年龄段分布的情况如何?
  • 最近一周,注册过伴鱼绘本的用户,7日留存率如何?有什么变化趋势?
  • 最近一周,用户下单的转化路径上,各环节的转化率如何?

为了回答这些问题,事件分析平台应运而生。本文将首先介绍平台的功能,随后讨论平台在架构上的一些思考。

功能

总的来说,为了回答各种商业分析问题,事件分析平台支持基于事件的指标统计、属性分组、条件筛选等功能的查询。其中,事件指用户行为,例如登录、浏览伴鱼绘本、购买付费绘本等。更具体一些,事件分析平台支持三类分析:「事件分析」,「漏斗分析」,和「留存分析」。

事件分析

事件分析是指,用户指定一系列条件,查询目的指标,用于回答一个具体的分析问题。这些条件包括:

  • 事件类型:指用户行为,采集自埋点数据;例如登录伴鱼绘本,购买付费绘本
  • 指标:指标分为两类,基础指标和自定义指标
    • 基础指标:总次数(pv),总用户数(uv),人均次数(pv/uv)
    • 自定义指标:事件属性 + 计算类型,例如 「用户下单金额」的「总和/均值/最大值」
  • 过滤条件:用于筛选查询所关心的用户群体
  • 维度分组:基于分组,可以进行分组之间的对比
  • 时间范围:指定事件发生的时间范围

让我们举个具体的例子。我们希望回答「最近一周,在北京地区,不同年龄段的用户在下单一对一课程时,下单金额的平均数对比」这个问题。这个问题可以很直观地拆解为下图所示的事件分析,其中:

  • 事件类型 = 下单一对一课程
  • 指标 = 下单金额的平均数
  • 过滤条件 = 北京地区
  • 维度分组 = 按照年龄段分组
  • 时间范围 = 最近一周
event_analysis_flow


图注:事件分析创建流程

event_analysis


图注:事件分析界面

漏斗分析

漏斗分析用于分析多步骤过程中,每一步的转化与流失情况。

例如,伴鱼绘本用户的完整购买流程可能包含以下步骤:登录 app -> 浏览绘本 -> 购买付费绘本。我们可以将这个流程设置为一个漏斗,分析整体以及每一步转化情况。

此外,漏斗分析还需要定义「窗口期」,整个流程必须发生在窗口期内,才算一次成功转化。和事件分析类似,漏斗分析也支持选择维度分组和时间范围。

funnel_flow


图注:漏斗分析创建流程

funnel


图注:漏斗分析界面

留存分析

在留存分析中,用户定义初始事件和后续事件,并计算在发生初始事件后的第 N 天,发生后续事件的比率。这个比率能很好地衡量伴鱼用户的粘性高低。

在下图的例子中,我们希望了解伴鱼绘本 app 是否足够吸引用户,因此我们设置初始事件为登录 app,后续事件为浏览绘本,留存周期为 7 天,进行留存分析。

retention_flow


图注:留存分析创建流程

retention


图注:留存分析界面

架构

在架构上,事件分析平台分为两个模块,如下图所示:

  • 数据写入:埋点日志从客户端或者服务端被上报后,经过 Kafka 消息队列,由 Flink 完成 ETL,然后写入 ClickHouse。
  • 分析查询:用户通过前端页面,进行事件、条件、维度的勾选,后端将它们拼接为 SQL 语句,从 ClickHouse 中查询数据,展示给前端页面。
design


图注:总架构图

不难看出,ClickHouse 是构成事件分析平台的核心组件。我们为了确保平台的性能,围绕 ClickHouse 的使用进行了细致的调研,回答了以下三个问题:

  • 如何使用 ClickHouse 存储事件数据?
  • 如何高效写入 ClickHouse?
  • 如何高效查询 ClickHouse?

如何使用 ClickHouse 存储事件数据?

事件分析平台的数据来源有两大类:来源于埋点日志的用户行为数据,和来源于「用户画像平台」的用户属性数据。本文只介绍埋点日志数据的存储,对「用户画像平台」感兴趣的同学,可以期待一下我们后续的技术文章。

在进行埋点日志的存储选型前,我们首先明确了几个核心需求:

  • 支持海量数据的存储。当前,伴鱼每天产生的埋点日志在亿级别。
  • 支持实时聚合查询。由于产品和运营同学会使用事件分析平台来探索多种用户行为模式,分析引擎必须能灵活且高效地完成各种聚合。

ClickHouse 在海量数据存储场景被广泛使用,高效支持各类聚合查询,配套有成熟和活跃的社区,促使我们最终选择 ClickHouse 作为存储引擎。

根据我们对真实埋点数据的测试,亿级数据的简单查询,例如 PV 和 UV,都能在 1 秒内返回结果;对于留存分析、漏斗分析这类的复杂查询,可以在 10 秒内返回结果。

「存在哪」的问题解决后,接下来回答「怎么存」的问题。ClickHouse 的列式存储结构非常适合存储大宽表,以支持高效查询。但是,在事件分析平台这个场景下,我们还需要支持「自定义属性」的存储,这时「大宽表」的存储方式就不尽理想。

所谓「自定义属性」,即埋点日志中一些事件所独有的属性,例如:「下单一对一课程」这一事件在上报时,会带上「订单金额」这个很多其它事件所没有的属性。如果为了支持「下单一对一课程」这个事件的存储,就需要改变 ClickHouse 的表结构,新增一列,这将使得表结构的维护成本极高,因为每个新事件都可能附带多个「自定义属性」。

为了解决这个问题,我们将频繁变动的自定义属性统一存储在一个 Map 中,将基本不变的公共属性存为列,使之兼具大宽表方案的高效性,和 Map 方案的灵活性。

如何高效写入 ClickHouse?

在设计 ClickHouse 的部署方案时,我们采用了业界常用的读写分离模式:写本地表,读分布式表。在写入侧,分为3个分片,每个分片都有双重备份。

由于事件分析的绝大多数查询,都是以用户为单位,为了提高查询效率,我们在写入时,将数据按照 user_id 均匀分片,写入到不同的本地表中。如下图所示:

import_to_clickhouse


图注:将埋点数据写入到 ClickHouse

之所以不写分布式表,是因为我们使用大量数据对分布式表进行写入测试时,遇到过几个问题:

  1. Too many parts error:分布式表所在节点接收到数据后,需要按照 sharding_key 将数据拆分为多个 parts,再转发到其它节点,导致短期内 parts 过多,并且增加了 merge 的压力;
  2. 写放大:分布式表所在节点,如果在短时间内被写入大量数据,会产生大量临时数据,导致写放大。

如何高效查询 ClickHouse?

我们可以使用 ClickHouse 的内置函数,轻松实现事件分析平台所需要提供的事件分析、漏斗分析和留存分析三个功能。

事件分析可以用最朴素的 SQL 语句实现。例如,最近一周,北京地区的,发生过绘本浏览行为的用户,按照年龄段的分布,可以表述为:

1
2
3
4
5
6
7
8
9
10
SELECT
    count(1) as cnt,
    toDate(toStartOfDay(toDateTime(event_ms))) as date,
age
FROM event_analytics
WHERE
event = “view_picture_book_home_page” AND
city = “beijing” AND
event_ms >= 1613923200000 AND event_ms <= 1614528000000
GROUP BY (date, age);

留存分析使用 ClickHouse 提供的 retention 函数。例如,注册伴鱼绘本后,计算浏览绘本的次日留存、7日留存可以表述为:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
SELECT
    sum(ret[1]) AS original,
    sum(ret[2]) AS next_day_ret,
    sum(ret[3]) AS seven_day_ret
FROM
(SELECT
  user_id,
  retention(
      event = “register_picture_book” AND toDate(event_ms) = toDate(‘2021-03-01’),
      event = “view_picture_book” AND toDate(event_ms) = toDate(‘2021-03-02’),
      event = “view_picture_book” AND toDate(event_ms) = toDate(‘2021-03-08’)
      ) as ret
FROM event_analytics
WHERE  
    event_ms >= 1614528000000 AND event_ms <= 1615132800000
GROUP BY user_id);

漏斗分析使用 ClickHouse 提供的 windowFunnel 函数。例如,在 浏览绘本 -> 购买绘本,窗口期为2天的这个转化路径上,转化率的计算可以被表达为:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
SELECT
    array( sumIf(count, level >= 1), sumIf(count, level >= 2) ) AS funnel_uv,
FROM (
    SELECT
        level,
        count() AS count
    FROM (
            SELECT
                uid,
                windowFunnel(172800000)(
                    event_ms, event = “view_picture_book” AND event_ms >= 1613923200000 AND event_ms <= 1614009600000, event = “buy_picture_book”) AS level
            FROM
                event_analytics
            WHERE
                event_ms >= 1613923200000 AND event_ms <= 1614182400000
            GROUP BY uid
        )
    GROUP BY level
)

总结

在结束功能梳理和架构设计后,我们开始了事件分析平台有序的建设。我们期待在大规模使用后,与大家分享事件分析平台的下一步演进。

参考文献

[1] Fast and Reliable Schema-Agnostic Log Analytics Platform. https://eng.uber.com/logging/

[2] How ClickHouse saved our data. https://mux.com/blog/from-russia-with-love-how-clickhouse-saved-our-data/

[3] 最快开源 OLAP 引擎!ClickHouse 在头条的技术演进 https://www.infoq.cn/article/ntwo*yr2ujwlmp8wcxoe

from:https://tech.ipalfish.com/blog/2021/06/21/event-analytics-design/ 如有侵权,请通知作者删除

伴鱼用户画像平台:设计篇

1. 背景

在伴鱼,我们努力了解我们的用户,旨在为用户提供更好的服务。APP 内容推荐,需要根据用户特征来决定推送内容;促销活动,需要针对不同的用户群体设计不同的活动方案;线上产品售卖,也需要了解用户喜好,才能更好地把产品卖给用户。

为此,我们搭建了用户画像平台。本文将首先探讨平台的功能需求、标签体系定位,随后介绍平台的架构和具体功能实现。

2. 功能

用户画像平台把重点放在了分析场景,使用方主要是公司各业务线的运营和数据分析同学。平台在一期主要支持以下几个功能。

  1. 定义标签:标签是用于描述用户的一个维度,例如「注册设备类型」、「常驻城市」、「年龄段」等。
  2. 人群圈选:指定一组用户标签和其对应的标签值,得到符合条件的用户人群。例如,找出「城市为北京,且设备类型为苹果」的用户。
  3. 用户画像:对于人群圈选结果,查看该人群的标签分布。例如,查看「城市为北京,且设备类型为苹果」的用户的年龄段分布。

3. 标签体系

确定完用户画像平台的使用场景和主要功能,我们再来倒推看用户标签体系。用户标签可以从两个维度进行分类:标签的实时性,和标签的值类型。

首先看标签的实时性。考虑到用户画像平台的主要功能是「人群圈选」和「用户画像查看」,而这两个功能都不需要非常高的实时性,那么实时标签的收益就不大,T+1 的非实时标签完全能满足数据分析和运营同学的需求。

再来看标签的值类型,即标签是枚举和还是非枚举的。枚举标签,顾名思义,就是指标签值可枚举的标签,例如 device_type, network_type, country, city 等,这类标签往往在人群圈选方面有较大作用。而非枚举标签,就是标签值可无限递增的标签,比如 active_days,register_date 等,这类标签大多会用来做用户信息展示。考虑到「人群圈选」是各业务线最迫切的需求,我们在一期舍弃了非枚举标签这个功能。

综上,我们就确定了用户画像平台的一期标签体系为非实时的枚举标签,主要满足「人群圈选」和「人群画像」这两个查询功能。

4. 架构与实现

在架构上,用户画像平台分为两个模块:数据写入,分析查询。

4.1 数据写入

数据写入模块为人群圈选和用户画像功能提供数据支持。具体流程分为两步。

第一步,大数据团队完成每日标签计算后,得到一张 Hive 大宽表,如下表所示。表的每一行代表一个用户,每一列代表一个标签。

user_idcountrycitydevice_typeage
1ChinaBeijingIOS10
4ChinaShanghaiAndroid5
10United StatesNew YorkAndroid12

第二步,大数据团队将大宽表的数据「转置」后批量写入 ClickHouse,如下表所示。表中的每一行代表一个标签实例(即标签和标签值的组合),例如「city = Beijing」。此外,这一行同时存储了具有该标签值的所有用户的集合,服务于分析查询模块。

tagtag_itemusers
countryChina{1,2,3,4,5}
countryUnited States{6,7,8,9,10}
cityBeijing{1,2,3}
cityShanghai{4}

4.2 分析查询

分析查询模块则实现了人群圈选和用户画像的查询。用户通过前端页面,进行标签、标签值、组合方式的勾选,后端将它们拼接为 SQL 语句,从 ClickHouse 中查询数据,展示给前端页面。例如,在下图中,我们圈选了属于北京、深圳、上海的苹果用户,并且按照年龄、网络运营商、网络类型、性别查看人群的分布情况。

analytics

不难看出,ClickHouse 在用户画像平台的数据存储和计算中起到了最关键的作用。下面,让我们一起来回答几个问题:

  • 如何设计 ClickHouse 的表结构?
  • 如何使用 ClickHouse 进行人群圈选?
  • 如何使用 ClickHouse 查看人群画像?

4.2.1 设计 ClickHouse 表结构

根据使用场景,我们设计 ClickHouse 表结构如下:

1
2
3
4
5
6
7
8
9
10
11
12
CREATE TABLE analytics.user_tag_bitmap_local
(
`tag` String,
`tag_item` String,
`p_day` Date,
`origin_user` UInt64,
`users` AggregateFunction(groupBitmap, UInt64) MATERIALIZED bitmapBuild([origin_user])
)
ENGINE = ReplicatedAggregatingMergeTree(‘/clickhouse/tables/{shard}/analytics/user_tag_bitmap_local’, ‘{replica}’)
PARTITION BY toYYYYMMDD(p_day)
ORDER BY (tag, tag_item)
SETTINGS index_granularity = 8192;

首先,看表的名称。表名包含 _local 后缀,即这是一个本地表,也存在一个对应的分布式表。我们使用「写本地表,读分布式表」的读写分离模式,具体原因可以参考《伴鱼事件分析平台:设计篇》的「如何高效写入 ClickHouse」一节。

然后,看表包含的字段。

  • tag 代表标签, tag_item 代表标签值。因为在标签的圈选查询中,经常有 tag = "city" AND tag_item = "beijing" 的语句,我们将 (tag, tag_item) 作为主键,以提高查询效率。
  • p_day 代表数据写入的日期,也作为 ClickHouse 的分片键。因为每天的标签数据都是全量导入,p_day 不仅可以用来区分标签版本,也方便我们批量删除历史数据。
  • origin_user 是单个用户 ID。然而,相比单个用户的标签情况,我们更关心具有特定标签的用户人群。因此,我们使用 users 字段来表达根据 origin_user 聚合得到用户人群。为此,我们使用了 AggregatingMergeTree,它在原始数据插入后自动触发聚合,将具有相同 (tag, tag_item, p_day) 的数据聚合为一行。

最后,看表的存储引擎,我们使用了 ReplicatedAggregatingMergeTree 引擎。前文中我们提到 Aggregating 是用来聚合数据,而 Replicated 则是用来创建数据副本,对应双副本存储模式。

4.2.2 使用 ClickHouse 进行人群圈选

组合不同标签,圈选出最适合某个活动的用户人群里,是运营同学们较为关心的步骤。例如,我们想找出城市为北京、性别为女的用户。

analytics


图注:用户人群查询

我们只需首先找到城市为北京的用户人群(用 bitmap 表示),然后找到性别为女的用户人群,然后对它们进行 AND 操作即可。具体查询如下:

1
2
3
4
5
6
7
8
9
10
11
12
WITH
(
SELECT groupBitmapMergeState(users)
FROM user_tag_bitmap_all
WHERE p_day = ‘2021_06_01’ AND tag = ‘city’ AND tag_item = ‘beijing’
) AS user_group_1,
(
SELECT groupBitmapMergeState(users)
FROM user_tag_bitmap_all
WHERE p_day = ‘2021_06_01’ AND tag = ‘gender’ AND tag_item = ‘female’
) AS user_group_2
SELECT bitmapToArray(bitmapAnd(user_group_1, user_group_2))

其中,groupBitmapMergeState 函数对通过 WHERE 筛除得到的任意个数的 bitmap (users) 进行 AND 操作,而 bitmapAnd 只能对两个 bitmap 进行 AND 操作。

4.2.3 使用 ClickHouse 查看用户画像

再回到刚刚的例子,圈选得到「北京的女性用户」这一人群后,我们想知道,人群中有多少人在用苹果设备,而有多少人在用安卓。这类标签分布信息,就是我们所说的用户画像。

analytics


图注:用户画像查询

这个查询的实现同样是直观的。

  1. 我们采用和上一节一样的步骤,得到「北京的女性用户」这一 bitmap。
  2. 对人群进行分组,分别得到「设备为苹果的用户」和「设备为安卓的用户」的 bitmap。如果存在除了苹果和安卓之外的设备,我们这一步会得到更多的 bitmap。
  3. 将步骤 2 中的每一个 bitmap 与步骤 1 中的 bitmap 进行 AND 操作,就能得到「北京的女性用户」基于「设备类型」的分布情况。

具体的实现见下面的查询。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
WITH
(
SELECT groupBitmapMergeState(users)
FROM user_tag_bitmap_all
WHERE p_day = ‘2021_06_01’ AND tag = ‘city’ AND tag_item = ‘beijing’
) AS user_group_1,
(
SELECT groupBitmapMergeState(users)
FROM user_tag_bitmap_all
WHERE p_day = ‘2021_06_01’ AND tag = ‘gender’ AND tag_item = ‘female’
) AS user_group_2,
(
SELECT bitmapAnd(user_group_1, user_group_2)
) AS filter_users
SELECT
bitmapCardinality(bitmapAnd(filter_users, group_by_users)) AS count,
tag,
tag_item
FROM
(SELECT
groupBitmapMergeState(users) AS group_by_users,
tag,
tag_item
FROM user_tag_bitmap_all
WHERE tag = “device_type”
GROUP BY (tag, tag_item));

上述查询用到了一个没介绍到的函数 bitmapCardinality 。它的作用可以理解为计算 bitmap 中 1 的个数。

总结

在结束功能梳理和架构设计后,我们开始了用户画像平台的建设和推广。我们期待在大规模使用后,与大家分享用户画像平台的下一步演进。

参考文献

[1] 贝壳DMP平台建设实践 https://mp.weixin.qq.com/s/n6WMfypsqigENe7w0krXCA

[2] 苏宁超6亿会员如何做到秒级用户画像查询?https://mp.weixin.qq.com/s/jY9Z0-2RRtz3uu0EYfbC-Q

from:https://tech.ipalfish.com/blog/2021/08/05/user-profile-design/ 如有侵权,请联系作者删除

基于doris+es 搭建用户画像系统(待整理)

olap
画像 doris+es
1、过去画像也是es 好用不代表能用好,
画像场景,看里面人群数,符合某些条件的人群数有多少?统计级
select count() groupby xxx 语义. 重要不优雅,用户需要学习dsl。包一层平台可以解决

人群如果有1000万(有将近1千多万条数据,大概占用空间有 300G 左右),几百万,导出来就比较崩溃,es 怎么用接口,并发导出,分布式读,并发控制,es黑科技,平台复杂度较高。普通用户对于高级玩法做不到。

几十万,几十分钟很正常,用游标scoll来做,如果失败,需要重跑,几千万,上亿就跑不出来了

doris on es 实现,列存,本地优先扫描,不排序(es 本身会排序,节省了很多计算开销),性能解决了,并且可以直接写sql了,不用写dsl了,扩展性,用户可以直接写sql了。中台必要的能力

更新场景,doris 更新不好,提升查询性能,需要做很多预计算,预计算需要有规则的。分析场景,维度跟指标要定义好,可以通过维度可以把指标预计算出来,画像场景,主键或者叫维度,更新大量是指标,查的时候要根据指标去查,某个年龄,金额大于xxx的,某个地域是xxx的人群是多少个,属性来查询。doris需要索引,排序,查询速度非常快,因为会预聚合,更新的结果应不应该在结果集中,性能会掉的很厉害,但是更新满足不了,
es 通过docid就可以更新了,性能一般,但是好歹是支持的。并且不影响读取。

所以画像场景,es把更新搞定,doris把上层查询搞定,查询快,有sql接口.

画像场景key 就是uid,其他都是属性一直在更新。预计算,就是终端类型android ios,地市,我想看北京市下的ios的用户数总共有多少人,报名人数,这种就是根据维度,预聚合求和,
kylin是所有维度预计算变成kv查询,这样会有维度爆炸,存储成本很高,加维度不方便,无法实时
,doris可以实时更新维度 tables schema change,上线没有kylin那么快,不会是毫秒级别的,做不到kylin那种kv查询,全部用预计算,比如要算几千万uv,每天都要groupby,如果现查询,肯定做不到秒出,

doris 要源源不断的更新的,并且可以计算的很好。也有物化视图,功能米有ck那么丰富,用不到太高级的能力,加一个rollup上卷就搞定了,一个物化视图的子集
clickhouse是一坨数据扔进去,查询会非常快

k1 k2 k3(城市) 维度列,pv uv 指标求最大,最小值,求k1 k3 算pv uv 可以k1 k3 pv uv做一个上卷,直接可以命中rollup列
clichouse 物化视图是跑sql,定期跑sql,doris做的是实时的,不是微批的,写的时候就把预计合全做了。

doris 预计算会把结果存在一个独立的表中,对外是可见的,sql会自动路由到rollup表里,自动分析sql的,可以指定维度和指标做预计算,业务是知道哪些是常用列,

es—doris+es 数据是不用动的,doris把es当成一个外表,挂上去就可以了,实时可用,外表创建成功了,fe模块会跟es元数据信息拉取es原信息,shard信息会拉取过来,doris访问失败,基本上都是因为es某个节点失败了,运维排查一下就好了

dorisdb数据更新,es就不用了,es维护成本太高了,需要写两套不同的实现,写入es keyword,doris必须是varchar int就会有问题,sql就解析失败了,doris有date类型,datetime有可能有不兼容问题,会查不出来。

es 类型不敏感,类型对应不上就会很麻烦。现在客户是通过平台化统一了,做了对应关系。

最终一致性,事务要求也不高,从业务库过来变成日志流了,客户对doris熟悉,人和事都认识,文档技术大家都知道,有问题会有人快速响应。

clickhouse运维复杂,c++,很难运维

目前所有分析都用doris,字段变更也很常见,现在有平台,改完字段,可以直接掉doris接口,alter table接口,不影响业务,无感知的

系统要能把控,技术做不深入 做proxy 做一些屏蔽 做平台化建设。在平台化解决掉

教学和拉新,1、统计类,分析
拉新,投放 端上PV uv多少人+业务的指标 来源,维度列可以枚举出来的。直接用doris

doris+es
1、老业务,裸用es 迁移doris 有成本。
2、微批架构 lamada kappy flink读业务流写到两个es表,ods–dwd,跟业务侧比较近,上面写sql 5分钟调度一次,当前时间往前推5分钟,a表更新3行,跟b表进行更新,如果join到了,说明b表也有更新了,

flinksql跟doris语法不一样,开发环境也不同,dwd dws ad 全部都是doris sql
select xxxx into xxx表,5分钟调度一次

flink实时,多留Join很麻烦

选择标准,微批数据不能太多,如果数据太多,还是走离线回灌回去

flink更新数据量很大,用redis做维表,doris不适合

对于客户要求要计算时间较长的数据,从doris再同步到hive里面去一份,这样就可以了,短时查询用doris,长周期用hive查,一个指标出口复用,但是不同需求,不同语法是合理的,目前在跟投放合作,还没有大规模用起来

腾讯没有调度系统

airflow 运维工作,对客户没有价值,体量?

clickhouse 可以通过shuddingkey 让多个表相同数据到同一个节点吗?

sqoop用法之mysql与hive数据导入导出

本文目录

一. Sqoop介绍
二. Mysql 数据导入到 Hive
三. Hive数据导入到Mysql
四. mysql数据增量导入hive

  • 1 基于递增列Append导入
    • 1). 创建hive
    • 2). 创建job
    • 3) 执行job
  • Lastmodified 导入实战
    • 1). 新建一张表
    • 2). 初始化hive表:

一. Sqoop介绍

Sqoop是一个用来将Hadoop和关系型数据库中的数据相互转移的工具,可以将一个关系型数据库(例如:MySQL、Oracle、Postgres等)中的数据导进到HadoopHDFS中,也可以将HDFS的数据导进到关系型数据库中。对于某些NoSQL数据库它也提供了连接器。Sqoop,类似于其他ETL工具,使用元数据模型来判断数据类型并在数据从数据源转移到Hadoop时确保类型安全的数据处理。Sqoop专为大数据批量传输设计,能够分割数据集并创建Hadoop任务来处理每个区块。

本文版本说明

hadoop版本 : hadoop-2.7.2
hive版本 : hive-2.1.0
sqoop版本:sqoop-1.4.6

二. Mysql 数据导入到 Hive

1). 将mysqlpeople_access_log表导入到hiveweb.people_access_log,并且hive中的表不存在。
mysql中表people_access_log数据为:

1,15110101010,1577003281739,'112.168.1.2','https://www.baidu.com'
2,15110101011,1577003281749,'112.16.1.23','https://www.baidu.com'
3,15110101012,1577003281759,'193.168.1.2','https://www.taobao.com'
4,15110101013,1577003281769,'112.18.1.2','https://www.baidu.com'
5,15110101014,1577003281779,'112.168.10.2','https://www.baidu.com'
6,15110101015,1577003281789,'11.168.1.2','https://www.taobao.com'

mysql数据导入hive的命令为:

sqoop import \
--connect jdbc:mysql://master1.hadoop:3306/test \
--username root \
--password 123456 \
--table people_access_log \
-m 1 \
--hive-import \
--create-hive-table \
--fields-terminated-by '\t' \
--hive-table web.people_access_log

该命令会启用一个mapreduce任务,将mysql数据导入到hive表,并且指定了hive表的分隔符为\t,如果不指定则为默认分隔符^A(ctrl+A)

参数说明

参数说明
--connectmysql的连接信息
--usernamemysql的用户名
--passwordmysql的密码
--table被导入的mysql源表名
-m并行导入启用的map任务数量,与--num-mapper含义一样
--hive-import插入数据到hive当中,使用hive默认的分隔符,可以使用--fields-terminated-by参数来指定分隔符。
-- hive-tablehive当中的表名

2). 也可以通过--query条件查询Mysql数据,将查询结果导入到Hive

sqoop import \
--connect jdbc:mysql://master1.hadoop:3306/test \
--username root \
--password 123456 \
--query 'select * from people_access_log where \$CONDITIONS and url = "https://www.baidu.com"' \
--target-dir /user/hive/warehouse/web/people_access_log \
--delete-target-dir \
--fields-terminated-by '\t' \
-m 1
参数说明
--query后接查询语句,条件查询需要\$CONDITIONS and连接查询条件,这里的\$表示转义$,必须有.
--delete-target-dir如果目标hive表目录存在,则删除,相当于overwrite.

三. Hive数据导入到Mysql

还是使用上面的hiveweb.people_access_log,将其导入到mysql中的people_access_log_out表中.

sqoop export \
--connect jdbc:mysql://master1.hadoop:3306/test \
--username root \
--password 123456 \
--table people_access_log_out \
--input-fields-terminated-by '\t' \
--export-dir /user/hive/warehouse/web.db/people_access_log \
--num-mappers 1

注意:mysqlpeople_access_log_out需要提前建好,否则报错:ErrorException: Table 'test.people_access_log_out' doesn't exist。如果有id自增列,hive表也需要有,hive表与mysql表字段必须完全相同。

create table people_access_log_out like people_access_log;

执行完一个mr任务后,成功导入到mysqlpeople_access_log_out中.

四. mysql数据增量导入hive

实际中mysql数据会不断增加,这时候需要用sqoop将数据增量导入hive,然后进行海量数据分析统计。增量数据导入分两种,一是基于递增列的增量数据导入(Append方式)。二是基于时间列的增量数据导入(LastModified方式)。有几个核心参数:

  • –check-column:用来指定一些列,这些列在增量导入时用来检查这些数据是否作为增量数据进行导入,和关系型数据库中的自增字段及时间戳类似.注意:这些被指定的列的类型不能使任意字符类型,如char、varchar等类型都是不可以的,同时–check-column可以去指定多个列
  • –incremental:用来指定增量导入的模式,两种模式分别为AppendLastmodified
  • –last-value:指定上一次导入中检查列指定字段最大值

1. 基于递增列Append导入

接着前面的日志表,里面每行有一个唯一标识自增列ID,在关系型数据库中以主键形式存在。之前已经将id在0~6之间的编号的订单导入到Hadoop中了(这里为HDFS),现在一段时间后我们需要将近期产生的新的订 单数据导入Hadoop中(这里为HDFS),以供后续数仓进行分析。此时我们只需要指定–incremental 参数为append–last-value参数为6即可。表示只从id大于6后即7开始导入。

1). 创建hive

首先我们需要创建一张与mysql结构相同的hive表,假设指定字段分隔符为\t,后面导入数据时候分隔符也需要保持一致。

2). 创建job

增量导入肯定是多次进行的,可能每隔一个小时、一天等,所以需要创建计划任务,然后定时执行即可。我们都知道hive的数据是存在hdfs上面的,我们创建sqoop job的时候需要指定hive的数据表对应的hdfs目录,然后定时执行这个job即可。

当前mysql中数据,hive中数据与mysql一样也有6条:

iduser_idaccess_timeipurl
1151101010101577003281739112.168.1.2https://www.baidu.com
2151101010111577003281749112.16.1.23https://www.baidu.com
3151101010121577003281759193.168.1.2https://www.taobao.com
4151101010131577003281769112.18.1.2https://www.baidu.com
5151101010141577003281779112.168.10.2https://www.baidu.com
615110101015157700328178911.168.1.2https://www.taobao.com

增量导入有几个参数,保证下次同步的时候可以接着上次继续同步.

sqoop job --create mysql2hive_job -- import \
--connect jdbc:mysql://master1.hadoop:3306/test \
--username root \
--password 123456 \
--table people_access_log \
--target-dir /user/hive/warehouse/web.db/people_access_log \
--check-column id \
--incremental append \
--fields-terminated-by '\t' \
--last-value 6 \
-m 1

这里通过sqoop job --create job_name命令创建了一个名为mysql2hive_jobsqoop job

3). 执行job

创建好了job,后面只需要定时周期执行这个提前定义好的job即可。我们先往mysql里面插入2条数据。

INSERT INTO `people_access_log` (`id`,`user_id`,`access_time`,`ip`,`url`) VALUES
(7,15110101016,1577003281790,'112.168.1.3','https://www.qq.com'),
(8,15110101017,1577003281791,'112.1.1.3','https://www.microsoft.com');

这样mysql里面就会多了2条数据。此时hive里面只有id1 ~ 6的数据,执行同步job使用以下命令。

sqoop job -exec mysql2hive_job

执行完成后,发现刚才mysql新加入的id7 ~ 8的两条数据已经同步到hive

hive> select * from web.people_access_log;
OK
1   15110101010 1577003281739   112.168.1.2 https://www.baidu.com
2   15110101011 1577003281749   112.16.1.23 https://www.baidu.com
3   15110101012 1577003281759   193.168.1.2 https://www.taobao.com
4   15110101013 1577003281769   112.18.1.2  https://www.baidu.com
5   15110101014 1577003281779   112.168.10.2    https://www.baidu.com
6   15110101015 1577003281789   11.168.1.2  https://www.taobao.com
7   15110101016 1577003281790   112.168.1.3 https://www.qq.com
8   15110101017 1577003281791   112.1.1.3   https://www.microsoft.com

由于实际场景中,mysql表中的数据,比如订单表等,通常是一致有数据进入的,这时候只需要将sqoop job -exec mysql2hive_job这个命令定时(比如说10分钟频率)执行一次,就能将数据10分钟同步一次到hive数据仓库。

2. Lastmodified 导入实战

append适合业务系统库,一般业务系统表会通过自增ID作为主键标识唯一性。Lastmodified适合ETL的数据根据时间戳字段导入,表示只导入比这个时间戳大,即比这个时间晚的数据。

1). 新建一张表

mysql中新建一张表people_access_log2,并且初始化几条数据:

CREATE TABLE `people_access_log2` (
  `id` bigint(20) unsigned NOT NULL AUTO_INCREMENT COMMENT 'id',
  `user_id` bigint(20) unsigned NOT NULL COMMENT '用户id',
  `access_time` timestamp DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
  `ip` varchar(15) NOT NULL COMMENT '访客ip',
  `url` varchar(200) CHARACTER SET utf8mb4 COLLATE utf8mb4_bin NOT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

插入数据:

insert into people_access_log2(id,user_id, ip, url) values(1,15110101010,'112.168.1.200','https://www.baidu.com');
insert into people_access_log2(id,user_id, ip, url) values(2,15110101011,'112.16.1.2','https://www.baidu.com');
insert into people_access_log2(id,user_id, ip, url) values(3,15110101012,'112.168.1.2','https://www.taobao.com');
insert into people_access_log2(id,user_id, ip, url) values(4,15110101013,'112.168.10.2','https://www.baidu.com');
insert into people_access_log2(id,user_id, ip, url) values(5,15110101014,'112.168.1.2','https://www.jd.com');
insert into people_access_log2(id,user_id, ip, url) values(6,15110101015,'112.168.12.4','https://www.qq.com');

mysql里面的数据就是这样:

iduser_idaccess_timeipurl
1151101010102019-12-28 16:23:10112.168.1.200https://www.baidu.com
2151101010112019-12-28 16:23:33112.16.1.2https://www.baidu.com
3151101010122019-12-28 16:23:41112.168.1.2https://www.taobao.com
4151101010132019-12-28 16:23:46112.168.10.2https://www.baidu.com
5151101010142019-12-28 16:23:52112.168.1.2https://www.jd.com
6151101010152019-12-28 16:23:56112.168.12.4https://www.qq.

2). 初始化hive表:

初始化hive数据,将mysql里面的6条数据导入hive中,并且可以自动帮助我们创建对应hive表,何乐而不为,否则我们需要自己手动创建,完成初始化工作。

sqoop import \
--connect jdbc:mysql://master1.hadoop:3306/test \
--username root \
--password 123456 \
--table people_access_log2 \
--hive-import \
--create-hive-table \
--fields-terminated-by ',' \
--hive-table web.people_access_log2

可以看到执行该命令后,启动了二一个mapreduce任务,这样6条数据就进入hiveweb.people_access_log2了:

hive> select * from web.people_access_log2;
OK
1   15110101010 2019-12-28 16:23:10.0   112.168.1.200   https://www.baidu.com
2   15110101011 2019-12-28 16:23:33.0   112.16.1.2  https://www.baidu.com
3   15110101012 2019-12-28 16:23:41.0   112.168.1.2 https://www.taobao.com
4   15110101013 2019-12-28 16:23:46.0   112.168.10.2    https://www.baidu.com
5   15110101014 2019-12-28 16:23:52.0   112.168.1.2 https://www.jd.com
6   15110101015 2019-12-28 16:23:56.0   112.168.12.4    https://www.qq.com
Time taken: 0.326 seconds, Fetched: 6 row(s)

3). 增量导入数据:

我们再次插入一条数据进入mysqlpeople_access_log2表:

insert into people_access_log2(id,user_id, ip, url) values(7,15110101016,'112.168.12.45','https://www.qq.com');

此时,mysql表里面已经有7条数据了,我们使用incremental的方式进行增量的导入到hive:

sqoop import \
--connect jdbc:mysql://master1.hadoop:3306/test \
--username root \
--password 123456 \
--table people_access_log2 \
--hive-import \
--hive-table people_access_log2 \
-m 1 \
--check-column access_time \
--incremental lastmodified \
--last-value "2019-12-28 16:23:56" \

2019-12-28 16:23:56就是第6条数据的时间,这里需要指定。报错了:

19/12/28 16:17:25 ERROR tool.ImportTool: Error during import: --merge-key or --append is required when using --incremental lastmodified and the output directory exists.

注意:可以看到--merge-key or --append is required when using --incremental lastmodified意思是,这种基于时间导入模式,需要指定--merge-key或者--append参数,表示根据时间戳导入,数据是直接在末尾追加(append)还是合并(merge),这里使用merge方式,根据id合并:

sqoop import \
--connect jdbc:mysql://master1.hadoop:3306/test \
--username root \
--password 123456 \
--table people_access_log2 \
--hive-import \
--hive-table web.people_access_log2 \
--check-column access_time \
--incremental lastmodified \
--last-value "2019-12-28 16:23:56" \
--fields-terminated-by ',' \
--merge-key id

执行该命令后,与直接导入不同,该命令启动了2个mapreduce任务,这样就把数据增量merge导入hive表了.

hive> select * from web.people_access_log2 order by id;
OK
1   15110101010 2019-12-28 16:23:10.0   112.168.1.200   https://www.baidu.com
2   15110101011 2019-12-28 16:23:33.0   112.16.1.2  https://www.baidu.com
3   15110101012 2019-12-28 16:23:41.0   112.168.1.2 https://www.taobao.com
4   15110101013 2019-12-28 16:23:46.0   112.168.10.2    https://www.baidu.com
5   15110101014 2019-12-28 16:23:52.0   112.168.1.2 https://www.jd.com
6   15110101015 2019-12-28 16:23:56.0   112.168.12.4    https://www.qq.com
6   15110101015 2019-12-28 16:23:56.0   112.168.12.4    https://www.qq.com
7   15110101016 2019-12-28 16:28:24.0   112.168.12.45   https://www.qq.com
Time taken: 0.241 seconds, Fetched: 8 row(s)

可以看到id=6的数据,有2条,它的时间刚好是--last-value指定的时间,则会导入大于等于--last-value指定时间的数据,这点需要注意。

转载请注明:柯广的网络日志 » sqoop用法之mysql与hive数据导入导出

Spark提交任务流程

Spark 调用了 AmRmClient 的 API addContainerRequest

AmRmClient 在处理 addContainerRequest 时,会针对每个 ContainerRequest 生成一个ResourceRequest

但是 ResourceRequest 接下来是由为 ResourceRequestInfo 处理的,这个是缓存在 (priority, resourceName, executionType) 三元组为 key 的 map 中的,每次取到了都会重新 set labelExpression。尽管 labelExpression 这里被 random 处理了,但只保留了最后一次。

问:是否可以把 ResourceRequestInfo 的 map 多加一层 label的,这样就能保留每次不同的 labelExpression 了?

答:应该是am去申请的container的时候,标签是随机的,一半提交到了资源紧张的分区,被pending了,客户端因为conainer已经申请完了,不会新申请container,这里要看看yarn有没有重新申请conainer的能力,如果pengding超过一定时间

问:但是我理解每次 assignContainerToNodes 都是以 nm node 为单位的哈,1500台 nm 的集群都没有资源在pending,只分配到小集群的概率太小了,应该不会稳定地出现一边倒的情况。上面分析的结论是,多个 containerRequest 会被合并成一个,所以只带了一个标签。

答:现在现象是长尾任务containaienr都pengding在资源比较少的那些分区是吧。所有的containerrequest本来就是一批哈 咱们这做的就是APP的container分配到同一个区分。

问:短任务也会,长任务时间长了可以跑满到所有分区。猜测是后面几个 stage 对应的request成功分配到了默认分区里。这个可能咱们还得对一下,不然有些典型场景可能会有问题哈。比如如果默认分区几乎跑满了,刚刚弹性扩容出来的分区是空的,所有任务仍然只有一半概率跑到扩容出来的分区上,除非默认分区 100% pending,才有可能在重试的时候都跑到新分区上去。同理,如果倒换过来,一个超过了弹性分区可用资源规模的任务调度到了弹性分区上,也有可能导致一些问题。我理解这种模式可能适合大量的小任务,但是大任务有较小任务更大的概率会变得更慢。