作者 邓超

1

<?php
#echo file_get_contents('http://gold.cnfol.com/mingjiadianjin/20230210/30052311.shtml');
\ No newline at end of file
include_once __DIR__."/../vendor/autoload.php";
$ids = db()->all('select `id` from `'.\Model\emailSql::$table.'` limit 1000 offset 0');
print_r($ids);
\ No newline at end of file
... ...
... ... @@ -27,7 +27,7 @@ function start(){
// 启动一个进程来管理定时
$pm->add(function (Process\Pool $pool, int $workerId)use (&$table){
_echo("进程({$workerId})启动成功");
_echo("定时进程({$workerId})启动成功");
// 每10分钟统计一次邮箱数量
\Swoole\Timer::tick(600000,function () use (&$table){
$table->set('etotal',['val'=> db()->count(\Model\emailSql::count())]);
... ... @@ -60,6 +60,27 @@ function start(){
});
// 需要更新同步的邮件
\Swoole\Timer::tick(600000,function (){
$id = 0;
while (true){
$ids = db()->all('select `id` from `'.\Model\emailSql::$table.'` where `id` > '.$id.' order by `id` asc limit 1000 offset 0');
if(!$ids){
break;
}
foreach ($ids as $v){
$id = $v['id'];
redis()->rPush('sync_email_lists', $v['id']);
}
}
});
// 进行阻塞,否则定时器无法运行
while (true){
co::sleep(9999);
... ... @@ -75,7 +96,7 @@ function start(){
// 启动业务进程
$pm->addBatch(WORKER_NUM,function (Process\Pool $pool, int $worker_id) use (&$table){
_echo("进程({$worker_id})启动成功");
_echo("业务进程({$worker_id})启动成功");
// 协程id集
$cid = [];
... ... @@ -147,6 +168,7 @@ function start(){
},true);
// 启动管理器
$pm->start();
}
... ... @@ -195,12 +217,50 @@ function create_coroutine(array &$cid,int &$isRunMaxCNum,$worker_id){
/**
* 开始同步
* 开始同步, 这里是主要的业务代码
* @author:dc
* @time 2023/2/13 9:42
*/
function sync(){
co::sleep(1);
// 需要同步的id
$id = redis()->lPop('sync_email_lists');
if(!$id){
co::sleep(1);
return -1;
}
$email = db()->first(\Model\emailSql::first($id));
if(!$email){
return 0;
}
if($email['pwd_error']){
return 1;
}
$mailServer = new Lib\Mail\Mail();
try {
// 登录服务器
$mailServer->login($email['email'],base64_decode($email['password']),$email['imap']);
}catch (Throwable $e){
if($e->getCode() == 403){
// 登录失败了 ,
db()->update(\Model\emailSql::$table,['pwd_error'=>1],dbWhere(['id'=>$id]));
}
return 2;
}
$mailServer->client->setId($id);
// 同步文件夹
$mailServer->syncFolder($email['email'],db());
$email = null;
$mailServer = null;
}
... ...
... ... @@ -63,6 +63,14 @@ class Home {
}else{
// 新增
$ret = db()->insert(emailSql::$table,$data);
if($ret){
// 增加邮件数量
redis()->incr('email_total');
// 立即处理 同步
redis()->lPush('sync_email_lists', $ret);
}
}
... ...
... ... @@ -192,7 +192,7 @@ class Imap {
//解析登录数据每个服务商返回的登录结果不一样,很难兼容
if($result[0] != 'ok'){
throw new \Exception('IMAP Login Error:'.end($result[1]));
throw new \Exception('IMAP Login Error:'.end($result[1]),403);
}
// 是否是只读模式
... ...
... ... @@ -2,6 +2,8 @@
namespace Lib\Mail;
use Lib\DbPool;
/**
* 操作邮件
* @author:dc
... ... @@ -13,9 +15,9 @@ class Mail {
/**
* imap服务器连接实例
* @var Imap[]
* @var Imap
*/
public static array $client = [];
public Imap $client;
/**
... ... @@ -26,36 +28,31 @@ class Mail {
* @author:dc
* @time 2023/2/5 10:46
*/
public static function login(string $email,string $password,string $imap) {
if(!empty(static::$client[$email]) && static::$client[$email] instanceof Imap){
if(static::$client[$email]->noop()){
return true;
}
}
static::$client[$email] = new Imap();
// $imap->debug();
public function login(string $email,string $password,string $imap) {
$this->client = new Imap();
// 是否初始成功
static::$client[$email]->login("ssl://{$imap}:993",$email,$password);
$this->client->login("ssl://{$imap}:993",$email,$password);
return true;
}
/**
* 同步文件夹
* @param $email
* @param MySQL|null $db
* @param DbPool|null $db
* @return mixed
* @author:dc
* @time 2023/2/5 10:58
*/
public static function syncFolder($email,$db=null){
public function syncFolder($db=null){
$db = $db ? $db : db();
// 读取所有文件夹,未解密
$folders = static::$client[$email]->getFolder();
$folders = $this->client->getFolder();
DB::beginTransaction();
$db->transaction();
foreach ($folders as $folder){
// 处理子父文件夹
$folder['id'] = explode('/',$folder['folder']);
$folder['name'] = explode('/',$folder['parseFolder']);
... ... @@ -63,14 +60,14 @@ class Mail {
foreach ($folder['id'] as $k=>$item){
// 插入到数据库
$pid = Folder::_insert(
static::$client[$email]->getId(),
$this->client->getId(),
$folder['name'][$k],
$item,
$pid
);
}
}
DB::commit();
$db->commit();
}
... ...
... ... @@ -135,6 +135,28 @@ class RedisPool {
return $this->unserialize($this->client->rPop($key));
}
/**
* 自增
* @param $key
* @return int
* @author:dc
* @time 2023/2/17 15:29
*/
public function incr($key){
return $this->client->incr($key);
}
/**
* 自减
* @param $key
* @return int
* @author:dc
* @time 2023/2/17 15:29
*/
public function decr($key){
return $this->client->decr($key);
}
/**
* 删除
... ...
<?php
namespace Model;
/**
* 文件夹
* @author:dc
* @time 2023/2/17 18:04
* Class folderSql
* @package Model
*/
class folderSql {
public static $table = 'folders';
}
... ...
... ... @@ -32,7 +32,7 @@ class listsSql {
$where = ['email_id'=>$email_id];
if($folder_id) $where['folder_id'] = $folder_id;
return "select * from `".static::$table."` where ".dbWhere($where)." order by `udate` desc limit 30 offset ".($p-1);
return "select * from `".static::$table."` where ".dbWhere($where)." order by `udate` desc limit 30 offset ".(($p-1)*30);
}
... ...