Topthink.php 7.1 KB
<?php
// +----------------------------------------------------------------------
// | ThinkPHP [ WE CAN DO IT JUST THINK IT ]
// +----------------------------------------------------------------------
// | Copyright (c) 2006-2015 http://thinkphp.cn All rights reserved.
// +----------------------------------------------------------------------
// | Licensed ( http://www.apache.org/licenses/LICENSE-2.0 )
// +----------------------------------------------------------------------
// | Author: yunwuxin <448901948@qq.com>
// +----------------------------------------------------------------------

namespace think\queue\connector;

use think\exception\HttpException;
use think\queue\Connector;
use think\Request;
use think\queue\job\Topthink as TopthinkJob;
use think\Response;

class Topthink extends Connector
{
    protected $options = [
        'token'       => '',
        'project_id'  => '',
        'protocol'    => 'https',
        'host'        => 'qns.topthink.com',
        'port'        => 443,
        'api_version' => 1,
        'max_retries' => 3,
        'default'     => 'default'
    ];

    /** @var  Request */
    protected $request;

    protected $url;

    protected $curl = null;

    protected $last_status;

    protected $headers = [];

    public function __construct($options)
    {
        if (!empty($options)) {
            $this->options = array_merge($this->options, $options);
        }

        $this->url = "{$this->options['protocol']}://{$this->options['host']}:{$this->options['port']}/v{$this->options['api_version']}/";

        $this->headers['Authorization'] = "Bearer {$this->options['token']}";

        $this->request = Request::instance();
    }

    public function push($job, $data = '', $queue = null)
    {
        return $this->pushRaw(0, $queue, $this->createPayload($job, $data));
    }

    public function later($delay, $job, $data = '', $queue = null)
    {
        return $this->pushRaw($delay, $queue, $this->createPayload($job, $data));
    }

    public function release($queue, $job, $delay)
    {
        return $this->pushRaw($delay, $queue, $job->payload, $job->attempts);
    }

    public function marshal()
    {
        $job = new TopthinkJob($this, $this->marshalPushedJob(), $this->request->header('topthink-message-queue'));
        if ($this->request->header('topthink-message-status') == 'success') {
            $job->fire();
        } else {
            $job->failed();
        }
        return new Response('OK');
    }

    public function pushRaw($delay, $queue, $payload, $attempts = 0)
    {
        $queue_name = $this->getQueue($queue);
        $queue      = rawurlencode($queue_name);
        $url        = "project/{$this->options['project_id']}/queue/{$queue}/message";
        $message    = [
            'payload'  => $payload,
            'attempts' => $attempts,
            'delay'    => $delay
        ];

        return $this->apiCall('POST', $url, $message)->id;
    }

    public function deleteMessage($queue, $id)
    {
        $queue = rawurlencode($queue);
        $url   = "project/{$this->options['project_id']}/queue/{$queue}/message/{$id}";
        return $this->apiCall('DELETE', $url);
    }

    protected function apiCall($type, $url, $params = [])
    {
        $url = "{$this->url}$url";

        if ($this->curl == null) {
            $this->curl = curl_init();
        }

        switch ($type = strtoupper($type)) {
            case 'DELETE':
                curl_setopt($this->curl, CURLOPT_URL, $url);
                curl_setopt($this->curl, CURLOPT_CUSTOMREQUEST, $type);
                curl_setopt($this->curl, CURLOPT_POSTFIELDS, json_encode($params));
                break;
            case 'PUT':
                curl_setopt($this->curl, CURLOPT_URL, $url);
                curl_setopt($this->curl, CURLOPT_CUSTOMREQUEST, $type);
                curl_setopt($this->curl, CURLOPT_POSTFIELDS, json_encode($params));
                break;
            case 'POST':
                curl_setopt($this->curl, CURLOPT_URL, $url);
                curl_setopt($this->curl, CURLOPT_CUSTOMREQUEST, $type);
                curl_setopt($this->curl, CURLOPT_POST, true);
                curl_setopt($this->curl, CURLOPT_POSTFIELDS, $params);
                break;
            case 'GET':
                curl_setopt($this->curl, CURLOPT_POSTFIELDS, null);
                curl_setopt($this->curl, CURLOPT_CUSTOMREQUEST, $type);
                curl_setopt($this->curl, CURLOPT_HTTPGET, true);
                $url .= '?' . http_build_query($params);
                curl_setopt($this->curl, CURLOPT_URL, $url);
                break;
        }

        curl_setopt($this->curl, CURLOPT_SSL_VERIFYPEER, false);
        curl_setopt($this->curl, CURLOPT_RETURNTRANSFER, true);

        $headers = [];
        foreach ($this->headers as $k => $v) {
            if ($k == 'Connection') {
                $v = 'Close';
            }
            $headers[] = "$k: $v";
        }

        curl_setopt($this->curl, CURLOPT_HTTPHEADER, $headers);
        curl_setopt($this->curl, CURLOPT_CONNECTTIMEOUT, 10);

        return $this->callWithRetries();
    }

    protected function callWithRetries()
    {
        for ($retry = 0; $retry < $this->options['max_retries']; $retry++) {
            $out = curl_exec($this->curl);
            if ($out === false) {
                $this->reportHttpError(0, curl_error($this->curl));
            }
            $this->last_status = curl_getinfo($this->curl, CURLINFO_HTTP_CODE);

            if ($this->last_status >= 200 && $this->last_status < 300) {
                return self::jsonDecode($out);
            } elseif ($this->last_status >= 500) {
                self::waitRandomInterval($retry);
            } else {
                $this->reportHttpError($this->last_status, $out);
            }
        }
        $this->reportHttpError($this->last_status, "Service unavailable");
        return null;
    }

    protected static function jsonDecode($response)
    {
        $data = json_decode($response);

        $json_error = json_last_error();
        if ($json_error != JSON_ERROR_NONE) {
            throw new \RuntimeException($json_error);
        }

        return $data;
    }

    protected static function waitRandomInterval($retry)
    {
        $max_delay = pow(4, $retry) * 100 * 1000;
        usleep(rand(0, $max_delay));
    }

    protected function reportHttpError($status, $text)
    {
        throw new HttpException($status, "http error: {$status} | {$text}");
    }

    /**
     * Marshal out the pushed job and payload.
     *
     * @return object
     */
    protected function marshalPushedJob()
    {
        return (object) [
            'id'       => $this->request->header('topthink-message-id'),
            'payload'  => $this->request->getContent(),
            'attempts' => $this->request->header('topthink-message-attempts')
        ];
    }


    public function __destruct()
    {
        if ($this->curl != null) {
            curl_close($this->curl);
            $this->curl = null;
        }
    }

    public function pop($queue = null)
    {
        throw new \RuntimeException('pop queues not support for this type');
    }
}