SyncSubmitTaskDistribution.php 3.4 KB
<?php

namespace App\Console\Commands\Sync;


use App\Exceptions\InquiryFilterException;
use App\Models\Project\Project;
use App\Models\SyncSubmitTask\SyncSubmitTask as SyncSubmitTaskModel;
use App\Services\SyncSubmitTaskService;
use Illuminate\Console\Command;
use Illuminate\Support\Facades\Cache;
use Illuminate\Support\Facades\DB;
use Illuminate\Support\Facades\Redis;
use Illuminate\Support\Facades\Schema;
use Illuminate\Support\Str;
/**
 *
 * Class SyncSubmitTask
 * @package App\Console\Commands
 * @author zbj
 * @date 2023/11/28
 */
class SyncSubmitTaskDistribution extends Command
{

    protected $signature = 'sync_submit_task_distribution';
    protected $description = '询盘、访问异步任务分发';

    public function handle()
    {
        while (true) {
            try {
                $len = Redis::llen('sync_submit_task');
                if (!$len) {
                    $max_id = SyncSubmitTaskModel::orderBy('id', 'desc')->value('id');
                    if ($max_id > 2000000) {
                        $this->backup();
                    } else {
                        $tasks = SyncSubmitTaskModel::where('status', 0)->limit(100)->get();
                        foreach ($tasks as $task) {
                            $task->status = 3;
                            $task->save();
                            Redis::lpush('sync_submit_task', $task->id);
                            $this->output('分发:' . $task->id);
                        }
                    }
                }else{
                    sleep(1);
                }
            }catch (\Exception $e){
                $this->output('异常:' . $e->getMessage());
            }
        }
    }

    /**
     * 输出处理日志
     */
    public function output($message): bool
    {
        echo date('Y-m-d H:i:s') . ' | ' . $message . PHP_EOL;
        return true;
    }

    /**
     * 备份数据
     * @author zbj
     * @date 2024/1/23
     */
    public function backup()
    {
        DB::beginTransaction();
        try {
            $table = (new SyncSubmitTaskModel())->getTable();
            $new_table = $table . '_backup_' . date('Ymd');
            if(Schema::hasTable($new_table)){
                $new_table = $table . '_backup_' . date('YmdH');
            }
            //重命名当前表
            Schema::rename($table, $new_table);
            //克隆表数据
            DB::statement('CREATE TABLE ' . $table . ' LIKE ' . $new_table);

            //未入队的写到新表
            $page = 1;
            while (true){
                $list = DB::table($new_table)->where('status', 0)->forPage($page, 5000)->get();
                if(!count($list)){
                    break;
                }
                $data = [];
                foreach ($list as $task) {
                    $data[] = [
                        'type' => $task->type,
                        'data' => $task->data,
                        'created_at' => $task->created_at,
                        'updated_at' => $task->updated_at,
                    ];
                }
                $data && SyncSubmitTaskModel::insert($data);
                $page++;
            }

            DB::commit();

            $this->output('数据备份成功');
        } catch (\Exception $e) {
            $this->output('数据备份失败' . $e->getMessage());
            DB::rollBack();
        }
        return $new_table ?? '';
    }
}