作者 邓超

1

<?php
\Swoole\Timer::tick(2000,function (){
echo '1';
});
\ No newline at end of file
... ...
... ... @@ -22,64 +22,61 @@ function start(){
// 初始时,进行一次统计
$table->set('etotal',['val'=> db()->count(\Model\email::count())]);
// 启动一个进程池进行管理
$topPool = new Process\Pool(WORKER_NUM);
// 每10分钟统计一次邮箱数量
$timer_id = \Swoole\Timer::tick(600,function () use (&$table){
$table->set('etotal',['val'=> db()->count(\Model\email::count())]);
});
// 进程管理器
$pm = new Process\Manager();
// 启动一个进程来管理定时
$pm->add(function (Process\Pool $pool, int $workerId)use (&$table){
_echo("进程({$workerId})启动成功");
// 每10分钟统计一次邮箱数量
\Swoole\Timer::tick(600000,function () use (&$table){
$table->set('etotal',['val'=> db()->count(\Model\email::count())]);
});
// 每2秒执行一次
$timer_check_id = \Swoole\Timer::tick(2,function () use (&$table,$topPool){
// 是否停止脚本
$table->set('stop',['val'=> redis()->get(RUNNING_REDIS_KEY) === 'stop' ? 1 : 0]);
// 检查是否结束了所有的协程同步代码
if ($table->get('stop','val')) {
$stop_num = 0;
foreach (range(0, WORKER_NUM) as $i) {
if ($table->exists('ps' . $i)) {
$stop_num++;
// 每2秒执行一次
\Swoole\Timer::tick(2000,function () use (&$table,&$pool){
// 是否停止脚本
$table->set('stop',['val'=> redis()->get(RUNNING_REDIS_KEY) === 'stop' ? 1 : 0]);
// _echo('定时器');
// 检查是否结束了所有的协程同步代码
if ($table->get('stop','val')) {
$stop_num = 0;
foreach (range(0, WORKER_NUM) as $i) {
if ($table->exists('ps' . $i)) {
$stop_num++;
}
}
}
if($stop_num >= WORKER_NUM){
if($stop_num >= WORKER_NUM){
// 退出进程
$topPool->shutdown();
$pool->shutdown();
}
}
}
// 邮件总数
$total = redis()->get('email_total',0);
if($total > $table->get('etotal','val')){
$table->set('etotal',['val'=> $total]);
}
// 邮件总数
$total = redis()->get('email_total',0);
if($total > $table->get('etotal','val')){
$table->set('etotal',['val'=> $total]);
}
});
});
// 进行阻塞,否则定时器无法运行
while (true){
co::sleep(9999);
}
},true);
// 这个是启用协程
$topPool->set(['enable_coroutine' => true]);
// 协程配置
// 协程配置
\co::set([
'max_coroutine'=>COROUTINE_MAX_NUM, // 最大携程数量
'hook_flags'=>SWOOLE_HOOK_TCP, // redis需要的配置
]);
// 开始工作
$topPool->on('WorkerStart',function (Process\Pool $pool,$worker_id) use (&$table){
// 启动业务进程
$pm->addBatch(WORKER_NUM,function (Process\Pool $pool, int $worker_id) use (&$table){
_echo("进程({$worker_id})启动成功");
// 是否停止,这里进行阻塞
if ($table->get('stop','val')){
// 某个进程退出了
$table->set('ps'.$worker_id,['val'=>1]);
co::sleep(1);
return true;
}
// 协程id集
$cid = [];
... ... @@ -132,19 +129,25 @@ function start(){
// 跳出无限循环了
if(!$cid){
_echo('正常关闭进程('.$worker_id.')');
// 关闭当前进程
// $pool->shutdown();
break;
}
}
});
// 停止工作后的回调
$topPool->on('WorkerStop', function (\Swoole\Process\Pool $pool, $workerId) {
_echo("[Worker #{$workerId}] WorkerStop\n");
});
// 是否停止,这里进行阻塞
if ($table->get('stop','val')){
// 某个进程退出了
$table->set('ps'.$worker_id,['val'=>1]);
// 阻塞直到 主进程 kill掉所有子进程
while (true){
co::sleep(5);
}
}
},true);
$topPool->start();
$pm->start();
}
... ... @@ -165,7 +168,7 @@ function create_coroutine(array &$cid,int &$isRunMaxCNum,$worker_id){
while (true){
// 是否退出协程
if(!$isRunMaxCNum){
_echo('协程('.co::getCid().'): stop '.$isRunMaxCNum);
// _echo('协程('.co::getCid().'): stop '.$isRunMaxCNum);
break;
}
... ... @@ -182,8 +185,8 @@ function create_coroutine(array &$cid,int &$isRunMaxCNum,$worker_id){
}
// 协程完成后执行的函数
co::defer(function () use (&$cid){
_echo('正常关闭协程('.co::getCid().')');
co::defer(function () use (&$cid,$worker_id){
_echo('正常关闭进程('.$worker_id.')下的协程('.co::getCid().')');
unset($cid[co::getCid()]);
});
... ...
<?php
echo co::getCid();
\ No newline at end of file
# 环境要求
* php >= 8.0
* swoole >= 5.0
* mysql >= 5.7
* 必须使用linux系统
## 脚本
1.启动 ,请以守护进程模式运行
~~~
php sync_email.php start
~~~
2.退出
~~~
php sync_email.php stop
~~~
3.注意:如果使用 进程守护器(supervisord) 执行退出后会被 守护器重新运行
... ...