作者 张关杰

server submit

@@ -89,6 +89,7 @@ class SyncToEsCmd { @@ -89,6 +89,7 @@ class SyncToEsCmd {
89 } 89 }
90 // 再次验证是否是自动回复 90 // 再次验证是否是自动回复
91 if($this->folders[$data['folder_id']] == '收件箱' && !$data['is_auto']){ 91 if($this->folders[$data['folder_id']] == '收件箱' && !$data['is_auto']){
  92 + echo "xx";
92 $filter = redis()->get('ai_email_filter_lists',[]); 93 $filter = redis()->get('ai_email_filter_lists',[]);
93 $filter = is_array($filter) ? $filter : []; 94 $filter = is_array($filter) ? $filter : [];
94 foreach ($filter as $f){ 95 foreach ($filter as $f){
@@ -99,7 +100,8 @@ class SyncToEsCmd { @@ -99,7 +100,8 @@ class SyncToEsCmd {
99 }elseif ($t==1){ 100 }elseif ($t==1){
100 $haystack = $data['from']; 101 $haystack = $data['from'];
101 } 102 }
102 - if(stripos($haystack,$str)!==false){ 103 + if($haystack && $str && stripos($haystack,$str)!==false){
  104 + echo 'auto';
103 $data['is_auto'] = 1; 105 $data['is_auto'] = 1;
104 break; 106 break;
105 } 107 }
  1 +<?php
  2 +
  3 +//error_reporting();
  4 +
  5 +require_once "../vendor/autoload.php";
  6 +
  7 +
  8 +if(in_array('v2',$argv)){
  9 + define('S_V2','2');
  10 +}else{
  11 + define('S_V2','');
  12 +}
  13 +
  14 +/**
  15 + * 把mysql的数据同步到es
  16 + * @author:dc
  17 + * @time 2025/3/4 10:19
  18 + * Class SyncToEsCmd
  19 + */
  20 +class SyncToEsCmd {
  21 +
  22 + public $isStop = false;
  23 +
  24 + /**
  25 + * 文件夹
  26 + * @var array
  27 + */
  28 + public $folders = [];
  29 +
  30 + /**
  31 + * @var \Lib\Db
  32 + */
  33 + public $fob_db;
  34 +
  35 + public function handler(){
  36 +
  37 + $handler = function ($signal){
  38 + _echo('收到进程信号 '. $signal);
  39 + // 可以处理其他程序
  40 + $this->isStop = true;
  41 + };
  42 + pcntl_signal(SIGTERM, $handler); // 这个是kill
  43 + pcntl_signal(SIGINT, $handler); // 这个是 ctrl+c
  44 +// pcntl_signal(SIGHUP, $handler);
  45 +
  46 +
  47 + if(S_V2){
  48 + $es = es('email_lists');
  49 + }else{
  50 + $es = es();
  51 + }
  52 +
  53 + $db = db();
  54 + $this->fob_db = fob_mysql();
  55 + $startTime = time();
  56 + // 查询gmail的所有邮件文件夹 ai邮件不同步这个文件夹
  57 + $fids = $db->throw()->all("select `id` from `folders` where `origin_folder` in ('[Gmail]/&UWiQ6JD1TvY-','[Gmail]/All Mail','[Gmail]/&YkBnCZCuTvY-')");
  58 + $fids = array_column($fids,'id','id');
  59 +
  60 + while (1){
  61 +
  62 + if(time()-43200 > $startTime){
  63 + break;
  64 + }
  65 +
  66 + // 检查是否接收到信号
  67 + pcntl_signal_dispatch();
  68 +
  69 + if($this->isStop) {
  70 + _echo('已退出进程');
  71 + break;
  72 + }
  73 +
  74 + $id = redis()->lPop('sync_to_es'.S_V2);
  75 + $code = 500;
  76 + if($id){
  77 + $doc_id = '';
  78 + try {
  79 + $data = $db->throw()->first(\Model\listsSql::first('`id` = '.$id));
  80 + }catch (Throwable $e){
  81 + redis()->rPush('sync_to_es'.S_V2,$id);
  82 + _echo('sync to es '.$e->getMessage());
  83 + break;
  84 + }
  85 +
  86 + if($data){
  87 + // 是否是所有邮件文件夹 是就跳过
  88 + if($fids[$data['folder_id']]??0){
  89 + continue;
  90 + }
  91 + // 设置 进程 是否在运行
  92 + $data['is_auto']=$db->count('select count(*) from `lists_auto` where `list_id` = '.$data['id']) ? 1 : 0;
  93 + try {
  94 + // 文件夹
  95 + if(empty($this->folders[$data['folder_id']])){
  96 + $this->folders[$data['folder_id']] = $db->throw()->value(\Model\folderSql::first($data['folder_id'],'folder'));
  97 + }
  98 + // 为文件夹打标 方便查询
  99 + $data['folder_as_int'] = folder2int($this->folders[$data['folder_id']]);
  100 + // postid ai邮箱要用 这个是查询黑格
  101 + list($data['postid'],$data['source']) = $this->getPostid($data['email_id']);
  102 + }catch (Throwable $e){
  103 + redis()->rPush('sync_to_es'.S_V2,$id);
  104 + _echo('sync to es '.$e->getMessage());
  105 + break;
  106 + }
  107 +
  108 + $data = $this->getEsData($data);
  109 + $doc_id = $data['email_id'].'_'.$data['folder_id'].'_'.$data['uid'];
  110 +
  111 + $code = $es->save($doc_id,$data);
  112 + }
  113 +
  114 + if($code!==200){
  115 + @file_put_contents(LOG_PATH.'/sync_es_fail'.S_V2.'.log',$id."\n",FILE_APPEND);
  116 + _echo('同步es: '.$doc_id.'===>'.$code);
  117 + }
  118 +
  119 + }else{
  120 + sleep(1);
  121 + }
  122 + }
  123 +
  124 + }
  125 +
  126 +
  127 + /**
  128 + * 查询项目id 和 邮件来源
  129 + * @param int $email_id 邮箱id
  130 + * @return array
  131 + * @author:dc
  132 + * @time 2025/5/29 11:47
  133 + */
  134 + public function getPostid($email_id){
  135 + $data = redis()->getSet('fob_bind_mail:'.$email_id,300,function ($email_id){
  136 + return $this->fob_db->throw()->first("select `post_id`,`source` from `e_mail_binds` where `email_id` = '{$email_id}' and `deleted_at` is null order by `id` desc limit 1");
  137 + },$email_id);
  138 +
  139 + return [
  140 + $data['post_id']??0,
  141 + $data['source']??0,
  142 + ];
  143 + }
  144 +
  145 +
  146 + public function getEsData($data){
  147 + if(!empty($data['id'])){
  148 + $data['uuid'] = $data['id'];
  149 + unset($data['id']);
  150 + }
  151 +
  152 + $data['from'] = [
  153 + 'email' => $data['from'],
  154 + 'name' => $data['from_name']??''
  155 + ];
  156 + unset($data['from_name']);
  157 + unset($data['date']);
  158 +
  159 + if(!empty($data['to_name']) && strlen($data['to_name'])>10000){
  160 + $data['to_name'] = @json_decode($data['to_name'],true);
  161 + if(is_array($data['to_name'])){
  162 + $data['to_name'] = array_map(function ($v){
  163 + $v['name'] = '';
  164 + return $v;
  165 + },$data['to_name']);
  166 + $data['to_name'] = array_slice($data['to_name'],0,100);
  167 + $data['to_name'] = json_encode($data['to_name']);
  168 + }else{
  169 + $data['to_name'] = '';
  170 + }
  171 + }
  172 + $data['description'] = str_replace(["\n"],"",$data['description']);
  173 + // unset($data['to_name']);
  174 +
  175 + if(!empty($data['created_at'])){
  176 + $data['created_at'] = date('Y-m-d\TH:i:s',strtotime($data['created_at']));
  177 + }
  178 + if(!empty($data['updated_at'])){
  179 + $data['updated_at'] = date('Y-m-d\TH:i:s',strtotime($data['updated_at']));
  180 + }
  181 + $data['references'] = empty($data['references']) ? '' : $data['references'];
  182 + return $data;
  183 + }
  184 +
  185 +
  186 +}
  187 +
  188 +(new SyncToEsCmd())->handler();
  189 +
  190 +return 1;
  191 +
  192 +
  193 +
  194 +
  195 +
  196 +
  197 +
  198 +
  199 +