gpkafka
Resources
- Loading data into Greenplum by consuming from Kafka with gpss
- Official Greenplum example of using gpss
- Same as above
- GPkafka: importing Kafka data into Greenplum
- InfoQ: GPkafka, importing Kafka data into Greenplum in practice
Overview
gpss
This appears to be the server side; it runs the ETL service. It does not have to be on the same machine as Greenplum, nor share a machine with the Kafka brokers, so it is deployed separately. The imagined setup: deploy a service on a fresh machine, run the ETL service there on its own, and load data into Greenplum from it.
gpsscli
This is the client, used to submit new jobs to the gpss service.
gpkafka
This one seems even more capable: it is gpss and gpsscli rolled into one, so a single invocation submits the job. It is a bit like how gpload wraps gpfdist: fewer steps, more functionality.
Video screenshots and transcript
1. What is gpkafka?
gpkafka is a component of the Greenplum database that supports exactly-once processing of data coming from Kafka. Building on Greenplum's mature processing capabilities, it can support computation over fixed (tumbling), sliding, and session windows (key concepts in real-time data processing). It is suited to real-time data warehousing and near-real-time data processing, and is simple to use and easy to pick up.
https://www.jianshu.com/p/82a2bfccc796 (contains the original text and a Chinese translation of Google's stream-processing papers).
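As a rough illustration of the fixed-window idea on the Greenplum side, here is a minimal SQL sketch; the orders table and its columns (order_time, amount) are assumptions for illustration, not part of the demo:
-- Tumbling 1-minute windows over a hypothetical orders table
SELECT date_trunc('minute', order_time) AS window_start,
       count(*)                         AS order_cnt,
       sum(amount)                      AS total_amount
FROM orders
WHERE order_time >= now() - interval '1 hour'
GROUP BY 1
ORDER BY 1;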
2. Typical application scenarios for real-time data processing:
- E-commerce and marketing: data reports, ad placement, real-time dashboards, process monitoring, real-time recommendations, real-time data warehousing
- IoT: real-time sensor data collection and display, real-time alerting, etc.
- Finance: real-time monitoring, real-time valuation, real-time risk control, real-time reports, real-time dashboards, real-time data warehousing, etc.
4. gpkafka hands-on
Environment:
1) This run uses Greenplum 6.5, gpss 1.3.5, kafka_2.12-2.4.1.tgz, and ZooKeeper 3.4.10 (VMs on a laptop)
2) Create the test Kafka topic
kafka-topics.sh --create --zookeeper hadoop201:2181 --replication-factor 1 --partitions 3 --topic gpss_test
3) Start a producer client and produce the data (before producing, first verify that the topic and producer work)
The producer sends the data (order_info.csv is a fabricated customer-order table with 9.72 million rows in total)
kafka-console-producer.sh --broker-list hadoop201:9092 --topic gpss_test < order_info.csv
4) On the GPDB cluster, consume the data from Kafka
# start the gpss service
gpss gpsscfg_ex.json --log-dir ./gpsslogs &
# list current jobs
gpsscli list --all --gpss-host 10.10.10.103 --gpss-port 5019
# submit the jobs
gpsscli submit --name orders_merge --gpss-host 10.10.10.103 --gpss-port 5019 ./gpkafka_merge.yaml
gpsscli submit --name orders_insert --gpss-host 10.10.10.103 --gpss-port 5019 ./gpkafka_insert.yaml
gpsscli submit --name orders_update --gpss-host 10.10.10.103 --gpss-port 5019 ./gpkafka_update.yaml
# list the jobs again
gpsscli list --all --gpss-host 10.10.10.103 --gpss-port 5019
# start the jobs
gpsscli start orders_merge --gpss-host 10.10.10.103 --gpss-port 5019
gpsscli start orders_update --gpss-host 10.10.10.103 --gpss-port 5019
gpsscli start orders_insert --gpss-host 10.10.10.103 --gpss-port 5019
# stop the jobs
gpsscli stop orders_merge --gpss-host 10.10.10.103 --gpss-port 5019
gpsscli stop orders_update --gpss-host 10.10.10.103 --gpss-port 5019
gpsscli stop orders_insert --gpss-host 10.10.10.103 --gpss-port 5019
# check job status
gpsscli status orders_merge --gpss-host 10.10.10.103 --gpss-port 5019
5) Meanwhile, query the data in real time, e.g. count the customers or total the order amounts (a sketch follows below).
Production systems ==> Kafka ==> Greenplum ==> Kafka ==> business systems (closing the loop from data to decisions).
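A minimal sketch of such real-time queries; the table name order_info and its columns are assumptions, since the demo does not show the target table DDL:
-- Hypothetical Greenplum table holding the order_info.csv data
SELECT count(DISTINCT customer_id) AS customer_cnt,
       sum(order_amount)           AS total_order_amount
FROM order_info;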
Official documentation
gpsscli help
gpsscli provides subcommands to manage Greenplum Stream Server load jobs
and to view job status, progress, and history.
Usage:
gpsscli [command]
Available Commands:
convert converts V1/V2 style config yaml file to new style yaml file
help Help about any command
list List gpss jobs and status
load Load data from kafka into greenplum
progress progress loading data from kafka into greenplum
remove Remove job from stream server
shadow shadow your password using the shadow key in config or a default key
start Start loading data from kafka into greenplum
status Show GPSS job status
stop Stop a gpss job
submit Submit a job which loads data from kafka into greenplum
wait Wait a gpss job until stop
Flags:
--config string gpsscli JSON configuration file
--gpss-host string gpss host address (default "127.0.0.1")
--gpss-port string gpss host port (default "5000")
-h, --help help for gpsscli
-l, --log-dir string log directory, default is $HOME/gpAdminLogs
--no-check-ca allow connections to gpss server without certs
--verbose enable debug log
--version version for gpsscli
Use "gpsscli [command] --help" for more information about a command.
gpkafka-v2.yaml
https://gpdb.docs.pivotal.io/streaming-server/1-3-3/kafka/gpkafka-yaml-v2.html
The config file format is as follows:
DATABASE: db_name
USER: user_name
PASSWORD: password
HOST: host
PORT: greenplum_port
VERSION: 2
KAFKA:
INPUT:
SOURCE:
BROKERS: kafka_broker_host:broker_port [, ... ]
TOPIC: kafka_topic
[VALUE:
COLUMNS:
- NAME: { column_name | __IGNORED__ }
TYPE: column_data_type
[ ... ]
FORMAT: value_data_format
[[DELIMITED_OPTION:
DELIMITER: delimiter_string] |
[AVRO_OPTION:
SCHEMA_REGISTRY_ADDR: http://schemareg_host:schemareg_port [, ... ]]] |
[CUSTOM_OPTION:
NAME: udf_name
PARAMSTR: udf_parameter_string]]
[KEY:
COLUMNS:
- NAME: { column_name | __IGNORED__ }
TYPE: column_data_type
[ ... ]
FORMAT: key_data_format
[[DELIMITED_OPTION:
DELIMITER: delimiter_string] |
[AVRO_OPTION:
SCHEMA_REGISTRY_ADDR: http://schemareg_host:schemareg_port [, ... ]] |
[CUSTOM_OPTION:
NAME: udf_name
PARAMSTR: udf_parameter_string]]
[FILTER: filter_string]
ERROR_LIMIT: { num_errors | percentage_errors }
OUTPUT:
[SCHEMA: output_schema_name]
TABLE: table_name
[MODE: mode]
[MATCH_COLUMNS:
- match_column_name
[ ... ]]
[UPDATE_COLUMNS:
- update_column_name
[ ... ]]
[UPDATE_CONDITION: update_condition]
[MAPPING:
- NAME: target_column_name
EXPRESSION: { source_column_name | expression }
[ ... ]
|
target_column_name : { source_column_name | expression }
[ ... ] ]
[METADATA:
[SCHEMA: metadata_schema_name]]
COMMIT:
MAX_ROW: num_rows
MINIMAL_INTERVAL: wait_time
[POLL:
BATCHSIZE: num_records
TIMEOUT: poll_time]
[TASK:
POST_BATCH_SQL: udf_or_sql_to_run
BATCH_INTERVAL: num_batches]
[PROPERTIES:
kafka_property_name: kafka_property_value
[ ... ]]
Personal hands-on notes
Starting gpss
gpss4ic.json
{
"ListenAddress": {
"Host": "",
"Port": 50007
},
"Gpfdist": {
"Host": "",
"Port": 8319,
"ReuseTables": false
}
}
nohup gpss gpss4ic.json > gpss_server.log 2>&1 &
It started with no errors at all.
Setting up Kafka
Deploying the services
Kafka server
The 0.0.0.0:9092:9092 mapping is there because the machine defaults to tcp6; binding explicitly to 0.0.0.0 works around that.
version: '2'
services:
zookeeper:
image: 10.131.9.12:5000/wurstmeister/zookeeper
ports:
- "0.0.0.0:2181:2181"
kafka:
image: 10.131.9.12:5000/wurstmeister/kafka
depends_on: [ zookeeper ]
ports:
- "0.0.0.0:9092:9092"
environment:
KAFKA_ADVERTISED_HOST_NAME: 10.172.41.206
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
volumes:
- /var/run/docker.sock:/var/run/docker.sock
Note: there are a few pitfalls here. A volume is apparently configured, so even after every restart the old topics are still there.
# Wrong setting: it prevents gpsscli from starting jobs later on
KAFKA_ADVERTISED_HOST_NAME: 127.0.0.1
Startup
hostname -I
docker-compose up -d
docker exec -it kafka_kafka_1 bash
- The wrong way: a topic named gpss_test. It is better to separate the parts of a topic name with '.'.
kafka-topics.sh --create --topic gpss_test --partitions 4 --zookeeper kafka_zookeeper_1:2181 --replication-factor 1
Warning:
WARNING: Due to limitations in metric names, topics with a period ('.') or underscore ('_') could collide. To avoid issues it is best to use either, but not both.
- The correct way
kafka-topics.sh --create --topic gpsstest2 --partitions 4 --zookeeper kafka_zookeeper_1:2181 --replication-factor 1
No warning this time.
Delimited text load
YAML config
- Config file for loading '|'-delimited stream data: kafka_testdata_delimited.yaml
DATABASE: gpdb
USER: gpadmin
PASSWORD: changeme
HOST: localhost
PORT: 5432
VERSION: 2
KAFKA:
INPUT:
SOURCE:
BROKERS: 10.172.41.206:9092
TOPIC: gpsstest2
VALUE:
COLUMNS:
- NAME: tid
TYPE: integer
- NAME: tcode
TYPE: varchar
- NAME: tname
TYPE: varchar
FORMAT: delimited
DELIMITED_OPTION:
DELIMITER: "|"
ERROR_LIMIT: 25
OUTPUT:
SCHEMA: yunchou
TABLE: test_heap
METADATA:
SCHEMA: yunchou
COMMIT:
MINIMAL_INTERVAL: 2000
POLL:
BATCHSIZE: 100
TIMEOUT: 3000
Notes
# What METADATA means was not obvious; per the gpss docs, SCHEMA under METADATA is the schema where gpss creates its external and history tables. For now it is kept the same as the output schema above.
METADATA:
#SCHEMA: yunchou
SCHEMA: gpkafka_internal
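To see what gpss actually creates in the metadata schema (here the working schema yunchou), one can simply list it; a sketch using the standard catalog, which will also list your own tables in that schema:
-- List the tables present in the metadata schema
SELECT table_name
FROM information_schema.tables
WHERE table_schema = 'yunchou';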
Errors
Configuring a SCHEMA that does not exist in the Greenplum database leads to the error below.
Configuring an existing SCHEMA resolves the problem.
20210813:14:45:51 gpsscli:gpadmin:szc49030:113161-[ERROR]:-start job failed, err: rpc error: code = Unknown desc = initJob failed: InitJob: failed to create external table: "gpkafka_internal"."gpkafkaloadext_8f5ceb5aa8489a5a5e1bd22e1a932c2c", sql: CREATE EXTERNAL TABLE "gpkafka_internal"."gpkafkaloadext_8f5ceb5aa8489a5a5e1bd22e1a932c2c"(tid integer, tcode varchar, tname varchar)
LOCATION('gpfdist://szc49030:8319/gpkafkaload/%22gpkafka_internal%22.%22gpkafkaloadext_8f5ceb5aa8489a5a5e1bd22e1a932c2c%22')
FORMAT 'CUSTOM'(formatter='delimited_in', delimiter='|') LOG ERRORS SEGMENT REJECT LIMIT 25 ROWS: pq: schema "gpkafka_internal" does not exist
An incorrect Kafka configuration caused the broker address obtained from Kafka to always be 127.0.0.1:9092.
Redeploying the Kafka service fixed it, i.e. setting KAFKA_ADVERTISED_HOST_NAME: 10.172.41.206
20210813:14:00:33 gpsscli:gpadmin:szc49030:104644-[ERROR]:-start job failed, err: rpc error: code = Unknown desc = initJob failed: InitJob: getPartitionsMetadata: connecting [127.0.0.1:9092] failed: dial tcp 127.0.0.1:9092: connect: connection refused
A problem that remained unsolved:
20210813:16:03:17 gpsscli:gpadmin:szc49030:129862-[ERROR]:-start job failed, err: rpc error: code = Unknown desc = initJob failed: InitJob: getPartitionsMetadata: reader: kafkacsvdata fail to get kafka meta data: Local: Broker transport failure
Job management
# --gpss-host is the address the gpss service listens on; --gpss-port is port 50007. This shows gpss does not have to run on the same machine as Greenplum
# submit the job
gpsscli submit --name kafkacsvdata --gpss-port 50007 --gpss-host localhost ./kafka_testdata_delimited.yaml
# output:
# 20210813:13:39:13 gpsscli:gpadmin:szc49030:100581-[INFO]:-JobID: adc3e38aea0a7a710a0ab5321d3f759e,JobName: kafkacsvdata
# list jobs
# gpsscli list --all --gpss-port 50007 --gpss-host mdw
gpsscli list --all --gpss-port 50007 --gpss-host localhost
Sending data to Kafka
Create the file with the following contents:
cat > test_heap.csv <<EOL
1|aa|2002
1|bb|2003
1|cc|2004
1|dd|2005
EOL
# publish to 10.172.41.206:9092
#kafka-console-producer.sh --topic=gpsstest --broker-list kafka_kafka_1:9092 < test_heap.csv
kafka-console-producer.sh --topic=gpsstest2 --broker-list kafka_kafka_1:9092 < test_heap.csv
# after a successful publish, one > is printed per record
# check with a consumer
kafka-console-consumer.sh --bootstrap-server kafka_kafka_1:9092 --from-beginning --topic gpsstest2
# the input just produced can be seen
Create the table structure; without the table, gpss apparently has no way to receive the messages directly.
CREATE TABLE "yunchou"."test_heap" (
tid integer,
tcode varchar(255),
tname varchar(255)
) WITH (OIDS=FALSE);
Controlling the job with gpsscli
# start the job
gpsscli start kafkacsvdata --gpss-host localhost --gpss-port 50007
# stop the job
gpsscli stop kafkacsvdata --gpss-host localhost --gpss-port 50007
# remove the job
gpsscli remove kafkacsvdata --gpss-port 50007
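Once the job is started and the records above have been produced, a quick sanity check on the Greenplum side (a minimal sketch against the table created above):
-- Confirm the delimited records arrived
SELECT count(*) FROM yunchou.test_heap;
SELECT * FROM yunchou.test_heap ORDER BY tid LIMIT 10;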
JSON load
YAML config
Open question: how is the key loaded? The key also needs to be stored. Multiple BROKERS are separated by commas.
kafka_testdata_json.yaml
DATABASE: gpdb
USER: gpadmin
PASSWORD: changeme
HOST: localhost
PORT: 5432
VERSION: 2
KAFKA:
INPUT:
SOURCE:
BROKERS: 10.172.32.31:1012,10.172.32.32:1012,10.172.32.33:1012
TOPIC: ordercenter.reorganize.v2.order
VALUE:
COLUMNS:
- NAME: jdata
TYPE: json
FORMAT: json
ERROR_LIMIT: 25
OUTPUT:
SCHEMA: yunchou
TABLE: test_order
MAPPING:
- NAME: businessOrderId
EXPRESSION: (jdata->>'businessOrderId')::varchar
METADATA:
SCHEMA: yunchou
COMMIT:
MINIMAL_INTERVAL: 2000
POLL:
BATCHSIZE: 100
TIMEOUT: 3000
Create the target table
CREATE TABLE "yunchou"."test_order" (
businessOrderId varchar(255)
) WITH (OIDS=FALSE);
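The ->> expression used in MAPPING above can be tried on its own in psql; a minimal sketch (the JSON literal here is made up for illustration):
-- How the MAPPING expression extracts a field from the json value column
SELECT ('{"businessOrderId":"BO-20210813-001"}'::json ->> 'businessOrderId')::varchar AS businessOrderId;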
Job management
Since this consumes directly from someone else's existing Kafka, the producer step is omitted.
gpsscli submit --name kafkajsondata --gpss-port 50007 --gpss-host localhost ./kafka_testdata_json.yaml
# list jobs
# gpsscli list --all --gpss-port 50007 --gpss-host mdw
gpsscli list --all --gpss-port 50007 --gpss-host localhost
# start the job
gpsscli start kafkajsondata --gpss-host localhost --gpss-port 50007
gpsscli status kafkajsondata --gpss-host localhost --gpss-port 50007
Using a Kafka cluster someone else set up, it seemed to work right away; I am not sure why.
CSV load
YAML config
firstload_cfg.yaml
DATABASE: gpdb
USER: gpadmin
HOST: localhost
PORT: 5432
KAFKA:
INPUT:
SOURCE:
BROKERS: 10.172.41.206:9092
TOPIC: topic_for_gpkafka
COLUMNS:
- NAME: cust_id
TYPE: int
- NAME: __IGNORED__
TYPE: int
- NAME: expenses
TYPE: decimal(9,2)
FORMAT: csv
ERROR_LIMIT: 125
OUTPUT:
TABLE: data_from_kafka
MAPPING:
- NAME: customer_id
EXPRESSION: cust_id
- NAME: expenses
EXPRESSION: expenses
- NAME: tax_due
EXPRESSION: expenses * .0725
COMMIT:
MINIMAL_INTERVAL: 2000
A consumer group.id was added at the end (PROPERTIES below). The config is accepted, but it seems to have no effect: the broker side does not show that consumer group. Yet while gpss runs, the inserted data is never duplicated; does it maintain the load progress on the client side itself? (Per the gpss docs, gpss records Kafka offsets in its own history table, which would explain this.)
PROPERTIES:
group.id: yunchouhello
Target table structure
CREATE TABLE public.data_from_kafka( customer_id int8, expenses decimal(9,2),
tax_due decimal(7,2) );
If the table does not exist, the following error is raised:
20210816:14:34:16 gpsscli:gpadmin:szc49030:105471-[ERROR]:-start job failed, err: rpc error: code = Unknown desc = initJob failed: InitJob: pq: relation "public.data_from_kafka" does not exist
Job management
Sending data to Kafka
cat > sample_data.csv <<EOL
"1313131","12","1313.13"
"3535353","11","761.35"
"7979797","10","4489.00"
"7979797","11","18.72"
"3535353","10","6001.94"
"7979797","12","173.18"
"1313131","10","492.83"
"3535353","12","81.12"
"1313131","11","368.27"
docker exec -it kafka_kafka_1 bash
# create the topic
# kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic topic_for_gpkafka
kafka-topics.sh --create --topic topic_for_gpkafka --partitions 1 --zookeeper kafka_zookeeper_1:2181 --replication-factor 1
# publish the data
kafka-console-producer.sh \
--broker-list 10.172.41.206:9092 \
--topic topic_for_gpkafka < sample_data.csv
# verify the data
kafka-console-consumer.sh \
--bootstrap-server 10.172.41.206:9092 --topic topic_for_gpkafka \
--from-beginning
Controlling the job with gpsscli
gpsscli submit --name topic_for_gpkafka --gpss-port 50007 --gpss-host localhost ./firstload_cfg.yaml
# start the job
gpsscli start topic_for_gpkafka --gpss-host localhost --gpss-port 50007
# list jobs
gpsscli list --all --gpss-port 50007 --gpss-host localhost
# stop the job
gpsscli stop topic_for_gpkafka --gpss-host localhost --gpss-port 50007
# remove the job
gpsscli remove topic_for_gpkafka --gpss-port 50007
When the job starts normally, output roughly like this appears:
20210816:14:36:27 gpsscli:gpadmin:szc49030:105944-[INFO]:-Job topic_for_gpkafka is started
Checking, the data does land in Greenplum normally.
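A minimal sketch of that check, including the derived tax_due column (per the MAPPING above, tax_due should equal expenses * .0725):
-- Aggregate the loaded rows and the computed tax_due column
SELECT customer_id,
       sum(expenses) AS total_expenses,
       sum(tax_due)  AS total_tax_due
FROM public.data_from_kafka
GROUP BY customer_id
ORDER BY customer_id;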
gpkafka
gpkafka load firstload_cfg.yaml
It does not seem to conflict with the approach above and behaves much the same, except that it stays attached to the terminal.
avro json
https://gpdb.docs.pivotal.io/streaming-server/1-3-3/kafka/load-key-value-example.html
YAML config
avrokvload_cfg.yaml
DATABASE: gpdb
USER: gpadmin
HOST: localhost
PORT: 5432
VERSION: 2
KAFKA:
INPUT:
SOURCE:
BROKERS: 10.172.41.206:9092
TOPIC: topic_avrokv
VALUE:
COLUMNS:
- NAME: c1
TYPE: json
FORMAT: avro
#AVRO_OPTION:
#SCHEMA_REGISTRY_ADDR: http://localhost:8081
KEY:
COLUMNS:
- NAME: id
TYPE: json
FORMAT: avro
#AVRO_OPTION:
#SCHEMA_REGISTRY_ADDR: http://localhost:8081
ERROR_LIMIT: 0
OUTPUT:
TABLE: avrokv_from_kafka
MAPPING:
- NAME: id
EXPRESSION: id
- NAME: customer_id
EXPRESSION: (c1->>'cust_id')::int
- NAME: year
EXPRESSION: (c1->>'year')::int
- NAME: expenses
EXPRESSION: array(select json_array_elements(c1->'expenses')::text::float)
COMMIT:
MINIMAL_INTERVAL: 2000
Target table structure
CREATE TABLE avrokv_from_kafka( id json, customer_id int, year int, expenses decimal(9,2)[] );
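The expenses MAPPING expression above can also be tested standalone; a minimal sketch of how json_array_elements expands the JSON array into a float array (the literal is made up):
-- Expand a JSON array of numbers, as the MAPPING expression does
SELECT array(
         SELECT json_array_elements('{"expenses":[1313.13, 2424.24]}'::json -> 'expenses')::text::float
       ) AS expenses;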
Job management
Sending data to Kafka
Note: with this data format, the key and the value must be separated by a tab character; the TAB below has to be replaced by a literal tab.
1 TAB {"cust_id":1313131, "year":2012, "expenses":[1313.13, 2424.24]}
Note: the lines below contain a tab.
1 {"cust_id":1313131, "year":2012, "expenses":[1313.13, 2424.24]}
2 {"cust_id":3535353, "year":2011, "expenses":[761.35, 92.18, 14.41]}
3 {"cust_id":7979797, "year":2011, "expenses":[4489.00]}
It turns out the kafka-avro-console-producer family of commands is not present at all (they ship with Confluent's Schema Registry tooling, not with plain Apache Kafka).
# create the topic
kafka-topics.sh --create --topic topic_avrokv --partitions 1 --zookeeper kafka_zookeeper_1:2181 --replication-factor 1
# produce Avro records for this topic, supplying key and value schemas
kafka-avro-console-producer \
--broker-list localhost:9092 \
--topic topic_avrokv \
--property parse.key=true --property key.schema='{"type" : "int", "name" : "id"}' \
--property value.schema='{ "type" : "record", "name" : "example_schema", "namespace" : "com.example", "fields" : [ { "name" : "cust_id", "type" : "int", "doc" : "Id of the customer account" }, { "name" : "year", "type" : "int", "doc" : "year of expense" }, { "name" : "expenses", "type" : {"type": "array", "items": "float"}, "doc" : "Expenses for the year" } ], "doc:" : "A basic schema for storing messages" }'
# verify the data
kafka-avro-console-consumer \
--bootstrap-server localhost:9092 --topic topic_avrokv \
--from-beginning --property print.key=true
Controlling the job with gpsscli
gpkafka
gpkafka load --quit-at-eof ./avrokvload_cfg.yaml
Personal test
It seems this format cannot be stored directly; do I really have to write a stored procedure?
The simple approach: land the data in the database as-is, then have a script periodically SELECT INTO it into the target table (a sketch follows the sample payload below). Postponed for now.
[{"shipId":"5300010236284","type":"0201"}]
marked_ship_kafka.yaml
DATABASE: gpdb
USER: gpadmin
HOST: localhost
PORT: 5432
VERSION: 2
KAFKA:
INPUT:
SOURCE:
BROKERS: 10.172.32.30:8090,10.172.32.15:8090,10.172.32.14:8090
TOPIC: KAFKA-INTERNAL-MARKING-TOPIC
VALUE:
COLUMNS:
- NAME: jdata
TYPE: json
FORMAT: json
ERROR_LIMIT: 25
OUTPUT:
SCHEMA: yunchou
TABLE: marked_ship_kafka
MAPPING:
- NAME: ship_id
EXPRESSION: (jdata->>'shipId')::varchar
- NAME: ship_type
EXPRESSION: (jdata->>'type')::varchar
- NAME: insert_time
EXPRESSION: now()
METADATA:
SCHEMA: yunchou
COMMIT:
MINIMAL_INTERVAL: 2000
POLL:
BATCHSIZE: 100
TIMEOUT: 3000
-- waybill marking, Kafka source
CREATE TABLE yunchou.marked_ship_kafka (
ship_id varchar(60),
ship_type varchar(60),
insert_time timestamp without time zone
)
with (appendonly=true, compresslevel=5, orientation=column, compresstype=zlib)
Distributed by (ship_id)
partition by range (insert_time)
(
PARTITION pn START ('2021-07-01 00:00:00'::timestamp without time zone) END ('2023-01-01 00:00:00'::timestamp without time zone) EVERY ('1 day'::interval),
DEFAULT PARTITION pdefault
);
COMMENT ON COLUMN yunchou.marked_ship_kafka.ship_id IS 'waybill number';
COMMENT ON COLUMN yunchou.marked_ship_kafka.ship_type IS 'waybill mark type';
COMMENT ON COLUMN yunchou.marked_ship_kafka.insert_time IS 'load time';
gpsscli submit --name marked_ship_kafka --gpss-port 50007 --gpss-host localhost ./marked_ship_kafka.yaml
gpsscli start marked_ship_kafka --gpss-port 50007 --gpss-host localhost
gpsscli list --all --gpss-port 50007 --gpss-host localhost
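Once the job is running, a quick check of what has landed (a sketch):
-- Count the loaded marks per type and show the latest load time
SELECT ship_type, count(*) AS cnt, max(insert_time) AS latest_insert
FROM yunchou.marked_ship_kafka
GROUP BY ship_type;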