作者 邓超

x

<?php
//error_reporting();
require_once "../vendor/autoload.php";
/**
* Synchronises MySQL mail-list data to Elasticsearch (ES).
* @author:dc
* @time 2025/3/4 10:19
* Class SyncToEsCmd
*/
class SyncToEsCmd {
    /**
     * Maximum lifetime of one run in seconds (12 hours); isStop() reports true once exceeded.
     */
    private const MAX_RUNTIME_SECONDS = 43200;

    /**
     * Name of the primary ES index that receives every document.
     */
    private const PRIMARY_INDEX = 'email_lists_copy';

    /**
     * Prefix of the per-post ("branch") ES indices; the post id is appended.
     */
    private const BRANCH_INDEX_PREFIX = 'email_lists_branch_';

    /**
     * Address lists longer than this are cut down (and their names blanked) before indexing.
     */
    private const MAX_ADDRESSES = 100;

    /**
     * Set to true by the signal handler once SIGTERM / SIGINT was received.
     * @var bool
     */
    public $isStop = false;

    /**
     * Cache of folder values keyed by folder id (filled lazily in getDataByEs()).
     * @var array
     */
    public $folders = [];

    /**
     * Connection returned by fob_mysql(); used for the e_mail_binds / e_mail_binds_log tables.
     * @var \Lib\Db
     */
    public $fob_db;

    /**
     * Connection returned by db(); used for the mail list / folder tables.
     * @var \Lib\Db
     */
    public $db;

    /**
     * Unix timestamp taken in the constructor.
     * @var int
     */
    public $startTime = 0;

    /**
     * ES client, created in handler().
     * @var \Lib\Es\Es
     */
    public $es;

    /**
     * Pending bulk payload, created in handler().
     * @var \Lib\Es\BulkData
     */
    protected $bulkData;

    /**
     * Indices already checked by toDataEs() during this process (index name => 1).
     * @var array
     */
    public $checkEsIndex = [];

    /**
     * Unix timestamp of the last bulk submission made by toDataEs().
     * @var int
     */
    protected $pervSubmitTime = 0;

    /**
     * Opens both database connections, installs the SIGTERM / SIGINT handlers
     * and records the start time.
     */
    public function __construct()
    {
        $this->db = db();
        $this->fob_db = fob_mysql();

        // Shared handler: only raise the stop flag so the caller can finish its current work.
        $handler = function ($signal){
            _echo('收到进程信号 '. $signal);
            $this->isStop = true;
        };
        pcntl_signal(SIGTERM, $handler); // sent by `kill`
        pcntl_signal(SIGINT, $handler);  // sent by ctrl+c

        $this->startTime = time();
    }

    /**
     * Decides whether a mail should be flagged as an automatic reply.
     *
     * @param array $data mail row; reads `from`, `id`, `subject` and the optional `body`
     * @return int 1 when the mail has no sender, is listed in `lists_auto`,
     *             or isAiAutoMail() returns exactly 1; otherwise 0
     * @author:dc
     * @time 2025/6/6 11:07
     */
    public function isAuto($data){
        // A mail without a sender is flagged straight away.
        if(!$data['from']){
            return 1;
        }
        $listed = $this->db->count('select count(*) from `lists_auto` where `list_id` = '.$data['id']);
        if($listed){
            return 1;
        }
        return isAiAutoMail($data['from'],$data['subject'],$data['body']??'') === 1 ? 1 : 0;
    }

    /**
     * Tells whether the command should stop.
     *
     * @return bool true once the maximum runtime has elapsed or a stop signal was received
     */
    public function isStop(): bool
    {
        // Deliver pending signals so the handler gets a chance to set the flag.
        pcntl_signal_dispatch();
        if(time() - self::MAX_RUNTIME_SECONDS > $this->startTime){
            return true;
        }
        return $this->isStop;
    }

    /**
     * Syncs a single record: prepares the ES client and bulk buffer, queues the
     * record and submits the buffer immediately.
     *
     * @param string|int $id record id, optionally followed by ".<anything>" (see bulkData())
     * @return void
     */
    public function handler($id){
        $this->es = es(self::PRIMARY_INDEX);
        $this->bulkData = new \Lib\Es\BulkData();
        $this->bulkData($id);
        $this->toDataEs(true);
    }

    /**
     * Loads one record and queues it for every ES index it belongs to.
     *
     * @param string|int $id record id; when it contains a dot only the part before the
     *                       first dot is used as id and the mail body is included in the
     *                       auto-reply check
     * @return void
     */
    public function bulkData($id){
        $id = (string)$id;
        $is_check_body = false;
        if(str_contains($id, '.')){
            $id = explode('.',$id)[0];
            $is_check_body = true;
        }

        $result = $this->getDataByEs($id,$is_check_body);
        if(!$result){
            return;
        }
        // Separate variable names: destructuring an array into itself is undefined behaviour.
        [$doc_id,$data] = $result;

        // Primary index.
        $this->bulkData->add(self::PRIMARY_INDEX,$doc_id,$data);
        if($data['postid']){
            // Branch index of the post resolved by getPostid().
            $this->bulkData->add(self::BRANCH_INDEX_PREFIX.$data['postid'],$doc_id,$data);
        }

        // Personal mailbox bindings: one copy per bound post, marked with source = 1.
        foreach ($this->getPostids($data['email_id']) as $postid){
            $data['postid'] = $postid;
            $data['source'] = 1;
            $this->bulkData->add(self::BRANCH_INDEX_PREFIX.$postid,$doc_id,$data);
        }

        // Mail without any post id. This reads $data['postid'] AFTER the loop above, so it
        // only fires when the value left in $data (the original one, or the last personal
        // binding) is empty; the index name is then the prefix plus that empty value.
        if (!$data['postid']){
            $this->bulkData->add(self::BRANCH_INDEX_PREFIX.$data['postid'],$doc_id,$data);
        }
    }

    /**
     * Post ids of the personal mailbox bindings (`source` = 1, not deleted) of a mailbox.
     * The query goes through cache(1800) of the DB layer.
     *
     * @param int|string $email_id
     * @return array list of post ids, empty when there is no binding
     * @author:dc
     * @time 2025/8/5 14:53
     */
    private function getPostids($email_id){
        $rows = $this->fob_db->throw()->cache(1800)->all("select `post_id` from `e_mail_binds` where `source` = 1 and `email_id` = {$email_id} and `deleted_at` is null");
        return $rows ? array_column($rows,'post_id') : [];
    }

    /**
     * Loads one mail row and turns it into an ES document.
     *
     * The row is read with listsSql::first() and, if that yields nothing, with
     * listsSql::firstHot(). Any failure while loading or enriching the row is written
     * to the failure log and echoed, and false is returned.
     *
     * @param string|int $id record id; must consist of digits only
     * @param bool $is_check_body whether the mail body is loaded for the auto-reply check
     * @return array|false [document id, document] or false when the row is missing or loading failed
     * @author:dc
     * @time 2025/8/5 10:21
     */
    public function getDataByEs($id,$is_check_body) {
        // The id is concatenated into SQL below, so refuse anything that is not a plain number.
        $id = trim((string)$id);
        if(!ctype_digit($id)){
            $this->reportFailure($id,'invalid id');
            return false;
        }

        try {
            $data = $this->db->throw()->first(\Model\listsSql::first('`id` = '.$id));
            if(!$data){
                $data = $this->db->throw()->first(\Model\listsSql::firstHot('`id` = '.$id));
            }
            if(!$data){
                return false;
            }

            // Resolve the folder once per folder id and keep it for later records.
            if(empty($this->folders[$data['folder_id']])){
                $this->folders[$data['folder_id']] = $this->db->throw()->value(\Model\folderSql::first($data['folder_id'],'folder'));
            }
            // Numeric folder tag, stored on the document to make querying easier.
            $data['folder_as_int'] = folder2int($this->folders[$data['folder_id']]);

            $data['is_auto'] = 0;
            // The auto-reply check only runs for folder tag 1.
            // NOTE(review): presumably the inbox - confirm against folder2int().
            if($data['folder_as_int'] === 1){
                if($is_check_body){
                    $body = getMailBody($data['id'],$this->db);
                    if($body){
                        $data['body'] = getBodyHtml($body);
                    }
                }
                $data['is_auto'] = $this->isAuto($data);
                // The body was only needed for the check; it is not indexed.
                unset($data['body']);
            }

            // Post id / source of the binding that applies at the mail's `udate`.
            [$data['postid'],$data['source']] = $this->getPostid($data['email_id'],$data['udate']);
        }catch (Throwable $e){
            $this->reportFailure($id,$e->getMessage());
            return false;
        }

        $data = $this->getEsData($data);
        $doc_id = $data['email_id'].'_'.$data['folder_id'].'_'.$data['uid'];
        return [$doc_id,$data];
    }

    /**
     * Records a failed id in the failure log and echoes the reason.
     *
     * @param string $id
     * @param string $reason
     * @return void
     */
    private function reportFailure($id, string $reason): void
    {
        $this->log([$id]);
        _echo('sync to es '.$id.":".$reason);
    }

    /**
     * Makes sure the given index has a mapping, creating it when necessary.
     *
     * Waits in one-second steps until either a mapping exists or this process obtains
     * the `setmaplock:<index>` redis key and creates the mapping itself.
     * NOTE(review): assumes redis()->add() is "set if absent" with a 20 second TTL - confirm.
     *
     * @param string $index
     * @return mixed 9 when a mapping already exists, otherwise the result of putMapping()
     */
    public function setEsMap($index){
        while (true) {
            $this->es->setIndex($index);
            if($this->es->getMapping()){
                return 9;
            }
            if(redis()->add('setmaplock:'.$index,1,20)){
                return $this->es->putMapping($this->indexMapping(),$this->indexSettings($index));
            }
            // Lock not obtained: pause one second, then check again.
            sleep(1);
        }
    }

    /**
     * Field mapping passed to putMapping() for every index.
     *
     * @return array
     */
    private function indexMapping(): array
    {
        return [
            'properties' => [
                'subject' => ['type' => 'text'],
                // `from` is an object with a name and an address.
                // NOTE(review): the original inline comments suggested `keyword` for the
                // address, yet `email` is mapped as `text` - confirm this is intended.
                'from' => [
                    'type' => 'object',
                    'properties' => [
                        'name' => [
                            'type' => 'keyword'
                        ],
                        'email' => [
                            'type' => 'text'
                        ]
                    ]
                ],
                'to' => ['type' => 'text'],
                'cc' => ['type' => 'keyword'],
                'bcc' => ['type' => 'keyword'],
                'uid' => ['type' => 'integer'],
                'udate' => ['type' => 'integer'],
                'folder_id' => ['type' => 'integer'],
                'email_id' => ['type' => 'integer'],
                'size' => ['type' => 'integer'],
                'recent' => ['type' => 'integer'],
                'flagged' => ['type' => 'integer'],
                'deleted' => ['type' => 'integer'],
                'seen' => ['type' => 'integer'],
                'draft' => ['type' => 'integer'],
                'is_file' => ['type' => 'integer'],
                'is_hots' => ['type' => 'integer'],
                'is_auto' => ['type' => 'integer'],
                'folder_as_int' => ['type' => 'integer'],
                'postid' => ['type' => 'integer'],
                'source' => ['type' => 'integer'],
                'created_at' => ['type' => 'date'],
                'updated_at' => ['type' => 'date'],
                'description' => ['type' => 'keyword'],
                'references' => ['type' => 'keyword']
            ]
        ];
    }

    /**
     * Index settings passed to putMapping(): the primary index gets 21 shards and
     * 1 replica, every other index 1 shard and no replica.
     *
     * @param string $index
     * @return array
     */
    private function indexSettings($index): array
    {
        if($index == self::PRIMARY_INDEX){
            return [
                'number_of_shards' => 21,
                'number_of_replicas' => 1,
            ];
        }
        return [
            'number_of_shards' => 1,
            'number_of_replicas' => 0,
        ];
    }

    /**
     * Submits the queued documents to ES.
     *
     * Unless $nowSubmit is true, nothing is sent while fewer than 20 documents are
     * queued and the previous submission happened less than 2 seconds ago.
     * For every queued index other than the primary one the mapping is checked once
     * per process, and at most once per 86400 seconds across processes (redis key
     * `esmapcheck:<index>`).
     *
     * @param bool $nowSubmit submit immediately, regardless of buffer size and timing
     * @return bool true when nothing had to be sent or the bulk result carries no errors
     * @throws \Elastic\Elasticsearch\Exception\ClientResponseException
     * @throws \Elastic\Elasticsearch\Exception\ServerResponseException
     * @author:dc
     * @time 2025/8/7 10:29
     */
    public function toDataEs(bool $nowSubmit){
        if (!$nowSubmit && $this->bulkData->total() < 20 && $this->pervSubmitTime + 2 > time()){
            return true;
        }
        // Nothing queued, nothing to submit.
        if($this->bulkData->isEmpty()){
            return true;
        }

        $this->pervSubmitTime = time();

        foreach ($this->bulkData->getIndexs() as $index){
            $this->es->setIndex($index);
            $needsCheck = empty($this->checkEsIndex[$index]) && $index != self::PRIMARY_INDEX;
            if($needsCheck && !redis()->has('esmapcheck:'.$index)){
                $m = $this->setEsMap($index);
                if($m !== 9) _echo("{$index} 创建索引 ".$m);
                redis()->set('esmapcheck:'.$index,1,86400);
            }
            // Do not check this index again in this process.
            $this->checkEsIndex[$index] = 1;
        }

        $ret = $this->es->bulk($this->bulkData);
        $failed = !empty($ret['errors']);
        if($failed){
            @file_put_contents(LOG_PATH.'/sync_es_fail.error.log',print_r($ret['errors'],1)."\n",FILE_APPEND|LOCK_EX);
        }

        // The buffer is cleared whether or not the submission reported errors.
        $this->bulkData->clear();

        return !$failed;
    }

    /**
     * Appends the given ids, one per line, to LOG_PATH/sync_es_fail.log.
     *
     * @param array $ids
     * @return void
     * @author:dc
     * @time 2025/8/5 10:17
     */
    public function log(array $ids){
        file_put_contents(LOG_PATH.'/sync_es_fail.log',implode("\n",$ids)."\n",FILE_APPEND|LOCK_EX);
    }

    /**
     * Resolves the post (project) id and source a mail belongs to.
     *
     * 1. At most once per 60 seconds per mailbox (redis key `fob_bind_mail_times_check:`)
     *    the newest `e_mail_binds_log` entry is compared with the current active
     *    `e_mail_binds` row (`source` = 2). Without an active row [0, 0] is returned;
     *    when the post ids differ a new log entry dated two seconds in the past is inserted.
     * 2. The binding history (redis key `fob_bind_mail_times3:`, 300) is scanned from
     *    newest to oldest and the first entry bound before $udate is used.
     * 3. Without a matching history entry the current binding (redis key
     *    `fob_bind_mail3:`, 300) is used.
     * NOTE(review): assumes redis()->add() is "set if absent" and redis()->getSet() is
     * "return cached value or compute and store" - confirm against the redis wrapper.
     *
     * @param int|string $email_id
     * @param int $udate timestamp of the mail, compared with strtotime(bind_time)
     * @return array [post_id, source], each 0 when unknown
     * @throws Exception when the new log entry cannot be inserted
     * @author:dc
     * @time 2025/5/20 15:44
     */
    public function getPostid($email_id,$udate){
        if(redis()->add('fob_bind_mail_times_check:'.$email_id,1,60)){
            $lastpostid = $this->fob_db->throw()->value("select `post_id` from `e_mail_binds_log` where `source` = 2 and `email_id` = '{$email_id}' order by `id` desc limit 1");
            $thelast = $this->fob_db->throw()->first("select * from `e_mail_binds` where `source` = 2 and `email_id` = '{$email_id}' and `deleted_at` is null order by `id` desc limit 1");
            if(!$thelast){
                return [0,0];
            }
            $thelastpostid = $thelast['post_id'];
            // The binding changed without a matching log entry: write one now.
            if($lastpostid!=$thelastpostid){
                $ret = $this->fob_db->throw()->insert('e_mail_binds_log',[
                    'post_id' => $thelastpostid,
                    'bind_time' => date('Y-m-d H:i:s',time()-2),
                    'source' => $thelast['source'],
                    'email' => $thelast['email'],
                    'email_id' => $thelast['email_id'],
                ],false);
                _echo("邮箱异常分配 ".$email_id.' -- '.$lastpostid.' == '.$thelastpostid.' === '.$ret);
                if(!$ret){
                    throw new Exception('新增失败');
                }
            }
        }

        // Binding history, newest first.
        $times = redis()->getSet('fob_bind_mail_times3:'.$email_id,300,function ($email_id){
            $times = $this->fob_db->throw()->all("select `post_id`,`bind_time`,`source` from `e_mail_binds_log` where `source` = 2 and `email_id` = {$email_id} order by `bind_time` desc ");
            if(!$times){
                return [];
            }
            return $times;
        },$email_id);

        $data = null;
        if(is_array($times) && $times){
            foreach ($times as $time){
                // First entry whose bind time lies before the mail's timestamp.
                if($udate > strtotime($time['bind_time'])){
                    $data = $time;
                    break;
                }
            }
        }

        // No history entry applies: fall back to the current binding.
        if(empty($data)){
            $data = redis()->getSet('fob_bind_mail3:'.$email_id,300,function ($email_id){
                return $this->fob_db->throw()->first("select `post_id`,`source` from `e_mail_binds` where `source` = 2 and `email_id` = '{$email_id}' and `deleted_at` is null order by `id` desc limit 1");
            },$email_id);
        }

        return [
            $data['post_id']??0,
            $data['source']??0,
        ];
    }

    /**
     * Converts a mail row into the document shape stored in ES.
     *
     * - `id` is moved to `uuid`
     * - `from` / `from_name` are merged into `from` = ['email' => ..., 'name' => ...]
     * - `date` is dropped
     * - `to_name`, `cc`, `bcc` are normalised by trimEmail()
     * - line feeds are removed from `description`
     * - non-empty `created_at` / `updated_at` are reformatted as `Y-m-d\TH:i:s`
     * - an empty `references` becomes ''
     *
     * @param array $data
     * @return array
     */
    public function getEsData($data){
        if(!empty($data['id'])){
            $data['uuid'] = $data['id'];
            unset($data['id']);
        }

        $data['from'] = [
            'email' => $data['from'] ?? null,
            'name' => $data['from_name']??''
        ];
        unset($data['from_name'], $data['date']);

        foreach (['to_name','cc','bcc'] as $field){
            $data[$field] = $this->trimEmail($data[$field]??[]);
        }

        // A missing / null description is treated as an empty string.
        $data['description'] = str_replace(["\n"],"",$data['description'] ?? '');

        foreach (['created_at','updated_at'] as $field){
            if(!empty($data[$field])){
                $data[$field] = date('Y-m-d\TH:i:s',strtotime($data[$field]));
            }
        }

        $data['references'] = empty($data['references']) ? '' : $data['references'];
        return $data;
    }

    /**
     * Normalises an address list to a JSON string.
     *
     * Lists with more than MAX_ADDRESSES entries are cut to the first MAX_ADDRESSES
     * entries and their `name` values are blanked; shorter lists are encoded unchanged.
     * (Previously shorter lists were returned as '' and therefore never indexed.)
     *
     * @param array|string|null $emails list of address entries or its JSON encoding
     * @return string JSON list, or '' for empty / undecodable input
     */
    private function trimEmail($emails){
        if(!$emails){
            return '';
        }
        $list = $emails;
        if(!is_array($list)){
            $list = is_string($list) ? json_decode($list,true) : null;
        }
        if(!is_array($list) || !$list){
            return '';
        }
        if(count($list) > self::MAX_ADDRESSES){
            $list = array_map(function ($v){
                $v['name'] = '';
                return $v;
            },array_slice($list,0,self::MAX_ADDRESSES));
        }
        return json_encode($list);
    }
}
// Entry point: sync the single record whose id is passed as the first CLI argument
// ("<id>" or "<id>.<suffix>"; see SyncToEsCmd::bulkData()).
$syncId = $argv[1] ?? '';
if ($syncId === '') {
    // Without an id there is nothing to look up; report usage instead of running a broken query.
    fwrite(STDERR, 'usage: php ' . ($argv[0] ?? 'script') . ' <id>[.suffix]' . PHP_EOL);
    return 0;
}
(new SyncToEsCmd())->handler($syncId);
return 1;
... ...