PHP批量插入

PHP批量插入

关于数据批量插入,实际上是一个常见的需求。有场景:从某处同步过来一批数据,最简单的方式,收一条,写一条。如果丢失数据,问题不大,就丢一条,但是,这样的效率太低了。所以,最好能先缓存一批数据,然后批量插入。(暂不考虑,更新数据这个需求。)

这便是今天想要探讨的问题。如何让插入数据更快。(面试也经常问)。有了一批数据,1、每条记录插入一次,但是总的,用一个事务控制。速度有提升。2、将500条记录,拼接成一条sql,然后一次插入。效率提供比较明显。3、将一大批数据,直接写入到csv文件中,然后利用数据库工具导入到数据库中。(但是这个需要更大的权限)

关于多条记录拼接成1条sql,其实吧,也是有坑的,对与php这样的动态语言。我曾经踩过坑。对接的是java从mysql抓取到记录,然后用fastjson转换成kafka消息。坑在于,1、null的字段,它直接过滤掉了,导致我收到的字段有缺少。2、另外,还有一个问题,字段的顺序问题,比如插入的顺序name、age,你按age、name那么来,顺序反了肯定也不行。

所以,别想太复杂,直接循环$rows,然后对每一条row,再循环fields,然后将没有的字段赋值为空。这是最简单的方式、也是最简单的方式。

这样,对批量过来的数据,我们已经经过一次了标准化,入库非常简单了。

批量插入

拼接sql

多条记录传过来,注意,传入过来的字段,1、可能缺少字段。2、传入过来的字段顺序,可能跟自己插入的顺序不一致,导致错误。

csv

csv文件,可能因为字段乱码,导致插入失败。

更新or插入

还有另外一种场景,同一条记录,可能会传过来多次,要求后来记录更新前面的记录。那么,处理方式也有好几种。

先查存在

对于传过来的数据,缓存到一批数据后,然后通过sql查过来 select id in (1,2,3),是否重复。对于已经在表内存在的数据,我们应该逐条Update进去,对于剩下来的数据,然后拼接成一条sql,插入进去。

思索:

在并发的情况下,有多个进程,每个进程都缓存一批数据,在查存在的时候,只是查表,并未查到进程中缓存的数据,结果导致插入的时候,还是冲突报错。

那么,我们是要进行负载均衡算法吗?hash一致性,来选择数据落到哪个进程来处理,这样,来避免重复吗?

先入库,再整理

先将一批数据入库,然后如库后,再整理,删重复。

先入临时库

先将数据入临时库,然后再整理?

问题

问题代码

下面的代码,实际上只注意到缺字段,但是忘记字段还有顺序问题。

/**
 * 批量插入函数。注意,要求插入的每条数据,都具有相同的形式。
 * 2021-01-04 10:48
 */
function batchInsert($table, $rows){
    $cacheKey = $table . '_c_' .count($rows);
    $header = $rows[0];
    $params = [];
    $items = [];
    foreach ($rows as $index => $row) {
        $values = '(';
        foreach($row as $k=>$v){
            $field = ':'.$k.'_'.$index;
            $params[$field] = $v;
            $values .= ( $field . ',');
        }
        $values = substr($values,0,-1) . ')';
        $items[] = $values;
    }
    //走缓存
    if(!empty($this->batchCache[$cacheKey])){
        return $this->batchCache[$cacheKey]->execute($params);
    }
    $sql = 'INSERT INTO ' . $table . ' ('.implode(',', array_keys($header)).') VALUES '.implode(',',$items).'';
    $stat = $this->pdo->prepare($sql);
    $this->batchCache[$cacheKey] = $stat; 
    return $stat->execute($params);
}

分析:

按上面的示例代码,要想成功的批量插入成功,对$rows的形式要求很严格。第一,要求,$rows的每个row都具有相同的字段,第二,要求,每个row的字段顺序也要相同。所以,上面的代码很low,导致自己频频出错。

具体问题思考

正式库 (包含自增id)

临时库 (包含自增id)

方案1:
重复数据先进临时库,然后,临时库筛选、删除,然后进正式库。

怎么将数据取出,插入新库?

增加临时表,首先,占硬盘资源。好处是:处理的时候,只处理临时表,不会导致数据丢失。

方案2:
数据直接入正式库。定时的删除重复的数据。可以通过指定主键来限定范围,比如一次处理一批。记录下,当前的处理进度。下次继续从该位置来处理。

怎么写sql??判断重复的,只保留1条数据。

select * from td_mail where id>id1 and id < id2 and ship_id

直接在原表上删除,有意外删错数据、导致数据丢失的风险。处理起来相对简单,省资源。

方案3:
入库的时候,php代码进行处理,收到的数据内部去重,然后查库,如果数据存在呢,则变为update、否则则直接插入。

缺点:进程中处理掉了重复问题,但是不同进程间,并不能相互去重,并发情况下,会导致,多个进程中由于持有相同的 unique key,而导致报错。现实中确实发生过。所以,不适合高并发。

方案1、2如果单纯的用sql来解决,sql写得非常复杂。如果结合着php来处理呢。

同名变量

由于自己的失误,误将标记1、标记2的顺序颠倒,导致$h的同名变量,被覆盖。结果入库的时候,总是入最后一个数据库。

虽然是个简单的错误,但是排查了好久。同名变量的锅。我还以为是之前前面的函数定义的引用变量的错。

$consumer->start(function($topic, $part, $message) {
    global $handler;
    static $count = 0;
    $h = $handler[$topic];
	//标记1
    try{
        $h->handle($message);
    }catch(\Exception $e){
        echo $e->getMessage();
        exit;
    }
	//标记2
    if( $count%1 === 0 ){
        $now = date('Y-m-d H:i:s');
        echo "$now $count: $topic\n";
        foreach($handler as $h){
            $h->batch();
        }
        //退出。
        isShutdown();
    }
    if($count > 100 ){
        exit;
    }
    $count++;
});

PDO字段大小写问题

pg数据库,对字段的大小写毕竟敏感。shipId,我无法用pdo的pgsql驱动插入进去,因为它自动转换成了小写了。

PDOException: SQLSTATE[42703]: Undefined column: 7 ERROR:  column "shipid" of relation "td_mail" does not exist
LINE 1: INSERT INTO td_mail (shipId,sendId,recvId,kafka_tim

postgresql

由于是以脚本cli形式运行,长期持有一个pdo对象,并用该对象进行操作,很长一段时间后,会报如下的错误。我猜想,这个可能是,postgresql的问题,它负责维护该连接的进程或线程,长期运行,可能发生了内存泄漏。所以,我每次持有pdo到10000次,便考虑释放掉该连接,让该连接重新生成。

2021-01-07 03:09:31 105597000: all_ships_phone_userId_topic
PHP Fatal error: Uncaught PDOException: SQLSTATE[XX000]: Internal error: 7 ERROR: Canceling query because of high VMEM usage. Used: 7327MB, available 819MB, red zone: 7372MB (runaway_cleaner.c:202) in /yd/td/app/util/Database.php:97

长期运行pdo脚本,可能数据低峰期,很长一段时间都没有数据进来,可能会导致连接失效,所以呢,要考虑到重新释放,并获取新的脚本。

/**
 * @return Database
 */
public static function getDb()
{
    static $accessTime = 0;
    $now = time();
    static::$count++ ; 
    //超过20秒没有访问,应该重新建一个实例,防止超时问题
    //使用1万次,释放改连接。重建一个。
    if($now - $accessTime > 30 || static::$count > 10000){
        static::$count = 0 ;
        //释放资源
        if( static::$link ){
            static::$link = null;
        }
        static::$link = new Database();   
    }
    $accessTime = $now ;

    return static::$link;
}

其他,参见删除重复数据。