作者 邓超

1

@@ -18,16 +18,24 @@ function start(){ @@ -18,16 +18,24 @@ function start(){
18 $pm = new Process\Manager(); 18 $pm = new Process\Manager();
19 19
20 // 启动一个进程来管理定时 20 // 启动一个进程来管理定时
21 -// $pm->add(function (Process\Pool $pool, int $workerId){  
22 -// _echo("定时进程({$workerId})启动成功");  
23 -//  
24 -//  
25 -// // 进行阻塞,否则定时器无法运行  
26 -// while (true){  
27 -// co::sleep(9999);  
28 -// }  
29 -//  
30 -// },true); 21 + $pm->add(function (Process\Pool $pool, int $workerId){
  22 + _echo("定时进程({$workerId})启动成功");
  23 +
  24 + \Swoole\Timer::tick(1000,function() use(&$pool){
  25 +
  26 + if(redis()->getOriginData('email_sync_stop_num') >= WORKER_NUM+2 ){
  27 + $pool->shutdown();
  28 + }
  29 +
  30 + });
  31 +
  32 +
  33 + // 进行阻塞,否则定时器无法运行
  34 + while (true){
  35 + co::sleep(9999);
  36 + }
  37 +
  38 + },true);
31 39
32 40
33 // 协程配置 41 // 协程配置
@@ -44,6 +52,11 @@ function start(){ @@ -44,6 +52,11 @@ function start(){
44 52
45 // 循环阻塞 53 // 循环阻塞
46 while (true){ 54 while (true){
  55 +
  56 + if(redis()->get(SYNC_RUNNING_REDIS_KEY)=='stop'){
  57 + break;
  58 + }
  59 +
47 // 是否到了协程配置的数量上限 60 // 是否到了协程配置的数量上限
48 if($start_num < COROUTINE_MAX_NUM){ 61 if($start_num < COROUTINE_MAX_NUM){
49 // 需要同步的id 62 // 需要同步的id
@@ -99,8 +112,20 @@ function start(){ @@ -99,8 +112,20 @@ function start(){
99 112
100 } 113 }
101 114
  115 + // 验证是否全部进程退出了
  116 + while (true){
  117 + if(!$start_num){
  118 + redis()->incr('email_sync_stop_num');
  119 + break;
  120 + }
  121 + co::sleep(0.5);
  122 + }
  123 + while (true){
  124 + co::sleep(99);
  125 + }
102 126
103 },true); 127 },true);
  128 +
104 // 启动一个同步内容的进程 129 // 启动一个同步内容的进程
105 $pm->add(function (Process\Pool $pool, int $worker_id){ 130 $pm->add(function (Process\Pool $pool, int $worker_id){
106 _echo("业务进程({$worker_id})启动成功,body"); 131 _echo("业务进程({$worker_id})启动成功,body");
@@ -109,8 +134,11 @@ function start(){ @@ -109,8 +134,11 @@ function start(){
109 134
110 // 循环阻塞 135 // 循环阻塞
111 while (true){ 136 while (true){
  137 + if(redis()->get(SYNC_RUNNING_REDIS_KEY)=='stop'){
  138 + break;
  139 + }
112 // 是否到了协程配置的数量上限 140 // 是否到了协程配置的数量上限
113 - if($start_num < 50){ 141 + if($start_num < 500){
114 // 需要同步的id 142 // 需要同步的id
115 $id = redis()->lPop('sync_email_body'); 143 $id = redis()->lPop('sync_email_body');
116 144
@@ -162,6 +190,17 @@ function start(){ @@ -162,6 +190,17 @@ function start(){
162 190
163 } 191 }
164 192
  193 + // 验证是否全部进程退出了
  194 + while (true){
  195 + if(!$start_num){
  196 + redis()->incr('email_sync_stop_num');
  197 + break;
  198 + }
  199 + co::sleep(0.5);
  200 + }
  201 + while (true){
  202 + co::sleep(99);
  203 + }
165 204
166 },true); 205 },true);
167 206
@@ -72,6 +72,17 @@ class RedisPool { @@ -72,6 +72,17 @@ class RedisPool {
72 } 72 }
73 73
74 /** 74 /**
  75 + * 获取原数据
  76 + * @param $key
  77 + * @return false|mixed|string
  78 + * @author:dc
  79 + * @time 2023/4/12 16:51
  80 + */
  81 + public function getOriginData($key){
  82 + return $this->getClient()->get($key);
  83 + }
  84 +
  85 + /**
75 * @param $key 86 * @param $key
76 * @param $val 87 * @param $val
77 * @param null $ttl 88 * @param null $ttl