gpfdist

使用外部表的形式来高速加载数据,避免直接使用copy命令时,只有master节点,在解析、工作。从而提高数据的加载速度。重点掌握该工具,来达到快速的加载数据。

gpfdist基于libevent的高速并行文件加载工具;充分利用多节点优势,并行加载;加载性能非常好;可水平扩展。

实践过程中,利用域名解析,如将运行gpfdist的机器的ip加入到本地域名中etl。好处是:外部表的定义不用更改,可以方便的指向真实ip。

参考资源

官方手册

以下内容也有gpload方面内容,本文并没有参考。

gpfdist加载数据

  • 数据源

    cat > demo.txt <<EOL

a|b|c
d|e|f
  • 先创建正式表
create table demo(c1 text,c2 text,c3 text);
  • 再创建外部表,导入数据
create external table demo_ext (like demo) LOCATION ('gpfdist://etl:8080/demo.txt' ) FORMAT 'TEXT' (DELIMITER '|');
  • 启动服务
gpfdist > gpfdist.log

# 测试连通性,应该能看到demo的文本    特殊的请求头,才会有正确的返回
curl -H "X-GP-PROTO: 0" etl:8080/demo.txt

执行外部表操作

  • 查询外部表
select * from demo_ext;
  • 利用外部表加载数据

    跟copy命令一样,重复执行,就会重复加载。

insert into demo select * from demo_ext;

gpload加载数据

gpload对gpfdist封装增加高级特性:1、UPSERT。2、Transform。3、多gpfdist实例。
基于python实现,跨平台,用于与第三方ETL工具集成。

不需要自己手动启动gpfdist服务,会自动的创建服务、外部表等。

步骤:

  • 数据同上。
  • 创建表
create table testgpload(c1 text,c2 text,c3 text);
  • 配置文件

cat > demo.yaml <<EOL

VERSION: 1.0.0.1
DATABASE: gpdb
USER: gpadmin
HOST: etl
PORT: 5432
GPLOAD:
  INPUT:
  - SOURCE:
      FILE:
         - demo.txt
  - FORMAT: text
  - DELIMITER: '|'
  OUTPUT:
  - TABLE: testgpload
  - MODE: insert
  • 加载命令
gpload -f demo.yaml

日志

# FILE跟SOURCE的级别问题
2021-09-13 17:40:51|ERROR|unexpected key: "file"
# 文件不存在
2021-09-13 17:42:08|INFO|setting schema 'yunchou' for table 'testgpload'
2021-09-13 17:42:08|INFO|started gpfdist -p 8000 -P 9000 -f "demo.txt" -t 30
2021-09-13 17:42:09|ERROR|ERROR:  http response code 404 from gpfdist (gpfdist://szc49030:8000/demo.txt): HTTP/1.0 404 file not found  (seg0 slice1 10.172.49.30:55000 pid=87876)
# 正常结果
2021-09-13 17:45:51|INFO|gpload succeeded

管道加载,实现不落盘

使用管道加载,实现不落盘的加载数据。

核心即管道文件。

mkfifo  mypipe

创建外部表,文件选择管道文件。

create external table demo_pipe (like demo) LOCATION ('gpfdist://etl:8080/mypipe' ) FORMAT 'TEXT' (DELIMITER '|');

下面的命令会阻塞,直到有数据进来。(会一直卡在那,直到有数据进来,中间不会异常退出)

select * from demo_pipe;

将数据写入到管道中。

cat demo.txt > mypipe

在真实的场景中,可以用其他语言或方式,往管道里面写文件,然后从管道里面读内容,写入到表中。

注意事项

  • 严格遵循操作管道文件的时序

  • 确保管道不会被其它进程意外使用

  • 对同一个管道不支持并发访问

gpfdist多实例

gpfdist

gpfdist采用的是基于libevent的单进程单线程的工作模式,尽管可以最大限度发挥单个CPU核的最大性能,但是当单个CPU核的加载速度无法满足实际需求时,如何利用多CPU,甚至多机进行并行加载?使用gpfdist来实现,水平扩展。

核心:通过外表的location中,指定多个gpfdist的url地址。

下面的3个服务是相互独立的进程,可以分布在不同的机器上,也可以布在同一个机器上。

nohup gpfdist -p 8081 -d /tmp/data1 2>&1 >gpfdist_8081.log &
nohup gpfdist -p 8082 -d /tmp/data2 2>&1 >gpfdist_8082.log &
nohup gpfdist -p 8083 -d /tmp/data3 2>&1 >gpfdist_8083.log &

备注:

-p,指定端口。

-d,指定数据目录。

  • 创建目标表
create table demo (c1 text, c2 text, c3 text);
  • 创建外部表,增加多个url
create external table demo_ext (like demo) LOCATION (
    'gpfdist://etl:8081/a.txt',
    'gpfdist://etl:8082/b.txt',
    'gpfdist://etl:8083/c.txt'
) FORMAT 'TEXT' (DELIMITER '|');
  • 查询外部表:
select * from demo_ext;
  • 加载数据:
insert into demo select * from demo_ext;

gpload

指定多个source配置。只能在单个物理机上。

cat > multi.yaml <<EOL

VERSION: 1.0.0.1
DATABASE: gpdb
USER: gpadmin
HOST: etl
PORT: 5432
GPLOAD:
  INPUT:
  - SOURCE:
      FILE:
         - demo.txt
  - SOURCE:
      FILE:
         - demo.txt
  - SOURCE:
      FILE:
         - demo.txt
  - FORMAT: text
  - DELIMITER: '|'
  OUTPUT:
  - TABLE: testgpload
  - MODE: insert

加载数据

gpload -f multi.yaml

多实例总结:

  • gpfdist可支持水平扩展
  • gpload不支持在多机上启动gpfdist
  • 当加载瓶颈是网络带宽时,多实例不会改善性能
  • 当加载瓶颈为磁盘IO时,多gpfdist实例反而会导致性能下降

加载json文件

  • 创建外部表
create external table ext_json (data json) location ('gpfdist://etl:8080/sample.json') format 'text' (ESCAPE E'\x1E' DELIMITERE '\x1F');

由于json数据,只有1列,故DELIMITERE可以关闭,这里用的是不存在的字符\x1F,另外,json里面有转义,但是我们不需要对其进行解析,也关闭,用到不存在的字符\x1E

  • 查看外部表
select *from ext_json;

转换数据,使用pg自带的函数,能完成数据的解析。

mapping加载

增加了列属性,以及映射。

VERSION: 1.0.0.1
DATABASE: gpdb
USER: gpadmin
HOST: etl
PORT: 5432
GPLOAD:
  INPUT:
  - SOURCE:
      FILE:
         - demo.txt
  - FORMAT: csv
  - COLUMNS: 
    - col1: int
    - col2: decimal(5,2)
  OUTPUT:
  - TABLE: testgpload
  - MODE: insert
  - MAPPING:
      id: col1
      value: (col2 - 32) * 5 / 9
      updated: now()

总结:

  • gpload通过mapping实现数据加工
  • mapping可以为任何合法的表达式(expression)
  • insert和merge模式都支持

gpload加载json

跟mapping很像。

VERSION:1.0.0.1
DATABASE: test
USER: gpadmin
HOST: mdw
PORT:5432
GPLOAD:
  INPUT:
    - SOURCE:
        FILE:
        - prize.jsonl
    - FORMAT: text
    - DELIMITER: E'\x1F'
    - ESCAPE: E'\x1E'
    - COLUMNS:
      - data: json
  OUTPUT:
   - TABLE: prize
   - MODE: insert
   - MAPPING:
       year: (data->>'year')::int
       category: data->>'category'
       first_name: json_array_elements(data->'laureates')::json->>'firstname'
       surname: json_array_elements(data->'laureates')::json->>'surname'

正文

gpfdist

gpfdist [-d directory] [-p http_port] [-P last_http_port] [-l log_file]
   [-t timeout] [-S] [-w time] [-v | -V] [-s] [-m max_length]
   [--ssl certificate_path [--sslclean wait_time] ]
   [-c config.yml]

gpfdist -? | --help 

gpfdist --version

示例


gpfdist -d /var/load_files -p 8081 &

gpfdist -d /var/load_files -p 8081 -l /home/gpadmin/log &

#使用kill 杀掉后台进程

gpload

gpload -f control_file [-l log_file] [-h hostname] [-p port] 
   [-U username] [-d database] [-W] [--gpfdist_timeout seconds] 
   [--no_auto_trans] [--max_retries retry_times] [[-v | -V] [-q]] [-D]

gpload -? 

gpload --version

yaml配置文件的语法

---
VERSION: 1.0.0.1
DATABASE: db_name
USER: db_username
HOST: master_hostname
PORT: master_port
GPLOAD:
   INPUT:
    - SOURCE:
         LOCAL_HOSTNAME:
           - hostname_or_ip
         PORT: http_port
       | PORT_RANGE: [start_port_range, end_port_range]
         FILE: 
           - /path/to/input_file
         SSL: true | false
         CERTIFICATES_PATH: /path/to/certificates
    - FULLY_QUALIFIED_DOMAIN_NAME: true | false
    - COLUMNS:
           - field_name: data_type
    - TRANSFORM: 'transformation'
    - TRANSFORM_CONFIG: 'configuration-file-path' 
    - MAX_LINE_LENGTH: integer 
    - FORMAT: text | csv
    - DELIMITER: 'delimiter_character'
    - ESCAPE: 'escape_character' | 'OFF'
    - NULL_AS: 'null_string'
    - FILL_MISSING_FIELDS: true | false
    - FORCE_NOT_NULL: true | false
    - QUOTE: 'csv_quote_character'
    - HEADER: true | false
    - ENCODING: database_encoding
    - ERROR_LIMIT: integer
    - LOG_ERRORS: true | false
   EXTERNAL:
      - SCHEMA: schema | '%'
   OUTPUT:
    - TABLE: schema.table_name
    - MODE: insert | update | merge
    - MATCH_COLUMNS:
           - target_column_name
    - UPDATE_COLUMNS:
           - target_column_name
    - UPDATE_CONDITION: 'boolean_condition'
    - MAPPING:
              target_column_name: source_column_name | 'expression'
   PRELOAD:
    - TRUNCATE: true | false
    - REUSE_TABLES: true | false
    - STAGING_TABLE: external_table_name
    - FAST_MATCH: true | false
   SQL:
    - BEFORE: "sql_command"
    - AFTER: "sql_command"

关于出错的一些选项

  • FILL_MISSING_FIELDS

    可选的,默认是false。针对尾行不完整的场景。设置为true的时候,当读到不完整的行时,不存在的字段会被设置为NULL

    感觉用处不大。错误的数据,肯定不希望用这种方式来加载进去。另外,如果确实存缺失的列,也可以自己重建表。

  • ERROR_LIMIT

    可选。当开启该功能时,加载数据时,错误会隔离到单行的错误。

    说直白点,设置了这个数字,即错误没有达到最大错误时,所有的正常数据都会被加载,错误的数据,可能会被记录下来。这个功能,还有有用的,针对局部错误的行。

  • LOG_ERRORS

    依赖前面ERROR_LIMIT设置。使用下面的方式,读到错误的日志。

    gp_read_error_log('table_name');