基于腾讯云产品建设实时数仓实践分享

建设实时数仓的目的和意义

实时数仓目的

  • 数仓概念:数据尽可能多,保存时间尽可能久
  • 实时概念:数据流式,处理及时、瞬时、短时、事件或者微批响应

数仓跟实时从概念上就有冲突,所以本质上不太适合处理广泛的问题,比如,对一个月,甚至是一年的数据进行统计计算。

所以,实时数仓应该目前作为离线数仓的一种补充,解决因离线数仓实时性低而无法解决的问题,具体点说就是处理离线两个周期间隔的数据问题,不适合解决大批量数据聚合问题、业务性太强的以及对实时性要求很高问题。

实时数仓的意义

实时数仓从概念上讲还是要靠近数仓的概念,数据分层,面向主题,数据尽可能集成,结构相对稳定,不易发生变化。

对于实时数仓来讲,数据量不需要保存像离线那么久,上一节我们提到,实时数仓处理两个离线周期间隔的数据即可,如上图,以时报为例,实时数仓补充中间数据即可,以天为例,实时数仓最多只需要保留3~5天数据即可,能够支持一段时间的数据追溯和重导就可以了。

实时数仓可以解决哪类问题

利用EMR建设实时数仓

实时数仓对比离线数仓

实时数仓架构

从图中可以看到,

  1. ODS并不是实时数仓的一部分,是依赖外部数据源,比如binlog,流量日志,系统日志,或者是其他消息队列
  2. 应用层也不是实时数仓的一部分,对于数据的使用,通过实时数仓暴露Topic来使用
  3. 实时数仓要求层次要少,因为需要尽可能降低延迟

用EMR搭建实时数仓

  1. 底层数据源可以接企业内部binlog、日志或者消息队列
  2. 从ODS层经过与维表轻度扩展,形成明细层明细表,明细表用一个Ckafka topic表示,计算采用Oceanus或者EMR FlinkSql 关联查询,维表采用EMR Hbase存储
  3. 从明细层经过进一步汇总计算,形成汇总层,此时数据已经是面向主题的汇总数据,就是传统意义上的大宽表,一个主题是一系列Ckafka topic,计算采用Oceanus或者EMR FlinkSql 关联查询以及汇总计算

实时数仓各层搭建

ODS层搭建

  1. 之所以没有把ODS层放在实时数仓的一部分,是因为实时数仓的ODS并不像离线数仓ODS是采集过来的原始数据,现在一般企业都已经具备了如上图的底层数据源
  2. Binlog,是数据库日志,通过Binlog可以自数据库主从间同步,可以同步关系型数据库数据,目前企业线上数据库都采用Mysql这样的数据库,可以通过抓取Mysql binlog 获取数据库变更信息,数仓中重要的业务数据,支付相关,用户相关,管理相关数据一般都从这种数据源获得
  3. Log日志,服务器日志,像服务器系统日志采集,都是通过这种形式进行采集
  4. Ckafka,企业通过消息队列提供数据源服务,比如,点击流服务,会把用户点击事件通过上报服务器上报到Ckafka,为后续分析提供原始数据

该层搭建的注意点:

  1. 业务选择数据源,尽量跟离线保持一致,比如某个业务,数据源即可以通过Binlog,也可以通过Log日志采集,如果离线数仓业务是通过Binlog,那么实时数仓也取Binlog,否则后续产生数据不一致,非常难以定位
  2. 数据源要求一致性,对于Ckafka和Binlog 需要进行分区一致性保证,解决数据乱序问题

明细层搭建

建设标准与离线数仓目标一致,解决原始数据存在噪音,不完整,形式不统一等问题

数据解析,业务整合,数据清洗,解决噪音,不完整,数据不一致问题;模型规范化(提前指定号规则,尽量跟离线保持一致),形成数据规范,规范尽可能跟离线保持一致,命名,比如,指标命名等;

与离线数仓不同之处在于,离线调度是有周期的,时报一小时,天报周期为一天,如果修改数据表字段,只要任务没开始,就可以修改,而实时是流式,7X24小时不间断运行的,想要修改流中的字段或者格式,对下游影响是不可预估的

实时数仓如果修改字段不像离线,在间隔期间通知下游把作业都改了就没事了,但是实时不一样,实时你改掉了字段,下游作业必须可以认识你修改的内容才行,kafka不是结构化存储,没有元数据的概念,不像Hive,如果表名不规范,找一个统一时间,把catolog改规范,然后把脚本一改就就解决了。

明细建设关键,我们会在每一条数据上增加一些额外字段到数仓里

额外字段 逻辑 解决问题
事件主键 标识流里唯一事件内容 解决流中事件重复问题
数据主键 标识唯一一个数据(理解为数据库主键概念) 解决流中分区一致性,有序性
数据元数据版本 对应元数据变更,例如,表结构变动 解决表结构等元数据变化,与变化前数据进行区分
数据批次 当数据发生重导时需要更新批次 解决数据重导问题

举例说明这些额外字段的意义

事件主键:对于上游数据重复问题,我们会根据一些数据内的字段来判断上有数据的唯一性,比如binlog,<集群id_><库id_><表id_>数据id_数据生成时间。

数据主键:唯一标识数据表的一行记录,可以使用数据库主键,主要用来解决分区一致性及分区有序。

数据元数据版本:上面介绍了,流式计算是7X24小时不间断的运算,当修改了数据结构,增加,删除了字段,对下游的影响是不可预估的,因此元数据变更需修改该字段,保持数据流中新老版本数据双跑,下游选择合适的时机进行数据切换。

数据批次:跟元数据用途相似,当明细层逻辑发现问题,需要重跑数据,为了对下游任务不产生影响,调整了明细层逻辑后,需要回倒位点重跑数据,同时需要跟老逻辑任务双跑,待下游业务都切换到新的逻辑后,老逻辑任务才可以停止。

还有一个思路,可以直接把明细层数据,也可以直接写到druid 直接用于分析。

维度层搭建

维度数据处理:

如上图,对于变化频率低,地理,节假日,代码转换,直接同步加载到缓存里,或者是新增数据,但是增加进来就不变了,通过数据接口,访问最新数据,然后通过本公司数据服务对外提供数据

如上图,对于变化频率高,比如商品价格,也是需要监听变化消息,然后实时更新维度拉链表。对于比如像最近一个月没有消费用户这样的衍生维度,是需要根据变化消息,通过计算得到的衍生维度拉链。

因为维度数据也在发生变化,为了能够让源表数据匹配到维表,我们会给维表增加多版本minversion,然后通过TIMERANGE => [1303668804, 1303668904]筛选出源数据指定的维表版本数据。

这里有些同学可能觉得如果版本一致保存下去,会不会非常大,是的,我们响应的需要配置TTL保证维表数据量可控,上文我们介绍过,实时数仓解决是离线数仓两个间隔的问题,那么像这种变化频繁的数据我们TTL设置一周足够了。

       关于源表与维表如果进行join,Flink原生sql以及Oceanus都是采用UDTF函数以及Lateral Table 进行联合使用,其中UDTF我们可以实现查询数据服务获取维表数据的能力,Oceanus请参考相关材料。

汇总层搭建

       汇总层加工其实跟离线数仓是一致的,对共性指标进行加工,比如,pv,uv,订单优惠金额,交易额等,会在汇总层进行统一计算。

Flink提供了丰富的窗口计算,这使得我们可以做更细力度的聚合运算,例如,我们可以算最近5分钟,10分钟的数据聚合,根据时间窗口的间隔,也需要调整相应的TTL,保障内存高效实用。

       Flink提供了丰富的聚合计算,数据都是要存在内存中的,因此需要注意设置state的TTL,例如,做Count(Distinct x)。或者在进行PV,UV计算时候,都会使用大量的内存,这一块,当处理的基数比较大的时候,推荐使用一些非高精度去重算法,Bloom过滤器,Hyper LogLog等。

汇总层也需要在每一条数据上增加一些额外字段到数仓里,这块与明细层一致,就不在单独讲解了。

数据质量保证

对于实时数仓数据质量的管理,我们通常由三步操作组成

第一步,数据与离线数据进行对比

首先,将汇总层数据Topic通过平台接入任务接入到离线仓库,然后通过数据质量任务,定时对实时数仓和离线数仓数据进行对比,并配置报警,数据差异,数据波动等。

第二步,配置报警,我们会在明细层以及汇总层,Topic配置生产监控,与以往数据波动,上游数据延迟或者积压,都需要进行报警。

第三部,构建实时血缘, Flink 在读取数据时候,会把信息读到flink catalog 这样就知道这个任务读取了哪个表,在解析客户DDL代码时,可以获得目标表信息,同步到我们的元数据服务。

参考文献:

美团实时数仓搭建:https://tech.meituan.com/2018/10/18/meishi-data-flink.html

菜鸟实时数仓:https://mp.weixin.qq.com/s/9ZRG76-vCM7AlRZNFCLrqA

基于Clickhouse分析平台:设计篇

背景

在伴鱼,服务器每天收集的用户行为日志达到上亿条,我们希望能够充分利用这些日志,了解用户行为模式,回答以下问题:

  • 最近三个月,来自哪个渠道的用户注册量最高?
  • 最近一周,北京地区的,发生过绘本浏览行为的用户,按照年龄段分布的情况如何?
  • 最近一周,注册过伴鱼绘本的用户,7日留存率如何?有什么变化趋势?
  • 最近一周,用户下单的转化路径上,各环节的转化率如何?

为了回答这些问题,事件分析平台应运而生。本文将首先介绍平台的功能,随后讨论平台在架构上的一些思考。

功能

总的来说,为了回答各种商业分析问题,事件分析平台支持基于事件的指标统计、属性分组、条件筛选等功能的查询。其中,事件指用户行为,例如登录、浏览伴鱼绘本、购买付费绘本等。更具体一些,事件分析平台支持三类分析:「事件分析」,「漏斗分析」,和「留存分析」。

事件分析

事件分析是指,用户指定一系列条件,查询目的指标,用于回答一个具体的分析问题。这些条件包括:

  • 事件类型:指用户行为,采集自埋点数据;例如登录伴鱼绘本,购买付费绘本
  • 指标:指标分为两类,基础指标和自定义指标
    • 基础指标:总次数(pv),总用户数(uv),人均次数(pv/uv)
    • 自定义指标:事件属性 + 计算类型,例如 「用户下单金额」的「总和/均值/最大值」
  • 过滤条件:用于筛选查询所关心的用户群体
  • 维度分组:基于分组,可以进行分组之间的对比
  • 时间范围:指定事件发生的时间范围

让我们举个具体的例子。我们希望回答「最近一周,在北京地区,不同年龄段的用户在下单一对一课程时,下单金额的平均数对比」这个问题。这个问题可以很直观地拆解为下图所示的事件分析,其中:

  • 事件类型 = 下单一对一课程
  • 指标 = 下单金额的平均数
  • 过滤条件 = 北京地区
  • 维度分组 = 按照年龄段分组
  • 时间范围 = 最近一周
event_analysis_flow


图注:事件分析创建流程

event_analysis


图注:事件分析界面

漏斗分析

漏斗分析用于分析多步骤过程中,每一步的转化与流失情况。

例如,伴鱼绘本用户的完整购买流程可能包含以下步骤:登录 app -> 浏览绘本 -> 购买付费绘本。我们可以将这个流程设置为一个漏斗,分析整体以及每一步转化情况。

此外,漏斗分析还需要定义「窗口期」,整个流程必须发生在窗口期内,才算一次成功转化。和事件分析类似,漏斗分析也支持选择维度分组和时间范围。

funnel_flow


图注:漏斗分析创建流程

funnel


图注:漏斗分析界面

留存分析

在留存分析中,用户定义初始事件和后续事件,并计算在发生初始事件后的第 N 天,发生后续事件的比率。这个比率能很好地衡量伴鱼用户的粘性高低。

在下图的例子中,我们希望了解伴鱼绘本 app 是否足够吸引用户,因此我们设置初始事件为登录 app,后续事件为浏览绘本,留存周期为 7 天,进行留存分析。

retention_flow


图注:留存分析创建流程

retention


图注:留存分析界面

架构

在架构上,事件分析平台分为两个模块,如下图所示:

  • 数据写入:埋点日志从客户端或者服务端被上报后,经过 Kafka 消息队列,由 Flink 完成 ETL,然后写入 ClickHouse。
  • 分析查询:用户通过前端页面,进行事件、条件、维度的勾选,后端将它们拼接为 SQL 语句,从 ClickHouse 中查询数据,展示给前端页面。
design


图注:总架构图

不难看出,ClickHouse 是构成事件分析平台的核心组件。我们为了确保平台的性能,围绕 ClickHouse 的使用进行了细致的调研,回答了以下三个问题:

  • 如何使用 ClickHouse 存储事件数据?
  • 如何高效写入 ClickHouse?
  • 如何高效查询 ClickHouse?

如何使用 ClickHouse 存储事件数据?

事件分析平台的数据来源有两大类:来源于埋点日志的用户行为数据,和来源于「用户画像平台」的用户属性数据。本文只介绍埋点日志数据的存储,对「用户画像平台」感兴趣的同学,可以期待一下我们后续的技术文章。

在进行埋点日志的存储选型前,我们首先明确了几个核心需求:

  • 支持海量数据的存储。当前,伴鱼每天产生的埋点日志在亿级别。
  • 支持实时聚合查询。由于产品和运营同学会使用事件分析平台来探索多种用户行为模式,分析引擎必须能灵活且高效地完成各种聚合。

ClickHouse 在海量数据存储场景被广泛使用,高效支持各类聚合查询,配套有成熟和活跃的社区,促使我们最终选择 ClickHouse 作为存储引擎。

根据我们对真实埋点数据的测试,亿级数据的简单查询,例如 PV 和 UV,都能在 1 秒内返回结果;对于留存分析、漏斗分析这类的复杂查询,可以在 10 秒内返回结果。

「存在哪」的问题解决后,接下来回答「怎么存」的问题。ClickHouse 的列式存储结构非常适合存储大宽表,以支持高效查询。但是,在事件分析平台这个场景下,我们还需要支持「自定义属性」的存储,这时「大宽表」的存储方式就不尽理想。

所谓「自定义属性」,即埋点日志中一些事件所独有的属性,例如:「下单一对一课程」这一事件在上报时,会带上「订单金额」这个很多其它事件所没有的属性。如果为了支持「下单一对一课程」这个事件的存储,就需要改变 ClickHouse 的表结构,新增一列,这将使得表结构的维护成本极高,因为每个新事件都可能附带多个「自定义属性」。

为了解决这个问题,我们将频繁变动的自定义属性统一存储在一个 Map 中,将基本不变的公共属性存为列,使之兼具大宽表方案的高效性,和 Map 方案的灵活性。

如何高效写入 ClickHouse?

在设计 ClickHouse 的部署方案时,我们采用了业界常用的读写分离模式:写本地表,读分布式表。在写入侧,分为3个分片,每个分片都有双重备份。

由于事件分析的绝大多数查询,都是以用户为单位,为了提高查询效率,我们在写入时,将数据按照 user_id 均匀分片,写入到不同的本地表中。如下图所示:

import_to_clickhouse


图注:将埋点数据写入到 ClickHouse

之所以不写分布式表,是因为我们使用大量数据对分布式表进行写入测试时,遇到过几个问题:

  1. Too many parts error:分布式表所在节点接收到数据后,需要按照 sharding_key 将数据拆分为多个 parts,再转发到其它节点,导致短期内 parts 过多,并且增加了 merge 的压力;
  2. 写放大:分布式表所在节点,如果在短时间内被写入大量数据,会产生大量临时数据,导致写放大。

如何高效查询 ClickHouse?

我们可以使用 ClickHouse 的内置函数,轻松实现事件分析平台所需要提供的事件分析、漏斗分析和留存分析三个功能。

事件分析可以用最朴素的 SQL 语句实现。例如,最近一周,北京地区的,发生过绘本浏览行为的用户,按照年龄段的分布,可以表述为:

1
2
3
4
5
6
7
8
9
10
SELECT
    count(1) as cnt,
    toDate(toStartOfDay(toDateTime(event_ms))) as date,
age
FROM event_analytics
WHERE
event = “view_picture_book_home_page” AND
city = “beijing” AND
event_ms >= 1613923200000 AND event_ms <= 1614528000000
GROUP BY (date, age);

留存分析使用 ClickHouse 提供的 retention 函数。例如,注册伴鱼绘本后,计算浏览绘本的次日留存、7日留存可以表述为:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
SELECT
    sum(ret[1]) AS original,
    sum(ret[2]) AS next_day_ret,
    sum(ret[3]) AS seven_day_ret
FROM
(SELECT
  user_id,
  retention(
      event = “register_picture_book” AND toDate(event_ms) = toDate(‘2021-03-01’),
      event = “view_picture_book” AND toDate(event_ms) = toDate(‘2021-03-02’),
      event = “view_picture_book” AND toDate(event_ms) = toDate(‘2021-03-08’)
      ) as ret
FROM event_analytics
WHERE  
    event_ms >= 1614528000000 AND event_ms <= 1615132800000
GROUP BY user_id);

漏斗分析使用 ClickHouse 提供的 windowFunnel 函数。例如,在 浏览绘本 -> 购买绘本,窗口期为2天的这个转化路径上,转化率的计算可以被表达为:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
SELECT
    array( sumIf(count, level >= 1), sumIf(count, level >= 2) ) AS funnel_uv,
FROM (
    SELECT
        level,
        count() AS count
    FROM (
            SELECT
                uid,
                windowFunnel(172800000)(
                    event_ms, event = “view_picture_book” AND event_ms >= 1613923200000 AND event_ms <= 1614009600000, event = “buy_picture_book”) AS level
            FROM
                event_analytics
            WHERE
                event_ms >= 1613923200000 AND event_ms <= 1614182400000
            GROUP BY uid
        )
    GROUP BY level
)

总结

在结束功能梳理和架构设计后,我们开始了事件分析平台有序的建设。我们期待在大规模使用后,与大家分享事件分析平台的下一步演进。

参考文献

[1] Fast and Reliable Schema-Agnostic Log Analytics Platform. https://eng.uber.com/logging/

[2] How ClickHouse saved our data. https://mux.com/blog/from-russia-with-love-how-clickhouse-saved-our-data/

[3] 最快开源 OLAP 引擎!ClickHouse 在头条的技术演进 https://www.infoq.cn/article/ntwo*yr2ujwlmp8wcxoe

from:https://tech.ipalfish.com/blog/2021/06/21/event-analytics-design/ 如有侵权,请通知作者删除

伴鱼用户画像平台:设计篇

1. 背景

在伴鱼,我们努力了解我们的用户,旨在为用户提供更好的服务。APP 内容推荐,需要根据用户特征来决定推送内容;促销活动,需要针对不同的用户群体设计不同的活动方案;线上产品售卖,也需要了解用户喜好,才能更好地把产品卖给用户。

为此,我们搭建了用户画像平台。本文将首先探讨平台的功能需求、标签体系定位,随后介绍平台的架构和具体功能实现。

2. 功能

用户画像平台把重点放在了分析场景,使用方主要是公司各业务线的运营和数据分析同学。平台在一期主要支持以下几个功能。

  1. 定义标签:标签是用于描述用户的一个维度,例如「注册设备类型」、「常驻城市」、「年龄段」等。
  2. 人群圈选:指定一组用户标签和其对应的标签值,得到符合条件的用户人群。例如,找出「城市为北京,且设备类型为苹果」的用户。
  3. 用户画像:对于人群圈选结果,查看该人群的标签分布。例如,查看「城市为北京,且设备类型为苹果」的用户的年龄段分布。

3. 标签体系

确定完用户画像平台的使用场景和主要功能,我们再来倒推看用户标签体系。用户标签可以从两个维度进行分类:标签的实时性,和标签的值类型。

首先看标签的实时性。考虑到用户画像平台的主要功能是「人群圈选」和「用户画像查看」,而这两个功能都不需要非常高的实时性,那么实时标签的收益就不大,T+1 的非实时标签完全能满足数据分析和运营同学的需求。

再来看标签的值类型,即标签是枚举和还是非枚举的。枚举标签,顾名思义,就是指标签值可枚举的标签,例如 device_type, network_type, country, city 等,这类标签往往在人群圈选方面有较大作用。而非枚举标签,就是标签值可无限递增的标签,比如 active_days,register_date 等,这类标签大多会用来做用户信息展示。考虑到「人群圈选」是各业务线最迫切的需求,我们在一期舍弃了非枚举标签这个功能。

综上,我们就确定了用户画像平台的一期标签体系为非实时的枚举标签,主要满足「人群圈选」和「人群画像」这两个查询功能。

4. 架构与实现

在架构上,用户画像平台分为两个模块:数据写入,分析查询。

4.1 数据写入

数据写入模块为人群圈选和用户画像功能提供数据支持。具体流程分为两步。

第一步,大数据团队完成每日标签计算后,得到一张 Hive 大宽表,如下表所示。表的每一行代表一个用户,每一列代表一个标签。

user_idcountrycitydevice_typeage
1ChinaBeijingIOS10
4ChinaShanghaiAndroid5
10United StatesNew YorkAndroid12

第二步,大数据团队将大宽表的数据「转置」后批量写入 ClickHouse,如下表所示。表中的每一行代表一个标签实例(即标签和标签值的组合),例如「city = Beijing」。此外,这一行同时存储了具有该标签值的所有用户的集合,服务于分析查询模块。

tagtag_itemusers
countryChina{1,2,3,4,5}
countryUnited States{6,7,8,9,10}
cityBeijing{1,2,3}
cityShanghai{4}

4.2 分析查询

分析查询模块则实现了人群圈选和用户画像的查询。用户通过前端页面,进行标签、标签值、组合方式的勾选,后端将它们拼接为 SQL 语句,从 ClickHouse 中查询数据,展示给前端页面。例如,在下图中,我们圈选了属于北京、深圳、上海的苹果用户,并且按照年龄、网络运营商、网络类型、性别查看人群的分布情况。

analytics

不难看出,ClickHouse 在用户画像平台的数据存储和计算中起到了最关键的作用。下面,让我们一起来回答几个问题:

  • 如何设计 ClickHouse 的表结构?
  • 如何使用 ClickHouse 进行人群圈选?
  • 如何使用 ClickHouse 查看人群画像?

4.2.1 设计 ClickHouse 表结构

根据使用场景,我们设计 ClickHouse 表结构如下:

1
2
3
4
5
6
7
8
9
10
11
12
CREATE TABLE analytics.user_tag_bitmap_local
(
`tag` String,
`tag_item` String,
`p_day` Date,
`origin_user` UInt64,
`users` AggregateFunction(groupBitmap, UInt64) MATERIALIZED bitmapBuild([origin_user])
)
ENGINE = ReplicatedAggregatingMergeTree(‘/clickhouse/tables/{shard}/analytics/user_tag_bitmap_local’, ‘{replica}’)
PARTITION BY toYYYYMMDD(p_day)
ORDER BY (tag, tag_item)
SETTINGS index_granularity = 8192;

首先,看表的名称。表名包含 _local 后缀,即这是一个本地表,也存在一个对应的分布式表。我们使用「写本地表,读分布式表」的读写分离模式,具体原因可以参考《伴鱼事件分析平台:设计篇》的「如何高效写入 ClickHouse」一节。

然后,看表包含的字段。

  • tag 代表标签, tag_item 代表标签值。因为在标签的圈选查询中,经常有 tag = "city" AND tag_item = "beijing" 的语句,我们将 (tag, tag_item) 作为主键,以提高查询效率。
  • p_day 代表数据写入的日期,也作为 ClickHouse 的分片键。因为每天的标签数据都是全量导入,p_day 不仅可以用来区分标签版本,也方便我们批量删除历史数据。
  • origin_user 是单个用户 ID。然而,相比单个用户的标签情况,我们更关心具有特定标签的用户人群。因此,我们使用 users 字段来表达根据 origin_user 聚合得到用户人群。为此,我们使用了 AggregatingMergeTree,它在原始数据插入后自动触发聚合,将具有相同 (tag, tag_item, p_day) 的数据聚合为一行。

最后,看表的存储引擎,我们使用了 ReplicatedAggregatingMergeTree 引擎。前文中我们提到 Aggregating 是用来聚合数据,而 Replicated 则是用来创建数据副本,对应双副本存储模式。

4.2.2 使用 ClickHouse 进行人群圈选

组合不同标签,圈选出最适合某个活动的用户人群里,是运营同学们较为关心的步骤。例如,我们想找出城市为北京、性别为女的用户。

analytics


图注:用户人群查询

我们只需首先找到城市为北京的用户人群(用 bitmap 表示),然后找到性别为女的用户人群,然后对它们进行 AND 操作即可。具体查询如下:

1
2
3
4
5
6
7
8
9
10
11
12
WITH
(
SELECT groupBitmapMergeState(users)
FROM user_tag_bitmap_all
WHERE p_day = ‘2021_06_01’ AND tag = ‘city’ AND tag_item = ‘beijing’
) AS user_group_1,
(
SELECT groupBitmapMergeState(users)
FROM user_tag_bitmap_all
WHERE p_day = ‘2021_06_01’ AND tag = ‘gender’ AND tag_item = ‘female’
) AS user_group_2
SELECT bitmapToArray(bitmapAnd(user_group_1, user_group_2))

其中,groupBitmapMergeState 函数对通过 WHERE 筛除得到的任意个数的 bitmap (users) 进行 AND 操作,而 bitmapAnd 只能对两个 bitmap 进行 AND 操作。

4.2.3 使用 ClickHouse 查看用户画像

再回到刚刚的例子,圈选得到「北京的女性用户」这一人群后,我们想知道,人群中有多少人在用苹果设备,而有多少人在用安卓。这类标签分布信息,就是我们所说的用户画像。

analytics


图注:用户画像查询

这个查询的实现同样是直观的。

  1. 我们采用和上一节一样的步骤,得到「北京的女性用户」这一 bitmap。
  2. 对人群进行分组,分别得到「设备为苹果的用户」和「设备为安卓的用户」的 bitmap。如果存在除了苹果和安卓之外的设备,我们这一步会得到更多的 bitmap。
  3. 将步骤 2 中的每一个 bitmap 与步骤 1 中的 bitmap 进行 AND 操作,就能得到「北京的女性用户」基于「设备类型」的分布情况。

具体的实现见下面的查询。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
WITH
(
SELECT groupBitmapMergeState(users)
FROM user_tag_bitmap_all
WHERE p_day = ‘2021_06_01’ AND tag = ‘city’ AND tag_item = ‘beijing’
) AS user_group_1,
(
SELECT groupBitmapMergeState(users)
FROM user_tag_bitmap_all
WHERE p_day = ‘2021_06_01’ AND tag = ‘gender’ AND tag_item = ‘female’
) AS user_group_2,
(
SELECT bitmapAnd(user_group_1, user_group_2)
) AS filter_users
SELECT
bitmapCardinality(bitmapAnd(filter_users, group_by_users)) AS count,
tag,
tag_item
FROM
(SELECT
groupBitmapMergeState(users) AS group_by_users,
tag,
tag_item
FROM user_tag_bitmap_all
WHERE tag = “device_type”
GROUP BY (tag, tag_item));

上述查询用到了一个没介绍到的函数 bitmapCardinality 。它的作用可以理解为计算 bitmap 中 1 的个数。

总结

在结束功能梳理和架构设计后,我们开始了用户画像平台的建设和推广。我们期待在大规模使用后,与大家分享用户画像平台的下一步演进。

参考文献

[1] 贝壳DMP平台建设实践 https://mp.weixin.qq.com/s/n6WMfypsqigENe7w0krXCA

[2] 苏宁超6亿会员如何做到秒级用户画像查询?https://mp.weixin.qq.com/s/jY9Z0-2RRtz3uu0EYfbC-Q

from:https://tech.ipalfish.com/blog/2021/08/05/user-profile-design/ 如有侵权,请联系作者删除

hive 响应慢问题定位

情景描述:

大数据集群,目前有两套hiveserver2和metastore的集群,通过nginx指向进行流量互切,发现流量打到哪个metastore集群,哪个集群就特别卡顿。

那么先来回顾一下hive整个调用流程和框架

image-20201227123352163

Hive 提供的另外一个shell 客户端,也就是我们常用的hive 命令的客户端它的设计是直接启动了一个org.apache.hadoop.hive.cli.CliDriver的进程,这个进程其实主要包含了两块内容一个是提供给我们交互的cli ,另外一个就是我们的Driver 驱动引擎,这样的设计导致如果我们有多个客户端的情况下,我们就需要有多个Driver

image-20201226204741803

但是我们通过HiveServer2连接的时候我们就可以共享Driver,一方面可以简化客户端的设计降低资源损耗,另外一方面还能降低对MetaStore 的压力,减少连接的个数。

原因分析:

目前来看,变慢的原因应该是出现在hs2服务,metastore服务,具体业务,网络,服务器等原因。

1、先从简单的硬件分析入手,验证网络和服务器,这块省略验证过程

2、验证hs2服务,利用排除的方式,通过hive cli进行多次验证,发现也有缓慢的情况,正常应该1秒内返回,所以先不定位hs2的情况

3、验证metastore服务,还是先从直观简单的分析,看能否找出些现象,先看日志:

3.1 既然通过cli和hs2访问都会慢,先从简单的cli发起请求,可以通过加debug参数,发起查询

beeline –verbose=true –showNestedErrs=true –debug=true 看一下客户端是否有明显异常

3.2 cd /var/log/hive 查看一下是否有大量客户端访问

cat hadoop-cmf-hive-HIVEMETASTORE-data-hadoop-16-2.192.168.0.1.log.out | grep audit | grep -v “ugi=hue” | awk -F “ip=” ‘{print $2}’ | awk ‘{print $1}’ | sort | uniq -c | sort -nr | head

3.3 查看服务GC情况 jstat -gcutil pid interval(ms)

3.4 查看服务端日志

Error: Error while compiling statement: [Error 10308]: Attempt to acquire compile lock timed out. (state=,code=10308)
org.apache.hive.service.cli.HiveSQLException: Error while compiling statement: [Error 10308]: Attempt to acquire compile lock timed out.
at org.apache.hive.jdbc.Utils.verifySuccess(Utils.java:241)
at org.apache.hive.jdbc.Utils.verifySuccessWithInfo(Utils.java:227)
at org.apache.hive.jdbc.HiveStatement.execute(HiveStatement.java:255)
at org.apache.hive.beeline.Commands.executeInternal(Commands.java:989)
at org.apache.hive.beeline.Commands.execute(Commands.java:1180)
at org.apache.hive.beeline.Commands.sql(Commands.java:1094)
at org.apache.hive.beeline.BeeLine.dispatch(BeeLine.java:1180)
at org.apache.hive.beeline.BeeLine.execute(BeeLine.java:1013)
at org.apache.hive.beeline.BeeLine.begin(BeeLine.java:922)
at org.apache.hive.beeline.BeeLine.mainWithInputRedirection(BeeLine.java:518)
at org.apache.hive.beeline.BeeLine.main(BeeLine.java:501)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.hadoop.util.RunJar.run(RunJar.java:226)
at org.apache.hadoop.util.RunJar.main(RunJar.java:141)
Caused by: org.apache.hive.service.cli.HiveSQLException: Error while compiling statement: [Error 10308]: Attempt to acquire compile lock timed out.
at org.apache.hive.service.cli.operation.Operation.toSQLException(Operation.java:400)
at org.apache.hive.service.cli.operation.SQLOperation.prepare(SQLOperation.java:187)
at org.apache.hive.service.cli.operation.SQLOperation.runInternal(SQLOperation.java:271)
at org.apache.hive.service.cli.operation.Operation.run(Operation.java:337)
at org.apache.hive.service.cli.session.HiveSessionImpl.executeStatementInternal(HiveSessionImpl.java:439)
at org.apache.hive.service.cli.session.HiveSessionImpl.executeStatementAsync(HiveSessionImpl.java:416)
at org.apache.hive.service.cli.CLIService.executeStatementAsync(CLIService.java:282)
at org.apache.hive.service.cli.thrift.ThriftCLIService.ExecuteStatement(ThriftCLIService.java:503)
at org.apache.hive.service.cli.thrift.TCLIService$Processor$ExecuteStatement.getResult(TCLIService.java:1313)
at org.apache.hive.service.cli.thrift.TCLIService$Processor$ExecuteStatement.getResult(TCLIService.java:1298)
at org.apache.thrift.ProcessFunction.process(ProcessFunction.java:39)
at org.apache.thrift.TBaseProcessor.process(TBaseProcessor.java:39)
at org.apache.hive.service.auth.TSetIpAddressProcessor.process(TSetIpAddressProcessor.java:56)
at org.apache.thrift.server.TThreadPoolServer$WorkerProcess.run(TThreadPoolServer.java:286)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)

这里显示有获取metastore连接超时的异常,关键的日志是compiling statement: [Error 10308]: Attempt to acquire compile lock timed out. 编译时候获取编译索失败

顺着这个思路,查看一下代码:

关键字:Completed compiling command(queryId,参考https://github.com/apache/hive/blob/master/ql/src/java/org/apache/hadoop/hive/ql/lock/CompileLock.java

分析结论如下:

并且hivethrift每次访问都会初始化metastore 重新初始化元数据,HIVESEVER2提交SQL阻塞tryAcquireCompileLock原因:

a、HIVE1.1 只支持串行编译SQL,hiveserver2并发接受到SQL请求后,在complile阶段变为串行执行。当compilie编译慢时,引起阻塞SQL的提交。

b、compile的慢的原因:complile阶段,会通过hivemetastore访问mysql。目前是hiveserver2的请求打入到同一个metastore,流量上来后,hivemetastore访问mysql速度下降。

解决方案:hiveserver2的请求分摊所有hivemetastore上

基于doris+es 搭建用户画像系统(待整理)

olap
画像 doris+es
1、过去画像也是es 好用不代表能用好,
画像场景,看里面人群数,符合某些条件的人群数有多少?统计级
select count() groupby xxx 语义. 重要不优雅,用户需要学习dsl。包一层平台可以解决

人群如果有1000万(有将近1千多万条数据,大概占用空间有 300G 左右),几百万,导出来就比较崩溃,es 怎么用接口,并发导出,分布式读,并发控制,es黑科技,平台复杂度较高。普通用户对于高级玩法做不到。

几十万,几十分钟很正常,用游标scoll来做,如果失败,需要重跑,几千万,上亿就跑不出来了

doris on es 实现,列存,本地优先扫描,不排序(es 本身会排序,节省了很多计算开销),性能解决了,并且可以直接写sql了,不用写dsl了,扩展性,用户可以直接写sql了。中台必要的能力

更新场景,doris 更新不好,提升查询性能,需要做很多预计算,预计算需要有规则的。分析场景,维度跟指标要定义好,可以通过维度可以把指标预计算出来,画像场景,主键或者叫维度,更新大量是指标,查的时候要根据指标去查,某个年龄,金额大于xxx的,某个地域是xxx的人群是多少个,属性来查询。doris需要索引,排序,查询速度非常快,因为会预聚合,更新的结果应不应该在结果集中,性能会掉的很厉害,但是更新满足不了,
es 通过docid就可以更新了,性能一般,但是好歹是支持的。并且不影响读取。

所以画像场景,es把更新搞定,doris把上层查询搞定,查询快,有sql接口.

画像场景key 就是uid,其他都是属性一直在更新。预计算,就是终端类型android ios,地市,我想看北京市下的ios的用户数总共有多少人,报名人数,这种就是根据维度,预聚合求和,
kylin是所有维度预计算变成kv查询,这样会有维度爆炸,存储成本很高,加维度不方便,无法实时
,doris可以实时更新维度 tables schema change,上线没有kylin那么快,不会是毫秒级别的,做不到kylin那种kv查询,全部用预计算,比如要算几千万uv,每天都要groupby,如果现查询,肯定做不到秒出,

doris 要源源不断的更新的,并且可以计算的很好。也有物化视图,功能米有ck那么丰富,用不到太高级的能力,加一个rollup上卷就搞定了,一个物化视图的子集
clickhouse是一坨数据扔进去,查询会非常快

k1 k2 k3(城市) 维度列,pv uv 指标求最大,最小值,求k1 k3 算pv uv 可以k1 k3 pv uv做一个上卷,直接可以命中rollup列
clichouse 物化视图是跑sql,定期跑sql,doris做的是实时的,不是微批的,写的时候就把预计合全做了。

doris 预计算会把结果存在一个独立的表中,对外是可见的,sql会自动路由到rollup表里,自动分析sql的,可以指定维度和指标做预计算,业务是知道哪些是常用列,

es—doris+es 数据是不用动的,doris把es当成一个外表,挂上去就可以了,实时可用,外表创建成功了,fe模块会跟es元数据信息拉取es原信息,shard信息会拉取过来,doris访问失败,基本上都是因为es某个节点失败了,运维排查一下就好了

dorisdb数据更新,es就不用了,es维护成本太高了,需要写两套不同的实现,写入es keyword,doris必须是varchar int就会有问题,sql就解析失败了,doris有date类型,datetime有可能有不兼容问题,会查不出来。

es 类型不敏感,类型对应不上就会很麻烦。现在客户是通过平台化统一了,做了对应关系。

最终一致性,事务要求也不高,从业务库过来变成日志流了,客户对doris熟悉,人和事都认识,文档技术大家都知道,有问题会有人快速响应。

clickhouse运维复杂,c++,很难运维

目前所有分析都用doris,字段变更也很常见,现在有平台,改完字段,可以直接掉doris接口,alter table接口,不影响业务,无感知的

系统要能把控,技术做不深入 做proxy 做一些屏蔽 做平台化建设。在平台化解决掉

教学和拉新,1、统计类,分析
拉新,投放 端上PV uv多少人+业务的指标 来源,维度列可以枚举出来的。直接用doris

doris+es
1、老业务,裸用es 迁移doris 有成本。
2、微批架构 lamada kappy flink读业务流写到两个es表,ods–dwd,跟业务侧比较近,上面写sql 5分钟调度一次,当前时间往前推5分钟,a表更新3行,跟b表进行更新,如果join到了,说明b表也有更新了,

flinksql跟doris语法不一样,开发环境也不同,dwd dws ad 全部都是doris sql
select xxxx into xxx表,5分钟调度一次

flink实时,多留Join很麻烦

选择标准,微批数据不能太多,如果数据太多,还是走离线回灌回去

flink更新数据量很大,用redis做维表,doris不适合

对于客户要求要计算时间较长的数据,从doris再同步到hive里面去一份,这样就可以了,短时查询用doris,长周期用hive查,一个指标出口复用,但是不同需求,不同语法是合理的,目前在跟投放合作,还没有大规模用起来

腾讯没有调度系统

airflow 运维工作,对客户没有价值,体量?

clickhouse 可以通过shuddingkey 让多个表相同数据到同一个节点吗?

sqoop用法之mysql与hive数据导入导出

本文目录

一. Sqoop介绍
二. Mysql 数据导入到 Hive
三. Hive数据导入到Mysql
四. mysql数据增量导入hive

  • 1 基于递增列Append导入
    • 1). 创建hive
    • 2). 创建job
    • 3) 执行job
  • Lastmodified 导入实战
    • 1). 新建一张表
    • 2). 初始化hive表:

一. Sqoop介绍

Sqoop是一个用来将Hadoop和关系型数据库中的数据相互转移的工具,可以将一个关系型数据库(例如:MySQL、Oracle、Postgres等)中的数据导进到HadoopHDFS中,也可以将HDFS的数据导进到关系型数据库中。对于某些NoSQL数据库它也提供了连接器。Sqoop,类似于其他ETL工具,使用元数据模型来判断数据类型并在数据从数据源转移到Hadoop时确保类型安全的数据处理。Sqoop专为大数据批量传输设计,能够分割数据集并创建Hadoop任务来处理每个区块。

本文版本说明

hadoop版本 : hadoop-2.7.2
hive版本 : hive-2.1.0
sqoop版本:sqoop-1.4.6

二. Mysql 数据导入到 Hive

1). 将mysqlpeople_access_log表导入到hiveweb.people_access_log,并且hive中的表不存在。
mysql中表people_access_log数据为:

1,15110101010,1577003281739,'112.168.1.2','https://www.baidu.com'
2,15110101011,1577003281749,'112.16.1.23','https://www.baidu.com'
3,15110101012,1577003281759,'193.168.1.2','https://www.taobao.com'
4,15110101013,1577003281769,'112.18.1.2','https://www.baidu.com'
5,15110101014,1577003281779,'112.168.10.2','https://www.baidu.com'
6,15110101015,1577003281789,'11.168.1.2','https://www.taobao.com'

mysql数据导入hive的命令为:

sqoop import \
--connect jdbc:mysql://master1.hadoop:3306/test \
--username root \
--password 123456 \
--table people_access_log \
-m 1 \
--hive-import \
--create-hive-table \
--fields-terminated-by '\t' \
--hive-table web.people_access_log

该命令会启用一个mapreduce任务,将mysql数据导入到hive表,并且指定了hive表的分隔符为\t,如果不指定则为默认分隔符^A(ctrl+A)

参数说明

参数说明
--connectmysql的连接信息
--usernamemysql的用户名
--passwordmysql的密码
--table被导入的mysql源表名
-m并行导入启用的map任务数量,与--num-mapper含义一样
--hive-import插入数据到hive当中,使用hive默认的分隔符,可以使用--fields-terminated-by参数来指定分隔符。
-- hive-tablehive当中的表名

2). 也可以通过--query条件查询Mysql数据,将查询结果导入到Hive

sqoop import \
--connect jdbc:mysql://master1.hadoop:3306/test \
--username root \
--password 123456 \
--query 'select * from people_access_log where \$CONDITIONS and url = "https://www.baidu.com"' \
--target-dir /user/hive/warehouse/web/people_access_log \
--delete-target-dir \
--fields-terminated-by '\t' \
-m 1
参数说明
--query后接查询语句,条件查询需要\$CONDITIONS and连接查询条件,这里的\$表示转义$,必须有.
--delete-target-dir如果目标hive表目录存在,则删除,相当于overwrite.

三. Hive数据导入到Mysql

还是使用上面的hiveweb.people_access_log,将其导入到mysql中的people_access_log_out表中.

sqoop export \
--connect jdbc:mysql://master1.hadoop:3306/test \
--username root \
--password 123456 \
--table people_access_log_out \
--input-fields-terminated-by '\t' \
--export-dir /user/hive/warehouse/web.db/people_access_log \
--num-mappers 1

注意:mysqlpeople_access_log_out需要提前建好,否则报错:ErrorException: Table 'test.people_access_log_out' doesn't exist。如果有id自增列,hive表也需要有,hive表与mysql表字段必须完全相同。

create table people_access_log_out like people_access_log;

执行完一个mr任务后,成功导入到mysqlpeople_access_log_out中.

四. mysql数据增量导入hive

实际中mysql数据会不断增加,这时候需要用sqoop将数据增量导入hive,然后进行海量数据分析统计。增量数据导入分两种,一是基于递增列的增量数据导入(Append方式)。二是基于时间列的增量数据导入(LastModified方式)。有几个核心参数:

  • –check-column:用来指定一些列,这些列在增量导入时用来检查这些数据是否作为增量数据进行导入,和关系型数据库中的自增字段及时间戳类似.注意:这些被指定的列的类型不能使任意字符类型,如char、varchar等类型都是不可以的,同时–check-column可以去指定多个列
  • –incremental:用来指定增量导入的模式,两种模式分别为AppendLastmodified
  • –last-value:指定上一次导入中检查列指定字段最大值

1. 基于递增列Append导入

接着前面的日志表,里面每行有一个唯一标识自增列ID,在关系型数据库中以主键形式存在。之前已经将id在0~6之间的编号的订单导入到Hadoop中了(这里为HDFS),现在一段时间后我们需要将近期产生的新的订 单数据导入Hadoop中(这里为HDFS),以供后续数仓进行分析。此时我们只需要指定–incremental 参数为append–last-value参数为6即可。表示只从id大于6后即7开始导入。

1). 创建hive

首先我们需要创建一张与mysql结构相同的hive表,假设指定字段分隔符为\t,后面导入数据时候分隔符也需要保持一致。

2). 创建job

增量导入肯定是多次进行的,可能每隔一个小时、一天等,所以需要创建计划任务,然后定时执行即可。我们都知道hive的数据是存在hdfs上面的,我们创建sqoop job的时候需要指定hive的数据表对应的hdfs目录,然后定时执行这个job即可。

当前mysql中数据,hive中数据与mysql一样也有6条:

iduser_idaccess_timeipurl
1151101010101577003281739112.168.1.2https://www.baidu.com
2151101010111577003281749112.16.1.23https://www.baidu.com
3151101010121577003281759193.168.1.2https://www.taobao.com
4151101010131577003281769112.18.1.2https://www.baidu.com
5151101010141577003281779112.168.10.2https://www.baidu.com
615110101015157700328178911.168.1.2https://www.taobao.com

增量导入有几个参数,保证下次同步的时候可以接着上次继续同步.

sqoop job --create mysql2hive_job -- import \
--connect jdbc:mysql://master1.hadoop:3306/test \
--username root \
--password 123456 \
--table people_access_log \
--target-dir /user/hive/warehouse/web.db/people_access_log \
--check-column id \
--incremental append \
--fields-terminated-by '\t' \
--last-value 6 \
-m 1

这里通过sqoop job --create job_name命令创建了一个名为mysql2hive_jobsqoop job

3). 执行job

创建好了job,后面只需要定时周期执行这个提前定义好的job即可。我们先往mysql里面插入2条数据。

INSERT INTO `people_access_log` (`id`,`user_id`,`access_time`,`ip`,`url`) VALUES
(7,15110101016,1577003281790,'112.168.1.3','https://www.qq.com'),
(8,15110101017,1577003281791,'112.1.1.3','https://www.microsoft.com');

这样mysql里面就会多了2条数据。此时hive里面只有id1 ~ 6的数据,执行同步job使用以下命令。

sqoop job -exec mysql2hive_job

执行完成后,发现刚才mysql新加入的id7 ~ 8的两条数据已经同步到hive

hive> select * from web.people_access_log;
OK
1   15110101010 1577003281739   112.168.1.2 https://www.baidu.com
2   15110101011 1577003281749   112.16.1.23 https://www.baidu.com
3   15110101012 1577003281759   193.168.1.2 https://www.taobao.com
4   15110101013 1577003281769   112.18.1.2  https://www.baidu.com
5   15110101014 1577003281779   112.168.10.2    https://www.baidu.com
6   15110101015 1577003281789   11.168.1.2  https://www.taobao.com
7   15110101016 1577003281790   112.168.1.3 https://www.qq.com
8   15110101017 1577003281791   112.1.1.3   https://www.microsoft.com

由于实际场景中,mysql表中的数据,比如订单表等,通常是一致有数据进入的,这时候只需要将sqoop job -exec mysql2hive_job这个命令定时(比如说10分钟频率)执行一次,就能将数据10分钟同步一次到hive数据仓库。

2. Lastmodified 导入实战

append适合业务系统库,一般业务系统表会通过自增ID作为主键标识唯一性。Lastmodified适合ETL的数据根据时间戳字段导入,表示只导入比这个时间戳大,即比这个时间晚的数据。

1). 新建一张表

mysql中新建一张表people_access_log2,并且初始化几条数据:

CREATE TABLE `people_access_log2` (
  `id` bigint(20) unsigned NOT NULL AUTO_INCREMENT COMMENT 'id',
  `user_id` bigint(20) unsigned NOT NULL COMMENT '用户id',
  `access_time` timestamp DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
  `ip` varchar(15) NOT NULL COMMENT '访客ip',
  `url` varchar(200) CHARACTER SET utf8mb4 COLLATE utf8mb4_bin NOT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

插入数据:

insert into people_access_log2(id,user_id, ip, url) values(1,15110101010,'112.168.1.200','https://www.baidu.com');
insert into people_access_log2(id,user_id, ip, url) values(2,15110101011,'112.16.1.2','https://www.baidu.com');
insert into people_access_log2(id,user_id, ip, url) values(3,15110101012,'112.168.1.2','https://www.taobao.com');
insert into people_access_log2(id,user_id, ip, url) values(4,15110101013,'112.168.10.2','https://www.baidu.com');
insert into people_access_log2(id,user_id, ip, url) values(5,15110101014,'112.168.1.2','https://www.jd.com');
insert into people_access_log2(id,user_id, ip, url) values(6,15110101015,'112.168.12.4','https://www.qq.com');

mysql里面的数据就是这样:

iduser_idaccess_timeipurl
1151101010102019-12-28 16:23:10112.168.1.200https://www.baidu.com
2151101010112019-12-28 16:23:33112.16.1.2https://www.baidu.com
3151101010122019-12-28 16:23:41112.168.1.2https://www.taobao.com
4151101010132019-12-28 16:23:46112.168.10.2https://www.baidu.com
5151101010142019-12-28 16:23:52112.168.1.2https://www.jd.com
6151101010152019-12-28 16:23:56112.168.12.4https://www.qq.

2). 初始化hive表:

初始化hive数据,将mysql里面的6条数据导入hive中,并且可以自动帮助我们创建对应hive表,何乐而不为,否则我们需要自己手动创建,完成初始化工作。

sqoop import \
--connect jdbc:mysql://master1.hadoop:3306/test \
--username root \
--password 123456 \
--table people_access_log2 \
--hive-import \
--create-hive-table \
--fields-terminated-by ',' \
--hive-table web.people_access_log2

可以看到执行该命令后,启动了二一个mapreduce任务,这样6条数据就进入hiveweb.people_access_log2了:

hive> select * from web.people_access_log2;
OK
1   15110101010 2019-12-28 16:23:10.0   112.168.1.200   https://www.baidu.com
2   15110101011 2019-12-28 16:23:33.0   112.16.1.2  https://www.baidu.com
3   15110101012 2019-12-28 16:23:41.0   112.168.1.2 https://www.taobao.com
4   15110101013 2019-12-28 16:23:46.0   112.168.10.2    https://www.baidu.com
5   15110101014 2019-12-28 16:23:52.0   112.168.1.2 https://www.jd.com
6   15110101015 2019-12-28 16:23:56.0   112.168.12.4    https://www.qq.com
Time taken: 0.326 seconds, Fetched: 6 row(s)

3). 增量导入数据:

我们再次插入一条数据进入mysqlpeople_access_log2表:

insert into people_access_log2(id,user_id, ip, url) values(7,15110101016,'112.168.12.45','https://www.qq.com');

此时,mysql表里面已经有7条数据了,我们使用incremental的方式进行增量的导入到hive:

sqoop import \
--connect jdbc:mysql://master1.hadoop:3306/test \
--username root \
--password 123456 \
--table people_access_log2 \
--hive-import \
--hive-table people_access_log2 \
-m 1 \
--check-column access_time \
--incremental lastmodified \
--last-value "2019-12-28 16:23:56" \

2019-12-28 16:23:56就是第6条数据的时间,这里需要指定。报错了:

19/12/28 16:17:25 ERROR tool.ImportTool: Error during import: --merge-key or --append is required when using --incremental lastmodified and the output directory exists.

注意:可以看到--merge-key or --append is required when using --incremental lastmodified意思是,这种基于时间导入模式,需要指定--merge-key或者--append参数,表示根据时间戳导入,数据是直接在末尾追加(append)还是合并(merge),这里使用merge方式,根据id合并:

sqoop import \
--connect jdbc:mysql://master1.hadoop:3306/test \
--username root \
--password 123456 \
--table people_access_log2 \
--hive-import \
--hive-table web.people_access_log2 \
--check-column access_time \
--incremental lastmodified \
--last-value "2019-12-28 16:23:56" \
--fields-terminated-by ',' \
--merge-key id

执行该命令后,与直接导入不同,该命令启动了2个mapreduce任务,这样就把数据增量merge导入hive表了.

hive> select * from web.people_access_log2 order by id;
OK
1   15110101010 2019-12-28 16:23:10.0   112.168.1.200   https://www.baidu.com
2   15110101011 2019-12-28 16:23:33.0   112.16.1.2  https://www.baidu.com
3   15110101012 2019-12-28 16:23:41.0   112.168.1.2 https://www.taobao.com
4   15110101013 2019-12-28 16:23:46.0   112.168.10.2    https://www.baidu.com
5   15110101014 2019-12-28 16:23:52.0   112.168.1.2 https://www.jd.com
6   15110101015 2019-12-28 16:23:56.0   112.168.12.4    https://www.qq.com
6   15110101015 2019-12-28 16:23:56.0   112.168.12.4    https://www.qq.com
7   15110101016 2019-12-28 16:28:24.0   112.168.12.45   https://www.qq.com
Time taken: 0.241 seconds, Fetched: 8 row(s)

可以看到id=6的数据,有2条,它的时间刚好是--last-value指定的时间,则会导入大于等于--last-value指定时间的数据,这点需要注意。

转载请注明:柯广的网络日志 » sqoop用法之mysql与hive数据导入导出

Spark提交任务流程

Spark 调用了 AmRmClient 的 API addContainerRequest

AmRmClient 在处理 addContainerRequest 时,会针对每个 ContainerRequest 生成一个ResourceRequest

但是 ResourceRequest 接下来是由为 ResourceRequestInfo 处理的,这个是缓存在 (priority, resourceName, executionType) 三元组为 key 的 map 中的,每次取到了都会重新 set labelExpression。尽管 labelExpression 这里被 random 处理了,但只保留了最后一次。

问:是否可以把 ResourceRequestInfo 的 map 多加一层 label的,这样就能保留每次不同的 labelExpression 了?

答:应该是am去申请的container的时候,标签是随机的,一半提交到了资源紧张的分区,被pending了,客户端因为conainer已经申请完了,不会新申请container,这里要看看yarn有没有重新申请conainer的能力,如果pengding超过一定时间

问:但是我理解每次 assignContainerToNodes 都是以 nm node 为单位的哈,1500台 nm 的集群都没有资源在pending,只分配到小集群的概率太小了,应该不会稳定地出现一边倒的情况。上面分析的结论是,多个 containerRequest 会被合并成一个,所以只带了一个标签。

答:现在现象是长尾任务containaienr都pengding在资源比较少的那些分区是吧。所有的containerrequest本来就是一批哈 咱们这做的就是APP的container分配到同一个区分。

问:短任务也会,长任务时间长了可以跑满到所有分区。猜测是后面几个 stage 对应的request成功分配到了默认分区里。这个可能咱们还得对一下,不然有些典型场景可能会有问题哈。比如如果默认分区几乎跑满了,刚刚弹性扩容出来的分区是空的,所有任务仍然只有一半概率跑到扩容出来的分区上,除非默认分区 100% pending,才有可能在重试的时候都跑到新分区上去。同理,如果倒换过来,一个超过了弹性分区可用资源规模的任务调度到了弹性分区上,也有可能导致一些问题。我理解这种模式可能适合大量的小任务,但是大任务有较小任务更大的概率会变得更慢。

Clickhouse 性能问题分析(一)zk报连接异常,性能低下

原因分析:

table read only 跟 zk 也有直接关系。现在能肯定 zk 集群的性能跟不上么 ?在这个压力下只要错误日志里有zk相关的超时,就是ZK性能更不上。

调优策略:

小批次,大批量写入,降低对ZK的QPS,减少的压力,然后横向扩容解决并发问题,不要怕条数多,她设计的原则就是 要你条数多,才能达到最佳性能,1000/s这种 已经触达了ZK的极限了

  1. 如果zk内存使用率较高,zkEnv.sh 修改zk jvm 为8192m,在zoo.cfg 中修改MaxSessionTimeout=120000
  2. 超时时间socket_timeout 80000

3.Spark insert 的-batch 调大到30000,就是每批次取3万行数据,参数可能是任务的参数


4. metrika.xml的zookeepr配置里调整下参数,调整的ch zk客户端超时,这个值是默认的30s

例如

<zookeeper>

    <node>

        <host>example1</host>

        <port>2181</port>

    </node>

    <node>

        <host>example2</host>

        <port>2181</port>

    </node>

    <session_timeout_ms>30000</session_timeout_ms>  –客户端会话的最大超时(以毫秒为单元)

    <operation_timeout_ms>10000</operation_timeout_ms>

</zookeeper>

  1. 重启生效

现在开始写入了。是先写入buffer engine 然后buffer engine 刷新 底层表

写入的数据库和表是:

xxx_dis

buffer的元数据不在zookeeper上,但是跟其他表的读写需要zk支持,就等于是 buffer 表刷底层表 是要走zookeeper. 的。

看看ZK机器的snapshot文件大小,如下图,也不大

Buffer表的建表语句方便发下吗? ENGINE = Buffer(‘blower_data_prod’, ‘xxx_info’, 18, 60, 100, 5000000, 6000000, 80000000, 1000000000) 主要就是后面这个吧。前面字段无所谓。

hive任务结束了,但是hive终端或者命令行没退出

问题描述:从一个100多亿条记录的hive表里查数据,查出数据后,写入一张新hive表里。mapreduce执行完load data环节后,一直不结束,哪位大神知道怎么更进一步地定位问题?

原因分析:文件比较多,最后一个movetask在移动文件和搜集文件统计信息。hive是迭代式计算,最后会有一个movetask把最终数据文件移动到hive的location下,这个过程有两个地方比较耗时,一个是rename,另外一个是list文件读文件元数据,更新元数据库的统计信息,如果数据在对象存储,rename是copy+delete,这个过程会比较慢,movetask是finaltask,不会起mr,任务没完成前,新表里的数据,不是全量的,只是部分。只有任务退出,才算完全完成

解决方案:

set hive.stats.autogather=false;开启收集线程,可以减少最后收集的时间

mapred.dfsclient.parallelism.max 20 增加并行rename的能力

【原创】几句话说清楚《HBASE Snapshot 恢复 restore 原理》

首先snapshot恢复 理论上是建立在数据文件(region,hfile,storefile)都已经导入到集群中了,只是把元数据更新成snapshot的时刻
其实特别简单
Hmaster处理meta元数据表,跟当前元数据的region比较,分为三种情况:
1、snapshot 有,当前也有:说明这部分数据后来有可能被更新过,宁可错杀100,绝不放过一个,所以,直接先从meta信息删除,然后再从snapshot恢复
2、snapshot 有,当前没有:说明这部分数据被归档了,当前已经不用了,重新将snapshot这部分写入meta信息
3、snapshot 没有,当前有:说明是snapshot后面加入的数据,直接从meta删除

Hmaster恢复Hfile数据文件,分为三种情况:
1、snapshot 有,当前也有:同名的,不做任何改动,因为Hfile一旦落盘,很少会发生改变
2、snapshot 有,当前没有:对于缺少的文件,直接对exportsnapshot拷贝到archive中的Hfile文件做引用
3、snapshot 没有,当前有:说明是snapshot后面加入的数据,转移到archive归档

Hmaster恢复Wal文件:
写入recovered.edits文件夹的相应region中,每个region一个线程,只写入跟snapshot相关的表region信息,RS会从这里进行恢复,几句话说清楚《hbase snapshot 原理》也提到了,恢复只会从snapshot的flushid进行恢复,之前的就不会重复操作了

对于regionserver

参考:https://cloud.tencent.com/developer/article/1047967