作者 邓超

优化同步

@@ -2,108 +2,54 @@ @@ -2,108 +2,54 @@
2 2
3 //error_reporting(); 3 //error_reporting();
4 4
5 -use Swoole\Process;  
6 -  
7 -  
8 -  
9 -function start(){  
10 -  
11 -// 删除停止运行的值  
12 -// redis()->delete(SYNC_RUNNING_REDIS_KEY,'email_sync_stop_num');  
13 -  
14 - // 进程管理器  
15 - $pm = new Process\Manager();  
16 -  
17 - // 启动业务进程  
18 - $pm->addBatch(10,function (Process\Pool $pool, int $worker_id){  
19 -  
20 - swoole_set_process_name('php-email-sync-list-'.$worker_id);  
21 -  
22 - include_once __DIR__."/../vendor/autoload.php";  
23 - _echo("业务进程({$worker_id})启动成功");  
24 -  
25 - $start_time = time();  
26 -  
27 - $goNum = 0;  
28 - // 循环阻塞  
29 - while (true){  
30 - if($start_time+43200 < time()){  
31 - if($start_time+43400 < time()){  
32 - break;  
33 - }  
34 - co::sleep(1);  
35 - }  
36 - while ($goNum > 50){  
37 - co::sleep(0.5);  
38 - continue;  
39 - }  
40 - // 需要同步的id  
41 - $id = redis()->lPop('sync_email_lists');  
42 -  
43 - if($id && is_numeric($id)){  
44 -  
45 - // 占用当前的id,占用2小时  
46 - if(redis()->add('just_sync_'.$id,time(),600)){  
47 -// redis()->set('sync_my_pid:'.getmypid(),time(),86400);  
48 - // 启动一个协程  
49 - go(function () use ($id,&$goNum){  
50 - $goNum++;  
51 - try{  
52 - // 开始同步  
53 - $email = db()->cache(3600)->first(\Model\emailSql::first($id));  
54 - if($email){  
55 - $sync = new \Service\SyncMail($email);  
56 - // ai邮件只同步2天内的  
57 - $sync->search(  
58 - (new \Lib\Imap\ImapSearch())  
59 - ->dateGt(date('Y-m-d',strtotime("-1 day")))  
60 - );  
61 - $sync->isUidAfter(2)->sync();  
62 -  
63 - $sync = null;  
64 - unset($sync);  
65 - }  
66 -  
67 - }catch (Throwable $e){  
68 - logs('sync : '.$e->getMessage());  
69 - }  
70 -  
71 -  
72 - // 协程完成后执行的函数  
73 - co::defer(function () use ($id,&$goNum){  
74 - $goNum--;  
75 - // 30秒后 消除占用  
76 - redis()->expire('just_sync_'.$id,120);  
77 - // 写入日志  
78 - \Lib\Log::getInstance()->write();  
79 - });  
80 -  
81 - }); 5 +include_once __DIR__."/../vendor/autoload.php";
  6 +
  7 +$runNumber = 1000;
  8 +// 循环阻塞
  9 +while ($runNumber){
  10 + $runNumber--;
  11 + // 需要同步的id
  12 + $id = redis()->lPop('sync_email_lists');
  13 +
  14 + if($id && is_numeric($id)){
  15 +
  16 + // 占用当前的id,占用2小时
  17 + if(redis()->add('just_sync_'.$id,time(),600)){
  18 +
  19 + try{
  20 + // 开始同步
  21 + $email = db()->cache(3600)->first(\Model\emailSql::first($id));
  22 + if($email){
  23 + $sync = new \Service\SyncMail($email);
  24 + // ai邮件只同步2天内的
  25 + $sync->search(
  26 + (new \Lib\Imap\ImapSearch())
  27 + ->dateGt(date('Y-m-d',strtotime("-1 day")))
  28 + );
  29 + $sync->isUidAfter(2)->sync();
  30 +
  31 + $sync = null;
  32 + unset($sync);
82 } 33 }
  34 +
  35 + }catch (Throwable $e){
  36 + logs('sync : '.$e->getMessage());
83 } 37 }
84 38
85 - //每次都暂停1秒,防止同一时间启动太多的任务  
86 - co::sleep(1); 39 + // 30秒后 消除占用
  40 + redis()->expire('just_sync_'.$id,120);
87 41
  42 + // 写入日志
  43 + \Lib\Log::getInstance()->write();
88 } 44 }
89 -  
90 - return 0;  
91 -  
92 - },true);  
93 -  
94 -  
95 - // 启动管理器  
96 - $pm->start();  
97 - 45 + }else{
  46 + break;
  47 + }
98 } 48 }
99 49
100 50
101 51
102 52
103 -start();  
104 -  
105 -  
106 -  
107 53
108 54
109 55
@@ -48,6 +48,7 @@ class Es { @@ -48,6 +48,7 @@ class Es {
48 ->setHosts($this->host) 48 ->setHosts($this->host)
49 // ->setBasicAuthentication($user, $password) 49 // ->setBasicAuthentication($user, $password)
50 // ->setCABundle('path/to/http_ca.crt') 50 // ->setCABundle('path/to/http_ca.crt')
  51 + ->setSSLVerification(false) // 关闭 SSL 证书验证
51 ->build(); 52 ->build();
52 } 53 }
53 54