diff --git a/ProxyCat-CN/ProxyCat-V1.9.py b/ProxyCat-CN/ProxyCat-V1.9.py new file mode 100644 index 0000000..032e15b --- /dev/null +++ b/ProxyCat-CN/ProxyCat-V1.9.py @@ -0,0 +1,471 @@ +import threading, logoprint, argparse, logging, asyncio, socket, base64, getip, httpx, time, re, struct, random +from config import load_config, DEFAULT_CONFIG +from colorama import init, Fore, Style +from proxy_check import check_proxies +from update import check_for_updates +from banner import print_banner +from itertools import cycle + +init(autoreset=True) +class ColoredFormatter(logging.Formatter): + COLORS = { + logging.INFO: Fore.GREEN, + logging.WARNING: Fore.YELLOW, + logging.ERROR: Fore.RED, + logging.CRITICAL: Fore.RED + Style.BRIGHT, + } + + def format(self, record): + log_color = self.COLORS.get(record.levelno, Fore.WHITE) + record.msg = f"{log_color}{record.msg}{Style.RESET_ALL}" + return super().format(record) + +log_format = '%(asctime)s - %(levelname)s - %(message)s' +formatter = ColoredFormatter(log_format) + +console_handler = logging.StreamHandler() +console_handler.setFormatter(formatter) + +logging.basicConfig(level=logging.INFO, handlers=[console_handler]) + +def load_proxies(file_path='ip.txt'): + with open(file_path, 'r') as file: + return [line.strip() for line in file if '://' in line] + +def validate_proxy(proxy): + pattern = re.compile(r'^(?Psocks5|http|https)://(?P[^:]+):(?P\d+)$') + return pattern.match(proxy) is not None + +class AsyncProxyServer: + def __init__(self, config): + self.config = {**DEFAULT_CONFIG, **config} + self.username = self.config['username'].strip() + self.password = self.config['password'].strip() + self.auth_required = bool(self.username and self.password) + self.mode = self.config['mode'] + self.interval = int(self.config['interval']) + self.use_getip = self.config.get('use_getip', 'False').lower() == 'true' + self.proxy_file = self.config['proxy_file'] + self.proxies = self.load_proxies() + self.proxy_cycle = cycle(self.proxies) + self.current_proxy = next(self.proxy_cycle) if self.proxies else "没有可用的代理" + self.last_switch_time = time.time() + self.rate_limiter = asyncio.Queue(maxsize=3000) + self.proxy_failed = False + + def load_proxies(self): + proxies = load_proxies(self.proxy_file) + valid_proxies = [p for p in proxies if validate_proxy(p)] + + if self.use_getip: + valid_proxies = [] + for _ in range(4): + new_ip = getip.newip() + if validate_proxy(new_ip): + valid_proxies.append(new_ip) + break + else: + logging.error("多次尝试获取有效代理失败,退出程序") + exit(1) + + return valid_proxies + + async def get_next_proxy(self): + if self.mode == 'load_balance': + return random.choice(self.proxies) + elif self.mode == 'custom': + return await self.custom_proxy_switch() + + if time.time() - self.last_switch_time >= self.interval: + await self.get_proxy() + return self.current_proxy + + async def get_proxy(self): + self.current_proxy = getip.newip() if self.use_getip else next(self.proxy_cycle) + self.last_switch_time = time.time() + logging.info(f"切换到新的代理: {self.current_proxy}") + + async def custom_proxy_switch(self): + """ 自定义的代理切换逻辑 """ + return self.proxies[0] if self.proxies else "没有可用的代理" + + def time_until_next_switch(self): + return float('inf') if self.mode == 'load_balance' else max(0, self.interval - (time.time() - self.last_switch_time)) + + async def acquire(self): + await self.rate_limiter.put(None) + await asyncio.sleep(0.001) + self.rate_limiter.get_nowait() + + async def handle_client(self, reader, writer): + try: + # await self.acquire() + first_byte = await reader.read(1) + if not first_byte: + return + + if first_byte == b'\x05': + await self.handle_socks5_connection(reader, writer) + else: + await self._handle_client_impl(reader, writer, first_byte) + except asyncio.CancelledError: + logging.info("客户端处理取消") + except Exception as e: + logging.error(f"客户端处理出错: {e}") + finally: + writer.close() + await writer.wait_closed() + + async def handle_socks5_connection(self, reader, writer): + nmethods = ord(await reader.readexactly(1)) + await reader.readexactly(nmethods) + + writer.write(b'\x05\x02' if self.auth_required else b'\x05\x00') + await writer.drain() + + if self.auth_required: + auth_version = await reader.readexactly(1) + if auth_version != b'\x01': + writer.close() + return + + ulen = ord(await reader.readexactly(1)) + username = await reader.readexactly(ulen) + plen = ord(await reader.readexactly(1)) + password = await reader.readexactly(plen) + + if username.decode() != self.username or password.decode() != self.password: + writer.write(b'\x01\x01') + await writer.drain() + writer.close() + return + + writer.write(b'\x01\x00') + await writer.drain() + + version, cmd, _, atyp = struct.unpack('!BBBB', await reader.readexactly(4)) + if cmd != 1: + writer.write(b'\x05\x07\x00\x01\x00\x00\x00\x00\x00\x00') + await writer.drain() + writer.close() + return + + if atyp == 1: + dst_addr = socket.inet_ntoa(await reader.readexactly(4)) + elif atyp == 3: + addr_len = ord(await reader.readexactly(1)) + dst_addr = (await reader.readexactly(addr_len)).decode() + elif atyp == 4: + dst_addr = socket.inet_ntop(socket.AF_INET6, await reader.readexactly(16)) + else: + writer.write(b'\x05\x08\x00\x01\x00\x00\x00\x00\x00\x00') + await writer.drain() + writer.close() + return + + dst_port = struct.unpack('!H', await reader.readexactly(2))[0] + + try: + proxy = await self.get_next_proxy() + proxy_type, proxy_addr = proxy.split('://') + proxy_auth, proxy_host_port = self._split_proxy_auth(proxy_addr) + proxy_host, proxy_port = proxy_host_port.split(':') + proxy_port = int(proxy_port) + + remote_reader, remote_writer = await asyncio.open_connection(proxy_host, proxy_port) + + if proxy_type == 'socks5': + remote_writer.write(b'\x05\x01\x00') + await remote_writer.drain() + await remote_reader.readexactly(2) + + remote_writer.write(b'\x05\x01\x00' + (b'\x03' + len(dst_addr).to_bytes(1, 'big') + dst_addr.encode() if isinstance(dst_addr, str) else b'\x01' + socket.inet_aton(dst_addr)) + struct.pack('!H', dst_port)) + await remote_writer.drain() + + await remote_reader.readexactly(10) + elif proxy_type in ['http', 'https']: + connect_request = f'CONNECT {dst_addr}:{dst_port} HTTP/1.1\r\nHost: {dst_addr}:{dst_port}\r\n' + if proxy_auth: + connect_request += f'Proxy-Authorization: Basic {base64.b64encode(proxy_auth.encode()).decode()}\r\n' + connect_request += '\r\n' + remote_writer.write(connect_request.encode()) + await remote_writer.drain() + + while True: + line = await remote_reader.readline() + if line == b'\r\n': + break + + writer.write(b'\x05\x00\x00\x01\x00\x00\x00\x00\x00\x00') + await writer.drain() + + await asyncio.gather( + self._pipe(reader, remote_writer), + self._pipe(remote_reader, writer) + ) + except Exception as e: + logging.error(f"SOCKS5 连接错误: {e}") + writer.write(b'\x05\x01\x00\x01\x00\x00\x00\x00\x00\x00') + await writer.drain() + + async def _handle_client_impl(self, reader, writer, first_byte): + try: + request_line = first_byte + await reader.readline() + if not request_line: + return + + try: + method, path, _ = request_line.decode('utf-8', errors='ignore').split() + except ValueError: + #logging.error(f"无效的请求行: {request_line}") + return + + headers = {} + while True: + line = await reader.readline() + if line == b'\r\n': + break + if line == b'': + return + try: + name, value = line.decode('utf-8', errors='ignore').strip().split(': ', 1) + headers[name.lower()] = value + except ValueError: + #logging.error(f"无效的请求行: {line}") + continue + + if self.auth_required and not self._authenticate(headers): + writer.write(b'HTTP/1.1 407 Proxy Authentication Required\r\nProxy-Authenticate: Basic realm="Proxy"\r\n\r\n') + await writer.drain() + return + + if method == 'CONNECT': + await self._handle_connect(path, reader, writer) + else: + await self._handle_request(method, path, headers, reader, writer) + except asyncio.CancelledError: + raise + except Exception as e: + logging.error(f"处理客户端请求时出错: {e}") + + def _authenticate(self, headers): + if not self.auth_required: + return True + + auth = headers.get('proxy-authorization') + if not auth: + return False + try: + scheme, credentials = auth.split() + if scheme.lower() != 'basic': + return False + username, password = base64.b64decode(credentials).decode().split(':') + return username == self.username and password == self.password + except: + return False + + async def _handle_connect(self, path, reader, writer): + try: + host, port = path.split(':') + port = int(port) + except ValueError: + # logging.error(f"无效的连接路径: {path}") + writer.write(b'HTTP/1.1 400 Bad Request\r\n\r\n') + await writer.drain() + return + + proxy = await self.get_next_proxy() + proxy_type, proxy_addr = proxy.split('://') + proxy_auth, proxy_host_port = self._split_proxy_auth(proxy_addr) + proxy_host, proxy_port = proxy_host_port.split(':') + proxy_port = int(proxy_port) + + try: + remote_reader, remote_writer = await asyncio.wait_for(asyncio.open_connection(proxy_host, proxy_port),timeout=10) + + if proxy_type == 'http': + connect_headers = [f'CONNECT {host}:{port} HTTP/1.1', f'Host: {host}:{port}'] + if proxy_auth: + auth_header = f'Proxy-Authorization: Basic {base64.b64encode(proxy_auth.encode()).decode()}' + connect_headers.append(auth_header) + connect_request = '\r\n'.join(connect_headers) + '\r\n\r\n' + remote_writer.write(connect_request.encode()) + await remote_writer.drain() + response = await remote_reader.readline() + if not response.startswith(b'HTTP/1.1 200'): + raise Exception("Bad Gateway") + while (await remote_reader.readline()) != b'\r\n': + pass + elif proxy_type == 'socks5': + remote_writer.write(b'\x05\x01\x00') + await remote_writer.drain() + if (await remote_reader.read(2))[1] == 0: + remote_writer.write(b'\x05\x01\x00\x03' + len(host).to_bytes(1, 'big') + host.encode() + port.to_bytes(2, 'big')) + await remote_writer.drain() + if (await remote_reader.read(10))[1] != 0: + raise Exception("Bad Gateway") + else: + raise Exception("Unsupported proxy type") + + writer.write(b'HTTP/1.1 200 Connection Established\r\n\r\n') + await writer.drain() + + await asyncio.gather( + self._pipe(reader, remote_writer), + self._pipe(remote_reader, writer) + ) + except asyncio.TimeoutError: + logging.error("连接超时") + writer.write(b'HTTP/1.1 504 Gateway Timeout\r\n\r\n') + await writer.drain() + except Exception as e: + logging.error(f"代理地址失效,切换代理地址") + if not self.proxy_failed: + self.proxy_failed = True + await self.get_proxy() + else: + self.proxy_failed = False + + def _split_proxy_auth(self, proxy_addr): + match = re.match(r'((?P.+?):(?P.+?)@)?(?P.+)', proxy_addr) + if match: + username = match.group('username') + password = match.group('password') + host = match.group('host') + if username and password: + return f"{username}:{password}", host + return None, proxy_addr + + async def _pipe(self, reader, writer): + try: + while True: + try: + data = await reader.read(8192) + if not data: + break + writer.write(data) + await writer.drain() + except asyncio.CancelledError: + break + finally: + writer.close() + await writer.wait_closed() + + async def _handle_request(self, method, path, headers, reader, writer): + body = await reader.read() + proxy = await self.get_next_proxy() + proxy_type, proxy_addr = proxy.split('://') + proxy_auth, proxy_host_port = self._split_proxy_auth(proxy_addr) + + client_kwargs = { + "limits": httpx.Limits(max_keepalive_connections=500, max_connections=3000), + "timeout": 30, + } + + if proxy_type in ['http', 'https']: + client_kwargs["proxies"] = {proxy_type: f"{proxy_type}://{proxy_host_port}"} + elif proxy_type == 'socks5': + client_kwargs["transport"] = httpx.AsyncHTTPTransport(proxy=f"{proxy_type}://{proxy_host_port}") + + if proxy_auth: + headers['Proxy-Authorization'] = f'Basic {base64.b64encode(proxy_auth.encode()).decode()}' + + async with httpx.AsyncClient(**client_kwargs) as client: + try: + async with client.stream(method, path, headers=headers, content=body) as response: + await self._write_response(writer, response) + except asyncio.CancelledError: + raise + except Exception as e: + logging.error(f"请求处理出错: {e}") + writer.write(b'HTTP/1.1 502 Bad Gateway\r\n\r\n') + await writer.drain() + + async def _write_response(self, writer, response): + writer.write(f'HTTP/1.1 {response.status_code} {response.reason_phrase}\r\n'.encode('utf-8', errors='ignore')) + writer.write(b'Transfer-Encoding: chunked\r\n') + for name, value in response.headers.items(): + if name.lower() != 'transfer-encoding': + writer.write(f'{name}: {value}\r\n'.encode('utf-8', errors='ignore')) + writer.write(b'\r\n') + await writer.drain() + + async for chunk in response.aiter_bytes(chunk_size=8192): + if asyncio.current_task().cancelled(): + raise asyncio.CancelledError() + writer.write(f'{len(chunk):X}\r\n'.encode('utf-8', errors='ignore')) + writer.write(chunk) + writer.write(b'\r\n') + await writer.drain() + writer.write(b'0\r\n\r\n') + await writer.drain() + +def update_status(server): + while True: + if server.mode == 'load_balance': + status = f"\r{Fore.YELLOW}当前代理: {Fore.GREEN}{server.current_proxy}" + else: + time_left = server.time_until_next_switch() + status = f"\r{Fore.YELLOW}当前代理: {Fore.GREEN}{server.current_proxy} | {Fore.YELLOW}下次切换: {Fore.GREEN}{time_left:.1f}秒" + print(status, end='', flush=True) + time.sleep(1) + +async def handle_client_wrapper(server, reader, writer, clients): + task = asyncio.create_task(server.handle_client(reader, writer)) + clients.add(task) + try: + await task + except Exception as e: + logging.error(f"客户端处理出错: {e}") + finally: + clients.remove(task) + +async def run_server(server): + clients = set() + server_instance = None + try: + server_instance = await asyncio.start_server(lambda r, w: handle_client_wrapper(server, r, w, clients),'0.0.0.0', int(server.config['port'])) + async with server_instance: + await server_instance.serve_forever() + except asyncio.CancelledError: + logging.info("服务器正在关闭...") + finally: + if server_instance: + server_instance.close() + await server_instance.wait_closed() + for client in clients: + client.cancel() + await asyncio.gather(*clients, return_exceptions=True) + +async def run_proxy_check(server): + if server.config.get('check_proxies', 'False').lower() == 'true': + logging.info("开始检测代理地址...") + valid_proxies = await check_proxies(server.proxies) + if valid_proxies: + server.proxies = valid_proxies + server.proxy_cycle = cycle(valid_proxies) + server.current_proxy = next(server.proxy_cycle) + logging.info(f"有效代理地址: {valid_proxies}") + else: + logging.error("没有有效的代理地址") + else: + logging.info("代理检测已禁用") + +if __name__ == '__main__': + parser = argparse.ArgumentParser(description=logoprint.logos()) + parser.add_argument('-c', '--config', default='config.ini', help='配置文件路径') + args = parser.parse_args() + + config = load_config(args.config) + server = AsyncProxyServer(config) + print_banner(config) + asyncio.run(check_for_updates()) + asyncio.run(run_proxy_check(server)) + + status_thread = threading.Thread(target=update_status, args=(server,), daemon=True) + status_thread.start() + + try: + asyncio.run(run_server(server)) + except KeyboardInterrupt: + logging.info("程序被用户中断") diff --git a/ProxyCat-CN/banner.py b/ProxyCat-CN/banner.py new file mode 100644 index 0000000..6c60d6c --- /dev/null +++ b/ProxyCat-CN/banner.py @@ -0,0 +1,18 @@ +from colorama import Fore + +def print_banner(config): + auth_info = f"{config.get('username')}:{config.get('password')}" if config.get('username') and config.get('password') else "未设置 (无需认证)" + banner_info = [ + ('公众号', '樱花庄的本间白猫'), + ('博客', 'https://y.shironekosan.cn'), + ('代理轮换模式', '循环' if config.get('mode') == 'cycle' else '负载均衡' if config.get('mode') == 'load_balance' else '单轮'), + ('代理更换时间', f"{config.get('interval')}秒"), + ('默认账号密码', auth_info), + ('本地监听地址 (HTTP)', f"http://{auth_info + '@' if auth_info else ''}127.0.0.1:{config.get('port')}"), + ('本地监听地址 (SOCKS5)', f"socks5://{auth_info + '@' if auth_info else ''}127.0.0.1:{config.get('port')}"), + ('开源项目求 Star', 'https://github.com/honmashironeko/ProxyCat'), + ] + print(f"{Fore.MAGENTA}{'=' * 55}") + for key, value in banner_info: + print(f"{Fore.YELLOW}{key}: {Fore.GREEN}{value}") + print(f"{Fore.MAGENTA}{'=' * 55}\n") diff --git a/ProxyCat-CN/config.ini b/ProxyCat-CN/config.ini index 757ead5..d958437 100644 --- a/ProxyCat-CN/config.ini +++ b/ProxyCat-CN/config.ini @@ -15,7 +15,7 @@ username = neko password = 123456 # 是否使用 getip 模块获取代理地址 True or False(默认为:False) -use_getip = False +use_getip = True # 代理地址列表文件(默认为:ip.txt) proxy_file = ip.txt diff --git a/ProxyCat-CN/config.py b/ProxyCat-CN/config.py new file mode 100644 index 0000000..58f7fe2 --- /dev/null +++ b/ProxyCat-CN/config.py @@ -0,0 +1,16 @@ +import configparser + +DEFAULT_CONFIG = { + 'port': 1080, + 'mode': 'cycle', + 'interval': 300, + 'username': '', + 'password': '', + 'use_getip': False, + 'proxy_file': 'ip.txt', +} + +def load_config(config_file='config.ini'): + config = configparser.ConfigParser() + config.read(config_file, encoding='utf-8') + return {k: v for k, v in config['SETTINGS'].items()} \ No newline at end of file diff --git a/ProxyCat-CN/proxy_check.py b/ProxyCat-CN/proxy_check.py new file mode 100644 index 0000000..95ad449 --- /dev/null +++ b/ProxyCat-CN/proxy_check.py @@ -0,0 +1,54 @@ +import httpx +import socket +import re +import asyncio +import logging + +async def check_proxy(proxy): + proxy_type = proxy.split('://')[0] + check_funcs = { + 'http': check_http_proxy, + 'https': check_https_proxy, + 'socks5': check_socks_proxy + } + + if proxy_type not in check_funcs: + return False + + try: + return await check_funcs[proxy_type](proxy) + except Exception as e: + logging.error(f"{proxy_type.upper()}代理 {proxy} 检测失败: {e}") + return False + +async def check_http_proxy(proxy): + async with httpx.AsyncClient(proxies={'http://': proxy}, timeout=10) as client: + response = await client.get('http://www.baidu.com') + return response.status_code == 200 + +async def check_https_proxy(proxy): + async with httpx.AsyncClient(proxies={'https://': proxy}, timeout=10) as client: + response = await client.get('https://www.baidu.com') + return response.status_code == 200 + +async def check_socks_proxy(proxy): + proxy_type, proxy_addr = proxy.split('://') + proxy_host, proxy_port = proxy_addr.split(':') + proxy_port = int(proxy_port) + try: + reader, writer = await asyncio.wait_for(asyncio.open_connection(proxy_host, proxy_port), timeout=5) + writer.write(b'\x05\x01\x00') + await writer.drain() + response = await asyncio.wait_for(reader.readexactly(2), timeout=5) + writer.close() + await writer.wait_closed() + return response == b'\x05\x00' + except Exception: + return False + +async def check_proxies(proxies): + valid_proxies = [] + for proxy in proxies: + if await check_proxy(proxy): + valid_proxies.append(proxy) + return valid_proxies diff --git a/ProxyCat-CN/update.py b/ProxyCat-CN/update.py new file mode 100644 index 0000000..1f3afdd --- /dev/null +++ b/ProxyCat-CN/update.py @@ -0,0 +1,26 @@ +from colorama import Fore +from packaging import version +import httpx, asyncio, re + +async def check_for_updates(): + + try: + async with httpx.AsyncClient() as client: + response = await asyncio.wait_for(client.get("https://y.shironekosan.cn/1.html"), timeout=10) + response.raise_for_status() + content = response.text + match = re.search(r'

(ProxyCat-V\d+\.\d+)

', content) + if match: + latest_version = match.group(1) + CURRENT_VERSION = "ProxyCat-V1.7" + if version.parse(latest_version.split('-V')[1]) > version.parse(CURRENT_VERSION.split('-V')[1]): + print(f"{Fore.YELLOW}发现新版本!当前版本: {CURRENT_VERSION}, 最新版本: {latest_version}") + print(f"{Fore.YELLOW}请访问 https://pan.quark.cn/s/39b4b5674570 获取最新版本。") + print(f"{Fore.YELLOW}请访问 https://github.com/honmashironeko/ProxyCat 获取最新版本。") + print(f"{Fore.YELLOW}请访问 https://pan.baidu.com/s/1C9LVC9aiaQeYFSj_2mWH1w?pwd=13r5 获取最新版本。") + else: + print(f"{Fore.GREEN}当前版本已是最新 ({CURRENT_VERSION})") + else: + print(f"{Fore.RED}无法在响应中找到版本信息") + except Exception as e: + print(f"{Fore.RED}检查更新时发生错误: {e}") \ No newline at end of file diff --git a/ProxyCat-EN/ProxyCat-V1.9.py b/ProxyCat-EN/ProxyCat-V1.9.py new file mode 100644 index 0000000..f0d2fca --- /dev/null +++ b/ProxyCat-EN/ProxyCat-V1.9.py @@ -0,0 +1,472 @@ +import threading, logoprint, argparse, logging, asyncio, socket, base64, getip, httpx, time, re, struct, random +from config import load_config, DEFAULT_CONFIG +from colorama import init, Fore, Style +from proxy_check import check_proxies +from update import check_for_updates +from banner import print_banner +from itertools import cycle + +init(autoreset=True) +class ColoredFormatter(logging.Formatter): + COLORS = { + logging.INFO: Fore.GREEN, + logging.WARNING: Fore.YELLOW, + logging.ERROR: Fore.RED, + logging.CRITICAL: Fore.RED + Style.BRIGHT, + } + + def format(self, record): + log_color = self.COLORS.get(record.levelno, Fore.WHITE) + record.msg = f"{log_color}{record.msg}{Style.RESET_ALL}" + return super().format(record) + +log_format = '%(asctime)s - %(levelname)s - %(message)s' +formatter = ColoredFormatter(log_format) + +console_handler = logging.StreamHandler() +console_handler.setFormatter(formatter) + +logging.basicConfig(level=logging.INFO, handlers=[console_handler]) + +def load_proxies(file_path='ip.txt'): + with open(file_path, 'r') as file: + return [line.strip() for line in file if '://' in line] + +def validate_proxy(proxy): + pattern = re.compile(r'^(?Psocks5|http|https)://(?P[^:]+):(?P\d+)$') + return pattern.match(proxy) is not None + +class AsyncProxyServer: + def __init__(self, config): + self.config = {**DEFAULT_CONFIG, **config} + self.username = self.config['username'].strip() + self.password = self.config['password'].strip() + self.auth_required = bool(self.username and self.password) + self.mode = self.config['mode'] + self.interval = int(self.config['interval']) + self.use_getip = self.config.get('use_getip', 'False').lower() == 'true' + self.proxy_file = self.config['proxy_file'] + self.proxies = self.load_proxies() + self.proxy_cycle = cycle(self.proxies) + self.current_proxy = next(self.proxy_cycle) if self.proxies else "No available agents" + self.last_switch_time = time.time() + self.rate_limiter = asyncio.Queue(maxsize=3000) + self.proxy_failed = False + + def load_proxies(self): + proxies = load_proxies(self.proxy_file) + valid_proxies = [p for p in proxies if validate_proxy(p)] + + if self.use_getip: + valid_proxies = [] + for _ in range(4): + new_ip = getip.newip() + if validate_proxy(new_ip): + valid_proxies.append(new_ip) + break + else: + logging.error("Failed to obtain a valid proxy multiple times, exiting the program") + exit(1) + + return valid_proxies + + async def get_next_proxy(self): + if self.mode == 'load_balance': + return random.choice(self.proxies) + elif self.mode == 'custom': + return await self.custom_proxy_switch() + + if time.time() - self.last_switch_time >= self.interval: + await self.get_proxy() + return self.current_proxy + + async def get_proxy(self): + self.current_proxy = getip.newip() if self.use_getip else next(self.proxy_cycle) + self.last_switch_time = time.time() + logging.info(f"Switch to the new proxy: {self.current_proxy}") + + async def custom_proxy_switch(self): + """ Custom proxy switching logic """ + return self.proxies[0] if self.proxies else "No available agents" + + def time_until_next_switch(self): + return float('inf') if self.mode == 'load_balance' else max(0, self.interval - (time.time() - self.last_switch_time)) + + async def acquire(self): + await self.rate_limiter.put(None) + await asyncio.sleep(0.001) + self.rate_limiter.get_nowait() + + async def handle_client(self, reader, writer): + try: + # await self.acquire() + first_byte = await reader.read(1) + if not first_byte: + return + + if first_byte == b'\x05': + await self.handle_socks5_connection(reader, writer) + else: + await self._handle_client_impl(reader, writer, first_byte) + except asyncio.CancelledError: + logging.info("Client processing canceled") + except Exception as e: + logging.error(f"Error processing client: {e}") + finally: + writer.close() + await writer.wait_closed() + + async def handle_socks5_connection(self, reader, writer): + nmethods = ord(await reader.readexactly(1)) + await reader.readexactly(nmethods) + + writer.write(b'\x05\x02' if self.auth_required else b'\x05\x00') + await writer.drain() + + if self.auth_required: + auth_version = await reader.readexactly(1) + if auth_version != b'\x01': + writer.close() + return + + ulen = ord(await reader.readexactly(1)) + username = await reader.readexactly(ulen) + plen = ord(await reader.readexactly(1)) + password = await reader.readexactly(plen) + + if username.decode() != self.username or password.decode() != self.password: + writer.write(b'\x01\x01') + await writer.drain() + writer.close() + return + + writer.write(b'\x01\x00') + await writer.drain() + + version, cmd, _, atyp = struct.unpack('!BBBB', await reader.readexactly(4)) + if cmd != 1: + writer.write(b'\x05\x07\x00\x01\x00\x00\x00\x00\x00\x00') + await writer.drain() + writer.close() + return + + if atyp == 1: + dst_addr = socket.inet_ntoa(await reader.readexactly(4)) + elif atyp == 3: + addr_len = ord(await reader.readexactly(1)) + dst_addr = (await reader.readexactly(addr_len)).decode() + elif atyp == 4: + dst_addr = socket.inet_ntop(socket.AF_INET6, await reader.readexactly(16)) + else: + writer.write(b'\x05\x08\x00\x01\x00\x00\x00\x00\x00\x00') + await writer.drain() + writer.close() + return + + dst_port = struct.unpack('!H', await reader.readexactly(2))[0] + + try: + proxy = await self.get_next_proxy() + proxy_type, proxy_addr = proxy.split('://') + proxy_auth, proxy_host_port = self._split_proxy_auth(proxy_addr) + proxy_host, proxy_port = proxy_host_port.split(':') + proxy_port = int(proxy_port) + + remote_reader, remote_writer = await asyncio.open_connection(proxy_host, proxy_port) + + if proxy_type == 'socks5': + remote_writer.write(b'\x05\x01\x00') + await remote_writer.drain() + await remote_reader.readexactly(2) + + remote_writer.write(b'\x05\x01\x00' + (b'\x03' + len(dst_addr).to_bytes(1, 'big') + dst_addr.encode() if isinstance(dst_addr, str) else b'\x01' + socket.inet_aton(dst_addr)) + struct.pack('!H', dst_port)) + await remote_writer.drain() + + await remote_reader.readexactly(10) + elif proxy_type in ['http', 'https']: + connect_request = f'CONNECT {dst_addr}:{dst_port} HTTP/1.1\r\nHost: {dst_addr}:{dst_port}\r\n' + if proxy_auth: + connect_request += f'Proxy-Authorization: Basic {base64.b64encode(proxy_auth.encode()).decode()}\r\n' + connect_request += '\r\n' + remote_writer.write(connect_request.encode()) + await remote_writer.drain() + + while True: + line = await remote_reader.readline() + if line == b'\r\n': + break + + writer.write(b'\x05\x00\x00\x01\x00\x00\x00\x00\x00\x00') + await writer.drain() + + await asyncio.gather( + self._pipe(reader, remote_writer), + self._pipe(remote_reader, writer) + ) + except Exception as e: + logging.error(f"SOCKS5 Connection Error: {e}") + writer.write(b'\x05\x01\x00\x01\x00\x00\x00\x00\x00\x00') + await writer.drain() + + async def _handle_client_impl(self, reader, writer, first_byte): + try: + request_line = first_byte + await reader.readline() + if not request_line: + return + + try: + method, path, _ = request_line.decode('utf-8', errors='ignore').split() + except ValueError: + #logging.error(f"无效的请求行: {request_line}") + return + + headers = {} + while True: + line = await reader.readline() + if line == b'\r\n': + break + if line == b'': + return + try: + name, value = line.decode('utf-8', errors='ignore').strip().split(': ', 1) + headers[name.lower()] = value + except ValueError: + #logging.error(f"无效的请求行: {line}") + continue + + if self.auth_required and not self._authenticate(headers): + writer.write(b'HTTP/1.1 407 Proxy Authentication Required\r\nProxy-Authenticate: Basic realm="Proxy"\r\n\r\n') + await writer.drain() + return + + if method == 'CONNECT': + await self._handle_connect(path, reader, writer) + else: + await self._handle_request(method, path, headers, reader, writer) + except asyncio.CancelledError: + raise + except Exception as e: + logging.error(f"Error processing client request: {e}") + + def _authenticate(self, headers): + if not self.auth_required: + return True + + auth = headers.get('proxy-authorization') + if not auth: + return False + try: + scheme, credentials = auth.split() + if scheme.lower() != 'basic': + return False + username, password = base64.b64decode(credentials).decode().split(':') + return username == self.username and password == self.password + except: + return False + + async def _handle_connect(self, path, reader, writer): + try: + host, port = path.split(':') + port = int(port) + except ValueError: + # logging.error(f"无效的连接路径: {path}") + writer.write(b'HTTP/1.1 400 Bad Request\r\n\r\n') + await writer.drain() + return + + proxy = await self.get_next_proxy() + proxy_type, proxy_addr = proxy.split('://') + proxy_auth, proxy_host_port = self._split_proxy_auth(proxy_addr) + proxy_host, proxy_port = proxy_host_port.split(':') + proxy_port = int(proxy_port) + + try: + remote_reader, remote_writer = await asyncio.wait_for(asyncio.open_connection(proxy_host, proxy_port),timeout=10) + + if proxy_type == 'http': + connect_headers = [f'CONNECT {host}:{port} HTTP/1.1', f'Host: {host}:{port}'] + if proxy_auth: + auth_header = f'Proxy-Authorization: Basic {base64.b64encode(proxy_auth.encode()).decode()}' + connect_headers.append(auth_header) + connect_request = '\r\n'.join(connect_headers) + '\r\n\r\n' + remote_writer.write(connect_request.encode()) + await remote_writer.drain() + response = await remote_reader.readline() + if not response.startswith(b'HTTP/1.1 200'): + raise Exception("Bad Gateway") + while (await remote_reader.readline()) != b'\r\n': + pass + elif proxy_type == 'socks5': + remote_writer.write(b'\x05\x01\x00') + await remote_writer.drain() + if (await remote_reader.read(2))[1] == 0: + remote_writer.write(b'\x05\x01\x00\x03' + len(host).to_bytes(1, 'big') + host.encode() + port.to_bytes(2, 'big')) + await remote_writer.drain() + if (await remote_reader.read(10))[1] != 0: + raise Exception("Bad Gateway") + else: + raise Exception("Unsupported proxy type") + + writer.write(b'HTTP/1.1 200 Connection Established\r\n\r\n') + await writer.drain() + + await asyncio.gather( + self._pipe(reader, remote_writer), + self._pipe(remote_reader, writer) + ) + except asyncio.TimeoutError: + logging.error("Connection timed out") + writer.write(b'HTTP/1.1 504 Gateway Timeout\r\n\r\n') + await writer.drain() + except Exception as e: + logging.error(f"Proxy address is invalid, switching proxy address") + + if not self.proxy_failed: + self.proxy_failed = True + await self.get_proxy() + else: + self.proxy_failed = False + + def _split_proxy_auth(self, proxy_addr): + match = re.match(r'((?P.+?):(?P.+?)@)?(?P.+)', proxy_addr) + if match: + username = match.group('username') + password = match.group('password') + host = match.group('host') + if username and password: + return f"{username}:{password}", host + return None, proxy_addr + + async def _pipe(self, reader, writer): + try: + while True: + try: + data = await reader.read(8192) + if not data: + break + writer.write(data) + await writer.drain() + except asyncio.CancelledError: + break + finally: + writer.close() + await writer.wait_closed() + + async def _handle_request(self, method, path, headers, reader, writer): + body = await reader.read() + proxy = await self.get_next_proxy() + proxy_type, proxy_addr = proxy.split('://') + proxy_auth, proxy_host_port = self._split_proxy_auth(proxy_addr) + + client_kwargs = { + "limits": httpx.Limits(max_keepalive_connections=500, max_connections=3000), + "timeout": 30, + } + + if proxy_type in ['http', 'https']: + client_kwargs["proxies"] = {proxy_type: f"{proxy_type}://{proxy_host_port}"} + elif proxy_type == 'socks5': + client_kwargs["transport"] = httpx.AsyncHTTPTransport(proxy=f"{proxy_type}://{proxy_host_port}") + + if proxy_auth: + headers['Proxy-Authorization'] = f'Basic {base64.b64encode(proxy_auth.encode()).decode()}' + + async with httpx.AsyncClient(**client_kwargs) as client: + try: + async with client.stream(method, path, headers=headers, content=body) as response: + await self._write_response(writer, response) + except asyncio.CancelledError: + raise + except Exception as e: + logging.error(f"Error processing request: {e}") + writer.write(b'HTTP/1.1 502 Bad Gateway\r\n\r\n') + await writer.drain() + + async def _write_response(self, writer, response): + writer.write(f'HTTP/1.1 {response.status_code} {response.reason_phrase}\r\n'.encode('utf-8', errors='ignore')) + writer.write(b'Transfer-Encoding: chunked\r\n') + for name, value in response.headers.items(): + if name.lower() != 'transfer-encoding': + writer.write(f'{name}: {value}\r\n'.encode('utf-8', errors='ignore')) + writer.write(b'\r\n') + await writer.drain() + + async for chunk in response.aiter_bytes(chunk_size=8192): + if asyncio.current_task().cancelled(): + raise asyncio.CancelledError() + writer.write(f'{len(chunk):X}\r\n'.encode('utf-8', errors='ignore')) + writer.write(chunk) + writer.write(b'\r\n') + await writer.drain() + writer.write(b'0\r\n\r\n') + await writer.drain() + +def update_status(server): + while True: + if server.mode == 'load_balance': + status = f"\r{Fore.YELLOW}Current proxy: {Fore.GREEN}{server.current_proxy}" + else: + time_left = server.time_until_next_switch() + status = f"\r{Fore.YELLOW}Current proxy: {Fore.GREEN}{server.current_proxy} | {Fore.YELLOW}Switch next time: {Fore.GREEN}{time_left:.1f}s" + print(status, end='', flush=True) + time.sleep(1) + +async def handle_client_wrapper(server, reader, writer, clients): + task = asyncio.create_task(server.handle_client(reader, writer)) + clients.add(task) + try: + await task + except Exception as e: + logging.error(f"Error processing client: {e}") + finally: + clients.remove(task) + +async def run_server(server): + clients = set() + server_instance = None + try: + server_instance = await asyncio.start_server(lambda r, w: handle_client_wrapper(server, r, w, clients),'0.0.0.0', int(server.config['port'])) + async with server_instance: + await server_instance.serve_forever() + except asyncio.CancelledError: + logging.info("The server is shutting down...") + finally: + if server_instance: + server_instance.close() + await server_instance.wait_closed() + for client in clients: + client.cancel() + await asyncio.gather(*clients, return_exceptions=True) + +async def run_proxy_check(server): + if server.config.get('check_proxies', 'False').lower() == 'true': + logging.info("Starting to check proxy addresses...") + valid_proxies = await check_proxies(server.proxies) + if valid_proxies: + server.proxies = valid_proxies + server.proxy_cycle = cycle(valid_proxies) + server.current_proxy = next(server.proxy_cycle) + logging.info(f"Valid proxy addresses: {valid_proxies}") + else: + logging.error("No valid proxy addresses") + else: + logging.info("Proxy checking is disabled") + +if __name__ == '__main__': + parser = argparse.ArgumentParser(description=logoprint.logos()) + parser.add_argument('-c', '--config', default='config.ini', help='Configuration file path') + args = parser.parse_args() + + config = load_config(args.config) + server = AsyncProxyServer(config) + print_banner(config) + asyncio.run(check_for_updates()) + asyncio.run(run_proxy_check(server)) + + status_thread = threading.Thread(target=update_status, args=(server,), daemon=True) + status_thread.start() + + try: + asyncio.run(run_server(server)) + except KeyboardInterrupt: + logging.info("The program was interrupted by the user") diff --git a/ProxyCat-EN/README-EN.md b/ProxyCat-EN/README-EN.md index 92cc153..49e74f5 100644 --- a/ProxyCat-EN/README-EN.md +++ b/ProxyCat-EN/README-EN.md @@ -35,6 +35,10 @@ In summary, **ProxyCat** was born! This tool aims to transform short-lived IPs, ## Features +**Upstream multi protocol monitoring** + +- **Dual protocol support**: Supports SOCKS5 and HTTP protocol listening, adapts to more tools. + **Multi-Protocol Support** - **SOCKS5 Proxy**: Supports the SOCKS5 protocol, suitable for various network environments. @@ -54,6 +58,7 @@ In summary, **ProxyCat** was born! This tool aims to transform short-lived IPs, - **Automatic Validity Detection**: Automatically checks the availability of proxies at startup, filtering out invalid proxies to ensure the reliability of the proxy list. - **Supports Multiple Protocols for Verification**: Specifically checks HTTP, HTTPS, and SOCKS5 proxies to improve validation accuracy. +- **Support proxy failure switching**: In the process of forwarding traffic, if the proxy server suddenly fails, it can automatically switch to a new proxy. **Authentication Mechanism** @@ -204,6 +209,11 @@ After actual testing, when the proxy address server has sufficient performance, ## Changelog +### **2024/10/23** + +- Refactor the code structure and split some of the code into separate files. +- During the proxy process, if the proxy server suddenly fails, it will automatically request to replace the proxy server and reset the replacement timer. + ### 2024/09/29 - Removed the less-used single cycle mode and replaced it with a custom mode, allowing users to customize the proxy switching logic based on needs. @@ -277,6 +287,9 @@ After actual testing, when the proxy address server has sufficient performance, - [x] Performed batch validity checks on proxy servers in `ip.txt` during the first startup. - [x] Added local SOCKS protocol listening or fully switched to SOCKS to adapt to more software. - [ ] Added detailed logging to record the identity of all IPs connecting to ProxyCat and support multiple users. +- [ ] Increase the web UI and provide a more powerful and user-friendly interface. +- [ ] Add Docker one click deployment, simple and easy to use. +- [ ] Develop a babycat module that can run babycat on any server or host, turning it into a proxy server. If you have good ideas or encounter bugs during use, please contact the author through the following methods to provide feedback! diff --git a/ProxyCat-EN/banner.py b/ProxyCat-EN/banner.py new file mode 100644 index 0000000..dc12ab8 --- /dev/null +++ b/ProxyCat-EN/banner.py @@ -0,0 +1,18 @@ +from colorama import Fore + +def print_banner(config): + auth_info = f"{config.get('username')}:{config.get('password')}" if config.get('username') and config.get('password') else "Not set (no authentication required)" + banner_info = [ + ('Public Account', 'Cherry Blossom Manor\'s Main White Cat'), + ('Blog', 'https://y.shironekosan.cn'), + ('Proxy Rotation Mode', 'Cycle' if config.get('mode') == 'cycle' else 'Load Balance' if config.get('mode') == 'load_balance' else 'Single Round'), + ('Proxy Change Interval', f"{config.get('interval')} seconds"), + ('Default Username and Password', auth_info), + ('Local Listening Address (HTTP)', f"http://{auth_info + '@' if auth_info else ''}127.0.0.1:{config.get('port')}"), + ('Local Listening Address (SOCKS5)', f"socks5://{auth_info + '@' if auth_info else ''}127.0.0.1:{config.get('port')}"), + ('Open Source Project Seeking Star', 'https://github.com/honmashironeko/ProxyCat'), + ] + print(f"{Fore.MAGENTA}{'=' * 55}") + for key, value in banner_info: + print(f"{Fore.YELLOW}{key}: {Fore.GREEN}{value}") + print(f"{Fore.MAGENTA}{'=' * 55}\n") diff --git a/ProxyCat-EN/config.py b/ProxyCat-EN/config.py new file mode 100644 index 0000000..58f7fe2 --- /dev/null +++ b/ProxyCat-EN/config.py @@ -0,0 +1,16 @@ +import configparser + +DEFAULT_CONFIG = { + 'port': 1080, + 'mode': 'cycle', + 'interval': 300, + 'username': '', + 'password': '', + 'use_getip': False, + 'proxy_file': 'ip.txt', +} + +def load_config(config_file='config.ini'): + config = configparser.ConfigParser() + config.read(config_file, encoding='utf-8') + return {k: v for k, v in config['SETTINGS'].items()} \ No newline at end of file diff --git a/ProxyCat-EN/proxy_check.py b/ProxyCat-EN/proxy_check.py new file mode 100644 index 0000000..ae978f1 --- /dev/null +++ b/ProxyCat-EN/proxy_check.py @@ -0,0 +1,54 @@ +import httpx +import socket +import re +import asyncio +import logging + +async def check_proxy(proxy): + proxy_type = proxy.split('://')[0] + check_funcs = { + 'http': check_http_proxy, + 'https': check_https_proxy, + 'socks5': check_socks_proxy + } + + if proxy_type not in check_funcs: + return False + + try: + return await check_funcs[proxy_type](proxy) + except Exception as e: + logging.error(f"{proxy_type.upper()}Proxy {proxy} detection failed: {e}") + return False + +async def check_http_proxy(proxy): + async with httpx.AsyncClient(proxies={'http://': proxy}, timeout=10) as client: + response = await client.get('http://www.google.com') + return response.status_code == 200 + +async def check_https_proxy(proxy): + async with httpx.AsyncClient(proxies={'https://': proxy}, timeout=10) as client: + response = await client.get('https://www.google.com') + return response.status_code == 200 + +async def check_socks_proxy(proxy): + proxy_type, proxy_addr = proxy.split('://') + proxy_host, proxy_port = proxy_addr.split(':') + proxy_port = int(proxy_port) + try: + reader, writer = await asyncio.wait_for(asyncio.open_connection(proxy_host, proxy_port), timeout=5) + writer.write(b'\x05\x01\x00') + await writer.drain() + response = await asyncio.wait_for(reader.readexactly(2), timeout=5) + writer.close() + await writer.wait_closed() + return response == b'\x05\x00' + except Exception: + return False + +async def check_proxies(proxies): + valid_proxies = [] + for proxy in proxies: + if await check_proxy(proxy): + valid_proxies.append(proxy) + return valid_proxies diff --git a/ProxyCat-EN/update.py b/ProxyCat-EN/update.py new file mode 100644 index 0000000..14f39f5 --- /dev/null +++ b/ProxyCat-EN/update.py @@ -0,0 +1,26 @@ +from colorama import Fore +from packaging import version +import httpx, asyncio, re + +async def check_for_updates(): + + try: + async with httpx.AsyncClient() as client: + response = await asyncio.wait_for(client.get("https://y.shironekosan.cn/1.html"), timeout=10) + response.raise_for_status() + content = response.text + match = re.search(r'

(ProxyCat-V\d+\.\d+)

', content) + if match: + latest_version = match.group(1) + CURRENT_VERSION = "ProxyCat-V1.7" + if version.parse(latest_version.split('-V')[1]) > version.parse(CURRENT_VERSION.split('-V')[1]): + print(f"{Fore.YELLOW}New version found! Current version: {CURRENT_VERSION}, Latest version: {latest_version}") + print(f"{Fore.YELLOW}Please visit https://pan.quark.cn/s/39b4b5674570 to get the latest version.") + print(f"{Fore.YELLOW}Please visit https://github.com/honmashironeko/ProxyCat to get the latest version.") + print(f"{Fore.YELLOW}Please visit https://pan.baidu.com/s/1C9LVC9aiaQeYFSj_2mWH1w?pwd=13r5 to get the latest version.") + else: + print(f"{Fore.GREEN}The current version is up to date ({CURRENT_VERSION})") + else: + print(f"{Fore.RED}Unable to find version information in the response") + except Exception as e: + print(f"{Fore.RED}An error occurred while checking for updates: {e}") diff --git a/README.md b/README.md index 62c7a6d..e0fd94f 100644 --- a/README.md +++ b/README.md @@ -35,7 +35,11 @@ ## 功能特点 -**多协议支持** +**上游多协议监听** + +- **双协议支持**:支持 SOCKS5 和 HTTP 协议监听,适配更多工具 + +**多代理协议支持** - **SOCKS5 代理**:支持 SOCKS5 协议,适用于各种网络环境。 - **HTTP/HTTPS 代理**:支持 HTTP 和 HTTPS 代理,满足不同应用场景需求。 @@ -54,6 +58,7 @@ - **自动检测有效性**:在启动时自动检测代理的可用性,过滤无效代理,确保代理列表的可靠性。 - **支持多种协议检测**:针对 HTTP、HTTPS 和 SOCKS5 代理进行专门的检测,提升验证精度。 +- **支持代理失效切换**:在转发流量过程中,遇到代理服务器突然失效,可自动切换到新的代理上。 **认证机制** @@ -200,6 +205,11 @@ socks5://127.0.0.1:1080 ## 更新日志 +### **2024/10/23** + +- 重构代码结构,将部分代码分割成单独文件。 +- 支持代理过程中,遇到代理服务器突然失效,自动请求更换新的代理服务器,并重置更换计时器。 + ### 2024/09/29 - 去除使用较少的单次循环,更换为自定义模式,可根据需求自定义更换代理的逻辑。 @@ -273,6 +283,9 @@ socks5://127.0.0.1:1080 - [x] 首次启动时批量对 `ip.txt` 中的代理服务器进行有效性检查。 - [x] 增加本地监听 SOCKS 协议,或全面改成 SOCKS,以适配更多软件。 - [ ] 增加详细日志记录,记录所有连接 ProxyCat 的 IP 身份,支持多用户。 +- [ ] 增加Web UI,提供更加强大易用的界面。 +- [ ] 增加docker一键部署,简单易用。 +- [ ] 开发 babycat 模块,可将 babycat 在任意服务器或主机上运行,即可变成一台代理服务器。 如果您有好的创意,或在使用过程中遇到bug,请通过以下方式联系作者反馈!