云上的对象存储,具有低成本、高可靠、易运维等诸多优点。将其对接到 PostgreSQL 后,可用于自动归档历史数据,还可用于数据分析。
介绍
pg_parquet
是一个 PostgreSQL 扩展,允许您通过COPY TO/FROM
命令,从 PostgreSQL 读取和写入位于S3
或文件系统中的 Parquet 文件。它依赖于 Apache Arrow 项目来读写 Parquet 文件,依赖于 pgrx 项目来扩展 PostgreSQL 的COPY
命令。
-- Copy a query result into Parquet in S3
COPY (SELECT * FROM table) TO 's3://mybucket/data.parquet' WITH (format 'parquet');
-- Load data from Parquet in S3
COPY table FROM 's3://mybucket/data.parquet' WITH (format 'parquet');
从源代码安装
在安装 PostgreSQL(14 或更高版本)后,您需要设置rustup
,cargo-pgrx
以构建扩展。
# install rustup
> curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh
# install cargo-pgrx
> cargo install cargo-pgrx
# configure pgrx
> cargo pgrx init --pg17 $(which pg_config)
# append the extension to shared_preload_libraries in ~/.pgrx/data-17/postgresql.conf
> echo "shared_preload_libraries = 'pg_parquet'" >> ~/.pgrx/data-17/postgresql.conf
# run cargo-pgrx to build and install the extension
> cargo pgrx run
# create the extension in the database
psql> "CREATE EXTENSION pg_parquet;"
用法
您可以使用pg_parquet
主要做 3 件事:
1. 您可以将 PostgreSQL 表/查询结果导出到 Parquet 文件,
2. 您可以将数据从 Parquet 文件导入到 PostgreSQL 表。
3. 您可以检查 Parquet 文件的模式和元数据。
在表和 Parquet 文件之间进行导入/导出
您可以使用 PostgreSQL 的COPY
命令,来读取和写入 Parquet 文件。以下示例说明了,如何将具有复杂类型的 PostgreSQL 表写入 Parquet 文件,然后将 Parquet 文件内容读回同一表中。
-- create composite types
CREATE TYPE product_item AS (id INT, name TEXT, price float4);
CREATE TYPE product AS (id INT, name TEXT, items product_item[]);
-- create a table with complex types
CREATE TABLE product_example (
id int,
product product,
products product[],
created_at TIMESTAMP,
updated_at TIMESTAMPTZ
);
-- insert some rows into the table
insert into product_example values (
1,
ROW(1, 'product 1', ARRAY[ROW(1, 'item 1', 1.0), ROW(2, 'item 2', 2.0), NULL]::product_item[])::product,
ARRAY[ROW(1, NULL, NULL)::product, NULL],
now(),
'2022-05-01 12:00:00-04'
);
-- copy the table to a parquet file
COPY product_example TO '/tmp/product_example.parquet' (format 'parquet', compression 'gzip');
-- show table
SELECT * FROM product_example;
-- copy the parquet file to the table
COPY product_example FROM '/tmp/product_example.parquet';
-- show table
SELECT * FROM product_example;
检查 Parquet 模式
您可以调用SELECT * FROM parquet.schema(<uri>)
,以发现给定 uri 处的 Parquet 文件的模式。
SELECT * FROM parquet.schema('/tmp/product_example.parquet') LIMIT 10;
uri | name | type_name | type_length | repetition_type | num_children | converted_type | scale | precision | field_id | logical_type
------------------------------+--------------+------------+-------------+-----------------+--------------+----------------+-------+-----------+----------+--------------
/tmp/product_example.parquet | arrow_schema | | | | 5 | | | | |
/tmp/product_example.parquet | id | INT32 | | OPTIONAL | | | | | 0 |
/tmp/product_example.parquet | product | | | OPTIONAL | 3 | | | | 1 |
/tmp/product_example.parquet | id | INT32 | | OPTIONAL | | | | | 2 |
/tmp/product_example.parquet | name | BYTE_ARRAY | | OPTIONAL | | UTF8 | | | 3 | STRING
/tmp/product_example.parquet | items | | | OPTIONAL | 1 | LIST | | | 4 | LIST
/tmp/product_example.parquet | list | | | REPEATED | 1 | | | | |
/tmp/product_example.parquet | items | | | OPTIONAL | 3 | | | | 5 |
/tmp/product_example.parquet | id | INT32 | | OPTIONAL | | | | | 6 |
/tmp/product_example.parquet | name | BYTE_ARRAY | | OPTIONAL | | UTF8 | | | 7 | STRING
(10 rows)
检查 Parquet 元数据
您可以调用SELECT * FROM parquet.metadata(<uri>)
,以在给定的 uri 中发现 Parquet 文件的详细元数据,例如列统计信息。
SELECT uri, row_group_id, row_group_num_rows, row_group_num_columns, row_group_bytes, column_id, file_offset, num_values, path_in_schema, type_name FROM parquet.metadata('/tmp/product_example.parquet') LIMIT 1;
uri | row_group_id | row_group_num_rows | row_group_num_columns | row_group_bytes | column_id | file_offset | num_values | path_in_schema | type_name
------------------------------+--------------+--------------------+-----------------------+-----------------+-----------+-------------+------------+----------------+-----------
/tmp/product_example.parquet | 0 | 1 | 13 | 842 | 0 | 0 | 1 | id | INT32
(1 row)
SELECT stats_null_count, stats_distinct_count, stats_min, stats_max, compression, encodings, index_page_offset, dictionary_page_offset, data_page_offset, total_compressed_size, total_uncompressed_size FROM parquet.metadata('/tmp/product_example.parquet') LIMIT 1;
stats_null_count | stats_distinct_count | stats_min | stats_max | compression | encodings | index_page_offset | dictionary_page_offset | data_page_offset | total_compressed_size | total_uncompressed_size
------------------+----------------------+-----------+-----------+--------------------+--------------------------+-------------------+------------------------+------------------+-----------------------+-------------------------
0 | | 1 | 1 | GZIP(GzipLevel(6)) | PLAIN,RLE,RLE_DICTIONARY | | 4 | 42 | 101 | 61
(1 row)
您可以调用SELECT * FROM parquet.file_metadata(<uri>)
,以在给定 uri 处发现 Parquet 文件的文件级元数据,例如格式版本。
SELECT * FROM parquet.file_metadata('/tmp/product_example.parquet')
uri | created_by | num_rows | num_row_groups | format_version
------------------------------+------------+----------+----------------+----------------
/tmp/product_example.parquet | pg_parquet | 1 | 1 | 1
(1 row)
您可以调用SELECT * FROM parquet.kv_metadata(<uri>)
,以查询给定 uri 处的 Parquet 文件的自定义键值元数据。
SELECT uri, encode(key, 'escape') as key, encode(value, 'escape') as value FROM parquet.kv_metadata('/tmp/product_example.parquet');
uri | key | value
------------------------------+--------------+---------------------
/tmp/product_example.parquet | ARROW:schema | /////5gIAAAQAAAA ...
(1 row)
对象存储支持
pg_parquet
支持从S3
对象存储读取和写入 Parquet 文件。仅支持带s3://
的 URI 格式。
配置对象存储的最简单方法是,创建标准的~/.aws/credentials
和~/.aws/config
文件:
$ cat ~/.aws/credentials
[default]
aws_access_key_id = AKIAIOSFODNN7EXAMPLE
aws_secret_access_key = wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY
$ cat ~/.aws/config
[default]
region = eu-central-1
或者,您可以在启动 PostgreSQL 时,使用以下环境变量来配置 S3 客户端:
• AWS_ACCESS_KEY_ID
:AWS 账户的访问密钥 ID
• AWS_SECRET_ACCESS_KEY
:AWS 账户的保密访问密钥
• AWS_REGION
:AWS 账户的默认区域
• AWS_SHARED_CREDENTIALS_FILE
:凭证文件的替代位置
• AWS_CONFIG_FILE
:配置文件的替代位置
• AWS_PROFILE
:凭证和配置文件中的概要文件名称(默认概要文件名称为default
)
注意:为了能够写入对象存储位置,您需要向当前 postgres 用户授予parquet_object_store_write
角色。同样,要从对象存储位置读取数据,您需要向当前 postgres 用户授予parquet_object_store_read
角色。
COPY 选项
pg_parquet
在COPY TO
命令中支持以下选项:
• format parquet
:您需要指定此选项,来读取或写入不以.parquet[.<compression>]
扩展名结尾的 Parquet 文件。(这是COPY FROM
命令支持的唯一选项。)
• row_group_size <int>
:写入 Parquet 文件时每个行组中的行数。默认行组的行数为122880
,
• row_group_size_bytes <int>
:写入 Parquet 文件时每个行组中行的总字节数。默认行组的字节数为row_group_size * 1024
,
• compression <string>
:写入 Parquet 文件时要使用的压缩格式。支持的压缩格式包括uncompressed
、snappy
、gzip
、brotli
、lz4
、lz4raw
和zstd
。默认压缩格式为snappy
。如果未指定,则压缩格式由文件扩展名决定。
• compression_level <int>
:写入 Parquet 文件时要使用的压缩级别。支持的压缩级别,仅支持用于gzip
、zstd
和brotli
压缩格式。默认压缩级别,对于gzip (0-10)
为6
、对于zstd (1-22)
为1
,对于brotli (0-11)
为1
。
配置
目前只有一个 GUC 参数,可用于启用/禁用pg_parquet
:
支持的类型
pg_parquet
具有丰富的类型支持,包括 PostgreSQL 的基础类型、数组和复合类型。下表列出了 PostgreSQL 中支持的类型,及其相应的 Parquet 类型。
PostgreSQL 类型 | Parquet 物理类型 | 逻辑类型 |
bool | BOOLEAN |
|
smallint | INT16 |
|
integer | INT32 |
|
bigint | INT64 |
|
real | FLOAT |
|
oid | INT32 |
|
double | DOUBLE |
|
numeric [1] | FIXED_LEN_BYTE_ARRAY(16) | DECIMAL(128) |
text | BYTE_ARRAY | STRING |
json | BYTE_ARRAY | STRING |
bytea | BYTE_ARRAY |
|
date [2] | INT32 | DATE |
timestamp | INT64 | TIMESTAMP_MICROS |
timestamptz [3] | INT64 | TIMESTAMP_MICROS |
time | INT64 | TIME_MICROS |
timetz [3] | INT64 | TIME_MICROS |
geometry [4] | BYTE_ARRAY |
|
composite | GROUP | STRUCT |
array | 元素的物理类型 | LIST |
警告:
(1) 精度 <= 38
的numeric
类型表示为FIXED_LEN_BYTE_ARRAY(16)
,逻辑类型为DECIMAL(128)
。精度 > 38
的numeric
类型表示为BYTE_ARRAY
,逻辑类型为STRING
。
(2) date
类型在写入 Parquet 文件时,根据Unix epoch
来表示。从 Parquet 文件读取时,根据PostgreSQL epoch
转换回来。
(3) 在写入 Parquet 文件时,timestamptz
和timetz
类型会调整成UTC
形式。从 Parquet 文件读取时,它们会按UTC
时区转换回来。
(4) 在设置了postgis
扩展的情况下,geometry
类型表示为BYTE_ARRAY
,编码为WKB
。否则,它将表示为BYTE_ARRAY
,逻辑类型为STRING
。
(5) 作为一种回退机制,任何没有相应 Parquet 类型的类型,都将表示为BYTE_ARRAY
,逻辑类型为STRING
。例如enum
。
该文章在 2024/11/12 16:49:17 编辑过