作者 zhl

修改任务获取方式

@@ -51,12 +51,13 @@ class GeoQuestionRes extends Command @@ -51,12 +51,13 @@ class GeoQuestionRes extends Command
51 sleep(300); 51 sleep(300);
52 continue; 52 continue;
53 } 53 }
54 - $lock_key = "geo_task_lock:$task_id"; 54 + $lock_key = "geo_task_lock_" . $task_id;
55 if (!Redis::setnx($lock_key, 1)) { 55 if (!Redis::setnx($lock_key, 1)) {
56 $this->output("任务 $task_id 已被其他进程锁定,跳过"); 56 $this->output("任务 $task_id 已被其他进程锁定,跳过");
  57 + sleep(30); // 程序挂起, 避免最后一个任务 扫数据表和Redis
57 continue; 58 continue;
58 } 59 }
59 - Redis::expire($lock_key, 1200); // 1小时自动解锁 60 + Redis::expire($lock_key, 600); // 10自动解锁
60 $this->output('执行的任务ID:' . $task_id); 61 $this->output('执行的任务ID:' . $task_id);
61 $geoQuestionModel = new GeoQuestion(); 62 $geoQuestionModel = new GeoQuestion();
62 $taskInfo = $geoQuestionModel->read(['id'=>$task_id]); 63 $taskInfo = $geoQuestionModel->read(['id'=>$task_id]);
@@ -86,6 +87,8 @@ class GeoQuestionRes extends Command @@ -86,6 +87,8 @@ class GeoQuestionRes extends Command
86 $geoResultModel = new GeoQuestionResult(); 87 $geoResultModel = new GeoQuestionResult();
87 $geoLogModel = new GeoQuestionLog(); 88 $geoLogModel = new GeoQuestionLog();
88 foreach ($taskInfo['question'] as $question) { 89 foreach ($taskInfo['question'] as $question) {
  90 + Redis::expire($lock_key, 1200); // 一个问题执行时间可能会达到15-18分钟
  91 +
89 $en_question = Translate::tran($question, 'zh') ?? ''; 92 $en_question = Translate::tran($question, 'zh') ?? '';
90 $this->output('项目ID:' . $taskInfo['project_id'] . ', 问题 开始:' . $question); 93 $this->output('项目ID:' . $taskInfo['project_id'] . ', 问题 开始:' . $question);
91 foreach ($platformsArr as $platform) { 94 foreach ($platformsArr as $platform) {
@@ -345,19 +348,28 @@ class GeoQuestionRes extends Command @@ -345,19 +348,28 @@ class GeoQuestionRes extends Command
345 $key = 'geo_task_list'; 348 $key = 'geo_task_list';
346 $task_id = Redis::rpop($key); 349 $task_id = Redis::rpop($key);
347 if(empty($task_id)){ 350 if(empty($task_id)){
348 - $project_ids = GeoQuestion::where('status', GeoQuestion::STATUS_OPEN)->where('next_time', '<=', date('Y-m-d'))  
349 - ->orderBy('next_time', 'asc')->pluck('project_id')->unique()->values()->toArray();  
350 - if(!empty($project_ids)){  
351 - foreach ($project_ids as $project_id){  
352 - $ids = GeoQuestion::where(['project_id' => $project_id, 'status' => GeoQuestion::STATUS_OPEN])->where('next_time', '<=', date('Y-m-d'))->pluck('id');  
353 - foreach ($ids as $id) {  
354 - //检查任务是否执行过  
355 - if (!Redis::exists("geo_task_lock:$id")) {  
356 - Redis::lpush($key, $id);  
357 - }  
358 - }  
359 - $task_id = Redis::rpop($key); 351 + $lock_key = 'geo_task_generation_lock';
  352 + $lock_ttl = 60; // 锁时间大于当前 锁功能执行时间
  353 + // 尝试获取锁,非阻塞方式
  354 + $lock = Redis::set($lock_key, 1, 'EX', $lock_ttl, 'NX');
  355 + if (empty($lock))
  356 + return $task_id;
  357 +
  358 + $project_ids = GeoQuestion::where('status', GeoQuestion::STATUS_OPEN)->where('next_time', '<=', date('Y-m-d'))->pluck('project_id')->unique()->values()->toArray();
  359 + if(FALSE == empty($project_ids)){
  360 + $ids = GeoQuestion::where('status', GeoQuestion::STATUS_OPEN)
  361 + ->whereIn('project_id', $project_ids)
  362 + ->where(function ($query){
  363 + $query->where('current_time', '!=', date('Y-m-d'))
  364 + ->orWhereNull('current_time');
  365 + })
  366 + ->orderBy('project_id', 'asc')
  367 + ->pluck('id');
  368 +
  369 + foreach ($ids as $id) {
  370 + Redis::lpush($key, $id);
360 } 371 }
  372 + $task_id = Redis::rpop($key);
361 } 373 }
362 } 374 }
363 return $task_id; 375 return $task_id;