gpkafka

Resources

Overview

  • gpss

    This appears to be the server component; it starts the ETL service. It does not have to run on the same machine as Greenplum, nor share a machine with the Kafka brokers, so it can be deployed separately. The imagined setup: deploy this service on a new machine, run the ETL service there on its own, and load data into Greenplum from it.

  • gpsscli

    This is the client, used to submit new jobs to the gpss service.

  • gpkafka

    This one appears to be more powerful: it is effectively gpss and gpsscli combined, so a single command is enough to submit and run a job, much like gpload wraps gpfdist. Fewer steps, more capability.

Video screenshots + text of the video content

1. What is gpkafka?

gpkafka is a component of Greenplum Database that supports exactly-once processing of data coming from Kafka. Building on Greenplum's mature processing capabilities, it can support computations over fixed windows, sliding windows, and session windows (key concepts in real-time data processing). It is suited to real-time data warehousing and near-real-time data processing, and is simple to use and easy to pick up.

https://www.jianshu.com/p/82a2bfccc796 -- contains the original text and a translation of Google's paper on stream processing.
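To make the window concepts concrete, here is a minimal SQL sketch of a fixed (tumbling) window computed inside Greenplum after the Kafka data has been loaded; the table and column names (yunchou.order_events, event_time, order_amount) are assumptions for illustration only:

-- Fixed one-minute windows: truncate each event timestamp to its window start, then aggregate per window.
SELECT date_trunc('minute', event_time) AS window_start,
       count(*)                         AS order_cnt,
       sum(order_amount)                AS order_amount_sum
FROM   yunchou.order_events
GROUP  BY 1
ORDER  BY 1;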

2. Some typical application scenarios for real-time data processing:

  • E-commerce and marketing: data reports, ad delivery, real-time dashboards, process monitoring, real-time recommendations, real-time data warehouse
  • Internet of Things (IoT): real-time sensor data collection and display, real-time alerting, etc.
  • Finance: real-time monitoring, real-time valuation, real-time risk control, real-time reports, real-time dashboards, real-time data warehouse, etc.

4. gpkafka hands-on

Environment:

1) This run is based on Greenplum 6.5, gpss 1.3.5, kafka_2.12-2.4.1.tgz, ZooKeeper 3.4.10 (virtual machines on a laptop)

2) Create a test Kafka topic

kafka-topics.sh --create --zookeeper hadoop201:2181 --replication-factor 1 --partitions 3 --topic gpss_test

3) Start a producer client and produce the data (before producing, first check that the topic's producer works normally).
The producer sends the data (order_info.csv is a synthetic customer order table with 9.72 million rows in total).

kafka-console-producer.sh --broker-list hadoop201:9092 --topic gpss_test < order_info.csv

4) On the gpdb cluster, consume the data from Kafka

# start the gpss service
gpss gpsscfg_ex.json --log-dir ./gpsslogs &
# list current jobs
gpsscli list --all --gpss-host 10.10.10.103 --gpss-port 5019
# submit jobs
gpsscli submit --name orders_merge --gpss-host 10.10.10.103 --gpss-port 5019 ./gpkafka_merge.yaml
gpsscli submit --name orders_insert --gpss-host 10.10.10.103 --gpss-port 5019 ./gpkafka_insert.yaml
gpsscli submit --name orders_update --gpss-host 10.10.10.103 --gpss-port 5019 ./gpkafka_update.yaml



# list the jobs again
gpsscli list --all --gpss-host 10.10.10.103 --gpss-port 5019
# start the jobs
gpsscli start orders_merge --gpss-host 10.10.10.103 --gpss-port 5019
gpsscli start orders_update --gpss-host 10.10.10.103 --gpss-port 5019
gpsscli start orders_insert --gpss-host 10.10.10.103 --gpss-port 5019
# stop the jobs
gpsscli stop orders_merge --gpss-host 10.10.10.103 --gpss-port 5019
gpsscli stop orders_update --gpss-host 10.10.10.103 --gpss-port 5019
gpsscli stop orders_insert --gpss-host 10.10.10.103 --gpss-port 5019
# job status
gpsscli status orders_merge --gpss-host 10.10.10.103 --gpss-port 5019

5) Meanwhile, query the data in real time, e.g. count the number of customers and total the order amounts.
Production system ==> Kafka ==> Greenplum ==> Kafka ==> business systems (closing the loop for data-driven decisions).
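A sketch of the kind of real-time query meant here, run against the table the jobs above load into; the table and column names (public.order_info, cust_id, order_amount) are assumptions, since the YAML files are not shown:

-- Count distinct customers and total the order amounts while the load jobs are running.
SELECT count(DISTINCT cust_id) AS customer_cnt,
       sum(order_amount)       AS order_amount_total
FROM   public.order_info;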

Official documentation

gpsscli help

gpsscli provides subcommands to manage Greenplum Stream Server load jobs
and to view job status, progress, and history.

Usage:
  gpsscli [command]

Available Commands:
  convert     converts V1/V2 style config yaml file to new style yaml file
  help        Help about any command
  list        List gpss jobs and status
  load        Load data from kafka into greenplum
  progress    progress loading data from kafka into greenplum
  remove      Remove job from stream server
  shadow      shadow your password using the shadow key in config or a default key
  start       Start loading data from kafka into greenplum
  status      Show GPSS job status
  stop        Stop a gpss job
  submit      Submit a job which loads data from kafka into greenplum
  wait        Wait a gpss job until stop
Flags:
      --config string      gpsscli JSON configuration file
      --gpss-host string   gpss host address (default "127.0.0.1")
      --gpss-port string   gpss host port (default "5000")
  -h, --help               help for gpsscli
  -l, --log-dir string     log directory, default is $HOME/gpAdminLogs
      --no-check-ca        allow connections to gpss server without certs
      --verbose            enable debug log
      --version            version for gpsscli

Use "gpsscli [command] --help" for more information about a command.

gpkafka-v2.yaml

https://gpdb.docs.pivotal.io/streaming-server/1-3-3/kafka/gpkafka-yaml-v2.html

The configuration file format is as follows:

DATABASE: db_name
USER: user_name
PASSWORD: password
HOST: host
PORT: greenplum_port
VERSION: 2
KAFKA:
   INPUT:
      SOURCE:
        BROKERS: kafka_broker_host:broker_port [, ... ]
        TOPIC: kafka_topic
      [VALUE:
        COLUMNS:
           - NAME: { column_name | __IGNORED__ }
             TYPE: column_data_type
           [ ... ]
         FORMAT: value_data_format
         [[DELIMITED_OPTION:
            DELIMITER: delimiter_string] |
         [AVRO_OPTION:
            SCHEMA_REGISTRY_ADDR: http://schemareg_host:schemareg_port [, ... ]]] |
         [CUSTOM_OPTION:
            NAME: udf_name
            PARAMSTR: udf_parameter_string]]
      [KEY:
        COLUMNS:
           - NAME: { column_name | __IGNORED__ }
             TYPE: column_data_type
           [ ... ]
         FORMAT: key_data_format
         [[DELIMITED_OPTION:
            DELIMITER: delimiter_string] |
         [AVRO_OPTION:
            SCHEMA_REGISTRY_ADDR: http://schemareg_host:schemareg_port [, ... ]] |
         [CUSTOM_OPTION:
            NAME: udf_name
            PARAMSTR: udf_parameter_string]]
      [FILTER: filter_string]
      ERROR_LIMIT: { num_errors | percentage_errors }
   OUTPUT:
      [SCHEMA: output_schema_name]
      TABLE: table_name
      [MODE: mode]
      [MATCH_COLUMNS: 
         - match_column_name
         [ ... ]]
      [UPDATE_COLUMNS: 
         - update_column_name
         [ ... ]]
      [UPDATE_CONDITION: update_condition]
      [MAPPING: 
         - NAME: target_column_name
           EXPRESSION: { source_column_name | expression } 
         [ ... ]
           |
         target_column_name : { source_column_name | expression }
         [ ... ] ]
   [METADATA:
      [SCHEMA: metadata_schema_name]]
   COMMIT:
      MAX_ROW: num_rows
      MINIMAL_INTERVAL: wait_time
   [POLL:
      BATCHSIZE: num_records
      TIMEOUT: poll_time]
   [TASK:
      POST_BATCH_SQL: udf_or_sql_to_run
      BATCH_INTERVAL: num_batches]
   [PROPERTIES:
      kafka_property_name: kafka_property_value
      [ ... ]]

Personal hands-on notes

Start gpss

gpss4ic.json

{
    "ListenAddress": {
        "Host": "",
        "Port": 50007
    },
    "Gpfdist": {
        "Host": "",
        "Port": 8319,
        "ReuseTables": false
    }
}
nohup gpss gpss4ic.json > gpss_server.log 2>&1 &

It starts with no errors reported. (ListenAddress.Port 50007 is the port gpsscli connects to below via --gpss-port; Gpfdist.Port 8319 is used by gpss's gpfdist service and shows up later in the external table LOCATION in the error log.)

Set up Kafka

Build the service

Kafka server side

The 0.0.0.0:9092:9092 port mapping is there because the machine defaults to tcp6; it is mainly a workaround for that.

version: '2'
services:
  zookeeper:
    image: 10.131.9.12:5000/wurstmeister/zookeeper
    ports:
      - "0.0.0.0:2181:2181"
  kafka:
    image: 10.131.9.12:5000/wurstmeister/kafka
    depends_on: [ zookeeper ]
    ports:
      - "0.0.0.0:9092:9092"
    environment:
      KAFKA_ADVERTISED_HOST_NAME: 10.172.41.206
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
    volumes:
      - /var/run/docker.sock:/var/run/docker.sock

Note: there are a few pitfalls here. Probably because a volume is configured, the old topics are still there even after every restart.

# Wrong configuration: this caused gpsscli to fail to start the job
KAFKA_ADVERTISED_HOST_NAME: 127.0.0.1  

Start it up

hostname -I
docker-compose up -d
docker exec -it kafka_kafka_1 bash
  • The wrong way: the topic name gpss_test

It is better to use '.' to separate the parts of a topic name, and not to mix '.' and '_' (see the warning below).

kafka-topics.sh --create --topic gpss_test --partitions 4 --zookeeper kafka_zookeeper_1:2181 --replication-factor 1 

Warning output

WARNING: Due to limitations in metric names, topics with a period (‘.’) or underscore (‘_’) could collide. To avoid issues it is best to use either, but not both.

  • The correct way
kafka-topics.sh --create --topic gpsstest2 --partitions 4 --zookeeper kafka_zookeeper_1:2181 --replication-factor 1 

No warning message this time.

Text (delimited) load

Configure the YAML

  1. Configuration file for loading '|'-delimited streaming data: kafka_testdata_delimited.yaml
DATABASE: gpdb
USER: gpadmin
PASSWORD: changeme
HOST: localhost
PORT: 5432
VERSION: 2
KAFKA:
   INPUT:
      SOURCE:
         BROKERS: 10.172.41.206:9092
         TOPIC: gpsstest2
      VALUE:
         COLUMNS:
           - NAME: tid
             TYPE: integer
           - NAME: tcode
             TYPE: varchar
           - NAME: tname
             TYPE: varchar
         FORMAT: delimited
         DELIMITED_OPTION:
           DELIMITER: "|"
      ERROR_LIMIT: 25
   OUTPUT:
      SCHEMA: yunchou
      TABLE: test_heap
   METADATA:
      SCHEMA: yunchou
   COMMIT:
      MINIMAL_INTERVAL: 2000
   POLL:
      BATCHSIZE: 100
      TIMEOUT: 3000

Notes

# Not sure what METADATA means; for now keep it consistent with the schema above.
# (Judging by the error log below, it is the schema in which gpss creates its internal external tables.)
 METADATA:
   #SCHEMA: yunchou
   SCHEMA: gpkafka_internal

Errors

Configuring a SCHEMA that does not exist in the Greenplum database leads to the following error:

Setting it to an existing SCHEMA resolves the problem.

20210813:14:45:51 gpsscli:gpadmin:szc49030:113161-[ERROR]:-start job failed, err: rpc error: code = Unknown desc = initJob failed: InitJob: failed to create external table: "gpkafka_internal"."gpkafkaloadext_8f5ceb5aa8489a5a5e1bd22e1a932c2c", sql: CREATE EXTERNAL TABLE "gpkafka_internal"."gpkafkaloadext_8f5ceb5aa8489a5a5e1bd22e1a932c2c"(tid integer, tcode varchar, tname varchar)
        LOCATION('gpfdist://szc49030:8319/gpkafkaload/%22gpkafka_internal%22.%22gpkafkaloadext_8f5ceb5aa8489a5a5e1bd22e1a932c2c%22')
        FORMAT 'CUSTOM'(formatter='delimited_in', delimiter='|') LOG ERRORS SEGMENT REJECT LIMIT 25 ROWS: pq: schema "gpkafka_internal" does not exist
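Alternatively, instead of switching back to an existing schema, the missing schema can simply be created before starting the job (a one-line sketch, assuming you want to keep the gpkafka_internal name from the YAML above):

-- Create the metadata schema so gpss can create its internal external tables in it.
CREATE SCHEMA gpkafka_internal;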

Incorrect Kafka configuration caused the broker address obtained from Kafka to always be 127.0.0.1:9092.

Redeploying the Kafka service resolved the problem, i.e.: KAFKA_ADVERTISED_HOST_NAME: 10.172.41.206

20210813:14:00:33 gpsscli:gpadmin:szc49030:104644-[ERROR]:-start job failed, err: rpc error: code = Unknown desc = initJob failed: InitJob: getPartitionsMetadata: connecting [127.0.0.1:9092] failed: dial tcp 127.0.0.1:9092: connect: connection refused

A problem that still could not be resolved:

20210813:16:03:17 gpsscli:gpadmin:szc49030:129862-[ERROR]:-start job failed, err: rpc error: code = Unknown desc = initJob failed: InitJob: getPartitionsMetadata: reader: kafkacsvdata fail to get kafka meta data: Local: Broker transport failure

Job management

# --gpss-host is the address the running gpss instance listens on, --gpss-port is port 50007; this shows gpss does not have to run on the same machine as Greenplum

# submit the job
gpsscli submit --name kafkacsvdata --gpss-port 50007 --gpss-host localhost ./kafka_testdata_delimited.yaml
# the output is as follows
# 20210813:13:39:13 gpsscli:gpadmin:szc49030:100581-[INFO]:-JobID: adc3e38aea0a7a710a0ab5321d3f759e,JobName: kafkacsvdata

# list jobs
# gpsscli list --all --gpss-port 50007 --gpss-host mdw
gpsscli list --all --gpss-port 50007 --gpss-host localhost

Send data to Kafka

Create the file

cat > test_heap.csv <<EOL

Contents

1|aa|2002
1|bb|2003
1|cc|2004
1|dd|2005
EOL
# publish to 10.172.41.206:9092
#kafka-console-producer.sh --topic=gpsstest --broker-list kafka_kafka_1:9092 < test_heap.csv
kafka-console-producer.sh --topic=gpsstest2 --broker-list kafka_kafka_1:9092 < test_heap.csv
# after a successful publish, a > prompt is printed for each record

# check with a consumer
kafka-console-consumer.sh --bootstrap-server kafka_kafka_1:9092 --from-beginning --topic gpsstest2
# you can see the input that was just produced

Create the table structure; without the target table, it seems the messages cannot be received directly.

CREATE TABLE "yunchou"."test_heap" (
    tid integer,
    tcode varchar(255),
    tname varchar(255)
)  WITH (OIDS=FALSE);

Control the job with gpsscli

# start the job
gpsscli start kafkacsvdata --gpss-host localhost --gpss-port 50007

# stop the job
gpsscli stop kafkacsvdata --gpss-host localhost --gpss-port 50007

# remove the job
gpsscli  remove kafkacsvdata --gpss-port 50007  
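After the job has been started and test_heap.csv has been published to the topic, a quick sanity check on the Greenplum side (a sketch using the table defined above):

-- Verify the delimited records actually landed in the target table.
SELECT count(*) FROM yunchou.test_heap;
SELECT * FROM yunchou.test_heap ORDER BY tid, tcode;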

JSON load

Configure the YAML

Open question: how do we load the key as well? The key also needs to go into the database (the v2 template above has a KEY block, parallel to VALUE, for exactly this). Multiple BROKERS are separated by commas.

kafka_testdata_json.yaml

DATABASE: gpdb
USER: gpadmin
PASSWORD: changeme
HOST: localhost
PORT: 5432
VERSION: 2
KAFKA:
   INPUT:
      SOURCE:
         BROKERS: 10.172.32.31:1012,10.172.32.32:1012,10.172.32.33:1012
         TOPIC: ordercenter.reorganize.v2.order
      VALUE:
        COLUMNS:
           - NAME: jdata
             TYPE: json
        FORMAT: json
      ERROR_LIMIT: 25
   OUTPUT:
      SCHEMA: yunchou
      TABLE: test_order
      MAPPING:
        - NAME: businessOrderId
          EXPRESSION: (jdata->>'businessOrderId')::varchar
   METADATA:
      SCHEMA: yunchou
   COMMIT:
      MINIMAL_INTERVAL: 2000
   POLL:
      BATCHSIZE: 100
      TIMEOUT: 3000

Create the storage table

CREATE TABLE "yunchou"."test_order" (
    businessOrderId varchar(255)
)  WITH (OIDS=FALSE);

Job management

Since we connect directly to someone else's existing Kafka, and only as a consumer, the Kafka producer step is omitted here.

gpsscli submit --name kafkajsondata --gpss-port 50007 --gpss-host localhost ./kafka_testdata_json.yaml 


# list jobs
# gpsscli list --all --gpss-port 50007 --gpss-host mdw
gpsscli list --all --gpss-port 50007 --gpss-host localhost

# start the job
gpsscli start kafkajsondata --gpss-host localhost --gpss-port 50007
gpsscli status kafkajsondata --gpss-host localhost --gpss-port 50007

Using the Kafka cluster someone else set up, it seemed to work right away; not sure why.

CSV load

load-from-kafka-example.html

Configure the YAML

firstload_cfg.yaml

DATABASE: gpdb
USER: gpadmin
HOST: localhost
PORT: 5432
KAFKA:
   INPUT:
     SOURCE:
        BROKERS: 10.172.41.206:9092
        TOPIC: topic_for_gpkafka
     COLUMNS:
        - NAME: cust_id
          TYPE: int
        - NAME: __IGNORED__
          TYPE: int
        - NAME: expenses
          TYPE: decimal(9,2)
     FORMAT: csv
     ERROR_LIMIT: 125
   OUTPUT:
     TABLE: data_from_kafka
     MAPPING:
        - NAME: customer_id
          EXPRESSION: cust_id
        - NAME: expenses
          EXPRESSION: expenses
        - NAME: tax_due
          EXPRESSION: expenses * .0725
   COMMIT:
     MINIMAL_INTERVAL: 2000

I added the consumer group.id at the end (see PROPERTIES below). The configuration was accepted, but it did not seem to take effect: the Kafka server side never showed that consumer group. Even so, re-running gpss did not insert duplicate data. Does gpss maintain the load progress on the client side itself? (That would be consistent with the exactly-once behaviour mentioned earlier; gpss appears to track offsets in its own metadata rather than relying on a Kafka consumer group. A quick duplicate check is sketched below.)

PROPERTIES: 
  group.id: yunchouhello
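The duplicate check mentioned above, as a sketch against the public.data_from_kafka table defined in the next section:

-- Any rows returned here would indicate duplicate inserts; an empty result is consistent with exactly-once loading.
SELECT customer_id, expenses, tax_due, count(*) AS cnt
FROM   public.data_from_kafka
GROUP  BY 1, 2, 3
HAVING count(*) > 1;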

Storage table structure

CREATE TABLE public.data_from_kafka( customer_id int8, expenses decimal(9,2),
           tax_due decimal(7,2) );

If the table does not exist, the following error is reported:

20210816:14:34:16 gpsscli:gpadmin:szc49030:105471-[ERROR]:-start job failed, err: rpc error: code = Unknown desc = initJob failed: InitJob: pq: relation "public.data_from_kafka" does not exist

Job management

Send data to Kafka

cat > sample_data.csv <<EOL
"1313131","12","1313.13"
"3535353","11","761.35"
"7979797","10","4489.00"
"7979797","11","18.72"
"3535353","10","6001.94"
"7979797","12","173.18"
"1313131","10","492.83"
"3535353","12","81.12"
"1313131","11","368.27"
EOL
docker exec -it kafka_kafka_1 bash

# create the topic
# kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1  --topic topic_for_gpkafka
kafka-topics.sh --create --topic topic_for_gpkafka --partitions 1 --zookeeper kafka_zookeeper_1:2181 --replication-factor 1 
    
# publish the data
kafka-console-producer.sh \
    --broker-list 10.172.41.206:9092 \
    --topic topic_for_gpkafka < sample_data.csv

# verify the data
kafka-console-consumer.sh \
    --bootstrap-server 10.172.41.206:9092 --topic topic_for_gpkafka \
    --from-beginning

Control the job with gpsscli

gpsscli submit --name topic_for_gpkafka --gpss-port 50007 --gpss-host localhost ./firstload_cfg.yaml
# start the job
gpsscli start topic_for_gpkafka --gpss-host localhost --gpss-port 50007

# list jobs
gpsscli list --all --gpss-port 50007 --gpss-host localhost

# stop the job
gpsscli stop topic_for_gpkafka --gpss-host localhost --gpss-port 50007

# remove the job
gpsscli  remove topic_for_gpkafka --gpss-port 50007  

A normal job start prints roughly the following:

20210816:14:36:27 gpsscli:gpadmin:szc49030:105944-[INFO]:-Job topic_for_gpkafka is started

Checking: the data also flows into Greenplum normally.

gpkafka

gpkafka load  firstload_cfg.yaml

This does not seem to conflict with the gpss/gpsscli approach above and behaves much the same, except that it blocks in the foreground (the --quit-at-eof flag used further below makes it exit once the data is exhausted).

avro json

https://gpdb.docs.pivotal.io/streaming-server/1-3-3/kafka/load-key-value-example.html

Configure the YAML

avrokvload_cfg.yaml

DATABASE: gpdb
USER: gpadmin
HOST: localhost
PORT: 5432
VERSION: 2
KAFKA:
   INPUT:
     SOURCE:
        BROKERS: 10.172.41.206:9092
        TOPIC: topic_avrokv
     VALUE:
        COLUMNS:
          - NAME: c1
            TYPE: json
        FORMAT: avro
        #AVRO_OPTION:
          #SCHEMA_REGISTRY_ADDR: http://localhost:8081
     KEY:
        COLUMNS:
          - NAME: id
            TYPE: json
        FORMAT: avro
        #AVRO_OPTION:
          #SCHEMA_REGISTRY_ADDR: http://localhost:8081
     ERROR_LIMIT: 0
   OUTPUT:
     TABLE: avrokv_from_kafka
     MAPPING:
        - NAME: id
          EXPRESSION: id
        - NAME: customer_id
          EXPRESSION: (c1->>'cust_id')::int
        - NAME: year
          EXPRESSION: (c1->>'year')::int
        - NAME: expenses
          EXPRESSION: array(select json_array_elements(c1->'expenses')::text::float)
   COMMIT:
     MINIMAL_INTERVAL: 2000

Storage table structure

CREATE TABLE avrokv_from_kafka( id json, customer_id int, year int, expenses decimal(9,2)[] );
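A sketch of how the loaded rows could be inspected, expanding the expenses array produced by the MAPPING above back into one row per expense:

-- Expand the expenses array into individual rows (assumes data has already been loaded).
SELECT id, customer_id, year, unnest(expenses) AS expense
FROM   avrokv_from_kafka;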

Job management

Send data to Kafka

Note the data format here: there must be a TAB between the key and the value; the TAB below stands for a literal tab character.

1 TAB {"cust_id":1313131, "year":2012, "expenses":[1313.13, 2424.24]}

Note: the lines below contain actual tabs.

1	{"cust_id":1313131, "year":2012, "expenses":[1313.13, 2424.24]}
2	{"cust_id":3535353, "year":2011, "expenses":[761.35, 92.18, 14.41]}
3	{"cust_id":7979797, "year":2011, "expenses":[4489.00]}

It turns out this image has no kafka-avro-console-producer family of commands at all (they ship with Confluent Platform / Schema Registry rather than with plain Apache Kafka).

# create the topic
kafka-topics.sh --create --topic topic_avrokv --partitions 1 --zookeeper kafka_zookeeper_1:2181 --replication-factor 1 

# produce Avro records with key/value schemas for this topic (requires Confluent's kafka-avro-console-producer)
kafka-avro-console-producer \
    --broker-list localhost:9092 \
    --topic topic_avrokv \
    --property parse.key=true --property key.schema='{"type" : "int", "name" : "id"}' \
    --property value.schema='{ "type" : "record", "name" : "example_schema", "namespace" : "com.example", "fields" : [ { "name" : "cust_id", "type" : "int", "doc" : "Id of the customer account" }, { "name" : "year", "type" : "int", "doc" : "year of expense" }, { "name" : "expenses", "type" : {"type": "array", "items": "float"}, "doc" : "Expenses for the year" } ], "doc:" : "A basic schema for storing messages" }'
    
    
# verify the data
kafka-avro-console-consumer \
    --bootstrap-server localhost:9092 --topic topic_avrokv \
    --from-beginning --property print.key=true

Control the job with gpsscli

gpkafka

gpkafka load --quit-at-eof ./avrokvload_cfg.yaml

Personal test

It seems this kind of payload cannot be stored directly; do I really have to write a stored procedure? (Most likely the problem is that each message is a JSON array rather than an object, so jdata->>'shipId' in the MAPPING below yields NULL; see the SQL sketch after the sample message.)

The simple approach: first store the data into the database as-is, then write a script that periodically does SELECT ... INTO the target table. Deferred for now.

[{"shipId":"5300010236284","type":"0201"}]

marked_ship_kafka.yaml

DATABASE: gpdb
USER: gpadmin
HOST: localhost
PORT: 5432
VERSION: 2
KAFKA:
   INPUT:
      SOURCE:
         BROKERS: 10.172.32.30:8090,10.172.32.15:8090,10.172.32.14:8090
         TOPIC: KAFKA-INTERNAL-MARKING-TOPIC
      VALUE:
        COLUMNS:
           - NAME: jdata
             TYPE: json
        FORMAT: json
      ERROR_LIMIT: 25
   OUTPUT:
      SCHEMA: yunchou
      TABLE: marked_ship_kafka
      MAPPING:
        - NAME: ship_id
          EXPRESSION: (jdata->>'shipId')::varchar
        - NAME: ship_type
          EXPRESSION: (jdata->>'type')::varchar
        - NAME: insert_time
          EXPRESSION: now()
   METADATA:
      SCHEMA: yunchou
   COMMIT:
      MINIMAL_INTERVAL: 2000
   POLL:
      BATCHSIZE: 100
      TIMEOUT: 3000

-- waybill marking, Kafka source
CREATE TABLE yunchou.marked_ship_kafka (
    ship_id varchar(60),
    ship_type varchar(60),
    insert_time timestamp without time zone
)
with (appendonly=true, compresslevel=5, orientation=column, compresstype=zlib)
Distributed by (ship_id)
partition by range (insert_time) 
(
    PARTITION pn START ('2021-07-01 00:00:00'::timestamp without time zone) END ('2023-01-01 00:00:00'::timestamp without time zone) EVERY ('1 day'::interval), 
    DEFAULT PARTITION pdefault
);
COMMENT ON COLUMN yunchou.marked_ship_kafka.ship_id IS 'waybill number';
COMMENT ON COLUMN yunchou.marked_ship_kafka.ship_type IS 'waybill mark';
COMMENT ON COLUMN yunchou.marked_ship_kafka.insert_time IS 'insert time';

gpsscli submit --name marked_ship_kafka --gpss-port 50007 --gpss-host localhost ./marked_ship_kafka.yaml
gpsscli start marked_ship_kafka --gpss-port 50007 --gpss-host localhost
gpsscli list --all  --gpss-port 50007 --gpss-host localhost