sync_email.php
3.3 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
<?php
//error_reporting();
use Swoole\Process;
include_once __DIR__."/config.php";
include_once __DIR__."/function.php";
function start(){
// 启动一个进程池进行管理
$topPool = new Process\Pool(WORKER_NUM);
// 这个是启用协程
$topPool->set(['enable_coroutine' => true]);
// 协程配置
\co::set([
'max_coroutine'=>1000, // 最大携程数量
'hook_flags'=>SWOOLE_HOOK_TCP, // redis需要的配置
]);
// 开始工作
$topPool->on('WorkerStart',function (Process\Pool $pool,$worker_id){
_echo("进程({$worker_id})启动成功");
// 协程id集
$cid = [];
// 删除停止运行的值
swoole_redis()->delete(RUNNING_REDIS_KEY);
// \Co\run(function (){
// 开启多个协程
foreach (range(1,COROUTINE_NUM) as $i){
go(function () use ($cid,$worker_id){
// 协程id
$cid[co::getCid()] = co::getCid();
$redis = swoole_redis();
// 同步操作
while (true){
// 开始同步
try {
sync();
}catch (Throwable $e){
logs($e->getMessage(),LOG_PATH.'/'.$worker_id.'_'.co::getCid().'.log');
}
// 是否停止
if($redis->get(RUNNING_REDIS_KEY) == 'stop'){
break;
}
// 阻塞1秒
co::sleep(1);
}
// 协程完成后执行的函数
co::defer(function () use ($cid){
_echo('正常关闭协程('.co::getCid().')');
unset($cid[co::getCid()]);
});
});
}
// });
// 是否退出进程
while (true){
if($cid){
co::sleep(1);
}else{
_echo('正常关闭进程('.$worker_id.')');
// 关闭当前进程
$pool->shutdown();
break;
}
}
});
// 停止工作后的回调
$topPool->on('WorkerStop', function (\Swoole\Process\Pool $pool, $workerId) {
_echo("[Worker #{$workerId}] WorkerStop\n");
});
$topPool->start();
}
/**
* 开始同步
* @author:dc
* @time 2023/2/13 9:42
*/
function sync(){
db()->first();
}
switch ($argv[1]){
case 'start':{
start();
break;
}
case 'stop':{
\Co\run(function (){
echo "正在退出程序...\n";
swoole_redis()->set(RUNNING_REDIS_KEY,'stop');
while (true){
$num = exec("ps -ef | grep \"sync_email.php start\" | grep -v grep | wc -l");
if(!$num){
break;
}
co::sleep(0.5);
}
echo "已退出程序\n";
});
break;
}
default:{
echo "参数 :\n";
echo "\tstart n n是协程的数量\n";
echo "\tstop\n";
echo "\trestart n\n";
echo PHP_EOL;
break;
}
}
exit();