作者 邓超

x

... ... @@ -17,112 +17,46 @@ function start(){
// 进程管理器
$pm = new Process\Manager();
// 启动一个进程来管理定时
$pm->add(function (Process\Pool $pool, int $workerId){
_echo("定时进程({$workerId})启动成功");
// 每秒执行
\Swoole\Timer::tick(1000,function() use(&$pool){
if(redis()->getOriginData('email_sync_stop_num') >= WORKER_NUM+1 ){
$pool->shutdown();
}
//
\Lib\Log::getInstance()->write();
});
// 进行阻塞,否则定时器无法运行
while (true){
co::sleep(10);
}
},true);
// 协程配置
\co::set([
'max_coroutine'=>COROUTINE_MAX_NUM, // 最大携程数量
'hook_flags'=>SWOOLE_HOOK_TCP, // redis需要的配置
]);
// 启动业务进程
$pm->addBatch(WORKER_NUM,function (Process\Pool $pool, int $worker_id){
_echo("业务进程({$worker_id})启动成功");
$start_num = 0;// 启动的协程数量
// 循环阻塞
while (true){
swoole_set_process_name('php-email-sync-list-'.$worker_id);
// 需要同步的id
$id = redis()->lPop('sync_email_lists');
if(redis()->get(SYNC_RUNNING_REDIS_KEY)=='stop'){
break;
}
// 是否到了协程配置的数量上限
if($start_num < COROUTINE_MAX_NUM){
// 需要同步的id
$id = redis()->lPop('sync_email_lists');
if(!$id || !is_numeric($id)){
co::sleep(1);
}else{
// 占用当前的id,占用2小时
if(redis()->add('just_sync_'.$id,time(),600)){
// 启动一个协程
go(function () use (&$start_num,$worker_id,$id){
$start_num++;
// 开始同步
try {
sync($id,$worker_id);
}catch (\Throwable $e){
// 重新发布同步任务,如果失败了是否重新发布
// redis()->rPush('sync_email_lists',$id);
// _echo($e->getMessage());
logs(
$e->getMessage().PHP_EOL.$e->getTraceAsString(),
LOG_PATH.'/'.$worker_id.'.log'
);
}
// 协程完成后执行的函数
co::defer(function () use (&$start_num,$worker_id,$id){
// _echo('正常关闭进程('.$worker_id.')下的协程('.co::getCid().')');
$start_num--;
// 消除占用
redis()->delete('just_sync_'.$id);
// 写入日志
\Lib\Log::getInstance()->write();
});
});
}
}
}else{
// 协程到了最大的数量,阻塞1秒
if(!$id || !is_numeric($id)){
co::sleep(1);
}
else{
// 占用当前的id,占用2小时
if(redis()->add('just_sync_'.$id,time(),600)){
// 启动一个协程
go(function () use ($id){
// 开始同步
try {
sync($id);
}catch (\Throwable $e){
logs(
$e->getMessage().PHP_EOL.$e->getTraceAsString(),
LOG_PATH.'/sync/'.$id.'.log'
);
}
// 协程完成后执行的函数
co::defer(function () use ($id){
// 消除占用
redis()->delete('just_sync_'.$id);
// 写入日志
\Lib\Log::getInstance()->write();
});
}
// 验证是否全部进程退出了
while (true){
if(!$start_num){
redis()->incr('email_sync_stop_num');
break;
});
}
}
co::sleep(0.5);
}
while (true){
co::sleep(99);
}
},true);
... ... @@ -130,73 +64,45 @@ function start(){
// 启动一个同步内容的进程
$pm->add(function (Process\Pool $pool, int $worker_id){
_echo("业务进程({$worker_id})启动成功,body");
$start_num = 0;// 启动的协程数量
swoole_set_process_name('php-email-sync-body-'.$worker_id);
// 循环阻塞
while (true){
if(redis()->get(SYNC_RUNNING_REDIS_KEY)=='stop'){
break;
}
// 是否到了协程配置的数量上限
if($start_num < 500){
// 需要同步的id
$id = redis()->lPop('sync_email_body');
if(!$id){
co::sleep(1);
}else{
// 占用当前的id,占用2小时
if(redis()->add('just_sync_body_'.$id['lists_id'],time(),600)){
// 启动一个协程
go(function () use (&$start_num,$worker_id,$id){
$start_num++;
// 开始同步
try {
sync_body($id,$worker_id);
}catch (\Throwable $e){
// 需要同步的id
$id = redis()->lPop('sync_email_body');
if(!$id){
co::sleep(1);
}else{
// 占用当前的id,占用2小时
if(redis()->add('just_sync_body_'.$id['lists_id'],time(),600)){
// 启动一个协程
go(function () use ($id){
// 开始同步
try {
sync_body($id);
}catch (\Throwable $e){
// _echo($e->getMessage());
logs(
$e->getMessage().PHP_EOL.$e->getTraceAsString(),
LOG_PATH.'/'.$worker_id.'.log'
);
}
// 协程完成后执行的函数
co::defer(function () use (&$start_num,$worker_id,$id){
logs(
$e->getMessage().PHP_EOL.$e->getTraceAsString(),
LOG_PATH.'/'.$id['lists_id'].'.log'
);
}
// 协程完成后执行的函数
co::defer(function () use ($id){
// _echo('正常关闭进程('.$worker_id.')下的协程('.co::getCid().')');
$start_num--;
// 消除占用
redis()->delete('just_sync_body_'.$id['lists_id']);
// 写入日志
\Lib\Log::getInstance()->write();
});
// 消除占用
redis()->delete('just_sync_body_'.$id['lists_id']);
// 写入日志
\Lib\Log::getInstance()->write();
});
}
});
}
}else{
// 协程到了最大的数量,阻塞1秒
co::sleep(1);
}
}
// 验证是否全部进程退出了
while (true){
if(!$start_num){
redis()->incr('email_sync_stop_num');
break;
}
co::sleep(0.5);
}
while (true){
co::sleep(99);
}
},true);
... ... @@ -214,7 +120,7 @@ function start(){
* @author:dc
* @time 2023/3/23 10:18
*/
function sync_body($id,$worker_id){
function sync_body($id){
// 是否有数据
if(db()->count(\Model\bodySql::has((int) $id['lists_id']))){
... ... @@ -256,7 +162,7 @@ function sync_body($id,$worker_id){
* @author:dc
* @time 2023/3/10 10:19
*/
function sync($email_id,$worker_id){
function sync($email_id){
$email = db()->first(\Model\emailSql::first($email_id));
if(!$email){
... ... @@ -269,7 +175,7 @@ function sync($email_id,$worker_id){
$mailServer = new Lib\Mail\Mail($email['email'],base64_decode($email['password']),$email['imap']);
// 登录服务器
// 登录服务器
if($mailServer->login()!==1){
return 2;
}
... ... @@ -277,8 +183,8 @@ function sync($email_id,$worker_id){
// $mailServer->client->debug(true,LOG_PATH.'/'.$email_id.'/');
// 同步文件夹
$mailServer->syncFolder($email_id,db());
$mailServer->syncFolder($email_id);
_echo('文件夹同步成功-'.$email_id);
// 读取到邮箱中的文件夹
$folders = db()->all(\Model\folderSql::all($email['id']));
... ... @@ -289,10 +195,12 @@ function sync($email_id,$worker_id){
foreach ($folders as $folder){
try {
if(empty($folder['_child'])){
_echo('同步文件夹('.$folder['origin_folder'].')邮件列表');
// 同步父文件夹
$mailServer->syncMail($email_id,$folder['id'],$folder['origin_folder']);
}else{
foreach ($folder['_child'] as $item){
_echo('同步文件夹('.$item['origin_folder'].')邮件列表');
// 同步子文件夹
$mailServer->syncMail($email_id,$item['id'],$item['origin_folder']);
}
... ... @@ -319,41 +227,7 @@ if(!function_exists("imap_8bit")){
$ps = "ps -ef | grep \"sync.php start\" | grep -v grep | wc -l";
switch ($argv[1]??0){
case 'start':{
// $num = exec($ps);
// if($num){
// echo '正则运行,请勿重复运行';
// }else{
start();
// }
break;
}
case 'stop':{
\Co\run(function ($ps){
echo "正在退出程序...\n非必要请不要强制kill掉进程\n";
redis()->set(SYNC_RUNNING_REDIS_KEY,'stop');
while (true){
$num = exec($ps);
if(!$num){
break;
}
co::sleep(0.2);
}
echo "已退出程序\n";
},$ps);
break;
}
default:{
break;
}
}
start();
... ...
<?php
//
///**
// * TODO:: 这个文件是定时去拉取所有邮箱中的邮件
// * 暂时不用这个
// */
////error_reporting();
//
//use Swoole\Process;
//
//
//
//include_once __DIR__."/../vendor/autoload.php";
//
//
//function start(){
//
//// 删除停止运行的值
// redis()->delete(SYNC_RUNNING_REDIS_KEY);
//
// /** 创建一个表 **/
// $table = new Swoole\Table(128);// 128 行
// $table->column('val', Swoole\Table::TYPE_INT);
// $table->create();
//
// // 初始时,进行一次统计
// $table->set('etotal',['val'=> db()->count(\Model\emailSql::count())]);
//
// // 进程管理器
// $pm = new Process\Manager();
//
// // 启动一个进程来管理定时
// $pm->add(function (Process\Pool $pool, int $workerId)use (&$table){
// _echo("定时进程({$workerId})启动成功");
// // 每10分钟统计一次邮箱数量
// \Swoole\Timer::tick(600000,function () use (&$table){
// $table->set('etotal',['val'=> db()->count(\Model\emailSql::count())]);
// });
//
// // 每2秒执行一次
// \Swoole\Timer::tick(2000,function () use (&$table,&$pool){
// // 是否停止脚本
// $table->set('stop',['val'=> redis()->get(SYNC_RUNNING_REDIS_KEY) === 'stop' ? 1 : 0]);
//// _echo('定时器');
// // 检查是否结束了所有的协程同步代码
// if ($table->get('stop','val')) {
// $stop_num = 0;
// foreach (range(0, WORKER_NUM) as $i) {
// if ($table->exists('ps' . $i)) {
// $stop_num++;
// }
// }
// if($stop_num >= WORKER_NUM){
//// 退出进程
// $pool->shutdown();
// }
// }
//
// // 邮件总数
// $total = redis()->get('email_total',0);
// if($total > $table->get('etotal','val')){
// $table->set('etotal',['val'=> $total]);
// }
//
// });
//
//
// //todo:: 需要更新同步的邮件,每10分钟同步一次,这里是的时间是微妙
// \Swoole\Timer::tick(600000,function (){
//
// start_now_mail();
//
// });
//
//
//
// // 进行阻塞,否则定时器无法运行
// while (true){
// co::sleep(9999);
// }
// },true);
//
//
//// 协程配置
// \co::set([
// 'max_coroutine'=>COROUTINE_MAX_NUM, // 最大携程数量
// 'hook_flags'=>SWOOLE_HOOK_TCP, // redis需要的配置
// ]);
//
// // 启动业务进程
// $pm->addBatch(WORKER_NUM,function (Process\Pool $pool, int $worker_id) use (&$table){
// _echo("业务进程({$worker_id})启动成功");
//
// // 协程id集
// $cid = [];
//
// $i = 0;
// $email_total = $table->get('etotal','val');//邮件总数量
// $isRunMaxCNum = 1; // 允许最大协程数量,如果为0则停止所有协程工作,相当于停止脚本
// // 是否退出进程
// while (true){
// $i++;
// // 每10秒 验证一次邮箱数量,好控制协程的数量
// if($i>=10){
// $email_total = $table->get('etotal','val');
// $i = 0;
// }
//
// if(!$email_total){
// break;
// }
//
// // 每个协程 分配 10个邮箱任务
// $cnum = ceil($email_total/(WORKER_NUM*10));
// // 当前协程的数量
// $nowCnum = count($cid);
// // 说明 需要新的协程了
// if($cnum > $nowCnum){
// // 开启所需要的协程数量
// foreach (range(0,$cnum-$nowCnum) as $v){
// // 启动一个协程
// create_coroutine($cid,$isRunMaxCNum,$worker_id);
// }
// }
// // 暂时没有实现 减少协程数量操作
//// else if ($cnum < $nowCnum){
//// // 说明 协程数量过多,小于了1个协程处理10个邮箱的,资源闲置情况
//// // 销毁多余协程
//// $isRunMaxCNum = $nowCnum - $cnum;
//// }
//
// // 每3秒检查一次是否要停止 协程
// if($i%3 === 0){
//// _echo('是否收到退出信号:'.$table->get('stop','val'));
// if ($table->get('stop','val')){
// // 停止
// $isRunMaxCNum = 0;
// }
// }
//
// // 这个是检查 cid的如果协程全部退出,则退出进程
// co::sleep(1);
// // 跳出无限循环了
// if(!$cid){
// _echo('正常关闭进程('.$worker_id.')');
// break;
// }
// }
//
// // 是否停止,这里进行阻塞
// if ($table->get('stop','val')){
// // 某个进程退出了
// $table->set('ps'.$worker_id,['val'=>1]);
// // 阻塞直到 主进程 kill掉所有子进程
// while (true){
// co::sleep(5);
// }
// }
//
//
// },true);
//
//
// // 启动管理器
// $pm->start();
//
//}
//
///**
// * 创建协程
// * @param array $cid
// * @param int $isRunMaxCNum
// * @param $worker_id
// * @author:dc
// * @time 2023/2/14 17:04
// */
//function create_coroutine(array &$cid,int &$isRunMaxCNum,$worker_id){
// go(function () use (&$cid,&$isRunMaxCNum,$worker_id){
// // 协程id
// $cid[co::getCid()] = co::getCid();
//
// // 同步操作
// while (true){
// // 是否退出协程
// if(!$isRunMaxCNum){
//// _echo('协程('.co::getCid().'): stop '.$isRunMaxCNum);
// break;
// }
//
// // 开始同步
// try {
// sync($worker_id);
// }catch (\Throwable $e){
// _echo($e->getMessage());
// logs(
// $e->getMessage().PHP_EOL.$e->getTraceAsString(),
// LOG_PATH.'/'.$worker_id.'_'.co::getCid().'.log'
// );
// }
//
// // 阻塞1秒
// co::sleep(1);
// }
//
// // 协程完成后执行的函数
// co::defer(function () use (&$cid,$worker_id){
// _echo('正常关闭进程('.$worker_id.')下的协程('.co::getCid().')');
// unset($cid[co::getCid()]);
// });
//
// });
//}
//
//
///**
// * 开始同步, 这里是主要的业务代码
// * @param int $worker_id 进程号
// * @return int
// * @author:dc
// * @time 2023/2/18 11:27
// */
//function sync($worker_id=0){
// // 需要同步的id
// $id = redis()->lPop('sync_email_lists');
//
// if(!$id){
// co::sleep(1);
// return -1;
// }
//
// _echo($worker_id.': 协程('.co::getCid().'):抢到 '.$id);
//
// $email = db()->first(\Model\emailSql::first($id));
// if(!$email){
// return 0;
// }
//
// if($email['pwd_error']){
// return 1;
// }
//
// $mailServer = new Lib\Mail\Mail($email['email'],base64_decode($email['password']),$email['imap']);
//
// // 登录服务器
// if(!$mailServer->login()){
// return 2;
// }
//
// // 文件夹间隔1天同步一次
// if(empty($email['last_sync_time']) || time() > $email['last_sync_time']+86400){
// // 同步文件夹
// $mailServer->syncFolder($id,db());
// }
//
// // 读取到邮箱中的文件夹
// $folders = db()->all(\Model\folderSql::all($email['id']));
// if(!$folders){
// return 3;
// }
// $folders = list_to_tree($folders);
// foreach ($folders as $folder){
// try {
// if(empty($folder['_child'])){
// // 同步父文件夹
// $mailServer->syncMail($id,$folder['id'],$folder['origin_folder']);
// }else{
// foreach ($folder as $item){
// // 同步子文件夹
// $mailServer->syncMail($id,$item['id'],$item['origin_folder']);
// }
// }
//
// }catch (Throwable $e){
// logs(
// $e->getMessage(),
// LOG_PATH.'/imap/'.$email['email'].'.error.log'
// );
// }
// }
//
//
// $email = null;
// $mailServer = null;
//
//}
//
//
//
//
//switch ($argv[1]){
// case 'start':{
// start();
// break;
// }
// case 'stop':{
// \Co\run(function (){
// echo "正在退出程序...\n非必要请不要强制kill掉进程\n";
// redis()->set(SYNC_RUNNING_REDIS_KEY,'stop');
// while (true){
// $num = exec("ps -ef | grep \"sync_email.php start\" | grep -v grep | wc -l");
// if(!$num){
// break;
// }
// co::sleep(0.5);
// }
// echo "已退出程序\n";
// });
// break;
// }
// default:{
// break;
// }
//}
//
//
//
//
//
//
//
//
//
//