本间白猫 2025-01-07 14:41:40 +08:00
parent 1b975f82bf
commit 0698879532
2 changed files with 172 additions and 106 deletions

View File

@ -1,5 +1,6 @@
from modules.modules import load_config, DEFAULT_CONFIG, check_proxies, check_for_updates, get_message, print_banner, logos
import threading, argparse, logging, asyncio, time, socket, signal, sys, os  # socket, signal, sys, os added in this commit
from concurrent.futures import ThreadPoolExecutor
from modules.proxyserver import AsyncProxyServer
from colorama import init, Fore, Style
from itertools import cycle
@ -83,6 +84,67 @@ async def run_proxy_check(server):
    else:
        logging.info(get_message('proxy_check_disabled', server.language))
class ProxyCat:
    def __init__(self):
        # Shared thread pool for blocking work, sized from the CPU count.
        self.executor = ThreadPoolExecutor(
            max_workers=min(32, (os.cpu_count() or 1) * 4),
            thread_name_prefix="proxy_worker"
        )
        loop = asyncio.get_event_loop()
        loop.set_default_executor(self.executor)
        # On Windows, fall back to the selector event loop policy.
        if hasattr(asyncio, 'WindowsSelectorEventLoopPolicy'):
            if os.name == 'nt':
                asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())
        socket.setdefaulttimeout(30)
        if hasattr(socket, 'TCP_NODELAY'):
            # Note: this rebinds the module-level constant; TCP_NODELAY normally has to be enabled per socket via setsockopt().
            socket.TCP_NODELAY = True
        self.running = True
        signal.signal(signal.SIGINT, self.handle_shutdown)
        signal.signal(signal.SIGTERM, self.handle_shutdown)
        # Note: self.config and process_client_request are referenced below but defined elsewhere (not part of this diff).
    async def start_server(self):
        try:
            server = await asyncio.start_server(
                self.handle_client,
                self.config.get('SERVER', 'host'),
                self.config.get('SERVER', 'port')
            )
            print(f"Proxy server running at {self.config.get('SERVER', 'host')}:{self.config.get('SERVER', 'port')}")
            async with server:
                await server.serve_forever()
        except Exception as e:
            print(f"Server startup error: {e}")
            sys.exit(1)
    def handle_shutdown(self, signum, frame):
        print("\nShutting down the server...")
        self.running = False
        self.executor.shutdown(wait=True)
        sys.exit(0)
    async def handle_client(self, reader, writer):
        try:
            # Hand the blocking request handler off to the thread pool.
            await asyncio.get_event_loop().run_in_executor(
                self.executor,
                self.process_client_request,
                reader,
                writer
            )
        except Exception as e:
            print(f"Error while handling client request: {e}")
        finally:
            try:
                writer.close()
                await writer.wait_closed()
            except Exception:
                pass
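A minimal sketch of how this class might be wired up, which the `__main__` block below could call after parsing `-c`; the helper name `run_proxycat`, the `load_config(path)` call signature, and assigning `self.config` here are assumptions, since the diff does not show where `self.config` is set:
# Hypothetical wiring, not part of this commit.
def run_proxycat(config_path='config/config.ini'):
    cat = ProxyCat()
    cat.config = load_config(config_path)   # assumed: start_server reads self.config like a ConfigParser
    asyncio.run(cat.start_server())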
if __name__ == '__main__':
parser = argparse.ArgumentParser(description=logos())
parser.add_argument('-c', '--config', default='config/config.ini', help='Path to the configuration file')

View File

@ -1,8 +1,10 @@
import asyncio, httpx, logging, re, socket, struct, time, base64, random
from modules.modules import get_message, load_ip_list
from asyncio import TimeoutError
from itertools import cycle
from config import getip
def load_proxies(file_path='ip.txt'):
    with open(file_path, 'r') as file:
        return [line.strip() for line in file if '://' in line]
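`load_proxies` keeps only lines that contain '://', so the list file is expected to hold one proxy URL per line; the file contents below are illustrative, not taken from the repository:
# Example usage with a hypothetical ip.txt containing lines such as
#   http://127.0.0.1:8080
#   socks5://user:pass@192.0.2.10:1080
proxies = load_proxies('ip.txt')
proxy_pool = cycle(proxies)  # round-robin over the loaded proxies is one plausible use of the cycle import above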
@ -41,7 +43,7 @@ class AsyncProxyServer:
        self.proxy_fail_count = 0
        self.max_fail_count = 2
        self.semaphore = asyncio.Semaphore(20000)
        self.buffer_size = 512 * 1024  # raised from 256 * 1024 in this commit
        self.proxy_cache = {}
        self.proxy_cache_ttl = 10
        self.last_switch_attempt = 0
@ -51,6 +53,12 @@ class AsyncProxyServer:
        self.proxy_check_ttl = 5
        self.check_cooldown = 1
        self.last_check_time = {}
        self.retry_count = 3
        self.timeout = 30
        self.max_concurrent_requests = 50
        self.request_semaphore = asyncio.Semaphore(self.max_concurrent_requests)
        self.connection_pool = {}
        self.pipeline_enabled = True
    async def get_next_proxy(self):
        if self.mode == 'load_balance':
@ -275,26 +283,17 @@ class AsyncProxyServer:
    async def _pipe(self, reader, writer):
        try:
            while True:
                data = await reader.read(self.buffer_size)
                if not data:
                    break
                writer.write(data)
                # Only block on drain when a full buffer suggests backpressure.
                if len(data) >= self.buffer_size:
                    await writer.drain()
        except Exception as e:
            logging.error(f"Data transfer error: {e}")
        finally:
            try:
                await writer.drain()
            except Exception:
                pass
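For context, a one-way pipe like this is normally run twice, once per direction of the tunnel; a minimal sketch under that assumption (the method name `_tunnel` and the stream names are illustrative, not from this diff):
    async def _tunnel(self, client_reader, client_writer, remote_reader, remote_writer):
        # Illustrative only: relay both directions concurrently with the pipe above.
        await asyncio.gather(
            self._pipe(client_reader, remote_writer),
            self._pipe(remote_reader, client_writer),
        )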
@ -424,99 +423,60 @@ class AsyncProxyServer:
        return None, proxy_addr
    async def _handle_request(self, method, path, headers, reader, writer):
        if not path.startswith(('http://', 'https://')):
            logging.warning(get_message('unsupported_protocol', self.language, path))
            writer.write(b'HTTP/1.1 400 Bad Request\r\nContent-Length: 0\r\n\r\n')
            await writer.drain()
            return
        async with self.request_semaphore:
            try:
                proxy = await self.get_next_proxy()
                key = f"{proxy}:{path}"
                if key in self.connection_pool:
                    client = self.connection_pool[key]
                else:
                    client = await self._create_client(proxy)
                    self.connection_pool[key] = client
                async with client.stream(
                    method,
                    path,
                    headers=headers,
                    content=reader,
                ) as response:
                    # Note: only the status line is relayed here; response headers are not forwarded.
                    writer.write(f'HTTP/1.1 {response.status_code} {response.reason_phrase}\r\n'.encode())
                    async for chunk in response.aiter_bytes(chunk_size=self.buffer_size):
                        writer.write(chunk)
                        if len(chunk) >= self.buffer_size:
                            await writer.drain()
                    await writer.drain()
            except Exception as e:
                logging.error(f"Request handling error: {e}")
                writer.write(b'HTTP/1.1 502 Bad Gateway\r\n\r\n')
                await writer.drain()
            finally:
                await self._cleanup_connections()
    async def _create_client(self, proxy):
        return httpx.AsyncClient(
            proxies={"all://": proxy},
            limits=httpx.Limits(
                max_keepalive_connections=100,
                max_connections=1000,
                keepalive_expiry=30
            ),
            timeout=30.0,
            http2=True,
        )
    async def _cleanup_connections(self):
        # Evict pooled clients whose last-used timestamp is older than 30 seconds.
        current_time = time.time()
        expired_keys = [
            key for key, client in self.connection_pool.items()
            if current_time - client._last_used > 30
        ]
        for key in expired_keys:
            client = self.connection_pool.pop(key)
            await client.aclose()
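`_cleanup_connections` compares against a `_last_used` attribute that `httpx.AsyncClient` does not define on its own, so the pool presumably stamps it whenever a client is stored or reused; a sketch of that bookkeeping under that assumption:
    # Assumed bookkeeping when a pooled client is created (not shown in this commit).
    client = await self._create_client(proxy)
    client._last_used = time.time()
    self.connection_pool[key] = client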
    async def check_current_proxy(self):
        try:
@ -610,3 +570,47 @@ class AsyncProxyServer:
            is_valid = await self._check_proxy_impl(proxy)
            self.proxy_cache[proxy] = (current_time, is_valid)
            return is_valid
    async def handle_request(self, client_reader, client_writer, target_host, target_port):
        for retry in range(self.retry_count):
            target_writer = None
            try:
                target_reader, target_writer = await asyncio.wait_for(
                    asyncio.open_connection(target_host, target_port),
                    timeout=self.timeout
                )
                forward_task = asyncio.create_task(
                    self.forward_data(client_reader, target_writer, "client -> target")
                )
                backward_task = asyncio.create_task(
                    self.forward_data(target_reader, client_writer, "target -> client")
                )
                await asyncio.gather(forward_task, backward_task)
                break
            except TimeoutError:
                print(f"Connection timed out, retry {retry + 1}/{self.retry_count}")
                continue
            except Exception as e:
                print(f"Proxy forwarding error: {e}")
                if retry == self.retry_count - 1:
                    raise
                continue
            finally:
                # target_writer may still be None if the connection attempt itself failed.
                if target_writer is not None:
                    try:
                        target_writer.close()
                        await target_writer.wait_closed()
                    except Exception:
                        pass
    async def forward_data(self, reader, writer, direction):
        try:
            while True:
                data = await asyncio.wait_for(reader.read(8192), timeout=self.timeout)
                if not data:
                    break
                writer.write(data)
                await writer.drain()
        except TimeoutError:
            print(f"{direction} data transfer timed out")
        except Exception as e:
            print(f"{direction} data transfer error: {e}")