sync_to_es_v2.php 7.4 KB
<?php

//error_reporting();

require_once "../vendor/autoload.php";


/**
 * Worker that copies mail rows from MySQL into Elasticsearch.
 *
 * Row ids are popped from the Redis list `sync_to_es`. Each row is loaded,
 * enriched (folder tag, auto-reply flag, post id / source) and saved to the
 * `email_lists_copy` index.
 *
 * @author:dc
 * @time 2025/3/4 10:19
 * Class SyncToEsCmd
 */
class SyncToEsCmd {

    /**
     * Longest time, in seconds, one worker process keeps running (12 hours).
     */
    public const MAX_RUNTIME_SECONDS = 43200;

    /**
     * Set to true by the signal handler once SIGTERM / SIGINT was received.
     * @var bool
     */
    public $isStop = false;

    /**
     * Per-process cache of folder values, keyed by folder id.
     * @var array
     */
    public $folders  = [];

    /**
     * Connection returned by fob_mysql(); used for the bind / bind-log lookups.
     * @var \Lib\Db
     */
    public $fob_db;

    /**
     * Connection returned by db(); used for the mail rows, folders and lists_auto.
     * @var \Lib\Db
     */
    public $db;

    /**
     * Unix timestamp taken when the worker was constructed.
     * @var int
     */
    public $startTime = 0;

    /**
     * SyncToEsCmd constructor.
     *
     * Opens both database connections, registers the SIGTERM / SIGINT handler
     * and records the start time used by the runtime limit in isStop().
     */
    public function __construct()
    {
        $this->db = db();
        $this->fob_db = fob_mysql();

        // The handler only raises a flag; the loop in handler() finishes the
        // current item and then exits on its next isStop() check.
        $handler = function ($signal){
            _echo('收到进程信号 '. $signal);
            $this->isStop = true;
        };
        pcntl_signal(SIGTERM, $handler); // sent by `kill`
        pcntl_signal(SIGINT, $handler); // sent by ctrl+c

        $this->startTime = time();
    }

    /**
     * Tell whether a mail row is an automatic reply.
     *
     * A row counts as automatic when it has an entry in `lists_auto`, or
     * otherwise when isAiAutoMail() returns exactly 1 for its from / subject / body.
     *
     * @author:dc
     * @time 2025/6/6 11:07
     * @param array $data mail row; reads `id`, `from`, `subject` and optional `body`
     * @return int 1 when automatic, 0 otherwise
     */
    public function isAuto($data)
    {
        // The id is concatenated into SQL, so force it to an integer first.
        $listId = (int)$data['id'];

        if($this->db->count('select count(*) from `lists_auto` where `list_id` = '.$listId)){
            return 1;
        }

        return isAiAutoMail($data['from'],$data['subject'],$data['body']??'') === 1 ? 1 : 0;
    }

    /**
     * Tell whether the worker loop should end.
     *
     * @return bool true after a stop signal or once MAX_RUNTIME_SECONDS has elapsed
     */
    public function isStop(): bool
    {
        // Deliver any pending signals so the handler can raise the flag.
        pcntl_signal_dispatch();

        // Stop once the maximum execution time has been exceeded.
        if(time() - $this->startTime > self::MAX_RUNTIME_SECONDS){
            return true;
        }

        return $this->isStop;
    }

    /**
     * Main loop: pop ids from `sync_to_es` and save the matching rows to ES.
     *
     * A queue value is either `<id>` or `<id>.<suffix>`; a suffix asks for the
     * mail body to be loaded before the auto-reply check. When a database call
     * throws, the original queue value is pushed back and the loop ends. Ids
     * whose row is missing, or whose save does not return 200, are appended to
     * sync_es_fail.log.
     *
     * @return void
     */
    public function handler()
    {
        // Only the new index is written. The legacy index (es() without an
        // argument) used to be written here as well and has been retired.
        $es2 = es('email_lists_copy');

        while (!$this->isStop()){

            $queued = redis()->lPop('sync_to_es');
            if(!$queued){
                // Nothing to do: back off briefly before polling again.
                sleep(1);
                continue;
            }

            // $queued stays untouched so that a retry re-queues exactly what
            // was popped, including the body-check suffix.
            $id = (string)$queued;
            $is_check_body = false;
            if(str_contains($id, '.')){
                $id = explode('.',$id)[0];
                $is_check_body = true;
            }

            // The id is spliced into SQL below. Anything that is not a plain
            // row number is recorded as a failure instead of being executed
            // (and instead of being re-queued forever).
            if(!ctype_digit($id)){
                $this->logSyncFailure($id, '', 500);
                continue;
            }

            try {
                $data = $this->db->throw()->first(\Model\listsSql::first('`id` = '.$id));
                if(!$data){
                    $data = $this->db->throw()->first(\Model\listsSql::firstHot('`id` = '.$id));
                }
            }catch (Throwable $e){
                redis()->rPush('sync_to_es',$queued);
                _echo('sync to es '.$e->getMessage());
                break;
            }

            // Stays 500 when the row was not found, so the id is logged below.
            $code = 500;
            $doc_id = '';

            if($data){
                try {
                    // Resolve the folder once per folder id and cache it.
                    if(empty($this->folders[$data['folder_id']])){
                        $this->folders[$data['folder_id']] = $this->db->throw()->value(\Model\folderSql::first($data['folder_id'],'folder'));
                    }

                    // Tag the folder with an integer to make querying easier.
                    $data['folder_as_int'] = folder2int($this->folders[$data['folder_id']]);

                    // The auto-reply check only runs for folder type 1
                    // (NOTE(review): presumably the inbox — confirm in folder2int).
                    if($data['folder_as_int'] === 1){
                        if($is_check_body){
                            $body = getMailBody($data['id'],$this->db);
                            if($body){
                                $data['body'] = getBodyHtml($body);
                            }
                        }

                        $data['is_auto'] = $this->isAuto($data);

                        // The body was only needed for the check; it is not indexed.
                        unset($data['body']);
                    }

                    // post id / source are needed by the AI mailbox; they are
                    // looked up in the fob database.
                    list($data['postid'],$data['source']) = $this->getPostid($data['email_id'],$data['udate']);

                }catch (Throwable $e){
                    redis()->rPush('sync_to_es',$queued);
                    _echo('sync to es '.$e->getMessage());
                    break;
                }

                $data = $this->getEsData($data);
                $doc_id = $data['email_id'].'_'.$data['folder_id'].'_'.$data['uid'];

                $code = $es2->save($doc_id,$data);
            }

            if($code!==200){
                $this->logSyncFailure($id, $doc_id, $code);
            }
        }

    }

    /**
     * Record an id that could not be synced (best effort, never throws on I/O).
     *
     * @param string     $id     row id taken from the queue
     * @param string     $doc_id ES document id, or '' when none was built
     * @param int|mixed  $code   status returned by the save call, or 500
     * @return void
     */
    private function logSyncFailure($id, $doc_id, $code)
    {
        @file_put_contents(LOG_PATH.'/sync_es_fail.log',$id."\n",FILE_APPEND);
        _echo('同步es: '.$doc_id.'===>'.$code);
    }


    /**
     * Find the post id and source a mailbox was bound to when a mail arrived.
     *
     * The bind history (`e_mail_binds_log`, newest first) is searched for the
     * first entry whose bind time is earlier than the mail time. If none
     * matches, the latest non-deleted row of `e_mail_binds` is used.
     * Both lookups go through redis()->getSet() with 300 as its second
     * argument (NOTE(review): presumably a cache TTL in seconds — confirm).
     *
     * @author:dc
     * @time 2025/5/20 15:44
     * @param int|string $email_id mailbox id
     * @param int        $udate    mail time, compared against strtotime(bind_time)
     * @return array [post_id, source]; each defaults to 0 when nothing was found
     */
    public function getPostid($email_id,$udate)
    {
        // Bind history of this mailbox.
        $times = redis()->getSet('fob_bind_mail_times2:'.$email_id,300,function ($email_id){
            // Spliced into SQL below, so force an integer.
            $email_id = (int)$email_id;

            $times = $this->fob_db->throw()->all("select `post_id`,`bind_time`,`source` from `e_mail_binds_log` where `email_id` = {$email_id} order by `bind_time` desc ");

            return $times ?: [];

        },$email_id);

        $data = null;
        if(is_array($times)){
            foreach ($times as $time){
                // Was the mail received after this binding was made?
                if($udate > strtotime($time['bind_time'])){
                    $data = $time;
                    break;
                }
            }
        }

        // No matching history entry: fall back to the bind table.
        if(empty($data)){
            $data = redis()->getSet('fob_bind_mail:'.$email_id,300,function ($email_id){
                // Spliced into SQL below, so force an integer.
                $email_id = (int)$email_id;

                return $this->fob_db->throw()->first("select `post_id`,`source` from `e_mail_binds` where `email_id` = '{$email_id}' and `deleted_at` is null order by `id` desc limit 1");
            },$email_id);

        }

        return [
            $data['post_id']??0,
            $data['source']??0,
        ];
    }


    /**
     * Reshape a mail row into the document stored in Elasticsearch.
     *
     * - `id` is renamed to `uuid`
     * - `from` / `from_name` are merged into `from = [email, name]`
     * - `date` is dropped
     * - a `to_name` JSON string longer than 10000 bytes is cut to its first
     *   100 entries with every `name` blanked; if it is not valid JSON for an
     *   array it becomes ''
     * - line feeds are removed from `description`
     * - `created_at` / `updated_at` are reformatted as `Y-m-d\TH:i:s`
     * - an empty `references` becomes ''
     *
     * @param array $data mail row
     * @return array document ready for the save call
     */
    public function getEsData($data)
    {
        if(!empty($data['id'])){
            $data['uuid'] = $data['id'];
            unset($data['id']);
        }

        $data['from'] = [
            'email' =>  $data['from'] ?? null,
            'name'  =>  $data['from_name'] ?? '',
        ];
        unset($data['from_name'], $data['date']);

        if(!empty($data['to_name']) && strlen($data['to_name'])>10000){
            $recipients = @json_decode($data['to_name'],true);
            if(is_array($recipients)){
                // Cut first, so only the entries that are kept get rewritten.
                $recipients = array_map(static function ($v){
                    $v['name'] = '';
                    return $v;
                },array_slice($recipients,0,100));
                $data['to_name'] = json_encode($recipients);
            }else{
                $data['to_name'] = '';
            }
        }

        // `?? ''` keeps the result ('') for a missing / null description
        // without handing null to str_replace().
        $data['description'] = str_replace(["\n"],"",$data['description'] ?? '');

        foreach (['created_at','updated_at'] as $field){
            if(!empty($data[$field])){
                $data[$field] = date('Y-m-d\TH:i:s',strtotime($data[$field]));
            }
        }

        $data['references'] = empty($data['references']) ? '' : $data['references'];

        return $data;
    }


}

// Entry point: build the worker and run its queue loop until it stops.
$syncToEsCmd = new SyncToEsCmd();
$syncToEsCmd->handler();

// Value handed back to a script that include/require-s this file.
return 1;