作者 邓超

1

... ... @@ -4,7 +4,21 @@
//use Model\listsSql;
//include_once __DIR__."/../vendor/autoload.php";
include_once __DIR__."/../vendor/autoload.php";
//$pm = new Swoole\Process\Manager();
//
//$pm->add(function (){
// go(function (){
// var_dump(co::getCid());
// co::defer(function (){
// var_dump(co::getCid());
// });
// });
// co::sleep(9999);
//},true);
//
//$pm->start();
//
//print_r(redis()->incr('asdfasdfasdfasdf',600));
... ...
... ... @@ -18,16 +18,16 @@ function start(){
$pm = new Process\Manager();
// 启动一个进程来管理定时
$pm->add(function (Process\Pool $pool, int $workerId){
_echo("定时进程({$workerId})启动成功");
// 进行阻塞,否则定时器无法运行
while (true){
co::sleep(9999);
}
},true);
// $pm->add(function (Process\Pool $pool, int $workerId){
// _echo("定时进程({$workerId})启动成功");
//
//
// // 进行阻塞,否则定时器无法运行
// while (true){
// co::sleep(9999);
// }
//
// },true);
// 协程配置
... ... @@ -63,7 +63,7 @@ function start(){
sync($id,$worker_id);
}catch (\Throwable $e){
// 重新发布同步任务,如果失败了是否重新发布
redis()->rPush('sync_email_lists',$id);
// redis()->rPush('sync_email_lists',$id);
// _echo($e->getMessage());
logs(
... ... @@ -101,13 +101,115 @@ function start(){
},true);
// 启动一个同步内容的进程
$pm->add(function (Process\Pool $pool, int $worker_id){
_echo("业务进程({$worker_id})启动成功,body");
$start_num = 0;// 启动的协程数量
// 循环阻塞
while (true){
// 是否到了协程配置的数量上限
if($start_num < 50){
// 需要同步的id
$id = redis()->lPop('sync_email_body');
if(!$id){
co::sleep(1);
}else{
// 占用当前的id,占用2小时
redis()->add('just_sync_body_'.$id['lists_id'],time(),600);
// 启动一个协程
go(function () use (&$start_num,$worker_id,$id){
$start_num++;
// 开始同步
try {
sync_body($id,$worker_id);
}catch (\Throwable $e){
// _echo($e->getMessage());
logs(
$e->getMessage().PHP_EOL.$e->getTraceAsString(),
LOG_PATH.'/'.$worker_id.'_'.co::getCid().'.log'
);
}
// 协程完成后执行的函数
co::defer(function () use (&$start_num,$worker_id,$id){
// _echo('正常关闭进程('.$worker_id.')下的协程('.co::getCid().')');
$start_num--;
// 消除占用
redis()->delete('just_sync_body_'.$id['lists_id']);
// 写入日志
\Lib\Log::getInstance()->write();
// 关闭数据库链接
db()->close();
// 关闭redis链接
redis()->close();
});
});
}
}else{
// 协程到了最大的数量,阻塞1秒
co::sleep(1);
}
}
},true);
// 启动管理器
$pm->start();
}
/**
* 同步内容 body
* @param $id
* @param $worker_id
* @return int
* @author:dc
* @time 2023/3/23 10:18
*/
function sync_body($id,$worker_id){
// 是否有数据
if(db()->count(\Model\bodySql::has((int) $id['lists_id']))){
return 0;
}
$email = db()->first(\Model\emailSql::first($id['email_id']));
if(!$email){
return 0;
}
if($email['pwd_error']){
return 1;
}
$mailServer = new Lib\Mail\Mail($email['email'],base64_decode($email['password']),$email['imap']);
// 登录服务器
if(!$mailServer->login()){
return 2;
}
// 同步 body
$mailServer->syncBody($id['folder'],$id['uid'],$id['lists_id'],db());
$mailServer = null;
return 0;
}
/**
* 开始同步, 这里是主要的业务代码
... ...
... ... @@ -18,6 +18,8 @@ define('APP_LANG','zh');
// 根目录
define('ROOT_PATH',__DIR__);
// 对外的web目录
define('PUBLIC_PATH',ROOT_PATH.'/public');
// redis
... ...
... ... @@ -3,6 +3,7 @@
namespace Lib\Mail;
use Lib\DbPool;
use Model\bodySql;
use Model\folderSql;
use Model\listsSql;
... ... @@ -254,7 +255,15 @@ class Mail {
foreach ($results as $insert){
if(empty($uuids[$insert['uuid']])){
// 新增
$db->insert(listsSql::$table,$insert);
$id = $db->insert(listsSql::$table,$insert);
// 同步body内容
redis()->rPush('sync_email_body', [
'lists_id' => $id,
'email_id' => $email_id,
'folder_id' => $folder_id,
'folder' => $folder,
'uid' => $insert['uid'],
]);
}else{
// 修改
$db->update(
... ... @@ -296,15 +305,22 @@ class Mail {
* @author:dc
* @time 2023/2/9 10:29
*/
public static function syncBody($id,$msgno, $email_id,$folder_name,$email):bool {
public function syncBody($folder_name, $uid ,$id,$db=null):bool {
$db = $db ? $db : db();
// 选择文件夹
static::$client[$email]->selectFolder($folder_name);
$this->client->selectFolder($folder_name);
$body = $this->client->fetchBody([$uid],PUBLIC_PATH.'/attachment/',true);
$body = static::$client[$email]->fetchBody([$msgno],storage_path('email/'.$email_id));
$body = array_values($body);
$body = $body[0]['RFC822.TEXT']??'';
if(!empty($body[$msgno]['RFC822.TEXT'])){
\App\Models\Body::_insert($id,$body[$msgno]['RFC822.TEXT']);
if(!empty($body)){
$db->insert(bodySql::$table,[
'lists_id' => $id,
'body' => $body
]);
}
return true;
... ...
<?php
namespace Model;
/**
* body
* @author:dc
* @time 2023/3/23 10:13
* Class bodySql
* @package Model
*/
class bodySql {
public static $table = 'bodies';
/**
*
* @param int $id
* @return string
* @author:dc
* @time 2023/3/23 10:15
*/
public static function first(int $id):string {
return "select * from `".static::$table."` where `id` = {$id} limit 1";
}
/**
* has
* @param int $id
* @return string
* @author:dc
* @time 2023/3/23 10:15
*/
public static function has(int $id){
return "select count(*) from `".static::$table."` where `id` = {$id} limit 1";
}
}
... ...