作者 邓超

同步到es 改成批量

@@ -89,58 +89,84 @@ class SyncToEsCmd { @@ -89,58 +89,84 @@ class SyncToEsCmd {
89 */ 89 */
90 public $es; 90 public $es;
91 91
  92 + /**
  93 + * @var \Lib\Es\BulkData
  94 + */
  95 + protected $bulkData;
  96 +
92 97
93 public function handler(){ 98 public function handler(){
94 99
95 $this->es = es('email_lists_copy'); // 第二个库 新 100 $this->es = es('email_lists_copy'); // 第二个库 新
96 101
97 - while (!$this->isStop()){ 102 + $this->bulkData = new \Lib\Es\BulkData();
  103 +
  104 + $ids = [];
98 105
  106 + while (!$this->isStop()){
  107 + $nowSubmit = false;
99 $id = redis()->lPop('sync_to_es'); 108 $id = redis()->lPop('sync_to_es');
100 if($id){ 109 if($id){
  110 + $ids[] = $id;
  111 + $this->bulkData($id);
  112 + }else{
  113 + $nowSubmit = true;
  114 + // 没有数据时暂停1秒
  115 + sleep(1);
  116 + }
  117 + if(!$this->toDataEs($nowSubmit)){
  118 + $this->log($ids);
  119 + $ids = [];
  120 + }
  121 + }
101 122
102 - $is_check_body = false;  
103 - if(str_contains($id, '.')){  
104 - $id = explode('.',$id)[0];  
105 - $is_check_body = true;  
106 - } 123 + if(!$this->toDataEs(true)){
  124 + $this->log($ids);
  125 + }
107 126
108 - $data = $this->getDataByEs($id,$is_check_body);  
109 - if($data){  
110 - list($doc_id,$data) = $data;  
111 - $bulkData = new \Lib\Es\BulkData();  
112 - // 主库  
113 - $bulkData->add('email_lists_copy',$doc_id,$data);  
114 - // 个人邮箱的情况  
115 - if(!$data['postid']){  
116 - $postids = $this->getPostids($data['email_id']);  
117 - if($postids){  
118 - foreach ($postids as $postid){  
119 - $data['postid'] = $postid;  
120 - $data['source'] = 1;  
121 - // 分库 个人邮箱  
122 - $bulkData ->add('email_lists_branch_'.$postid,$doc_id,$data);  
123 - }  
124 - }else{  
125 - // 分库 其他 非fob数据源  
126 - $bulkData->add('email_lists_branch_0',$doc_id,$data);  
127 - }  
128 - }else{  
129 - // 分库  
130 - $bulkData->add('email_lists_branch_'.$data['postid'],$doc_id,$data);  
131 - } 127 + }
132 128
133 - $this->toDataEs($id,$bulkData); 129 + /**
  130 + * 批量处理数据并存储到ES
  131 + *
  132 + * @param string $id 数据ID,如果包含点号则只取点号前的部分
  133 + * @return void
  134 + */
  135 + public function bulkData($id){
  136 + $is_check_body = false;
  137 + if(str_contains($id, '.')){
  138 + $id = explode('.',$id)[0];
  139 + $is_check_body = true;
  140 + }
134 141
  142 + $data = $this->getDataByEs($id,$is_check_body);
  143 + if($data){
  144 + list($doc_id,$data) = $data;
  145 + // 主库
  146 + $this->bulkData->add('email_lists_copy',$doc_id,$data);
  147 + // 个人邮箱的情况
  148 + if(!$data['postid']){
  149 + $postids = $this->getPostids($data['email_id']);
  150 + if($postids){
  151 + foreach ($postids as $postid){
  152 + $data['postid'] = $postid;
  153 + $data['source'] = 1;
  154 + // 分库 个人邮箱
  155 + $this->bulkData ->add('email_lists_branch_'.$postid,$doc_id,$data);
  156 + }
  157 + }else{
  158 + // 分库 其他 非fob数据源
  159 + $this->bulkData->add('email_lists_branch_0',$doc_id,$data);
135 } 160 }
136 }else{ 161 }else{
137 - // 没有数据时暂停1秒  
138 - sleep(1); 162 + // 分库
  163 + $this->bulkData->add('email_lists_branch_'.$data['postid'],$doc_id,$data);
139 } 164 }
140 - }  
141 165
  166 + }
142 } 167 }
143 168
  169 +
144 /** 170 /**
145 * 个人邮箱情况 171 * 个人邮箱情况
146 * @param $email_id 172 * @param $email_id
@@ -173,7 +199,7 @@ class SyncToEsCmd { @@ -173,7 +199,7 @@ class SyncToEsCmd {
173 $data = $this->db->throw()->first(\Model\listsSql::firstHot('`id` = '.$id)); 199 $data = $this->db->throw()->first(\Model\listsSql::firstHot('`id` = '.$id));
174 } 200 }
175 }catch (Throwable $e){ 201 }catch (Throwable $e){
176 - $this->log($id); 202 + $this->log([$id]);
177 // redis()->rPush('sync_to_es',$origin_id); 203 // redis()->rPush('sync_to_es',$origin_id);
178 _echo('sync to es '.$id.":".$e->getMessage()); 204 _echo('sync to es '.$id.":".$e->getMessage());
179 return false; 205 return false;
@@ -208,7 +234,7 @@ class SyncToEsCmd { @@ -208,7 +234,7 @@ class SyncToEsCmd {
208 list($data['postid'],$data['source']) = $this->getPostid($data['email_id'],$data['udate']); 234 list($data['postid'],$data['source']) = $this->getPostid($data['email_id'],$data['udate']);
209 235
210 }catch (Throwable $e){ 236 }catch (Throwable $e){
211 - $this->log($id); 237 + $this->log([$id]);
212 // redis()->rPush('sync_to_es',$origin_id); 238 // redis()->rPush('sync_to_es',$origin_id);
213 _echo('sync to es '.$id.":".$e->getMessage()); 239 _echo('sync to es '.$id.":".$e->getMessage());
214 return false; 240 return false;
@@ -287,19 +313,35 @@ class SyncToEsCmd { @@ -287,19 +313,35 @@ class SyncToEsCmd {
287 return $this->setEsMap($index); 313 return $this->setEsMap($index);
288 } 314 }
289 315
  316 + /**
  317 + * @var int
  318 + */
  319 + protected $pervSubmitTime = 0;
290 320
291 /** 321 /**
292 * 同步数据到es 322 * 同步数据到es
293 - * @param int $data_id  
294 - * @param \Lib\Es\BulkData $bulkData 323 + * @param bool $nowSubmit 是否立即提交
  324 + * @return bool
295 * @throws \Elastic\Elasticsearch\Exception\ClientResponseException 325 * @throws \Elastic\Elasticsearch\Exception\ClientResponseException
296 * @throws \Elastic\Elasticsearch\Exception\ServerResponseException 326 * @throws \Elastic\Elasticsearch\Exception\ServerResponseException
297 * @author:dc 327 * @author:dc
298 - * @time 2025/8/6 20:55 328 + * @time 2025/8/7 10:29
299 */ 329 */
300 - public function toDataEs(int $data_id, \Lib\Es\BulkData $bulkData){ 330 + public function toDataEs(bool $nowSubmit){
  331 + // 不立即提交
  332 + if (!$nowSubmit){
  333 + if($this->bulkData->total() < 10){
  334 + // 不足10条时 满足2秒也提交
  335 + if ($this->pervSubmitTime + 2 > time()){
  336 + return true;
  337 + }
  338 + }
  339 + }
301 340
302 - foreach ($bulkData->getIndexs() as $index){ 341 + // 上一次提交的时间
  342 + $this->pervSubmitTime = time();
  343 +
  344 + foreach ($this->bulkData->getIndexs() as $index){
303 $this->es->setIndex($index); 345 $this->es->setIndex($index);
304 // 检查数据库是否存在 346 // 检查数据库是否存在
305 if(empty($this->checkEsIndex[$index]) && $index != 'email_lists_copy'){ 347 if(empty($this->checkEsIndex[$index]) && $index != 'email_lists_copy'){
@@ -313,24 +355,26 @@ class SyncToEsCmd { @@ -313,24 +355,26 @@ class SyncToEsCmd {
313 $this->checkEsIndex[$index] = 1; 355 $this->checkEsIndex[$index] = 1;
314 } 356 }
315 // 批量提交数据的 357 // 批量提交数据的
316 - $ret = $this->es->bulk($bulkData); 358 + $ret = $this->es->bulk($this->bulkData);
  359 +
317 if(!empty($ret['errors'])){ 360 if(!empty($ret['errors'])){
318 - $this->log($data_id);  
319 - @file_put_contents(LOG_PATH.'/sync_es_fail.log',$data_id."\n",FILE_APPEND);  
320 - _echo($data_id.' ===> 0'); 361 + @file_put_contents(LOG_PATH.'/sync_es_fail.error.log',print_r($ret['errors'],1)."\n",FILE_APPEND|LOCK_EX);
321 } 362 }
322 - 363 + // 清空
  364 + $this->bulkData->clear();
  365 + // 为空表示提交成功
  366 + return empty($ret['errors']);
323 } 367 }
324 368
325 /** 369 /**
326 * 记录日志 370 * 记录日志
327 - * @param $id 371 + * @param array $ids
328 * @param string $index 372 * @param string $index
329 * @author:dc 373 * @author:dc
330 * @time 2025/8/5 10:17 374 * @time 2025/8/5 10:17
331 */ 375 */
332 - public function log($id,$index=''){  
333 - @file_put_contents(LOG_PATH.'/sync_es_fail.log',$index.":".$id."\n",FILE_APPEND); 376 + public function log(array $ids){
  377 + file_put_contents(LOG_PATH.'/sync_es_fail.log',implode("\n",$ids)."\n",FILE_APPEND|LOCK_EX);
334 } 378 }
335 379
336 380
@@ -67,7 +67,36 @@ class BulkData { @@ -67,7 +67,36 @@ class BulkData {
67 return $params; 67 return $params;
68 } 68 }
69 69
  70 + /**
  71 + * 清除 数据
  72 + * @author:dc
  73 + * @time 2025/8/7 9:55
  74 + */
  75 + public function clear(){
  76 + $this->data = [];
  77 + }
70 78
  79 + /**
  80 + * 是否为空
  81 + * @return bool
  82 + * @author:dc
  83 + * @time 2025/8/7 9:54
  84 + */
  85 + public function isEmpty():bool
  86 + {
  87 + return empty($this->data);
  88 + }
  89 +
  90 + /**
  91 + * 统计数量
  92 + * @return int
  93 + * @author:dc
  94 + * @time 2025/8/7 9:56
  95 + */
  96 + public function total():int
  97 + {
  98 + return count($this->data);
  99 + }
71 100
72 101
73 } 102 }