作者 邓超

同步es异常

1 -<?php  
2 -  
3 -//error_reporting();  
4 -  
5 -require_once "../vendor/autoload.php";  
6 -  
7 -  
8 -/**  
9 - * 把mysql的数据同步到es  
10 - * @author:dc  
11 - * @time 2025/3/4 10:19  
12 - * Class SyncToEsCmd  
13 - */  
14 -class SyncToEsCmd {  
15 -  
16 - public $isStop = false;  
17 -  
18 - /**  
19 - * 文件夹  
20 - * @var array  
21 - */  
22 - public $folders = [];  
23 -  
24 - /**  
25 - * @var \Lib\Db  
26 - */  
27 - public $fob_db;  
28 -  
29 - /**  
30 - * @var \Lib\Db  
31 - */  
32 - public $db;  
33 -  
34 -  
35 - public $startTime = 0;  
36 -  
37 - /**  
38 - * SyncToEsCmd constructor.  
39 - */  
40 - public function __construct()  
41 - {  
42 - $this->db = db();  
43 - $this->fob_db = fob_mysql();  
44 -  
45 - $handler = function ($signal){  
46 - _echo('收到进程信号 '. $signal);  
47 - // 可以处理其他程序  
48 - $this->isStop = true;  
49 - };  
50 - pcntl_signal(SIGTERM, $handler); // 这个是kill  
51 - pcntl_signal(SIGINT, $handler); // 这个是 ctrl+c  
52 -  
53 - $this->startTime = time();  
54 - }  
55 -  
56 - /**  
57 - * 是否是自动回复  
58 - * @author:dc  
59 - * @time 2025/6/6 11:07  
60 - */  
61 - public function isAuto($data){  
62 - // 没有发件人的直接标记  
63 - if(!$data['from']){  
64 - return 1;  
65 - }  
66 -  
67 - $is_auto = $this->db->count('select count(*) from `lists_auto` where `list_id` = '.$data['id']) ? 1 : 0;  
68 -  
69 - if($is_auto) return 1;  
70 -  
71 - return isAiAutoMail($data['from'],$data['subject'],$data['body']??'') === 1 ? 1 : 0;  
72 -  
73 - }  
74 -  
75 - /**  
76 - * @return bool  
77 - */  
78 - public function isStop(): bool  
79 - {  
80 - // 检查是否接收到信号  
81 - pcntl_signal_dispatch();  
82 -  
83 - // 是否超过来最大执行时间  
84 - if(time()-43200 > $this->startTime){  
85 - return true;  
86 - }  
87 -  
88 - return $this->isStop;  
89 - }  
90 -  
91 - /**  
92 - * es链接  
93 - * @var \Lib\Es\Es  
94 - */  
95 - public $es;  
96 -  
97 - /**  
98 - * @var \Lib\Es\BulkData  
99 - */  
100 - protected $bulkData;  
101 -  
102 -  
103 - public function handler($id){  
104 -  
105 - $this->es = es('email_lists_copy'); // 第二个库 新  
106 -  
107 - $this->bulkData = new \Lib\Es\BulkData();  
108 -  
109 - $this->bulkData($id);  
110 - $this->toDataEs(true);  
111 -  
112 - }  
113 -  
114 - /**  
115 - * 批量处理数据并存储到ES  
116 - *  
117 - * @param string $id 数据ID,如果包含点号则只取点号前的部分  
118 - * @return void  
119 - */  
120 - public function bulkData($id){  
121 - $is_check_body = false;  
122 - if(str_contains($id, '.')){  
123 - $id = explode('.',$id)[0];  
124 - $is_check_body = true;  
125 - }  
126 -  
127 - $data = $this->getDataByEs($id,$is_check_body);  
128 - if($data){  
129 - list($doc_id,$data) = $data;  
130 - // 主库  
131 - $this->bulkData->add('email_lists_copy',$doc_id,$data);  
132 - if($data['postid']){  
133 - // 分库  
134 - $this->bulkData->add('email_lists_branch_'.$data['postid'],$doc_id,$data);  
135 - }  
136 -  
137 - // 个人邮箱的情况  
138 - $postids = $this->getPostids($data['email_id']);  
139 - print_r($postids);  
140 - if($postids){  
141 - foreach ($postids as $postid){  
142 - $data['postid'] = $postid;  
143 - $data['source'] = 1;  
144 - // 分库 个人邮箱  
145 - $this->bulkData ->add('email_lists_branch_'.$postid,$doc_id,$data);  
146 - }  
147 - }  
148 - // 其他非fob邮件数据  
149 - if (!$data['postid']){  
150 - // 分库 其他 非fob数据源  
151 - $this->bulkData->add('email_lists_branch_'.$data['postid'],$doc_id,$data);  
152 - }  
153 -  
154 - }  
155 - }  
156 -  
157 -  
158 - /**  
159 - * 个人邮箱情况  
160 - * @param $email_id  
161 - * @author:dc  
162 - * @time 2025/8/5 14:53  
163 - */  
164 - private function getPostids($email_id){  
165 - $postids = $this->fob_db->throw()->cache(1800)->all("select `post_id` from `e_mail_binds` where `source` = 1 and `email_id` = {$email_id} and `deleted_at` is null");  
166 -  
167 - if($postids){  
168 - return array_column($postids,'post_id');  
169 - }  
170 -  
171 - return [];  
172 -  
173 - }  
174 -  
175 -  
176 - /**  
177 - * @param $id  
178 - * @param $is_check_body  
179 - * @return array|false  
180 - * @author:dc  
181 - * @time 2025/8/5 10:21  
182 - */  
183 - public function getDataByEs($id,$is_check_body) {  
184 - try {  
185 - $data = $this->db->throw()->first(\Model\listsSql::first('`id` = '.$id));  
186 - if(!$data){  
187 - $data = $this->db->throw()->first(\Model\listsSql::firstHot('`id` = '.$id));  
188 - }  
189 - }catch (Throwable $e){  
190 - $this->log([$id]);  
191 -// redis()->rPush('sync_to_es',$origin_id);  
192 - _echo('sync to es '.$id.":".$e->getMessage());  
193 - return false;  
194 - }  
195 -  
196 - if($data){  
197 - try {  
198 - // 文件夹  
199 - if(empty($this->folders[$data['folder_id']])){  
200 - $this->folders[$data['folder_id']] = $this->db->throw()->value(\Model\folderSql::first($data['folder_id'],'folder'));  
201 - }  
202 -  
203 - // 为文件夹打标 方便查询  
204 - $data['folder_as_int'] = folder2int($this->folders[$data['folder_id']]);  
205 - $data['is_auto'] = 0;  
206 - // 是否是自动回复  
207 - if($data['folder_as_int'] === 1){  
208 - // 是否检查body  
209 - if($is_check_body){  
210 - $body = getMailBody($data['id'],$this->db);  
211 - if($body){  
212 - $data['body'] = getBodyHtml($body);  
213 - }  
214 - }  
215 -  
216 - $data['is_auto'] = $this->isAuto($data);  
217 -  
218 - unset($data['body']);  
219 - }  
220 -  
221 - // postid ai邮箱要用 这个是查询黑格  
222 - list($data['postid'],$data['source']) = $this->getPostid($data['email_id'],$data['udate']);  
223 -  
224 - }catch (Throwable $e){  
225 - $this->log([$id]);  
226 -// redis()->rPush('sync_to_es',$origin_id);  
227 - _echo('sync to es '.$id.":".$e->getMessage());  
228 - return false;  
229 - }  
230 -  
231 - $data = $this->getEsData($data);  
232 -  
233 - $doc_id = $data['email_id'].'_'.$data['folder_id'].'_'.$data['uid'];  
234 -  
235 - return [$doc_id,$data];  
236 -  
237 - }  
238 - return false;  
239 - }  
240 -  
241 -  
242 - public $checkEsIndex = [];  
243 -  
244 -  
245 - public function setEsMap($index){  
246 - $this->es->setIndex($index);  
247 - if($this->es->getMapping()){  
248 - return 9;  
249 - }  
250 -  
251 - if(redis()->add('setmaplock:'.$index,1,20)){  
252 - return $this->es->putMapping([  
253 - 'properties' => [  
254 - 'subject' => ['type' => 'text'],  
255 - 'from' => [  
256 - 'type' => 'object', // 定义 from 字段为对象  
257 - 'properties' => [  
258 - 'name' => [  
259 - 'type' => 'keyword' // 或者 'keyword',根据需求选择  
260 - ],  
261 - 'email' => [  
262 - 'type' => 'text' // email 通常使用 keyword 类型  
263 - ]  
264 - ]  
265 - ],  
266 - 'to' => ['type' => 'text'],  
267 - 'cc' => ['type' => 'keyword'],  
268 - 'bcc' => ['type' => 'keyword'],  
269 - 'uid' => ['type' => 'integer'],  
270 - 'udate' => ['type' => 'integer'],  
271 - 'folder_id' => ['type' => 'integer'],  
272 - 'email_id' => ['type' => 'integer'],  
273 - 'size' => ['type' => 'integer'],  
274 - 'recent' => ['type' => 'integer'],  
275 - 'flagged' => ['type' => 'integer'],  
276 - 'deleted' => ['type' => 'integer'],  
277 - 'seen' => ['type' => 'integer'],  
278 - 'draft' => ['type' => 'integer'],  
279 - 'is_file' => ['type' => 'integer'],  
280 - 'is_hots' => ['type' => 'integer'],  
281 - 'is_auto' => ['type' => 'integer'],  
282 - 'folder_as_int' => ['type' => 'integer'],  
283 - 'postid' => ['type' => 'integer'],  
284 - 'source' => ['type' => 'integer'],  
285 - 'created_at' => ['type' => 'date'],  
286 - 'updated_at' => ['type' => 'date'],  
287 - 'description' => ['type' => 'keyword'],  
288 - 'references' => ['type' => 'keyword']  
289 - ]  
290 - ],$index == 'email_lists_copy' ? [  
291 - 'number_of_shards' => 21, // 设置分片数  
292 - 'number_of_replicas' => 1, // 设置副本数 暂用内存 主片+副片*  
293 - ]:[  
294 - 'number_of_shards' => 1, // 设置分片数  
295 - 'number_of_replicas' => 0, // 设置副本数 暂用内存 主片+副片*  
296 - ]  
297 - );  
298 - }  
299 - // 暂停1秒在试  
300 - sleep(1);  
301 - return $this->setEsMap($index);  
302 - }  
303 -  
304 - /**  
305 - * @var int  
306 - */  
307 - protected $pervSubmitTime = 0;  
308 -  
309 - /**  
310 - * 同步数据到es  
311 - * @param bool $nowSubmit 是否立即提交  
312 - * @return bool  
313 - * @throws \Elastic\Elasticsearch\Exception\ClientResponseException  
314 - * @throws \Elastic\Elasticsearch\Exception\ServerResponseException  
315 - * @author:dc  
316 - * @time 2025/8/7 10:29  
317 - */  
318 - public function toDataEs(bool $nowSubmit){  
319 - // 不立即提交  
320 - if (!$nowSubmit){  
321 - if($this->bulkData->total() < 20){  
322 - // 不足20条时 满足2秒也提交  
323 - if ($this->pervSubmitTime + 2 > time()){  
324 - return true;  
325 - }  
326 - }  
327 - }  
328 - // 为空不提交  
329 - if($this->bulkData->isEmpty()){  
330 - return true;  
331 - }  
332 -  
333 - // 上一次提交的时间  
334 - $this->pervSubmitTime = time();  
335 -  
336 - foreach ($this->bulkData->getIndexs() as $index){  
337 - $this->es->setIndex($index);  
338 - // 检查数据库是否存在  
339 - if(empty($this->checkEsIndex[$index]) && $index != 'email_lists_copy'){  
340 - if(!redis()->has('esmapcheck:'.$index)){  
341 - $m = $this->setEsMap($index);  
342 - if($m !== 9) _echo("{$index} 创建索引 ".$m);  
343 - redis()->set('esmapcheck:'.$index,1,86400);  
344 - }  
345 - }  
346 - // 下次不在检查  
347 - $this->checkEsIndex[$index] = 1;  
348 - }  
349 - // 批量提交数据的  
350 - $ret = $this->es->bulk($this->bulkData);  
351 -  
352 - if(!empty($ret['errors'])){  
353 - @file_put_contents(LOG_PATH.'/sync_es_fail.error.log',print_r($ret['errors'],1)."\n",FILE_APPEND|LOCK_EX);  
354 - }  
355 - // 清空  
356 - $this->bulkData->clear();  
357 - // 为空表示提交成功  
358 - return empty($ret['errors']);  
359 - }  
360 -  
361 - /**  
362 - * 记录日志  
363 - * @param array $ids  
364 - * @param string $index  
365 - * @author:dc  
366 - * @time 2025/8/5 10:17  
367 - */  
368 - public function log(array $ids){  
369 - file_put_contents(LOG_PATH.'/sync_es_fail.log',implode("\n",$ids)."\n",FILE_APPEND|LOCK_EX);  
370 - }  
371 -  
372 -  
373 -  
374 - /**  
375 - * 项目id  
376 - * @author:dc  
377 - * @time 2025/5/20 15:44  
378 - */  
379 - public function getPostid($email_id,$udate){  
380 -  
381 - //每60秒验证一次  
382 - if(redis()->add('fob_bind_mail_times_check:'.$email_id,1,60)){  
383 - $lastpostid = $this->fob_db->throw()->value("select `post_id` from `e_mail_binds_log` where `source` = 2 and `email_id` = '{$email_id}' order by `id` desc limit 1");  
384 - $thelast = $this->fob_db->throw()->first("select * from `e_mail_binds` where `source` = 2 and `email_id` = '{$email_id}' and `deleted_at` is null order by `id` desc limit 1");  
385 - if(!$thelast){  
386 - return [0,0];  
387 - }  
388 -  
389 - $thelastpostid = $thelast['post_id'];  
390 -  
391 - // 说明变了  
392 - if($lastpostid!=$thelastpostid){  
393 - $ret = $this->fob_db->throw()->insert('e_mail_binds_log',[  
394 - 'post_id' => $thelastpostid,  
395 - 'bind_time' => date('Y-m-d H:i:s',time()-2),  
396 - 'source' => $thelast['source'],  
397 - 'email' => $thelast['email'],  
398 - 'email_id' => $thelast['email_id'],  
399 - ],false);  
400 - _echo("邮箱异常分配 ".$email_id.' -- '.$lastpostid.' == '.$thelastpostid.' === '.$ret);  
401 - if(!$ret){  
402 - throw new Exception('新增失败');  
403 - }  
404 - }  
405 -  
406 - }  
407 -  
408 -  
409 - // 查询历史记录  
410 - $times = redis()->getSet('fob_bind_mail_times3:'.$email_id,300,function ($email_id){  
411 -  
412 - $times = $this->fob_db->throw()->all("select `post_id`,`bind_time`,`source` from `e_mail_binds_log` where `source` = 2 and `email_id` = {$email_id} order by `bind_time` desc ");  
413 - if(!$times){  
414 - return [];  
415 - }  
416 -  
417 - return $times;  
418 -  
419 - },$email_id);  
420 -  
421 - if(is_array($times) && $times){  
422 - foreach ($times as $time){  
423 - $t = strtotime($time['bind_time']);  
424 - // 邮件收到的时间是否大于绑定时间  
425 - if($udate > $t){  
426 - $data = $time;  
427 - break;  
428 - }  
429 - }  
430 - }  
431 -  
432 - // 没有找到历史,就找绑定表  
433 - if(empty($data)){  
434 - $data = redis()->getSet('fob_bind_mail3:'.$email_id,300,function ($email_id){  
435 - return $this->fob_db->throw()->first("select `post_id`,`source` from `e_mail_binds` where `source` = 2 and `email_id` = '{$email_id}' and `deleted_at` is null order by `id` desc limit 1");  
436 - },$email_id);  
437 -  
438 - }  
439 -  
440 - return [  
441 - $data['post_id']??0,  
442 - $data['source']??0,  
443 - ];  
444 - }  
445 -  
446 -  
447 - public function getEsData($data){  
448 - if(!empty($data['id'])){  
449 - $data['uuid'] = $data['id'];  
450 - unset($data['id']);  
451 - }  
452 -  
453 - $data['from'] = [  
454 - 'email' => $data['from'],  
455 - 'name' => $data['from_name']??''  
456 - ];  
457 - unset($data['from_name']);  
458 - unset($data['date']);  
459 -  
460 - $data['to_name'] = $this->trimEmail($data['to_name']??[]);  
461 - $data['cc'] = $this->trimEmail($data['cc']??[]);  
462 - $data['bcc'] = $this->trimEmail($data['bcc']??[]);  
463 -  
464 - $data['description'] = str_replace(["\n"],"",$data['description']);  
465 - // unset($data['to_name']);  
466 -  
467 - if(!empty($data['created_at'])){  
468 - $data['created_at'] = date('Y-m-d\TH:i:s',strtotime($data['created_at']));  
469 - }  
470 - if(!empty($data['updated_at'])){  
471 - $data['updated_at'] = date('Y-m-d\TH:i:s',strtotime($data['updated_at']));  
472 - }  
473 - $data['references'] = empty($data['references']) ? '' : $data['references'];  
474 - return $data;  
475 - }  
476 -  
477 -  
478 - private function trimEmail($emails){  
479 - if($emails){  
480 - $emails = is_array($emails) ? $emails : @json_decode($emails,true);  
481 - if(is_array($emails) && count($emails)>100){  
482 -  
483 - $emails = array_map(function ($v){  
484 - $v['name'] = '';  
485 - return $v;  
486 - },$emails);  
487 - $emails = array_slice($emails,0,100);  
488 - return json_encode($emails);  
489 - }  
490 - }  
491 - return '';  
492 - }  
493 -  
494 -}  
495 -  
496 -(new SyncToEsCmd())->handler($argv[1]);  
497 -  
498 -return 1;  
499 -  
500 -  
501 -  
502 -  
503 -  
504 -  
505 -  
506 -  
507 -