作者 邓超

x

正在显示 1 个修改的文件 包含 65 行增加26 行删除
@@ -4,43 +4,82 @@ @@ -4,43 +4,82 @@
4 4
5 use Swoole\Process; 5 use Swoole\Process;
6 6
7 -include_once __DIR__."/../vendor/autoload.php";  
8 -  
9 -//exec("ps -aux|grep -E \"mail-serve-sync-list\" -c",$exec);  
10 -//if(intval($exec[0]) > 200){  
11 -// exit;  
12 -//}  
13 -  
14 -swoole_set_process_name("mail-serve-sync-list");  
15 -  
16 -while (1){  
17 - $id = redis()->lPop('sync_email_lists');  
18 - if($id && is_numeric($id)){  
19 - // 占用当前的id,占用2小时  
20 - if(redis()->add('just_sync_'.$id,time(),600)){  
21 - // 启动一个协程  
22 -  
23 - try{  
24 - // 开始同步  
25 - (new \Service\SyncMail($id))->sync();  
26 - }catch (Throwable $e){  
27 - logs('sync : '.$e->getMessage()); 7 +
  8 +
  9 +function start(){
  10 +
  11 +// 删除停止运行的值
  12 +// redis()->delete(SYNC_RUNNING_REDIS_KEY,'email_sync_stop_num');
  13 +
  14 + // 进程管理器
  15 + $pm = new Process\Manager();
  16 +
  17 + // 启动业务进程
  18 + $pm->addBatch(10,function (Process\Pool $pool, int $worker_id){
  19 +
  20 + swoole_set_process_name('php-email-sync-list-'.$worker_id);
  21 +
  22 + include_once __DIR__."/../vendor/autoload.php";
  23 + _echo("业务进程({$worker_id})启动成功");
  24 +
  25 + $goNum = 0;
  26 + // 循环阻塞
  27 + while (true){
  28 +
  29 + // 需要同步的id
  30 + $id = redis()->lPop('sync_email_lists');
  31 +
  32 + if($id && is_numeric($id)){
  33 + // 占用当前的id,占用2小时
  34 + if(redis()->add('just_sync_'.$id,time(),600)){
  35 + // 启动一个协程
  36 + go(function () use ($id,&$goNum){
  37 + $goNum++;
  38 + try{
  39 + // 开始同步
  40 + (new \Service\SyncMail($id))->sync();
  41 + }catch (Throwable $e){
  42 + logs('sync : '.$e->getMessage());
  43 + }
  44 +
  45 +
  46 + // 协程完成后执行的函数
  47 + co::defer(function () use ($id,&$goNum){
  48 + $goNum--;
  49 + // 30秒后 消除占用
  50 + redis()->expire('just_sync_'.$id,300);
  51 + // 写入日志
  52 + \Lib\Log::getInstance()->write();
  53 + });
  54 +
  55 + });
  56 + }
28 } 57 }
29 58
  59 + //每次都暂停1秒,防止同一时间启动太多的任务
  60 + co::sleep(0.5);
30 61
31 - // 30秒后 消除占用  
32 - redis()->expire('just_sync_'.$id,300); 62 + while ($goNum > 50){
  63 + co::sleep(1);
  64 + break;
  65 + }
33 66
34 - break;  
35 } 67 }
36 - }  
37 68
38 - usleep(1000); 69 + },true);
  70 +
  71 +
  72 + // 启动管理器
  73 + $pm->start();
  74 +
39 } 75 }
40 76
41 77
42 78
43 79
  80 +start();
  81 +
  82 +
44 83
45 84
46 85