作者 邓超

1

正在显示 1 个修改的文件 包含 58 行增加58 行删除
@@ -53,43 +53,43 @@ function start(){ @@ -53,43 +53,43 @@ function start(){
53 co::sleep(1); 53 co::sleep(1);
54 }else{ 54 }else{
55 // 占用当前的id,占用2小时 55 // 占用当前的id,占用2小时
56 - redis()->add('just_sync_'.$id,time(),7200);  
57 - // 启动一个协程  
58 - go(function () use (&$start_num,$worker_id,$id){  
59 - $start_num++;  
60 -  
61 - // 开始同步  
62 - try {  
63 - sync($id,$worker_id);  
64 - }catch (\Throwable $e){  
65 - // 重新发布同步任务,如果失败了是否重新发布 56 + if(redis()->add('just_sync_'.$id,time(),7200)){
  57 + // 启动一个协程
  58 + go(function () use (&$start_num,$worker_id,$id){
  59 + $start_num++;
  60 +
  61 + // 开始同步
  62 + try {
  63 + sync($id,$worker_id);
  64 + }catch (\Throwable $e){
  65 + // 重新发布同步任务,如果失败了是否重新发布
66 // redis()->rPush('sync_email_lists',$id); 66 // redis()->rPush('sync_email_lists',$id);
67 67
68 // _echo($e->getMessage()); 68 // _echo($e->getMessage());
69 - logs(  
70 - $e->getMessage().PHP_EOL.$e->getTraceAsString(),  
71 - LOG_PATH.'/'.$worker_id.'.log'  
72 - );  
73 - }  
74 -  
75 - // 协程完成后执行的函数  
76 - co::defer(function () use (&$start_num,$worker_id,$id){ 69 + logs(
  70 + $e->getMessage().PHP_EOL.$e->getTraceAsString(),
  71 + LOG_PATH.'/'.$worker_id.'.log'
  72 + );
  73 + }
  74 +
  75 + // 协程完成后执行的函数
  76 + co::defer(function () use (&$start_num,$worker_id,$id){
77 // _echo('正常关闭进程('.$worker_id.')下的协程('.co::getCid().')'); 77 // _echo('正常关闭进程('.$worker_id.')下的协程('.co::getCid().')');
78 - $start_num--;  
79 - // 消除占用  
80 - redis()->delete('just_sync_'.$id);  
81 - // 写入日志  
82 - \Lib\Log::getInstance()->write(); 78 + $start_num--;
  79 + // 消除占用
  80 + redis()->delete('just_sync_'.$id);
  81 + // 写入日志
  82 + \Lib\Log::getInstance()->write();
83 83
84 - // 关闭数据库链接  
85 - db()->close();  
86 - // 关闭redis链接  
87 - redis()->close(); 84 + // 关闭数据库链接
  85 + db()->close();
  86 + // 关闭redis链接
  87 + redis()->close();
88 88
89 - });  
90 -  
91 - }); 89 + });
92 90
  91 + });
  92 + }
93 } 93 }
94 }else{ 94 }else{
95 // 协程到了最大的数量,阻塞1秒 95 // 协程到了最大的数量,阻塞1秒
@@ -118,41 +118,41 @@ function start(){ @@ -118,41 +118,41 @@ function start(){
118 co::sleep(1); 118 co::sleep(1);
119 }else{ 119 }else{
120 // 占用当前的id,占用2小时 120 // 占用当前的id,占用2小时
121 - redis()->add('just_sync_body_'.$id['lists_id'],time(),600);  
122 - // 启动一个协程  
123 - go(function () use (&$start_num,$worker_id,$id){ 121 + if(redis()->add('just_sync_body_'.$id['lists_id'],time(),600)){
  122 + // 启动一个协程
  123 + go(function () use (&$start_num,$worker_id,$id){
124 124
125 - $start_num++; 125 + $start_num++;
126 126
127 - // 开始同步  
128 - try {  
129 - sync_body($id,$worker_id);  
130 - }catch (\Throwable $e){ 127 + // 开始同步
  128 + try {
  129 + sync_body($id,$worker_id);
  130 + }catch (\Throwable $e){
131 // _echo($e->getMessage()); 131 // _echo($e->getMessage());
132 - logs(  
133 - $e->getMessage().PHP_EOL.$e->getTraceAsString(),  
134 - LOG_PATH.'/'.$worker_id.'.log'  
135 - );  
136 - }  
137 -  
138 - // 协程完成后执行的函数  
139 - co::defer(function () use (&$start_num,$worker_id,$id){ 132 + logs(
  133 + $e->getMessage().PHP_EOL.$e->getTraceAsString(),
  134 + LOG_PATH.'/'.$worker_id.'.log'
  135 + );
  136 + }
  137 +
  138 + // 协程完成后执行的函数
  139 + co::defer(function () use (&$start_num,$worker_id,$id){
140 // _echo('正常关闭进程('.$worker_id.')下的协程('.co::getCid().')'); 140 // _echo('正常关闭进程('.$worker_id.')下的协程('.co::getCid().')');
141 - $start_num--;  
142 - // 消除占用  
143 - redis()->delete('just_sync_body_'.$id['lists_id']);  
144 - // 写入日志  
145 - \Lib\Log::getInstance()->write(); 141 + $start_num--;
  142 + // 消除占用
  143 + redis()->delete('just_sync_body_'.$id['lists_id']);
  144 + // 写入日志
  145 + \Lib\Log::getInstance()->write();
146 146
147 - // 关闭数据库链接  
148 - db()->close();  
149 - // 关闭redis链接  
150 - redis()->close(); 147 + // 关闭数据库链接
  148 + db()->close();
  149 + // 关闭redis链接
  150 + redis()->close();
151 151
152 - });  
153 -  
154 - }); 152 + });
155 153
  154 + });
  155 + }
156 } 156 }
157 }else{ 157 }else{
158 // 协程到了最大的数量,阻塞1秒 158 // 协程到了最大的数量,阻塞1秒