pcntl

pcntl中有关进程相关的知识。1、进程复制。2、进程运行的用户组、用户。3、设置进程运行时的用户组、用户。4、等待进程结束pcntl_wait。5、信号注册与触发。6、信号处理pcntl_signal_dispatch。7、设置session id。使进程脱离终端。posix_setsid。8、守护进程,复制一次、设置会话组长、再复制一次。9、信号可以用,但是想做及时性的东西,好像信号,又做不到。

上面所有的,基本上都在posix跟pcntl两个扩展中。另外,重点讲一些与wokerman有关的东西。workerman的主要的功能:1、转化为守护进程。2、主进程对子进程的监控以及对信号的监听。3、关闭STDOUT等,改为写文件。4、子进程使用Libevent等,实现对socket的读写监听。5、同时,子进程也会监听主进程发号施令,并退出进程或重启进程。6、除此之外,posix、pcntl中一些常用的函数使用,设置进程title、设置运行的group、userid等等。7、统计功能,stauts跟connections,(主进程发信号,子进程将收集的信息写文件,主进程汇报),这一部分功能也很复杂。8、命令解析功能,对旧进程的控制,提供start、reload等命令(如nginx命令)。

demo1

<?php
$id = 'null';
for($i=1;$i<100;$i++){
    if($i==10){
        $id = pcntl_fork();
    }
    echo "$id: $i\n";
    sleep(1);
}

当$i=10的时候,开始进程复制,然后,此时开始分叉。这个时候,会同时输出$i两次。id = 0 为子进程,id>0,则为父进程。id=-1,则说明进程复制失败。我们可以简单理解,对于父进程,需要拿到子进程的id。

<?php
$ids = [];
$pid ='';
for($i=1;$i<10;$i++){
    if($i==5){
        $pid = pcntl_fork();
        //mt_srand();   复制后,防止产生一样的随机数
    }
    $ids[] = mt_rand(1,100);
    echo "$pid  :".join(",",$ids)."\n";
    sleep(1);
}

进程复制后,发现产生的随机数,居然是一模一样的?惊不惊喜?意不意外?添加mt_rand后发现,两个进程产生的序列号终于不一样了。(难过在wokerman中,要执行mt_randrand函数。)

上述例子说明两个问题:1、父子进程,在复制后,子进程的全局变量与父进程已经是分离的,相当于复制了一份。(引用资源的除外)2、进程复制后,最好要重新播种随机化种子,否则,会产生一样的随机数。

demo2

pcntl_signal_dispatch();
pcntl_signal_dispatch();
pcntl_signal_dispatch();

单独执行,好像并没有什么效果。程序也不会卡在调用处,即不会阻塞。其效果是,如果此时有信号,则转向处理信号。

demo3

<?php
//declare('ticks=1');  也会让堆积的信号,执行。
function sig_handler($signo){
    switch ($signo) {
        case SIGUSR1: echo "SIGUSR1\n"; break;
        case SIGUSR2: echo "SIGUSR2\n"; break;
        default:      echo "unknow";    break;
    }
}
//安装信号触发器器
pcntl_signal(SIGUSR1, "sig_handler");
pcntl_signal(SIGUSR2, "sig_handler");
//向当前进程发送SIGUSR1信号
posix_kill(posix_getpid(), SIGUSR1);
echo "111\n";
posix_kill(posix_getpid(), SIGUSR2);
echo "222\n";
sleep(3);
echo "333\n";
pcntl_signal_dispatch();
pcntl_signal_dispatch();
pcntl_signal_dispatch();

pcntl_signal用来注册信号处理的回调。

posix_kill是用来发送信号。暂时这样假设,该函数只是投递消息到队列中,但是并不会直接执行该消息。而是等到进程在pcntl_signal_dispatch处,进程则开始处理堆积的信号,如果有,则处理,如果没有,则不会阻塞,执行到后面。

所以,可以观测上面代码的输出顺序。上面如果不执行pcntl_signal_dispatch();,则堆积的信号,进程退出也不会处理。

另外,在低版本的<php5.4中,在代码开头处,添加如下,堆积的信号也会执行。

declare('ticks=1');

demo4

<?php
function a(){
    sleep(10);
    echo "OK\n";
}
function b(){
    echo "Stop\n";
}
function c(){
    usleep(100000);
}
//信号处理代码
function sig(){
    throw new Exception;
}
try{
    pcntl_alarm(2); //设定超时后触发的信号
    pcntl_signal(SIGALRM, "sig");
    pcntl_signal_dispatch();
    a();
    pcntl_alarm(0);
}catch(Exception $e){
    echo "timeout\n";
}
b();
a(); //等待十秒后完成
b();

实在不太懂上面的代码的意义。

demo5

<?php
function sig(){
    echo "sig \n";
}
pcntl_alarm(2);
pcntl_signal(SIGALRM, "sig");
sleep(100);
pcntl_signal_dispatch();

sleep(100);上面的代码,被阻塞到sleep处,但是,当2秒后有pcntl_alarm信号,然后转向处理pcntl_signal_dispatch,整个代码实际上执行2秒左右,就退出了。

demo6

<?php
function sig(){
    echo "sig \n";
}
pcntl_alarm(2);
pcntl_signal(SIGALRM, "sig");
sleep(100);
echo "wo shi da ying xiong ";   //注意,后面的能输出。

作为对比,我们这次没有写pcntl_signal_dispatch,发现2秒种退出后,什么也不执行。即,有信号时,则直接打断了sleep代码。

但是最后一句echo 能正常输出。

demo7

<?php
pcntl_alarm(2);
sleep(100);
echo "wo shi da ying xiong ";

这次,我们直接不注册任何信号处理,我们发现,2秒种后,程序直接退出了,后面的echo输出语句也没有输出。屏幕只输出Alarm clock

对比发现,可能是因为,没有对信号进行处理,这是否就是类似于ctrl+c的退出信号一样?

demo8

我们发现sleeph后,进程被阻塞在sleep语句,当收到信号后,手续的代码,都会陆续的执行并触发。另外,我们发现,及时删除pcntl_signal_dispatch()语句,没有啥区别,只是对信号少响应。

<?php
$i=0;

$pid = posix_getpid();
echo $pid .PHP_EOL;

pcntl_signal(SIGUSR1, "sig_handler");

function sig_handler($signo){
    switch ($signo) {
        case SIGUSR1: echo "SIGUSR1\n"; break;
        default:      echo "unknow";    break;
    }
}

while(true){
    sleep(3600);
    echo ++$i.PHP_EOL;
    pcntl_signal_dispatch();
    echo 'after signal dispatch'.PHP_EOL;
}

脚本2

php 脚本2.php pid ,其中pid是脚本1的pid。

<?php
foreach(range(1,1000) as $v){
    posix_kill($argv[1],SIGUSR1);
}

我们发现,如果频繁的发送信号(未加sleep),脚本2,瞬间执行完成,而脚本1,可能只触发了一两次。也就是说没,并不是100%,都会执行的,可能多的信号,直接忽略了。

另外,如果脚本2发送SIGUSR2信号,由于未注册该信号,脚本1收到后,直接会退出进程。

demo总结

通过前面的demo例子,我们可以得到以下结论:进程可以注册要处理的信号,并选择在合适的时候,来处理堆积的信号,也可以选择不处理。收到未注册处理的信号,很可能会退出进程。收到信号,会打断阻塞的地方,执行后续的代码,如打断sleep、pcntl_wait($status, \WUNTRACED)等,使进程恢复执行。

但是,不能用信号来处理高频的事件,如1秒发送1000此信号,而进程可能就收到1、2次而已。

应用

获取进程状态

下面的方式,注册一个信号,并将程序的状态写到文件中。

<?php
// echo '\SIGINT'.SIGINT .PHP_EOL;

if( PHP_OS==='Linux'){
    function signal_status_data(){
        file_put_contents('tmp.txt','signal_status_data',\FILE_APPEND);
    }
    function signal_exit(){
        exit ;
    }
    // 停止
    \pcntl_signal(\SIGINT, 'signal_exit', false);  //2
    \pcntl_signal(\SIGTERM, 'signal_exit', false);  //15
    // 获取状态数据
    \pcntl_signal(\SIGUSR1, 'signal_status_data', false); //10  
}

while (true){
    sleep(1);
    pcntl_signal_dispatch();
}

workerman

为了简单起见,后面所有的分析都是基与Linux环境的,暂不考虑window系统。基于VERSION = ‘3.5.23’;。研究源码的顺序:

第1遍:先挑捡自己感兴趣的过程函数,重点分析。围绕特定的过程,比如:1、主进程是如何做进程监控的。2、信号问题。主、子进程如何安装信号。以及主进程是如何给子进程发信号。各自对信号的响应。3、全局状态维护,status是如何转变的。4、查找某个变量、函数的调用了。5、static变量、函数代表的是全局部分,属于框架部分,而非static修饰的属性、方法,则属于对象本身的,对象的任务,跟框架的任务显然不一样。

第2遍:按代码顺序,从上往下开始读。对于调用其他函数的代码,如果被调用的代码比较短或者调用几层,容易读,则全部读下去。看完后,接着从该位置,顺序往下读。

第3遍:运行代码,一边改,一边读???

第3遍:自己动手写,然后再回头看代码。

workerman进程监控

protected static function monitorWorkersForLinux()
{
    static::$_status = static::STATUS_RUNNING;
    while (1) {
        // Calls signal handlers for pending signals.
        \pcntl_signal_dispatch();
        // Suspends execution of the current process until a child has exited, or until a signal is delivered
        $status = 0;
        $pid    = \pcntl_wait($status, \WUNTRACED);
        // Calls signal handlers for pending signals again.
        \pcntl_signal_dispatch();
        // If a child has already exited.
        if ($pid > 0) {
            // Find out witch worker process exited.
            // Is still running state then fork a new worker process.
        }
        // If shutdown state and all child processes exited then master process exit.
        if (static::$_status === static::STATUS_SHUTDOWN && !static::getAllWorkerPids()) {
            static::exitAndClearAll();
        }
    }
}

上面是workerman中,进程监控的核心代码。master进程监控子进程的运行状态。进入无限循环,while(1),利用pcntl_wait阻塞主进程,只没有子进程退出时,该函数会处于挂起状态,或者,当有新的信号来临,会打破挂起状态,转向执行pcntl_signal_dispatch信号处理。两者都会打破挂起的状态。

函数pcntl_wait的参数 WUNTRACED,会阻塞,WNOHANG不会阻塞,会立即结束。

上面,既然要接收的信号,看下注册的信号都是哪些?

// 而installSignal只在runAll中调用,而且是在子进程复制之前,
// 也就是说,子进程复制之前,已经具备了响应下面的信号的基础。
protected static function installSignal()
    {
        if (static::$_OS !== \OS_TYPE_LINUX) {
            return;
        }
        $signalHandler = '\Workerman\Worker::signalHandler';
        // stop
        \pcntl_signal(\SIGINT, $signalHandler, false);
        // graceful stop
        \pcntl_signal(\SIGTERM, $signalHandler, false);
        // reload
        \pcntl_signal(\SIGUSR1, $signalHandler, false);
        // graceful reload
        \pcntl_signal(\SIGQUIT, $signalHandler, false);
        // status
        \pcntl_signal(\SIGUSR2, $signalHandler, false);
        // connection status
        \pcntl_signal(\SIGIO, $signalHandler, false);
        // ignore
        \pcntl_signal(\SIGPIPE, \SIG_IGN, false);
    }

很显然,上面是注册了一些,命令行可用的参数。然后,观察posix_kill的调用情况。

我们发现:1、在parseCommand函数中,都是向master进程发信号。这也很容易理解,进程运行过程中,只有解析用户命令阶段,才会发送信号,给已经存在的老的master进程。

2、对于子进程发信号,主要在这两个函数中:reload,对每个子进程发送的是SIGQUITSIGUSR1信号,即重启信号;stopAll,对每个子进程发送的是SIGTERMSIGINT信号,即停止信号。

当然,子进程注册响应信号的动作跟父进程不一样,reinstallSignal实际上,是为子进程设置信号的。而该函数是在run中调用,而run在forkWorkersForWindowsforkOneWorkerForLinux中调用。意图很明显,当复制一个子进程处理时,应该重新安装子进程的信号。

重点看子进程的任务:run

重新注册信号、添加onMessage、onWokerStart的处理。然后进入事件循环loop,这个会让所有的子进程卡到这一步,不会再往下执行。

public function run(){

    //省略一些其他的。。。。
    // Reinstall signal.
    static::reinstallSignal();
    // Init Timer.
    Timer::init(static::$globalEvent);

    // Set an empty onMessage callback.
    if (empty($this->onMessage)) {
        $this->onMessage = function () {};
    }
    \restore_error_handler();
    
    // Try to emit onWorkerStart callback.
    // Main loop.
    static::$globalEvent->loop();
}

那么,看一下调用处forkOneWorkerForLinux,也正好说明,子进程会一直处于run阶段,如果真的结束了,那么也会直接抛出异常并exit。

$worker->run();
$err = new Exception('event-loop exited');
static::log($err);
exit(250);

那么,installSignalreinstallSignal的顺序,有没有什么讲究呢?按道理讲,在复制子进程后,主进程再安装主进程的信号,而子进程安装子进程的信号,不是更好吗?这样,就不用再取消安装信号这个过程。但是,我们知道,随着程序运行过程,子进程的数量可能随时不足,需要主进程复制一个出来,那么,此时还是要取消安装主进程的信号,安装子进程的信号,所以,为了消除不一致的处理,等待主进程稳定后,再复制,这样保证,复制出来的子进程的一致处理。

可是,我们看到runAll中,还有resetStd,按刚刚的理论,为啥不等resetStd之后再forkWorkers,

static::forkWorkers();
static::resetStd();
static::monitorWorkers();

何况forkOneWorkerForLinux中,也确实需要resetStd。(如果挪了位置,那么连判断也不需要了,转换为一致处理了???我怎么感觉都是我想的是对的?)

elseif (0 === $pid) {
    \srand();
    \mt_srand();
    if ($worker->reusePort) {
        $worker->listen();
    }
    if (static::$_status === static::STATUS_STARTING) {
        static::resetStd();
    }
    //.......
}

状态流转:

程序一直处于STATUS_STARTING状态,只有在monitorWorkersForLinux函数中,再转换为STATUS_RUNNING状态。

那么回顾一下,reinstallSignal是如何定义的?

protected static function reinstallSignal()
{
    if (static::$_OS !== \OS_TYPE_LINUX) {
        return;
    }
    $signalHandler = '\Workerman\Worker::signalHandler';
    // uninstall stop signal handler
    \pcntl_signal(\SIGINT, \SIG_IGN, false);
    // uninstall graceful stop signal handler
    \pcntl_signal(\SIGTERM, \SIG_IGN, false);
    // uninstall reload signal handler
    \pcntl_signal(\SIGUSR1, \SIG_IGN, false);
    // uninstall graceful reload signal handler
    \pcntl_signal(\SIGQUIT, \SIG_IGN, false);
    // uninstall status signal handler
    \pcntl_signal(\SIGUSR2, \SIG_IGN, false);
    // uninstall connections status signal handler
    \pcntl_signal(\SIGIO, \SIG_IGN, false);
    // reinstall stop signal handler
    static::$globalEvent->add(\SIGINT, EventInterface::EV_SIGNAL, $signalHandler);
    // reinstall graceful stop signal handler
    static::$globalEvent->add(\SIGTERM, EventInterface::EV_SIGNAL, $signalHandler);
    // reinstall reload signal handler
    static::$globalEvent->add(\SIGUSR1, EventInterface::EV_SIGNAL, $signalHandler);
    // reinstall graceful reload signal handler
    static::$globalEvent->add(\SIGQUIT, EventInterface::EV_SIGNAL, $signalHandler);
    // reinstall status signal handler
    static::$globalEvent->add(\SIGUSR2, EventInterface::EV_SIGNAL, $signalHandler);
    // reinstall connection status signal handler
    static::$globalEvent->add(\SIGIO, EventInterface::EV_SIGNAL, $signalHandler);
}

取消父进程定义的信号,然后使用$globalEvent来处理信号,而该变量是在run中定义的,

// Create a global event loop.
if (!static::$globalEvent) {
    $event_loop_class = static::getEventLoopName();
    static::$globalEvent = new $event_loop_class;
    $this->resumeAccept();
}

getEventLoopName

<?php
protected static function getEventLoopName()
{
    if (static::$eventLoopClass) {
        return static::$eventLoopClass;
    }

    if (!\class_exists('\Swoole\Event', false)) {
        unset(static::$_availableEventLoops['swoole']);
    }
    
    $loop_name = '';
    foreach (static::$_availableEventLoops as $name=>$class) {
        if (\extension_loaded($name)) {
            $loop_name = $name;
            break;
        }
    }

    if ($loop_name) {
        if (\interface_exists('\React\EventLoop\LoopInterface')) {
            switch ($loop_name) {
                case 'libevent':
                    static::$eventLoopClass = '\Workerman\Events\React\ExtLibEventLoop';
                    break;
                case 'event':
                    static::$eventLoopClass = '\Workerman\Events\React\ExtEventLoop';
                    break;
                default :
                    static::$eventLoopClass = '\Workerman\Events\React\StreamSelectLoop';
                    break;
            }
        } else {
            static::$eventLoopClass = static::$_availableEventLoops[$loop_name];
        }
    } else {
        static::$eventLoopClass = \interface_exists('\React\EventLoop\LoopInterface') ? '\Workerman\Events\React\StreamSelectLoop' : '\Workerman\Events\Select';
    }
    return static::$eventLoopClass;
}

而事件循环:

protected static $_availableEventLoops = array(
    'libevent' => '\Workerman\Events\Libevent',
    'event'    => '\Workerman\Events\Event'
    // Temporarily removed swoole because it is not stable enough  
    //'swoole'   => '\Workerman\Events\Swoole'
);

大概就,上面两、三种。getEventLoopName会首先拦截,如果已经计算出来过$eventLoopClass值后,后续直接就返回了。$globalEvent大概是\Workerman\Events\Libevent\Workerman\Events\Event等种的一个对象吧。

但看,注册信号:

static::$globalEvent->add(\SIGINT, EventInterface::EV_SIGNAL, $signalHandler);

第1个参数,不是fd吗?SIGINT如何能作为fd来使用呢?另外,

除了响应系统的信号,那么socket也是$globalEvent的主要任务。

public function pauseAccept(){
    if (static::$globalEvent && false === $this->_pauseAccept && $this->_mainSocket) {
        static::$globalEvent->del($this->_mainSocket, EventInterface::EV_READ);
        $this->_pauseAccept = true;
    }
}

public function resumeAccept(){
    // Register a listener to be notified when server socket is ready to read.
    if (static::$globalEvent && true === $this->_pauseAccept && $this->_mainSocket) {
        if ($this->transport !== 'udp') {
            static::$globalEvent->add($this->_mainSocket, EventInterface::EV_READ, array($this, 'acceptConnection'));
        } else {
            static::$globalEvent->add($this->_mainSocket, EventInterface::EV_READ, array($this, 'acceptUdpConnection'));
        }
        $this->_pauseAccept = false;  
    }
}

$this->_pauseAccept简单理解是,防重入的锁。

那么,后面可能就需要对Libevent这个库有些了解了。resumeAccept中,调用add方法,我们简单的理解,添加对socket但监控。如果_mainSocket有连接进入,则会触发读。

_mainSocket(有3处赋值)定义、赋值如下:

protected $_mainSocket = null;
// listen 方法中赋值
$this->_mainSocket = \stream_socket_server($local_socket, $errno, $errmsg, $flags, $this->_context);

从定义上看,$_mainSocket应该是属于woker对象的属性,而不是static全局共享的。该对象,是服务端的socket。

看Listen方法,核心就是创建_mainSocket,然后将_mainSocket设置成非阻塞模式,继而resumeAccept。单独挑出来这个过程,就容易理解了,非阻塞模式,利用Libevent来监视是否_mainSocket可读写。

public function listen(){
    if (!$this->_socketName) {
        return;
    }
    // Autoload.
    Autoloader::setRootPath($this->_autoloadRootPath);
    if (!$this->_mainSocket) {
        .......
        // Create an Internet or Unix domain server socket.
        $this->_mainSocket = \stream_socket_server($local_socket, $errno, $errmsg, $flags, $this->_context);
        ......
        // Non blocking.
        \stream_set_blocking($this->_mainSocket, false);
    }
    $this->resumeAccept();
}

那么看下listen调用处(省略了其他语句):

function initWorkers(){
    if (!$worker->reusePort) {
        $worker->listen();  //端口不能复用,则早早的调用listen
    }
}
function forkOneWorkerForLinux(){
    if ($worker->reusePort) {
        $worker->listen(); //端口能复用,则等到复制子进程的时候,开始。
    }
}

上面过程,对于端口可复用,无碍乎,就是调用下面这句。

static::$globalEvent->add($this->_mainSocket, EventInterface::EV_READ, array($this, 'acceptConnection'));

上面是分析,反过来往上看,则是另外一种思路。

当有新的客户端连接时,则调用acceptConnection,多个进程可能同时收到该事间,所以有惊群效应。

public function acceptConnection($socket)
{
    // Accept a connection on server socket.
    \set_error_handler(function(){});
    $new_socket = \stream_socket_accept($socket, 0, $remote_address);
    \restore_error_handler();

    // Thundering herd.
    if (!$new_socket) {
        return;
    }

    // TcpConnection.
    $connection                         = new TcpConnection($new_socket, $remote_address);
    $this->connections[$connection->id] = $connection;
    $connection->worker                 = $this;
    $connection->protocol               = $this->protocol;
    $connection->transport              = $this->transport;
    $connection->onMessage              = $this->onMessage;
    $connection->onClose                = $this->onClose;
    $connection->onError                = $this->onError;
    $connection->onBufferDrain          = $this->onBufferDrain;
    $connection->onBufferFull           = $this->onBufferFull;

    // Try to emit onConnect callback.
    if ($this->onConnect) {
        try {
            \call_user_func($this->onConnect, $connection);
        } catch (\Exception $e) {
            static::log($e);
            exit(250);
        } catch (\Error $e) {
            static::log($e);
            exit(250);
        }
    }
}

当有连接时,则为该连接创建一个TcpConnection,

$new_socket = \stream_socket_accept($socket, 0, $remote_address);
$connection = new TcpConnection($new_socket,$remote_address);

TcpConnection

我们细看一下TcpConnection,继承自ConnectionInterface

public function __construct($socket, $remote_address = ''){
    ++self::$statistics['connection_count'];
    $this->id = $this->_id = self::$_idRecorder++;
    if(self::$_idRecorder === \PHP_INT_MAX){
        self::$_idRecorder = 0;
    }
    $this->_socket = $socket;
    \stream_set_blocking($this->_socket, 0);
    // Compatible with hhvm
    if (\function_exists('stream_set_read_buffer')) {
        \stream_set_read_buffer($this->_socket, 0);
    }
    Worker::$globalEvent->add($this->_socket, EventInterface::EV_READ, array($this, 'baseRead'));
    $this->maxSendBufferSize        = self::$defaultMaxSendBufferSize;
    $this->maxPackageSize           = self::$defaultMaxPackageSize;
    $this->_remoteAddress           = $remote_address;
    static::$connections[$this->id] = $this;
}

核心依然:

设置非阻塞,然后$globalEvent对象来负责当可读时,事件的通知。

\stream_set_blocking($this->_socket, 0);
Worker::$globalEvent->add($this->_socket, EventInterface::EV_READ, array($this, 'baseRead'));

然后看该类,搜索Worker::$globalEvent大概有9处,而监听的消息是sockete的读写事件:EventInterface::EV_READEventInterface::EV_WRITE

__construct
Worker::$globalEvent->add($this->_socket, EventInterface::EV_READ, array($this, 'baseRead'));

send
Worker::$globalEvent->add($this->_socket, EventInterface::EV_WRITE, array($this, 'baseWrite'));  //2处

pauseRecv    //当上传的时候,就暂停触发事间
Worker::$globalEvent->del($this->_socket, EventInterface::EV_READ);

resumeRecv
Worker::$globalEvent->add($this->_socket, EventInterface::EV_READ, array($this, 'baseRead'));


baseRead
Worker::$globalEvent->add($socket, EventInterface::EV_WRITE, array($this, 'baseWrite'));

baseWrite
Worker::$globalEvent->del($this->_socket, EventInterface::EV_WRITE);

destroy
Worker::$globalEvent->del($this->_socket, EventInterface::EV_READ);
Worker::$globalEvent->del($this->_socket, EventInterface::EV_WRITE);

其中,有两处调用pipe、close:

public function pipe(self $dest){
    $source              = $this;
    $this->onMessage     = function ($source, $data) use ($dest) {
        $dest->send($data);
    };
    $this->onClose       = function ($source) use ($dest) {
        $dest->destroy();
    };
    $dest->onBufferFull  = function ($dest) use ($source) {
        $source->pauseRecv();
    };
    $dest->onBufferDrain = function ($dest) use ($source) {
        $source->resumeRecv();
    };
}

流程

整个启动流程,实际上即runAll执行的流程

public static function runAll()
{
    static::checkSapiEnv();
    static::init();
    static::lock();
    static::parseCommand();
    static::daemonize();
    static::initWorkers();
    static::installSignal();
    static::saveMasterPid();
    static::unlock();
    static::displayUI();
    static::forkWorkers();
    static::resetStd();
    static::monitorWorkers();
}

workerman的代码有点难度,可能是因为woker中,即是一个对象,又是全局函数的容器,而且,还有命令行解析的代码。如果,将上述3者分开,获取代码可能更容易理解。

worker对象能干的事情况:

sed -n “/ function /p” Worker.php |grep -v “static”

public function getMainSocket(){
public function setUserAndGroup()
public function __construct($socket_name = '', array $context_option = array())
public function listen()
public function unlisten() {
protected function parseSocketAddress() {
public function pauseAccept()
public function resumeAccept()
public function getSocketName()
public function run()
public function stop()
public function acceptConnection($socket)
public function acceptUdpConnection($socket)

worker本身作为容器,能干的事情:

sed -n “/static function/p” Worker.php

public static function runAll()
protected static function checkSapiEnv()
protected static function init()
protected static function lock()
protected static function unlock()
protected static function initWorkers()
public static function reloadAllWorkers()
public static function getAllWorkers()
public static function getEventLoop()
protected static function initId()
protected static function getCurrentUser()
protected static function displayUI()
public static function getUiColumns()
public static function getSingleLineTotalLength()
protected static function parseCommand()
protected static function formatStatusData()
protected static function installSignal()
protected static function reinstallSignal()
public static function signalHandler($signal)
protected static function daemonize()
public static function resetStd()
protected static function saveMasterPid()
protected static function getEventLoopName()
protected static function getAllWorkerPids()
protected static function forkWorkers()
protected static function forkWorkersForLinux()
protected static function forkWorkersForWindows()
public static function getStartFilesForWindows() {
public static function forkOneWorkerForWindows($start_file)
public static function checkWorkerStatusForWindows()
protected static function forkOneWorkerForLinux(self $worker)
protected static function getId($worker_id, $pid)
protected static function setProcessTitle($title)
protected static function monitorWorkers()
protected static function monitorWorkersForLinux()
protected static function monitorWorkersForWindows()
protected static function exitAndClearAll()
protected static function reload()
public static function stopAll()
public static function checkIfChildRunning()
public static function getStatus()
public static function getGracefulStop()
protected static function writeStatisticsToStatusFile()
protected static function writeConnectionsStatisticsToStatusFile()
public static function checkErrors()
protected static function getErrorType($type)
public static function log($msg)
public static function safeEcho($msg, $decorated = false)
private static function outputStream($stream = null)

进程间通信

作为一般的tcp的server,每个woker节点,并不太需要通信。而woker节点,主动跟master节点通信呢?汇包自身的状态呢?发现,它是用文件_statisticsFile来进行汇总各个节点的信息。信号+文件,汇报状态信息。

首先,查看parseCommand方法,当为status命令时,则执行:

master进程

static::safeEcho(static::formatStatusData());
\posix_kill($master_pid, SIGUSR2);
\sleep(1);
static::safeEcho(static::formatStatusData());
//而formatStatusData 方法,就是格式化`_statisticsFile`中的内容并输出。

子进程

static::$globalEvent->add(\SIGUSR2, EventInterface::EV_SIGNAL, $signalHandler);

执行的动作:

case \SIGUSR2:
    static::writeStatisticsToStatusFile();
    break;

而writeStatisticsToStatusFile,则执行:

\file_put_contents(static::$_statisticsFile, $worker_status_str, \FILE_APPEND);

写入到文件。所以,不用加锁了?

信号

对于进程,安装的信号有两种:主进程installSignal,子进程reinstallSignal(由于其是从主进程复制而来,所以需要先卸载主进程的消耗,然后再安装新的信号,只不过新的信号换成了$globalEvent,为何要用两种不同的方式,来接收信号呢?我猜测,子进程,主要用globalEvent->loop进行事件循环,如果不在loop中添加pcntl_signal_dispatch来处理,估计,信号得不到处理。)。

而对响应信号的处理函数,都统一由$signalHandler = '\Workerman\Worker::signalHandler';来处理。所以,该函数内,需要区分,当前是何种进程。

具体的操作主要有以下四种,均要区分是主、子进程。

stopAll
reload
writeStatisticsToStatusFile
writeConnectionsStatisticsToStatusFile

另外_gracefulStop类静态变量来区分,是否是优雅的停止或重启。那么,优雅的含义是:



//reload函数中,如果_gracefulStop 为false,会额外执行下面一句
Timer::add(static::KILL_WORKER_TIMER_TIME, '\posix_kill', array($one_worker_pid, \SIGKILL), false);

//stopAll函数中,如果_gracefulStop 为false,会额外执行下面一句
Timer::add(static::KILL_WORKER_TIMER_TIME, '\posix_kill', array($worker_pid, \SIGKILL), false);

//stop函数中强行断开连接
// Close all connections for the worker.
if (!static::$_gracefulStop) {
    foreach ($this->connections as $connection) {
        $connection->close();
    }
}

pidMap与idMap

这两个概念,很容易混淆,而且数据结构也差不多,为啥要出现两个不同的id呢?记得以前一个大神说,少定义变量,多一个变量多增加一份新智负担。

这两个变量,都应该是在主进程中使用,而在子进程的复制的时候,提供给主进程,便于主进程来管理、调度,比如,woker是否缺了。而子进程应该不会用到这些变量。

但是,我在读源码过程中,通过追踪这些变量的调用、使用,发现,其实二者没有啥差别,比如:判断是否需要fork一个子进程,$_pidMap先用在forkWorkersForLinux方法中,利用isset是否定义,判断是否需要增加子进程。而 idMap 是pid为0,则应该新增进程,即数组中包含无效的pid。

所以,如果是我来优化,我肯定要删除idMap变量。原因:1、出现次数少,变量出现7次,出现在getId(被调用2次)、initId(被调用两次)等函数中。reload过程调用了initId?2、出现idMap的地方,基本上也出现了pid,而一些关键过程,还是利用pid。比如进程退出、等等。即关键的判断,还是用pidMap。

主进程中,保存子进程的信息,好像也不太多,比如进程的fork时间??

首先,先看定义:

/**
 * All worker processes pid.
 * The format is like this [worker_id=>[pid=>pid, pid=>pid, ..], ..]
 * @var array
 */
protected static $_pidMap = array();

/**
 * Mapping from PID to worker process ID.
 * The format is like this [worker_id=>[0=>$pid, 1=>$pid, ..], ..].
 * @var array
 */
protected static $_idMap = array();

首先,从数据结构来看,而者差别并不是太大。区别在于 key,一个是进程id值,另外一个是进程id。

初始化:

protected static function initId()
{
    foreach (static::$_workers as $worker_id => $worker) {
        $new_id_map = array();
        $worker->count = $worker->count < 1 ? 1 : $worker->count;
        for($key = 0; $key < $worker->count; $key++) {
            $new_id_map[$key] = isset(static::$_idMap[$worker_id][$key]) ? static::$_idMap[$worker_id][$key] : 0;
        }
        static::$_idMap[$worker_id] = $new_id_map;
    }
}

// pid对象初始化的时候使用
public function __construct($socket_name = '', array $context_option = array()){
 	static::$_pidMap[$this->workerId]  = array();   
}

进程复制时,赋值

$pid = \pcntl_fork();
// For master process.
if ($pid > 0) {
    static::$_pidMap[$worker->workerId][$pid] = $pid;
    static::$_idMap[$worker->workerId][$id]   = $pid;
}

标记进程可用,(销毁旧进程):

// Clear process data.
unset(static::$_pidMap[$worker_id][$pid]);
// Mark id is available.
$id  = static::getId($worker_id, $pid);  
static::$_idMap[$worker_id][$id] = 0;   

//getId = \array_search($pid, static::$_idMap[$worker_id]);  

利用该变量判断,是否要复制子进程:

// forkOneWorkerForLinux
// Get available worker id.
$id = static::getId($worker->workerId, 0);
if ($id === false) {
    return;
}

//forkWorkersForLinux
while (\count(static::$_pidMap[$worker->workerId]) < $worker->count) {
    static::forkOneWorkerForLinux($worker);
}

检测子进程存活状态:

//  flatten $_pidMap  拿到value值,相当于  array_walk...
//   有getAllWorkerPids,但又不用???
public static function checkIfChildRunning()
{
    foreach (static::$_pidMap as $worker_id => $worker_pid_array) {
        foreach ($worker_pid_array as $pid => $worker_pid) {
            if (!\posix_kill($pid, 0)) {
                unset(static::$_pidMap[$worker_id][$pid]);
            }
        }
    }
}

但是,注意,子进程会清空$_pidMap

//forkOneWorkerForLinux
elseif (0 === $pid) { //子进程
    static::$_pidMap  = array();
}

_pidsToRestart

跟踪这个变量,大概发现,需要重启的时候,先一次将pids放到该变量中,并与reloadable进行求交集,然后,逐一重启该进程。

workerman总结

感觉代码好像优雅,但是又感觉代码比较难度。见流程中的描述:workerman的代码有点难度…

pidMap跟idMap的共同使用,为啥要多一个变量?多一个变量,就增加一些心智负担……,除非他们有明显的不同作用,如果不是,那么为啥要用两个?

这些都是造成它看起来优雅,但是巨无霸,混合各种东西,难以理解?

反正,看完后的感觉,好像是懂了,但又好像没有懂???

other


感觉listen有点乱
if ($worker->reusePort) {
    $worker->listen();
}