plpython

postgresql中,自定义函数(udf),利用python的工具库来计算。

资源

  • postgresql的官方手册。[Chapter 46. PL/Python - Python 过程 语言 ]

  • Greenplumn从大数据战略到实现

说明

PL/Python使用的Python解释器是Greenplum自带的,位于目录$GPHOME/ext/python下面

安装python包

gpscp、gpssh两个工具辅助安装python工具包。

各节点安装pip

$ cat all_hosts
sdw1
sdw2
sdw3
$gpssh -f all_hosts
=source /usr/local/greenplum-db/greenplum_path.sh
=>wget --no-check-certificate https://bootstrap.pypa.io/get-pip.py
=>python get-pip.py
=>exit

用pip,安装python包

$ gpssh -f all_hosts
=> source /usr/local/greenplum-db/greenplum_path.sh
=> pip install numpy
=> exit

gp数据计算的Python包集合

相当于一次安装一批的软件集合。会在各个节点上安装好。

source /usr/local/greenplum-db/greenplum_path.sh
gppkg -i DataSciencePython-<version>-rhel<N>-x86_64.gppkg
source /usr/local/greenplum-db/greenplum_path.sh
gpstop -r 

gppkg将会在所有节点上安装这些包。注意,在安装这些包后需要用source命令来重新装载环境变量并重启Greenplum集群,这是因为这些Python包放在一个$GPHOME/ext/DataSciencePython/目录下面,所以必须修改PYTHONPATH、LD_LIBRARY_PATH环境变量,以便PL/Python能调用到这些包,而这些环境变量的变动保存在greenplum_path.sh文件中。

安装

语言安装

在shell环境下执行:

createlang plpythonu -d develop

-d 参数,指定数据库

删除语言:

droplang plpythonu -d postgres

出于安全考虑,Greenplum只允许数据库的超级用户启用PL/Python并编写PL/Python函数,普通用户则只能运行PL/Python函数。PL/Python使用的Python解释器是Greenplum自带的,位于目录$GPHOME/ext/python下面,版本为2.7.12

psql

create or replace function pyinc(a integer) returns int as $$
return a+1
$$ language plpythonu;

调用测试:

select pyinc(10);

删除函数

DROP FUNCTION pyinc(integer)

-- create or replace function 更改了返回类型,则需要先删除函数。

数据共享

PL/Python支持函数级别和会话级别的数据共享,即允许在函数调用时候把一些数据保存在内存,供这个函数调用或者这个会话中的其他函数使用。函数级别的数据共享是通过SD词典实现的,而会话级别的数据共享是通过GD词典实现的。使用这两个词典有时候会大幅提升性能。我们以下面的函数为例解释SD和GD的使用。

CREATE OR REPLACE FUNCTION pylog(a integer, b integer)RETURNS
double precision AS $$
import math
return math.log(a,b)
$$ LANGUAGE plpythonu;

select pylog(a, b) from tbl;

这个函数的功能是对于tbl表中的每一行计算一个log函数,其中用到了math这个Python包。Greenplum执行器在处理tbl表的每一行时会运行这个函数,而每次运行都要导入math这个Python包,这样会影响性能。但是如果把函数修改为下面这样:

CREATE OR REPLACE FUNCTION pylog(a integer,b integer) RETURNS double precision AS $$
If 'math' not in GD:
    import math
	GD['math'] = math
return GD['math'].log(a, b)
$$LANGUAGE plpythonu;

由于有了全局词典GD, math包在这个会话中只需一次import操作,后续其他函数也可以继续使用这个词典项。当然,如果不考虑其他函数的使用,这里也可以使用SD词典。SD词典是函数级别的,只能在这个函数的多次调用中被共享使用。下面是一个常用的SD例子(注意,GD是会话级别的,会话中别的函数也可以使用,因此使用的时候要注意安全性)。

CREATE FUNCTION valid_type(a text) RETURNS text AS $$
if not SD.has_key("plan"):
    SD["plan"] = plpy.prepare("SELECT fname FROM users WHERE lname = $1"[ "text"])
    rv = plpy.execute(SD["plan"],[ a ])
    if len(rv):
        return rv[0]["fname"]
    return None
$$ LANGUAGE plpythonu;

类型

参数、及返回的类型,来源于postgre本身的类型。如:

  • integer
  • text

写了一个函数,用来处理字符串的

结果呢,对于中文,总是出错。

create or replace function mysplitword(str text) returns text[] as $$
n = len(str)
res = []
for i in range(0,n-1):
    res.append(str[i:i+2])
return res
$$ language plpythonu;

教程

安装并访问redis

安装过程如下:

# 由于使用su - gpadmin 已执行了环境变量,故直接安装
python get-pip.py --trusted-host repo.xxx.com  -i http://repo.xxx.com/pypi/simple/
pip install redis --trusted-host repo.xxx.com  -i http://repo.xxx.com/pypi/simple/

简单使用示例如下:

CREATE OR REPLACE FUNCTION redis_test(mykey text) RETURNS text AS $$
if 'redis_client' not in GD:
    import redis
    client = redis.Redis(host='10.172.1.1', port=1016,password='', decode_responses=True)
    GD['redis_client'] = client
rds = GD['redis_client']
return rds.get('sms:ph2nu:%s'%mykey)
$$LANGUAGE plpythonu;

函数使用

-- 使用
select redis_test('132434');
-- 压测
select redis_test('15056967586') from generate_series(1,10000000);

性能方便,100万次请求,时间200秒,跟直接写python的脚本调用性能差不多。性能方面并不是很好。

中文正则分词测试

备注:如果函数的类型发生变化,需要先删除。

CREATE OR REPLACE FUNCTION parse_addr(addr text) RETURNS bool AS $$
#coding=utf-8
if 'pattn' not in GD:
    import re
    pattn =[]
    pattn.append( re.compile('(.{1,5}省)(.{1,5}市)(.{1,5}区)'))
    pattn.append( re.compile('(.{1,5}省)(.{1,5}市)(.{1,5}县)'))
    pattn.append( re.compile('(.{1,5}省)(.{1,5}市)(.{1,5}市)'))
    GD['pattn'] = pattn
pat = GD['pattn']
sch = pat[0].search(addr)  # search
if sch and sch.group(0):
    return True
sch = pat[1].search(addr)  # search
if sch and sch.group(0):
    return True
sch = pat[2].search(addr)  # search
if sch and sch.group(0):
    return True
print(addr)
return False
$$LANGUAGE plpythonu;

调用sql

select recv_addr, parse_addr(recv_addr) from tu_doc_info limit 10;

问题:

结果是正则表达式的原因。最开始以为是中文编码的原因。故如下排查

  • 中文测试
CREATE OR REPLACE FUNCTION test_ch(addr text) RETURNS text AS $$
return '河南'
$$LANGUAGE plpythonu;
select test_ch('1') ;
  • 中文正则匹配也能通过
CREATE OR REPLACE FUNCTION test_ch(addr text) RETURNS text AS $$
import re
pat = re.compile('.*南')
sch = pat.search(addr)  # search
if sch and sch.group(0):
    return sch.group(0)
return '没有找到'
$$LANGUAGE plpythonu;
select test_ch('河南') ;
select test_ch('湖南省') ;

而错误的提示如下:

'.{1,5}省'
# 错误 

ERROR:  invalid byte sequence for encoding "UTF8": 0xb9
CONTEXT:  while creating return value
  • 最终的示例
CREATE OR REPLACE FUNCTION parse_addr(addr text) RETURNS text AS $$
#coding=utf-8
if 'pattn' not in GD:
    import re
    pattn =[]
    pattn.append( re.compile('(.*?省)(.*?市)(.*?区)'))
    pattn.append( re.compile('(.*?省)(.*?市)(.*?县)'))
    pattn.append( re.compile('(.*?省)(.*?市)(.*?市)'))
    GD['pattn'] = pattn
pat = GD['pattn']
sch = pat[0].search(addr)  # search
if sch and sch.group(0):
    return sch.group(0)
sch = pat[1].search(addr)  # search
if sch and sch.group(0):
    return sch.group(0)
sch = pat[2].search(addr)  # search
if sch and sch.group(0):
    return sch.group(0)
return ''
$$LANGUAGE plpythonu;

sql中的以下,经测试,有、没有的情况下,执行结果都是一样。

#coding=utf-8