sync_email.php 3.3 KB
<?php

//error_reporting();

use Swoole\Process;

include_once __DIR__."/config.php";
include_once __DIR__."/function.php";



function start(){
    // 启动一个进程池进行管理
    $topPool = new Process\Pool(WORKER_NUM);
    // 这个是启用协程
    $topPool->set(['enable_coroutine' => true]);
    // 协程配置
    \co::set([
        'max_coroutine'=>1000, // 最大携程数量
        'hook_flags'=>SWOOLE_HOOK_TCP,  //  redis需要的配置
    ]);
    // 开始工作
    $topPool->on('WorkerStart',function (Process\Pool $pool,$worker_id){
        _echo("进程({$worker_id})启动成功");
        // 协程id集
        $cid = [];
        // 删除停止运行的值
        swoole_redis()->delete(RUNNING_REDIS_KEY);

//        \Co\run(function (){
            // 开启多个协程
            foreach (range(1,COROUTINE_NUM) as $i){
                go(function () use ($cid,$worker_id){
                    // 协程id
                    $cid[co::getCid()] = co::getCid();

                    $redis = swoole_redis();

                    // 同步操作
                    while (true){

                        // 开始同步
                        try {
                            sync();
                        }catch (Throwable $e){
                            logs($e->getMessage(),LOG_PATH.'/'.$worker_id.'_'.co::getCid().'.log');
                        }

                        // 是否停止
                        if($redis->get(RUNNING_REDIS_KEY) == 'stop'){
                            break;
                        }
                        // 阻塞1秒
                        co::sleep(1);
                    }

                    // 协程完成后执行的函数
                    co::defer(function () use ($cid){
                        _echo('正常关闭协程('.co::getCid().')');
                        unset($cid[co::getCid()]);
                    });

                });
            }

//        });
        // 是否退出进程
        while (true){
            if($cid){
                co::sleep(1);
            }else{
                _echo('正常关闭进程('.$worker_id.')');
                // 关闭当前进程
                $pool->shutdown();
                break;
            }
        }

    });
    // 停止工作后的回调
    $topPool->on('WorkerStop', function (\Swoole\Process\Pool $pool, $workerId) {
        _echo("[Worker #{$workerId}] WorkerStop\n");
    });

    $topPool->start();

}

/**
 * 开始同步
 * @author:dc
 * @time 2023/2/13 9:42
 */
function sync(){
    db()->first();
}




switch ($argv[1]){
    case 'start':{
        start();
        break;
    }
    case 'stop':{
        \Co\run(function (){
            echo "正在退出程序...\n";
            swoole_redis()->set(RUNNING_REDIS_KEY,'stop');
            while (true){
                $num = exec("ps -ef | grep \"sync_email.php start\" | grep -v grep | wc -l");
                if(!$num){
                    break;
                }
                co::sleep(0.5);
            }
            echo "已退出程序\n";
        });
        break;
    }
    default:{
        echo "参数 :\n";
        echo "\tstart n  n是协程的数量\n";
        echo "\tstop\n";
        echo "\trestart n\n";
        echo PHP_EOL;
        break;
    }
}

exit();