sync.php 10.1 KB
<?php

//error_reporting();

use Swoole\Process;



include_once __DIR__."/../vendor/autoload.php";


function start(){

// 删除停止运行的值
    redis()->delete(SYNC_RUNNING_REDIS_KEY,'email_sync_stop_num');

    // 进程管理器
    $pm = new Process\Manager();

    // 启动一个进程来管理定时
    $pm->add(function (Process\Pool $pool, int $workerId){
        _echo("定时进程({$workerId})启动成功");

        // 每秒执行
        \Swoole\Timer::tick(1000,function() use(&$pool){

            if(redis()->getOriginData('email_sync_stop_num') >= WORKER_NUM+1 ){
                $pool->shutdown();
            }

            // 每秒检查一次是否空闲redis
            foreach (\Lib\RedisPool::$instance as $redis){
                if(time()-2 > $redis->lastTimer ){
                    $redis->close();
                }
            }
            // 空闲超过10秒的db链接,关闭
            foreach (\Lib\DbPool::$instance as $db){
                if(time()-10 > $db->lastTimer ){
                    $db->close();
                }
            }

            //
            \Lib\Log::getInstance()->write();

        });


        // 进行阻塞,否则定时器无法运行
        while (true){
            co::sleep(10);
        }

    },true);


// 协程配置
    \co::set([
        'max_coroutine'=>COROUTINE_MAX_NUM, // 最大携程数量
        'hook_flags'=>SWOOLE_HOOK_TCP,  //  redis需要的配置
    ]);

    // 启动业务进程
    $pm->addBatch(WORKER_NUM,function (Process\Pool $pool, int $worker_id){
        _echo("业务进程({$worker_id})启动成功");

        $start_num = 0;// 启动的协程数量

        // 循环阻塞
        while (true){

            if(redis()->get(SYNC_RUNNING_REDIS_KEY)=='stop'){
                break;
            }

            // 是否到了协程配置的数量上限
            if($start_num < COROUTINE_MAX_NUM){
                // 需要同步的id
                $id = redis()->lPop('sync_email_lists');

                if(!$id || !is_numeric($id)){
                    co::sleep(1);
                }else{
                    // 占用当前的id,占用2小时
                    if(redis()->add('just_sync_'.$id,time(),7200)){
                        // 启动一个协程
                        go(function () use (&$start_num,$worker_id,$id){
                            $start_num++;

                            // 开始同步
                            try {
                                sync($id,$worker_id);
                            }catch (\Throwable $e){
                                // 重新发布同步任务,如果失败了是否重新发布
//                            redis()->rPush('sync_email_lists',$id);

//                            _echo($e->getMessage());
                                logs(
                                    $e->getMessage().PHP_EOL.$e->getTraceAsString(),
                                    LOG_PATH.'/'.$worker_id.'.log'
                                );
                            }

                            // 协程完成后执行的函数
                            co::defer(function () use (&$start_num,$worker_id,$id){
//                        _echo('正常关闭进程('.$worker_id.')下的协程('.co::getCid().')');
                                $start_num--;
                                // 消除占用
                                redis()->delete('just_sync_'.$id);
                                // 写入日志
                                \Lib\Log::getInstance()->write();

                                // 关闭数据库链接
                                db()->close();
                                // 关闭redis链接
                                redis()->close();

                            });

                        });
                    }
                }
            }else{
                // 协程到了最大的数量,阻塞1秒
                co::sleep(1);
            }


        }

        // 验证是否全部进程退出了
        while (true){
            if(!$start_num){
                redis()->incr('email_sync_stop_num');
                break;
            }
            co::sleep(0.5);
        }
        while (true){
            co::sleep(99);
        }

    },true);

    // 启动一个同步内容的进程
    $pm->add(function (Process\Pool $pool, int $worker_id){
        _echo("业务进程({$worker_id})启动成功,body");

        $start_num = 0;// 启动的协程数量

        // 循环阻塞
        while (true){
            if(redis()->get(SYNC_RUNNING_REDIS_KEY)=='stop'){
                break;
            }
            // 是否到了协程配置的数量上限
            if($start_num < 500){
                // 需要同步的id
                $id = redis()->lPop('sync_email_body');

                if(!$id){
                    co::sleep(1);
                }else{
                    // 占用当前的id,占用2小时
                    if(redis()->add('just_sync_body_'.$id['lists_id'],time(),600)){
                        // 启动一个协程
                        go(function () use (&$start_num,$worker_id,$id){

                            $start_num++;

                            // 开始同步
                            try {
                                sync_body($id,$worker_id);
                            }catch (\Throwable $e){
//                            _echo($e->getMessage());
                                logs(
                                    $e->getMessage().PHP_EOL.$e->getTraceAsString(),
                                    LOG_PATH.'/'.$worker_id.'.log'
                                );
                            }

                            // 协程完成后执行的函数
                            co::defer(function () use (&$start_num,$worker_id,$id){
//                        _echo('正常关闭进程('.$worker_id.')下的协程('.co::getCid().')');
                                $start_num--;
                                // 消除占用
                                redis()->delete('just_sync_body_'.$id['lists_id']);
                                // 写入日志
                                \Lib\Log::getInstance()->write();

                                // 关闭数据库链接
                                db()->close();
                                // 关闭redis链接
                                redis()->close();

                            });

                        });
                    }
                }
            }else{
                // 协程到了最大的数量,阻塞1秒
                co::sleep(1);
            }


        }

        // 验证是否全部进程退出了
        while (true){
            if(!$start_num){
                redis()->incr('email_sync_stop_num');
                break;
            }
            co::sleep(0.5);
        }
        while (true){
            co::sleep(99);
        }

    },true);

    // 启动管理器
    $pm->start();

}

/**
 * 同步内容 body
 * @param $id
 * @param $worker_id
 * @return int
 * @author:dc
 * @time 2023/3/23 10:18
 */
function sync_body($id,$worker_id){

    // 是否有数据
    if(db()->count(\Model\bodySql::has((int) $id['lists_id']))){
        return 0;
    }

    $email = db()->first(\Model\emailSql::first($id['email_id']));
    if(!$email){
        return 0;
    }

    if($email['pwd_error']){
        return 1;
    }

    $mailServer = new Lib\Mail\Mail($email['email'],base64_decode($email['password']),$email['imap']);

    // 登录服务器
    if(!$mailServer->login()){
        return 2;
    }
//    $mailServer->client->debug(true,LOG_PATH.'/'.$id['email_id'].'body/');

    // 同步 body
    $mailServer->syncBody($id['folder'],$id['uid'],$id['lists_id'],db());

    $mailServer = null;

    return 0;

}


/**
 * 开始同步, 这里是主要的业务代码
 * @param $email_id
 * @param $worker_id
 * @return int
 * @author:dc
 * @time 2023/3/10 10:19
 */
function sync($email_id,$worker_id){

    $email = db()->first(\Model\emailSql::first($email_id));
    if(!$email){
        return 0;
    }

    if($email['pwd_error']){
        return 1;
    }

    $mailServer = new Lib\Mail\Mail($email['email'],base64_decode($email['password']),$email['imap']);

        // 登录服务器
    if(!$mailServer->login()){
        return 2;
    }

//    $mailServer->client->debug(true,LOG_PATH.'/'.$email_id.'/');

    // 同步文件夹
    $mailServer->syncFolder($email_id,db());


    // 读取到邮箱中的文件夹
    $folders = db()->all(\Model\folderSql::all($email['id']));
    if(!$folders){
        return 3;
    }
    $folders = list_to_tree($folders);
    foreach ($folders as $folder){
        try {
            if(empty($folder['_child'])){
                // 同步父文件夹
                $mailServer->syncMail($email_id,$folder['id'],$folder['origin_folder']);
            }else{
                foreach ($folder['_child'] as $item){
                    // 同步子文件夹
                    $mailServer->syncMail($email_id,$item['id'],$item['origin_folder']);
                }
            }

        }catch (\Throwable $e){
            logs(
                $e->getMessage().$e->getTraceAsString(),
                LOG_PATH.'/imap/'.$email['email'].'.error.log'
            );
        }
    }


    $email = null;
    $mailServer = null;
}


if(!function_exists("imap_8bit")){
    echo '请安装imap扩展';
    exit(0);
}



$ps = "ps -ef | grep \"sync.php start\" | grep -v grep | wc -l";

switch ($argv[1]??0){
    case 'start':{
//        $num = exec($ps);
//        if($num){
//            echo '正则运行,请勿重复运行';
//        }else{
            start();
//        }
        break;
    }
    case 'stop':{
        \Co\run(function ($ps){
            echo "正在退出程序...\n非必要请不要强制kill掉进程\n";

            redis()->set(SYNC_RUNNING_REDIS_KEY,'stop');

            while (true){

                $num = exec($ps);
                if(!$num){
                    break;
                }
                co::sleep(0.2);
            }
            echo "已退出程序\n";
        },$ps);

        break;
    }
    default:{
        break;
    }
}