(转)Iceberg文件组织

使用以下 SQL 创建名为user_log_iceberg的 Iceberg 表并插入一条数据:

create table hadoop_catalog.iceberg_db.user_log_iceberg (
    imei string,
    uuid string,
    udt timestamp
)
using iceberg
partitioned by (days(udt));

insert into hadoop_catalog.iceberg_db.user_log_iceberg values ('xxxxxxxxxxxxx', 'yyyyyyyyyyyyy', cast(1640966400 as timestamp));

user_log_iceberg 在文件系统中的存储结构为:

user_log_iceberg/
├── data
│   └── udt_day=2021-12-31
│       └── 00000-0-67ab9286-794b-456d-a1d3-9c797a2b4b03-00001.parquet
└── metadata
    ├── f9d66153-6745-4103-ad24-334fc62f0d1e-m0.avro
    ├── snap-6744647507914918603-1-f9d66153-6745-4103-ad24-334fc62f0d1e.avro
    ├── v1.metadata.json
    ├── v2.metadata.json
    └── version-hint.text

可以看到该表目录下有两个子文件夹:data 和 metadata。其中 data 就是真正的数据文件目录,metadata 是该表的元数据目录,这里的 metadata 就是替代 Hive 中的 Metastore 服务的。

在了解 Iceberg 元数据管理之前先看几个概念:

  1. SnapshotSnapshot 就是表在某个时间点的状态,其中包括该时间点所有的数据文件。Iceberg 对表的每次更改都会新增一个 Snapshot。
  2. Metadata File每新增一个 Snapshot 就会新增一个 Metadata 文件,该文件记录了表的存储位置、Schema 演化信息、分区演化信息以及所有的 Snapshot 以及所有的 Manifest List 信息。
  3. Manifest ListManifest List 是一个元数据文件,其中记录了所有组成快照的 Manifest 文件信息。
  4. Manifest FileManifest File 是记录 Iceberg 表快照的众多元数据文件的其中一个。其中的每一行都记录了一个数据文件的分区,列级统计等信息。 一个 Manifest List 文件中可以包含多个 Manifest File 的信息。
  5. Partition Spec表示字段值和分区值之间的逻辑关系。
  6. Data File包含表中所有行的文件。
  7. Delete File对按位置或数据值删除的表行进行编码的文件。
iceberg metadata

从上图中可以看出 Iceberg 表通过三级关系管理表数据,下面以 Spark 中的 spark.sql.catalog.hadoop_prod.type=hadoop 为例说明。:

最上层中记录了 Iceberg 表当前元数据的版本,对应的是version-hint.text文件,version-hint.text文件中只记录了一个数字表示当前的元数据版本,初始为 1,后续表每变更一次就加 1。

中间层是元数据层。其中 Metadata File 记录了表的存储位置、Schema 演化信息、分区演化信息以及所有的 Snapshot 和 Manifest List 信息,对应的是v1.metadata.jsonv2.metadata.json文件,其中v后面的数字和version-hint.text文件中的数字对应,每当新增一个 Snapshot 的时候,version-hint.text中的数字加 1,同时也会新增一个vx.metadata.json文件,比如执行insert into hadoop_catalog.iceberg_db.user_log_iceberg values ('xxxxxxxxxxxxx', 'yyyyyyyyyyyyy', cast(1640986400 as timestamp))delete from hadoop_catalog.iceberg_db.user_log_iceberg where udt = cast(1640986400 as timestamp)之后,版本就会变成v4:

user_log_iceberg/
├── data
│   └── udt_day=2021-12-31
│       ├── 00000-0-67ab9286-794b-456d-a1d3-9c797a2b4b03-00001.parquet
│       └── 00000-0-88d582ef-605e-4e51-ba98-953ee3dd4c02-00001.parquet
└── metadata
    ├── b3b1643b-56a2-471e-a4ec-0f87f1efcd80-m0.avro
    ├── ecb9255a-bcc5-4954-a4e9-3a54f5b09500-m0.avro
    ├── f9d66153-6745-4103-ad24-334fc62f0d1e-m0.avro
    ├── snap-4140724156423386841-1-ecb9255a-bcc5-4954-a4e9-3a54f5b09500.avro
    ├── snap-6744647507914918603-1-f9d66153-6745-4103-ad24-334fc62f0d1e.avro
    ├── snap-8046643380197343006-1-b3b1643b-56a2-471e-a4ec-0f87f1efcd80.avro
    ├── v1.metadata.json
    ├── v2.metadata.json
    ├── v3.metadata.json
    ├── v4.metadata.json
    └── version-hint.text

查看v4.metadata.json中的内容如下:点击查看

{
  "format-version": 1,
  "table-uuid": "c69c9f46-b9d8-40cf-99da-85f55cb7bffc",
  "location": "/opt/module/spark-3.2.1/spark-warehouse/iceberg_db/user_log_iceberg",
  "last-updated-ms": 1647772606874,
  "last-column-id": 3,
  "schema": {
    "type": "struct",
    "schema-id": 0,
    "fields": [
      {
        "id": 1,
        "name": "imei",
        "required": false,
        "type": "string"
      },
      {
        "id": 2,
        "name": "uuid",
        "required": false,
        "type": "string"
      },
      {
        "id": 3,
        "name": "udt",
        "required": false,
        "type": "timestamptz"
      }
    ]
  },
  "current-schema-id": 0,
  "schemas": [
    {
      "type": "struct",
      "schema-id": 0,
      "fields": [
        {
          "id": 1,
          "name": "imei",
          "required": false,
          "type": "string"
        },
        {
          "id": 2,
          "name": "uuid",
          "required": false,
          "type": "string"
        },
        {
          "id": 3,
          "name": "udt",
          "required": false,
          "type": "timestamptz"
        }
      ]
    }
  ],
  "partition-spec": [
    {
      "name": "udt_day",
      "transform": "day",
      "source-id": 3,
      "field-id": 1000
    }
  ],
  "default-spec-id": 0,
  "partition-specs": [
    {
      "spec-id": 0,
      "fields": [
        {
          "name": "udt_day",
          "transform": "day",
          "source-id": 3,
          "field-id": 1000
        }
      ]
    }
  ],
  "last-partition-id": 1000,
  "default-sort-order-id": 0,
  "sort-orders": [
    {
      "order-id": 0,
      "fields": []
    }
  ],
  "properties": {
    "owner": "PowerYang"
  },
  "current-snapshot-id": 4140724156423386600,
  "snapshots": [
    {
      "snapshot-id": 6744647507914919000,
      "timestamp-ms": 1647758232673,
      "summary": {
        "operation": "append",
        "spark.app.id": "local-1647757937137",
        "added-data-files": "1",
        "added-records": "1",
        "added-files-size": "1032",
        "changed-partition-count": "1",
        "total-records": "1",
        "total-files-size": "1032",
        "total-data-files": "1",
        "total-delete-files": "0",
        "total-position-deletes": "0",
        "total-equality-deletes": "0"
      },
      "manifest-list": "/opt/module/spark-3.2.1/spark-warehouse/iceberg_db/user_log_iceberg/metadata/snap-6744647507914918603-1-f9d66153-6745-4103-ad24-334fc62f0d1e.avro",
      "schema-id": 0
    },
    {
      "snapshot-id": 8046643380197343000,
      "parent-snapshot-id": 6744647507914919000,
      "timestamp-ms": 1647772293740,
      "summary": {
        "operation": "append",
        "spark.app.id": "local-1647770527459",
        "added-data-files": "1",
        "added-records": "1",
        "added-files-size": "1032",
        "changed-partition-count": "1",
        "total-records": "2",
        "total-files-size": "2064",
        "total-data-files": "2",
        "total-delete-files": "0",
        "total-position-deletes": "0",
        "total-equality-deletes": "0"
      },
      "manifest-list": "/opt/module/spark-3.2.1/spark-warehouse/iceberg_db/user_log_iceberg/metadata/snap-8046643380197343006-1-b3b1643b-56a2-471e-a4ec-0f87f1efcd80.avro",
      "schema-id": 0
    },
    {
      "snapshot-id": 4140724156423386600,
      "parent-snapshot-id": 8046643380197343000,
      "timestamp-ms": 1647772606874,
      "summary": {
        "operation": "delete",
        "spark.app.id": "local-1647770527459",
        "deleted-data-files": "1",
        "deleted-records": "1",
        "removed-files-size": "1032",
        "changed-partition-count": "1",
        "total-records": "1",
        "total-files-size": "1032",
        "total-data-files": "1",
        "total-delete-files": "0",
        "total-position-deletes": "0",
        "total-equality-deletes": "0"
      },
      "manifest-list": "/opt/module/spark-3.2.1/spark-warehouse/iceberg_db/user_log_iceberg/metadata/snap-4140724156423386841-1-ecb9255a-bcc5-4954-a4e9-3a54f5b09500.avro",
      "schema-id": 0
    }
  ],
  "snapshot-log": [
    {
      "timestamp-ms": 1647758232673,
      "snapshot-id": 6744647507914919000
    },
    {
      "timestamp-ms": 1647772293740,
      "snapshot-id": 8046643380197343000
    },
    {
      "timestamp-ms": 1647772606874,
      "snapshot-id": 4140724156423386600
    }
  ],
  "metadata-log": [
    {
      "timestamp-ms": 1647757946953,
      "metadata-file": "/opt/module/spark-3.2.1/spark-warehouse/iceberg_db/user_log_iceberg/metadata/v1.metadata.json"
    },
    {
      "timestamp-ms": 1647758232673,
      "metadata-file": "/opt/module/spark-3.2.1/spark-warehouse/iceberg_db/user_log_iceberg/metadata/v2.metadata.json"
    },
    {
      "timestamp-ms": 1647772293740,
      "metadata-file": "/opt/module/spark-3.2.1/spark-warehouse/iceberg_db/user_log_iceberg/metadata/v3.metadata.json"
    }
  ]
}

可以看到snapshots属性中记录了多个 Snapshot 信息,每个 Snapshot 中包含了 snapshot-id、parent-snapshot-id、manifest-list 等信息。通过最外层的 current-snapshot-id 可以定位到当前 Snapshot 以及 manifest-list 文件为/opt/module/spark-3.2.1/spark-warehouse/iceberg_db/user_log_iceberg/metadata/snap-4140724156423386841-1-ecb9255a-bcc5-4954-a4e9-3a54f5b09500.avro。使用java -jar avro-tools-1.11.0.jar tojson snap-4140724156423386841-1-ecb9255a-bcc5-4954-a4e9-3a54f5b09500.avro > manifest_list.json将该文件转换成 json 格式,查看其内容:点击查看

({
  "manifest_path": "/opt/module/spark-3.2.1/spark-warehouse/iceberg_db/user_log_iceberg/metadata/ecb9255a-bcc5-4954-a4e9-3a54f5b09500-m0.avro",
  "manifest_length": 6127,
  "partition_spec_id": 0,
  "added_snapshot_id": {
    "long": 4140724156423386841
  },
  "added_data_files_count": {
    "int": 0
  },
  "existing_data_files_count": {
    "int": 0
  },
  "deleted_data_files_count": {
    "int": 1
  },
  "partitions": {
    "array": [
      {
        "contains_null": false,
        "contains_nan": {
          "boolean": false
        },
        "lower_bound": {
          "bytes": "0J\u0000\u0000"
        },
        "upper_bound": {
          "bytes": "0J\u0000\u0000"
        }
      }
    ]
  },
  "added_rows_count": {
    "long": 0
  },
  "existing_rows_count": {
    "long": 0
  },
  "deleted_rows_count": {
    "long": 1
  }
},
{
  "manifest_path": "/opt/module/spark-3.2.1/spark-warehouse/iceberg_db/user_log_iceberg/metadata/f9d66153-6745-4103-ad24-334fc62f0d1e-m0.avro",
  "manifest_length": 6128,
  "partition_spec_id": 0,
  "added_snapshot_id": {
    "long": 6744647507914918603
  },
  "added_data_files_count": {
    "int": 1
  },
  "existing_data_files_count": {
    "int": 0
  },
  "deleted_data_files_count": {
    "int": 0
  },
  "partitions": {
    "array": [
      {
        "contains_null": false,
        "contains_nan": {
          "boolean": false
        },
        "lower_bound": {
          "bytes": "0J\u0000\u0000"
        },
        "upper_bound": {
          "bytes": "0J\u0000\u0000"
        }
      }
    ]
  },
  "added_rows_count": {
    "long": 1
  },
  "existing_rows_count": {
    "long": 0
  },
  "deleted_rows_count": {
    "long": 0
  }
})

里面包含了两条 json 数据,分别对应了个 Manifest 文件信息,除了 Manifest 文件的路径之外还有一些统计信息。使用java -jar avro-tools-1.11.0.jar tojson f9d66153-6745-4103-ad24-334fc62f0d1e-m0.avro > manifest_1.jsonjava -jar avro-tools-1.11.0.jar tojson ecb9255a-bcc5-4954-a4e9-3a54f5b09500-m0.avro > manifest_2.json将两个 Manifest 文件转为 json 格式,观察其内容:点击查看

{
  "status": 1,
  "snapshot_id": {
    "long": 6744647507914918603
  },
  "data_file": {
    "file_path": "/opt/module/spark-3.2.1/spark-warehouse/iceberg_db/user_log_iceberg/data/udt_day=2021-12-31/00000-0-67ab9286-794b-456d-a1d3-9c797a2b4b03-00001.parquet",
    "file_format": "PARQUET",
    "partition": {
      "udt_day": {
        "int": 18992
      }
    },
    "record_count": 1,
    "file_size_in_bytes": 1032,
    "block_size_in_bytes": 67108864,
    "column_sizes": {
      "array": [
        {
          "key": 1,
          "value": 48
        },
        {
          "key": 2,
          "value": 48
        },
        {
          "key": 3,
          "value": 51
        }
      ]
    },
    "value_counts": {
      "array": [
        {
          "key": 1,
          "value": 1
        },
        {
          "key": 2,
          "value": 1
        },
        {
          "key": 3,
          "value": 1
        }
      ]
    },
    "null_value_counts": {
      "array": [
        {
          "key": 1,
          "value": 0
        },
        {
          "key": 2,
          "value": 0
        },
        {
          "key": 3,
          "value": 0
        }
      ]
    },
    "nan_value_counts": {
      "array": []
    },
    "lower_bounds": {
      "array": [
        {
          "key": 1,
          "value": "xxxxxxxxxxxxx"
        },
        {
          "key": 2,
          "value": "yyyyyyyyyyyyy"
        },
        {
          "key": 3,
          "value": "\u0000@\\CsÔ\u0005\u0000"
        }
      ]
    },
    "upper_bounds": {
      "array": [
        {
          "key": 1,
          "value": "xxxxxxxxxxxxx"
        },
        {
          "key": 2,
          "value": "yyyyyyyyyyyyy"
        },
        {
          "key": 3,
          "value": "\u0000@\\CsÔ\u0005\u0000"
        }
      ]
    },
    "key_metadata": null,
    "split_offsets": {
      "array": [4]
    },
    "sort_order_id": {
      "int": 0
    }
  }
}
{
  "status": 2,
  "snapshot_id": {
    "long": 4140724156423386841
  },
  "data_file": {
    "file_path": "/opt/module/spark-3.2.1/spark-warehouse/iceberg_db/user_log_iceberg/data/udt_day=2021-12-31/00000-0-88d582ef-605e-4e51-ba98-953ee3dd4c02-00001.parquet",
    "file_format": "PARQUET",
    "partition": {
      "udt_day": {
        "int": 18992
      }
    },
    "record_count": 1,
    "file_size_in_bytes": 1032,
    "block_size_in_bytes": 67108864,
    "column_sizes": {
      "array": [
        {
          "key": 1,
          "value": 48
        },
        {
          "key": 2,
          "value": 48
        },
        {
          "key": 3,
          "value": 51
        }
      ]
    },
    "value_counts": {
      "array": [
        {
          "key": 1,
          "value": 1
        },
        {
          "key": 2,
          "value": 1
        },
        {
          "key": 3,
          "value": 1
        }
      ]
    },
    "null_value_counts": {
      "array": [
        {
          "key": 1,
          "value": 0
        },
        {
          "key": 2,
          "value": 0
        },
        {
          "key": 3,
          "value": 0
        }
      ]
    },
    "nan_value_counts": {
      "array": []
    },
    "lower_bounds": {
      "array": [
        {
          "key": 1,
          "value": "xxxxxxxxxxxxx"
        },
        {
          "key": 2,
          "value": "yyyyyyyyyyyyy"
        },
        {
          "key": 3,
          "value": "\u0000\btëwÔ\u0005\u0000"
        }
      ]
    },
    "upper_bounds": {
      "array": [
        {
          "key": 1,
          "value": "xxxxxxxxxxxxx"
        },
        {
          "key": 2,
          "value": "yyyyyyyyyyyyy"
        },
        {
          "key": 3,
          "value": "\u0000\btëwÔ\u0005\u0000"
        }
      ]
    },
    "key_metadata": null,
    "split_offsets": {
      "array": [4]
    },
    "sort_order_id": {
      "int": 0
    }
  }
}

可以看到,每个 Manifest 文件中的每一行都对应一个 data 目录下的数据文件,除了记录数据文件的路径之外,还记录了该数据文件对应的文件格式、分区信息、以及尽可能详细地记录了各个字段的统计信息、排序信息等。Manifest 文件中的 status,表示此次操作的类型,1 表示 add,2 表示 delete。

可以发现 Iceberg 中对数据文件的管理是文件级别,分区管理、字段统计也是到文件级别,而不是目录级别,这也是为什么 Iceberg 扫描要比 Hive 快的原因。在扫描计划里,查询谓词会自动转换为分区数据的谓词,并首先应用于过滤数据文件。接下来,使用列级值计数、空计数、下限和上限来消除无法匹配查询谓词的文件,在某些情况下可以提升数十倍效率。

但是由于 Iceberg 用 json 文件存储 Metadata,表的每次更改都会新增一个 Metadata 文件,以保证操作的原子性。历史 Metadata 文件不会删除,所以像流式作业就需要定期清理 Metadata 文件,因为频繁的提交会导致堆积大量的 Metadata。可以通过配置write.metadata.delete-after-commit.enabledwrite.metadata.previous-versions-max属性实现自动清理元数据。

提示

前面是以 spark.sql.catalog.hadoop_prod.type=hadoop举例,如果 spark.sql.catalog.hadoop_prod.type=hive,文件组织方式会稍有不同,如:

  1. 没有version-hint.text文件,而是通过 Metastore 服务来记录当前版本指针;
  2. Metadata File 的命名不再是vx.metadata.json的方式,.metadata.json前面的vx部分将是一个很长的 UUID。

from:https://www.sqlboy.tech/pages/19baa8/

(转)「大数据」Hive 分区和分桶的区别及示例讲解

一、概述

在大数据处理过程中,Hive是一种非常常用的数据仓库工具。Hive分区和分桶是优化Hive性能的两种方式,它们的区别如下:

1)分区概述

Hive分区是把数据按照某个属性分成不同的数据子集。

  • 在Hive中,数据被存储在HDFS中,每个分区实际上对应HDFS下的一个文件夹,这个文件夹中保存了这个分区的数据。
  • 因此,在Hive中使用分区,实际上是将数据按照某个属性值进行划分,然后将相同属性值的数据存储在同一个文件夹中。Hive分区的效率提升主要是因为,当进行查询操作时,只需读取与查询相关的数据分区,避免了全表扫描,节约了查询时间

Hive分区的主要作用是:

  • 提高查询效率: 使用分区对数据进行访问时,系统只需要读取和此次查询相关的分区,避免了全表扫描,从而显著提高查询效率。
  • 降低存储成本: 分区可以更加方便的删除过期数据,减少不必要的存储。

2)分桶概述

Hive分桶是将数据划分为若干个存储文件,并规定存储文件的数量。

  • Hive分桶的实现原理是将数据按照某个字段值分成若干桶,并将相同字段值的数据放到同一个桶中。在存储数据时,桶内的数据会被写入到对应数量的文件中,最终形成多个文件。
  • Hive分桶主要是为了提高分布式查询的效率。它能够通过将数据划分为若干数据块来将大量数据分发到多个节点,使得数据均衡分布到多个机器上处理。这样分发到不同节点的数据可以在本地进行处理,避免了数据的传输和网络带宽的浪费,同时提高了查询效率。

分桶的主要作用是:

  • 数据聚合: 分桶可以使得数据被分成较小的存储单元,提高了数据统计和聚合的效率。
  • 均衡负载: 数据经过分桶后更容易实现均衡负载,数据可以分发到多个节点中,提高了查询效率。

综上所述,分区和分桶的区别在于其提供的性能优化方向不同。分区适用于对于数据常常进行的聚合查询数据分析,而分桶适用于对于数据的均衡负载、高效聚合等方面的性能优化。当数据量较大、查询效率比较低时,使用分区和分桶可以有效优化性能。分区主要关注数据的分区和存储,而分桶则重点考虑数据的分布以及查询效率。

二、环境准备

如果已经有了环境了,可以忽略,如果想快速部署环境可以参考我这篇文章:通过 docker-compose 快速部署 Hive 详细教程

三、外部表和管理表

在Hive中,可以创建两种类型的表:外部表和管理表。它们之间的主要区别如下:

1)外部表

1、外部表介绍

外部表是指在Hive中创建的表,实际上其数据是存储在外部文件系统(HDFS或本地文件系统)中的。

  • 外部分区表是一种特殊类型的表,它们的数据存储在Hive之外的文件系统上,例如HDFS、S3等。
  • 对于外部分区表,Hive只会管理它们的元数据信息,而不会管理数据文件本身,这意味着,如果你使用Hive命令删除一个外部分区表,只会删除该表的元数据,而不会删除数据文件
  • 外部分区表通常用于存储和管理原始数据,这些数据通常需要在多个系统和工具之间共享。

2、示例讲解

【示例一】下面是创建Hive外部表的一个示例(数据存储在HDFS):

假设我们有一个存储在 HDFS 上的数据文件,其路径为’/user/hive/external_table/data’,我们可以通过以下语句,在Hive中创建一个外部表:

# 登录容器docker exec -it hive-hiveserver2 
# 登录hive客户端beeline -u jdbc:hive2://hive-hiveserver2:10000  -n hadoop
# 建表
CREATE EXTERNAL TABLE external_table1 (
    column1 STRING,
    column2 INT,
    column3 DOUBLE)
ROW FORMAT DELIMITEDFIELDS TERMINATED BY ','LINES TERMINATED BY '\n'STORED AS TEXTFILELOCATION '/user/hive/external_table/data';

在该表中,我们指定了表的各列的数据类型和分隔符等信息,并且使用了LOCATION 关键字来指定数据文件的存储位置。这样,在Hive中对该外部表进行查询操作时,Hive会自动去对应的位置读取数据文件,并据此返回查询结果。

load 数据

# 模拟一些数据cat >data<<EOFc1,12,56.33c2,14,58.99c3,15,66.34c4,16,76.78EOF
# 登录hive客户端beeline -u jdbc:hive2://hive-hiveserver2:10000  -n hadoop# 加载数据,local 是加载本机文件数据load data local inpath './data' into table external_table1;

需要注意的是,在使用外部表时,我们必须保证Hive对数据文件的访问权限与HDFS的文件权限相同,否则会导致外部表的查询失败。此外,在使用外部表时,务必不要删除外部表的数据文件,否则将会导致查询结果的不准确。

【示例一】下面是创建外部表访问本地数据文件的示例(数据存储在本地,很少使用):

在Hive中,我们同样可以创建外部表来访问本地文件系统上的数据文件。在这种情况下,我们需要注意的是,在Hive的配置中,必须开启hive.stats.autogather 功能。否则,在查询外部表时可能会出现错误。

假设我们有一个存储在本地文件系统上的数据文件,路径为’/path/to/local/file’,我们可以通过以下语句,在Hive中创建一个外部表:

CREATE EXTERNAL TABLE external_table2 (
    column1 STRING,
    column2 INT,
    column3 DOUBLE)ROW FORMAT DELIMITEDFIELDS TERMINATED BY ','LINES TERMINATED BY '\n'STORED AS TEXTFILELOCATION 'file:///path/to/local/file';### hive文件存储格式包括以下几类(STORED AS TEXTFILE):#1、TEXTFILE#2、SEQUENCEFILE#3、RCFILE#4、ORCFILE(0.11以后出现)#其中TEXTFILE为默认格式,建表时不指定默认为这个格式,导入数据时会直接把数据文件拷贝到hdfs上不进行处理;

需要注意的是,我们在使用LOCATION关键字时,要指定为’file:///path/to/local/file’,而不是 ‘/path/to/local/file’ ,这是因为我们需要使用文件系统的URL来访问本地文件系统上的数据文件。

2)管理表(内部表)

1、管理表(内部表)介绍

管理表是利用Hive自身的存储能力来对数据进行存储和管理的表。在Hive中创建管理表时,必须指定数据的存储路径。

  • 管理表也称为内部表(Internal Table),管理表是Hive默认创建的表类型,它的数据存储在Hive默认的文件系统上(通常是HDFS)。
  • Hive会自动管理这些表的数据和元数据,包括表的位置、数据格式等。如果你使用Hive命令删除了一个管理表,那么该表的数据也会被删除
  • 通常情况下,管理表用于存储和管理中间结果、汇总数据和基础数据。当数据规模较小时,管理表是一个不错的选择,因为它可以提供更好的查询性能,同时也更容易管理。

2、示例讲解

在Hive中,除了外部表,我们还可以创建内部表来存储数据。与外部表不同的是,内部表存储的数据位于Hive自身管理的HDFS上,因此,在创建内部表时,我们需要确保数据可以被正确地上传到HDFS上。下面是创建内部表并存储在本机的示例:

假设我们有以下数据文件,名为data.csv,存储在本地文件系统的/path/to/local目录下:

cat >data.csv<<EOFvalue1,1,2.3value2,2,3.4value3,3,4.5EOF

我们可以使用以下语句,在Hive中创建一个内部表:

CREATE TABLE internal_table (    column1 STRING,    column2 INT,    column3 DOUBLE)ROW FORMAT DELIMITEDFIELDS TERMINATED BY ','LINES TERMINATED BY '\n'STORED AS TEXTFILE;# 加载本地数据,LOCAL LOAD DATA LOCAL INPATH './data.csv' INTO TABLE internal_table;# 加载HDFS数据# 先将文件推送到HDFS上hdfs dfs -put ./data.csv /tmp/# 登录hive客户端beeline -u jdbc:hive2://hive-hiveserver2:10000  -n hadoop# 加载HDFS上的数据LOAD DATA INPATH '/tmp/data.csv' INTO TABLE internal_table;# 查询select * from internal_table;


总之,外部表和管理表都可以在Hive中实现数据的存储和管理,但它们之间的不同主要体现在数据的存储和处理方式上。

四、分区表之静态分区和动态分区

Hive中的分区表可以进一步细分为静态分区和动态分区。

静态分区是指通过手动指定分区列的值来创建分区。例如,在创建一个基于年份的分区表时,我们可以手动指定每个分区名对应的年份:

CREATE TABLE sales (
  id int,
  date string,
  amount double)PARTITIONED BY (year string);
ALTER TABLE sales ADD PARTITION (year='2019') location '/data/sales/2022';
ALTER TABLE sales ADD PARTITION (year='2020') location '/data/sales/2023';

在上述示例中,我们通过 ALTER TABLE 语句手动添加了2019和2020两个年份的分区。

动态分区是指在加载数据时通过SQL语句自动创建分区。例如,在从一个包含销售记录的数据文件中加载数据时,可以自动根据数据中的年份信息创建相应的分区:

INSERT INTO TABLE sales PARTITION (year)SELECT id, date, amount, YEAR(date)FROM raw_sales;

在上述示例中,我们使用 PARTITION 子句指定在 CREATE TABLE 语句中定义的分区列year,并使用 YEAR(date) 表达式从数据中提取出年份信息。

动态分区的优点在于它可以大大简化创建和管理分区表的过程并提高效率;但是需要注意的是,它可能会在某些情况下产生不可预期的行为,例如可能创建太多分区。

总之,静态分区和动态分区都是用于在Hive中管理大型数据集的有效工具,具体使用需要根据具体情况选择最适合的方法,并理解它们的优点和缺点。

五、hive分区表严格模式和非严格模式

Hive分区表的严格模式和非严格模式可以通过以下两个参数进行设置:

  1. hive.exec.dynamic.partition.mode:该参数用于设置分区模式,其默认值为strict,即严格模式。可以将其设置为nonstrict,即非严格模式
# 登录hive客户端beeline -u jdbc:hive2://hive-hiveserver2:10000  -n hadoop# 设置SET hive.exec.dynamic.partition.mode=nonstrict;
  1. hive.exec.max.dynamic.partitions:该参数用于限制动态分区的最大数量。在非严格模式下,当动态分区的数量超过该参数指定的值时,Hive将抛出异常。可以通过以下语句来修改该参数:
SET hive.exec.max.dynamic.partitions=<value>;

其中,<value> 为一个整数值,表示限制的动态分区数量。如果需要取消该限制,可以将该参数设置为一个非正数,例如:

SET hive.exec.max.dynamic.partitions=-1;

需要注意的是,这些参数的设置仅对当前会话有效,也可以将其添加到Hive的配置文件中以在每个会话中自动应用。

总之,hive.exec.dynamic.partition.mode 和 hive.exec.max.dynamic.partitions 是控制Hive分区表严格模式和非严格模式的两个重要参数,开发人员可以根据自己的需求进行设置。

1)严格模式

严格模式要求在加载数据时必须指定所有分区列的值,否则将会导致抛出异常。例如,在下面的分区表中:

CREATE TABLE sales (
  id int,
  date string,
  amount double)PARTITIONED BY (year string, month string, day string)CLUSTERED BY (id) INTO 10 BUCKETS;

在严格模式下,我们必须为year、month和day三个分区列的所有可能取值指定一个分区:

INSERT INTO TABLE sales PARTITION (year='2019', month='01', day='03')SELECT id, date, amountFROM raw_salesWHERE YEAR(date) = 2019 AND MONTH(date) = 1 AND DAY(date) = 3;

在上述示例中,我们使用 PARTITION 子句手动为分区列year、month、day指定取值。

2)非严格模式

非严格模式则允许忽略某些分区列的值,这样使用 INSERT INTO 语句时只需指定提供的分区值即可。例如:

# SET hive.exec.dynamic.partition.mode=nonstrict;INSERT INTO TABLE sales PARTITION (year, month, day)SELECT id, YEAR(date), MONTH(date), DAY(date), amountFROM raw_salesWHERE YEAR(date) = 2019;

在上述示例中,我们使用 SET 语句设置分区模式为非严格模式,然后只提供了year分区列的值,而month和day分区列的值是从数据中动态计算得出的。

使用非严格模式可以简化分区表的创建和管理,但需要注意,它可能会产生一些意料之外的结果(例如可能创建太多分区),所以需要谨慎使用。

总之,分区表的严格模式和非严格模式都具有一些优点和缺点,具体使用需要根据具体情况选择最适合的方式。

六、分区表和分桶表示例讲解

1)分区表示例讲解

在Hive中,我们可以使用分区表来更有效地组织和管理数据。分区表将数据分为子集,每个子集对应一个或多个分区。这样,我们就可以更快地访问和查询数据,而不必扫描整个数据集。

创建分区表的语法类似于创建普通表,只不过要使用 PARTITIONED BY 子句指定一个或多个分区列,例如:

# 内部表CREATE TABLE partitioned_internal_table (
  id INT,
  mesg STRING)PARTITIONED BY (
  year INT,
  month INT)
ROW FORMAT DELIMITEDFIELDS TERMINATED BY ','LINES TERMINATED BY '\n'STORED AS TEXTFILE;
# 外部表
CREATE EXTERNAL TABLE partitioned_external_table (
  id INT,
  mesg STRING)PARTITIONED BY (
  year INT,
  month INT)
ROW FORMAT DELIMITEDFIELDS TERMINATED BY ','LINES TERMINATED BY '\n'STORED AS TEXTFILELOCATION '/user/hive/partitioned_table/data';

上述语句创建了一个分区表,在列column1和column2的基础上,按照year和month两列进行了分区。

【注意】分区的实现依赖于Hive的底层存储Hadoop分布式文件系统(HDFS)。为了确定如何分配数据,Hive要求每个分区对应一个目录,该目录包含该分区数据的所有文件。因此,在将数据加载到分区表中时,必须提供与分区对应的目录

例如,如果我们要将一个CSV文件加载到分区表中,我们可以使用以下语句:

LOAD DATA LOCAL INPATH './file.csv' INTO TABLE partitioned_external_table PARTITION (year=2019, month=1);# 查看分区show partitions partitioned_external_table;

在上述语句中,我们使用 LOAD DATA 子句将 /data/file.csv 文件加载到partitioned_table 表中,并指定了分区year为2019,分区month为1。

假设我们的CSV文件具有以下内容:

1,test1,2019,11,test2,2019,12,test3,2022,13,test4,2023,1

使用以下语句查询分区表:

SELECT * FROM partitioned_external_table WHERE year=2019 AND month=1;

分区表的优点在于可以更高效地组织数据,同时也允许我们根据需要删除或添加分区。例如,我们可以使用以下语句删除分区:

ALTER TABLE partitioned_table DROP PARTITION (year=2019, month=1);

可以使用以下语句添加分区:

ALTER TABLE partitioned_external_table ADD PARTITION (year=2020, month=2);
# 查看分区show partitions partitioned_external_table;

总之,分区表是管理和查询大型数据集的有效方式,可以帮助我们更轻松地处理大量数据。

2)分桶表示例讲解

除了分区表之外,Hive还提供了另一种将数据分割成可管理单元的方式,即分桶。

分区和分桶的概念有一些相似之处,但也存在一些重要的区别。

  • 分区是指基于表的某些列将数据分割成不同的存储单元;
  • 而分桶是指将数据根据哈希函数分成一组固定的桶。

类比于分区,在创建一个分桶表时,我们需要指定分桶的数量和分桶的列。例如,以下是一个创建分桶表的示例:

CREATE TABLE bucketed_table (
  column1 data_type,
  column2 data_type,  ...) 
CLUSTERED BY (column1) -- 分桶列INTO 10 BUCKETS; -- 桶数量

在上述示例中,我们将column1作为分桶列,并将数据分成10个桶。

加载数据时,Hive根据指定的桶列计算哈希值,并将数据存储在对应的桶中。

INSERT INTO TABLE bucketed_table VALUES ('value1', 1, 2.3)

查询时,可以使用以下格式指定桶列:

SELECT * FROM bucketed_table TABLESAMPLE(BUCKET x OUT OF y ON column1);

在上述示例中,我们使用用于抽样数据的 TABLESAMPLE 子句,指定从桶x中抽取数据,并在分桶列column1上进行抽样。

分桶表的优点在于,我们可以更容易地执行等值和范围查询,并更好地利用MapReduce 的数据本地性,从而提高查询性能。但分桶表也有一些缺点,例如添加和删除数据涉及重新计算哈希函数和移动数据的成本。

总之,分区表和分桶表都是Hive管理和处理大型数据集的重要工具,可以帮助我们更轻松地组织、查询和分析大量数据。在具体使用时,需要考虑表的存储和查询需求,选择最适合的表类型。在实际场景中分区用的居多。

关于Hive 分区和分桶的区别和示例讲解就先到这里了,有任何疑问欢迎给我留言,后续会持续更新相关文章,请小伙伴耐心等待

from:https://baijiahao.baidu.com/s?id=1764915870006249443&wfr=spider&for=pc

如何优化 Spark 小文件,Kyuubi 一步搞定!(转)


Hive 表中太多的小文件会影响数据的查询性能和效率,同时加大了 HDFS NameNode 的压力。Hive (on MapReduce) 一般可以简单的通过一些参数来控制小文件,而 Spark 中并没有提供小文件合并的功能。下面我们来简单了解一下 Spark 小文件问题,以及如何处理小文件。

01Spark 小文件问题

 1.1 环境 

Kyuubi 版本:1.6.0-SNAPSHOT

Spark 版本:3.1.3、3.2.0 

 1.2 TPCDS 数据集 

Kyuubi 中提供了一个 TPCDS Spark Connector,可以通过配置 Catalog 的方式,在读取时自动生成 TPCDS 数据。 

只需要将 kyuubi-spark-connector-tpcds_2.12-1.6.0-SNAPSHOT.jar 包放入 Spark jars 目录中,并配置:

spark.sql.catalog.tpcds=org.apache.kyuubi.spark.connector.tpcds.TPCDSCatalog;

这样我们就可以直接读取 TPCDS 数据集:

use tpcds; 

show databases; 

use sf3000; 

show tables; 

select * from sf300.catalog_returns limit 10;

 1.3 小文件产生

首先我们在 Hive 中创建一个 sample.catalog_returns 表,用于写入生成的 TPCDScatalog_returns 数据,并添加一个 hash 字段作为分区。

我们先关闭 Kyuubi 的优化,读取 catalog_returns 数据并写入 Hive:

Spark SQL 最终产生的文件数最多可能是最后一个写入的 Stage 的 Task 数乘以动态分区的数量我们 可以看到由于读取输入表的 Task 数是 44 个,所以最终产生了 44 个文件,每个文件大小约 69 M。 

 1.4 改变分区数(Repartition) 

由于写入的文件数跟最终写入 Stage 的 Task 数据有关,那么我们可以通过添加一个 Repartition 操作, 来减少最终写入的 task 数,从而控制小文件:

添加 REPARTITION(10) 后,会在读取后做一个 Repartition 操作,将 partition 数变成 10,所以最终 写入的文件数变成 10 个。 

 1.5 Spark AQE 自动合并小分区 

Spark 3.0 以后引入了自适应查询优化(Adaptive Query Execution, AQE),可以自动合并较小的分区。

开启 AQE,并通过添加 distribute by cast(rand() * 100 as int) 触发 Shuffle 操作:

默认 Shuffle 分区数 spark.sql.shuffle.partitions=200 ,如果不开启 AQE 会产生 200 个小文件, 开启 AQE 后,会自动合并小分区,根据spark.sql.adaptive.advisoryPartitionSizeInBytes=512M 配置合并较小的分区,最终产生 12 个文件。

02Kyuubi 小文件优化分析

Apache Kyuubi (Incubating) 作为增强版的 Spark Thrift Server 服务,可通过 Spark SQL 进行大规模的 数据处理分析。Kyuubi 通过 Spark SQL Extensions 实现了很多的 Spark 优化,其中包括了RepartitionBeforeWrite 的优化,再结合 Spark AQE 可以自动优化小文件问题,下面我们具体分析 一下 Kyuubi 如何实现小文件优化。

  2.1 Kyuubi 如何优化小文件

 Kyuubi 提供了在写入前加上 Repartition 操作的优化,我们只需要将 kyuubi-extension-spark-3- 1_2.12-1.6.0-SNAPSHOT.jar 放入 Spark jars 目录中,并配置spark.sql.extensions=org.apache.kyuubi.sql.KyuubiSparkSQLExtension 。相关配置:

通过 spark.sql.optimizer.insertRepartitionNum 参数可以配置最终插入 Repartition 的分区数, 当不开启 AQE,默认为 spark.sql.shuffle.partitions 的值。需要注意,当我们设置此配置会导致 AQE 失效,所以开启 AQE 不建议设置此值。 

对于动态分区写入,会根据动态分区字段进行 Repartition,并添加一个随机数来避免产生数据倾斜,spark.sql.optimizer.dynamicPartitionInsertionRepartitionNum 用来配置随机数的范围,不 过添加随机数后,由于加大了动态分区的基数,还是可能会导致小文件。这个操作类似在 SQL 中添加distribute by DYNAMIC_PARTITION_COLUMN, cast(rand() * 100 as int)。

  2.2 静态分区写入 

开启 Kyuubi 优化和 AQE,测试静态分区写入:

可以看到 AQE 生效了,很好地控制了小文件,产生了 11 个文件,文件大小 314.5 M 左右。

 2.3 动态分区写入

我们测试一下动态分区写入的情况,先关闭 Kyuubi 优化,并生成 10 个 hash 分区:

产生了 44 × 10 = 440 个文件,文件大小 8 M 左右。

开启 Kyuubi 优化和 AQE:

产生了 12 × 10 = 120 个文件,文件大小 30 M 左右,可以看到小文件有所改善,不过仍然不够理想。

此案例中 hash 分区由 rand 函数产生,分布比较均匀,所以我们将spark.sql.optimizer.dynamicPartitionInsertionRepartitionNum 设置成 0 ,重新运行,同时将动态分区数设置为5 :

由于动态分区数只有 5 个,所以实际上只有 5 个 Task 有数据写入,每个 Task 对应一个分区,导致最终每个分区只有一个较大的大文件。

通过上面的分析可以看到,对于动态分区写入,Repartition 的优化可以缓解小文件,配置spark.sql.optimizer.dynamicPartitionInsertionRepartitionNum=100 解决了数据倾斜问题, 不过同时还是可能会有小文件。 

  2.4 Rebalance 优化 

Spark 3.2+ 引入了 Rebalance 操作,借助于 Spark AQE 来平衡分区,进行小分区合并和倾斜分区拆 分,避免分区数据过大或过小,能够很好地处理小文件问题。 

Kyuubi 对于 Spark 3.2+ 的优化,是在写入前插入 Rebalance 操作,对于动态分区,则指定动态分区列 进行 Rebalance 操作。不再需要 spark.sql.optimizer.insertRepartitionNum 和spark.sql.optimizer.dynamicPartitionInsertionRepartitionNum 配置。

测试静态分区写入,使用 Spark 3.2.0 开启 Kyuubi 优化和 AQE:

Repartition 操作自动合并了小分区,产生了 11 个文件,文件大小 334.6 M 左右,解决了小文件的问题。

测试动态分区写入,使用 Spark 3.2.0 开启 Kyuubi 优化和 AQE,生成 5 个动态分区:

Repartition 操作自动拆分较大分区,产生了 2 × 5 = 10 个文件,文件大小 311 M 左右,很好地解决倾斜问题。

03总 结

从上面的分析可以看到,对于 Spark 3.2+,Kyuubi 结合 Rebalance 能够很好地解决小文件问题,对于 Spark 3.1,Kyuubi 也能自动优化小文件,不过动态分区写入的情况还是可能存在问题。

相关的配置总结:

更多 AQE 配置可以参考:How To Use Spark Adaptive Query Execution (AQE) in Kyuubi

原文连接:https://www.modb.pro/db/423478