作者 邓超

es

@@ -83,17 +83,20 @@ class SyncToEsCmd { @@ -83,17 +83,20 @@ class SyncToEsCmd {
83 return $this->isStop; 83 return $this->isStop;
84 } 84 }
85 85
  86 + /**
  87 + * es链接
  88 + * @var \Lib\Es\Es
  89 + */
  90 + public $es;
  91 +
86 92
87 public function handler(){ 93 public function handler(){
88 94
89 - // $es = es(); // 第一个库,即将丢弃  
90 - $es2 = es('email_lists_copy'); // 第二个库 新 95 + $this->es = es('email_lists_copy'); // 第二个库 新
91 96
92 while (!$this->isStop()){ 97 while (!$this->isStop()){
93 98
94 $id = redis()->lPop('sync_to_es'); 99 $id = redis()->lPop('sync_to_es');
95 - $origin_id = $id;  
96 - $code = 500;  
97 if($id){ 100 if($id){
98 101
99 $is_check_body = false; 102 $is_check_body = false;
@@ -102,17 +105,40 @@ class SyncToEsCmd { @@ -102,17 +105,40 @@ class SyncToEsCmd {
102 $is_check_body = true; 105 $is_check_body = true;
103 } 106 }
104 107
  108 + $data = $this->getDataByEs($id,$is_check_body);
  109 + if($data){
  110 + list($doc_id,$data) = $data;
  111 + // 主库
  112 + $this->toDataEs('email_lists_copy',$id,$doc_id,$data);
  113 + // 分库
  114 + $this->toDataEs('email_lists_branch_'.$data['postid'],$id,$doc_id,$data);
  115 + }
  116 + }else{
  117 + // 没有数据时暂停1秒
  118 + sleep(1);
  119 + }
  120 + }
  121 +
  122 + }
105 123
106 - $doc_id = ''; 124 + /**
  125 + * @param $id
  126 + * @param $is_check_body
  127 + * @return array|false
  128 + * @author:dc
  129 + * @time 2025/8/5 10:21
  130 + */
  131 + public function getDataByEs($id,$is_check_body) {
107 try { 132 try {
108 $data = $this->db->throw()->first(\Model\listsSql::first('`id` = '.$id)); 133 $data = $this->db->throw()->first(\Model\listsSql::first('`id` = '.$id));
109 if(!$data){ 134 if(!$data){
110 $data = $this->db->throw()->first(\Model\listsSql::firstHot('`id` = '.$id)); 135 $data = $this->db->throw()->first(\Model\listsSql::firstHot('`id` = '.$id));
111 } 136 }
112 }catch (Throwable $e){ 137 }catch (Throwable $e){
113 - redis()->rPush('sync_to_es',$origin_id);  
114 - _echo('sync to es '.$e->getMessage());  
115 - break; 138 + $this->log($id);
  139 +// redis()->rPush('sync_to_es',$origin_id);
  140 + _echo('sync to es '.$id.":".$e->getMessage());
  141 + return false;
116 } 142 }
117 143
118 if($data){ 144 if($data){
@@ -144,35 +170,117 @@ class SyncToEsCmd { @@ -144,35 +170,117 @@ class SyncToEsCmd {
144 list($data['postid'],$data['source']) = $this->getPostid($data['email_id'],$data['udate']); 170 list($data['postid'],$data['source']) = $this->getPostid($data['email_id'],$data['udate']);
145 171
146 }catch (Throwable $e){ 172 }catch (Throwable $e){
147 - redis()->rPush('sync_to_es',$origin_id);  
148 - _echo('sync to es '.$e->getMessage());  
149 - break; 173 + $this->log($id);
  174 +// redis()->rPush('sync_to_es',$origin_id);
  175 + _echo('sync to es '.$id.":".$e->getMessage());
  176 + return false;
150 } 177 }
151 178
152 $data = $this->getEsData($data); 179 $data = $this->getEsData($data);
153 - $doc_id = $data['email_id'].'_'.$data['folder_id'].'_'.$data['uid'];  
154 180
155 - // 新  
156 - $code = $es2->save($doc_id,$data); 181 + $doc_id = $data['email_id'].'_'.$data['folder_id'].'_'.$data['uid'];
157 182
158 - // 这个验证数据没问题后会丢弃  
159 - // $code = $es->save($doc_id,$data); 183 + return [$doc_id,$data];
160 184
161 } 185 }
162 -  
163 - if($code!==200){  
164 - @file_put_contents(LOG_PATH.'/sync_es_fail.log',$id."\n",FILE_APPEND);  
165 - _echo('同步es: '.$doc_id.'===>'.$code); 186 + return false;
166 } 187 }
167 188
168 - }else{  
169 - sleep(1); 189 +
  190 + public $checkEsIndex = [];
  191 +
  192 +
  193 +
  194 + /**
  195 + * 同步数据到es
  196 + * @param string $index 索引名称
  197 + * @param int $data_id list表的id
  198 + * @param string $doc_id es文档id
  199 + * @param array $data es数据
  200 + * @author:dc
  201 + * @time 2025/8/5 10:09
  202 + */
  203 + public function toDataEs($index,$data_id,$doc_id,$data){
  204 +// echo $index;
  205 +// echo ' ==== ';
  206 + $this->es->setIndex($index);
  207 + // 检查数据库是否存在
  208 + if(empty($this->checkEsIndex[$index])){
  209 + if(!$this->es->getMapping()){
  210 + $map = $this->es->putMapping([
  211 + 'properties' => [
  212 + 'subject' => ['type' => 'text'],
  213 + 'from' => [
  214 + 'type' => 'object', // 定义 from 字段为对象
  215 + 'properties' => [
  216 + 'name' => [
  217 + 'type' => 'keyword' // 或者 'keyword',根据需求选择
  218 + ],
  219 + 'email' => [
  220 + 'type' => 'text' // email 通常使用 keyword 类型
  221 + ]
  222 + ]
  223 + ],
  224 + 'to' => ['type' => 'text'],
  225 + 'cc' => ['type' => 'keyword'],
  226 + 'bcc' => ['type' => 'keyword'],
  227 + 'uid' => ['type' => 'integer'],
  228 + 'udate' => ['type' => 'integer'],
  229 + 'folder_id' => ['type' => 'integer'],
  230 + 'email_id' => ['type' => 'integer'],
  231 + 'size' => ['type' => 'integer'],
  232 + 'recent' => ['type' => 'integer'],
  233 + 'flagged' => ['type' => 'integer'],
  234 + 'deleted' => ['type' => 'integer'],
  235 + 'seen' => ['type' => 'integer'],
  236 + 'draft' => ['type' => 'integer'],
  237 + 'is_file' => ['type' => 'integer'],
  238 + 'is_hots' => ['type' => 'integer'],
  239 + 'is_auto' => ['type' => 'integer'],
  240 + 'folder_as_int' => ['type' => 'integer'],
  241 + 'postid' => ['type' => 'integer'],
  242 + 'source' => ['type' => 'integer'],
  243 + 'created_at' => ['type' => 'date'],
  244 + 'updated_at' => ['type' => 'date'],
  245 + 'description' => ['type' => 'keyword'],
  246 + 'references' => ['type' => 'keyword']
  247 + ]
  248 + ],$index == 'email_lists_copy' ? [
  249 + 'number_of_shards' => 21, // 设置分片数
  250 + 'number_of_replicas' => 1, // 设置副本数 暂用内存 主片+副片*
  251 + ]:[
  252 + 'number_of_shards' => 1, // 设置分片数
  253 + 'number_of_replicas' => 0, // 设置副本数 暂用内存 主片+副片*
  254 + ]
  255 + );
  256 + _echo("{$index} 创建索引 ".$map);
  257 + }
  258 + }
  259 + // 下次不在检查
  260 + $this->checkEsIndex[$index] = 1;
  261 +
  262 + $code = $this->es->save($doc_id,$data);
  263 +
  264 + if($code!==200){
  265 + $this->log($data_id,$index);
  266 + @file_put_contents(LOG_PATH.'/sync_es_fail.log',$index.":".$data_id."\n",FILE_APPEND);
  267 + _echo($index.': '.$data_id.'===>'.$code);
170 } 268 }
171 } 269 }
172 270
  271 + /**
  272 + * 记录日志
  273 + * @param $id
  274 + * @param string $index
  275 + * @author:dc
  276 + * @time 2025/8/5 10:17
  277 + */
  278 + public function log($id,$index=''){
  279 + @file_put_contents(LOG_PATH.'/sync_es_fail.log',$index.":".$id."\n",FILE_APPEND);
173 } 280 }
174 281
175 282
  283 +
176 /** 284 /**
177 * 项目id 285 * 项目id
178 * @author:dc 286 * @author:dc