作者 邓超

es 批量添加

@@ -108,8 +108,9 @@ class SyncToEsCmd { @@ -108,8 +108,9 @@ class SyncToEsCmd {
108 $data = $this->getDataByEs($id,$is_check_body); 108 $data = $this->getDataByEs($id,$is_check_body);
109 if($data){ 109 if($data){
110 list($doc_id,$data) = $data; 110 list($doc_id,$data) = $data;
  111 + $bulkData = new \Lib\Es\BulkData();
111 // 主库 112 // 主库
112 - $this->toDataEs('email_lists_copy',$id,$doc_id,$data); 113 + $bulkData->add('email_lists_copy',$doc_id,$data);
113 // 个人邮箱的情况 114 // 个人邮箱的情况
114 if(!$data['postid']){ 115 if(!$data['postid']){
115 $postids = $this->getPostids($data['email_id']); 116 $postids = $this->getPostids($data['email_id']);
@@ -118,17 +119,19 @@ class SyncToEsCmd { @@ -118,17 +119,19 @@ class SyncToEsCmd {
118 $data['postid'] = $postid; 119 $data['postid'] = $postid;
119 $data['source'] = 1; 120 $data['source'] = 1;
120 // 分库 个人邮箱 121 // 分库 个人邮箱
121 - $this->toDataEs('email_lists_branch_'.$data['postid'],$id,$doc_id,$data); 122 + $bulkData ->add('email_lists_branch_'.$postid,$doc_id,$data);
122 } 123 }
123 }else{ 124 }else{
124 // 分库 其他 非fob数据源 125 // 分库 其他 非fob数据源
125 - $this->toDataEs('email_lists_branch_0',$id,$doc_id,$data); 126 + $bulkData->add('email_lists_branch_0',$doc_id,$data);
126 } 127 }
127 }else{ 128 }else{
128 // 分库 129 // 分库
129 - $this->toDataEs('email_lists_branch_'.$data['postid'],$id,$doc_id,$data); 130 + $bulkData->add('email_lists_branch_'.$data['postid'],$doc_id,$data);
130 } 131 }
131 132
  133 + $this->toDataEs($id,$bulkData);
  134 +
132 } 135 }
133 }else{ 136 }else{
134 // 没有数据时暂停1秒 137 // 没有数据时暂停1秒
@@ -287,39 +290,36 @@ class SyncToEsCmd { @@ -287,39 +290,36 @@ class SyncToEsCmd {
287 290
288 /** 291 /**
289 * 同步数据到es 292 * 同步数据到es
290 - * @param string $index 索引名称  
291 - * @param int $data_id list表的id  
292 - * @param string $doc_id es文档id  
293 - * @param array $data es数据 293 + * @param int $data_id
  294 + * @param \Lib\Es\BulkData $bulkData
  295 + * @throws \Elastic\Elasticsearch\Exception\ClientResponseException
  296 + * @throws \Elastic\Elasticsearch\Exception\ServerResponseException
294 * @author:dc 297 * @author:dc
295 - * @time 2025/8/5 10:09 298 + * @time 2025/8/6 20:55
296 */ 299 */
297 - public function toDataEs($index,$data_id,$doc_id,$data){  
298 -// echo $index;  
299 -// echo ' ==== ';  
300 - $this->es->setIndex($index);  
301 - // 检查数据库是否存在  
302 - if(empty($this->checkEsIndex[$index]) && $index != 'email_lists_copy'){  
303 - if(!redis()->has('esmapcheck:'.$index)){  
304 - $m = $this->setEsMap($index);  
305 - if($m !== 9) _echo("{$index} 创建索引 ".$m);  
306 - if(!$m){  
307 - $this->log($data_id,$index);  
308 - return; 300 + public function toDataEs(int $data_id, \Lib\Es\BulkData $bulkData){
  301 +
  302 + foreach ($bulkData->getIndexs() as $index){
  303 + $this->es->setIndex($index);
  304 + // 检查数据库是否存在
  305 + if(empty($this->checkEsIndex[$index]) && $index != 'email_lists_copy'){
  306 + if(!redis()->has('esmapcheck:'.$index)){
  307 + $m = $this->setEsMap($index);
  308 + if($m !== 9) _echo("{$index} 创建索引 ".$m);
  309 + redis()->set('esmapcheck:'.$index,1,86400);
309 } 310 }
310 - redis()->set('esmapcheck:'.$index,1,86400);  
311 } 311 }
  312 + // 下次不在检查
  313 + $this->checkEsIndex[$index] = 1;
312 } 314 }
313 - // 下次不在检查  
314 - $this->checkEsIndex[$index] = 1;  
315 -  
316 - $code = $this->es->save($doc_id,$data);  
317 -  
318 - if($code!==200){  
319 - $this->log($data_id,$index);  
320 - @file_put_contents(LOG_PATH.'/sync_es_fail.log',$index.":".$data_id."\n",FILE_APPEND);  
321 - _echo($index.': '.$data_id.'===>'.$code); 315 + // 批量提交数据的
  316 + $ret = $this->es->bulk($bulkData);
  317 + if(!empty($ret['errors'])){
  318 + $this->log($data_id);
  319 + @file_put_contents(LOG_PATH.'/sync_es_fail.log',$data_id."\n",FILE_APPEND);
  320 + _echo($data_id.' ===> 0');
322 } 321 }
  322 +
323 } 323 }
324 324
325 /** 325 /**
@@ -38,6 +38,16 @@ class BulkData { @@ -38,6 +38,16 @@ class BulkData {
38 38
39 39
40 /** 40 /**
  41 + * 获取所有索引名称
  42 + * @return array
  43 + * @author:dc
  44 + * @time 2025/8/6 16:45
  45 + */
  46 + public function getIndexs(): array {
  47 + return array_keys($this->data);
  48 + }
  49 +
  50 + /**
41 * 转换成es可识别的数据 51 * 转换成es可识别的数据
42 * @return array 52 * @return array
43 * @author:dc 53 * @author:dc