作者 邓超

增加代理

... ... @@ -29,9 +29,13 @@ class SyncToEsCmd {
$es = es();
$db = db();
$startTime = time();
while (1){
if(time()-43200 > $startTime){
break;
}
// 检查是否接收到信号
pcntl_signal_dispatch();
... ...
... ... @@ -16,9 +16,112 @@ class SmtpClient{
*/
protected $client;
public function __construct(string $host)
/**
* 是否验证了
*/
public $isAuth = false;
/**
* 协程锁
* @var bool
*/
private bool $_lock = false;
public function __construct()
{
$this->host = $host;
}
/**
* 执行
* @param $data
* @return array
* @author:dc
* @time 2025/4/17 9:03
*/
public function exec($data):array {
if (!$this->isAuth) {
try {
$auth = new Auth($data);
}catch (Throwable $e){
return [false,$e->getMessage()];
}
// 连接客户端
try {
$this->open($auth->host,$auth->out_ip, $auth->timeOut);
}catch (Throwable $e){
return [false,'500 ' . $e->getMessage()."\r\n"];
}
$line = $this->readLine();
return [true,$line];
} // 正式请求转发
else {
// 没有连接成功
if(!$this->isAuth){
return [false,"500 No proxy server.\r\n"];
}
$line = false;
try {
// 请求数据
// $num = 0;
// foreach (explode("\r\n",$data) as $cmd){
// if(strlen($cmd) > 0){
// if($n = self::$clients[$fd]->write($cmd."\r\n")){
// $num += $n;
// }
// }
// }
$num = $this->write($data);
if($num) $line = $this->readLine();
}catch (Throwable $e){
$line = '500 server error '.$e->getMessage()."\r\n";
}
if($line===false){
$line = "500 server error 2 \r\n";
}
if(!$line&&$data==".\r\n"){
$line = "250 Mail OK ok \r\n";
}
if(empty($line)){
$line = "250 Mail OK no reply \r\n";
}
return [true,$line];
}
}
/**
* 排他锁
* @return bool
*/
public function lock():void
{
while ($this->_lock){
co::sleep(0.1);
}
$this->_lock = true;
}
/**
* 解锁
* @author:dc
* @time 2025/4/17 9:09
*/
public function unlock(){
$this->_lock = false;
}
/**
... ... @@ -29,7 +132,8 @@ class SmtpClient{
* @author:dc
* @time 2025/3/31 10:27
*/
public function open(string $out_ip, int $timeout=5){
public function open(string $host, string $out_ip, int $timeout=5){
$this->host = $host;
$client = new \Swoole\Coroutine\Client(SWOOLE_SOCK_TCP | SWOOLE_SSL);
$client->set([
'timeout'=> $timeout,
... ... @@ -46,6 +150,7 @@ class SmtpClient{
if($client->isConnected()){
$this->client = $client;
$this->isAuth = true;
}else{
throw new Exception($this->host." connection fail. ");
}
... ... @@ -79,7 +184,7 @@ class SmtpClient{
*/
public function readLine($timeout=5){
if($this->is_read === 2){
return '220 DATA OK';
return "250 DATA OK\r\n";
}
if($this->is_read === 1){
... ... @@ -87,11 +192,11 @@ class SmtpClient{
}
$online = $this->client->recv($timeout);
return $online===false ? '500 read time out.' : $online;
return $online===false ? "500 read time out.\r\n" : $online;
}
public function __destruct()
public function close()
{
if(!empty($this->client)) $this->client->close();
unset($this->client);
... ...
... ... @@ -11,11 +11,15 @@ class ProxyService
* 连接数
* @var SmtpClient[]
*/
protected static $clients = [];
private $clients = [];
protected function push($msg){
echo 'out '.$msg;
if(substr($msg,-2)!=="\r\n"){
$msg .= "\r\n";
}
// echo 'out '.$msg;
return $msg;
}
... ... @@ -31,73 +35,45 @@ class ProxyService
SWOOLE_BASE,
SWOOLE_SOCK_TCP//|SWOOLE_SSL
);
$server->set([
'dispatch_mode' => 2 // 固定模式
]);
//监听连接进入事件。
$server->on('Connect', function ($server, $fd) {
$server->send($fd, $this->push("220 proxy client ok\r\n"));
});
//监听数据接收事件。
$server->on('Receive', function ($server, $fd, $reactor_id, $data) {
echo "in > ".$data;
// 建立连接
if (empty(self::$clients[$fd])) {
try {
$auth = new Auth($data);
}catch (Throwable $e){
$server->send($fd, $this->push($e->getMessage()));
$server->close($fd,true);
return;
$server->on('Receive', function (Swoole\Server $server, $fd, $reactor_id, $data) {
// echo "in ".rand(10,99)." > ".$data;
// $ridfid = $reactor_id.'_'.$fd;
if(empty($this->clients[$fd])){
$this->clients[$fd] = new SmtpClient();
}
// 创建一个客户端
self::$clients[$fd] = new SmtpClient($auth->host);
// 连接客户端
try {
self::$clients[$fd]->open($auth->out_ip, $auth->timeOut);
}catch (Throwable $e){
$server->send($fd,$this->push('500 ' . $e->getMessage()."\r\n"));
$server->close($fd,true);
return;
}
$line = self::$clients[$fd]->readLine(1);
$server->send($fd,$this->push($line));
// 加锁
$this->clients[$fd]->lock();
// 处理数据
$result = $this->clients[$fd]->exec($data);
// 解锁
$this->clients[$fd]->unlock();
// 返回结果
$server->send($fd, $this->push($result[1]));
// 是否关闭连接
if($result[0]===false){
// 关闭并释放资源
$this->clients[$fd]->close();
$this->clients[$fd] = null;
unset($this->clients[$fd]);
} // 正式请求转发
else {
// 没有连接成功
if(empty(self::$clients[$fd])){
$server->send($fd, $this->push("500 No proxy server.\r\n"));
$server->close($fd,true);
return;
}
$line = false;
try {
// 请求数据
$num = self::$clients[$fd]->write($data);
if($num) $line = self::$clients[$fd]->readLine();
}catch (Throwable $e){
$line = '500 server error '.$e->getMessage()."\r\n";
}
// echo 'out '.co::getCid()." => ".$line;
if($line!==false) $server->send($fd,$this->push($line));
}
});
//监听连接关闭事件。
$server->on('Close', function ($server, $fd) {
// echo '连接关闭了 => '.$fd."\n";
// 关闭并释放资源
self::$clients[$fd] = null;
unset(self::$clients[$fd]);
echo '连接关闭了 => '.$fd."\n";
unset($this->clients[$fd]);
});
//启动服务器
... ...