作者 邓超

优化同步

<?php
use Model\listsSql;
/**
* 把预热的邮件进行归档处理 减少数量量
* @author:dc
* @time 2025/1/17 17:10
* Class HotMail
*/
class HotMailArchive {
public function __construct(){
$this->db = db();
$this->start();
}
/**
* shopk那边的预热邮箱
* @var array
*/
private $hotEmail = [];
/**
* @var \Lib\Db|\Lib\DbPool
*/
private $db;
/**
* @author:dc
* @time 2024/7/18 14:04
*/
private function start(){
_echo('启动预热邮件归档处理 '.getmypid());
// $fid = [235623,235628,235633,235638,235643,235648,235653,235658,235663,235668,235673,235678,235683,235688,235693,235698,235703,235708,235713,235718,235723,235728,235733,235738,235743,235748,235753,235758,235763,235768,235773,235778,235783,235788,235793,235798,235803,235808,235813,235818,235823,235828,235833,235838,235843,235848,235853,235858,235863,235868,235873,235878,235883,235888,235893,235898,235903,235908,235913,235918,235923,235928,235933,235938,235943,235948,235953,235958,235963,235968,235973,235978,235983,235988,235993,235998,236003,236008,236013,236018];
// foreach ($fid as $i){
// $this->move($i);
// }
// return 0;
$id = 0;
while (1){
$id = $this->run($id);
if($id === 0){
break;
}
}
}
private function run($id):int {
$floder = $this->db->first(\Model\folderSql::first(['id.>'=>$id]));
if($floder){
$this->move($floder['id']);
return $floder['id'];
}else{
return 0;
}
}
public function move($fid){
_echo('正在移动 '.$fid);
$list = $this->db->all(\Model\listsSql::first(dbWhere(['folder_id'=>$fid,'is_hots'=>1])).'0000');
if($list){
$this->db->transaction(function () use ($list){
foreach ($list as $item){
try {
$ret = $this->db->throw()->insert('lists_hot',$item,false);
if($ret){
$this->db->delete(listsSql::$table,['id'=>$item['id']]);
}
}catch (Throwable $e){
$this->db->update('lists_hot',$item,dbWhere(['id'=>$item['id']]));
}
}
return true;
});
$list = null;
$this->move($fid);
}
}
}
include_once "../vendor/autoload.php";
new HotMailArchive();
_echo('执行完成 等待下次执行');
//
//swoole_set_process_name('hot-email-run-man');
//
//$pm = new Swoole\Process\Manager();
//
//$pm->addBatch(3,function (){
//
// swoole_set_process_name('hot-email-run');
//
// include_once "../vendor/autoload.php";
//
// new HotMail();
//
// exit();
//},true);
//
//$pm->start();
<?php
/**
* 循环本地,验证远程是否存在 不存在则删除本地
*/
//error_reporting();
use Swoole\Process;
include_once __DIR__."/../vendor/autoload.php";
function start(){
swoole_set_process_name('php-email-sync-list-check');
$id = 0;
// $goNum = 0;
// 循环阻塞
while (true){
$id = db()->value('select `id` from `'.\Model\emailSql::$table.'` where `id` > '.$id.' order by `id` asc limit 1');
if($id){
// 启动一个协程
// go(function () use ($id,&$goNum){
// $goNum++;
// 开始同步
try {
sync($id);
}catch (\Throwable $e){
echo $e->getMessage();
}
\Lib\Log::getInstance()->write();
// co::defer(function () use (&$goNum){
// $goNum--;
// });
// });
}else{
break;
}
}
// while ($goNum>0){
// co::sleep(1);
// }
_echo('结束了');
}
/**
* 开始同步, 这里是主要的业务代码
* @param $email_id
* @param $worker_id
* @return int
* @author:dc
* @time 2023/3/10 10:19
*/
function sync($email_id){
$email = db()->first(\Model\emailSql::first($email_id));
if(!$email || $email['pwd_error']){
// 密码错误,或者超过一个月没有更新的邮箱 清空数据
if($email['pwd_error'] && $email['updated_at'] && strtotime($email['updated_at']) < (time()-86400*7) ){
db()->delete(\Model\listsSql::$table,['email_id'=>$email['id']]);
}
return 0;
}
// 读取到邮箱中的文件夹
$folders = db()->all(\Model\folderSql::all($email['id']));
if(!$folders){
return 3;
}
$mailServer = new Lib\Mail\Mail($email['email'],base64_decode($email['password']),$email['imap']);
// 登录服务器
if($mailServer->login()!==1){
return 2;
}
$call = function ($email_id,$folder_id,$origin_folder) use ($mailServer){
_echo('run e '.$email_id.' fn '.$origin_folder);
// gmail 邮箱 这个是不可选的
if($origin_folder == '[Gmail]'){
return;
}
// 同步父文件夹
$mailServer->client->selectFolder($origin_folder);
$page = 0;
$db = db();
while (1){
$ids = $db->all("select `id`,`uid` from ".\Model\listsSql::$table." where `email_id` = {$email_id} and `folder_id` = {$folder_id} and `udate` < ".strtotime("-1 day")." limit 100 offset ".($page*100));
$page++;
if($ids){
try {
$result = $mailServer->client->fetch(array_column($ids,'uid'),'UID',true);
$result = array_column($result,'UID','UID');
}catch (Throwable $e){
_echo($e->getMessage());
return;
}
foreach ($ids as $id){
$uid = $id['uid']; $id = $id['id'];
if(!$result || !isset($result[$uid])){
// 删除 如果远程没有,就删除本地
_echo('删除 e '.$email_id.' f '.$folder_id.' u '.$uid.' id '.$id.' d '.$db->delete(\Model\listsSql::$table,['id'=>$id]).' body '.$db->delete(\Model\bodySql::$table,['lists_id'=>$id]));
}
}
}
// 结束了
if(!$ids || count($ids) < 100){
break;
}
}
};
// $folders = list_to_tree($folders);
foreach ($folders as $folder){
try {
$is = true;
foreach ($folders as $f){
// 是否存在下级
if($f['pid'] == $folder['id']){
$is = false;
}
}
if($is) $call($email_id,$folder['id'],$folder['origin_folder']);
}catch (\Throwable $e){
echo $e->getMessage();
}
}
$email = null;
$mailServer = null;
}
if(!function_exists("imap_8bit")){
echo '请安装imap扩展';
exit(0);
}
\Co\run(function (){
start();
});
... ... @@ -4,7 +4,7 @@ include_once "../vendor/autoload.php";
// 这里试试不用多进程模式,用多协程模式
\Lib\DbPool::$clientNumber = 50;
class SendJob {
... ... @@ -117,6 +117,12 @@ class SendJob {
* @time 2024/4/10 9:25
*/
public function go_($list){
// 控制50个协程内
while ($this->cnum>=50){
co::sleep(0.5);
}
// 占用 id
if(redis()->add('send_job_run_id_'.$list['id'],$list['id'],600)){
go(function ($data) {
... ... @@ -253,6 +259,7 @@ class SendJob {
_echo('执行任务完成'.$data['id']);
db()->close();
});
},$list);
... ... @@ -285,6 +292,7 @@ switch ($argv[1]??0){
_echo('进程已退出');
db()->close();
});
break;
... ...
... ... @@ -2,54 +2,103 @@
//error_reporting();
include_once __DIR__."/../vendor/autoload.php";
$runNumber = 1000;
// 循环阻塞
while ($runNumber){
$runNumber--;
// 需要同步的id
$id = redis()->lPop('sync_email_lists');
if($id && is_numeric($id)){
// 占用当前的id,占用2小时
if(redis()->add('just_sync_'.$id,time(),600)){
try{
// 开始同步
$email = db()->cache(3600)->first(\Model\emailSql::first($id));
if($email){
$sync = new \Service\SyncMail($email);
// ai邮件只同步2天内的
$sync->search(
(new \Lib\Imap\ImapSearch())
->dateGt(date('Y-m-d',strtotime("-1 day")))
);
$sync->isUidAfter(2)->sync();
$sync = null;
unset($sync);
}
use Swoole\Process;
function start(){
// 删除停止运行的值
// redis()->delete(SYNC_RUNNING_REDIS_KEY,'email_sync_stop_num');
// 进程管理器
$pm = new Process\Manager();
// 启动业务进程
$pm->addBatch(10,function (Process\Pool $pool, int $worker_id){
swoole_set_process_name('php-email-sync-list-'.$worker_id);
include_once __DIR__."/../vendor/autoload.php";
_echo("业务进程({$worker_id})启动成功");
$goNum = 0;
// 循环阻塞
while (true){
}catch (Throwable $e){
logs('sync : '.$e->getMessage());
while ($goNum > 50){
co::sleep(0.5);
continue;
}
// 需要同步的id
$id = redis()->lPop('sync_email_lists');
if($id && is_numeric($id)){
// 占用当前的id,占用2小时
if(redis()->add('just_sync_'.$id,time(),600)){
// redis()->set('sync_my_pid:'.getmypid(),time(),86400);
// 启动一个协程
go(function () use ($id,&$goNum){
$goNum++;
try{
// 开始同步
$email = db()->cache(3600)->first(\Model\emailSql::first($id));
if($email){
$sync = new \Service\SyncMail($email);
// ai邮件只同步2天内的
$sync->search(
(new \Lib\Imap\ImapSearch())
->dateGt(date('Y-m-d',strtotime("-1 day")))
);
$sync->isUidAfter(2)->sync();
$sync = null;
unset($sync);
}
}catch (Throwable $e){
logs('sync : '.$e->getMessage());
}
// 协程完成后执行的函数
co::defer(function () use ($id,&$goNum){
$goNum--;
// 30秒后 消除占用
redis()->expire('just_sync_'.$id,120);
// 写入日志
\Lib\Log::getInstance()->write();
// 手动释放
db()->close();
});
});
}
}
// 30秒后 消除占用
redis()->expire('just_sync_'.$id,120);
//每次都暂停1秒,防止同一时间启动太多的任务
co::sleep(1);
// 写入日志
\Lib\Log::getInstance()->write();
}
}else{
break;
}
return 0;
},true);
// 启动管理器
$pm->start();
}
start();
... ...
... ... @@ -5,6 +5,8 @@ include_once __DIR__."/../vendor/autoload.php";
swoole_set_process_name('php-email-sync-list-my');
\Lib\DbPool::$clientNumber = 600;
\Co\run(function (){
$goNum = 0;
while (1){
... ... @@ -36,6 +38,7 @@ swoole_set_process_name('php-email-sync-list-my');
\Lib\Log::getInstance()->write();
db()->close();
});
},$id);
... ...
... ... @@ -6,7 +6,7 @@ use Swoole\Database\PDOConfig;
use Swoole\Database\PDOPool;
/**
* db 池
* db 池 不主动释放连接
* @author:dc
* @time 2023/2/13 15:03
* Class DbPool
... ... @@ -21,6 +21,17 @@ class DbPool {
*/
static $pool = null;
/**
* 获取到的连接
* @var array
*/
public static $clientAll = [];
/**
* 连接的数量
* @var int
*/
public static $clientNumber = 1024;
/**
* 实例
... ... @@ -44,21 +55,26 @@ class DbPool {
\PDO::ATTR_TIMEOUT => 30,
]);
static::$pool = new PDOPool($pdoconfig,1024);
static::$pool = new PDOPool($pdoconfig,self::$clientNumber);
}
// 获取链接
$this->client = static::$pool->get();
}
public function getClient()
{
$id = \co::getCid();
if(empty(static::$clientAll[$id])){
static::$clientAll[$id] = self::$pool->get();
}
}
/**
* 结束
*/
public function __destruct(){
$this->close();
// $this->close();
}
/**
... ... @@ -67,8 +83,11 @@ class DbPool {
* @time 2024/5/30 10:30
*/
public function close(){
self::$pool->put($this->client);
$this->client = null;
$id = \co::getCid();
if (isset(static::$clientAll[$id])){
self::$pool->put(static::$clientAll[$id]);
}
unset(static::$clientAll[$id]);
}
... ...