作者 邓超

x

... ... @@ -15,7 +15,7 @@ function start(){
$pm = new Process\Manager();
// 启动业务进程
$pm->addBatch(10,function (Process\Pool $pool, int $worker_id){
$pm->addBatch(15,function (Process\Pool $pool, int $worker_id){
swoole_set_process_name('php-email-sync-list-'.$worker_id);
... ... @@ -41,19 +41,12 @@ function start(){
go(function () use ($id){
// 开始同步
try {
sync($id);
}catch (\Throwable $e){
logs(
$e->getMessage().PHP_EOL.$e->getTraceAsString(),
LOG_PATH.'/sync/'.$id.'.log'
);
}
(new \Service\SyncMail($id))->sync();
// 协程完成后执行的函数
co::defer(function () use ($id){
// 30秒后 消除占用
redis()->expire('just_sync_'.$id,30);
redis()->expire('just_sync_'.$id,60);
// 写入日志
\Lib\Log::getInstance()->write();
});
... ... @@ -69,190 +62,12 @@ function start(){
},true);
// 启动一个同步内容的进程
// $pm->add(function (Process\Pool $pool, int $worker_id){
//
// swoole_set_process_name('php-email-sync-body-'.$worker_id);
//
// include_once __DIR__."/../vendor/autoload.php";
//
// _echo("业务进程({$worker_id})启动成功,body");
// $run_timer = time();
// // 循环阻塞
// while (true){
// // 运行超过1天的 停止
// if($run_timer < (time()-21600)){
// break;
// }
// // 需要同步的id
// $id = redis()->lPop('sync_email_body');
//
// if(!$id){
// co::sleep(1);
// }else{
// // 占用当前的id,占用2小时
// if(redis()->add('just_sync_body_'.$id['lists_id'],time(),600)){
// // 启动一个协程
// go(function () use ($id){
//
// // 开始同步
// try {
// sync_body($id);
// }catch (\Throwable $e){
//// _echo($e->getMessage());
// logs(
// $e->getMessage().PHP_EOL.$e->getTraceAsString(),
// LOG_PATH.'/'.$id['email_id'].'.log'
// );
// }
//
// // 协程完成后执行的函数
// co::defer(function () use ($id){
//// _echo('正常关闭进程('.$worker_id.')下的协程('.co::getCid().')');
// // 消除占用
// redis()->delete('just_sync_body_'.$id['lists_id']);
// // 写入日志
// \Lib\Log::getInstance()->write();
//
// });
//
// });
// }
// }
//
// }
//
// },true);
// 启动管理器
$pm->start();
}
/**
* 同步内容 body
* @param $id
* @param $worker_id
* @return int
* @author:dc
* @time 2023/3/23 10:18
*/
function sync_body($id){
// 是否有数据
if(db()->count(\Model\bodySql::has((int) $id['lists_id']))){
return 0;
}
$email = db()->first(\Model\emailSql::first($id['email_id']));
if(!$email){
return 0;
}
if($email['pwd_error']){
return 1;
}
$mailServer = new Lib\Mail\Mail($email['email'],base64_decode($email['password']),$email['imap']);
// 登录服务器
if($mailServer->login()!=1){
return 2;
}
// $mailServer->client->debug(true,LOG_PATH.'/'.$id['email_id'].'body/');
// 同步 body
$mailServer->syncBody($id['folder'],$id['uid'],$id['lists_id'],db());
$mailServer = null;
return 0;
}
/**
* 开始同步, 这里是主要的业务代码
* @param $email_id
* @param $worker_id
* @return int
* @author:dc
* @time 2023/3/10 10:19
*/
function sync($email_id){
$email = db()->first(\Model\emailSql::first($email_id));
if(!$email){
return 0;
}
if($email['pwd_error']){
return 1;
}
$mailServer = new Lib\Mail\Mail($email['email'],base64_decode($email['password']),$email['imap']);
// 登录服务器
if($mailServer->login()!==1){
return 2;
}
// $mailServer->client->debug(true,LOG_PATH.'/'.$email_id.'/');
// 同步文件夹
$mailServer->syncFolder($email_id);
_echo('文件夹同步成功-'.$email_id);
// 读取到邮箱中的文件夹
$folders = db()->all(\Model\folderSql::all($email['id']));
if(!$folders){
return 3;
}
$call = function ($email_id,$folder_id,$origin_folder) use ($mailServer){
// gmail 邮箱 这个是不可选的
if($origin_folder == '[Gmail]'){
return;
}
// 同步父文件夹
$result = $mailServer->syncMail($email_id,$folder_id,$origin_folder);
if(is_array($result) && $result){
_echo($email_id.' 同步文件夹('.$origin_folder.')邮件列表 '.count($result));
}
};
// $folders = list_to_tree($folders);
foreach ($folders as $folder){
try {
$is = true;
foreach ($folders as $f){
// 是否存在下级
if($f['pid'] == $folder['id']){
$is = false;
}
}
if($is) $call($email_id,$folder['id'],$folder['origin_folder']);
}catch (\Throwable $e){
logs(
$e->getMessage().$e->getTraceAsString(),
LOG_PATH.'/imap/'.$email['email'].'.error.log'
);
}
}
$email = null;
$mailServer = null;
}
if(!function_exists("imap_8bit")){
echo '请安装imap扩展';
exit(0);
}
... ...
... ... @@ -174,23 +174,53 @@ class SyncMail {
if($f->isSelect){ // 是否可以选择 只有可以选中的文件夹才有邮件
$folder = $this->imap->folder($f); // 选择文件夹后,有状态
// 这个暂时不要
// $this->db->update(folderSql::$table,[
// 'exsts' => $folder->getTotal(),
// 'unseen' => $folder->getUnseen(),
// 'last_sync_time' => time()
// ],dbWhere(['email_id'=>$this->emailId(),'origin_folder'=>$folder->getName()]),false);
// 是否有邮件 有邮件才继续
if ($folder->getTotal()){
$this->mail($folder);
}
// 更新数量
$this->db->update(folderSql::$table,[
'exsts' => $this->db->count(listsSql::listCount(
dbWhere(
[
'folder_id'=>$this->getFolderId($folder->getName()),
'deleted' => 0,
]
)
)),
'unseen' => $this->db->count(listsSql::listCount(
dbWhere(
[
'folder_id'=>$this->getFolderId($folder->getName()),
'seen' => 0,
'deleted' => 0,
]
)
)),
'last_sync_time' => time()
],dbWhere(['email_id'=>$this->emailId(),'uuid'=>md5($this->emailId().$folder->getName())]),false);
}
}
}
/**
* 当前 目录的id
* @param string $name
* @return mixed|null
* @author:dc
* @time 2024/10/12 17:44
*/
private function getFolderId(string $name){
return $this->db->cache(120)->value(folderSql::first([
'email_id'=>$this->emailId(),
'uuid' => md5($this->emailId().$name)
],'`id`'));
}
/**同步邮件
*
* @param string|\Lib\Imap\Request\Folder $folder
... ... @@ -204,10 +234,7 @@ class SyncMail {
$folder = $this->imap->folder($folder)->exec();
}
$folder_id = $this->db->value(folderSql::first([
'email_id'=>$this->emailId(),
'uuid' => md5($this->emailId().$folder->getName())
],'`id`'));
$folder_id = $this->getFolderId($folder->getName());
// 选择成功
if($folder->isOk()){
... ...