基于Clickhouse建日志中心

一、项目背景介绍

经过十三年的发展,快递100目前C端累计注册用户超2.5亿、P端(专业用户)累计注册快递员及网点经营者超130万、B端累计服务电商企业/泛电商企业/品牌企业/政府与公共组织等客户超250万家;每天快递查询调用量超4亿次、寄件下单量超30万单。

公司的业务量和数据量是相对较大且复杂的,因此拥有一个实时性、可扩展性、并拥有强大的搜索与分析功能的日志中心至关重要,它不仅可以记录系统的性能、运行状态,还可以为我们提供很多有价值的业务数据和用户行为分析,这些都将为业务的洞察与决策提供有效支持,从而推动产品的迭代升级和运营策略调整。

同时,日志也是我们线上定位问题的重要手段,我们经常会依赖日志排查解决来自客户的问题,帮助我们提升服务质量,增强客户对公司的产品信赖。

如何构建一套适合我们的日志中心?在业务的发展过程中,日志的架构从原始的文件记录到ELK体系,我们也遇到了一系列的问题,经历实践和研究分析,我们最终构建了新一代的日志系统。下面就给大家分享整个过程。

二、初期架构1. 原始架构

把时间回溯到10年前,我们会怎样去记录日志?

架构比较简单,但问题也比较明显,有几个明显的缺点:

  • 每次查看日志文件都需要登录到不同的机器,非常不方便;
  • 通过 tail 或者 cat 等命令查看日志,如果对日志文件进行检索、聚合等操作,还会对服务器的 io 造成很大的压力,甚至导致故障的产生;
  • 日志文件过大不仅导致查询变得特别慢,还经常带来磁盘告警甚至磁盘空间不足等严重后果;
  • 日志格式不规范,日志随意写到文件,可读性和可分析性几乎为零;
  • 应用多节点挂载 NFS 性能差,容易产生日志丢失,从而影响问题定位和排障。

2.ELK体系

经历了这些问题之后,2017年,我们把目光转向新一代的基于Elasticsearch的日志体系,恰逢之前快递100订单检索引入了Elasticsearch,侧面又了解到基于Elastic Stack的ELK体系,经过一系列研究,开始意识到日志规范的重要性和采集的好处。

ELK是业界最成熟的日志技术栈,使用JSON格式存储,易于解析,再配合全文检索能力,能够快速从众多的日志信息中搜索到关键信息,加上Kibana的易用性,使得日志体验上升了几个档次,架构大致如下,不具体拆分细讲。

我们再回顾一下之前的挑战和缺点,可以看到基本解决了之前遇到的问题:

三、实践中的挑战

使用ELK一段时间之后,我们解决了从无到有的问题。但随着一系列基于日志进行分析和告警的工作逐步开展,新的问题也开始浮现。我们尝试进行优化后,效果并不明显,这促使我们重新考量架构的升级,主要的问题体现在:

  • 成本问题:ES压缩率不高,基于目前的日志量,合规性要求需要保留6个月,需要耗费巨大的存储成本;
  • 吞吐瓶颈:ES分词特性写入吞吐瓶颈问题,容易导致日志写入发生延迟;
  • 资源占用率高:ES在内存使用上的消耗过高;
  • 生命周期维护:ES旧版本TTL问题,需要手工介入数据过期,维护成本高;
  • 分析能力一般:由于更多的分析需求出现,ES的聚合能力受到了挑战。

针对上述的一些问题,在2020年开始了解到Clickhouse的存在,我们对ES和Clickhouse做了一个选型对比。基于对比得出的结论,最终决定选用 Clickhouse 作为下一代日志存储数据库。

四、新的架构体系

ELK体系经过多年的发展,生态已经非常强大,Clickhouse想达到同样的生态,需要更长的时间去发展,因此这个过程也需要投入一些研究或者开发量才能达到更好的效果,幸好 Clickhouse 本身的学习曲线较低,经过短时间的研究,我们制定了新的日志架构。

可以从几个组件的构成来看这个架构图,对比ELK体系与新的体系的不同:

  • 采集层:从Logstash到ilogtail,ilogtail性能更强,资源消耗更低;
  • 处理层:从Logstash到ilogtail,ilogtail还支持数据脱敏,多行拆分等实用功能;
  • 存储层:从Elasticsearch改为Clickhouse,选型过程已有对比,这里不再赘述;
  • 可视化:从Kibana到Clickvisual,这里有优点也有缺点,所以还是配合了Grafana才达到类似的效果。

Clickvisual的优点:灵活的SQL、日志审计、告警策略;

Kibana的优点:Kibana具备一些基础的BI功能,可用于日志分析。

1.新架构的成果

还是回顾一下之前的挑战,问题基本得到了解决:

2.基于Clickhouse的日志存储

基于10亿日志数据进行测试,得出磁盘占用的对比柱状图:

基于10亿数据的测试,在两者集群模式下,消费Kafka的速度对比

新的架构最核心的改变,就是将ES换成了Clickhouse,看中的就是极高的压缩率,最终的结果是同等存储条件下,原来ES只能保留一个月的数据,现在可以做到保留六个月,这其中少不了很多存储细节的优化,其中包含:

  • 大部分字段采用ZSTD压缩模式来提升压缩率;
  • 低基数LowCardinality的使用,节省存储的同时还做到性能提升;
  • 连续性时间字段的Delta+ZSTD压缩;
  • 冷热策略的配置,近一个月保留在SSD盘,一到六个月的数据自动流到HDD盘,六个月前数据自动清理。


建表语句如下:

3.基于Clickvisual的可视化

ClickVisual 是一个轻量级的开源日志查询、分析、报警的可视化平台,致力于提供一站式应用可靠性的可视化的解决方案。既可以独立部署使用,也可作为插件集成到第三方系统。目前是市面上唯一一款支持 ClickHouse 的类 Kibana 的开源业务日志查询平台。

它具备的特性,部分符合我们的需求:

  • 支持可视化的查询面板,可查询命中条数直方图和原始日志;
  • 支持设置日志索引功能,分析不同索引的占比情况;
  • 支持 Proxy Auth 功能,能被非常轻松地集成到第三方系统;
  • 支持基于 ClickHouse 日志的实时报警功能。

还提供了原始的SQL查询功能,直接输入SQL聚合语句,即时简单地对日志进行聚合分析:

体验总体类似Kibana,细节稍有不足,通过这个查询分析界面作为一个入口,搭配日志告警模块,快速定位问题和故障排除方面的能力得到了大大的提升,基本无缝从Kibana上切换过来,拥有不错的排障体验。

五、进一步优化

诚然,做到上述的效果还是不足以满足我们的要求。因而在此基础上,我们进行了优化方面的思考 ,其中也踩了一些Clickhouse的坑,使用了一些Clickhouse的新特性,是一个很有意思的过程。1.日志查询优化探索

得益于Clickhouse的高压缩率和查询性能,小日志量的表直接配合时间分区搜索即可,但是当日志量涨到一定程度的时候,查询缓慢总是一个难受的事,我们对一些场景进行了总结:

  • traceid场景:在 Skywalking 中根据 traceid 查询链路日志时,使用 tokenbf_v1索引,并通过 hasToken 查询,由于跳过大部分无效 parts,可快速命中返回;
  • 无结构化日志:对于这种无结构化日志,用 like 性能会非常慢且消耗 CPU 甚至内存,新版本的 Clickhouse 已经支持了倒排索引,也开始基于倒排— 索引优化,可大大提高响应速度;
  • 聚合场景:一些常规的聚合需求,可通过Clickhouse的Projection功能来满足。

2.本地表还是分布式表

本地表是Clickhouse的存储表,分布式表只是逻辑表,本身并不存储数据,在日志高频写入的场景,还是推荐写本地表,原因有这么几点:

  • 当我们大批量写入日志时,可以直接往分布式表写,但数据会先拆分成不同parts,再通过Zookeeper进行分发,增加了集群间网络的负载,导致写入变慢,甚至出现Too many parts问题
  • 写分布式表更容易出现数据一致性问题
  • Zookeeper压力变大

3.Clickhouse的限制策略

随着日志中心的建设,日志体量越来越大,开始暴露一些配置层面的问题,我们开始对 Clickhouse 增加一些限制,以免错误的 SQL 导致集群缓慢甚至 OOM 的问题,对于日志查询用户,单独做了 SQL 复杂度的限制,users.xml 中有几个参数:

  • max_memory_usage:单个服务器上运行查询的最大内存;
  • max_memory_usage_for_user:单个服务器上运行用户查询的最大内存;
  • max_memory_usage_for_all_queries:单个服务器上运行所有查询的最大内存;
  • max_rows_to_read:运行查询时可从表中读取的最大行数;
  • max_result_rows:限制结果中的行数;
  • max_bytes_to_read:运行查询时可以从表中读取的最大字节数(未压缩数据)。

结语

在这篇文章中,我们分享了快递100在云原生技术方面的实践、思考和应用,特别是在日志中心建设方面的实践。日志中心的上线应用,让我们在问题定位方面的效率得到了极大的提升,系统更加稳定和可靠;同时,通过对日志的收集、分析和挖掘,我们也更好地了解了用户需求和行为,优化了产品设计和运营策略,促进业务高速增长。

from:https://mp.weixin.qq.com/s?__biz=MzkzMjYzNjkzNw==&mid=2247612190&idx=1&sn=046e724f035dedd610ffb43e56ae07ea&source=41#wechat_redirect

(转)Iceberg文件组织

使用以下 SQL 创建名为user_log_iceberg的 Iceberg 表并插入一条数据:

create table hadoop_catalog.iceberg_db.user_log_iceberg (
    imei string,
    uuid string,
    udt timestamp
)
using iceberg
partitioned by (days(udt));

insert into hadoop_catalog.iceberg_db.user_log_iceberg values ('xxxxxxxxxxxxx', 'yyyyyyyyyyyyy', cast(1640966400 as timestamp));

user_log_iceberg 在文件系统中的存储结构为:

user_log_iceberg/
├── data
│   └── udt_day=2021-12-31
│       └── 00000-0-67ab9286-794b-456d-a1d3-9c797a2b4b03-00001.parquet
└── metadata
    ├── f9d66153-6745-4103-ad24-334fc62f0d1e-m0.avro
    ├── snap-6744647507914918603-1-f9d66153-6745-4103-ad24-334fc62f0d1e.avro
    ├── v1.metadata.json
    ├── v2.metadata.json
    └── version-hint.text

可以看到该表目录下有两个子文件夹:data 和 metadata。其中 data 就是真正的数据文件目录,metadata 是该表的元数据目录,这里的 metadata 就是替代 Hive 中的 Metastore 服务的。

在了解 Iceberg 元数据管理之前先看几个概念:

  1. SnapshotSnapshot 就是表在某个时间点的状态,其中包括该时间点所有的数据文件。Iceberg 对表的每次更改都会新增一个 Snapshot。
  2. Metadata File每新增一个 Snapshot 就会新增一个 Metadata 文件,该文件记录了表的存储位置、Schema 演化信息、分区演化信息以及所有的 Snapshot 以及所有的 Manifest List 信息。
  3. Manifest ListManifest List 是一个元数据文件,其中记录了所有组成快照的 Manifest 文件信息。
  4. Manifest FileManifest File 是记录 Iceberg 表快照的众多元数据文件的其中一个。其中的每一行都记录了一个数据文件的分区,列级统计等信息。 一个 Manifest List 文件中可以包含多个 Manifest File 的信息。
  5. Partition Spec表示字段值和分区值之间的逻辑关系。
  6. Data File包含表中所有行的文件。
  7. Delete File对按位置或数据值删除的表行进行编码的文件。
iceberg metadata

从上图中可以看出 Iceberg 表通过三级关系管理表数据,下面以 Spark 中的 spark.sql.catalog.hadoop_prod.type=hadoop 为例说明。:

最上层中记录了 Iceberg 表当前元数据的版本,对应的是version-hint.text文件,version-hint.text文件中只记录了一个数字表示当前的元数据版本,初始为 1,后续表每变更一次就加 1。

中间层是元数据层。其中 Metadata File 记录了表的存储位置、Schema 演化信息、分区演化信息以及所有的 Snapshot 和 Manifest List 信息,对应的是v1.metadata.jsonv2.metadata.json文件,其中v后面的数字和version-hint.text文件中的数字对应,每当新增一个 Snapshot 的时候,version-hint.text中的数字加 1,同时也会新增一个vx.metadata.json文件,比如执行insert into hadoop_catalog.iceberg_db.user_log_iceberg values ('xxxxxxxxxxxxx', 'yyyyyyyyyyyyy', cast(1640986400 as timestamp))delete from hadoop_catalog.iceberg_db.user_log_iceberg where udt = cast(1640986400 as timestamp)之后,版本就会变成v4:

user_log_iceberg/
├── data
│   └── udt_day=2021-12-31
│       ├── 00000-0-67ab9286-794b-456d-a1d3-9c797a2b4b03-00001.parquet
│       └── 00000-0-88d582ef-605e-4e51-ba98-953ee3dd4c02-00001.parquet
└── metadata
    ├── b3b1643b-56a2-471e-a4ec-0f87f1efcd80-m0.avro
    ├── ecb9255a-bcc5-4954-a4e9-3a54f5b09500-m0.avro
    ├── f9d66153-6745-4103-ad24-334fc62f0d1e-m0.avro
    ├── snap-4140724156423386841-1-ecb9255a-bcc5-4954-a4e9-3a54f5b09500.avro
    ├── snap-6744647507914918603-1-f9d66153-6745-4103-ad24-334fc62f0d1e.avro
    ├── snap-8046643380197343006-1-b3b1643b-56a2-471e-a4ec-0f87f1efcd80.avro
    ├── v1.metadata.json
    ├── v2.metadata.json
    ├── v3.metadata.json
    ├── v4.metadata.json
    └── version-hint.text

查看v4.metadata.json中的内容如下:点击查看

{
  "format-version": 1,
  "table-uuid": "c69c9f46-b9d8-40cf-99da-85f55cb7bffc",
  "location": "/opt/module/spark-3.2.1/spark-warehouse/iceberg_db/user_log_iceberg",
  "last-updated-ms": 1647772606874,
  "last-column-id": 3,
  "schema": {
    "type": "struct",
    "schema-id": 0,
    "fields": [
      {
        "id": 1,
        "name": "imei",
        "required": false,
        "type": "string"
      },
      {
        "id": 2,
        "name": "uuid",
        "required": false,
        "type": "string"
      },
      {
        "id": 3,
        "name": "udt",
        "required": false,
        "type": "timestamptz"
      }
    ]
  },
  "current-schema-id": 0,
  "schemas": [
    {
      "type": "struct",
      "schema-id": 0,
      "fields": [
        {
          "id": 1,
          "name": "imei",
          "required": false,
          "type": "string"
        },
        {
          "id": 2,
          "name": "uuid",
          "required": false,
          "type": "string"
        },
        {
          "id": 3,
          "name": "udt",
          "required": false,
          "type": "timestamptz"
        }
      ]
    }
  ],
  "partition-spec": [
    {
      "name": "udt_day",
      "transform": "day",
      "source-id": 3,
      "field-id": 1000
    }
  ],
  "default-spec-id": 0,
  "partition-specs": [
    {
      "spec-id": 0,
      "fields": [
        {
          "name": "udt_day",
          "transform": "day",
          "source-id": 3,
          "field-id": 1000
        }
      ]
    }
  ],
  "last-partition-id": 1000,
  "default-sort-order-id": 0,
  "sort-orders": [
    {
      "order-id": 0,
      "fields": []
    }
  ],
  "properties": {
    "owner": "PowerYang"
  },
  "current-snapshot-id": 4140724156423386600,
  "snapshots": [
    {
      "snapshot-id": 6744647507914919000,
      "timestamp-ms": 1647758232673,
      "summary": {
        "operation": "append",
        "spark.app.id": "local-1647757937137",
        "added-data-files": "1",
        "added-records": "1",
        "added-files-size": "1032",
        "changed-partition-count": "1",
        "total-records": "1",
        "total-files-size": "1032",
        "total-data-files": "1",
        "total-delete-files": "0",
        "total-position-deletes": "0",
        "total-equality-deletes": "0"
      },
      "manifest-list": "/opt/module/spark-3.2.1/spark-warehouse/iceberg_db/user_log_iceberg/metadata/snap-6744647507914918603-1-f9d66153-6745-4103-ad24-334fc62f0d1e.avro",
      "schema-id": 0
    },
    {
      "snapshot-id": 8046643380197343000,
      "parent-snapshot-id": 6744647507914919000,
      "timestamp-ms": 1647772293740,
      "summary": {
        "operation": "append",
        "spark.app.id": "local-1647770527459",
        "added-data-files": "1",
        "added-records": "1",
        "added-files-size": "1032",
        "changed-partition-count": "1",
        "total-records": "2",
        "total-files-size": "2064",
        "total-data-files": "2",
        "total-delete-files": "0",
        "total-position-deletes": "0",
        "total-equality-deletes": "0"
      },
      "manifest-list": "/opt/module/spark-3.2.1/spark-warehouse/iceberg_db/user_log_iceberg/metadata/snap-8046643380197343006-1-b3b1643b-56a2-471e-a4ec-0f87f1efcd80.avro",
      "schema-id": 0
    },
    {
      "snapshot-id": 4140724156423386600,
      "parent-snapshot-id": 8046643380197343000,
      "timestamp-ms": 1647772606874,
      "summary": {
        "operation": "delete",
        "spark.app.id": "local-1647770527459",
        "deleted-data-files": "1",
        "deleted-records": "1",
        "removed-files-size": "1032",
        "changed-partition-count": "1",
        "total-records": "1",
        "total-files-size": "1032",
        "total-data-files": "1",
        "total-delete-files": "0",
        "total-position-deletes": "0",
        "total-equality-deletes": "0"
      },
      "manifest-list": "/opt/module/spark-3.2.1/spark-warehouse/iceberg_db/user_log_iceberg/metadata/snap-4140724156423386841-1-ecb9255a-bcc5-4954-a4e9-3a54f5b09500.avro",
      "schema-id": 0
    }
  ],
  "snapshot-log": [
    {
      "timestamp-ms": 1647758232673,
      "snapshot-id": 6744647507914919000
    },
    {
      "timestamp-ms": 1647772293740,
      "snapshot-id": 8046643380197343000
    },
    {
      "timestamp-ms": 1647772606874,
      "snapshot-id": 4140724156423386600
    }
  ],
  "metadata-log": [
    {
      "timestamp-ms": 1647757946953,
      "metadata-file": "/opt/module/spark-3.2.1/spark-warehouse/iceberg_db/user_log_iceberg/metadata/v1.metadata.json"
    },
    {
      "timestamp-ms": 1647758232673,
      "metadata-file": "/opt/module/spark-3.2.1/spark-warehouse/iceberg_db/user_log_iceberg/metadata/v2.metadata.json"
    },
    {
      "timestamp-ms": 1647772293740,
      "metadata-file": "/opt/module/spark-3.2.1/spark-warehouse/iceberg_db/user_log_iceberg/metadata/v3.metadata.json"
    }
  ]
}

可以看到snapshots属性中记录了多个 Snapshot 信息,每个 Snapshot 中包含了 snapshot-id、parent-snapshot-id、manifest-list 等信息。通过最外层的 current-snapshot-id 可以定位到当前 Snapshot 以及 manifest-list 文件为/opt/module/spark-3.2.1/spark-warehouse/iceberg_db/user_log_iceberg/metadata/snap-4140724156423386841-1-ecb9255a-bcc5-4954-a4e9-3a54f5b09500.avro。使用java -jar avro-tools-1.11.0.jar tojson snap-4140724156423386841-1-ecb9255a-bcc5-4954-a4e9-3a54f5b09500.avro > manifest_list.json将该文件转换成 json 格式,查看其内容:点击查看

({
  "manifest_path": "/opt/module/spark-3.2.1/spark-warehouse/iceberg_db/user_log_iceberg/metadata/ecb9255a-bcc5-4954-a4e9-3a54f5b09500-m0.avro",
  "manifest_length": 6127,
  "partition_spec_id": 0,
  "added_snapshot_id": {
    "long": 4140724156423386841
  },
  "added_data_files_count": {
    "int": 0
  },
  "existing_data_files_count": {
    "int": 0
  },
  "deleted_data_files_count": {
    "int": 1
  },
  "partitions": {
    "array": [
      {
        "contains_null": false,
        "contains_nan": {
          "boolean": false
        },
        "lower_bound": {
          "bytes": "0J\u0000\u0000"
        },
        "upper_bound": {
          "bytes": "0J\u0000\u0000"
        }
      }
    ]
  },
  "added_rows_count": {
    "long": 0
  },
  "existing_rows_count": {
    "long": 0
  },
  "deleted_rows_count": {
    "long": 1
  }
},
{
  "manifest_path": "/opt/module/spark-3.2.1/spark-warehouse/iceberg_db/user_log_iceberg/metadata/f9d66153-6745-4103-ad24-334fc62f0d1e-m0.avro",
  "manifest_length": 6128,
  "partition_spec_id": 0,
  "added_snapshot_id": {
    "long": 6744647507914918603
  },
  "added_data_files_count": {
    "int": 1
  },
  "existing_data_files_count": {
    "int": 0
  },
  "deleted_data_files_count": {
    "int": 0
  },
  "partitions": {
    "array": [
      {
        "contains_null": false,
        "contains_nan": {
          "boolean": false
        },
        "lower_bound": {
          "bytes": "0J\u0000\u0000"
        },
        "upper_bound": {
          "bytes": "0J\u0000\u0000"
        }
      }
    ]
  },
  "added_rows_count": {
    "long": 1
  },
  "existing_rows_count": {
    "long": 0
  },
  "deleted_rows_count": {
    "long": 0
  }
})

里面包含了两条 json 数据,分别对应了个 Manifest 文件信息,除了 Manifest 文件的路径之外还有一些统计信息。使用java -jar avro-tools-1.11.0.jar tojson f9d66153-6745-4103-ad24-334fc62f0d1e-m0.avro > manifest_1.jsonjava -jar avro-tools-1.11.0.jar tojson ecb9255a-bcc5-4954-a4e9-3a54f5b09500-m0.avro > manifest_2.json将两个 Manifest 文件转为 json 格式,观察其内容:点击查看

{
  "status": 1,
  "snapshot_id": {
    "long": 6744647507914918603
  },
  "data_file": {
    "file_path": "/opt/module/spark-3.2.1/spark-warehouse/iceberg_db/user_log_iceberg/data/udt_day=2021-12-31/00000-0-67ab9286-794b-456d-a1d3-9c797a2b4b03-00001.parquet",
    "file_format": "PARQUET",
    "partition": {
      "udt_day": {
        "int": 18992
      }
    },
    "record_count": 1,
    "file_size_in_bytes": 1032,
    "block_size_in_bytes": 67108864,
    "column_sizes": {
      "array": [
        {
          "key": 1,
          "value": 48
        },
        {
          "key": 2,
          "value": 48
        },
        {
          "key": 3,
          "value": 51
        }
      ]
    },
    "value_counts": {
      "array": [
        {
          "key": 1,
          "value": 1
        },
        {
          "key": 2,
          "value": 1
        },
        {
          "key": 3,
          "value": 1
        }
      ]
    },
    "null_value_counts": {
      "array": [
        {
          "key": 1,
          "value": 0
        },
        {
          "key": 2,
          "value": 0
        },
        {
          "key": 3,
          "value": 0
        }
      ]
    },
    "nan_value_counts": {
      "array": []
    },
    "lower_bounds": {
      "array": [
        {
          "key": 1,
          "value": "xxxxxxxxxxxxx"
        },
        {
          "key": 2,
          "value": "yyyyyyyyyyyyy"
        },
        {
          "key": 3,
          "value": "\u0000@\\CsÔ\u0005\u0000"
        }
      ]
    },
    "upper_bounds": {
      "array": [
        {
          "key": 1,
          "value": "xxxxxxxxxxxxx"
        },
        {
          "key": 2,
          "value": "yyyyyyyyyyyyy"
        },
        {
          "key": 3,
          "value": "\u0000@\\CsÔ\u0005\u0000"
        }
      ]
    },
    "key_metadata": null,
    "split_offsets": {
      "array": [4]
    },
    "sort_order_id": {
      "int": 0
    }
  }
}
{
  "status": 2,
  "snapshot_id": {
    "long": 4140724156423386841
  },
  "data_file": {
    "file_path": "/opt/module/spark-3.2.1/spark-warehouse/iceberg_db/user_log_iceberg/data/udt_day=2021-12-31/00000-0-88d582ef-605e-4e51-ba98-953ee3dd4c02-00001.parquet",
    "file_format": "PARQUET",
    "partition": {
      "udt_day": {
        "int": 18992
      }
    },
    "record_count": 1,
    "file_size_in_bytes": 1032,
    "block_size_in_bytes": 67108864,
    "column_sizes": {
      "array": [
        {
          "key": 1,
          "value": 48
        },
        {
          "key": 2,
          "value": 48
        },
        {
          "key": 3,
          "value": 51
        }
      ]
    },
    "value_counts": {
      "array": [
        {
          "key": 1,
          "value": 1
        },
        {
          "key": 2,
          "value": 1
        },
        {
          "key": 3,
          "value": 1
        }
      ]
    },
    "null_value_counts": {
      "array": [
        {
          "key": 1,
          "value": 0
        },
        {
          "key": 2,
          "value": 0
        },
        {
          "key": 3,
          "value": 0
        }
      ]
    },
    "nan_value_counts": {
      "array": []
    },
    "lower_bounds": {
      "array": [
        {
          "key": 1,
          "value": "xxxxxxxxxxxxx"
        },
        {
          "key": 2,
          "value": "yyyyyyyyyyyyy"
        },
        {
          "key": 3,
          "value": "\u0000\btëwÔ\u0005\u0000"
        }
      ]
    },
    "upper_bounds": {
      "array": [
        {
          "key": 1,
          "value": "xxxxxxxxxxxxx"
        },
        {
          "key": 2,
          "value": "yyyyyyyyyyyyy"
        },
        {
          "key": 3,
          "value": "\u0000\btëwÔ\u0005\u0000"
        }
      ]
    },
    "key_metadata": null,
    "split_offsets": {
      "array": [4]
    },
    "sort_order_id": {
      "int": 0
    }
  }
}

可以看到,每个 Manifest 文件中的每一行都对应一个 data 目录下的数据文件,除了记录数据文件的路径之外,还记录了该数据文件对应的文件格式、分区信息、以及尽可能详细地记录了各个字段的统计信息、排序信息等。Manifest 文件中的 status,表示此次操作的类型,1 表示 add,2 表示 delete。

可以发现 Iceberg 中对数据文件的管理是文件级别,分区管理、字段统计也是到文件级别,而不是目录级别,这也是为什么 Iceberg 扫描要比 Hive 快的原因。在扫描计划里,查询谓词会自动转换为分区数据的谓词,并首先应用于过滤数据文件。接下来,使用列级值计数、空计数、下限和上限来消除无法匹配查询谓词的文件,在某些情况下可以提升数十倍效率。

但是由于 Iceberg 用 json 文件存储 Metadata,表的每次更改都会新增一个 Metadata 文件,以保证操作的原子性。历史 Metadata 文件不会删除,所以像流式作业就需要定期清理 Metadata 文件,因为频繁的提交会导致堆积大量的 Metadata。可以通过配置write.metadata.delete-after-commit.enabledwrite.metadata.previous-versions-max属性实现自动清理元数据。

提示

前面是以 spark.sql.catalog.hadoop_prod.type=hadoop举例,如果 spark.sql.catalog.hadoop_prod.type=hive,文件组织方式会稍有不同,如:

  1. 没有version-hint.text文件,而是通过 Metastore 服务来记录当前版本指针;
  2. Metadata File 的命名不再是vx.metadata.json的方式,.metadata.json前面的vx部分将是一个很长的 UUID。

from:https://www.sqlboy.tech/pages/19baa8/

(转)「大数据」Hive 分区和分桶的区别及示例讲解

一、概述

在大数据处理过程中,Hive是一种非常常用的数据仓库工具。Hive分区和分桶是优化Hive性能的两种方式,它们的区别如下:

1)分区概述

Hive分区是把数据按照某个属性分成不同的数据子集。

  • 在Hive中,数据被存储在HDFS中,每个分区实际上对应HDFS下的一个文件夹,这个文件夹中保存了这个分区的数据。
  • 因此,在Hive中使用分区,实际上是将数据按照某个属性值进行划分,然后将相同属性值的数据存储在同一个文件夹中。Hive分区的效率提升主要是因为,当进行查询操作时,只需读取与查询相关的数据分区,避免了全表扫描,节约了查询时间

Hive分区的主要作用是:

  • 提高查询效率: 使用分区对数据进行访问时,系统只需要读取和此次查询相关的分区,避免了全表扫描,从而显著提高查询效率。
  • 降低存储成本: 分区可以更加方便的删除过期数据,减少不必要的存储。

2)分桶概述

Hive分桶是将数据划分为若干个存储文件,并规定存储文件的数量。

  • Hive分桶的实现原理是将数据按照某个字段值分成若干桶,并将相同字段值的数据放到同一个桶中。在存储数据时,桶内的数据会被写入到对应数量的文件中,最终形成多个文件。
  • Hive分桶主要是为了提高分布式查询的效率。它能够通过将数据划分为若干数据块来将大量数据分发到多个节点,使得数据均衡分布到多个机器上处理。这样分发到不同节点的数据可以在本地进行处理,避免了数据的传输和网络带宽的浪费,同时提高了查询效率。

分桶的主要作用是:

  • 数据聚合: 分桶可以使得数据被分成较小的存储单元,提高了数据统计和聚合的效率。
  • 均衡负载: 数据经过分桶后更容易实现均衡负载,数据可以分发到多个节点中,提高了查询效率。

综上所述,分区和分桶的区别在于其提供的性能优化方向不同。分区适用于对于数据常常进行的聚合查询数据分析,而分桶适用于对于数据的均衡负载、高效聚合等方面的性能优化。当数据量较大、查询效率比较低时,使用分区和分桶可以有效优化性能。分区主要关注数据的分区和存储,而分桶则重点考虑数据的分布以及查询效率。

二、环境准备

如果已经有了环境了,可以忽略,如果想快速部署环境可以参考我这篇文章:通过 docker-compose 快速部署 Hive 详细教程

三、外部表和管理表

在Hive中,可以创建两种类型的表:外部表和管理表。它们之间的主要区别如下:

1)外部表

1、外部表介绍

外部表是指在Hive中创建的表,实际上其数据是存储在外部文件系统(HDFS或本地文件系统)中的。

  • 外部分区表是一种特殊类型的表,它们的数据存储在Hive之外的文件系统上,例如HDFS、S3等。
  • 对于外部分区表,Hive只会管理它们的元数据信息,而不会管理数据文件本身,这意味着,如果你使用Hive命令删除一个外部分区表,只会删除该表的元数据,而不会删除数据文件
  • 外部分区表通常用于存储和管理原始数据,这些数据通常需要在多个系统和工具之间共享。

2、示例讲解

【示例一】下面是创建Hive外部表的一个示例(数据存储在HDFS):

假设我们有一个存储在 HDFS 上的数据文件,其路径为’/user/hive/external_table/data’,我们可以通过以下语句,在Hive中创建一个外部表:

# 登录容器docker exec -it hive-hiveserver2 
# 登录hive客户端beeline -u jdbc:hive2://hive-hiveserver2:10000  -n hadoop
# 建表
CREATE EXTERNAL TABLE external_table1 (
    column1 STRING,
    column2 INT,
    column3 DOUBLE)
ROW FORMAT DELIMITEDFIELDS TERMINATED BY ','LINES TERMINATED BY '\n'STORED AS TEXTFILELOCATION '/user/hive/external_table/data';

在该表中,我们指定了表的各列的数据类型和分隔符等信息,并且使用了LOCATION 关键字来指定数据文件的存储位置。这样,在Hive中对该外部表进行查询操作时,Hive会自动去对应的位置读取数据文件,并据此返回查询结果。

load 数据

# 模拟一些数据cat >data<<EOFc1,12,56.33c2,14,58.99c3,15,66.34c4,16,76.78EOF
# 登录hive客户端beeline -u jdbc:hive2://hive-hiveserver2:10000  -n hadoop# 加载数据,local 是加载本机文件数据load data local inpath './data' into table external_table1;

需要注意的是,在使用外部表时,我们必须保证Hive对数据文件的访问权限与HDFS的文件权限相同,否则会导致外部表的查询失败。此外,在使用外部表时,务必不要删除外部表的数据文件,否则将会导致查询结果的不准确。

【示例一】下面是创建外部表访问本地数据文件的示例(数据存储在本地,很少使用):

在Hive中,我们同样可以创建外部表来访问本地文件系统上的数据文件。在这种情况下,我们需要注意的是,在Hive的配置中,必须开启hive.stats.autogather 功能。否则,在查询外部表时可能会出现错误。

假设我们有一个存储在本地文件系统上的数据文件,路径为’/path/to/local/file’,我们可以通过以下语句,在Hive中创建一个外部表:

CREATE EXTERNAL TABLE external_table2 (
    column1 STRING,
    column2 INT,
    column3 DOUBLE)ROW FORMAT DELIMITEDFIELDS TERMINATED BY ','LINES TERMINATED BY '\n'STORED AS TEXTFILELOCATION 'file:///path/to/local/file';### hive文件存储格式包括以下几类(STORED AS TEXTFILE):#1、TEXTFILE#2、SEQUENCEFILE#3、RCFILE#4、ORCFILE(0.11以后出现)#其中TEXTFILE为默认格式,建表时不指定默认为这个格式,导入数据时会直接把数据文件拷贝到hdfs上不进行处理;

需要注意的是,我们在使用LOCATION关键字时,要指定为’file:///path/to/local/file’,而不是 ‘/path/to/local/file’ ,这是因为我们需要使用文件系统的URL来访问本地文件系统上的数据文件。

2)管理表(内部表)

1、管理表(内部表)介绍

管理表是利用Hive自身的存储能力来对数据进行存储和管理的表。在Hive中创建管理表时,必须指定数据的存储路径。

  • 管理表也称为内部表(Internal Table),管理表是Hive默认创建的表类型,它的数据存储在Hive默认的文件系统上(通常是HDFS)。
  • Hive会自动管理这些表的数据和元数据,包括表的位置、数据格式等。如果你使用Hive命令删除了一个管理表,那么该表的数据也会被删除
  • 通常情况下,管理表用于存储和管理中间结果、汇总数据和基础数据。当数据规模较小时,管理表是一个不错的选择,因为它可以提供更好的查询性能,同时也更容易管理。

2、示例讲解

在Hive中,除了外部表,我们还可以创建内部表来存储数据。与外部表不同的是,内部表存储的数据位于Hive自身管理的HDFS上,因此,在创建内部表时,我们需要确保数据可以被正确地上传到HDFS上。下面是创建内部表并存储在本机的示例:

假设我们有以下数据文件,名为data.csv,存储在本地文件系统的/path/to/local目录下:

cat >data.csv<<EOFvalue1,1,2.3value2,2,3.4value3,3,4.5EOF

我们可以使用以下语句,在Hive中创建一个内部表:

CREATE TABLE internal_table (    column1 STRING,    column2 INT,    column3 DOUBLE)ROW FORMAT DELIMITEDFIELDS TERMINATED BY ','LINES TERMINATED BY '\n'STORED AS TEXTFILE;# 加载本地数据,LOCAL LOAD DATA LOCAL INPATH './data.csv' INTO TABLE internal_table;# 加载HDFS数据# 先将文件推送到HDFS上hdfs dfs -put ./data.csv /tmp/# 登录hive客户端beeline -u jdbc:hive2://hive-hiveserver2:10000  -n hadoop# 加载HDFS上的数据LOAD DATA INPATH '/tmp/data.csv' INTO TABLE internal_table;# 查询select * from internal_table;


总之,外部表和管理表都可以在Hive中实现数据的存储和管理,但它们之间的不同主要体现在数据的存储和处理方式上。

四、分区表之静态分区和动态分区

Hive中的分区表可以进一步细分为静态分区和动态分区。

静态分区是指通过手动指定分区列的值来创建分区。例如,在创建一个基于年份的分区表时,我们可以手动指定每个分区名对应的年份:

CREATE TABLE sales (
  id int,
  date string,
  amount double)PARTITIONED BY (year string);
ALTER TABLE sales ADD PARTITION (year='2019') location '/data/sales/2022';
ALTER TABLE sales ADD PARTITION (year='2020') location '/data/sales/2023';

在上述示例中,我们通过 ALTER TABLE 语句手动添加了2019和2020两个年份的分区。

动态分区是指在加载数据时通过SQL语句自动创建分区。例如,在从一个包含销售记录的数据文件中加载数据时,可以自动根据数据中的年份信息创建相应的分区:

INSERT INTO TABLE sales PARTITION (year)SELECT id, date, amount, YEAR(date)FROM raw_sales;

在上述示例中,我们使用 PARTITION 子句指定在 CREATE TABLE 语句中定义的分区列year,并使用 YEAR(date) 表达式从数据中提取出年份信息。

动态分区的优点在于它可以大大简化创建和管理分区表的过程并提高效率;但是需要注意的是,它可能会在某些情况下产生不可预期的行为,例如可能创建太多分区。

总之,静态分区和动态分区都是用于在Hive中管理大型数据集的有效工具,具体使用需要根据具体情况选择最适合的方法,并理解它们的优点和缺点。

五、hive分区表严格模式和非严格模式

Hive分区表的严格模式和非严格模式可以通过以下两个参数进行设置:

  1. hive.exec.dynamic.partition.mode:该参数用于设置分区模式,其默认值为strict,即严格模式。可以将其设置为nonstrict,即非严格模式
# 登录hive客户端beeline -u jdbc:hive2://hive-hiveserver2:10000  -n hadoop# 设置SET hive.exec.dynamic.partition.mode=nonstrict;
  1. hive.exec.max.dynamic.partitions:该参数用于限制动态分区的最大数量。在非严格模式下,当动态分区的数量超过该参数指定的值时,Hive将抛出异常。可以通过以下语句来修改该参数:
SET hive.exec.max.dynamic.partitions=<value>;

其中,<value> 为一个整数值,表示限制的动态分区数量。如果需要取消该限制,可以将该参数设置为一个非正数,例如:

SET hive.exec.max.dynamic.partitions=-1;

需要注意的是,这些参数的设置仅对当前会话有效,也可以将其添加到Hive的配置文件中以在每个会话中自动应用。

总之,hive.exec.dynamic.partition.mode 和 hive.exec.max.dynamic.partitions 是控制Hive分区表严格模式和非严格模式的两个重要参数,开发人员可以根据自己的需求进行设置。

1)严格模式

严格模式要求在加载数据时必须指定所有分区列的值,否则将会导致抛出异常。例如,在下面的分区表中:

CREATE TABLE sales (
  id int,
  date string,
  amount double)PARTITIONED BY (year string, month string, day string)CLUSTERED BY (id) INTO 10 BUCKETS;

在严格模式下,我们必须为year、month和day三个分区列的所有可能取值指定一个分区:

INSERT INTO TABLE sales PARTITION (year='2019', month='01', day='03')SELECT id, date, amountFROM raw_salesWHERE YEAR(date) = 2019 AND MONTH(date) = 1 AND DAY(date) = 3;

在上述示例中,我们使用 PARTITION 子句手动为分区列year、month、day指定取值。

2)非严格模式

非严格模式则允许忽略某些分区列的值,这样使用 INSERT INTO 语句时只需指定提供的分区值即可。例如:

# SET hive.exec.dynamic.partition.mode=nonstrict;INSERT INTO TABLE sales PARTITION (year, month, day)SELECT id, YEAR(date), MONTH(date), DAY(date), amountFROM raw_salesWHERE YEAR(date) = 2019;

在上述示例中,我们使用 SET 语句设置分区模式为非严格模式,然后只提供了year分区列的值,而month和day分区列的值是从数据中动态计算得出的。

使用非严格模式可以简化分区表的创建和管理,但需要注意,它可能会产生一些意料之外的结果(例如可能创建太多分区),所以需要谨慎使用。

总之,分区表的严格模式和非严格模式都具有一些优点和缺点,具体使用需要根据具体情况选择最适合的方式。

六、分区表和分桶表示例讲解

1)分区表示例讲解

在Hive中,我们可以使用分区表来更有效地组织和管理数据。分区表将数据分为子集,每个子集对应一个或多个分区。这样,我们就可以更快地访问和查询数据,而不必扫描整个数据集。

创建分区表的语法类似于创建普通表,只不过要使用 PARTITIONED BY 子句指定一个或多个分区列,例如:

# 内部表CREATE TABLE partitioned_internal_table (
  id INT,
  mesg STRING)PARTITIONED BY (
  year INT,
  month INT)
ROW FORMAT DELIMITEDFIELDS TERMINATED BY ','LINES TERMINATED BY '\n'STORED AS TEXTFILE;
# 外部表
CREATE EXTERNAL TABLE partitioned_external_table (
  id INT,
  mesg STRING)PARTITIONED BY (
  year INT,
  month INT)
ROW FORMAT DELIMITEDFIELDS TERMINATED BY ','LINES TERMINATED BY '\n'STORED AS TEXTFILELOCATION '/user/hive/partitioned_table/data';

上述语句创建了一个分区表,在列column1和column2的基础上,按照year和month两列进行了分区。

【注意】分区的实现依赖于Hive的底层存储Hadoop分布式文件系统(HDFS)。为了确定如何分配数据,Hive要求每个分区对应一个目录,该目录包含该分区数据的所有文件。因此,在将数据加载到分区表中时,必须提供与分区对应的目录

例如,如果我们要将一个CSV文件加载到分区表中,我们可以使用以下语句:

LOAD DATA LOCAL INPATH './file.csv' INTO TABLE partitioned_external_table PARTITION (year=2019, month=1);# 查看分区show partitions partitioned_external_table;

在上述语句中,我们使用 LOAD DATA 子句将 /data/file.csv 文件加载到partitioned_table 表中,并指定了分区year为2019,分区month为1。

假设我们的CSV文件具有以下内容:

1,test1,2019,11,test2,2019,12,test3,2022,13,test4,2023,1

使用以下语句查询分区表:

SELECT * FROM partitioned_external_table WHERE year=2019 AND month=1;

分区表的优点在于可以更高效地组织数据,同时也允许我们根据需要删除或添加分区。例如,我们可以使用以下语句删除分区:

ALTER TABLE partitioned_table DROP PARTITION (year=2019, month=1);

可以使用以下语句添加分区:

ALTER TABLE partitioned_external_table ADD PARTITION (year=2020, month=2);
# 查看分区show partitions partitioned_external_table;

总之,分区表是管理和查询大型数据集的有效方式,可以帮助我们更轻松地处理大量数据。

2)分桶表示例讲解

除了分区表之外,Hive还提供了另一种将数据分割成可管理单元的方式,即分桶。

分区和分桶的概念有一些相似之处,但也存在一些重要的区别。

  • 分区是指基于表的某些列将数据分割成不同的存储单元;
  • 而分桶是指将数据根据哈希函数分成一组固定的桶。

类比于分区,在创建一个分桶表时,我们需要指定分桶的数量和分桶的列。例如,以下是一个创建分桶表的示例:

CREATE TABLE bucketed_table (
  column1 data_type,
  column2 data_type,  ...) 
CLUSTERED BY (column1) -- 分桶列INTO 10 BUCKETS; -- 桶数量

在上述示例中,我们将column1作为分桶列,并将数据分成10个桶。

加载数据时,Hive根据指定的桶列计算哈希值,并将数据存储在对应的桶中。

INSERT INTO TABLE bucketed_table VALUES ('value1', 1, 2.3)

查询时,可以使用以下格式指定桶列:

SELECT * FROM bucketed_table TABLESAMPLE(BUCKET x OUT OF y ON column1);

在上述示例中,我们使用用于抽样数据的 TABLESAMPLE 子句,指定从桶x中抽取数据,并在分桶列column1上进行抽样。

分桶表的优点在于,我们可以更容易地执行等值和范围查询,并更好地利用MapReduce 的数据本地性,从而提高查询性能。但分桶表也有一些缺点,例如添加和删除数据涉及重新计算哈希函数和移动数据的成本。

总之,分区表和分桶表都是Hive管理和处理大型数据集的重要工具,可以帮助我们更轻松地组织、查询和分析大量数据。在具体使用时,需要考虑表的存储和查询需求,选择最适合的表类型。在实际场景中分区用的居多。

关于Hive 分区和分桶的区别和示例讲解就先到这里了,有任何疑问欢迎给我留言,后续会持续更新相关文章,请小伙伴耐心等待

from:https://baijiahao.baidu.com/s?id=1764915870006249443&wfr=spider&for=pc

如何优化 Spark 小文件,Kyuubi 一步搞定!(转)


Hive 表中太多的小文件会影响数据的查询性能和效率,同时加大了 HDFS NameNode 的压力。Hive (on MapReduce) 一般可以简单的通过一些参数来控制小文件,而 Spark 中并没有提供小文件合并的功能。下面我们来简单了解一下 Spark 小文件问题,以及如何处理小文件。

01Spark 小文件问题

 1.1 环境 

Kyuubi 版本:1.6.0-SNAPSHOT

Spark 版本:3.1.3、3.2.0 

 1.2 TPCDS 数据集 

Kyuubi 中提供了一个 TPCDS Spark Connector,可以通过配置 Catalog 的方式,在读取时自动生成 TPCDS 数据。 

只需要将 kyuubi-spark-connector-tpcds_2.12-1.6.0-SNAPSHOT.jar 包放入 Spark jars 目录中,并配置:

spark.sql.catalog.tpcds=org.apache.kyuubi.spark.connector.tpcds.TPCDSCatalog;

这样我们就可以直接读取 TPCDS 数据集:

use tpcds; 

show databases; 

use sf3000; 

show tables; 

select * from sf300.catalog_returns limit 10;

 1.3 小文件产生

首先我们在 Hive 中创建一个 sample.catalog_returns 表,用于写入生成的 TPCDScatalog_returns 数据,并添加一个 hash 字段作为分区。

我们先关闭 Kyuubi 的优化,读取 catalog_returns 数据并写入 Hive:

Spark SQL 最终产生的文件数最多可能是最后一个写入的 Stage 的 Task 数乘以动态分区的数量我们 可以看到由于读取输入表的 Task 数是 44 个,所以最终产生了 44 个文件,每个文件大小约 69 M。 

 1.4 改变分区数(Repartition) 

由于写入的文件数跟最终写入 Stage 的 Task 数据有关,那么我们可以通过添加一个 Repartition 操作, 来减少最终写入的 task 数,从而控制小文件:

添加 REPARTITION(10) 后,会在读取后做一个 Repartition 操作,将 partition 数变成 10,所以最终 写入的文件数变成 10 个。 

 1.5 Spark AQE 自动合并小分区 

Spark 3.0 以后引入了自适应查询优化(Adaptive Query Execution, AQE),可以自动合并较小的分区。

开启 AQE,并通过添加 distribute by cast(rand() * 100 as int) 触发 Shuffle 操作:

默认 Shuffle 分区数 spark.sql.shuffle.partitions=200 ,如果不开启 AQE 会产生 200 个小文件, 开启 AQE 后,会自动合并小分区,根据spark.sql.adaptive.advisoryPartitionSizeInBytes=512M 配置合并较小的分区,最终产生 12 个文件。

02Kyuubi 小文件优化分析

Apache Kyuubi (Incubating) 作为增强版的 Spark Thrift Server 服务,可通过 Spark SQL 进行大规模的 数据处理分析。Kyuubi 通过 Spark SQL Extensions 实现了很多的 Spark 优化,其中包括了RepartitionBeforeWrite 的优化,再结合 Spark AQE 可以自动优化小文件问题,下面我们具体分析 一下 Kyuubi 如何实现小文件优化。

  2.1 Kyuubi 如何优化小文件

 Kyuubi 提供了在写入前加上 Repartition 操作的优化,我们只需要将 kyuubi-extension-spark-3- 1_2.12-1.6.0-SNAPSHOT.jar 放入 Spark jars 目录中,并配置spark.sql.extensions=org.apache.kyuubi.sql.KyuubiSparkSQLExtension 。相关配置:

通过 spark.sql.optimizer.insertRepartitionNum 参数可以配置最终插入 Repartition 的分区数, 当不开启 AQE,默认为 spark.sql.shuffle.partitions 的值。需要注意,当我们设置此配置会导致 AQE 失效,所以开启 AQE 不建议设置此值。 

对于动态分区写入,会根据动态分区字段进行 Repartition,并添加一个随机数来避免产生数据倾斜,spark.sql.optimizer.dynamicPartitionInsertionRepartitionNum 用来配置随机数的范围,不 过添加随机数后,由于加大了动态分区的基数,还是可能会导致小文件。这个操作类似在 SQL 中添加distribute by DYNAMIC_PARTITION_COLUMN, cast(rand() * 100 as int)。

  2.2 静态分区写入 

开启 Kyuubi 优化和 AQE,测试静态分区写入:

可以看到 AQE 生效了,很好地控制了小文件,产生了 11 个文件,文件大小 314.5 M 左右。

 2.3 动态分区写入

我们测试一下动态分区写入的情况,先关闭 Kyuubi 优化,并生成 10 个 hash 分区:

产生了 44 × 10 = 440 个文件,文件大小 8 M 左右。

开启 Kyuubi 优化和 AQE:

产生了 12 × 10 = 120 个文件,文件大小 30 M 左右,可以看到小文件有所改善,不过仍然不够理想。

此案例中 hash 分区由 rand 函数产生,分布比较均匀,所以我们将spark.sql.optimizer.dynamicPartitionInsertionRepartitionNum 设置成 0 ,重新运行,同时将动态分区数设置为5 :

由于动态分区数只有 5 个,所以实际上只有 5 个 Task 有数据写入,每个 Task 对应一个分区,导致最终每个分区只有一个较大的大文件。

通过上面的分析可以看到,对于动态分区写入,Repartition 的优化可以缓解小文件,配置spark.sql.optimizer.dynamicPartitionInsertionRepartitionNum=100 解决了数据倾斜问题, 不过同时还是可能会有小文件。 

  2.4 Rebalance 优化 

Spark 3.2+ 引入了 Rebalance 操作,借助于 Spark AQE 来平衡分区,进行小分区合并和倾斜分区拆 分,避免分区数据过大或过小,能够很好地处理小文件问题。 

Kyuubi 对于 Spark 3.2+ 的优化,是在写入前插入 Rebalance 操作,对于动态分区,则指定动态分区列 进行 Rebalance 操作。不再需要 spark.sql.optimizer.insertRepartitionNum 和spark.sql.optimizer.dynamicPartitionInsertionRepartitionNum 配置。

测试静态分区写入,使用 Spark 3.2.0 开启 Kyuubi 优化和 AQE:

Repartition 操作自动合并了小分区,产生了 11 个文件,文件大小 334.6 M 左右,解决了小文件的问题。

测试动态分区写入,使用 Spark 3.2.0 开启 Kyuubi 优化和 AQE,生成 5 个动态分区:

Repartition 操作自动拆分较大分区,产生了 2 × 5 = 10 个文件,文件大小 311 M 左右,很好地解决倾斜问题。

03总 结

从上面的分析可以看到,对于 Spark 3.2+,Kyuubi 结合 Rebalance 能够很好地解决小文件问题,对于 Spark 3.1,Kyuubi 也能自动优化小文件,不过动态分区写入的情况还是可能存在问题。

相关的配置总结:

更多 AQE 配置可以参考:How To Use Spark Adaptive Query Execution (AQE) in Kyuubi

原文连接:https://www.modb.pro/db/423478

【原创】用EMR建设实时数仓

建设实时数仓的目的和意义

实时数仓目的

数仓概念:数据尽可能多,保存时间尽可能久

图片描述

实时概念:数据流式,处理及时、瞬时、短时、事件或者微批响应

图片描述

数仓跟实时从概念上就有冲突,所以本质上不太适合处理广泛的问题,比如,对一个月,甚至是一年的数据进行统计计算。

图片描述

所以,实时数仓应该目前作为离线数仓的一种补充,解决因离线数仓实时性低而无法解决的问题,具体点说就是处理离线两个周期间隔的数据问题,不适合解决大批量数据聚合问题、业务性太强的以及对实时性要求很高问题。

实时数仓的意义

图片描述

实时数仓从概念上讲还是要靠近数仓的概念,数据分层,面向主题,数据尽可能集成,结构相对稳定,不易发生变化。

图片描述

对于实时数仓来讲,数据量不需要保存像离线那么久,上一节我们提到,实时数仓处理两个离线周期间隔的数据即可,如上图,以时报为例,实时数仓补充中间数据即可,以天为例,实时数仓最多只需要保留3~5天数据即可,能够支持一段时间的数据追溯和重导就可以了。

实时数仓可以解决哪类问题

图片描述

利用EMR建设实时数仓

实时数仓对比离线数仓

图片描述

实时数仓架构

图片描述

从图中可以看到,

1、ODS并不是实时数仓的一部分,是依赖外部数据源,比如binlog,流量日志,系统日志,或者是其他消息队列

2、应用层也不是实时数仓的一部分,对于数据的使用,通过实时数仓暴露Topic来使用

3、实时数仓要求层次要少,因为需要尽可能降低延迟

用EMR搭建实时数仓

图片描述

1、底层数据源可以接企业内部binlog、日志或者消息队列

2、从ODS层经过与维表轻度扩展,形成明细层明细表,明细表用一个Ckafka topic表示,计算采用Oceanus或者EMR FlinkSql 关联查询,维表采用EMR Hbase存储

3、从明细层经过进一步汇总计算,形成汇总层,此时数据已经是面向主题的汇总数据,就是传统意义上的大宽表,一个主题是一系列Ckafka topic,计算采用Oceanus或者EMR FlinkSql 关联查询以及汇总计算

实时数仓各层搭建

ODS层搭建

图片描述

1、 之所以没有把ODS层放在实时数仓的一部分,是因为实时数仓的ODS并不像离线数仓ODS是采集过来的原始数据,现在一般企业都已经具备了如上图的底层数据源

2、 Binlog,是数据库日志,通过Binlog可以自数据库主从间同步,可以同步关系型数据库数据,目前企业线上数据库都采用Mysql这样的数据库,可以通过抓取Mysql binlog 获取数据库变更信息,数仓中重要的业务数据,支付相关,用户相关,管理相关数据一般都从这种数据源获得

3、 Log日志,服务器日志,像服务器系统日志采集,都是通过这种形式进行采集

Ckafka,企业通过消息队列提供数据源服务,比如,点击流服务,会把用户点击事件通过上报服务器上报到Ckafka,为后续分析提供原始数据

该层搭建的注意点:
1、 业务选择数据源,尽量跟离线保持一致,比如某个业务,数据源即可以通过Binlog,也可以通过Log日志采集,如果离线数仓业务是通过Binlog,那么实时数仓也取Binlog,否则后续产生数据不一致,非常难以定位

2、数据源要求一致性,对于Ckafka和Binlog 需要进行分区一致性保证,解决数据乱序问题

明细层搭建

图片描述

建设标准与离线数仓目标一致,解决原始数据存在噪音,不完整,形式不统一等问题

图片描述

数据解析,业务整合,数据清洗,解决噪音,不完整,数据不一致问题;模型规范化(提前指定号规则,尽量跟离线保持一致),形成数据规范,规范尽可能跟离线保持一致,命名,比如,指标命名等;

与离线数仓不同之处在于,离线调度是有周期的,时报一小时,天报周期为一天,如果修改数据表字段,只要任务没开始,就可以修改,而实时是流式,7X24小时不间断运行的,想要修改流中的字段或者格式,对下游影响是不可预估的

实时数仓如果修改字段不像离线,在间隔期间通知下游把作业都改了就没事了,但是实时不一样,实时你改掉了字段,下游作业必须可以认识你修改的内容才行,kafka不是结构化存储,没有元数据的概念,不像Hive,如果表名不规范,找一个统一时间,把catolog改规范,然后把脚本一改就就解决了。

明细建设关键,我们会在每一条数据上增加一些额外字段到数仓里

图片描述

举例说明这些额外字段的意义

事件主键:对于上游数据重复问题,我们会根据一些数据内的字段来判断上有数据的唯一性,比如binlog,<集群id_><库id_><表id_>数据id_数据生成时间。

数据主键:唯一标识数据表的一行记录,可以使用数据库主键,主要用来解决分区一致性及分区有序。

数据元数据版本:上面介绍了,流式计算是7X24小时不间断的运算,当修改了数据结构,增加,删除了字段,对下游的影响是不可预估的,因此元数据变更需修改该字段,保持数据流中新老版本数据双跑,下游选择合适的时机进行数据切换。

数据批次:跟元数据用途相似,当明细层逻辑发现问题,需要重跑数据,为了对下游任务不产生影响,调整了明细层逻辑后,需要回倒位点重跑数据,同时需要跟老逻辑任务双跑,待下游业务都切换到新的逻辑后,老逻辑任务才可以停止。

还有一个思路,可以直接把明细层数据,也可以直接写到druid 直接用于分析。

维度层搭建

图片描述

维度数据处理:

图片描述

如上图,对于变化频率低,地理,节假日,代码转换,直接同步加载到缓存里,或者是新增数据,但是增加进来就不变了,通过数据接口,访问最新数据,然后通过本公司数据服务对外提供数据

图片描述

如上图,对于变化频率高,比如商品价格,也是需要监听变化消息,然后实时更新维度拉链表。对于比如像最近一个月没有消费用户这样的衍生维度,是需要根据变化消息,通过计算得到的衍生维度拉链。

因为维度数据也在发生变化,为了能够让源表数据匹配到维表,我们会给维表增加多版本minversion,然后通过TIMERANGE => [1303668804,
1303668904]筛选出源数据指定的维表版本数据。

这里有些同学可能觉得如果版本一致保存下去,会不会非常大,是的,我们响应的需要配置TTL保证维表数据量可控,上文我们介绍过,实时数仓解决是离线数仓两个间隔的问题,那么像这种变化频繁的数据我们TTL设置一周足够了。

关于源表与维表如果进行join,Flink原生sql以及Oceanus都是采用UDTF函数以及Lateral
Table 进行联合使用,其中UDTF我们可以实现查询数据服务获取维表数据的能力,Oceanus请参考相关材料。

汇总层搭建

图片描述

汇总层加工其实跟离线数仓是一致的,对共性指标进行加工,比如,pv,uv,订单优惠金额,交易额等,会在汇总层进行统一计算。

Flink提供了丰富的窗口计算,这使得我们可以做更细力度的聚合运算,例如,我们可以算最近5分钟,10分钟的数据聚合,根据时间窗口的间隔,也需要调整相应的TTL,保障内存高效实用。

Flink提供了丰富的聚合计算,数据都是要存在内存中的,因此需要注意设置state的TTL,例如,做Count(Distinct
x)。或者在进行PV,UV计算时候,都会使用大量的内存,这一块,当处理的基数比较大的时候,推荐使用一些非高精度去重算法,Bloom过滤器,Hyper LogLog等。

汇总层也需要在每一条数据上增加一些额外字段到数仓里,这块与明细层一致,就不在单独讲解了。

数据质量保证

图片描述

对于实时数仓数据质量的管理,我们通常由三步操作组成

第一步,数据与离线数据进行对比

首先,将汇总层数据Topic通过平台接入任务接入到离线仓库,然后通过数据质量任务,定时对实时数仓和离线数仓数据进行对比,并配置报警,数据差异,数据波动等。

第二步,配置报警,我们会在明细层以及汇总层,Topic配置生产监控,与以往数据波动,上游数据延迟或者积压,都需要进行报警。

第三部,构建实时血缘, Flink 在读取数据时候,会把信息读到flink catalog 这样就知道这个任务读取了哪个表,在解析客户DDL代码时,可以获得目标表信息,同步到我们的元数据服务。

参考文献:

美团实时数仓搭建:https://tech.meituan.com/2018/10/18/meishi-data-flink.html

菜鸟实时数仓:https://mp.weixin.qq.com/s/9ZRG76-vCM7AlRZNFCLrqA

hive 响应慢问题定位

情景描述:

大数据集群,目前有两套hiveserver2和metastore的集群,通过nginx指向进行流量互切,发现流量打到哪个metastore集群,哪个集群就特别卡顿。

那么先来回顾一下hive整个调用流程和框架

image-20201227123352163

Hive 提供的另外一个shell 客户端,也就是我们常用的hive 命令的客户端它的设计是直接启动了一个org.apache.hadoop.hive.cli.CliDriver的进程,这个进程其实主要包含了两块内容一个是提供给我们交互的cli ,另外一个就是我们的Driver 驱动引擎,这样的设计导致如果我们有多个客户端的情况下,我们就需要有多个Driver

image-20201226204741803

但是我们通过HiveServer2连接的时候我们就可以共享Driver,一方面可以简化客户端的设计降低资源损耗,另外一方面还能降低对MetaStore 的压力,减少连接的个数。

原因分析:

目前来看,变慢的原因应该是出现在hs2服务,metastore服务,具体业务,网络,服务器等原因。

1、先从简单的硬件分析入手,验证网络和服务器,这块省略验证过程

2、验证hs2服务,利用排除的方式,通过hive cli进行多次验证,发现也有缓慢的情况,正常应该1秒内返回,所以先不定位hs2的情况

3、验证metastore服务,还是先从直观简单的分析,看能否找出些现象,先看日志:

3.1 既然通过cli和hs2访问都会慢,先从简单的cli发起请求,可以通过加debug参数,发起查询

beeline –verbose=true –showNestedErrs=true –debug=true 看一下客户端是否有明显异常

3.2 cd /var/log/hive 查看一下是否有大量客户端访问

cat hadoop-cmf-hive-HIVEMETASTORE-data-hadoop-16-2.192.168.0.1.log.out | grep audit | grep -v “ugi=hue” | awk -F “ip=” ‘{print $2}’ | awk ‘{print $1}’ | sort | uniq -c | sort -nr | head

3.3 查看服务GC情况 jstat -gcutil pid interval(ms)

3.4 查看服务端日志

Error: Error while compiling statement: [Error 10308]: Attempt to acquire compile lock timed out. (state=,code=10308)
org.apache.hive.service.cli.HiveSQLException: Error while compiling statement: [Error 10308]: Attempt to acquire compile lock timed out.
at org.apache.hive.jdbc.Utils.verifySuccess(Utils.java:241)
at org.apache.hive.jdbc.Utils.verifySuccessWithInfo(Utils.java:227)
at org.apache.hive.jdbc.HiveStatement.execute(HiveStatement.java:255)
at org.apache.hive.beeline.Commands.executeInternal(Commands.java:989)
at org.apache.hive.beeline.Commands.execute(Commands.java:1180)
at org.apache.hive.beeline.Commands.sql(Commands.java:1094)
at org.apache.hive.beeline.BeeLine.dispatch(BeeLine.java:1180)
at org.apache.hive.beeline.BeeLine.execute(BeeLine.java:1013)
at org.apache.hive.beeline.BeeLine.begin(BeeLine.java:922)
at org.apache.hive.beeline.BeeLine.mainWithInputRedirection(BeeLine.java:518)
at org.apache.hive.beeline.BeeLine.main(BeeLine.java:501)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.hadoop.util.RunJar.run(RunJar.java:226)
at org.apache.hadoop.util.RunJar.main(RunJar.java:141)
Caused by: org.apache.hive.service.cli.HiveSQLException: Error while compiling statement: [Error 10308]: Attempt to acquire compile lock timed out.
at org.apache.hive.service.cli.operation.Operation.toSQLException(Operation.java:400)
at org.apache.hive.service.cli.operation.SQLOperation.prepare(SQLOperation.java:187)
at org.apache.hive.service.cli.operation.SQLOperation.runInternal(SQLOperation.java:271)
at org.apache.hive.service.cli.operation.Operation.run(Operation.java:337)
at org.apache.hive.service.cli.session.HiveSessionImpl.executeStatementInternal(HiveSessionImpl.java:439)
at org.apache.hive.service.cli.session.HiveSessionImpl.executeStatementAsync(HiveSessionImpl.java:416)
at org.apache.hive.service.cli.CLIService.executeStatementAsync(CLIService.java:282)
at org.apache.hive.service.cli.thrift.ThriftCLIService.ExecuteStatement(ThriftCLIService.java:503)
at org.apache.hive.service.cli.thrift.TCLIService$Processor$ExecuteStatement.getResult(TCLIService.java:1313)
at org.apache.hive.service.cli.thrift.TCLIService$Processor$ExecuteStatement.getResult(TCLIService.java:1298)
at org.apache.thrift.ProcessFunction.process(ProcessFunction.java:39)
at org.apache.thrift.TBaseProcessor.process(TBaseProcessor.java:39)
at org.apache.hive.service.auth.TSetIpAddressProcessor.process(TSetIpAddressProcessor.java:56)
at org.apache.thrift.server.TThreadPoolServer$WorkerProcess.run(TThreadPoolServer.java:286)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)

这里显示有获取metastore连接超时的异常,关键的日志是compiling statement: [Error 10308]: Attempt to acquire compile lock timed out. 编译时候获取编译索失败

顺着这个思路,查看一下代码:

关键字:Completed compiling command(queryId,参考https://github.com/apache/hive/blob/master/ql/src/java/org/apache/hadoop/hive/ql/lock/CompileLock.java

分析结论如下:

并且hivethrift每次访问都会初始化metastore 重新初始化元数据,HIVESEVER2提交SQL阻塞tryAcquireCompileLock原因:

a、HIVE1.1 只支持串行编译SQL,hiveserver2并发接受到SQL请求后,在complile阶段变为串行执行。当compilie编译慢时,引起阻塞SQL的提交。

b、compile的慢的原因:complile阶段,会通过hivemetastore访问mysql。目前是hiveserver2的请求打入到同一个metastore,流量上来后,hivemetastore访问mysql速度下降。

解决方案:hiveserver2的请求分摊所有hivemetastore上

Clickhouse 性能问题分析(一)zk报连接异常,性能低下

原因分析:

table read only 跟 zk 也有直接关系。现在能肯定 zk 集群的性能跟不上么 ?在这个压力下只要错误日志里有zk相关的超时,就是ZK性能更不上。

调优策略:

小批次,大批量写入,降低对ZK的QPS,减少的压力,然后横向扩容解决并发问题,不要怕条数多,她设计的原则就是 要你条数多,才能达到最佳性能,1000/s这种 已经触达了ZK的极限了

  1. 如果zk内存使用率较高,zkEnv.sh 修改zk jvm 为8192m,在zoo.cfg 中修改MaxSessionTimeout=120000
  2. 超时时间socket_timeout 80000

3.Spark insert 的-batch 调大到30000,就是每批次取3万行数据,参数可能是任务的参数


4. metrika.xml的zookeepr配置里调整下参数,调整的ch zk客户端超时,这个值是默认的30s

例如

<zookeeper>

    <node>

        <host>example1</host>

        <port>2181</port>

    </node>

    <node>

        <host>example2</host>

        <port>2181</port>

    </node>

    <session_timeout_ms>30000</session_timeout_ms>  –客户端会话的最大超时(以毫秒为单元)

    <operation_timeout_ms>10000</operation_timeout_ms>

</zookeeper>

  1. 重启生效

现在开始写入了。是先写入buffer engine 然后buffer engine 刷新 底层表

写入的数据库和表是:

xxx_dis

buffer的元数据不在zookeeper上,但是跟其他表的读写需要zk支持,就等于是 buffer 表刷底层表 是要走zookeeper. 的。

看看ZK机器的snapshot文件大小,如下图,也不大

Buffer表的建表语句方便发下吗? ENGINE = Buffer(‘blower_data_prod’, ‘xxx_info’, 18, 60, 100, 5000000, 6000000, 80000000, 1000000000) 主要就是后面这个吧。前面字段无所谓。

hive任务结束了,但是hive终端或者命令行没退出

问题描述:从一个100多亿条记录的hive表里查数据,查出数据后,写入一张新hive表里。mapreduce执行完load data环节后,一直不结束,哪位大神知道怎么更进一步地定位问题?

原因分析:文件比较多,最后一个movetask在移动文件和搜集文件统计信息。hive是迭代式计算,最后会有一个movetask把最终数据文件移动到hive的location下,这个过程有两个地方比较耗时,一个是rename,另外一个是list文件读文件元数据,更新元数据库的统计信息,如果数据在对象存储,rename是copy+delete,这个过程会比较慢,movetask是finaltask,不会起mr,任务没完成前,新表里的数据,不是全量的,只是部分。只有任务退出,才算完全完成

解决方案:

set hive.stats.autogather=false;开启收集线程,可以减少最后收集的时间

mapred.dfsclient.parallelism.max 20 增加并行rename的能力

【原创】几句话说清楚《HBASE Snapshot 恢复 restore 原理》

首先snapshot恢复 理论上是建立在数据文件(region,hfile,storefile)都已经导入到集群中了,只是把元数据更新成snapshot的时刻
其实特别简单
Hmaster处理meta元数据表,跟当前元数据的region比较,分为三种情况:
1、snapshot 有,当前也有:说明这部分数据后来有可能被更新过,宁可错杀100,绝不放过一个,所以,直接先从meta信息删除,然后再从snapshot恢复
2、snapshot 有,当前没有:说明这部分数据被归档了,当前已经不用了,重新将snapshot这部分写入meta信息
3、snapshot 没有,当前有:说明是snapshot后面加入的数据,直接从meta删除

Hmaster恢复Hfile数据文件,分为三种情况:
1、snapshot 有,当前也有:同名的,不做任何改动,因为Hfile一旦落盘,很少会发生改变
2、snapshot 有,当前没有:对于缺少的文件,直接对exportsnapshot拷贝到archive中的Hfile文件做引用
3、snapshot 没有,当前有:说明是snapshot后面加入的数据,转移到archive归档

Hmaster恢复Wal文件:
写入recovered.edits文件夹的相应region中,每个region一个线程,只写入跟snapshot相关的表region信息,RS会从这里进行恢复,几句话说清楚《hbase snapshot 原理》也提到了,恢复只会从snapshot的flushid进行恢复,之前的就不会重复操作了

对于regionserver

参考:https://cloud.tencent.com/developer/article/1047967

【原创】几句话说清楚《HBASE Snapshot 原理》

首先snapshot 理论上是用于做备份恢复的
1、恢复哪些内容?
答:snapshot 指定表,在线region和split之前的region

2、谁参与了snapshot
答:
hmastr,regionserver

3、如何snapshot
hmaster 主要负责管控snapshot,
创建snapshot相关信息,比如,表信息,关联region的信息,以及region对应的server信息
hmaster会通过zk管控任务,通过三种目录,acquired、reached、abort;abort下有Regionserver(RS)信息,说明RS有执行snapshot失败的,直接退出snapshot
acquired,hmaster 创建snapshot信息(表,region信息),然后RS进行确认,确认成功,在该目录下创建自己的名字
reached,hmaster 确认acquired下RS都确认了,通知RS可以进行干活,每个RS干完活,会在该目录下创建自己的名字
abort,RS如果执行遇到问题,则在该路径下留下名字,然后Hmaster 取消整个snapshot流程

regionserver 负责具体在线region的信息采集
对memstore和WAL进行flush到相应的Hfile和Hlog中,其中,WAL Hlog要记录本次的flushID,restore_snapshot后,即便发生Hbase集群恢复,也只从该flushID恢复,更早的不会再恢复了
把Hfile 创建相应的映射到snapshot相应目录中,只有映射(空文件),这也是snapshot快的原因
把Hlog 文件放入该snapshot相应文件下

hmaster还负责split之前的region(已经不用的,父region)相关信息的snapshot,其实也是把相关信息留到snapshot下的目录下

hmaster 负责校验snapshot结果,确认snapshot目录信息,表信息是否正确,最主要的要确认需要snapshot的region信息是否还在

总结一下:

1

snapshot流程主要涉及3个步骤:

1. 加一把全局锁,此时不允许任何的数据写入更新以及删除

2. 将Memstore中的缓存数据flush到文件中(可选)

3. 为所有HFile文件分别新建引用指针,这些指针元数据就是snapshot

扩展思考:LSM类系统确实比较容易理解,那其他非LSM系统原地更新的存储系统如何实现snapshot呢?

参考:https://yq.aliyun.com/articles/60455?spm=a2c4e.11163080.searchblog.18.151a2ec1uNNVP8