send_job.php
7.1 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
<?php
include_once "../vendor/autoload.php";
// 这里试试不用多进程模式,用多协程模式
function start(){
    // 删除key
    redis()->delete('send_job_is_stop');
    // 开启协程
    \Co\run(function (){
        $cNum = 0;//协程运行的数量
        while (true){
            try {
                // 是否要停止
                if(redis()->get('send_job_is_stop')=='stop'){
                    break;
                }
                $lists  =   db()->all(\Model\sendJobsSql::sendList(500-$cNum));
                // 循环
                foreach ($lists as $list){
                    // 占用 id
                    if(redis()->add('send_job_run_id_'.$list['id'],$list['id'])){
                        go(function ($data) use (&$cNum){
                            $cNum++; // 协程数+1
                            // 表单数据
                            $data['maildata'] = json_decode($data['maildata'],true);
                            // 查询邮箱
                            $email = db()->first(\Model\emailSql::first($data['email_id']));
                            // 更新状态
                            \Model\sendJobsSql::upStatus($data['id'],1,db());
                            // 是否是单发送
                            if($data['maildata']['massSuit']??0){
                                $tos    =   $data['maildata']['tos'];
                                foreach ($tos as $to){
                                    // 是否暂停
                                    if(db()->value(\Model\sendJobsSql::isStatus($data['id'])) === 3){
                                        break;
                                    }
                                    // 是否已发送过了
                                    if(db()->count(\Model\sendJobStatusSql::count($data['id'],$to['email']))){
                                        continue;
                                    }
                                    // 每个收件人单独发送
                                    $data['maildata']['tos'] = [$to];
                                    $result = \Lib\Mail\MailFun::sendEmail($data['maildata'],$email);
                                    // 插入紫薯精
                                    db()->insert(\Model\sendJobStatusSql::$table,[
                                        'job_id'  =>  $data['id'],
                                        'to_email'  =>  $to['email'],
                                        'status'    =>  $result[0] ? 1 : 0,
                                        'error'    =>  $result[0] ? $result[1] : ''
                                    ]);
                                    // 时间距离下次的时间
                                    if($data['maildata']['masssuit_interval_send']??[]){
                                        $time = rand($data['maildata']['masssuit_interval_send']['start'],$data['maildata']['masssuit_interval_send']['end']);
                                        if($time){
                                            $block = false;
                                            while (true){
                                                // 没5秒循环一次
                                                if(redis()->get('send_job_is_stop')=='stop'){
                                                    $block = true;
                                                    break;
                                                }
                                                $time-=5;
                                                co::sleep(5);
                                                // 执行下一次了
                                                if (!$time){
                                                    $block = true;
                                                    break;
                                                }
                                            }
                                            if($block){
                                                break;
                                            }
                                        }
                                    }
                                }
                            }else{
                                $result = \Lib\Mail\MailFun::sendEmail($data['maildata'],$email);
                                // 更新状态
                                \Model\sendJobsSql::upStatus($data['id'],2,db());
                                // 插入紫薯精
                                db()->insert(\Model\sendJobStatusSql::$table,[
                                    'job_id'  =>  $data['id'],
                                    'to_email'  =>  'all',
                                    'status'    =>  $result[0] ? 1 : 0,
                                    'error'    =>  $result[0] ? $result[1] : ''
                                ]);
                            }
                            // 协程结束后
                            co::defer(function ($id) use(&$cNum,$data){
                                $cNum--;
                                // 验证是否完成
                                if($data['maildata']['massSuit']??0){
                                    $total = db()->count(\Model\sendJobStatusSql::count($data['id']));
                                    // 更新状态
                                    \Model\sendJobsSql::upStatus($data['id'],$total == $data['total'] ? 2 : 0,db());
                                }
                                // 写入日志
                                \Lib\Log::getInstance()->write();
                                // 结束后要关闭数据库链接,不然链接一直暂用
                                db()->close();
                                // 删除占用
                                redis()->delete('send_job_run_id_'.$data['id']);
                                redis()->close();
                            });
                        },$list);
                    }
                }
            }catch (Throwable $e){
                logs($e->getMessage().$e->getTraceAsString());
            }
            \Lib\Log::getInstance()->write();
            // 暂停5秒
            co::sleep(5);
        }
        // 这个是等待所有协程退出
        while (true){
            if(!$cNum){
                break;
            }
            co::sleep(1);
        }
    });
}
$ps = "ps -ef | grep \"send_job.php start\" | grep -v grep | wc -l";
switch ($argv[1]??0){
    case 'start':{
//        $num = exec($ps);
//        if($num){
//            echo '正则运行,请勿重复运行';
//        }else{
            start();
//        }
        break;
    }
    case 'stop':{
        \Co\run(function ($ps){
            echo "正在退出程序...\n非必要请不要强制kill掉进程\n";
            redis()->set('send_job_is_stop','stop');
            while (true){
                $num = exec($ps);
                if(!$num){
                    break;
                }
                co::sleep(0.2);
            }
            echo "已退出程序\n";
        },$ps);
        break;
    }
    default:{
        break;
    }
}