sync_email.php 9.0 KB
<?php
//
///**
// * TODO:: 这个文件是定时去拉取所有邮箱中的邮件
// * 暂时不用这个
// */
////error_reporting();
//
//use Swoole\Process;
//
//
//
//include_once __DIR__."/../vendor/autoload.php";
//
//
//function start(){
//
//// 删除停止运行的值
//    redis()->delete(SYNC_RUNNING_REDIS_KEY);
//
//    /** 创建一个表 **/
//    $table = new Swoole\Table(128);// 128 行
//    $table->column('val', Swoole\Table::TYPE_INT);
//    $table->create();
//
//    // 初始时,进行一次统计
//    $table->set('etotal',['val'=> db()->count(\Model\emailSql::count())]);
//
//    // 进程管理器
//    $pm = new Process\Manager();
//
//    // 启动一个进程来管理定时
//    $pm->add(function (Process\Pool $pool, int $workerId)use (&$table){
//        _echo("定时进程({$workerId})启动成功");
//        // 每10分钟统计一次邮箱数量
//        \Swoole\Timer::tick(600000,function () use (&$table){
//            $table->set('etotal',['val'=> db()->count(\Model\emailSql::count())]);
//        });
//
//        // 每2秒执行一次
//        \Swoole\Timer::tick(2000,function () use (&$table,&$pool){
//            // 是否停止脚本
//            $table->set('stop',['val'=> redis()->get(SYNC_RUNNING_REDIS_KEY) === 'stop' ? 1 : 0]);
////            _echo('定时器');
//            // 检查是否结束了所有的协程同步代码
//            if ($table->get('stop','val')) {
//                $stop_num = 0;
//                foreach (range(0, WORKER_NUM) as $i) {
//                    if ($table->exists('ps' . $i)) {
//                        $stop_num++;
//                    }
//                }
//                if($stop_num >= WORKER_NUM){
////                退出进程
//                    $pool->shutdown();
//                }
//            }
//
//            // 邮件总数
//            $total = redis()->get('email_total',0);
//            if($total > $table->get('etotal','val')){
//                $table->set('etotal',['val'=> $total]);
//            }
//
//        });
//
//
//        //todo:: 需要更新同步的邮件,每10分钟同步一次,这里是的时间是微妙
//        \Swoole\Timer::tick(600000,function (){
//
//            start_now_mail();
//
//        });
//
//
//
//        // 进行阻塞,否则定时器无法运行
//        while (true){
//            co::sleep(9999);
//        }
//    },true);
//
//
//// 协程配置
//    \co::set([
//        'max_coroutine'=>COROUTINE_MAX_NUM, // 最大携程数量
//        'hook_flags'=>SWOOLE_HOOK_TCP,  //  redis需要的配置
//    ]);
//
//    // 启动业务进程
//    $pm->addBatch(WORKER_NUM,function (Process\Pool $pool, int $worker_id) use (&$table){
//        _echo("业务进程({$worker_id})启动成功");
//
//        // 协程id集
//        $cid = [];
//
//        $i = 0;
//        $email_total = $table->get('etotal','val');//邮件总数量
//        $isRunMaxCNum = 1; // 允许最大协程数量,如果为0则停止所有协程工作,相当于停止脚本
//        // 是否退出进程
//        while (true){
//            $i++;
//            // 每10秒 验证一次邮箱数量,好控制协程的数量
//            if($i>=10){
//                $email_total = $table->get('etotal','val');
//                $i = 0;
//            }
//
//            if(!$email_total){
//                break;
//            }
//
//            // 每个协程 分配 10个邮箱任务
//            $cnum = ceil($email_total/(WORKER_NUM*10));
//            // 当前协程的数量
//            $nowCnum = count($cid);
//            // 说明 需要新的协程了
//            if($cnum > $nowCnum){
//                // 开启所需要的协程数量
//                foreach (range(0,$cnum-$nowCnum) as $v){
//                    // 启动一个协程
//                    create_coroutine($cid,$isRunMaxCNum,$worker_id);
//                }
//            }
//            // 暂时没有实现 减少协程数量操作
////            else if ($cnum < $nowCnum){
////                // 说明 协程数量过多,小于了1个协程处理10个邮箱的,资源闲置情况
////                // 销毁多余协程
////                $isRunMaxCNum = $nowCnum - $cnum;
////            }
//
//            // 每3秒检查一次是否要停止 协程
//            if($i%3 === 0){
////                _echo('是否收到退出信号:'.$table->get('stop','val'));
//                if ($table->get('stop','val')){
//                    // 停止
//                    $isRunMaxCNum = 0;
//                }
//            }
//
//            // 这个是检查 cid的如果协程全部退出,则退出进程
//            co::sleep(1);
//            // 跳出无限循环了
//            if(!$cid){
//                _echo('正常关闭进程('.$worker_id.')');
//                break;
//            }
//        }
//
//        // 是否停止,这里进行阻塞
//        if ($table->get('stop','val')){
//            // 某个进程退出了
//            $table->set('ps'.$worker_id,['val'=>1]);
//            // 阻塞直到 主进程 kill掉所有子进程
//            while (true){
//                co::sleep(5);
//            }
//        }
//
//
//    },true);
//
//
//    // 启动管理器
//    $pm->start();
//
//}
//
///**
// * 创建协程
// * @param array $cid
// * @param int $isRunMaxCNum
// * @param $worker_id
// * @author:dc
// * @time 2023/2/14 17:04
// */
//function create_coroutine(array &$cid,int &$isRunMaxCNum,$worker_id){
//    go(function () use (&$cid,&$isRunMaxCNum,$worker_id){
//        // 协程id
//        $cid[co::getCid()] = co::getCid();
//
//        // 同步操作
//        while (true){
//            // 是否退出协程
//            if(!$isRunMaxCNum){
////                _echo('协程('.co::getCid().'): stop '.$isRunMaxCNum);
//                break;
//            }
//
//            // 开始同步
//            try {
//                sync($worker_id);
//            }catch (\Throwable $e){
//                _echo($e->getMessage());
//                logs(
//                    $e->getMessage().PHP_EOL.$e->getTraceAsString(),
//                    LOG_PATH.'/'.$worker_id.'_'.co::getCid().'.log'
//                );
//            }
//
//            // 阻塞1秒
//            co::sleep(1);
//        }
//
//        // 协程完成后执行的函数
//        co::defer(function () use (&$cid,$worker_id){
//            _echo('正常关闭进程('.$worker_id.')下的协程('.co::getCid().')');
//            unset($cid[co::getCid()]);
//        });
//
//    });
//}
//
//
///**
// * 开始同步, 这里是主要的业务代码
// * @param int $worker_id 进程号
// * @return int
// * @author:dc
// * @time 2023/2/18 11:27
// */
//function sync($worker_id=0){
//    // 需要同步的id
//    $id = redis()->lPop('sync_email_lists');
//
//    if(!$id){
//        co::sleep(1);
//        return -1;
//    }
//
//    _echo($worker_id.': 协程('.co::getCid().'):抢到 '.$id);
//
//    $email = db()->first(\Model\emailSql::first($id));
//    if(!$email){
//        return 0;
//    }
//
//    if($email['pwd_error']){
//        return 1;
//    }
//
//    $mailServer = new Lib\Mail\Mail($email['email'],base64_decode($email['password']),$email['imap']);
//
//    // 登录服务器
//    if(!$mailServer->login()){
//        return 2;
//    }
//
//    // 文件夹间隔1天同步一次
//    if(empty($email['last_sync_time']) || time() > $email['last_sync_time']+86400){
//        // 同步文件夹
//        $mailServer->syncFolder($id,db());
//    }
//
//    // 读取到邮箱中的文件夹
//    $folders = db()->all(\Model\folderSql::all($email['id']));
//    if(!$folders){
//        return 3;
//    }
//    $folders = list_to_tree($folders);
//    foreach ($folders as $folder){
//        try {
//            if(empty($folder['_child'])){
//                // 同步父文件夹
//                $mailServer->syncMail($id,$folder['id'],$folder['origin_folder']);
//            }else{
//                foreach ($folder as $item){
//                    // 同步子文件夹
//                    $mailServer->syncMail($id,$item['id'],$item['origin_folder']);
//                }
//            }
//
//        }catch (Throwable $e){
//            logs(
//                $e->getMessage(),
//                LOG_PATH.'/imap/'.$email['email'].'.error.log'
//            );
//        }
//    }
//
//
//    $email = null;
//    $mailServer = null;
//
//}
//
//
//
//
//switch ($argv[1]){
//    case 'start':{
//        start();
//        break;
//    }
//    case 'stop':{
//        \Co\run(function (){
//            echo "正在退出程序...\n非必要请不要强制kill掉进程\n";
//            redis()->set(SYNC_RUNNING_REDIS_KEY,'stop');
//            while (true){
//                $num = exec("ps -ef | grep \"sync_email.php start\" | grep -v grep | wc -l");
//                if(!$num){
//                    break;
//                }
//                co::sleep(0.5);
//            }
//            echo "已退出程序\n";
//        });
//        break;
//    }
//    default:{
//        break;
//    }
//}
//
//
//
//
//
//
//
//
//
//