作者 邓超

sync es

@@ -4,57 +4,101 @@ @@ -4,57 +4,101 @@
4 4
5 require_once "../vendor/autoload.php"; 5 require_once "../vendor/autoload.php";
6 6
7 -function getEsData($data){  
8 - if(!empty($data['id'])){  
9 - $data['uuid'] = $data['id'];  
10 - unset($data['id']);  
11 - }  
12 - if(!empty($data['from'])){  
13 - $data['from'] = [  
14 - 'email' => $data['from'],  
15 - 'name' => $data['from_name']??''  
16 - ];  
17 - unset($data['from_name']);  
18 - }  
19 - unset($data['date']);  
20 - if(!empty($data['created_at'])){  
21 - $data['created_at'] = date('Y-m-d\TH:i:s',strtotime($data['created_at']));  
22 - }  
23 - if(!empty($data['updated_at'])){  
24 - $data['updated_at'] = date('Y-m-d\TH:i:s',strtotime($data['updated_at']));  
25 - }  
26 - $data['references'] = empty($data['references']) ? '' : $data['references'];  
27 - return $data;  
28 -} 7 +/**
  8 + * 把mysql的数据同步到es
  9 + * @author:dc
  10 + * @time 2025/3/4 10:19
  11 + * Class SyncToEsCmd
  12 + */
  13 +class SyncToEsCmd {
  14 +
  15 + public $isStop = false;
  16 +
  17 +
  18 + public function handler(){
  19 +
  20 + $handler = function ($signal){
  21 + _echo('收到进程信号 '. $signal);
  22 + // 可以处理其他程序
  23 + $this->isStop = true;
  24 + };
  25 + pcntl_signal(SIGTERM, $handler); // 这个是kill
  26 + pcntl_signal(SIGINT, $handler); // 这个是 ctrl+c
  27 +// pcntl_signal(SIGHUP, $handler);
  28 +
  29 +
  30 + $es = (new Lib\Es\Es('hg_ai_emails'));
  31 + $db = db();
  32 +
  33 + while (1){
  34 + // 检查是否接收到信号
  35 + pcntl_signal_dispatch();
  36 +
  37 + if($this->isStop) {
  38 + _echo('已退出进程');
  39 + break;
  40 + }
  41 +
  42 + $id = redis()->lPop('sync_to_es');
  43 + if($id){
  44 + $data = $db->first(\Model\listsSql::first('`id` = '.$id));
  45 + if($data){
  46 + // 设置 进程 是否在运行
  47 + redis()->set('sync_my_pid:'.getmypid(),time(),86400);
29 48
30 -$db = db();  
31 -$es = (new Lib\Es\Es(''));  
32 -while (1){  
33 - $id = redis()->lPop('sync_to_es');  
34 - if($id){  
35 - $data = $db->first(\Model\listsSql::first('`id` = '.$id));  
36 - if($data){ 49 + $data['is_auto']=$db->count('select count(*) from `lists_auto` where `list_id` = '.$data['id']) ? 1 : 0;
37 50
38 - $data['is_auto']=$db->count('select count(*) from `lists_auto` where `list_id` = '.$data['id']) ? 1 : 0; 51 + $data = $this->getEsData($data);
  52 + $doc_id = $data['email_id'].'_'.$data['folder_id'].'_'.$data['uid'];
39 53
40 - $data = getEsData($data);  
41 - $doc_id = $data['email_id'].'_'.$data['folder_id'].'_'.$data['uid']; 54 + $code = $es->save($doc_id,$data);
  55 + if($code!==200){
  56 + sleep(1);
  57 + $code = $es->save($doc_id,$data); // 重试一次
  58 + }
42 59
43 - $code = $es->save($doc_id,$data);  
44 - if($code!==200){  
45 - sleep(1);  
46 - $code = $es->save($doc_id,$data); // 重试一次 60 + _echo('同步es: '.$doc_id.'===>'.$code);
  61 + continue;
  62 + }
47 } 63 }
48 64
49 - _echo('同步es: '.$doc_id.'===>'.$code);  
50 - continue; 65 + echo '没有找到数据'.PHP_EOL;
  66 + sleep(1);
51 } 67 }
  68 +
52 } 69 }
53 70
54 - echo '没有找到数据'.PHP_EOL;  
55 - sleep(1); 71 +
  72 + public function getEsData($data){
  73 + if(!empty($data['id'])){
  74 + $data['uuid'] = $data['id'];
  75 + unset($data['id']);
  76 + }
  77 + if(!empty($data['from'])){
  78 + $data['from'] = [
  79 + 'email' => $data['from'],
  80 + 'name' => $data['from_name']??''
  81 + ];
  82 + unset($data['from_name']);
  83 + }
  84 + unset($data['date']);
  85 + if(!empty($data['created_at'])){
  86 + $data['created_at'] = date('Y-m-d\TH:i:s',strtotime($data['created_at']));
  87 + }
  88 + if(!empty($data['updated_at'])){
  89 + $data['updated_at'] = date('Y-m-d\TH:i:s',strtotime($data['updated_at']));
  90 + }
  91 + $data['references'] = empty($data['references']) ? '' : $data['references'];
  92 + return $data;
  93 + }
  94 +
  95 +
56 } 96 }
57 97
  98 +(new SyncToEsCmd())->handler();
  99 +
  100 +return 1;
  101 +
58 102
59 103
60 104