作者 邓超

同步

正在显示 1 个修改的文件 包含 28 行增加68 行删除
@@ -4,85 +4,45 @@ @@ -4,85 +4,45 @@
4 4
5 use Swoole\Process; 5 use Swoole\Process;
6 6
7 - 7 +include_once __DIR__."/../vendor/autoload.php";
  8 +swoole_set_process_name('php-email-sync-list');
8 9
9 function start(){ 10 function start(){
10 -  
11 -// 删除停止运行的值  
12 -// redis()->delete(SYNC_RUNNING_REDIS_KEY,'email_sync_stop_num');  
13 -  
14 - // 进程管理器  
15 - $pm = new Process\Manager();  
16 -  
17 - // 启动业务进程  
18 - $pm->addBatch(10,function (Process\Pool $pool, int $worker_id){  
19 -  
20 - swoole_set_process_name('php-email-sync-list-'.$worker_id);  
21 -  
22 - include_once __DIR__."/../vendor/autoload.php";  
23 - _echo("业务进程({$worker_id})启动成功");  
24 -  
25 - $goNum = 0;  
26 - $timer = time();  
27 - // 循环阻塞  
28 - while (true){  
29 -  
30 - if(time()-$timer > 3600){  
31 - break; 11 + // 需要同步的id
  12 + $id = redis()->lPop('sync_email_lists');
  13 +
  14 + if($id && is_numeric($id)){
  15 + // 占用当前的id,占用2小时
  16 + if(redis()->add('just_sync_'.$id,time(),600)){
  17 + try{
  18 + // 开始同步
  19 + (new \Service\SyncMail($id))->sync();
  20 + }catch (Throwable $e){
  21 + logs('sync : '.$e->getMessage());
32 } 22 }
  23 + // 30秒后 消除占用
  24 + redis()->expire('just_sync_'.$id,60);
  25 + // 写入日志
  26 + \Lib\Log::getInstance()->write();
33 27
34 -  
35 - // 需要同步的id  
36 - $id = redis()->lPop('sync_email_lists');  
37 -  
38 - if($id && is_numeric($id)){  
39 - // 占用当前的id,占用2小时  
40 - if(redis()->add('just_sync_'.$id,time(),600)){  
41 - // 启动一个协程  
42 - go(function () use ($id,&$goNum){  
43 - $goNum++;  
44 - try{  
45 - // 开始同步  
46 - (new \Service\SyncMail($id))->sync();  
47 - }catch (Throwable $e){  
48 - logs('sync : '.$e->getMessage());  
49 - }  
50 -  
51 -  
52 - // 协程完成后执行的函数  
53 - co::defer(function () use ($id,&$goNum){  
54 - $goNum--;  
55 - // 30秒后 消除占用  
56 - redis()->expire('just_sync_'.$id,60);  
57 - // 写入日志  
58 - \Lib\Log::getInstance()->write();  
59 - });  
60 -  
61 - });  
62 - while ($goNum == 0){  
63 - co::sleep(0.5);  
64 - break;  
65 - }  
66 - }  
67 - }else{  
68 - co::sleep(1);  
69 - }  
70 - //每次都暂停1秒,防止同一时间启动太多的任务  
71 - co::sleep(0.5);  
72 } 28 }
73 -  
74 - },true);  
75 -  
76 -  
77 - // 启动管理器  
78 - $pm->start(); 29 + }else{
  30 + co::sleep(1);
  31 + }
  32 + //每次都暂停1秒,防止同一时间启动太多的任务
  33 + co::sleep(0.5);
79 34
80 } 35 }
81 36
82 37
83 38
  39 +\Co\run(function (){
  40 + // 循环阻塞
  41 + while (true) {
  42 + start();
  43 + }
  44 +});
84 45
85 -start();  
86 46
87 47
88 48