sync_email.php 6.3 KB
<?php

//error_reporting();

use Swoole\Process;



include_once __DIR__."/../vendor/autoload.php";


function start(){

// 删除停止运行的值
    redis()->delete(RUNNING_REDIS_KEY);

    /** 创建一个表 **/
    $table = new Swoole\Table(128);// 128 行
    $table->column('val', Swoole\Table::TYPE_INT);
    $table->create();

    // 初始时,进行一次统计
    $table->set('etotal',['val'=> db()->count(\Model\email::count())]);

// 启动一个进程池进行管理
    $topPool = new Process\Pool(WORKER_NUM);

    // 每10分钟统计一次邮箱数量
    $timer_id = \Swoole\Timer::tick(600,function () use (&$table){
        $table->set('etotal',['val'=> db()->count(\Model\email::count())]);
    });

    // 每2秒执行一次
    $timer_check_id = \Swoole\Timer::tick(2,function () use (&$table,$topPool){
        // 是否停止脚本
        $table->set('stop',['val'=> redis()->get(RUNNING_REDIS_KEY) === 'stop' ? 1 : 0]);

        // 检查是否结束了所有的协程同步代码
        if ($table->get('stop','val')) {
            $stop_num = 0;
            foreach (range(0, WORKER_NUM) as $i) {
                if ($table->exists('ps' . $i)) {
                    $stop_num++;
                }
            }
            if($stop_num >= WORKER_NUM){
//                退出进程
                $topPool->shutdown();
            }
        }

        // 邮件总数
        $total = redis()->get('email_total',0);
        if($total > $table->get('etotal','val')){
            $table->set('etotal',['val'=> $total]);
        }

    });



    // 这个是启用协程
    $topPool->set(['enable_coroutine' => true]);
    // 协程配置
    \co::set([
        'max_coroutine'=>COROUTINE_MAX_NUM, // 最大携程数量
        'hook_flags'=>SWOOLE_HOOK_TCP,  //  redis需要的配置
    ]);

    // 开始工作
    $topPool->on('WorkerStart',function (Process\Pool $pool,$worker_id) use (&$table){
        _echo("进程({$worker_id})启动成功");

        // 是否停止,这里进行阻塞
        if ($table->get('stop','val')){
            // 某个进程退出了
            $table->set('ps'.$worker_id,['val'=>1]);

            co::sleep(1);
            return true;
        }

        // 协程id集
        $cid = [];

        $i = 0;
        $email_total = $table->get('etotal','val');//邮件总数量
        $isRunMaxCNum = 1; // 允许最大协程数量,如果为0则停止所有协程工作,相当于停止脚本
        // 是否退出进程
        while (true){
            $i++;
            // 每10秒 验证一次邮箱数量,好控制协程的数量
            if($i>=10){
                $email_total = $table->get('etotal','val');
                $i = 0;
            }

            if(!$email_total){
                break;
            }

            // 每个协程 分配 10个邮箱任务
            $cnum = ceil($email_total/40);
            // 当前协程的数量
            $nowCnum = count($cid);
            // 说明 需要新的协程了
            if($cnum > $nowCnum){
                // 开启所需要的协程数量
                foreach (range(0,$cnum-$nowCnum) as $v){
                    // 启动一个协程
                    create_coroutine($cid,$isRunMaxCNum,$worker_id);
                }
            }
            // 暂时没有实现 减少协程数量操作
//            else if ($cnum < $nowCnum){
//                // 说明 协程数量过多,小于了1个协程处理10个邮箱的,资源闲置情况
//                // 销毁多余协程
//                $isRunMaxCNum = $nowCnum - $cnum;
//            }

            // 每3秒检查一次是否要停止 协程
            if($i%3 === 0){
//                _echo('是否收到退出信号:'.$table->get('stop','val'));
                if ($table->get('stop','val')){
                    // 停止
                    $isRunMaxCNum = 0;
                }
            }

            // 这个是检查 cid的如果协程全部退出,则退出进程
            co::sleep(1);
            // 跳出无限循环了
            if(!$cid){
                _echo('正常关闭进程('.$worker_id.')');
                // 关闭当前进程
//                $pool->shutdown();
                break;
            }
        }

    });
    // 停止工作后的回调
    $topPool->on('WorkerStop', function (\Swoole\Process\Pool $pool, $workerId) {
        _echo("[Worker #{$workerId}] WorkerStop\n");
    });

    $topPool->start();

}

/**
 * 创建协程
 * @param array $cid
 * @param int $isRunMaxCNum
 * @param $worker_id
 * @author:dc
 * @time 2023/2/14 17:04
 */
function create_coroutine(array &$cid,int &$isRunMaxCNum,$worker_id){
    go(function () use (&$cid,&$isRunMaxCNum,$worker_id){
        // 协程id
        $cid[co::getCid()] = co::getCid();

        // 同步操作
        while (true){
            // 是否退出协程
            if(!$isRunMaxCNum){
                _echo('协程('.co::getCid().'): stop '.$isRunMaxCNum);
                break;
            }

            // 开始同步
            try {
                sync();
            }catch (\Throwable $e){
                _echo($e->getMessage());
                logs($e->getMessage().PHP_EOL.$e->getTraceAsString(),LOG_PATH.'/'.$worker_id.'_'.co::getCid().'.log');
            }

            // 阻塞1秒
            co::sleep(1);
        }

        // 协程完成后执行的函数
        co::defer(function () use (&$cid){
            _echo('正常关闭协程('.co::getCid().')');
            unset($cid[co::getCid()]);
        });

    });
}


/**
 * 开始同步
 * @author:dc
 * @time 2023/2/13 9:42
 */
function sync(){
    co::sleep(1);
}




switch ($argv[1]){
    case 'start':{
        start();
        break;
    }
    case 'stop':{
        \Co\run(function (){
            echo "正在退出程序...\n";
            redis()->set(RUNNING_REDIS_KEY,'stop');
            while (true){
                $num = exec("ps -ef | grep \"sync_email.php start\" | grep -v grep | wc -l");
                if(!$num){
                    break;
                }
                co::sleep(0.5);
            }
            echo "已退出程序\n";
        });
        break;
    }
    default:{
        echo "参数 :\n";
        echo "\tstart n  n是协程的数量\n";
        echo "\tstop\n";
        echo "\trestart n\n";
        echo PHP_EOL;
        break;
    }
}