sync_to_es.php
2.6 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
<?php
//error_reporting();
require_once "../vendor/autoload.php";
/**
* 把mysql的数据同步到es
* @author:dc
* @time 2025/3/4 10:19
* Class SyncToEsCmd
*/
class SyncToEsCmd {
public $isStop = false;
public function handler(){
$handler = function ($signal){
_echo('收到进程信号 '. $signal);
// 可以处理其他程序
$this->isStop = true;
};
pcntl_signal(SIGTERM, $handler); // 这个是kill
pcntl_signal(SIGINT, $handler); // 这个是 ctrl+c
// pcntl_signal(SIGHUP, $handler);
$es = (new Lib\Es\Es('hg_ai_emails'));
$db = db();
$maxNum = 10000; // 最大执行数量
while ($maxNum > 0){
// 检查是否接收到信号
pcntl_signal_dispatch();
if($this->isStop) {
_echo('已退出进程');
break;
}
$maxNum--;
$id = redis()->lPop('sync_to_es');
if($id){
$data = $db->first(\Model\listsSql::first('`id` = '.$id));
if($data){
// 设置 进程 是否在运行
$data['is_auto']=$db->count('select count(*) from `lists_auto` where `list_id` = '.$data['id']) ? 1 : 0;
$data = $this->getEsData($data);
$doc_id = $data['email_id'].'_'.$data['folder_id'].'_'.$data['uid'];
$code = $es->save($doc_id,$data);
if($code!==200){
sleep(1);
$code = $es->save($doc_id,$data); // 重试一次
}
_echo('同步es: '.$doc_id.'===>'.$code);
continue;
}
}
echo '没有找到数据'.PHP_EOL;
}
}
public function getEsData($data){
if(!empty($data['id'])){
$data['uuid'] = $data['id'];
unset($data['id']);
}
$data['from'] = [
'email' => $data['from'],
'name' => $data['from_name']??''
];
unset($data['from_name']);
unset($data['date']);
if(!empty($data['created_at'])){
$data['created_at'] = date('Y-m-d\TH:i:s',strtotime($data['created_at']));
}
if(!empty($data['updated_at'])){
$data['updated_at'] = date('Y-m-d\TH:i:s',strtotime($data['updated_at']));
}
$data['references'] = empty($data['references']) ? '' : $data['references'];
return $data;
}
}
(new SyncToEsCmd())->handler();
return 1;