作者 邓超

优化同步

@@ -14,6 +14,7 @@ class SyncToEsCmd { @@ -14,6 +14,7 @@ class SyncToEsCmd {
14 14
15 public $isStop = false; 15 public $isStop = false;
16 16
  17 + static $num = 0;
17 18
18 public function handler(){ 19 public function handler(){
19 20
@@ -27,9 +28,6 @@ class SyncToEsCmd { @@ -27,9 +28,6 @@ class SyncToEsCmd {
27 // pcntl_signal(SIGHUP, $handler); 28 // pcntl_signal(SIGHUP, $handler);
28 29
29 30
30 - $es = es();  
31 - $db = db();  
32 -  
33 while (1){ 31 while (1){
34 32
35 // 检查是否接收到信号 33 // 检查是否接收到信号
@@ -41,26 +39,45 @@ class SyncToEsCmd { @@ -41,26 +39,45 @@ class SyncToEsCmd {
41 } 39 }
42 40
43 $id = redis()->lPop('sync_to_es'); 41 $id = redis()->lPop('sync_to_es');
44 - $code = 500;  
45 if($id){ 42 if($id){
46 - $data = $db->first(\Model\listsSql::first('`id` = '.$id));  
47 - if($data){  
48 - // 设置 进程 是否在运行  
49 - $data['is_auto']=$db->count('select count(*) from `lists_auto` where `list_id` = '.$data['id']) ? 1 : 0; 43 + while(self::$num>=50){
  44 + co::sleep(0.5);
  45 + }
  46 + go(function () use ($id){
  47 + static::$num++;
  48 + $db = db();
  49 + $code = 500;
50 50
51 - $data = $this->getEsData($data);  
52 - $doc_id = $data['email_id'].'_'.$data['folder_id'].'_'.$data['uid']; 51 + $data = $db->first(\Model\listsSql::first('`id` = '.$id));
  52 + if($data){
  53 + // 设置 进程 是否在运行
  54 + $data['is_auto']=$db->count('select count(*) from `lists_auto` where `list_id` = '.$data['id']) ? 1 : 0;
53 55
54 - $code = $es->save($doc_id,$data);  
55 - }  
56 - } 56 + $data = $this->getEsData($data);
  57 + $doc_id = $data['email_id'].'_'.$data['folder_id'].'_'.$data['uid'];
  58 +
  59 + $code = es()->save($doc_id,$data);
  60 + }
  61 +
  62 + if($code!==200){
  63 + @file_put_contents(LOG_PATH.'/sync_es_fail.log',$id."\n",FILE_APPEND);
  64 + _echo('同步es: '.$doc_id.'===>'.$code);
  65 + }
57 66
58 - if($code!==200){  
59 - @file_put_contents(LOG_PATH.'/sync_es_fail.log',$id."\n",FILE_APPEND);  
60 - _echo('同步es: '.$doc_id.'===>'.$code); 67 + $db = null;
  68 +
  69 + co::defer(function (){
  70 + static::$num--;
  71 +
  72 + db()->close();
  73 +
  74 + \Lib\Log::getInstance()->write();
  75 + });
  76 + });
  77 + }else{
  78 + co::sleep(1);
61 } 79 }
62 80
63 - sleep(1);  
64 } 81 }
65 82
66 } 83 }
@@ -91,7 +108,12 @@ class SyncToEsCmd { @@ -91,7 +108,12 @@ class SyncToEsCmd {
91 108
92 } 109 }
93 110
94 -(new SyncToEsCmd())->handler(); 111 +\Lib\DbPool::$clientNumber = 60;
  112 +
  113 +\Co\run(function (){
  114 + (new SyncToEsCmd())->handler();
  115 +});
  116 +
95 117
96 return 1; 118 return 1;
97 119