作者 邓超

to es

<?php
//error_reporting();
require_once "../vendor/autoload.php";
/**
* 把mysql的数据同步到es
* @author:dc
* @time 2025/3/4 10:19
* Class SyncToEsCmd
*/
class SyncToEsCmd {
public $isStop = false;
/**
* 文件夹
* @var array
*/
public $folders = [];
/**
* @var \Lib\Db
*/
public $fob_db;
/**
* 是否是自动回复
* @author:dc
* @time 2025/6/6 11:07
*/
public function isAuto($db,$data){
$is_auto = $db ->count('select count(*) from `lists_auto` where `list_id` = '.$data['id']) ? 1 : 0;
if($is_auto) return 1;
$filter = redis()->get('ai_email_filter_lists',[]);
$filter = is_array($filter) ? $filter : [];
foreach ($filter as $f){
list($t,$str) = $f;
$haystack = '';
if($t==2){
$haystack = $data['subject'];
}elseif ($t==1){
$haystack = $data['from'];
}
if($haystack && $str && stripos($haystack,$str)!==false){
return 1;
}
}
return 0;
}
public function handler(){
$handler = function ($signal){
_echo('收到进程信号 '. $signal);
// 可以处理其他程序
$this->isStop = true;
};
pcntl_signal(SIGTERM, $handler); // 这个是kill
pcntl_signal(SIGINT, $handler); // 这个是 ctrl+c
// pcntl_signal(SIGHUP, $handler);
$es = es(); // 第一个库,即将丢弃
$es2 = es('email_lists'); // 第二个库 新
$db = db();
$this->fob_db = fob_mysql();
$startTime = time();
while (1){
if(time()-43200 > $startTime){
break;
}
// 检查是否接收到信号
pcntl_signal_dispatch();
if($this->isStop) {
_echo('已退出进程');
break;
}
$id = redis()->lPop('sync_to_es');
$code = 500;
if($id){
$doc_id = '';
try {
$data = $db->throw()->first(\Model\listsSql::first('`id` = '.$id));
}catch (Throwable $e){
redis()->rPush('sync_to_es',$id);
_echo('sync to es '.$e->getMessage());
break;
}
if($data){
try {
// 文件夹
if(empty($this->folders[$data['folder_id']])){
$this->folders[$data['folder_id']] = $db->throw()->value(\Model\folderSql::first($data['folder_id'],'folder'));
}
// 为文件夹打标 方便查询
$data['folder_as_int'] = folder2int($this->folders[$data['folder_id']]);
// 是否是自动回复
if($data['folder_as_int'] === 1){
$data['is_auto'] = $this->isAuto($db,$data);
}
// postid ai邮箱要用 这个是查询黑格
list($data['postid'],$data['source']) = $this->getPostid($data['email_id'],$data['udate']);
}catch (Throwable $e){
redis()->rPush('sync_to_es',$id);
_echo('sync to es '.$e->getMessage());
break;
}
$data = $this->getEsData($data);
$doc_id = $data['email_id'].'_'.$data['folder_id'].'_'.$data['uid'];
// 新
$code = $es2->save($doc_id,$data);
// 这个验证数据没问题后会丢弃
$code = $es->save($doc_id,$data);
}
if($code!==200){
@file_put_contents(LOG_PATH.'/sync_es_fail.log',$id."\n",FILE_APPEND);
_echo('同步es: '.$doc_id.'===>'.$code);
}
}else{
sleep(1);
}
}
}
protected $postids = [];
/**
* 项目id
* @author:dc
* @time 2025/5/20 15:44
*/
public function getPostid($email_id,$udate){
// 查询历史记录
$times = redis()->getSet('fob_bind_mail_times:'.$email_id,300,function ($email_id){
$times = $this->fob_db->throw()->first("select `post_id`,`bind_time`,`source` from `e_mail_binds_log` where `email_id` = {$email_id} order by `bind_time` desc ");
if(!$times){
return [];
}
return $times;
},$email_id);
foreach ($times as $time){
$t = strtotime($time['bind_time']);
// 邮件收到的时间是否大于绑定时间
if($udate > $t){
$data = $time;
break;
}
}
// 没有找到历史,就找绑定表
if(empty($data)){
$data = redis()->getSet('fob_bind_mail:'.$email_id,300,function ($email_id){
return $this->fob_db->throw()->first("select `post_id`,`source` from `e_mail_binds` where `email_id` = '{$email_id}' and `deleted_at` is null order by `id` desc limit 1");
},$email_id);
}
return [
$data['post_id']??0,
$data['source']??0,
];
}
public function getEsData($data){
if(!empty($data['id'])){
$data['uuid'] = $data['id'];
unset($data['id']);
}
$data['from'] = [
'email' => $data['from'],
'name' => $data['from_name']??''
];
unset($data['from_name']);
unset($data['date']);
if(!empty($data['to_name']) && strlen($data['to_name'])>10000){
$data['to_name'] = @json_decode($data['to_name'],true);
if(is_array($data['to_name'])){
$data['to_name'] = array_map(function ($v){
$v['name'] = '';
return $v;
},$data['to_name']);
$data['to_name'] = array_slice($data['to_name'],0,100);
$data['to_name'] = json_encode($data['to_name']);
}else{
$data['to_name'] = '';
}
}
$data['description'] = str_replace(["\n"],"",$data['description']);
// unset($data['to_name']);
if(!empty($data['created_at'])){
$data['created_at'] = date('Y-m-d\TH:i:s',strtotime($data['created_at']));
}
if(!empty($data['updated_at'])){
$data['updated_at'] = date('Y-m-d\TH:i:s',strtotime($data['updated_at']));
}
$data['references'] = empty($data['references']) ? '' : $data['references'];
return $data;
}
}
(new SyncToEsCmd())->handler();
return 1;
... ...