MultipartUpload.php
6.4 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
<?php
namespace Qcloud\Cos;
use GuzzleHttp\Pool;
/**
 * Drives a multipart upload: splits a body stream into parts, uploads the
 * parts concurrently through a Guzzle request pool, and then asks the client
 * to complete the upload.
 *
 * Recognised option keys (all optional unless noted):
 *  - 'Bucket', 'Key': forwarded to every client call (required by those calls).
 *  - 'PartSize':      requested minimum part size in bytes; defaults to
 *                     DEFAULT_PART_SIZE and is clamped by calculatePartSize().
 *  - 'Concurrency':   number of parallel part requests, default 10.
 *  - 'Progress':      callable($totalSize, $uploadedSize) invoked after each
 *                     successfully uploaded part.
 *  - 'ContentMD5':    value passed as the 'ContentMD5' parameter of each
 *                     uploadPart command; a falsy value omits it. Default true.
 *  - 'Retry':         stored, default 3; not used by the code in this class.
 *  - 'UploadId':      required by resumeUploading().
 */
class MultipartUpload {
    const MIN_PART_SIZE = 1048576;
    const MAX_PART_SIZE = 5368709120;
    const DEFAULT_PART_SIZE = 5242880;
    const MAX_PARTS     = 10000;

    /** Client used for createMultipartUpload / uploadPart / ListParts / completeMultipartUpload. */
    private $client;
    /** Caller options with 'PartSize' removed. */
    private $options;
    /** Number of bytes requested from the body for each part. */
    private $partSize;
    /** Uploaded parts, keyed by part number: array('PartNumber' => ..., 'ETag' => ...). */
    private $parts;
    /** Body stream; this class calls getSize(), eof() and read() on it. */
    private $body;
    /** Progress callback, called with ($totalSize, $uploadedSize). */
    private $progress;
    /** Value of $body->getSize() at construction time. */
    private $totalSize;
    /** Bytes accounted for so far (uploaded now, or skipped because already uploaded). */
    private $uploadedSize;
    /** Concurrency handed to the request pool. */
    private $concurrency;
    /** Maps (pool request index + 1) to array('PartNumber' => ..., 'PartSize' => ...). */
    private $partNumberList;
    /** 'ContentMD5' parameter value for uploadPart; falsy means "do not send". */
    private $needMd5;
    /** Configured retry count; not referenced elsewhere in this class. */
    private $retry;

    /**
     * @param object $client  Client object; must expose the command methods used
     *                        below plus getCommand(), commandToRequestTransformer()
     *                        and an httpClient property.
     * @param object $body    Stream-like object providing getSize(), eof() and read().
     * @param array  $options See the class-level list of option keys.
     */
    public function __construct($client, $body, $options = array()) {
        // Fall back to the declared default instead of reading a missing key,
        // which raised an undefined-index notice and fed null into max().
        $minPartSize = isset($options['PartSize']) ? $options['PartSize'] : self::DEFAULT_PART_SIZE;
        unset($options['PartSize']);
        $this->body = $body;
        $this->client = $client;
        $this->options = $options;
        // calculatePartSize() reads $this->body, so it must run after the assignment above.
        $this->partSize = $this->calculatePartSize($minPartSize);
        $this->concurrency = isset($options['Concurrency']) ? $options['Concurrency'] : 10;
        $this->progress = isset($options['Progress']) ? $options['Progress'] : function($totalSize, $uploadedSize) {};
        $this->parts = [];
        $this->partNumberList = [];
        $this->uploadedSize = 0;
        $this->totalSize = $this->body->getSize();
        $this->needMd5 = isset($options['ContentMD5']) ? $options['ContentMD5'] : true;
        $this->retry = isset($options['Retry']) ? $options['Retry'] : 3;
    }

    /**
     * Starts a new multipart upload, uploads every part and completes it.
     *
     * @return mixed Whatever the client's completeMultipartUpload() returns.
     */
    public function performUploading() {
        $uploadId = $this->initiateMultipartUpload();
        $this->uploadParts($uploadId);
        return $this->completeUpload($uploadId);
    }

    /**
     * Reads the body part by part and uploads every part that is not already
     * recorded in $this->parts, then blocks until the request pool has finished.
     *
     * @param string $uploadId Identifier of the multipart upload the parts belong to.
     */
    public function uploadParts($uploadId) {
        $uploadRequests = function ($uploadId) {
            // $index is the 1-based position of the request inside the pool; the
            // pool reports 0-based indexes, which the fulfilled callback shifts by one.
            $index = 1;
            for ($partNumber = 1; !$this->body->eof(); $partNumber++) {
                $body = $this->body->read($this->partSize);
                // Stop only on a genuinely empty read. empty() would also stop on a
                // chunk whose content is exactly "0" and silently drop that part.
                if ((string) $body === '') {
                    break;
                }
                // Account for the bytes actually read rather than deriving the size
                // from offsets, which breaks on short reads or an unknown total size.
                $partSize = strlen($body);
                if (isset($this->parts[$partNumber])) {
                    // Part already known (resume case): skip the upload but count
                    // its bytes so the progress callback can reach the total.
                    $this->uploadedSize += $partSize;
                    continue;
                }
                $this->partNumberList[$index] = array(
                    'PartNumber' => $partNumber,
                    'PartSize' => $partSize,
                );
                $params = array(
                    'Bucket' => $this->options['Bucket'],
                    'Key' => $this->options['Key'],
                    'UploadId' => $uploadId,
                    'PartNumber' => $partNumber,
                    'Body' => $body,
                );
                if ($this->needMd5) {
                    $params['ContentMD5'] = $this->needMd5;
                }
                $command = $this->client->getCommand('uploadPart', $params);
                $request = $this->client->commandToRequestTransformer($command);
                $index++;
                yield $request;
            }
        };
        $pool = new Pool($this->client->httpClient, $uploadRequests($uploadId), [
            'concurrency' => $this->concurrency,
            'fulfilled' => function ($response, $index) {
                $entry = $this->partNumberList[$index + 1];
                $partNumber = $entry['PartNumber'];
                // Header names are case-insensitive; getHeader() covers every
                // spelling, so $etag is always defined (null when absent).
                $etagValues = $response->getHeader('ETag');
                $etag = isset($etagValues[0]) ? $etagValues[0] : null;
                $this->parts[$partNumber] = array('PartNumber' => $partNumber, 'ETag' => $etag);
                $this->uploadedSize += $entry['PartSize'];
                call_user_func_array($this->progress, [$this->totalSize, $this->uploadedSize]);
            },
            'rejected' => function ($reason, $index) {
                // $index is the pool's 0-based request index, not the part number.
                printf("part [%d] upload failed, reason: %s\n", $index, $reason);
                throw $reason;
            }
        ]);
        $promise = $pool->promise();
        $promise->wait();
    }

    /**
     * Continues the upload identified by $options['UploadId']: parts reported by
     * the client's ListParts call are recorded as done, the remaining parts are
     * uploaded, and the upload is completed.
     *
     * @return mixed Whatever the client's completeMultipartUpload() returns.
     */
    public function resumeUploading() {
        $uploadId = $this->options['UploadId'];
        // NOTE(review): only a single ListParts response is consumed here; if the
        // service paginates this listing, later pages are not fetched — confirm.
        $rt = $this->client->ListParts(array(
            'UploadId' => $uploadId,
            'Bucket' => $this->options['Bucket'],
            'Key' => $this->options['Key'],
        ));
        // Guard against a missing/empty 'Parts' entry; count(null) is a TypeError on PHP 8.
        if (!empty($rt['Parts'])) {
            foreach ($rt['Parts'] as $part) {
                $this->parts[$part['PartNumber']] = array('PartNumber' => $part['PartNumber'], 'ETag' => $part['ETag']);
            }
        }
        $this->uploadParts($uploadId);
        return $this->completeUpload($uploadId);
    }

    /**
     * Sends the recorded parts, ordered by ascending part number, to the
     * client's completeMultipartUpload().
     *
     * @param string $uploadId Identifier of the multipart upload to complete.
     * @return mixed Whatever the client's completeMultipartUpload() returns.
     */
    private function completeUpload($uploadId) {
        // Parts finish in arbitrary order under concurrency, so sort explicitly.
        // usort on a plain list also works when no part was recorded at all.
        $parts = array_values($this->parts);
        usort($parts, function ($left, $right) {
            return (int) $left['PartNumber'] - (int) $right['PartNumber'];
        });
        return $this->client->completeMultipartUpload(array(
            'Bucket' => $this->options['Bucket'],
            'Key' => $this->options['Key'],
            'UploadId' => $uploadId,
            'Parts' => $parts,
        ));
    }

    /**
     * Picks the part size: at least large enough to fit the body into MAX_PARTS
     * parts, at least $minPartSize, and clamped to [MIN_PART_SIZE, MAX_PART_SIZE].
     *
     * @param int $minPartSize Requested minimum part size in bytes.
     * @return int|float Part size in bytes.
     */
    private function calculatePartSize($minPartSize)
    {
        $partSize = intval(ceil(($this->body->getSize() / self::MAX_PARTS)));
        $partSize = max($minPartSize, $partSize);
        $partSize = min($partSize, self::MAX_PART_SIZE);
        $partSize = max($partSize, self::MIN_PART_SIZE);
        return $partSize;
    }

    /**
     * Creates the multipart upload through the client, passing the stored options.
     *
     * @return string The 'UploadId' entry of the client's response.
     */
    private function initiateMultipartUpload() {
        $result = $this->client->createMultipartUpload($this->options);
        return $result['UploadId'];
    }
}