作者 邓超

sync

1 <?php 1 <?php
2 2
3 3
  4 +// 进程管理器
  5 +$pm = new \Swoole\Process\Manager();
4 6
  7 +// 启动业务进程
  8 +$pm->addBatch(5,function (\Swoole\Process\Pool $pool, int $worker_id){
  9 + //error_reporting();
  10 + include_once __DIR__."/../vendor/autoload.php";
5 11
6 -class SyncEmailMy{ 12 + _echo('子进程重新启动了==>'.$worker_id);
  13 + \Lib\DbPool::$clientNumber = 52;
  14 + \Lib\RedisPool::$clientNumber = 52;
  15 + while (1){
7 16
8 - static $maxGo = 0; 17 + if(\Lib\SwGo::$runNumber >= 50){
  18 + co::sleep(1);
  19 + continue;
  20 + }
9 21
10 - public static function sync(){  
11 - while (1){ 22 + $id = redis()->lPop('sync_email_lists_my');
  23 + // _echo('读取到'.$id);
  24 + if($id && is_numeric($id)){
  25 + // 占用当前的id,占用2小时
  26 + if(redis()->add('just_sync_'.$id,time(),600)){
12 27
13 -// if(self::$maxGo >= 50){  
14 -// co::sleep(1);  
15 -// continue;  
16 -// } 28 + \Lib\SwGo::start(function ($id){
17 29
18 - $id = redis()->lPop('sync_email_lists_my');  
19 - // _echo('读取到'.$id);  
20 - if($id && is_numeric($id)){  
21 - // 占用当前的id,占用2小时  
22 - if(redis()->add('just_sync_'.$id,time(),600)){ 30 + try{
  31 + // 开始同步
  32 + (new \Service\SyncMail($id))->isUidAfter(2)->sync();
  33 + }catch (Throwable $e){
  34 + _echo($e->getMessage());
  35 + }
23 36
24 - self::go($id); 37 + redis()->expire('just_sync_'.$id,30);
  38 + },$id);
25 39
26 - }  
27 -// co::sleep(0.2);  
28 - }else{  
29 - sleep(1);  
30 } 40 }
31 - 41 + co::sleep(0.2);
  42 + }else{
  43 + co::sleep(1);
32 } 44 }
33 - }  
34 -  
35 - public static function go($id){  
36 -// go(function ($id){  
37 -// self::$maxGo++;  
38 - try{  
39 - // 开始同步  
40 - (new \Service\SyncMail($id))->isUidAfter(2)->sync();  
41 - }catch (Throwable $e){  
42 - _echo($e->getMessage());  
43 - }  
44 -  
45 -// co::defer(function () use ($id){  
46 -// self::$maxGo--;  
47 - // 30秒后 消除占用  
48 - redis()->expire('just_sync_'.$id,30);  
49 -  
50 - \Lib\Log::getInstance()->write();  
51 45
52 -// db()->close();  
53 -// });  
54 -// },$id);  
55 } 46 }
56 47
57 -}  
58 -  
59 -  
60 -// 进程管理器  
61 -$pm = new \Swoole\Process\Manager();  
62 -  
63 -// 启动业务进程  
64 -$pm->addBatch(100,function (\Swoole\Process\Pool $pool, int $worker_id){  
65 - //error_reporting();  
66 - include_once __DIR__."/../vendor/autoload.php";  
67 -  
68 - _echo('子进程重新启动了==>'.$worker_id);  
69 -  
70 - \Lib\DbPool::$clientNumber = 60;  
71 -  
72 - SyncEmailMy::sync();  
73 -  
74 -},false); 48 +},true);
75 49
76 $pm->start(); 50 $pm->start();
77 51
@@ -19,6 +19,17 @@ class RedisPool { @@ -19,6 +19,17 @@ class RedisPool {
19 */ 19 */
20 static $pool = null; 20 static $pool = null;
21 21
  22 + /**
  23 + * 获取到的连接
  24 + * @var array
  25 + */
  26 + public static $clientAll = [];
  27 +
  28 + /**
  29 + * 连接的数量
  30 + * @var int
  31 + */
  32 + public static $clientNumber = 1024;
22 33
23 /** 34 /**
24 * RedisPool constructor. 35 * RedisPool constructor.
@@ -33,35 +44,39 @@ class RedisPool { @@ -33,35 +44,39 @@ class RedisPool {
33 ->withAuth(REDIS_PASSWORD) 44 ->withAuth(REDIS_PASSWORD)
34 ->withDbIndex(REDIS_DB) 45 ->withDbIndex(REDIS_DB)
35 ->withTimeout(60) 46 ->withTimeout(60)
36 - ,1024 47 + ,self::$clientNumber
37 ); 48 );
38 49
39 } 50 }
40 -  
41 - // 获取一个连接,放入当前实例  
42 - $this->client = static::$pool->get();  
43 } 51 }
44 52
45 - 53 + public function getClient()
  54 + {
  55 + $id = \co::getCid();
  56 + if(empty(static::$clientAll[$id])){
  57 + static::$clientAll[$id] = self::$pool->get();
  58 + }
  59 + return static::$clientAll[$id];
  60 + }
46 61
47 62
48 public function __destruct() 63 public function __destruct()
49 { 64 {
50 - $this->close(); 65 +// $this->close();
51 } 66 }
52 67
53 68
54 /** 69 /**
55 - * 关闭 70 + * 关闭链接
56 * @author:dc 71 * @author:dc
57 - * @time 2023/3/16 13:42 72 + * @time 2024/5/30 10:30
58 */ 73 */
59 public function close(){ 74 public function close(){
60 -  
61 - self::$pool->put($this->client);  
62 -  
63 - $this->client = null;  
64 - 75 + $id = \co::getCid();
  76 + if (isset(static::$clientAll[$id])){
  77 + self::$pool->put(static::$clientAll[$id]);
  78 + }
  79 + unset(static::$clientAll[$id]);
65 } 80 }
66 81
67 82