<?php
//error_reporting();
require_once "../vendor/autoload.php";

/**
 * Sync MySQL data to ES.
 *
 * Worker loop: pops ids from the Redis list `sync_to_es`, loads the matching
 * row through `\Model\listsSql::first()`, reshapes it with getEsData() and
 * saves it through the object returned by es().
 *
 * @author:dc
 * @time 2025/3/4 10:19
 * Class SyncToEsCmd
 */
class SyncToEsCmd
{
    /**
     * Set to true by the signal handler; the main loop exits on its next
     * iteration once this is set.
     *
     * @var bool
     */
    public $isStop = false;

    /**
     * Run the sync loop until SIGTERM or SIGINT is received.
     *
     * Every popped id whose save does not return exactly 200 (including ids
     * that are malformed or have no matching row) is appended to
     * LOG_PATH/sync_es_fail.log and reported through _echo().
     *
     * @return void
     */
    public function handler()
    {
        $handler = function ($signal) {
            _echo('收到进程信号 '. $signal);
            // Additional shutdown handling could be added here.
            $this->isStop = true;
        };

        pcntl_signal(SIGTERM, $handler); // sent by `kill`
        pcntl_signal(SIGINT, $handler);  // sent by Ctrl+C
        // pcntl_signal(SIGHUP, $handler);

        $es = es();
        $db = db();

        while (true) {
            // Deliver pending signals so $isStop reflects a stop request.
            pcntl_signal_dispatch();
            if ($this->isStop) {
                _echo('已退出进程');
                break;
            }

            $id = redis()->lPop('sync_to_es');
            if (!$id) {
                // Nothing usable was popped: wait before polling again.
                sleep(1);
                continue;
            }

            $code = 500;
            $doc_id = '';

            // The popped value is concatenated into SQL below, so only a
            // purely numeric id is allowed through. Anything else keeps
            // $code at 500 and is recorded as a failed sync.
            $listId = trim((string) $id);
            if (ctype_digit($listId)) {
                $data = $db->first(\Model\listsSql::first('`id` = ' . $listId));
                if ($data) {
                    // is_auto is 1 when at least one `lists_auto` row references this list.
                    $autoCount = $db->count(
                        'select count(*) from `lists_auto` where `list_id` = ' . $data['id']
                    );
                    $data['is_auto'] = $autoCount ? 1 : 0;

                    $data = $this->getEsData($data);
                    $doc_id = $data['email_id'] . '_' . $data['folder_id'] . '_' . $data['uid'];
                    $code = $es->save($doc_id, $data);
                }
            }

            if ($code !== 200) {
                // Best-effort failure log; a write error must not stop the worker.
                @file_put_contents(LOG_PATH . '/sync_es_fail.log', $id . "\n", FILE_APPEND);
                _echo('同步es: ' . $doc_id . '===>' . $code);
            }
        }
    }

    /**
     * Reshape a database row into the document that is sent to ES.
     *
     * - a non-empty `id` is moved to `uuid`;
     * - `from` / `from_name` are merged into `from => ['email' => ..., 'name' => ...]`;
     * - `date` is removed;
     * - a `to_name` string longer than 10000 bytes is decoded as JSON, the
     *   `name` of every array entry is blanked and the result re-encoded;
     *   if it does not decode to an array it is replaced by '';
     * - non-empty `created_at` / `updated_at` are reformatted as `Y-m-d\TH:i:s`;
     * - an empty or missing `references` becomes ''.
     *
     * @param array $data row to convert
     * @return array the converted document
     */
    public function getEsData($data)
    {
        if (!empty($data['id'])) {
            $data['uuid'] = $data['id'];
            unset($data['id']);
        }

        $data['from'] = [
            // `?? null` yields the same value as before for a missing key, without the warning.
            'email' => $data['from'] ?? null,
            'name'  => $data['from_name'] ?? '',
        ];
        unset($data['from_name'], $data['date']);

        if (!empty($data['to_name']) && is_string($data['to_name']) && strlen($data['to_name']) > 10000) {
            $recipients = json_decode($data['to_name'], true);
            if (is_array($recipients)) {
                $recipients = array_map(function ($entry) {
                    // Only array entries can carry a `name`; other values pass through unchanged.
                    if (is_array($entry)) {
                        $entry['name'] = '';
                    }
                    return $entry;
                }, $recipients);
                $data['to_name'] = json_encode($recipients);
            } else {
                $data['to_name'] = '';
            }
        }

        foreach (['created_at', 'updated_at'] as $field) {
            if (!empty($data[$field])) {
                $data[$field] = date('Y-m-d\TH:i:s', strtotime($data[$field]));
            }
        }

        $data['references'] = empty($data['references']) ? '' : $data['references'];

        return $data;
    }
}

(new SyncToEsCmd())->handler();

return 1;