作者 邓超

1

... ... @@ -2,6 +2,7 @@
namespace App\Console\Commands;
use App\Models\EList;
use Helper\Mail\Mail;
use Illuminate\Console\Command;
use function Co\run;
... ... @@ -30,29 +31,33 @@ class Demo extends Command
public function handle()
{
run(function (){
go(function (){
$redis = swoole_redis();
// $a = $redis->eval(...swoole_redis_add('asdad:1',1,600));
// var_dump($a);
$redis->rPush('syncMailBody',300);
// $redis->rPush('syncMailBody',301);
// $redis->rPush('syncMailBody',302);
// $redis->rPush('syncMailBody',303);
// $redis->rPush('syncMailBody',304);
// $redis->rPush('syncMailBody',305);
// $redis->rPush('syncMailBody',306);
// $redis->rPush('syncMailBody',307);
// $redis->rPush('syncMailBody',308);
// $redis->rPush('syncMailBody',309);
$redis->set('syncmailbodystop',1);
});
});
// run(function (){
// go(function (){
//
//
// $redis = swoole_redis();
////// $a = $redis->eval(...swoole_redis_add('asdad:1',1,600));
////// var_dump($a);
// for ($i=1;$i<=345;$i++){
// $redis->rPush('syncMailBody',$i);
// }
//
//// $redis->rPush('syncMailBody',301);
//// $redis->rPush('syncMailBody',302);
//// $redis->rPush('syncMailBody',303);
//// $redis->rPush('syncMailBody',304);
//// $redis->rPush('syncMailBody',305);
//// $redis->rPush('syncMailBody',306);
//// $redis->rPush('syncMailBody',307);
//// $redis->rPush('syncMailBody',308);
//// $redis->rPush('syncMailBody',309);
//
// $redis->set('syncmailbodystop',1);
//
// });
//
// });
//
... ...
... ... @@ -9,6 +9,7 @@ use Helper\Mail\Mail;
use Illuminate\Console\Command;
use Illuminate\Support\Facades\Cache;
use Illuminate\Support\Facades\Log;
use Swoole\Coroutine\MySQL;
use Swoole\Coroutine\Redis;
use function Co\run;
... ... @@ -51,14 +52,16 @@ class SyncBody extends Command
go(function () {
// 需要获取邮件的id
$isRun = true;
// @var $redis 这个必须放携程里面
$redis = swoole_redis();
$db = swoole_db();
while ($isRun){
// @var $redis 这个必须放携程里面
$redis = swoole_redis();
// 获取到邮件的数据id
$id = $redis->lPop('syncMailBody');
if($id){
$this->sync($redis,$id);
$this->sync($redis,$db,$id);
}else {
// 暂停1秒
\co::sleep(1);
... ... @@ -69,11 +72,14 @@ class SyncBody extends Command
if($redis->get('syncmailbodystop') == 1){
$isRun = false;
}
// 关闭redis
$redis->close();
$redis = null;
}
// 关闭redis
$redis->close();
$redis = null;
$db->close();
$db = null;
});
}
... ... @@ -88,33 +94,40 @@ class SyncBody extends Command
/**
* 执行
* @param $redis Redis
* @param $db MySQL
* @param $id
* @author:dc
* @time 2023/2/8 11:44
*/
public function sync(&$redis,$id){
public function sync(&$redis,$db,$id){
try {
// 先暂用,锁上这个id
if($redis->eval(...swoole_redis_add('syncmailbodyrun:'.$id,$id,3))){
$this->echoStr('number:'.$id);
// 读取邮件信息
$mail = EList::where('id',$id)->first();
$mail = swoole_db_first(
$db,
'select `id`,`uid`,`msgno`,`folder_id`,`email_id` from `lists` where `id` = '.$id
);
if(!$mail){
return false;
}
$email_id = $mail->email_id;
$folder_id = $mail->folder_id;
$email_id = $mail['email_id'];
$folder_id = $mail['folder_id'];
/*** 获取邮箱信息 ***/
$email_key = 'email_list:'.$email_id;
// 判断是否有携程在查询了
if($redis->eval(...swoole_redis_add($email_key,1,600))){
// 查询邮箱
$emailModel = Email::where('id',$email_id)->first();
$emailModel = swoole_db_first(
$db,
'select * from `emails` where `id` = '.$email_id
);
if($emailModel){
// 设置缓存
$redis->set($email_key,$emailModel->toArray(),600);
$redis->set($email_key,$emailModel,600);
}else {
// 删除缓存
$redis->delete($email_key);
... ... @@ -169,14 +182,17 @@ class SyncBody extends Command
}
$this->echoStr('目录:'.$folder_name);
if($folder_name){
// 登录imap服务器
Mail::login($email,$password,$imap);
// 设置id
Mail::$client[$email]->setId($email_id);
Mail::syncBody(
$id,
$mail->msgno,
$mail['msgno'],
$email_id,
$folder_name,
$email,
$password,
$imap
$email
);
}
}
... ...
... ... @@ -8,6 +8,7 @@ use Helper\Mail\Mail;
use Illuminate\Console\Command;
use Illuminate\Support\Facades\Cache;
use Illuminate\Support\Facades\Log;
use Swoole\Coroutine\MySQL;
use Swoole\Coroutine\Redis;
use function Co\run;
... ... @@ -45,14 +46,8 @@ class SyncMailList extends Command
'hook_flags'=>SWOOLE_HOOK_TCP, // redis需要的配置
]);
// redis 配置
$redis_config = [
'host' => env('REDIS_HOST','127.0.0.1'),
'port' => env('REDIS_PORT',6379),
'password' => env('REDIS_PASSWORD',null)
];
run(function () use ($rand,$max_coroutine,$redis_config){
run(function () use ($rand,$max_coroutine){
// 获取邮箱总数量
$size = Email::where([])->count();
// 最后一条数据的id
... ... @@ -62,11 +57,11 @@ class SyncMailList extends Command
if ($max_coroutine){
for ($i = $max_coroutine; $i > 0; $i--) {
// 创建携程
go(function () use ($size,$rand,$lastId,$redis_config){
go(function () use ($size,$rand,$lastId){
// redis 携程中无法使用laravel的cache的redis驱动
$redis = swoole_redis();
$db = swoole_db();
$n = 1;
while ($n <= $lastId){
echo 'syncMail'.$rand.':'.$n;echo PHP_EOL;
... ... @@ -76,11 +71,19 @@ class SyncMailList extends Command
);
// 是否已存在
if($add) {
$this->sync($n);
try {
$this->sync($n,$db);
}catch (\Throwable $e){
echo "协程(".\co::getCid()."):".$e->getMessage();
}
}
$n++;
}
$redis->close();
$db->close();
$redis = $db = null;
});
}
... ... @@ -93,35 +96,40 @@ class SyncMailList extends Command
/**
* 开始同步执行
* @param int $n 第几条数据
* @param int $n
* @param MySQL $db
* @author:dc
* @time 2023/2/5 17:21
* @time 2023/2/9 11:40
*/
private function sync(int $n = 0){
private function sync(int $n,MySQL $db){
/** @var $email Email */
$email = Email::where(['id'=>$n])->first();
$email = swoole_db_first(
$db,
'select * from `emails` where `id` = '.$n
);
// 密码没有错误,且状态正常的
if ($email && $email->pwd_error == 0 && $email->status == Email::STATUS_ACTIVE){
if ($email && $email['pwd_error'] == 0 && $email['status'] == Email::STATUS_ACTIVE){
// 登录imap服务器
Mail::login($email->email,$email->password,$email->imap);
Mail::login($email['email'],base64_decode($email['password']),$email['imap']);
// 设置id
Mail::$client[$email->email]->setId($email->id);
Mail::$client[$email['email']]->setId($email['id']);
// 同步文件夹
Mail::syncFolder($email->email);
Mail::syncFolder($email['email']);
// 获取当前邮箱的所有文件夹
$folders = Folder::_all($email->id);
$folders = Folder::_all($email['id']);
// 目前只发现最高2级
foreach ($folders as $folder){
if(empty($folder['_child'])){
// 同步邮件
Mail::syncMail($email->email,$email->id,$folder['id'],$folder['origin_folder']);
Mail::syncMail($email['email'],$email['id'],$folder['id'],$folder['origin_folder']);
}else{
// 循环子级目录,有子级的情况,父级不可操作,且不会有邮件
foreach ($folder['_child'] as $f){
// 同步邮件
Mail::syncMail($email->email,$email->id,$f['id'],$folder['origin_folder'].'/'.$f['origin_folder']);
Mail::syncMail($email['email'],$email['id'],$f['id'],$folder['origin_folder'].'/'.$f['origin_folder']);
}
}
... ...
... ... @@ -50,6 +50,7 @@ class MailApi
$model->imap = $formData['imap'];
$model->smtp = $formData['smtp'];
$model->status = Email::STATUS_ACTIVE;
$model->password = @base64_encode($formData['password']);
try {
... ...
... ... @@ -877,8 +877,9 @@ class Imap {
* @author:dc
* @time 2022/11/25 11:15
*/
public function noop():void {
$this->request('NOOP');
public function noop():bool {
$status = $this->request('NOOP');
return $status[0] == 'ok';
}
/**
... ...
... ... @@ -35,10 +35,16 @@ class Mail {
* @time 2023/2/5 10:46
*/
public static function login(string $email,string $password,string $imap) {
if(!empty(static::$client[$email]) && static::$client[$email] instanceof Imap){
if(static::$client[$email]->noop()){
return true;
}
}
static::$client[$email] = new Imap();
// $imap->debug();
// 是否初始成功
static::$client[$email]->login("ssl://{$imap}:993",$email,$password);
return true;
}
... ... @@ -89,7 +95,7 @@ class Mail {
public static function syncMail($email,$email_id,$folder_id,$folder='INBOX'):bool {
// 选择文件夹
try {
$status = Mail::$client[$email]->selectFolder($folder);
$status = static::$client[$email]->selectFolder($folder);
}catch (\Throwable $e){
Log::error($email.' 选择文件夹错误:'.$e->getMessage());
return false;
... ... @@ -128,7 +134,7 @@ class Mail {
$dataids = EList::_getIdsByMsgno($email_id,$folder_id,$msgno);
// 循环
$results = Mail::$client[$email]->fetchHeader($msgno);
$results = static::$client[$email]->fetchHeader($msgno);
if($results){
DB::beginTransaction();
... ... @@ -211,23 +217,17 @@ class Mail {
* @param $email_id
* @param $folder_name
* @param $email
* @param $password
* @param $imap
* @return bool
* @throws \Exception
* @author:dc
* @time 2023/2/8 13:45
* @time 2023/2/9 10:29
*/
public static function syncBody($id,$msgno, $email_id,$folder_name,$email, $password,$imap):bool {
public static function syncBody($id,$msgno, $email_id,$folder_name,$email):bool {
// 登录imap服务器
Mail::login($email,$password,$imap);
// 设置id
Mail::$client[$email]->setId($email_id);
// 选择文件夹
Mail::$client[$email]->selectFolder($folder_name);
static::$client[$email]->selectFolder($folder_name);
$body = Mail::$client[$email]->fetchBody([$msgno],storage_path('email/'.$email_id));
$body = static::$client[$email]->fetchBody([$msgno],storage_path('email/'.$email_id));
if(!empty($body[$msgno]['RFC822.TEXT'])){
\App\Models\Body::_insert($id,$body[$msgno]['RFC822.TEXT']);
... ...
... ... @@ -144,6 +144,40 @@ function swoole_redis_add($key,$val,$ttl=-1):array {
];
}
/**
* swoole 操作db
* @return \Swoole\Coroutine\MySQL
* @author:dc
* @time 2023/2/8 16:48
*/
function swoole_db():\Swoole\Coroutine\MySQL{
$swoole_mysql = new \Swoole\Coroutine\MySQL();
$swoole_mysql->connect([
'host' => env('DB_HOST','127.0.0.1'),
'port' => env('DB_PORT',3306),
'user' => env('DB_USERNAME'),
'password' => env('DB_PASSWORD'),
'database' => env('DB_DATABASE'),
]);
return $swoole_mysql;
}
/**
* 查询一条
* @param $db \Swoole\Coroutine\MySQL
* @param $query
* @return mixed
* @author:dc
* @time 2023/2/8 17:15
*/
function swoole_db_first(&$db,$query){
$row = $db->query($query);
return $row[0]??null;
}
... ...