作者 邓超

1

1 <?php 1 <?php
2 -  
3 -/**  
4 - * TODO:: 这个文件是定时去拉取所有邮箱中的邮件  
5 - * 暂时不用这个  
6 - */  
7 -//error_reporting();  
8 -  
9 -use Swoole\Process;  
10 -  
11 -  
12 -  
13 -include_once __DIR__."/../vendor/autoload.php";  
14 -  
15 -  
16 -function start(){  
17 -  
18 -// 删除停止运行的值  
19 - redis()->delete(SYNC_RUNNING_REDIS_KEY);  
20 -  
21 - /** 创建一个表 **/  
22 - $table = new Swoole\Table(128);// 128 行  
23 - $table->column('val', Swoole\Table::TYPE_INT);  
24 - $table->create();  
25 -  
26 - // 初始时,进行一次统计  
27 - $table->set('etotal',['val'=> db()->count(\Model\emailSql::count())]);  
28 -  
29 - // 进程管理器  
30 - $pm = new Process\Manager();  
31 -  
32 - // 启动一个进程来管理定时  
33 - $pm->add(function (Process\Pool $pool, int $workerId)use (&$table){  
34 - _echo("定时进程({$workerId})启动成功");  
35 - // 每10分钟统计一次邮箱数量  
36 - \Swoole\Timer::tick(600000,function () use (&$table){  
37 - $table->set('etotal',['val'=> db()->count(\Model\emailSql::count())]);  
38 - });  
39 -  
40 - // 每2秒执行一次  
41 - \Swoole\Timer::tick(2000,function () use (&$table,&$pool){  
42 - // 是否停止脚本  
43 - $table->set('stop',['val'=> redis()->get(SYNC_RUNNING_REDIS_KEY) === 'stop' ? 1 : 0]);  
44 -// _echo('定时器');  
45 - // 检查是否结束了所有的协程同步代码  
46 - if ($table->get('stop','val')) {  
47 - $stop_num = 0;  
48 - foreach (range(0, WORKER_NUM) as $i) {  
49 - if ($table->exists('ps' . $i)) {  
50 - $stop_num++;  
51 - }  
52 - }  
53 - if($stop_num >= WORKER_NUM){  
54 -// 退出进程  
55 - $pool->shutdown();  
56 - }  
57 - }  
58 -  
59 - // 邮件总数  
60 - $total = redis()->get('email_total',0);  
61 - if($total > $table->get('etotal','val')){  
62 - $table->set('etotal',['val'=> $total]);  
63 - }  
64 -  
65 - });  
66 -  
67 -  
68 - //todo:: 需要更新同步的邮件,每10分钟同步一次,这里是的时间是微妙  
69 - \Swoole\Timer::tick(600000,function (){  
70 -  
71 - start_now_mail();  
72 -  
73 - });  
74 -  
75 -  
76 -  
77 - // 进行阻塞,否则定时器无法运行  
78 - while (true){  
79 - co::sleep(9999);  
80 - }  
81 - },true);  
82 -  
83 -  
84 -// 协程配置  
85 - \co::set([  
86 - 'max_coroutine'=>COROUTINE_MAX_NUM, // 最大携程数量  
87 - 'hook_flags'=>SWOOLE_HOOK_TCP, // redis需要的配置  
88 - ]);  
89 -  
90 - // 启动业务进程  
91 - $pm->addBatch(WORKER_NUM,function (Process\Pool $pool, int $worker_id) use (&$table){  
92 - _echo("业务进程({$worker_id})启动成功");  
93 -  
94 - // 协程id集  
95 - $cid = [];  
96 -  
97 - $i = 0;  
98 - $email_total = $table->get('etotal','val');//邮件总数量  
99 - $isRunMaxCNum = 1; // 允许最大协程数量,如果为0则停止所有协程工作,相当于停止脚本  
100 - // 是否退出进程  
101 - while (true){  
102 - $i++;  
103 - // 每10秒 验证一次邮箱数量,好控制协程的数量  
104 - if($i>=10){  
105 - $email_total = $table->get('etotal','val');  
106 - $i = 0;  
107 - }  
108 -  
109 - if(!$email_total){  
110 - break;  
111 - }  
112 -  
113 - // 每个协程 分配 10个邮箱任务  
114 - $cnum = ceil($email_total/(WORKER_NUM*10));  
115 - // 当前协程的数量  
116 - $nowCnum = count($cid);  
117 - // 说明 需要新的协程了  
118 - if($cnum > $nowCnum){  
119 - // 开启所需要的协程数量  
120 - foreach (range(0,$cnum-$nowCnum) as $v){  
121 - // 启动一个协程  
122 - create_coroutine($cid,$isRunMaxCNum,$worker_id);  
123 - }  
124 - }  
125 - // 暂时没有实现 减少协程数量操作  
126 -// else if ($cnum < $nowCnum){  
127 -// // 说明 协程数量过多,小于了1个协程处理10个邮箱的,资源闲置情况  
128 -// // 销毁多余协程  
129 -// $isRunMaxCNum = $nowCnum - $cnum;  
130 -// }  
131 -  
132 - // 每3秒检查一次是否要停止 协程  
133 - if($i%3 === 0){  
134 -// _echo('是否收到退出信号:'.$table->get('stop','val'));  
135 - if ($table->get('stop','val')){  
136 - // 停止  
137 - $isRunMaxCNum = 0;  
138 - }  
139 - }  
140 -  
141 - // 这个是检查 cid的如果协程全部退出,则退出进程  
142 - co::sleep(1);  
143 - // 跳出无限循环了  
144 - if(!$cid){  
145 - _echo('正常关闭进程('.$worker_id.')');  
146 - break;  
147 - }  
148 - }  
149 -  
150 - // 是否停止,这里进行阻塞  
151 - if ($table->get('stop','val')){  
152 - // 某个进程退出了  
153 - $table->set('ps'.$worker_id,['val'=>1]);  
154 - // 阻塞直到 主进程 kill掉所有子进程  
155 - while (true){  
156 - co::sleep(5);  
157 - }  
158 - }  
159 -  
160 -  
161 - },true);  
162 -  
163 -  
164 - // 启动管理器  
165 - $pm->start();  
166 -  
167 -}  
168 -  
169 -/**  
170 - * 创建协程  
171 - * @param array $cid  
172 - * @param int $isRunMaxCNum  
173 - * @param $worker_id  
174 - * @author:dc  
175 - * @time 2023/2/14 17:04  
176 - */  
177 -function create_coroutine(array &$cid,int &$isRunMaxCNum,$worker_id){  
178 - go(function () use (&$cid,&$isRunMaxCNum,$worker_id){  
179 - // 协程id  
180 - $cid[co::getCid()] = co::getCid();  
181 -  
182 - // 同步操作  
183 - while (true){  
184 - // 是否退出协程  
185 - if(!$isRunMaxCNum){  
186 -// _echo('协程('.co::getCid().'): stop '.$isRunMaxCNum);  
187 - break;  
188 - }  
189 -  
190 - // 开始同步  
191 - try {  
192 - sync($worker_id);  
193 - }catch (\Throwable $e){  
194 - _echo($e->getMessage());  
195 - logs(  
196 - $e->getMessage().PHP_EOL.$e->getTraceAsString(),  
197 - LOG_PATH.'/'.$worker_id.'_'.co::getCid().'.log'  
198 - );  
199 - }  
200 -  
201 - // 阻塞1秒  
202 - co::sleep(1);  
203 - }  
204 -  
205 - // 协程完成后执行的函数  
206 - co::defer(function () use (&$cid,$worker_id){  
207 - _echo('正常关闭进程('.$worker_id.')下的协程('.co::getCid().')');  
208 - unset($cid[co::getCid()]);  
209 - });  
210 -  
211 - });  
212 -}  
213 -  
214 -  
215 -/**  
216 - * 开始同步, 这里是主要的业务代码  
217 - * @param int $worker_id 进程号  
218 - * @return int  
219 - * @author:dc  
220 - * @time 2023/2/18 11:27  
221 - */  
222 -function sync($worker_id=0){  
223 - // 需要同步的id  
224 - $id = redis()->lPop('sync_email_lists');  
225 -  
226 - if(!$id){  
227 - co::sleep(1);  
228 - return -1;  
229 - }  
230 -  
231 - _echo($worker_id.': 协程('.co::getCid().'):抢到 '.$id);  
232 -  
233 - $email = db()->first(\Model\emailSql::first($id));  
234 - if(!$email){  
235 - return 0;  
236 - }  
237 -  
238 - if($email['pwd_error']){  
239 - return 1;  
240 - }  
241 -  
242 - $mailServer = new Lib\Mail\Mail($email['email'],base64_decode($email['password']),$email['imap']);  
243 -  
244 - // 登录服务器  
245 - if(!$mailServer->login()){  
246 - return 2;  
247 - }  
248 -  
249 - // 文件夹间隔1天同步一次  
250 - if(empty($email['last_sync_time']) || time() > $email['last_sync_time']+86400){  
251 - // 同步文件夹  
252 - $mailServer->syncFolder($id,db());  
253 - }  
254 -  
255 - // 读取到邮箱中的文件夹  
256 - $folders = db()->all(\Model\folderSql::all($email['id']));  
257 - if(!$folders){  
258 - return 3;  
259 - }  
260 - $folders = list_to_tree($folders);  
261 - foreach ($folders as $folder){  
262 - try {  
263 - if(empty($folder['_child'])){  
264 - // 同步父文件夹  
265 - $mailServer->syncMail($id,$folder['id'],$folder['origin_folder']);  
266 - }else{  
267 - foreach ($folder as $item){  
268 - // 同步子文件夹  
269 - $mailServer->syncMail($id,$item['id'],$item['origin_folder']);  
270 - }  
271 - }  
272 -  
273 - }catch (Throwable $e){  
274 - logs(  
275 - $e->getMessage(),  
276 - LOG_PATH.'/imap/'.$email['email'].'.error.log'  
277 - );  
278 - }  
279 - }  
280 -  
281 -  
282 - $email = null;  
283 - $mailServer = null;  
284 -  
285 -}  
286 -  
287 -  
288 -  
289 -  
290 -switch ($argv[1]){  
291 - case 'start':{  
292 - start();  
293 - break;  
294 - }  
295 - case 'stop':{  
296 - \Co\run(function (){  
297 - echo "正在退出程序...\n非必要请不要强制kill掉进程\n";  
298 - redis()->set(SYNC_RUNNING_REDIS_KEY,'stop');  
299 - while (true){  
300 - $num = exec("ps -ef | grep \"sync_email.php start\" | grep -v grep | wc -l");  
301 - if(!$num){  
302 - break;  
303 - }  
304 - co::sleep(0.5);  
305 - }  
306 - echo "已退出程序\n";  
307 - });  
308 - break;  
309 - }  
310 - default:{  
311 - break;  
312 - }  
313 -}  
314 -  
315 -  
316 -  
317 -  
318 -  
319 -  
320 -  
321 -  
322 -  
323 - 2 +//
  3 +///**
  4 +// * TODO:: 这个文件是定时去拉取所有邮箱中的邮件
  5 +// * 暂时不用这个
  6 +// */
  7 +////error_reporting();
  8 +//
  9 +//use Swoole\Process;
  10 +//
  11 +//
  12 +//
  13 +//include_once __DIR__."/../vendor/autoload.php";
  14 +//
  15 +//
  16 +//function start(){
  17 +//
  18 +//// 删除停止运行的值
  19 +// redis()->delete(SYNC_RUNNING_REDIS_KEY);
  20 +//
  21 +// /** 创建一个表 **/
  22 +// $table = new Swoole\Table(128);// 128 行
  23 +// $table->column('val', Swoole\Table::TYPE_INT);
  24 +// $table->create();
  25 +//
  26 +// // 初始时,进行一次统计
  27 +// $table->set('etotal',['val'=> db()->count(\Model\emailSql::count())]);
  28 +//
  29 +// // 进程管理器
  30 +// $pm = new Process\Manager();
  31 +//
  32 +// // 启动一个进程来管理定时
  33 +// $pm->add(function (Process\Pool $pool, int $workerId)use (&$table){
  34 +// _echo("定时进程({$workerId})启动成功");
  35 +// // 每10分钟统计一次邮箱数量
  36 +// \Swoole\Timer::tick(600000,function () use (&$table){
  37 +// $table->set('etotal',['val'=> db()->count(\Model\emailSql::count())]);
  38 +// });
  39 +//
  40 +// // 每2秒执行一次
  41 +// \Swoole\Timer::tick(2000,function () use (&$table,&$pool){
  42 +// // 是否停止脚本
  43 +// $table->set('stop',['val'=> redis()->get(SYNC_RUNNING_REDIS_KEY) === 'stop' ? 1 : 0]);
  44 +//// _echo('定时器');
  45 +// // 检查是否结束了所有的协程同步代码
  46 +// if ($table->get('stop','val')) {
  47 +// $stop_num = 0;
  48 +// foreach (range(0, WORKER_NUM) as $i) {
  49 +// if ($table->exists('ps' . $i)) {
  50 +// $stop_num++;
  51 +// }
  52 +// }
  53 +// if($stop_num >= WORKER_NUM){
  54 +//// 退出进程
  55 +// $pool->shutdown();
  56 +// }
  57 +// }
  58 +//
  59 +// // 邮件总数
  60 +// $total = redis()->get('email_total',0);
  61 +// if($total > $table->get('etotal','val')){
  62 +// $table->set('etotal',['val'=> $total]);
  63 +// }
  64 +//
  65 +// });
  66 +//
  67 +//
  68 +// //todo:: 需要更新同步的邮件,每10分钟同步一次,这里是的时间是微妙
  69 +// \Swoole\Timer::tick(600000,function (){
  70 +//
  71 +// start_now_mail();
  72 +//
  73 +// });
  74 +//
  75 +//
  76 +//
  77 +// // 进行阻塞,否则定时器无法运行
  78 +// while (true){
  79 +// co::sleep(9999);
  80 +// }
  81 +// },true);
  82 +//
  83 +//
  84 +//// 协程配置
  85 +// \co::set([
  86 +// 'max_coroutine'=>COROUTINE_MAX_NUM, // 最大携程数量
  87 +// 'hook_flags'=>SWOOLE_HOOK_TCP, // redis需要的配置
  88 +// ]);
  89 +//
  90 +// // 启动业务进程
  91 +// $pm->addBatch(WORKER_NUM,function (Process\Pool $pool, int $worker_id) use (&$table){
  92 +// _echo("业务进程({$worker_id})启动成功");
  93 +//
  94 +// // 协程id集
  95 +// $cid = [];
  96 +//
  97 +// $i = 0;
  98 +// $email_total = $table->get('etotal','val');//邮件总数量
  99 +// $isRunMaxCNum = 1; // 允许最大协程数量,如果为0则停止所有协程工作,相当于停止脚本
  100 +// // 是否退出进程
  101 +// while (true){
  102 +// $i++;
  103 +// // 每10秒 验证一次邮箱数量,好控制协程的数量
  104 +// if($i>=10){
  105 +// $email_total = $table->get('etotal','val');
  106 +// $i = 0;
  107 +// }
  108 +//
  109 +// if(!$email_total){
  110 +// break;
  111 +// }
  112 +//
  113 +// // 每个协程 分配 10个邮箱任务
  114 +// $cnum = ceil($email_total/(WORKER_NUM*10));
  115 +// // 当前协程的数量
  116 +// $nowCnum = count($cid);
  117 +// // 说明 需要新的协程了
  118 +// if($cnum > $nowCnum){
  119 +// // 开启所需要的协程数量
  120 +// foreach (range(0,$cnum-$nowCnum) as $v){
  121 +// // 启动一个协程
  122 +// create_coroutine($cid,$isRunMaxCNum,$worker_id);
  123 +// }
  124 +// }
  125 +// // 暂时没有实现 减少协程数量操作
  126 +//// else if ($cnum < $nowCnum){
  127 +//// // 说明 协程数量过多,小于了1个协程处理10个邮箱的,资源闲置情况
  128 +//// // 销毁多余协程
  129 +//// $isRunMaxCNum = $nowCnum - $cnum;
  130 +//// }
  131 +//
  132 +// // 每3秒检查一次是否要停止 协程
  133 +// if($i%3 === 0){
  134 +//// _echo('是否收到退出信号:'.$table->get('stop','val'));
  135 +// if ($table->get('stop','val')){
  136 +// // 停止
  137 +// $isRunMaxCNum = 0;
  138 +// }
  139 +// }
  140 +//
  141 +// // 这个是检查 cid的如果协程全部退出,则退出进程
  142 +// co::sleep(1);
  143 +// // 跳出无限循环了
  144 +// if(!$cid){
  145 +// _echo('正常关闭进程('.$worker_id.')');
  146 +// break;
  147 +// }
  148 +// }
  149 +//
  150 +// // 是否停止,这里进行阻塞
  151 +// if ($table->get('stop','val')){
  152 +// // 某个进程退出了
  153 +// $table->set('ps'.$worker_id,['val'=>1]);
  154 +// // 阻塞直到 主进程 kill掉所有子进程
  155 +// while (true){
  156 +// co::sleep(5);
  157 +// }
  158 +// }
  159 +//
  160 +//
  161 +// },true);
  162 +//
  163 +//
  164 +// // 启动管理器
  165 +// $pm->start();
  166 +//
  167 +//}
  168 +//
  169 +///**
  170 +// * 创建协程
  171 +// * @param array $cid
  172 +// * @param int $isRunMaxCNum
  173 +// * @param $worker_id
  174 +// * @author:dc
  175 +// * @time 2023/2/14 17:04
  176 +// */
  177 +//function create_coroutine(array &$cid,int &$isRunMaxCNum,$worker_id){
  178 +// go(function () use (&$cid,&$isRunMaxCNum,$worker_id){
  179 +// // 协程id
  180 +// $cid[co::getCid()] = co::getCid();
  181 +//
  182 +// // 同步操作
  183 +// while (true){
  184 +// // 是否退出协程
  185 +// if(!$isRunMaxCNum){
  186 +//// _echo('协程('.co::getCid().'): stop '.$isRunMaxCNum);
  187 +// break;
  188 +// }
  189 +//
  190 +// // 开始同步
  191 +// try {
  192 +// sync($worker_id);
  193 +// }catch (\Throwable $e){
  194 +// _echo($e->getMessage());
  195 +// logs(
  196 +// $e->getMessage().PHP_EOL.$e->getTraceAsString(),
  197 +// LOG_PATH.'/'.$worker_id.'_'.co::getCid().'.log'
  198 +// );
  199 +// }
  200 +//
  201 +// // 阻塞1秒
  202 +// co::sleep(1);
  203 +// }
  204 +//
  205 +// // 协程完成后执行的函数
  206 +// co::defer(function () use (&$cid,$worker_id){
  207 +// _echo('正常关闭进程('.$worker_id.')下的协程('.co::getCid().')');
  208 +// unset($cid[co::getCid()]);
  209 +// });
  210 +//
  211 +// });
  212 +//}
  213 +//
  214 +//
  215 +///**
  216 +// * 开始同步, 这里是主要的业务代码
  217 +// * @param int $worker_id 进程号
  218 +// * @return int
  219 +// * @author:dc
  220 +// * @time 2023/2/18 11:27
  221 +// */
  222 +//function sync($worker_id=0){
  223 +// // 需要同步的id
  224 +// $id = redis()->lPop('sync_email_lists');
  225 +//
  226 +// if(!$id){
  227 +// co::sleep(1);
  228 +// return -1;
  229 +// }
  230 +//
  231 +// _echo($worker_id.': 协程('.co::getCid().'):抢到 '.$id);
  232 +//
  233 +// $email = db()->first(\Model\emailSql::first($id));
  234 +// if(!$email){
  235 +// return 0;
  236 +// }
  237 +//
  238 +// if($email['pwd_error']){
  239 +// return 1;
  240 +// }
  241 +//
  242 +// $mailServer = new Lib\Mail\Mail($email['email'],base64_decode($email['password']),$email['imap']);
  243 +//
  244 +// // 登录服务器
  245 +// if(!$mailServer->login()){
  246 +// return 2;
  247 +// }
  248 +//
  249 +// // 文件夹间隔1天同步一次
  250 +// if(empty($email['last_sync_time']) || time() > $email['last_sync_time']+86400){
  251 +// // 同步文件夹
  252 +// $mailServer->syncFolder($id,db());
  253 +// }
  254 +//
  255 +// // 读取到邮箱中的文件夹
  256 +// $folders = db()->all(\Model\folderSql::all($email['id']));
  257 +// if(!$folders){
  258 +// return 3;
  259 +// }
  260 +// $folders = list_to_tree($folders);
  261 +// foreach ($folders as $folder){
  262 +// try {
  263 +// if(empty($folder['_child'])){
  264 +// // 同步父文件夹
  265 +// $mailServer->syncMail($id,$folder['id'],$folder['origin_folder']);
  266 +// }else{
  267 +// foreach ($folder as $item){
  268 +// // 同步子文件夹
  269 +// $mailServer->syncMail($id,$item['id'],$item['origin_folder']);
  270 +// }
  271 +// }
  272 +//
  273 +// }catch (Throwable $e){
  274 +// logs(
  275 +// $e->getMessage(),
  276 +// LOG_PATH.'/imap/'.$email['email'].'.error.log'
  277 +// );
  278 +// }
  279 +// }
  280 +//
  281 +//
  282 +// $email = null;
  283 +// $mailServer = null;
  284 +//
  285 +//}
  286 +//
  287 +//
  288 +//
  289 +//
  290 +//switch ($argv[1]){
  291 +// case 'start':{
  292 +// start();
  293 +// break;
  294 +// }
  295 +// case 'stop':{
  296 +// \Co\run(function (){
  297 +// echo "正在退出程序...\n非必要请不要强制kill掉进程\n";
  298 +// redis()->set(SYNC_RUNNING_REDIS_KEY,'stop');
  299 +// while (true){
  300 +// $num = exec("ps -ef | grep \"sync_email.php start\" | grep -v grep | wc -l");
  301 +// if(!$num){
  302 +// break;
  303 +// }
  304 +// co::sleep(0.5);
  305 +// }
  306 +// echo "已退出程序\n";
  307 +// });
  308 +// break;
  309 +// }
  310 +// default:{
  311 +// break;
  312 +// }
  313 +//}
  314 +//
  315 +//
  316 +//
  317 +//
  318 +//
  319 +//
  320 +//
  321 +//
  322 +//
  323 +//