sync_to_es.php 3.0 KB
<?php

//error_reporting();

require_once "../vendor/autoload.php";

/**
 * 把mysql的数据同步到es
 * @author:dc
 * @time 2025/3/4 10:19
 * Class SyncToEsCmd
 */
class SyncToEsCmd {

    public $isStop = false;

    static $num = 0;

    public function handler(){

        $handler = function ($signal){
            _echo('收到进程信号 '. $signal);
            // 可以处理其他程序
            $this->isStop = true;
        };
        pcntl_signal(SIGTERM, $handler); // 这个是kill
        pcntl_signal(SIGINT, $handler); // 这个是 ctrl+c
//        pcntl_signal(SIGHUP, $handler);


        while (1){

            // 检查是否接收到信号
            pcntl_signal_dispatch();

            if($this->isStop) {
                _echo('已退出进程');
                break;
            }

            $id = redis()->lPop('sync_to_es');
            if($id){
                while(self::$num>=50){
                    co::sleep(0.5);
                }
                go(function () use ($id){
                    static::$num++;
                    $db = db();
                    $code = 500;

                    $data = $db->first(\Model\listsSql::first('`id` = '.$id));
                    if($data){
                        // 设置 进程 是否在运行
                        $data['is_auto']=$db->count('select count(*) from `lists_auto` where `list_id` = '.$data['id']) ? 1 : 0;

                        $data = $this->getEsData($data);
                        $doc_id = $data['email_id'].'_'.$data['folder_id'].'_'.$data['uid'];

                        $code = es()->save($doc_id,$data);
                    }

                    if($code!==200){
                        @file_put_contents(LOG_PATH.'/sync_es_fail.log',$id."\n",FILE_APPEND);
                        _echo('同步es: '.$doc_id.'===>'.$code);
                    }

                    $db = null;

                    co::defer(function (){
                        static::$num--;

                        db()->close();

                        \Lib\Log::getInstance()->write();
                    });
                });
            }else{
                co::sleep(1);
            }

        }

    }


    public function getEsData($data){
        if(!empty($data['id'])){
            $data['uuid'] = $data['id'];
            unset($data['id']);
        }

        $data['from'] = [
            'email' =>  $data['from'],
            'name'  =>  $data['from_name']??''
        ];
        unset($data['from_name']);
        unset($data['date']);
        if(!empty($data['created_at'])){
            $data['created_at'] = date('Y-m-d\TH:i:s',strtotime($data['created_at']));
        }
        if(!empty($data['updated_at'])){
            $data['updated_at'] = date('Y-m-d\TH:i:s',strtotime($data['updated_at']));
        }
        $data['references'] = empty($data['references']) ? '' : $data['references'];
        return $data;
    }


}

\Lib\DbPool::$clientNumber = 60;

\Co\run(function (){
    (new SyncToEsCmd())->handler();
});


return 1;