作者 邓超

auto

... ... @@ -17,98 +17,62 @@ class AutoMail {
}
/**
* shopk那边的预热邮箱
* @var array
*/
private $shopkHotEmail = [];
/**
* @var \Lib\Db|\Lib\DbPool
*/
private $db;
private $fids = [];
/**
* @author:dc
* @time 2024/7/18 14:04
*/
private function start(){
$this->fids = $this->db->all("select `id` from `folders` where `folder` = '收件箱'");
$this->fids = array_column($this->fids,'id');
$filter = @file_get_contents('https://fob.ai.cc/api/mail/ai_inbox_filter/'.md5('aicc.'.date('ymdh')));
$filter = @json_decode($filter,true);
if(!is_array($filter)){
return 0;
}
array_map(function ($v){
$this->filter[] = [
$v['type'],
$v['text'],
];
},$filter[0]);
if($this->filter){
redis()->set('ai_email_filter_lists',$this->filter,86400);
}
if(redis()->add('auto_mail_sync2',1,60)){
_echo('正在计算数据');
$maxId = $this->db->value("select `id` from `lists` order by `id` desc limit 1");
$startId = $maxId-200000;
foreach (minMaxToArray($startId,$maxId) as $ids){
redis()->rPush('auto_check_ids',implode(',',$ids));
}
_echo('计算完成');
}
while (1){
$ids = redis()->lPop('auto_check_ids');
if($ids){
$ids = explode(',',$ids);
$this->run($ids);
}else{
break;
}
}
}
$filter = isAiAutoMail(true,true);
private $filter = [];
$es = es('email_lists');
private function run($id){
$list = $this->db->all(\Model\listsSql::all(dbWhere(['id'=>$id]),'`id`,`from`,`subject`,`folder_id`'));
foreach ($list as $item){
// 必须是收件箱
if(in_array($item['folder_id'],$this->fids)){
foreach ($this->filter as $f){
$not_must = [];
foreach ($filter as $f){
list($t,$str) = $f;
$haystack = '';
if($t==2){
$haystack = $item['subject'];
$not_must[] = ["match_phrase"=>["subject"=>$str]];
}elseif ($t==1){
$haystack = $item['from'];
}
if(stripos($haystack,$str)!==false){
if(!$this->db->count("select count(*) from `lists_auto` where `list_id` = ".$item['id'])){
_echo("插入数据 ".$item['id'].'==>'.$this->db->create('lists_auto',['list_id'=>$item['id']],false).'==>'.$haystack);
}
break;
}
$not_must[] = ["match_phrase"=>["from.email"=>$str]];
}
}
// 查询自动回的邮件
$lists = $es->search([
"_source" => ["uuid"],
"query"=>[
"bool"=>[
"must"=>[
["term"=>["folder_as_int" => 1]], // 收件箱
["term"=>["is_auto" => 0]], // 非auto
["term"=>["is_hots" => 0]], // 非预热
["term"=>["deleted" => 0]], // 非删除
// 自动关键字
[
"bool" => [
"should"=> $not_must,
"minimum_should_match" => 1
]
]
]
]
]
],0,1000,[],1000);
foreach ($lists['hits']['hits']??[] as $list){
$id = intval($list['_source']['uuid']??0);
if($id){
_echo("id = ".$id);
redis()->rPush('sync_to_es',$id);
}
}
}
}
... ...
<?php
//error_reporting();
require_once "../vendor/autoload.php";
if(in_array('v2',$argv)){
define('S_V2','2');
}else{
define('S_V2','');
}
/**
* 把mysql的数据同步到es
* @author:dc
* @time 2025/3/4 10:19
* Class SyncToEsCmd
*/
class SyncToEsCmd {
public $isStop = false;
/**
* 文件夹
* @var array
*/
public $folders = [];
/**
* @var \Lib\Db
*/
public $fob_db;
public function handler(){
$handler = function ($signal){
_echo('收到进程信号 '. $signal);
// 可以处理其他程序
$this->isStop = true;
};
pcntl_signal(SIGTERM, $handler); // 这个是kill
pcntl_signal(SIGINT, $handler); // 这个是 ctrl+c
// pcntl_signal(SIGHUP, $handler);
if(S_V2){
$es = es('email_lists');
}else{
$es = es();
}
$db = db();
$this->fob_db = fob_mysql();
$startTime = time();
while (1){
if(time()-43200 > $startTime){
break;
}
// 检查是否接收到信号
pcntl_signal_dispatch();
if($this->isStop) {
_echo('已退出进程');
break;
}
$id = redis()->lPop('sync_to_es'.S_V2);
$code = 500;
if($id){
$doc_id = '';
try {
$data = $db->throw()->first(\Model\listsSql::first('`id` = '.$id));
}catch (Throwable $e){
redis()->rPush('sync_to_es'.S_V2,$id);
_echo('sync to es '.$e->getMessage());
break;
}
if($data){
// 设置 进程 是否在运行
$data['is_auto']=$db->count('select count(*) from `lists_auto` where `list_id` = '.$data['id']) ? 1 : 0;
try {
// 文件夹
if(empty($this->folders[$data['folder_id']])){
$this->folders[$data['folder_id']] = $db->throw()->value(\Model\folderSql::first($data['folder_id'],'folder'));
}
// 再次验证是否是自动回复
if($this->folders[$data['folder_id']] == '收件箱' && !$data['is_auto']){
$filter = redis()->get('ai_email_filter_lists',[]);
$filter = is_array($filter) ? $filter : [];
foreach ($filter as $f){
list($t,$str) = $f;
$haystack = '';
if($t==2){
$haystack = $data['subject'];
}elseif ($t==1){
$haystack = $data['from'];
}
if($haystack && $str && stripos($haystack,$str)!==false){
$data['is_auto'] = 1;
break;
}
}
}
// 为文件夹打标 方便查询
$data['folder_as_int'] = folder2int($this->folders[$data['folder_id']]);
// postid ai邮箱要用 这个是查询黑格
$data['postid'] = $this->getPostid($data['email_id']);
}catch (Throwable $e){
redis()->rPush('sync_to_es'.S_V2,$id);
_echo('sync to es '.$e->getMessage());
break;
}
$data = $this->getEsData($data);
$doc_id = $data['email_id'].'_'.$data['folder_id'].'_'.$data['uid'];
$code = $es->save($doc_id,$data);
}
if($code!==200){
@file_put_contents(LOG_PATH.'/sync_es_fail'.S_V2.'.log',$id."\n",FILE_APPEND);
_echo('同步es: '.$doc_id.'===>'.$code);
}
}else{
sleep(1);
}
}
}
protected $postids = [];
/**
* 项目id
* @author:dc
* @time 2025/5/20 15:44
*/
public function getPostid($email_id){
$h = date('dh');
if(!isset($this->postids[$h][$email_id])){
// 未删除状态
$id = (int) $this->fob_db->throw()->value("select `post_id` from `e_mail_binds` where `email_id` = '{$email_id}' and `deleted_at` is null order by `id` desc limit 1 ");
if(!$id){
// 已删状态
$id = (int) $this->fob_db->throw()->value("select `post_id` from `e_mail_binds` where `email_id` = '{$email_id}' order by `id` desc limit 1 ");
}
$this->postids[$h][$email_id] = $id;
}
return $this->postids[$h][$email_id];
return 0;
}
public function getEsData($data){
if(!empty($data['id'])){
$data['uuid'] = $data['id'];
unset($data['id']);
}
$data['from'] = [
'email' => $data['from'],
'name' => $data['from_name']??''
];
unset($data['from_name']);
unset($data['date']);
if(!empty($data['to_name']) && strlen($data['to_name'])>10000){
$data['to_name'] = @json_decode($data['to_name'],true);
if(is_array($data['to_name'])){
$data['to_name'] = array_map(function ($v){
$v['name'] = '';
return $v;
},$data['to_name']);
$data['to_name'] = array_slice($data['to_name'],0,100);
$data['to_name'] = json_encode($data['to_name']);
}else{
$data['to_name'] = '';
}
}
$data['description'] = str_replace(["\n"],"",$data['description']);
// unset($data['to_name']);
if(!empty($data['created_at'])){
$data['created_at'] = date('Y-m-d\TH:i:s',strtotime($data['created_at']));
}
if(!empty($data['updated_at'])){
$data['updated_at'] = date('Y-m-d\TH:i:s',strtotime($data['updated_at']));
}
$data['references'] = empty($data['references']) ? '' : $data['references'];
return $data;
}
}
(new SyncToEsCmd())->handler();
return 1;
... ... @@ -492,7 +492,7 @@ function isAiAutoMail($from,$subject,$body=''){
}
// 是否是自动回复
foreach ($temp($from===true&&$subject===true) as $f){
foreach ($temp() as $f){
list($t,$str) = $f;
$haystack = '';
if($t==2){
... ...