SyncBody.php
6.4 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
<?php
namespace App\Console\Commands;
use App\Models\EList;
use App\Models\Email;
use App\Models\Folder;
use Helper\Mail\Mail;
use Illuminate\Console\Command;
use Illuminate\Support\Facades\Cache;
use Illuminate\Support\Facades\Log;
use Swoole\Coroutine\Redis;
use function Co\run;
/**
 * Artisan command that drains the `syncMailBody` Redis list and synchronises
 * the body of each queued mail through {@see Mail::syncBody()}.
 *
 * The work is done by a fixed pool of Swoole coroutines. Mailbox and folder
 * lookups are shared between coroutines through Redis: the coroutine that
 * wins the per-key lock loads the record from the database and publishes it,
 * every other coroutine polls the key until the value shows up.
 *
 * NOTE(review): `swoole_redis()` and `swoole_redis_add()` are project helpers
 * that are not visible from this file. The code below assumes that
 * `swoole_redis_add($key, $value, $ttl)` yields the arguments of an atomic
 * "set if absent with TTL" script and that `get()` returns stored arrays as
 * arrays — confirm against the helpers.
 */
class SyncBody extends Command
{
    /**
     * Maximum number of polls while waiting for a value that another
     * coroutine is currently loading.
     */
    private const CACHE_WAIT_ATTEMPTS = 100;

    /**
     * Seconds to sleep between two polls.
     */
    private const CACHE_WAIT_INTERVAL = 0.5;

    /**
     * The name and signature of the console command.
     *
     * @var string
     */
    protected $signature = 'SyncBody';

    /**
     * The console command description.
     *
     * @var string
     */
    protected $description = '同步邮箱中邮件的内容';

    /**
     * Execute the console command.
     *
     * Starts 100 worker coroutines. Each worker repeatedly pops a mail id
     * from the `syncMailBody` list and hands it to {@see sync()}; when the
     * list is empty it sleeps for one second. A worker stops once the Redis
     * key `syncmailbodystop` equals 1.
     *
     * @return int
     */
    public function handle(): int
    {
        // The higher the coroutine limit, the more memory the process needs.
        $max_coroutine = 1000;
        \co::set(['max_coroutine' => $max_coroutine]);

        run(function () {
            // Clear a stop flag left over from a previous run.
            swoole_redis()->delete('syncmailbodystop');

            // Start 100 worker coroutines.
            for ($i = 100; $i > 0; $i--) {
                go(function () {
                    $isRun = true;
                    while ($isRun) {
                        // The connection has to be created inside the coroutine.
                        $redis = swoole_redis();
                        try {
                            // Next mail id waiting for a body sync.
                            $id = $redis->lPop('syncMailBody');
                            if ($id) {
                                $this->sync($redis, $id);
                            } else {
                                // Nothing queued: back off for one second.
                                \co::sleep(1);
                            }

                            // Has a stop been requested?
                            if ($redis->get('syncmailbodystop') == 1) {
                                $isRun = false;
                            }
                        } finally {
                            // Always release the connection, even when the iteration failed.
                            $redis->close();
                            $redis = null;
                        }
                    }
                });
            }
        });

        return Command::SUCCESS;
    }

    /**
     * Synchronise the body of a single mail.
     *
     * The id is locked in Redis for 3 seconds first; when the lock cannot be
     * taken the id is skipped. Any error (including a timeout while waiting
     * for a mailbox or folder that another coroutine is loading) is logged
     * and the id is pushed back onto the `syncMailBody` list.
     *
     * @param Redis $redis Redis connection owned by the calling coroutine
     * @param mixed $id    Primary key of the mail list record
     * @return bool false when the mail record does not exist, true otherwise
     * @author:dc
     * @time 2023/2/8 11:44
     */
    public function sync(&$redis, $id): bool
    {
        try {
            // Claim the id so that no other coroutine handles it concurrently.
            if (!$redis->eval(...swoole_redis_add('syncmailbodyrun:'.$id, $id, 3))) {
                return true;
            }
            $this->echoStr('number:'.$id);

            // Load the mail record.
            $mail = EList::where('id', $id)->first();
            if (!$mail) {
                return false;
            }
            $email_id = $mail->email_id;
            $folder_id = $mail->folder_id;

            $emailModel = $this->resolveEmail($redis, $email_id);
            if (!$emailModel) {
                // The mailbox no longer exists: nothing to synchronise.
                return true;
            }
            $this->echoStr('找到:'.$emailModel['email']);
            $email = $emailModel['email'];
            $password = $emailModel['password'];
            $imap = $emailModel['imap'];

            $folder_name = $this->resolveFolderName($redis, $folder_id);
            $this->echoStr('目录:'.$folder_name);
            if ($folder_name) {
                Mail::syncBody(
                    $id,
                    $mail->msgno,
                    $email_id,
                    $folder_name,
                    $email,
                    $password,
                    $imap
                );
            }
        } catch (\Throwable $e) {
            echo "协程(".\co::getCid()."): ".$e->getMessage().PHP_EOL.$e->getTraceAsString();
            Log::error($e->getMessage().$e->getTraceAsString());
            // Put the id back on the queue so that it is retried.
            $redis->rPush('syncMailBody', $id);
        }

        return true;
    }

    /**
     * Print a message prefixed with the id of the current coroutine.
     *
     * @param string $str Message to print
     */
    public function echoStr(string $str): void
    {
        echo '协程('.\co::getCid().'): '.$str.PHP_EOL;
    }

    /**
     * Fetch the mailbox record, sharing the lookup with other coroutines
     * through the Redis key `email_list:{id}` (TTL 600 seconds).
     *
     * @param Redis $redis    Redis connection owned by the calling coroutine
     * @param mixed $email_id Primary key of the mailbox
     * @return Email|array|null The model when this coroutine queried the
     *                          database, the cached array when another one
     *                          did, null when the mailbox does not exist
     * @throws \RuntimeException when the cached value never becomes available
     */
    private function resolveEmail($redis, $email_id)
    {
        $email_key = 'email_list:'.$email_id;

        // Only the coroutine that wins this lock queries the database.
        if ($redis->eval(...swoole_redis_add($email_key, 1, 600))) {
            $emailModel = Email::where('id', $email_id)->first();
            if ($emailModel) {
                // Publish the record for the waiting coroutines.
                $redis->set($email_key, $emailModel->toArray(), 600);
            } else {
                // Unknown mailbox: drop the placeholder so waiters stop polling.
                $redis->delete($email_key);
            }

            return $emailModel;
        }

        // Somebody else is loading (or has loaded) the mailbox: the value is
        // ready as soon as the placeholder has been replaced by an array.
        return $this->awaitCache($redis, $email_key, static function ($value): bool {
            return is_array($value);
        });
    }

    /**
     * Fetch the original folder name, sharing the lookup with other
     * coroutines through the Redis key `email_folder:{id}` (TTL 600 seconds).
     *
     * NOTE(review): the placeholder written by the lock is `1`, so a folder
     * whose name loosely equals 1 cannot be told apart from "still loading";
     * this limitation is inherited from the cache protocol.
     *
     * @param Redis $redis     Redis connection owned by the calling coroutine
     * @param mixed $folder_id Primary key of the folder
     * @return mixed The folder name, or a falsy value when it is unknown
     * @throws \RuntimeException when the cached value never becomes available
     */
    private function resolveFolderName($redis, $folder_id)
    {
        $folder_key = 'email_folder:'.$folder_id;

        // Only the coroutine that wins this lock queries the database.
        if ($redis->eval(...swoole_redis_add($folder_key, 1, 600))) {
            $folder_name = Folder::getOriginName($folder_id);
            if ($folder_name) {
                // Publish the name for the waiting coroutines.
                $redis->set($folder_key, $folder_name, 600);
            } else {
                // Unknown folder: drop the placeholder so waiters stop polling.
                $redis->delete($folder_key);
            }

            return $folder_name;
        }

        // The value is ready once it differs from the placeholder `1`.
        return $this->awaitCache($redis, $folder_key, static function ($value): bool {
            return $value != 1;
        });
    }

    /**
     * Poll a Redis key until the coroutine that owns the lookup has
     * published the real value.
     *
     * Previously a timeout left the lock placeholder in the result variable,
     * which was then used as if it were the mailbox data or the folder name.
     * A timeout is now reported as an exception instead, so the caller's
     * error handling re-queues the mail.
     *
     * @param Redis    $redis   Redis connection owned by the calling coroutine
     * @param string   $key     Cache key to poll
     * @param callable $isReady Receives the (truthy) stored value and returns
     *                          true once it is the real value, not the placeholder
     * @return mixed The stored value, or null when the key is missing/empty
     * @throws \RuntimeException when the value is still not ready after
     *                           CACHE_WAIT_ATTEMPTS polls
     */
    private function awaitCache($redis, string $key, callable $isReady)
    {
        for ($attempt = 0; $attempt < self::CACHE_WAIT_ATTEMPTS; $attempt++) {
            $value = $redis->get($key);
            if (!$value) {
                // The key was removed (record not found) or has expired.
                return null;
            }
            if ($isReady($value)) {
                return $value;
            }
            \co::sleep(self::CACHE_WAIT_INTERVAL);
        }

        throw new \RuntimeException('Timed out waiting for cache key '.$key);
    }
}