gpfdist
使用外部表的形式来高速加载数据,避免直接使用copy命令时,只有master节点,在解析、工作。从而提高数据的加载速度。重点掌握该工具,来达到快速的加载数据。
gpfdist基于libevent的高速并行文件加载工具;充分利用多节点优势,并行加载;加载性能非常好;可水平扩展。
实践过程中,利用域名解析,如将运行gpfdist的机器的ip加入到本地域名中etl。好处是:外部表的定义不用更改,可以方便的指向真实ip。
参考资源
官方手册
以下内容也有gpload方面内容,本文并没有参考。
gpfdist加载数据
数据源
cat > demo.txt <<EOL
a|b|c
d|e|f
- 先创建正式表
create table demo(c1 text,c2 text,c3 text);
- 再创建外部表,导入数据
create external table demo_ext (like demo) LOCATION ('gpfdist://etl:8080/demo.txt' ) FORMAT 'TEXT' (DELIMITER '|');
- 启动服务
gpfdist > gpfdist.log
# 测试连通性,应该能看到demo的文本 特殊的请求头,才会有正确的返回
curl -H "X-GP-PROTO: 0" etl:8080/demo.txt
执行外部表操作
- 查询外部表
select * from demo_ext;
利用外部表加载数据
跟copy命令一样,重复执行,就会重复加载。
insert into demo select * from demo_ext;
gpload加载数据
gpload对gpfdist封装增加高级特性:1、UPSERT。2、Transform。3、多gpfdist实例。
基于python实现,跨平台,用于与第三方ETL工具集成。
不需要自己手动启动gpfdist服务,会自动的创建服务、外部表等。
步骤:
- 数据同上。
- 创建表
create table testgpload(c1 text,c2 text,c3 text);
- 配置文件
cat > demo.yaml <<EOL
VERSION: 1.0.0.1
DATABASE: gpdb
USER: gpadmin
HOST: etl
PORT: 5432
GPLOAD:
INPUT:
- SOURCE:
FILE:
- demo.txt
- FORMAT: text
- DELIMITER: '|'
OUTPUT:
- TABLE: testgpload
- MODE: insert
- 加载命令
gpload -f demo.yaml
日志
# FILE跟SOURCE的级别问题
2021-09-13 17:40:51|ERROR|unexpected key: "file"
# 文件不存在
2021-09-13 17:42:08|INFO|setting schema 'yunchou' for table 'testgpload'
2021-09-13 17:42:08|INFO|started gpfdist -p 8000 -P 9000 -f "demo.txt" -t 30
2021-09-13 17:42:09|ERROR|ERROR: http response code 404 from gpfdist (gpfdist://szc49030:8000/demo.txt): HTTP/1.0 404 file not found (seg0 slice1 10.172.49.30:55000 pid=87876)
# 正常结果
2021-09-13 17:45:51|INFO|gpload succeeded
管道加载,实现不落盘
使用管道加载,实现不落盘的加载数据。
核心即管道文件。
mkfifo mypipe
创建外部表,文件选择管道文件。
create external table demo_pipe (like demo) LOCATION ('gpfdist://etl:8080/mypipe' ) FORMAT 'TEXT' (DELIMITER '|');
下面的命令会阻塞,直到有数据进来。(会一直卡在那,直到有数据进来,中间不会异常退出)
select * from demo_pipe;
将数据写入到管道中。
cat demo.txt > mypipe
在真实的场景中,可以用其他语言或方式,往管道里面写文件,然后从管道里面读内容,写入到表中。
注意事项
严格遵循操作管道文件的时序
确保管道不会被其它进程意外使用
对同一个管道不支持并发访问
gpfdist多实例
gpfdist
gpfdist采用的是基于libevent的单进程单线程的工作模式,尽管可以最大限度发挥单个CPU核的最大性能,但是当单个CPU核的加载速度无法满足实际需求时,如何利用多CPU,甚至多机进行并行加载?使用gpfdist来实现,水平扩展。
核心:通过外表的location中,指定多个gpfdist的url地址。
下面的3个服务是相互独立的进程,可以分布在不同的机器上,也可以布在同一个机器上。
nohup gpfdist -p 8081 -d /tmp/data1 2>&1 >gpfdist_8081.log &
nohup gpfdist -p 8082 -d /tmp/data2 2>&1 >gpfdist_8082.log &
nohup gpfdist -p 8083 -d /tmp/data3 2>&1 >gpfdist_8083.log &
备注:
-p,指定端口。
-d,指定数据目录。
- 创建目标表
create table demo (c1 text, c2 text, c3 text);
- 创建外部表,增加多个url
create external table demo_ext (like demo) LOCATION (
'gpfdist://etl:8081/a.txt',
'gpfdist://etl:8082/b.txt',
'gpfdist://etl:8083/c.txt'
) FORMAT 'TEXT' (DELIMITER '|');
- 查询外部表:
select * from demo_ext;
- 加载数据:
insert into demo select * from demo_ext;
gpload
指定多个source配置。只能在单个物理机上。
cat > multi.yaml <<EOL
VERSION: 1.0.0.1
DATABASE: gpdb
USER: gpadmin
HOST: etl
PORT: 5432
GPLOAD:
INPUT:
- SOURCE:
FILE:
- demo.txt
- SOURCE:
FILE:
- demo.txt
- SOURCE:
FILE:
- demo.txt
- FORMAT: text
- DELIMITER: '|'
OUTPUT:
- TABLE: testgpload
- MODE: insert
加载数据
gpload -f multi.yaml
多实例总结:
- gpfdist可支持水平扩展
- gpload不支持在多机上启动gpfdist
- 当加载瓶颈是网络带宽时,多实例不会改善性能
- 当加载瓶颈为磁盘IO时,多gpfdist实例反而会导致性能下降
加载json文件
- 创建外部表
create external table ext_json (data json) location ('gpfdist://etl:8080/sample.json') format 'text' (ESCAPE E'\x1E' DELIMITERE '\x1F');
由于json数据,只有1列,故DELIMITERE可以关闭,这里用的是不存在的字符\x1F,另外,json里面有转义,但是我们不需要对其进行解析,也关闭,用到不存在的字符\x1E。
- 查看外部表
select *from ext_json;
转换数据,使用pg自带的函数,能完成数据的解析。
mapping加载
增加了列属性,以及映射。
VERSION: 1.0.0.1
DATABASE: gpdb
USER: gpadmin
HOST: etl
PORT: 5432
GPLOAD:
INPUT:
- SOURCE:
FILE:
- demo.txt
- FORMAT: csv
- COLUMNS:
- col1: int
- col2: decimal(5,2)
OUTPUT:
- TABLE: testgpload
- MODE: insert
- MAPPING:
id: col1
value: (col2 - 32) * 5 / 9
updated: now()
总结:
- gpload通过mapping实现数据加工
- mapping可以为任何合法的表达式(expression)
- insert和merge模式都支持
gpload加载json
跟mapping很像。
VERSION:1.0.0.1
DATABASE: test
USER: gpadmin
HOST: mdw
PORT:5432
GPLOAD:
INPUT:
- SOURCE:
FILE:
- prize.jsonl
- FORMAT: text
- DELIMITER: E'\x1F'
- ESCAPE: E'\x1E'
- COLUMNS:
- data: json
OUTPUT:
- TABLE: prize
- MODE: insert
- MAPPING:
year: (data->>'year')::int
category: data->>'category'
first_name: json_array_elements(data->'laureates')::json->>'firstname'
surname: json_array_elements(data->'laureates')::json->>'surname'
正文
gpfdist
gpfdist [-d directory] [-p http_port] [-P last_http_port] [-l log_file]
[-t timeout] [-S] [-w time] [-v | -V] [-s] [-m max_length]
[--ssl certificate_path [--sslclean wait_time] ]
[-c config.yml]
gpfdist -? | --help
gpfdist --version
示例
gpfdist -d /var/load_files -p 8081 &
gpfdist -d /var/load_files -p 8081 -l /home/gpadmin/log &
#使用kill 杀掉后台进程
gpload
gpload -f control_file [-l log_file] [-h hostname] [-p port]
[-U username] [-d database] [-W] [--gpfdist_timeout seconds]
[--no_auto_trans] [--max_retries retry_times] [[-v | -V] [-q]] [-D]
gpload -?
gpload --version
yaml配置文件的语法
---
VERSION: 1.0.0.1
DATABASE: db_name
USER: db_username
HOST: master_hostname
PORT: master_port
GPLOAD:
INPUT:
- SOURCE:
LOCAL_HOSTNAME:
- hostname_or_ip
PORT: http_port
| PORT_RANGE: [start_port_range, end_port_range]
FILE:
- /path/to/input_file
SSL: true | false
CERTIFICATES_PATH: /path/to/certificates
- FULLY_QUALIFIED_DOMAIN_NAME: true | false
- COLUMNS:
- field_name: data_type
- TRANSFORM: 'transformation'
- TRANSFORM_CONFIG: 'configuration-file-path'
- MAX_LINE_LENGTH: integer
- FORMAT: text | csv
- DELIMITER: 'delimiter_character'
- ESCAPE: 'escape_character' | 'OFF'
- NULL_AS: 'null_string'
- FILL_MISSING_FIELDS: true | false
- FORCE_NOT_NULL: true | false
- QUOTE: 'csv_quote_character'
- HEADER: true | false
- ENCODING: database_encoding
- ERROR_LIMIT: integer
- LOG_ERRORS: true | false
EXTERNAL:
- SCHEMA: schema | '%'
OUTPUT:
- TABLE: schema.table_name
- MODE: insert | update | merge
- MATCH_COLUMNS:
- target_column_name
- UPDATE_COLUMNS:
- target_column_name
- UPDATE_CONDITION: 'boolean_condition'
- MAPPING:
target_column_name: source_column_name | 'expression'
PRELOAD:
- TRUNCATE: true | false
- REUSE_TABLES: true | false
- STAGING_TABLE: external_table_name
- FAST_MATCH: true | false
SQL:
- BEFORE: "sql_command"
- AFTER: "sql_command"
关于出错的一些选项
FILL_MISSING_FIELDS
可选的,默认是false。针对尾行不完整的场景。设置为true的时候,当读到不完整的行时,不存在的字段会被设置为
NULL。感觉用处不大。错误的数据,肯定不希望用这种方式来加载进去。另外,如果确实存缺失的列,也可以自己重建表。
ERROR_LIMIT
可选。当开启该功能时,加载数据时,错误会隔离到单行的错误。
说直白点,设置了这个数字,即错误没有达到最大错误时,所有的正常数据都会被加载,错误的数据,可能会被记录下来。这个功能,还有有用的,针对局部错误的行。
LOG_ERRORS
依赖前面
ERROR_LIMIT设置。使用下面的方式,读到错误的日志。gp_read_error_log('table_name');