This commit is contained in:
本间白猫 2025-01-02 17:14:36 +08:00
parent dd10bfd566
commit 739f27909e
52 changed files with 1296 additions and 1876 deletions

View File

@ -1,12 +0,0 @@
FROM python:3.8
WORKDIR /app
COPY . /app
RUN python -m venv /venv
RUN /venv/bin/pip install --no-cache-dir -r requirements.txt -i https://pypi.tuna.tsinghua.edu.cn/simple/
EXPOSE 1080
ENTRYPOINT ["/venv/bin/python", "ProxyCat-V1.9.py"]

View File

@ -1,471 +0,0 @@
import threading, logoprint, argparse, logging, asyncio, socket, base64, getip, httpx, time, re, struct, random
from config import load_config, DEFAULT_CONFIG
from colorama import init, Fore, Style
from proxy_check import check_proxies
from update import check_for_updates
from banner import print_banner
from itertools import cycle
init(autoreset=True)
class ColoredFormatter(logging.Formatter):
COLORS = {
logging.INFO: Fore.GREEN,
logging.WARNING: Fore.YELLOW,
logging.ERROR: Fore.RED,
logging.CRITICAL: Fore.RED + Style.BRIGHT,
}
def format(self, record):
log_color = self.COLORS.get(record.levelno, Fore.WHITE)
record.msg = f"{log_color}{record.msg}{Style.RESET_ALL}"
return super().format(record)
log_format = '%(asctime)s - %(levelname)s - %(message)s'
formatter = ColoredFormatter(log_format)
console_handler = logging.StreamHandler()
console_handler.setFormatter(formatter)
logging.basicConfig(level=logging.INFO, handlers=[console_handler])
def load_proxies(file_path='ip.txt'):
with open(file_path, 'r') as file:
return [line.strip() for line in file if '://' in line]
def validate_proxy(proxy):
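# Accept only proxies of the form scheme://host:port, where scheme is socks5, http, or https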
pattern = re.compile(r'^(?P<scheme>socks5|http|https)://(?P<host>[^:]+):(?P<port>\d+)$')
return pattern.match(proxy) is not None
class AsyncProxyServer:
def __init__(self, config):
self.config = {**DEFAULT_CONFIG, **config}
self.username = self.config['username'].strip()
self.password = self.config['password'].strip()
self.auth_required = bool(self.username and self.password)
self.mode = self.config['mode']
self.interval = int(self.config['interval'])
self.use_getip = self.config.get('use_getip', 'False').lower() == 'true'
self.proxy_file = self.config['proxy_file']
self.proxies = self.load_proxies()
self.proxy_cycle = cycle(self.proxies)
self.current_proxy = next(self.proxy_cycle) if self.proxies else "No available proxies"
self.last_switch_time = time.time()
self.rate_limiter = asyncio.Queue(maxsize=3000)
self.proxy_failed = False
def load_proxies(self):
proxies = load_proxies(self.proxy_file)
valid_proxies = [p for p in proxies if validate_proxy(p)]
if self.use_getip:
valid_proxies = []
for _ in range(4):
new_ip = getip.newip()
if validate_proxy(new_ip):
valid_proxies.append(new_ip)
break
else:
logging.error("多次尝试获取有效代理失败,退出程序")
exit(1)
return valid_proxies
async def get_next_proxy(self):
if self.mode == 'load_balance':
return random.choice(self.proxies)
elif self.mode == 'custom':
return await self.custom_proxy_switch()
if time.time() - self.last_switch_time >= self.interval:
await self.get_proxy()
return self.current_proxy
async def get_proxy(self):
self.current_proxy = getip.newip() if self.use_getip else next(self.proxy_cycle)
self.last_switch_time = time.time()
logging.info(f"切换到新的代理: {self.current_proxy}")
async def custom_proxy_switch(self):
""" 自定义的代理切换逻辑 """
return self.proxies[0] if self.proxies else "没有可用的代理"
def time_until_next_switch(self):
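# Seconds left until the next scheduled switch; load_balance mode never switches on a timer, hence infinity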
return float('inf') if self.mode == 'load_balance' else max(0, self.interval - (time.time() - self.last_switch_time))
async def acquire(self):
await self.rate_limiter.put(None)
await asyncio.sleep(0.001)
self.rate_limiter.get_nowait()
async def handle_client(self, reader, writer):
try:
# await self.acquire()
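# Sniff the first byte: 0x05 indicates a SOCKS5 handshake, anything else is treated as an HTTP request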
first_byte = await reader.read(1)
if not first_byte:
return
if first_byte == b'\x05':
await self.handle_socks5_connection(reader, writer)
else:
await self._handle_client_impl(reader, writer, first_byte)
except asyncio.CancelledError:
logging.info("客户端处理取消")
except Exception as e:
logging.error(f"客户端处理出错: {e}")
finally:
writer.close()
await writer.wait_closed()
async def handle_socks5_connection(self, reader, writer):
nmethods = ord(await reader.readexactly(1))
await reader.readexactly(nmethods)
writer.write(b'\x05\x02' if self.auth_required else b'\x05\x00')
await writer.drain()
if self.auth_required:
auth_version = await reader.readexactly(1)
if auth_version != b'\x01':
writer.close()
return
ulen = ord(await reader.readexactly(1))
username = await reader.readexactly(ulen)
plen = ord(await reader.readexactly(1))
password = await reader.readexactly(plen)
if username.decode() != self.username or password.decode() != self.password:
writer.write(b'\x01\x01')
await writer.drain()
writer.close()
return
writer.write(b'\x01\x00')
await writer.drain()
version, cmd, _, atyp = struct.unpack('!BBBB', await reader.readexactly(4))
if cmd != 1:
writer.write(b'\x05\x07\x00\x01\x00\x00\x00\x00\x00\x00')
await writer.drain()
writer.close()
return
if atyp == 1:
dst_addr = socket.inet_ntoa(await reader.readexactly(4))
elif atyp == 3:
addr_len = ord(await reader.readexactly(1))
dst_addr = (await reader.readexactly(addr_len)).decode()
elif atyp == 4:
dst_addr = socket.inet_ntop(socket.AF_INET6, await reader.readexactly(16))
else:
writer.write(b'\x05\x08\x00\x01\x00\x00\x00\x00\x00\x00')
await writer.drain()
writer.close()
return
dst_port = struct.unpack('!H', await reader.readexactly(2))[0]
try:
proxy = await self.get_next_proxy()
proxy_type, proxy_addr = proxy.split('://')
proxy_auth, proxy_host_port = self._split_proxy_auth(proxy_addr)
proxy_host, proxy_port = proxy_host_port.split(':')
proxy_port = int(proxy_port)
remote_reader, remote_writer = await asyncio.open_connection(proxy_host, proxy_port)
if proxy_type == 'socks5':
remote_writer.write(b'\x05\x01\x00')
await remote_writer.drain()
await remote_reader.readexactly(2)
remote_writer.write(b'\x05\x01\x00' + (b'\x03' + len(dst_addr).to_bytes(1, 'big') + dst_addr.encode() if isinstance(dst_addr, str) else b'\x01' + socket.inet_aton(dst_addr)) + struct.pack('!H', dst_port))
await remote_writer.drain()
await remote_reader.readexactly(10)
elif proxy_type in ['http', 'https']:
connect_request = f'CONNECT {dst_addr}:{dst_port} HTTP/1.1\r\nHost: {dst_addr}:{dst_port}\r\n'
if proxy_auth:
connect_request += f'Proxy-Authorization: Basic {base64.b64encode(proxy_auth.encode()).decode()}\r\n'
connect_request += '\r\n'
remote_writer.write(connect_request.encode())
await remote_writer.drain()
while True:
line = await remote_reader.readline()
if line == b'\r\n':
break
writer.write(b'\x05\x00\x00\x01\x00\x00\x00\x00\x00\x00')
await writer.drain()
await asyncio.gather(
self._pipe(reader, remote_writer),
self._pipe(remote_reader, writer)
)
except Exception as e:
logging.error(f"SOCKS5 连接错误: {e}")
writer.write(b'\x05\x01\x00\x01\x00\x00\x00\x00\x00\x00')
await writer.drain()
async def _handle_client_impl(self, reader, writer, first_byte):
try:
request_line = first_byte + await reader.readline()
if not request_line:
return
try:
method, path, _ = request_line.decode('utf-8', errors='ignore').split()
except ValueError:
#logging.error(f"无效的请求行: {request_line}")
return
headers = {}
while True:
line = await reader.readline()
if line == b'\r\n':
break
if line == b'':
return
try:
name, value = line.decode('utf-8', errors='ignore').strip().split(': ', 1)
headers[name.lower()] = value
except ValueError:
#logging.error(f"无效的请求行: {line}")
continue
if self.auth_required and not self._authenticate(headers):
writer.write(b'HTTP/1.1 407 Proxy Authentication Required\r\nProxy-Authenticate: Basic realm="Proxy"\r\n\r\n')
await writer.drain()
return
if method == 'CONNECT':
await self._handle_connect(path, reader, writer)
else:
await self._handle_request(method, path, headers, reader, writer)
except asyncio.CancelledError:
raise
except Exception as e:
logging.error(f"处理客户端请求时出错: {e}")
def _authenticate(self, headers):
if not self.auth_required:
return True
auth = headers.get('proxy-authorization')
if not auth:
return False
try:
scheme, credentials = auth.split()
if scheme.lower() != 'basic':
return False
username, password = base64.b64decode(credentials).decode().split(':')
return username == self.username and password == self.password
except:
return False
async def _handle_connect(self, path, reader, writer):
try:
host, port = path.split(':')
port = int(port)
except ValueError:
# logging.error(f"无效的连接路径: {path}")
writer.write(b'HTTP/1.1 400 Bad Request\r\n\r\n')
await writer.drain()
return
proxy = await self.get_next_proxy()
proxy_type, proxy_addr = proxy.split('://')
proxy_auth, proxy_host_port = self._split_proxy_auth(proxy_addr)
proxy_host, proxy_port = proxy_host_port.split(':')
proxy_port = int(proxy_port)
try:
remote_reader, remote_writer = await asyncio.wait_for(asyncio.open_connection(proxy_host, proxy_port),timeout=10)
if proxy_type == 'http':
connect_headers = [f'CONNECT {host}:{port} HTTP/1.1', f'Host: {host}:{port}']
if proxy_auth:
auth_header = f'Proxy-Authorization: Basic {base64.b64encode(proxy_auth.encode()).decode()}'
connect_headers.append(auth_header)
connect_request = '\r\n'.join(connect_headers) + '\r\n\r\n'
remote_writer.write(connect_request.encode())
await remote_writer.drain()
response = await remote_reader.readline()
if not response.startswith(b'HTTP/1.1 200'):
raise Exception("Bad Gateway")
while (await remote_reader.readline()) != b'\r\n':
pass
elif proxy_type == 'socks5':
remote_writer.write(b'\x05\x01\x00')
await remote_writer.drain()
if (await remote_reader.read(2))[1] == 0:
remote_writer.write(b'\x05\x01\x00\x03' + len(host).to_bytes(1, 'big') + host.encode() + port.to_bytes(2, 'big'))
await remote_writer.drain()
if (await remote_reader.read(10))[1] != 0:
raise Exception("Bad Gateway")
else:
raise Exception("Unsupported proxy type")
writer.write(b'HTTP/1.1 200 Connection Established\r\n\r\n')
await writer.drain()
await asyncio.gather(
self._pipe(reader, remote_writer),
self._pipe(remote_reader, writer)
)
except asyncio.TimeoutError:
logging.error("连接超时")
writer.write(b'HTTP/1.1 504 Gateway Timeout\r\n\r\n')
await writer.drain()
except Exception as e:
logging.error(f"代理地址失效,切换代理地址")
if not self.proxy_failed:
self.proxy_failed = True
await self.get_proxy()
else:
self.proxy_failed = False
def _split_proxy_auth(self, proxy_addr):
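# Split an optional user:pass@ prefix off the host:port part of a proxy address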
match = re.match(r'((?P<username>.+?):(?P<password>.+?)@)?(?P<host>.+)', proxy_addr)
if match:
username = match.group('username')
password = match.group('password')
host = match.group('host')
if username and password:
return f"{username}:{password}", host
return None, proxy_addr
async def _pipe(self, reader, writer):
try:
while True:
try:
data = await reader.read(8192)
if not data:
break
writer.write(data)
await writer.drain()
except asyncio.CancelledError:
break
finally:
writer.close()
await writer.wait_closed()
async def _handle_request(self, method, path, headers, reader, writer):
body = await reader.read()
proxy = await self.get_next_proxy()
proxy_type, proxy_addr = proxy.split('://')
proxy_auth, proxy_host_port = self._split_proxy_auth(proxy_addr)
client_kwargs = {
"limits": httpx.Limits(max_keepalive_connections=500, max_connections=3000),
"timeout": 30,
}
if proxy_type in ['http', 'https']:
client_kwargs["proxies"] = {proxy_type: f"{proxy_type}://{proxy_host_port}"}
elif proxy_type == 'socks5':
client_kwargs["transport"] = httpx.AsyncHTTPTransport(proxy=f"{proxy_type}://{proxy_host_port}")
if proxy_auth:
headers['Proxy-Authorization'] = f'Basic {base64.b64encode(proxy_auth.encode()).decode()}'
async with httpx.AsyncClient(**client_kwargs) as client:
try:
async with client.stream(method, path, headers=headers, content=body) as response:
await self._write_response(writer, response)
except asyncio.CancelledError:
raise
except Exception as e:
logging.error(f"请求处理出错: {e}")
writer.write(b'HTTP/1.1 502 Bad Gateway\r\n\r\n')
await writer.drain()
async def _write_response(self, writer, response):
writer.write(f'HTTP/1.1 {response.status_code} {response.reason_phrase}\r\n'.encode('utf-8', errors='ignore'))
writer.write(b'Transfer-Encoding: chunked\r\n')
for name, value in response.headers.items():
if name.lower() != 'transfer-encoding':
writer.write(f'{name}: {value}\r\n'.encode('utf-8', errors='ignore'))
writer.write(b'\r\n')
await writer.drain()
async for chunk in response.aiter_bytes(chunk_size=8192):
if asyncio.current_task().cancelled():
raise asyncio.CancelledError()
writer.write(f'{len(chunk):X}\r\n'.encode('utf-8', errors='ignore'))
writer.write(chunk)
writer.write(b'\r\n')
await writer.drain()
writer.write(b'0\r\n\r\n')
await writer.drain()
def update_status(server):
while True:
if server.mode == 'load_balance':
status = f"\r{Fore.YELLOW}当前代理: {Fore.GREEN}{server.current_proxy}"
else:
time_left = server.time_until_next_switch()
status = f"\r{Fore.YELLOW}当前代理: {Fore.GREEN}{server.current_proxy} | {Fore.YELLOW}下次切换: {Fore.GREEN}{time_left:.1f}"
print(status, end='', flush=True)
time.sleep(1)
async def handle_client_wrapper(server, reader, writer, clients):
task = asyncio.create_task(server.handle_client(reader, writer))
clients.add(task)
try:
await task
except Exception as e:
logging.error(f"客户端处理出错: {e}")
finally:
clients.remove(task)
async def run_server(server):
clients = set()
server_instance = None
try:
server_instance = await asyncio.start_server(lambda r, w: handle_client_wrapper(server, r, w, clients),'0.0.0.0', int(server.config['port']))
async with server_instance:
await server_instance.serve_forever()
except asyncio.CancelledError:
logging.info("服务器正在关闭...")
finally:
if server_instance:
server_instance.close()
await server_instance.wait_closed()
for client in clients:
client.cancel()
await asyncio.gather(*clients, return_exceptions=True)
async def run_proxy_check(server):
if server.config.get('check_proxies', 'False').lower() == 'true':
logging.info("开始检测代理地址...")
valid_proxies = await check_proxies(server.proxies)
if valid_proxies:
server.proxies = valid_proxies
server.proxy_cycle = cycle(valid_proxies)
server.current_proxy = next(server.proxy_cycle)
logging.info(f"有效代理地址: {valid_proxies}")
else:
logging.error("没有有效的代理地址")
else:
logging.info("代理检测已禁用")
if __name__ == '__main__':
parser = argparse.ArgumentParser(description=logoprint.logos())
parser.add_argument('-c', '--config', default='config.ini', help='Path to the configuration file')
args = parser.parse_args()
config = load_config(args.config)
server = AsyncProxyServer(config)
print_banner(config)
asyncio.run(check_for_updates())
asyncio.run(run_proxy_check(server))
status_thread = threading.Thread(target=update_status, args=(server,), daemon=True)
status_thread.start()
try:
asyncio.run(run_server(server))
except KeyboardInterrupt:
logging.info("程序被用户中断")

View File

@ -1,18 +0,0 @@
from colorama import Fore
def print_banner(config):
auth_info = f"{config.get('username')}:{config.get('password')}" if config.get('username') and config.get('password') else "未设置 (无需认证)"
banner_info = [
('Official Account', '樱花庄的本间白猫'),
('Blog', 'https://y.shironekosan.cn'),
('Proxy Rotation Mode', 'Cycle' if config.get('mode') == 'cycle' else 'Load Balance' if config.get('mode') == 'load_balance' else 'Single Round'),
('Proxy Change Interval', f"{config.get('interval')} seconds"),
('Default Username and Password', auth_info),
('Local Listening Address (HTTP)', f"http://{auth_info + '@' if auth_info else ''}127.0.0.1:{config.get('port')}"),
('Local Listening Address (SOCKS5)', f"socks5://{auth_info + '@' if auth_info else ''}127.0.0.1:{config.get('port')}"),
('Open Source Project Seeking Star', 'https://github.com/honmashironeko/ProxyCat'),
]
print(f"{Fore.MAGENTA}{'=' * 55}")
for key, value in banner_info:
print(f"{Fore.YELLOW}{key}: {Fore.GREEN}{value}")
print(f"{Fore.MAGENTA}{'=' * 55}\n")

View File

@ -1,24 +0,0 @@
[SETTINGS]
# Local server listening port (default: 1080)
port = 1080
# Proxy rotation mode: cycle for sequential use, custom for a custom mode, load_balance for load balancing (default: cycle)
mode = cycle
# Proxy switch interval in seconds; set to 0 to switch IP on every request (default: 300)
interval = 300
# Username for authenticating against the local server port (default: neko); leave empty to disable authentication
username = neko
# Password for authenticating against the local server port (default: 123456); leave empty to disable authentication
password = 123456
# Whether to use the getip module to obtain proxy addresses, True or False (default: False)
use_getip = False
# Proxy address list file (default: ip.txt)
proxy_file = ip.txt
# Whether to enable proxy validity checking, True or False (default: True)
check_proxies = True

View File

@ -1,16 +0,0 @@
import configparser
DEFAULT_CONFIG = {
'port': 1080,
'mode': 'cycle',
'interval': 300,
'username': '',
'password': '',
'use_getip': False,
'proxy_file': 'ip.txt',
}
def load_config(config_file='config.ini'):
config = configparser.ConfigParser()
config.read(config_file, encoding='utf-8')
return {k: v for k, v in config['SETTINGS'].items()}

View File

@ -1,9 +0,0 @@
services:
app:
build: .
volumes:
- "./config.ini:/app/config.ini"
- "./getip.py:/app/getip.py"
- "./ip.txt:/app/ip.txt"
ports:
- "1080:1080"

View File

@ -1,12 +0,0 @@
import requests
def newip():
print("正在获取新的代理IP")
url = f""
response = requests.get(url)
response.raise_for_status()
newip = "socks5://"+response.text.split("\r\n")[0]
print("新的代理IP为:"+newip)
return newip

View File

@ -1,58 +0,0 @@
import random
logo1 = """
|\ _,,,---,,_ by 本间白猫
ZZZzz /,`.-'`' -. ;-;;,_
|,4- ) )-,_. ,\ ( `'-'
'---''(_/--' `-'\_) ProxyCat
"""
logo2 = """
* ,MMM8&&&. *
MMMM88&&&&& .
MMMM88&&&&&&&
* MMM88&&&&&&&&
MMM88&&&&&&&&
'MMM88&&&&&&'
'MMM8&&&' *
/\/|_ __/\\
/ -\ /- ~\ . '
\ =_YT_ = /
/==*(` `\ ~ \ ProxyCat
/ \ / `\ by 本间白猫
| | ) ~ (
/ \ / ~ \\
\ / \~ ~/
_/\_/\_/\__ _/_/\_/\__~__/_/\_/\_/\_/\_/\_
| | | | ) ) | | | (( | | | | | |
| | | |( ( | | | \\ | | | | | |
| | | | )_) | | | |))| | | | | |
| | | | | | | | (/ | | | | | |
| | | | | | | | | | | | | | |
"""
logo3 = """
/\_/\ _
/`` \ / )
|n n |__ ( (
=(Y =.'` `\ \ \\
{`"` \ ) )
{ / |/ /
\\ ,( / /
ProxyCat) ) /-'\ ,_.' by 本间白猫
(,(,/ ((,,/
"""
logo4 = """
.-o=o-.
, /=o=o=o=\ .--.
_|\|=o=O=o=O=| \\
__.' a`\=o=o=o=(`\ /
'. a 4/`|.-""'`\ \ ;'`) .---.
\ .' / .--' |_.' / .-._)
by 本间白猫 `) _.' / /`-.__.' /
ProxyCat `'-.____; /'-.___.-'
`\"""`
"""
logos_list = [logo1, logo2, logo3, logo4]
def logos():
selected_logo = random.choice(logos_list)
print(selected_logo)

View File

@ -1,54 +0,0 @@
import httpx
import socket
import re
import asyncio
import logging
async def check_proxy(proxy):
proxy_type = proxy.split('://')[0]
check_funcs = {
'http': check_http_proxy,
'https': check_https_proxy,
'socks5': check_socks_proxy
}
if proxy_type not in check_funcs:
return False
try:
return await check_funcs[proxy_type](proxy)
except Exception as e:
logging.error(f"{proxy_type.upper()}代理 {proxy} 检测失败: {e}")
return False
async def check_http_proxy(proxy):
async with httpx.AsyncClient(proxies={'http://': proxy}, timeout=10) as client:
response = await client.get('http://www.baidu.com')
return response.status_code == 200
async def check_https_proxy(proxy):
async with httpx.AsyncClient(proxies={'https://': proxy}, timeout=10) as client:
response = await client.get('https://www.baidu.com')
return response.status_code == 200
async def check_socks_proxy(proxy):
proxy_type, proxy_addr = proxy.split('://')
proxy_host, proxy_port = proxy_addr.split(':')
proxy_port = int(proxy_port)
try:
reader, writer = await asyncio.wait_for(asyncio.open_connection(proxy_host, proxy_port), timeout=5)
writer.write(b'\x05\x01\x00')
await writer.drain()
response = await asyncio.wait_for(reader.readexactly(2), timeout=5)
writer.close()
await writer.wait_closed()
return response == b'\x05\x00'
except Exception:
return False
async def check_proxies(proxies):
valid_proxies = []
for proxy in proxies:
if await check_proxy(proxy):
valid_proxies.append(proxy)
return valid_proxies

View File

@ -1,26 +0,0 @@
from colorama import Fore
from packaging import version
import httpx, asyncio, re
async def check_for_updates():
try:
async with httpx.AsyncClient() as client:
response = await asyncio.wait_for(client.get("https://y.shironekosan.cn/1.html"), timeout=10)
response.raise_for_status()
content = response.text
match = re.search(r'<p>(ProxyCat-V\d+\.\d+)</p>', content)
if match:
latest_version = match.group(1)
CURRENT_VERSION = "ProxyCat-V1.9"
if version.parse(latest_version.split('-V')[1]) > version.parse(CURRENT_VERSION.split('-V')[1]):
print(f"{Fore.YELLOW}发现新版本!当前版本: {CURRENT_VERSION}, 最新版本: {latest_version}")
print(f"{Fore.YELLOW}请访问 https://pan.quark.cn/s/39b4b5674570 获取最新版本。")
print(f"{Fore.YELLOW}请访问 https://github.com/honmashironeko/ProxyCat 获取最新版本。")
print(f"{Fore.YELLOW}请访问 https://pan.baidu.com/s/1C9LVC9aiaQeYFSj_2mWH1w?pwd=13r5 获取最新版本。")
else:
print(f"{Fore.GREEN}当前版本已是最新 ({CURRENT_VERSION})")
else:
print(f"{Fore.RED}无法在响应中找到版本信息")
except Exception as e:
print(f"{Fore.RED}检查更新时发生错误: {e}")

View File

@ -1,12 +0,0 @@
FROM python:3.8
WORKDIR /app
COPY . /app
RUN python -m venv /venv
RUN /venv/bin/pip install --no-cache-dir -r requirements.txt
EXPOSE 1080
ENTRYPOINT ["/venv/bin/python", "ProxyCat-V1.9.py"]

View File

@ -1,472 +0,0 @@
import threading, logoprint, argparse, logging, asyncio, socket, base64, getip, httpx, time, re, struct, random
from config import load_config, DEFAULT_CONFIG
from colorama import init, Fore, Style
from proxy_check import check_proxies
from update import check_for_updates
from banner import print_banner
from itertools import cycle
init(autoreset=True)
class ColoredFormatter(logging.Formatter):
COLORS = {
logging.INFO: Fore.GREEN,
logging.WARNING: Fore.YELLOW,
logging.ERROR: Fore.RED,
logging.CRITICAL: Fore.RED + Style.BRIGHT,
}
def format(self, record):
log_color = self.COLORS.get(record.levelno, Fore.WHITE)
record.msg = f"{log_color}{record.msg}{Style.RESET_ALL}"
return super().format(record)
log_format = '%(asctime)s - %(levelname)s - %(message)s'
formatter = ColoredFormatter(log_format)
console_handler = logging.StreamHandler()
console_handler.setFormatter(formatter)
logging.basicConfig(level=logging.INFO, handlers=[console_handler])
def load_proxies(file_path='ip.txt'):
with open(file_path, 'r') as file:
return [line.strip() for line in file if '://' in line]
def validate_proxy(proxy):
pattern = re.compile(r'^(?P<scheme>socks5|http|https)://(?P<host>[^:]+):(?P<port>\d+)$')
return pattern.match(proxy) is not None
class AsyncProxyServer:
def __init__(self, config):
self.config = {**DEFAULT_CONFIG, **config}
self.username = self.config['username'].strip()
self.password = self.config['password'].strip()
self.auth_required = bool(self.username and self.password)
self.mode = self.config['mode']
self.interval = int(self.config['interval'])
self.use_getip = self.config.get('use_getip', 'False').lower() == 'true'
self.proxy_file = self.config['proxy_file']
self.proxies = self.load_proxies()
self.proxy_cycle = cycle(self.proxies)
self.current_proxy = next(self.proxy_cycle) if self.proxies else "No available proxies"
self.last_switch_time = time.time()
self.rate_limiter = asyncio.Queue(maxsize=3000)
self.proxy_failed = False
def load_proxies(self):
proxies = load_proxies(self.proxy_file)
valid_proxies = [p for p in proxies if validate_proxy(p)]
if self.use_getip:
valid_proxies = []
for _ in range(4):
new_ip = getip.newip()
if validate_proxy(new_ip):
valid_proxies.append(new_ip)
break
else:
logging.error("Failed to obtain a valid proxy multiple times, exiting the program")
exit(1)
return valid_proxies
async def get_next_proxy(self):
if self.mode == 'load_balance':
return random.choice(self.proxies)
elif self.mode == 'custom':
return await self.custom_proxy_switch()
if time.time() - self.last_switch_time >= self.interval:
await self.get_proxy()
return self.current_proxy
async def get_proxy(self):
self.current_proxy = getip.newip() if self.use_getip else next(self.proxy_cycle)
self.last_switch_time = time.time()
logging.info(f"Switch to the new proxy: {self.current_proxy}")
async def custom_proxy_switch(self):
""" Custom proxy switching logic """
return self.proxies[0] if self.proxies else "No available proxies"
def time_until_next_switch(self):
return float('inf') if self.mode == 'load_balance' else max(0, self.interval - (time.time() - self.last_switch_time))
async def acquire(self):
await self.rate_limiter.put(None)
await asyncio.sleep(0.001)
self.rate_limiter.get_nowait()
async def handle_client(self, reader, writer):
try:
# await self.acquire()
first_byte = await reader.read(1)
if not first_byte:
return
if first_byte == b'\x05':
await self.handle_socks5_connection(reader, writer)
else:
await self._handle_client_impl(reader, writer, first_byte)
except asyncio.CancelledError:
logging.info("Client processing canceled")
except Exception as e:
logging.error(f"Error processing client: {e}")
finally:
writer.close()
await writer.wait_closed()
async def handle_socks5_connection(self, reader, writer):
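# SOCKS5 handshake: negotiate the auth method, optionally verify username/password, then parse the CONNECT request and tunnel it through the upstream proxy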
nmethods = ord(await reader.readexactly(1))
await reader.readexactly(nmethods)
writer.write(b'\x05\x02' if self.auth_required else b'\x05\x00')
await writer.drain()
if self.auth_required:
auth_version = await reader.readexactly(1)
if auth_version != b'\x01':
writer.close()
return
ulen = ord(await reader.readexactly(1))
username = await reader.readexactly(ulen)
plen = ord(await reader.readexactly(1))
password = await reader.readexactly(plen)
if username.decode() != self.username or password.decode() != self.password:
writer.write(b'\x01\x01')
await writer.drain()
writer.close()
return
writer.write(b'\x01\x00')
await writer.drain()
version, cmd, _, atyp = struct.unpack('!BBBB', await reader.readexactly(4))
if cmd != 1:
writer.write(b'\x05\x07\x00\x01\x00\x00\x00\x00\x00\x00')
await writer.drain()
writer.close()
return
if atyp == 1:
dst_addr = socket.inet_ntoa(await reader.readexactly(4))
elif atyp == 3:
addr_len = ord(await reader.readexactly(1))
dst_addr = (await reader.readexactly(addr_len)).decode()
elif atyp == 4:
dst_addr = socket.inet_ntop(socket.AF_INET6, await reader.readexactly(16))
else:
writer.write(b'\x05\x08\x00\x01\x00\x00\x00\x00\x00\x00')
await writer.drain()
writer.close()
return
dst_port = struct.unpack('!H', await reader.readexactly(2))[0]
try:
proxy = await self.get_next_proxy()
proxy_type, proxy_addr = proxy.split('://')
proxy_auth, proxy_host_port = self._split_proxy_auth(proxy_addr)
proxy_host, proxy_port = proxy_host_port.split(':')
proxy_port = int(proxy_port)
remote_reader, remote_writer = await asyncio.open_connection(proxy_host, proxy_port)
if proxy_type == 'socks5':
remote_writer.write(b'\x05\x01\x00')
await remote_writer.drain()
await remote_reader.readexactly(2)
remote_writer.write(b'\x05\x01\x00' + (b'\x03' + len(dst_addr).to_bytes(1, 'big') + dst_addr.encode() if isinstance(dst_addr, str) else b'\x01' + socket.inet_aton(dst_addr)) + struct.pack('!H', dst_port))
await remote_writer.drain()
await remote_reader.readexactly(10)
elif proxy_type in ['http', 'https']:
connect_request = f'CONNECT {dst_addr}:{dst_port} HTTP/1.1\r\nHost: {dst_addr}:{dst_port}\r\n'
if proxy_auth:
connect_request += f'Proxy-Authorization: Basic {base64.b64encode(proxy_auth.encode()).decode()}\r\n'
connect_request += '\r\n'
remote_writer.write(connect_request.encode())
await remote_writer.drain()
while True:
line = await remote_reader.readline()
if line == b'\r\n':
break
writer.write(b'\x05\x00\x00\x01\x00\x00\x00\x00\x00\x00')
await writer.drain()
await asyncio.gather(
self._pipe(reader, remote_writer),
self._pipe(remote_reader, writer)
)
except Exception as e:
logging.error(f"SOCKS5 Connection Error: {e}")
writer.write(b'\x05\x01\x00\x01\x00\x00\x00\x00\x00\x00')
await writer.drain()
async def _handle_client_impl(self, reader, writer, first_byte):
try:
request_line = first_byte + await reader.readline()
if not request_line:
return
try:
method, path, _ = request_line.decode('utf-8', errors='ignore').split()
except ValueError:
#logging.error(f"无效的请求行: {request_line}")
return
headers = {}
while True:
line = await reader.readline()
if line == b'\r\n':
break
if line == b'':
return
try:
name, value = line.decode('utf-8', errors='ignore').strip().split(': ', 1)
headers[name.lower()] = value
except ValueError:
#logging.error(f"无效的请求行: {line}")
continue
if self.auth_required and not self._authenticate(headers):
writer.write(b'HTTP/1.1 407 Proxy Authentication Required\r\nProxy-Authenticate: Basic realm="Proxy"\r\n\r\n')
await writer.drain()
return
if method == 'CONNECT':
await self._handle_connect(path, reader, writer)
else:
await self._handle_request(method, path, headers, reader, writer)
except asyncio.CancelledError:
raise
except Exception as e:
logging.error(f"Error processing client request: {e}")
def _authenticate(self, headers):
if not self.auth_required:
return True
auth = headers.get('proxy-authorization')
if not auth:
return False
try:
scheme, credentials = auth.split()
if scheme.lower() != 'basic':
return False
username, password = base64.b64decode(credentials).decode().split(':')
return username == self.username and password == self.password
except:
return False
async def _handle_connect(self, path, reader, writer):
try:
host, port = path.split(':')
port = int(port)
except ValueError:
# logging.error(f"无效的连接路径: {path}")
writer.write(b'HTTP/1.1 400 Bad Request\r\n\r\n')
await writer.drain()
return
proxy = await self.get_next_proxy()
proxy_type, proxy_addr = proxy.split('://')
proxy_auth, proxy_host_port = self._split_proxy_auth(proxy_addr)
proxy_host, proxy_port = proxy_host_port.split(':')
proxy_port = int(proxy_port)
try:
remote_reader, remote_writer = await asyncio.wait_for(asyncio.open_connection(proxy_host, proxy_port),timeout=10)
if proxy_type == 'http':
connect_headers = [f'CONNECT {host}:{port} HTTP/1.1', f'Host: {host}:{port}']
if proxy_auth:
auth_header = f'Proxy-Authorization: Basic {base64.b64encode(proxy_auth.encode()).decode()}'
connect_headers.append(auth_header)
connect_request = '\r\n'.join(connect_headers) + '\r\n\r\n'
remote_writer.write(connect_request.encode())
await remote_writer.drain()
response = await remote_reader.readline()
if not response.startswith(b'HTTP/1.1 200'):
raise Exception("Bad Gateway")
while (await remote_reader.readline()) != b'\r\n':
pass
elif proxy_type == 'socks5':
remote_writer.write(b'\x05\x01\x00')
await remote_writer.drain()
if (await remote_reader.read(2))[1] == 0:
remote_writer.write(b'\x05\x01\x00\x03' + len(host).to_bytes(1, 'big') + host.encode() + port.to_bytes(2, 'big'))
await remote_writer.drain()
if (await remote_reader.read(10))[1] != 0:
raise Exception("Bad Gateway")
else:
raise Exception("Unsupported proxy type")
writer.write(b'HTTP/1.1 200 Connection Established\r\n\r\n')
await writer.drain()
await asyncio.gather(
self._pipe(reader, remote_writer),
self._pipe(remote_reader, writer)
)
except asyncio.TimeoutError:
logging.error("Connection timed out")
writer.write(b'HTTP/1.1 504 Gateway Timeout\r\n\r\n')
await writer.drain()
except Exception as e:
logging.error(f"Proxy address is invalid, switching proxy address")
if not self.proxy_failed:
self.proxy_failed = True
await self.get_proxy()
else:
self.proxy_failed = False
def _split_proxy_auth(self, proxy_addr):
match = re.match(r'((?P<username>.+?):(?P<password>.+?)@)?(?P<host>.+)', proxy_addr)
if match:
username = match.group('username')
password = match.group('password')
host = match.group('host')
if username and password:
return f"{username}:{password}", host
return None, proxy_addr
async def _pipe(self, reader, writer):
try:
while True:
try:
data = await reader.read(8192)
if not data:
break
writer.write(data)
await writer.drain()
except asyncio.CancelledError:
break
finally:
writer.close()
await writer.wait_closed()
async def _handle_request(self, method, path, headers, reader, writer):
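# Forward a plain (non-CONNECT) HTTP request through the upstream proxy with httpx and stream the response back to the client using chunked transfer encoding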
body = await reader.read()
proxy = await self.get_next_proxy()
proxy_type, proxy_addr = proxy.split('://')
proxy_auth, proxy_host_port = self._split_proxy_auth(proxy_addr)
client_kwargs = {
"limits": httpx.Limits(max_keepalive_connections=500, max_connections=3000),
"timeout": 30,
}
if proxy_type in ['http', 'https']:
client_kwargs["proxies"] = {proxy_type: f"{proxy_type}://{proxy_host_port}"}
elif proxy_type == 'socks5':
client_kwargs["transport"] = httpx.AsyncHTTPTransport(proxy=f"{proxy_type}://{proxy_host_port}")
if proxy_auth:
headers['Proxy-Authorization'] = f'Basic {base64.b64encode(proxy_auth.encode()).decode()}'
async with httpx.AsyncClient(**client_kwargs) as client:
try:
async with client.stream(method, path, headers=headers, content=body) as response:
await self._write_response(writer, response)
except asyncio.CancelledError:
raise
except Exception as e:
logging.error(f"Error processing request: {e}")
writer.write(b'HTTP/1.1 502 Bad Gateway\r\n\r\n')
await writer.drain()
async def _write_response(self, writer, response):
writer.write(f'HTTP/1.1 {response.status_code} {response.reason_phrase}\r\n'.encode('utf-8', errors='ignore'))
writer.write(b'Transfer-Encoding: chunked\r\n')
for name, value in response.headers.items():
if name.lower() != 'transfer-encoding':
writer.write(f'{name}: {value}\r\n'.encode('utf-8', errors='ignore'))
writer.write(b'\r\n')
await writer.drain()
async for chunk in response.aiter_bytes(chunk_size=8192):
if asyncio.current_task().cancelled():
raise asyncio.CancelledError()
writer.write(f'{len(chunk):X}\r\n'.encode('utf-8', errors='ignore'))
writer.write(chunk)
writer.write(b'\r\n')
await writer.drain()
writer.write(b'0\r\n\r\n')
await writer.drain()
def update_status(server):
while True:
if server.mode == 'load_balance':
status = f"\r{Fore.YELLOW}Current proxy: {Fore.GREEN}{server.current_proxy}"
else:
time_left = server.time_until_next_switch()
status = f"\r{Fore.YELLOW}Current proxy: {Fore.GREEN}{server.current_proxy} | {Fore.YELLOW}Switch next time: {Fore.GREEN}{time_left:.1f}s"
print(status, end='', flush=True)
time.sleep(1)
async def handle_client_wrapper(server, reader, writer, clients):
task = asyncio.create_task(server.handle_client(reader, writer))
clients.add(task)
try:
await task
except Exception as e:
logging.error(f"Error processing client: {e}")
finally:
clients.remove(task)
async def run_server(server):
clients = set()
server_instance = None
try:
server_instance = await asyncio.start_server(lambda r, w: handle_client_wrapper(server, r, w, clients),'0.0.0.0', int(server.config['port']))
async with server_instance:
await server_instance.serve_forever()
except asyncio.CancelledError:
logging.info("The server is shutting down...")
finally:
if server_instance:
server_instance.close()
await server_instance.wait_closed()
for client in clients:
client.cancel()
await asyncio.gather(*clients, return_exceptions=True)
async def run_proxy_check(server):
if server.config.get('check_proxies', 'False').lower() == 'true':
logging.info("Starting to check proxy addresses...")
valid_proxies = await check_proxies(server.proxies)
if valid_proxies:
server.proxies = valid_proxies
server.proxy_cycle = cycle(valid_proxies)
server.current_proxy = next(server.proxy_cycle)
logging.info(f"Valid proxy addresses: {valid_proxies}")
else:
logging.error("No valid proxy addresses")
else:
logging.info("Proxy checking is disabled")
if __name__ == '__main__':
parser = argparse.ArgumentParser(description=logoprint.logos())
parser.add_argument('-c', '--config', default='config.ini', help='Configuration file path')
args = parser.parse_args()
config = load_config(args.config)
server = AsyncProxyServer(config)
print_banner(config)
asyncio.run(check_for_updates())
asyncio.run(run_proxy_check(server))
status_thread = threading.Thread(target=update_status, args=(server,), daemon=True)
status_thread.start()
try:
asyncio.run(run_server(server))
except KeyboardInterrupt:
logging.info("The program was interrupted by the user")

View File

@ -1,273 +0,0 @@
![ProxyCat](https://socialify.git.ci/honmashironeko/ProxyCat/image?description=1&descriptionEditable=%E4%B8%80%E6%AC%BE%E8%BD%BB%E9%87%8F%E7%BA%A7%E7%9A%84%E4%BC%98%E7%A7%80%E4%BB%A3%E7%90%86%E6%B1%A0%E4%B8%AD%E9%97%B4%E4%BB%B6%EF%BC%8C%E5%AE%9E%E7%8E%B0%E4%BB%A3%E7%90%86%E7%9A%84%E8%87%AA%E5%8A%A8%E8%BD%AE%E6%8D%A2&font=Bitter&forks=1&issues=1&language=1&logo=https%3A%2F%2Favatars.githubusercontent.com%2Fu%2F139044047%3Fv%3D4&name=1&owner=1&pattern=Circuit%20Board&pulls=1&stargazers=1&theme=Dark)
<p align="center">
<a href="/ProxyCat-EN/README-EN.md">English</a>
·
<a href="/README.md">简体中文</a>
</p>
## Table of Contents
- [Development Motivation](#development-motivation)
- [Features](#features)
- [Installation and Usage](#installation-and-usage)
- [Installing Dependencies](#installing-dependencies)
- [Running the Tool](#running-the-tool)
- [Manually Entering Proxy Addresses](#manually-entering-proxy-addresses)
- [Configuration File](#configuration-file)
- [Demonstration](#demonstration)
- [Automatically Obtaining Proxy Addresses via API](#automatically-obtaining-proxy-addresses-via-api)
- [Performance](#performance)
- [Disclaimer](#disclaimer)
- [Changelog](#changelog)
- [Development Plan](#development-plan)
- [Acknowledgements](#acknowledgements)
- [Sponsor Open Source](#sponsor-open-source)
- [Recommended Proxies](#recommended-proxies)
## Development Motivation
During penetration testing, it is often necessary to hide or change IP addresses to bypass security devices. However, tunnel proxies available in the market are expensive, typically costing between 20-40 RMB per day, which is unaffordable for many. I noticed that short-lived IPs offer high cost-effectiveness, with each IP costing only a few cents, averaging 0.2-3 RMB per day.
In summary, **ProxyCat** was born! This tool aims to transform short-lived IPs, lasting from 1 minute to 60 minutes, into fixed IPs for use by other tools, forming a proxy pool server that can be deployed once for permanent use.
![项目原理图](./assets/202408260021207-1725093725174-21.png)
## Features
- **Dual Protocol Support**: Supports listening for SOCKS5 and HTTP protocols, adapting to more tools.
- **Multiple Proxy Protocols**: Supports HTTP/HTTPS/SOCKS5 proxy servers to meet the needs of different application scenarios.
- **Various Switching Modes**: Cycles through the proxy list in order for balanced use, or randomly selects a proxy per request to distribute load and improve throughput; users can also plug in custom proxy-selection logic for specific needs (a simplified sketch follows this list).
- **Function to Acquire Proxies**: Supports dynamically acquiring instantly available proxies through the GetIP function, ensuring the timeliness and effectiveness of proxies.
- **Automatic Detection of Validity**: Automatically detects the availability of proxies at startup, filters out invalid ones, and ensures the reliability of the proxy list.
- **Switch Only When Forwarding Through Proxies**: Switches to a new proxy server only when a new request arrives after the countdown has reached zero, avoiding unnecessary resource consumption while idle.
- **Support for Proxy Failure Switching**: In case a proxy server suddenly fails during traffic forwarding, it can automatically switch to a new one.
- **Proxy Pool Authentication**: Supports authentication based on username and password for enhanced security against unauthorized access.
- **Real-time Status Updates**: Displays current proxy status and next switching time, helping users understand dynamic changes in proxies.
- **Configuration File**: Easily adjust parameters such as the port, mode, and authentication information through the config.ini file, accommodating various usage scenarios.
- **Version Checking**: Built-in version checking feature that automatically checks for updates and reminds users to upgrade, ensuring ongoing software optimization.
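The switching modes above reduce to a small piece of selection logic. The following is a minimal, simplified sketch of that idea only (the project's actual implementation is `AsyncProxyServer.get_next_proxy` in the diff above): `cycle` rotates through the list on a timer, `load_balance` picks a random proxy per request, and `custom` is a user-supplied hook.
````python
# Simplified illustration of the rotation modes described above, assuming a
# non-empty proxy list; not the project's exact code.
import random
import time
from itertools import cycle

class ProxyRotator:
    def __init__(self, proxies, mode="cycle", interval=300):
        self.proxies = proxies
        self.mode = mode                      # "cycle", "load_balance", or "custom"
        self.interval = interval              # seconds between switches in cycle mode
        self._cycle = cycle(proxies)
        self.current = next(self._cycle)
        self._last_switch = time.time()

    def next_proxy(self):
        if self.mode == "load_balance":
            return random.choice(self.proxies)    # random proxy for every request
        if self.mode == "custom":
            return self.proxies[0]                # placeholder for user-defined logic
        if time.time() - self._last_switch >= self.interval:
            self.current = next(self._cycle)      # rotate once the interval expires
            self._last_switch = time.time()
        return self.current
````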
## Installation and Usage
### Installing Dependencies
The tool is implemented in Python and is recommended to use **Python 3.8** or higher. Before use, configure the dependencies using the following command:
````bash
pip install -r requirements.txt
# Or recommended to use domestic source:
pip install -r requirements.txt -i https://pypi.tuna.tsinghua.edu.cn/simple/
````
### Running the Tool
Run the following command in the project directory to view the help information and confirm the configuration is successful:
````bash
python3 ProxyCat.py -h
````
The following output indicates successful configuration:
```
|\ _,,,---,,_ by Honma Shironeko
ZZZzz /,`.-'`' -. ;-;;,_
|,4- ) )-,_. ,\ ( `'-'
'---''(_/--' `-'\_) ProxyCat
Usage: ProxyCat.py [-h] [-c]
Parameters:
-h, --help Show this help message and exit
-c C Specify the configuration file name (default: config.ini)
```
### Manually Entering Proxy Addresses in ip.txt
Enter proxy addresses in the `ip.txt` file in the following format (`socks5://127.0.0.1:7890` or `http://127.0.0.1:7890`), one per line:
````plaintext
socks5://127.0.0.1:7890
https://127.0.0.1:7890
http://127.0.0.1:7890
...
````
### Configuration File
Configure parameters in `config.ini` (or a custom configuration file):
````ini
[SETTINGS]
# Local server listening port (default: 1080)
port = 1080
# Proxy rotation mode: cycle for sequential use, custom for custom mode, load_balance for load balancing (default: cycle)
mode = cycle
# Proxy switching interval (seconds). Set to 0 to switch IP on every request (default: 300)
interval = 300
# Username for authenticating the local server port (default: neko). Leave empty if no authentication is required
username = neko
# Password for authenticating the local server port (default: 123456). Leave empty if no authentication is required
password = 123456
# Whether to use the getip module to obtain proxy addresses True or False (default: False)
use_getip = False
# Proxy address list file (default: ip.txt)
proxy_file = ip.txt
# Whether to enable proxy verification feature True or False (default: True)
check_proxies = True
````
After configuring the corresponding parameters, you can use the tool:
````bash
python3 ProxyCat.py
````
### Demonstration
**Fixed Proxy Addresses (Default):**
````plaintext
http://neko:123456@127.0.0.1:1080
http://127.0.0.1:1080
socks5://neko:123456@127.0.0.1:1080
socks5://127.0.0.1:1080
````
If you are deploying on the public network, replace `127.0.0.1` with your public IP.
![Clip_2024-09-30_09-05-17](C:\Users\hoshi\AppData\Local\Programs\PixPin\Temp\Clip_2024-09-30_09-05-17.png)
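As a quick sanity check that the listener is reachable, any HTTP client can simply point its proxy settings at one of the addresses above. A minimal sketch with `requests`, assuming the default `neko:123456` credentials and using httpbin.org only as an arbitrary example target:
````python
# Minimal client-side check; assumes ProxyCat listens on 127.0.0.1:1080 with the
# default neko:123456 credentials, and uses httpbin.org as an arbitrary target.
import requests

proxies = {
    "http": "http://neko:123456@127.0.0.1:1080",
    "https": "http://neko:123456@127.0.0.1:1080",  # HTTPS is tunneled via CONNECT
}
resp = requests.get("https://httpbin.org/ip", proxies=proxies, timeout=15)
print(resp.text)  # should report the exit IP of the currently selected upstream proxy
````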
### Automatically Obtaining Proxy Addresses via API
The tool supports directly calling API interfaces to obtain proxy addresses. When you set `use_getip = True`, the tool will no longer read proxy addresses from the local `ip.txt` but will obtain new proxy addresses by executing the **getip.py** script (ensure your IP is whitelisted).
At this point, you need to modify the content of **getip.py** to your own interface, formatted as `IP:PORT`. The default is the `socks5` protocol. If you need to use `http`, please change it manually.
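For reference, a customized `getip.py` might look like the sketch below. The endpoint URL and the one-address-per-line response format are assumptions to be adapted to your own provider; change the returned scheme from `socks5://` to `http://` if your upstream proxies speak HTTP.
````python
# Hypothetical getip.py replacement; the endpoint URL and the "IP:PORT per line"
# response format are assumptions -- substitute your provider's actual API.
import requests

def newip():
    url = "https://api.example-provider.com/fetch?format=text"  # hypothetical endpoint
    response = requests.get(url, timeout=10)
    response.raise_for_status()
    ip_port = response.text.strip().splitlines()[0]  # first "IP:PORT" entry
    return "socks5://" + ip_port  # use "http://" instead for HTTP upstream proxies
````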
**Demonstration Result**
> The proxy provider can be found in the advertisement section below.
![Clip_2024-08-31_20-44-23](https://github.com/user-attachments/assets/42c1f3ef-0e75-4b07-a901-1c8b76f7f9c3)
## Performance
In real-world testing, when the upstream proxy servers have sufficient capacity, ProxyCat handles **1000** concurrent connections without packet loss, which covers most scanning and penetration-testing needs.
![8e3f79309626ed0e653ba51b6482bff](./assets/8e3f79309626ed0e653ba51b6482bff-1725093725174-23.png)
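A rough way to reproduce this kind of load test is to fire a batch of concurrent requests through the listener. The sketch below assumes the default local listener and credentials; the request count and target URL are arbitrary choices, not the benchmark actually used.
````python
# Rough concurrency smoke test; assumes ProxyCat on 127.0.0.1:1080 with the
# default credentials. Request count and target URL are arbitrary assumptions.
import asyncio
import httpx

PROXY = "http://neko:123456@127.0.0.1:1080"

async def fetch(client: httpx.AsyncClient, url: str):
    try:
        resp = await client.get(url, timeout=15)
        return resp.status_code
    except Exception:
        return None

async def main(n: int = 200, url: str = "http://httpbin.org/ip"):
    async with httpx.AsyncClient(proxies=PROXY) as client:
        results = await asyncio.gather(*(fetch(client, url) for _ in range(n)))
    ok = sum(1 for code in results if code == 200)
    print(f"{ok}/{n} requests completed successfully")

if __name__ == "__main__":
    asyncio.run(main())
````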
## Disclaimer
- If you download, install, use, or modify this tool and related code, you indicate your trust in this tool.
- We do not assume any responsibility for any form of loss or damage to you or others caused by using this tool.
- If you engage in any illegal activities while using this tool, you must bear the corresponding consequences yourself. We will not bear any legal or related responsibilities.
- Please read and fully understand all the terms, especially the clauses that exempt or limit liability, and choose to accept or not accept.
- Unless you have read and accepted all the terms of this agreement, you are not authorized to download, install, or use this tool.
- Your actions of downloading, installing, and using this tool are deemed as your agreement to the above terms.
## Changelog
### **2024/10/23**
- Refactor the code structure and split some of the code into separate files.
- During the proxy process, if the proxy server suddenly fails, it will automatically request to replace the proxy server and reset the replacement timer.
### 2024/09/29
- Removed the less-used single cycle mode and replaced it with a custom mode, allowing users to customize the proxy switching logic based on needs.
- Modified proxy validity checks to asynchronous for increased speed.
- Removed support for the problematic SOCKS4 protocol.
- Enhanced the logging system aesthetics.
- Improved exception handling logic.
- Added validation for proxy formats to ensure correctness.
### 2024/09/10
- Optimized concurrency, allowing the next request to be initiated before the previous response is received.
- Added load balancing mode, randomly sending requests to proxy addresses and using concurrent proxies to improve request efficiency.
- Modified proxy validity checks to asynchronous to improve efficiency.
### 2024/09/09
- Added a feature to set whether to perform validity checks on proxy addresses in `ip.txt` during the first startup and use only valid proxies.
- Downgraded some functions to support older Python versions.
### 2024/09/03
- Added local SOCKS5 listening to adapt to more software.
- Replaced some functions to support lower versions of Python.
- Enhanced display content aesthetics.
### 2024/08/31
- Reorganized the project structure.
- Enhanced display, continuously prompting the next proxy switch time.
- Supported stopping the tool with `Ctrl+C`.
- Significantly shifted to asynchronous requests, improving concurrent efficiency. Tested **1000** concurrent connections with a total of **5000** packets, losing about **50** packets, achieving approximately **99%** stability, and **500** concurrent connections with no packet loss.
- Abandoned the runtime parameter specification approach, modified to read from the local `ini` configuration file for higher usability.
- Supported local unauthenticated access to adapt to more software proxy methods.
- Added version detection feature to automatically prompt version information.
- Added authentication support for upstream proxy addresses, but only for proxies read from the local list; most getip APIs rely on IP whitelisting instead, so the same support was not duplicated there.
- Added a feature to update using `getip` only upon receiving new requests to reduce IP consumption.
- Added automatic recognition of proxy server address protocols to adapt to more proxy providers.
- Added support for HTTPS and SOCKS4 proxy protocols, currently covering HTTP, HTTPS, SOCKS5, and SOCKS4 protocols.
- Changed `asyncio.timeout()` to `asyncio.wait_for()` to support lower Python versions.
### 2024/08/25
- Automatically skipped empty lines when reading `ip.txt`.
- Replaced `httpx` with a concurrency pool to improve performance.
- Added a buffer dictionary to reduce latency for identical sites.
- Changed the logic of switching IPs on every request to randomly selecting proxies.
- Adopted more efficient structures and algorithms to optimize request handling logic.
### 2024/08/24
- Adopted an asynchronous approach to improve concurrency capabilities and reduce timeouts.
- Encapsulated duplicate code to enhance code reuse.
### 2024/08/23
- Modified concurrency logic.
- Added identity verification feature.
- Added an IP acquisition interface for permanent IP switching.
- Added a feature to switch IPs on every request.
## Development Plan
- [x] Added local server identity verification to prevent unauthorized use during public network deployment.
- [x] Added feature to switch IPs on every request.
- [x] Added a module for automatic acquisition and updating of static proxies for permanent operation.
- [x] Added load balancing mode, using multiple proxy addresses simultaneously to improve concurrency efficiency and reduce single server load.
- [x] Added version detection feature.
- [x] Added support for proxy address identity verification.
- [x] Added a feature to update using `getip` only when receiving new requests to reduce IP consumption.
- [x] Performed batch validity checks on proxy servers in `ip.txt` during the first startup.
- [x] Added local SOCKS protocol listening or fully switched to SOCKS to adapt to more software.
- [ ] Add detailed logging to record the identity of every IP connecting to ProxyCat and support multiple users.
- [ ] Add a web UI to provide a more powerful and user-friendly interface.
- [ ] Add one-click Docker deployment for simple setup.
- [ ] Develop a babycat module that can run babycat on any server or host, turning it into a proxy server.
If you have good ideas or encounter bugs during use, please contact the author through the following methods to provide feedback!
## Acknowledgements
In no particular order, thanks to the mentors who helped with this project.
- [AabyssZG (曾哥)](https://github.com/AabyssZG)
- [ProbiusOfficial (探姬)](https://github.com/ProbiusOfficial)
- chars6
![Star History Chart](https://api.star-history.com/svg?repos=honmashironeko/ProxyCat&type=Date)
## Proxy Recommendations
- [Click here to purchase](https://www.ipmart.io?source=Shironeko)

Binary file not shown.


Binary file not shown.


View File

@ -1,18 +0,0 @@
from colorama import Fore
def print_banner(config):
auth_info = f"{config.get('username')}:{config.get('password')}" if config.get('username') and config.get('password') else "Not set (no authentication required)"
banner_info = [
('Public Account', 'Cherry Blossom Manor\'s Main White Cat'),
('Blog', 'https://y.shironekosan.cn'),
('Proxy Rotation Mode', 'Cycle' if config.get('mode') == 'cycle' else 'Load Balance' if config.get('mode') == 'load_balance' else 'Single Round'),
('Proxy Change Interval', f"{config.get('interval')} seconds"),
('Default Username and Password', auth_info),
('Local Listening Address (HTTP)', f"http://{auth_info + '@' if auth_info else ''}127.0.0.1:{config.get('port')}"),
('Local Listening Address (SOCKS5)', f"socks5://{auth_info + '@' if auth_info else ''}127.0.0.1:{config.get('port')}"),
('Open Source Project Seeking Star', 'https://github.com/honmashironeko/ProxyCat'),
]
print(f"{Fore.MAGENTA}{'=' * 55}")
for key, value in banner_info:
print(f"{Fore.YELLOW}{key}: {Fore.GREEN}{value}")
print(f"{Fore.MAGENTA}{'=' * 55}\n")

View File

@ -1,24 +0,0 @@
[SETTINGS]
# Local server listening port (default: 1080)
port = 1080
# Proxy rotation mode: cycle for sequential use, custom for a custom mode, load_balance for load balancing (default: cycle)
mode = cycle
# Proxy switch interval in seconds; set to 0 to switch IP on every request (default: 300)
interval = 300
# Username for authenticating against the local server port (default: neko); leave empty to disable authentication
username = neko
# Password for authenticating against the local server port (default: 123456); leave empty to disable authentication
password = 123456
# Whether to use the getip module to obtain proxy addresses, True or False (default: False)
use_getip = False
# Proxy address list file (default: ip.txt)
proxy_file = ip.txt
# Whether to enable proxy validity checking, True or False (default: True)
check_proxies = True

View File

@ -1,17 +0,0 @@
import configparser
DEFAULT_CONFIG = {
'port': 1080,
'mode': 'cycle',
'interval': 300,
'username': '',
'password': '',
'use_getip': True,
'proxy_file': 'ip.txt',
'check_proxies': True
}
def load_config(config_file='config.ini'):
config = configparser.ConfigParser()
config.read(config_file, encoding='utf-8')
return {k: v for k, v in config['SETTINGS'].items()}

View File

@ -1,9 +0,0 @@
services:
app:
build: .
volumes:
- "./config.ini:/app/config.ini"
- "./getip.py:/app/getip.py"
- "./ip.txt:/app/ip.txt"
ports:
- "1080:1080"

View File

@ -1,12 +0,0 @@
import requests
def newip():
print("Getting new proxy IP")
url = f""
response = requests.get(url)
response.raise_for_status()
newip = "socks5://"+response.text.split("\r\n")[0]
print("The new proxy IP is:"+newip)
return newip

View File

@ -1 +0,0 @@
socks5://127.0.0.1:7890

View File

@ -1,58 +0,0 @@
import random
logo1 = """
|\ _,,,---,,_ by 本间白猫
ZZZzz /,`.-'`' -. ;-;;,_
|,4- ) )-,_. ,\ ( `'-'
'---''(_/--' `-'\_) ProxyCat
"""
logo2 = """
* ,MMM8&&&. *
MMMM88&&&&& .
MMMM88&&&&&&&
* MMM88&&&&&&&&
MMM88&&&&&&&&
'MMM88&&&&&&'
'MMM8&&&' *
/\/|_ __/\\
/ -\ /- ~\ . '
\ =_YT_ = /
/==*(` `\ ~ \ ProxyCat
/ \ / `\ by 本间白猫
| | ) ~ (
/ \ / ~ \\
\ / \~ ~/
_/\_/\_/\__ _/_/\_/\__~__/_/\_/\_/\_/\_/\_
| | | | ) ) | | | (( | | | | | |
| | | |( ( | | | \\ | | | | | |
| | | | )_) | | | |))| | | | | |
| | | | | | | | (/ | | | | | |
| | | | | | | | | | | | | | |
"""
logo3 = """
/\_/\ _
/`` \ / )
|n n |__ ( (
=(Y =.'` `\ \ \\
{`"` \ ) )
{ / |/ /
\\ ,( / /
ProxyCat) ) /-'\ ,_.' by 本间白猫
(,(,/ ((,,/
"""
logo4 = """
.-o=o-.
, /=o=o=o=\ .--.
_|\|=o=O=o=O=| \\
__.' a`\=o=o=o=(`\ /
'. a 4/`|.-""'`\ \ ;'`) .---.
\ .' / .--' |_.' / .-._)
by 本间白猫 `) _.' / /`-.__.' /
ProxyCat `'-.____; /'-.___.-'
`\"""`
"""
logos_list = [logo1, logo2, logo3, logo4]
def logos():
selected_logo = random.choice(logos_list)
print(selected_logo)

View File

@ -1,54 +0,0 @@
import httpx
import socket
import re
import asyncio
import logging
async def check_proxy(proxy):
proxy_type = proxy.split('://')[0]
check_funcs = {
'http': check_http_proxy,
'https': check_https_proxy,
'socks5': check_socks_proxy
}
if proxy_type not in check_funcs:
return False
try:
return await check_funcs[proxy_type](proxy)
except Exception as e:
logging.error(f"{proxy_type.upper()}Proxy {proxy} detection failed: {e}")
return False
async def check_http_proxy(proxy):
async with httpx.AsyncClient(proxies={'http://': proxy}, timeout=10) as client:
response = await client.get('http://www.google.com')
return response.status_code == 200
async def check_https_proxy(proxy):
async with httpx.AsyncClient(proxies={'https://': proxy}, timeout=10) as client:
response = await client.get('https://www.google.com')
return response.status_code == 200
async def check_socks_proxy(proxy):
proxy_type, proxy_addr = proxy.split('://')
proxy_host, proxy_port = proxy_addr.split(':')
proxy_port = int(proxy_port)
try:
reader, writer = await asyncio.wait_for(asyncio.open_connection(proxy_host, proxy_port), timeout=5)
writer.write(b'\x05\x01\x00')
await writer.drain()
response = await asyncio.wait_for(reader.readexactly(2), timeout=5)
writer.close()
await writer.wait_closed()
return response == b'\x05\x00'
except Exception:
return False
async def check_proxies(proxies):
valid_proxies = []
for proxy in proxies:
if await check_proxy(proxy):
valid_proxies.append(proxy)
return valid_proxies

View File

@ -1,6 +0,0 @@
requests
pysocks
colorama
httpx
packaging
httpx[socks]

View File

@ -1,26 +0,0 @@
from colorama import Fore
from packaging import version
import httpx, asyncio, re
async def check_for_updates():
try:
async with httpx.AsyncClient() as client:
response = await asyncio.wait_for(client.get("https://y.shironekosan.cn/1.html"), timeout=10)
response.raise_for_status()
content = response.text
match = re.search(r'<p>(ProxyCat-V\d+\.\d+)</p>', content)
if match:
latest_version = match.group(1)
CURRENT_VERSION = "ProxyCat-V1.9"
if version.parse(latest_version.split('-V')[1]) > version.parse(CURRENT_VERSION.split('-V')[1]):
print(f"{Fore.YELLOW}New version found! Current version: {CURRENT_VERSION}, Latest version: {latest_version}")
print(f"{Fore.YELLOW}Please visit https://pan.quark.cn/s/39b4b5674570 to get the latest version.")
print(f"{Fore.YELLOW}Please visit https://github.com/honmashironeko/ProxyCat to get the latest version.")
print(f"{Fore.YELLOW}Please visit https://pan.baidu.com/s/1C9LVC9aiaQeYFSj_2mWH1w?pwd=13r5 to get the latest version.")
else:
print(f"{Fore.GREEN}The current version is up to date ({CURRENT_VERSION})")
else:
print(f"{Fore.RED}Unable to find version information in the response")
except Exception as e:
print(f"{Fore.RED}An error occurred while checking for updates: {e}")

ProxyCat.py Normal file
View File

@ -0,0 +1,98 @@
from modules.modules import load_config, DEFAULT_CONFIG, check_proxies, check_for_updates, get_message, print_banner, logos
import threading, argparse, logging, asyncio, time
from modules.proxyserver import AsyncProxyServer
from colorama import init, Fore, Style
from itertools import cycle
init(autoreset=True)
class ColoredFormatter(logging.Formatter):
COLORS = {
logging.INFO: Fore.GREEN,
logging.WARNING: Fore.YELLOW,
logging.ERROR: Fore.RED,
logging.CRITICAL: Fore.RED + Style.BRIGHT,
}
def format(self, record):
log_color = self.COLORS.get(record.levelno, Fore.WHITE)
record.msg = f"{log_color}{record.msg}{Style.RESET_ALL}"
return super().format(record)
log_format = '%(asctime)s - %(levelname)s - %(message)s'
formatter = ColoredFormatter(log_format)
console_handler = logging.StreamHandler()
console_handler.setFormatter(formatter)
logging.basicConfig(level=logging.INFO, handlers=[console_handler])
def update_status(server):
while True:
if server.mode == 'load_balance':
status = f"\r{Fore.YELLOW}{get_message('current_proxy', server.language)}: {Fore.GREEN}{server.current_proxy}"
else:
time_left = server.time_until_next_switch()
status = f"\r{Fore.YELLOW}{get_message('current_proxy', server.language)}: {Fore.GREEN}{server.current_proxy} | {Fore.YELLOW}{get_message('next_switch', server.language)}: {Fore.GREEN}{time_left:.1f}{get_message('seconds', server.language)}"
print(status, end='', flush=True)
time.sleep(1)
async def handle_client_wrapper(server, reader, writer, clients):
task = asyncio.create_task(server.handle_client(reader, writer))
clients.add(task)
try:
await task
except Exception as e:
logging.error(get_message('client_handle_error', server.language, e))
finally:
clients.remove(task)
async def run_server(server):
clients = set()
server_instance = None
try:
server_instance = await asyncio.start_server(lambda r, w: handle_client_wrapper(server, r, w, clients),'0.0.0.0', int(server.config['port']))
async with server_instance:
await server_instance.serve_forever()
except asyncio.CancelledError:
logging.info(get_message('server_closing', server.language))
finally:
if server_instance:
server_instance.close()
await server_instance.wait_closed()
for client in clients:
client.cancel()
await asyncio.gather(*clients, return_exceptions=True)
async def run_proxy_check(server):
if server.config.get('check_proxies', 'False').lower() == 'true':
logging.info(get_message('proxy_check_start', server.language))
valid_proxies = await check_proxies(server.proxies)
if valid_proxies:
server.proxies = valid_proxies
server.proxy_cycle = cycle(valid_proxies)
server.current_proxy = next(server.proxy_cycle)
logging.info(get_message('valid_proxies', server.language, valid_proxies))
else:
logging.error(get_message('no_valid_proxies', server.language))
else:
logging.info(get_message('proxy_check_disabled', server.language))
if __name__ == '__main__':
parser = argparse.ArgumentParser(description=logos())
    parser.add_argument('-c', '--config', default='config/config.ini', help='Configuration file path')
args = parser.parse_args()
config = load_config(args.config)
server = AsyncProxyServer(config)
print_banner(config)
asyncio.run(check_for_updates(config.get('language', 'cn').lower()))
asyncio.run(run_proxy_check(server))
status_thread = threading.Thread(target=update_status, args=(server,), daemon=True)
status_thread.start()
try:
asyncio.run(run_server(server))
except KeyboardInterrupt:
logging.info(get_message('user_interrupt', server.language))

235
README-EN.md Normal file

@ -0,0 +1,235 @@
![ProxyCat](https://socialify.git.ci/honmashironeko/ProxyCat/image?description=1&descriptionEditable=A%20lightweight%20and%20excellent%20proxy%20pool%20middleware%20that%20implements%20automatic%20proxy%20rotation&font=Bitter&forks=1&issues=1&language=1&logo=https%3A%2F%2Favatars.githubusercontent.com%2Fu%2F139044047%3Fv%3D4&name=1&owner=1&pattern=Circuit%20Board&pulls=1&stargazers=1&theme=Dark)
<p align="center">
<a href="/README-EN.md">English</a>
·
<a href="/README.md">简体中文</a>
</p>
## Table of Contents
- [Development Background](#development-background)
- [Features](#features)
- [Installation and Usage](#installation-and-usage)
- [Dependencies Installation](#dependencies-installation)
- [Running the Tool](#running-the-tool)
- [Manual Proxy Entry](#iptxt-manual-proxy-entry)
- [Configuration File](#configuration-file)
- [Demo Effect](#demo-effect)
- [Using API for Automatic Proxy Retrieval](#using-api-for-automatic-proxy-retrieval)
- [Performance](#performance)
- [Disclaimer](#disclaimer)
- [Change Log](#change-log)
- [Development Plan](#development-plan)
- [Acknowledgments](#acknowledgments)
- [Proxy Recommendations](#proxy-recommendations)
## Development Background
During penetration testing, it is often necessary to hide or change IP addresses to bypass security devices. However, tunnel proxies on the market are expensive, typically costing $3-6 per day, which is unaffordable for many. The author noticed that short-lived IPs offer excellent cost-effectiveness: each IP costs only a few cents, averaging $0.03-0.4 per day.
Therefore, **ProxyCat** was born! This tool aims to transform short-term IPs (lasting from 1 to 60 minutes) into fixed IPs for other tools to use, creating a proxy pool server that can be used permanently after one deployment.
![Project Principle](./assets/项目原理图.png)
## Features
- **Dual Protocol Support**: Supports both SOCKS5 and HTTP protocol listening, compatible with more tools.
- **Multiple Proxy Protocols**: Supports HTTP/HTTPS/SOCKS5 proxy servers to meet various application needs.
- **Multiple Switching Modes**: `cycle` steps through the proxy list sequentially, `load_balance` picks a random available proxy for each request to spread traffic and improve performance, and `custom` lets you plug in your own selection logic for specific needs (see the sketch after this list).
- **Function-based Proxy Retrieval**: Supports dynamic proxy retrieval through GetIP function for real-time availability.
- **Automatic Validity Detection**: Automatically detects proxy availability at startup to filter invalid proxies.
- **Switch Only During Proxy Forwarding**: Switches to a new proxy server only when the timer reaches zero and a new request arrives, so no resources are wasted while idle.
- **Proxy Failure Switching**: Automatically switches to new proxy if current proxy fails during traffic forwarding.
- **Proxy Pool Authentication**: Supports username/password-based authentication and blacklist/whitelist mechanisms.
- **Real-time Status Updates**: Displays current proxy status and next switch time.
- **Configuration File**: Easily adjust port, mode, authentication info, and other parameters via `config.ini`.
- **Version Detection**: Built-in version checking reminds you when a newer release is available.
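As a rough illustration of the `custom` mode: the selection policy lives in the `custom_proxy_switch` coroutine of `modules/proxyserver.py`. The preference logic below is only a sketch of what you might put there (the shipped default simply returns the first proxy):
```python
# Illustrative only: prefer SOCKS5 proxies when any are loaded, otherwise
# fall back to the first entry (the shipped default). `random` is already
# imported by modules/proxyserver.py.
async def custom_proxy_switch(self):
    socks5_proxies = [p for p in self.proxies if p.startswith('socks5://')]
    if socks5_proxies:
        return random.choice(socks5_proxies)
    return self.proxies[0] if self.proxies else "No proxies available"
```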
## Installation and Usage
### Dependencies Installation
The tool is written in Python; **Python 3.8** or above is recommended. Install dependencies with:
```bash
pip install -r requirements.txt
# Or using Chinese mirror:
pip install -r requirements.txt -i https://pypi.tuna.tsinghua.edu.cn/simple/
```
### Running the Tool
Run the following command in the project directory to view help information:
```bash
python3 ProxyCat.py -h
```
Success is indicated by this response:
```
|\ _,,,---,,_ by honmashironeko
ZZZzz /,`.-'`' -. ;-;;,_
|,4- ) )-,_. ,\ ( `'-'
'---''(_/--' `-'\_) ProxyCat
Usage: ProxyCat.py [-h] [-c]
Parameters:
-h, --help Show this help message and exit
-c C Specify config file name (default: config.ini)
```
### ip.txt Manual Proxy Entry
Add proxies to `ip.txt` in the following format (`socks5://127.0.0.1:7890` or `http://127.0.0.1:7890`), one per line:
```txt
socks5://127.0.0.1:7890
https://127.0.0.1:7890
http://127.0.0.1:7890
...
```
### Configuration File
Configure parameters in `config.ini` (or custom config file):
```ini
[SETTINGS]
# Local server listening port (default: 1080)
port = 1080
# Proxy rotation mode: cycle, custom, or load_balance (default: cycle)
mode = cycle
# Proxy change interval in seconds, 0 means change every request (default: 300)
interval = 300
# Local server authentication username (default: neko) empty means no auth
username = neko
# Local server authentication password (default: 123456) empty means no auth
password = 123456
# Whether to use getip module for proxy retrieval True/False (default: False)
use_getip = False
# Proxy list file (default: ip.txt)
proxy_file = ip.txt
# Enable proxy checking True/False (default: True)
check_proxies = True
# Language setting (cn/en)
language = en
# IP whitelist file path (empty to disable)
whitelist_file = whitelist.txt
# IP blacklist file path (empty to disable)
blacklist_file = blacklist.txt
# IP authentication priority (whitelist/blacklist)
# whitelist: Check whitelist first, allow if in whitelist
# blacklist: Check blacklist first, deny if in blacklist
ip_auth_priority = whitelist
```
After configuration, run:
```bash
python3 ProxyCat.py
```
### Demo Effect
**Fixed proxy address (default)**:
```
http://neko:123456@127.0.0.1:1080
http://127.0.0.1:1080
socks5://neko:123456@127.0.0.1:1080
socks5://127.0.0.1:1080
```
If you're deploying on a public network, simply replace `127.0.0.1` with your public IP.
![主界面图](./assets/主界面图.png)
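Once ProxyCat is listening, any HTTP(S) client can point at it. A minimal sketch with the `requests` library, assuming the default port and credentials above (`https://httpbin.org/ip` is just an example echo endpoint):
```python
import requests

# Route traffic through the local ProxyCat listener (default credentials from config.ini).
proxies = {
    "http": "http://neko:123456@127.0.0.1:1080",
    "https": "http://neko:123456@127.0.0.1:1080",
}
print(requests.get("https://httpbin.org/ip", proxies=proxies, timeout=10).text)
```
The exit IP reported by the endpoint should change whenever ProxyCat rotates to the next upstream proxy.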
### Using API for Automatic Proxy Retrieval
The tool supports pulling proxy addresses directly from an API. When you set `use_getip = True`, the tool no longer reads from the local `ip.txt` but instead executes the **getip.py** script to obtain a new proxy address (make sure your IP is whitelisted by the provider; the API must return a single proxy in `IP:PORT` format, and only one proxy address is used at a time).
In this case, modify **getip.py** to call your own API that returns `IP:PORT`. The default protocol is `socks5`; change it to `http` manually if needed.
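For reference, a minimal sketch of a customized **getip.py** (the API URL is a placeholder for your provider's extraction endpoint; the shipped `config/getip.py` follows the same shape):
```python
import requests

def newip():
    # Placeholder URL: replace with your provider's extraction API, which should
    # return a single proxy as IP:PORT in the response body.
    api_url = "https://your-proxy-provider.example/api/extract?num=1&format=text"
    response = requests.get(api_url, timeout=10)
    response.raise_for_status()
    # Prepend the protocol; change "socks5://" to "http://" if your provider serves HTTP proxies.
    return "socks5://" + response.text.strip().splitlines()[0]
```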
## Performance
In practical testing, provided the upstream proxy servers have sufficient capacity, ProxyCat can handle **1000** concurrent connections without packet loss, covering most scanning and penetration-testing needs.
![性能测试图](./assets/性能测试图.png)
## Disclaimer
- By downloading, installing, using, or modifying this tool and related code, you indicate your trust in this tool.
- We take no responsibility for any form of loss or damage caused to yourself or others while using this tool.
- You are solely responsible for any illegal activities during your use of this tool, and we bear no legal or associated liability.
- Please carefully read and fully understand all terms, especially liability exemption or limitation clauses, and choose to accept or not.
- Unless you have read and accepted all terms of this agreement, you have no right to download, install, or use this tool.
- Your download, installation, and usage actions indicate you have read and agreed to be bound by the above agreement.
## Change Log
### 2025/01/02
- Restructured software architecture for better usability
- Added blacklist/whitelist mechanism for authentication
- GetIP mode now fetches a proxy only after the first client request arrives, preventing wasted (paid) IPs
- Changed language configuration logic; the display language is now controlled via a config.ini parameter
- Updated the configuration panel; listening addresses can be copied directly even when no username/password is configured
- Added docker deployment support
### 2024/10/23
- Restructured code, split into separate files
- Added automatic proxy switching when current proxy fails during forwarding
[Additional change log entries follow similar pattern...]
## Development Plan
- [x] Added local server authentication
- [x] Added IP change per request feature
- [x] Added static proxy auto-update module
- [x] Added load balancing mode
- [x] Added version detection
- [x] Added proxy authentication support
- [x] Added request-triggered getip updates
- [x] Added initial proxy validity check
- [x] Added SOCKS protocol support
- [ ] Add detailed logging with multi-user support
- [ ] Add Web UI interface
- [x] Add docker deployment
- [ ] Develop babycat module
For feedback or suggestions, please contact via WeChat Official Account: **樱花庄的本间白猫**
## Acknowledgments
In no particular order, thanks to:
- [AabyssZG](https://github.com/AabyssZG)
- [ProbiusOfficial](https://github.com/ProbiusOfficial)
- [gh0stkey](https://github.com/gh0stkey)
- chars6
- qianzai
- ziwindlu
![Star History Chart](https://api.star-history.com/svg?repos=honmashironeko/ProxyCat&type=Date)
## Proxy Recommendations
- [First affordable proxy service - Get 5000 free IPs + ¥10 coupon with invite code](https://h.shanchendaili.com/invite_reg.html?invite=fM6fVG)
- [Various carrier data plans](https://172.lot-ml.com/ProductEn/Index/0b7c9adef5e9648f)
- [Click here to purchase](https://www.ipmart.io?source=Shironeko)


@ -1,7 +1,7 @@
![ProxyCat](https://socialify.git.ci/honmashironeko/ProxyCat/image?description=1&descriptionEditable=%E4%B8%80%E6%AC%BE%E8%BD%BB%E9%87%8F%E7%BA%A7%E7%9A%84%E4%BC%98%E7%A7%80%E4%BB%A3%E7%90%86%E6%B1%A0%E4%B8%AD%E9%97%B4%E4%BB%B6%EF%BC%8C%E5%AE%9E%E7%8E%B0%E4%BB%A3%E7%90%86%E7%9A%84%E8%87%AA%E5%8A%A8%E8%BD%AE%E6%8D%A2&font=Bitter&forks=1&issues=1&language=1&logo=https%3A%2F%2Favatars.githubusercontent.com%2Fu%2F139044047%3Fv%3D4&name=1&owner=1&pattern=Circuit%20Board&pulls=1&stargazers=1&theme=Dark)
<p align="center">
<a href="/ProxyCat-EN/README-EN.md">English</a>
<a href="/README-EN.md">English</a>
·
<a href="/README.md">简体中文</a>
</p>
@ -31,7 +31,7 @@
综上所述,**ProxyCat** 应运而生本工具旨在将持续时间仅有1分钟至60分钟不等的短效IP转变为固定IP供其他工具使用形成代理池服务器部署一次即可永久使用。
![项目原理图](./assets/202408260021207.png)
![项目原理图](./assets/项目原理图.png)
## 功能特点
@ -42,7 +42,7 @@
- **自动检测有效性**:在启动时自动检测代理的可用性,过滤无效代理,确保代理列表的可靠性。
- **仅在代理转发时切换**:在计时器归零时,有新的请求才会更换为新的代理服务器,防止运行时一直消耗资源。
- **支持代理失效切换**:在转发流量过程中,遇到代理服务器突然失效,可自动切换到新的代理上。
- **代理池身份认证**:支持基于用户名和密码的代理认证,增强代理的安全性,防止未授权访问。
- **代理池身份认证**:支持基于用户名和密码的代理认证和黑白名单机制的代理认证,增强代理的安全性,防止未授权访问。
- **实时状态更新**:显示当前代理状态和下次切换时间,帮助用户了解代理动态。
- **可配置文件**:通过 config.ini 文件轻松调整端口、模式、认证信息等参数,适应不同使用场景。
- **版本检测**:内置版本检测功能,自动检查最新版本并提醒用户更新,确保软件的持续优化。
@ -124,6 +124,21 @@ proxy_file = ip.txt
# 是否启用代理检测功能 True or False(默认为True)
check_proxies = True
# 语言设置 (cn/en)
# Language setting (cn/en)
language = cn
# IP白名单文件路径留空则不启用白名单
whitelist_file = whitelist.txt
# IP黑名单文件路径留空则不启用黑名单
blacklist_file = blacklist.txt
# IP认证优先级whitelist/blacklist
# whitelist: 优先判断白名单在白名单中的IP直接放行
# blacklist: 优先判断黑名单在黑名单中的IP直接拒绝
ip_auth_priority = whitelist
```
配置对应参数后即可使用:
@ -145,11 +160,11 @@ socks5://127.0.0.1:1080
如果您是部署在公网,将 `127.0.0.1` 替换为您的公网IP即可。
![界面展示图](./assets/Clip_2024-08-29_10-15-56.png)
![主界面图](./assets/主界面图.png)
### 使用接口自动获取代理地址
工具支持直接调用代理地址获取的API接口。当您配置 `use_getip = True` 时,工具将不再从本地 `ip.txt` 中读取代理地址,而是通过执行 **getip.py** 脚本来获取新的代理地址请确保您的IP已加白名单
工具支持直接调用代理地址获取的API接口。当您配置 `use_getip = True` 时,工具将不再从本地 `ip.txt` 中读取代理地址,而是通过执行 **getip.py** 脚本来获取新的代理地址请确保您的IP已加白名单并且格式为IP:端口,每次只能使用一个代理地址)。
此时,您需要将 **getip.py** 的内容修改为您自己的接口,格式为 `IP:PORT`。默认为 `socks5` 协议,如需使用 `http`,请手动更改。
@ -157,7 +172,7 @@ socks5://127.0.0.1:1080
经过实际测试在代理地址服务器性能充足的情况下ProxyCat 能够处理 **1000** 并发连接且不丢包,基本可以覆盖大部分扫描和渗透测试需求。
![性能测试图](./assets/8e3f79309626ed0e653ba51b6482bff.png)
![性能测试图](./assets/性能测试图.png)
## 免责申明
@ -170,6 +185,15 @@ socks5://127.0.0.1:1080
## 更新日志
### 2025/01/02
- 重构软件结构,更加整洁易用。
- 新增支持黑白名单机制进行身份认证。
- 在使用GetIP方式的时候需要先收到一次请求才会获取代理防止每次运行都浪费资金。
- 语言配置逻辑更改不再分为两个版本通过config.ini文件中的语言配置参数进行显示。
- 配置信息面板更新,不配置账号密码的情况下也能直接复制地址使用。
- 新增docker方式部署。
### **2024/10/23**
- 重构代码结构,将部分代码分割成单独文件。
@ -249,7 +273,7 @@ socks5://127.0.0.1:1080
- [x] 增加本地监听 SOCKS 协议,或全面改成 SOCKS以适配更多软件。
- [ ] 增加详细日志记录,记录所有连接 ProxyCat 的 IP 身份,支持多用户。
- [ ] 增加Web UI提供更加强大易用的界面。
- [ ] 增加docker一键部署简单易用。
- [x] 增加docker一键部署简单易用。
- [ ] 开发 babycat 模块,可将 babycat 在任意服务器或主机上运行,即可变成一台代理服务器。
如果您有好的创意或在使用过程中遇到bug请通过以下方式联系作者反馈
@ -262,7 +286,10 @@ socks5://127.0.0.1:1080
- [AabyssZG (曾哥)](https://github.com/AabyssZG)
- [ProbiusOfficial (探姬)](https://github.com/ProbiusOfficial)
- [gh0stkey (EvilChen)](https://github.com/gh0stkey)
- chars6
- qianzai千载
- ziwindlu
![Star History Chart](https://api.star-history.com/svg?repos=honmashironeko/ProxyCat&type=Date)
@ -270,9 +297,11 @@ socks5://127.0.0.1:1080
开源不易,如果您觉得工具不错,或许可以试着赞助一下作者的开发哦~
![赞助](./assets/202408260020820.png)
![赞助](./assets/赞助.png)
## 代理推荐
- [第一家便宜大碗代理购买用邀请码注册得5000免费IP+10元优惠券](https://h.shanchendaili.com/invite_reg.html?invite=fM6fVG)
- [各大运营商流量卡](https://172.lot-ml.com/ProductEn/Index/0b7c9adef5e9648f)
- [各大运营商流量卡](https://172.lot-ml.com/ProductEn/Index/0b7c9adef5e9648f)
- [国外匿名代理](https://www.ipmart.io?source=Shironeko)

BIN assets/主界面图.png Normal file (binary image, 172 KiB, not shown)


0
config/blacklist.txt Normal file

52
config/config.ini Normal file

@ -0,0 +1,52 @@
[SETTINGS]
# 本地服务器监听端口(默认为:1080)
# Local server listening port (default:1080)
port = 1080
# 代理地址轮换模式cycle 表示循环使用custom 表示使用自定义模式load_balance 表示负载均衡(默认为:cycle)
# Proxy rotation mode: cycle means cyclic use, custom means custom mode, load_balance means load balancing (default:cycle)
mode = cycle
# 代理地址更换时间(秒),设置为 0 时每次请求都更换 IP(默认为:300)
# Proxy address rotation interval (seconds), when set to 0, IP changes with each request (default:300)
interval = 300
# 本地服务器端口认证用户名(默认为:neko)当为空时不需要认证
# Local server authentication username (default:neko), no authentication required when empty
username = neko
# 本地服务器端口认证密码(默认为:123456)当为空时不需要认证
# Local server authentication password (default:123456), no authentication required when empty
password = 123456
# 是否使用 getip 模块获取代理地址 True or False(默认为:False)
# Whether to use getip module to obtain proxy addresses True or False (default:False)
use_getip = False
# 代理地址列表文件(默认为:ip.txt)
# Proxy address list file (default:ip.txt)
proxy_file = ip.txt
# 是否启用代理检测功能 True or False(默认为True)
# Whether to enable proxy detection feature True or False (default:True)
check_proxies = True
# 语言设置 (cn/en)
# Language setting (cn/en)
language = cn
# IP白名单文件路径留空则不启用白名单
# IP whitelist file path (leave empty to disable whitelist)
whitelist_file = whitelist.txt
# IP黑名单文件路径留空则不启用黑名单
# IP blacklist file path (leave empty to disable blacklist)
blacklist_file = blacklist.txt
# IP认证优先级whitelist/blacklist
# IP authentication priority (whitelist/blacklist)
# whitelist: 优先判断白名单在白名单中的IP直接放行
# whitelist: prioritize whitelist check, IPs in whitelist are allowed directly
# blacklist: 优先判断黑名单在黑名单中的IP直接拒绝
# blacklist: prioritize blacklist check, IPs in blacklist are rejected directly
ip_auth_priority = whitelist

15
config/getip.py Normal file

@ -0,0 +1,15 @@
from modules.modules import get_message, load_config
import requests
def newip():
config = load_config()
language = config.get('language', 'cn')
print(get_message('getting_new_proxy', language))
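    # Fill in your proxy provider's API URL below; it should return a single proxy as IP:PORT.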
url = f""
response = requests.get(url)
response.raise_for_status()
newip = "socks5://" + response.text.split("\r\n")[0]
print(get_message('new_proxy_is', language, newip))
return newip

1
config/whitelist.txt Normal file

@ -0,0 +1 @@
127.0.0.1


@ -1,20 +0,0 @@
<p align="center">
<a href="/docker/README_EN.md">English</a>
·
<a href="/docker/README.md">简体中文</a>
</p>
## 为什么要使用脚本生成dockerfile
本项目分为多种语言,不同语言内容可能存在差异,为了减少维护成本 使用脚本生成dockerfile方便后续维护。
## 使用方法
*生成dockerfile和docker-compose.yml*
``` shell
cd docker
python docker_tools.py -l CN
```
> 执行结束后会生成命令,按照指引运行即可


@ -1,22 +0,0 @@
<p align="center">
<a href="/docker/README_EN.md">English</a>
·
<a href="/docker/README.md">简体中文</a>
</p>
Warning: this document uses machine translation
## Why Use a Script to Generate Dockerfiles
This project supports multiple languages, and the content may vary between languages. To reduce maintenance costs, scripts are used to generate Dockerfiles, making it easier for future updates.
## Usage
*Generate Dockerfile and docker-compose.yml*
```shell
cd docker
python docker_tools.py -l CN
```
> After execution, commands will be generated. Follow the instructions to run them.


@ -1,122 +0,0 @@
import os
import logging
from typing import Dict
# TODO: improve the docker-compose template
# Project root directory
PROJECT_ROOT: str = os.path.realpath('../')
# Template directory
TEMPLATE_PATH: str = os.path.join(PROJECT_ROOT, 'docker/template')
# venv bin path
VENV_BIN_PATH = '/venv/bin'
# Language
language: str
# Current language configuration
current_config: 'DockerConfig'
class DockerConfig:
def __init__(self, source_path, run_file='ProxyCat-V1.9.py',
pip_install_cmd=f'{VENV_BIN_PATH}/pip install --no-cache-dir -r requirements.txt'):
        # Source code path; each language version lives in its own directory, so the project root is joined here
        self.source_path = os.path.join(PROJECT_ROOT, source_path)
        # pip install command
        self.pip_install_cmd = pip_install_cmd
        # Entry-point file
        self.run_file = run_file
def get_template_fields4dockerfile(self):
return {
'run_file': self.run_file,
'pip_install_cmd': self.pip_install_cmd
}
def get_template_fields4docker_compose(self):
return {
'none': None
}
configs: Dict[str, DockerConfig] = {
'CN': DockerConfig(source_path='ProxyCat-CN',
                       # Use a Chinese PyPI mirror for users in China
pip_install_cmd=f'{VENV_BIN_PATH}/pip install --no-cache-dir -r requirements.txt' \
' -i https://pypi.tuna.tsinghua.edu.cn/simple/'),
'EN': DockerConfig(source_path='ProxyCat-EN')
}
def get_args():
import argparse
parser = argparse.ArgumentParser(description='a generate docker compose file tool')
parser.add_argument('-l', '--language', default='CN', help="LANGUAGE(support: CN, EN),default: CN")
return parser.parse_args()
def get_template(template_name: str) -> str:
with open(os.path.join(TEMPLATE_PATH, template_name), 'r') as f:
return f.read()
def write_file(file_name, content: str):
def w_f(p):
with open(p, 'w') as f:
f.write(content)
logging.info(f"write {p} success")
    # File path
    f_p = os.path.join(current_config.source_path, file_name)
    # Log an error if the directory does not exist
if not os.path.exists(os.path.dirname(f_p)):
logging.error(f"{os.path.dirname(f_p)} dir is not exists")
if os.path.exists(f_p):
logging.warning(f"{f_p} is exists")
confirm = input(f'{f_p} is exists, overwrite?(Y/n)')
if confirm.strip().lower() == 'y' or confirm.strip() == '':
w_f(f_p)
else:
logging.info(f"{f_p} is not overwrite")
else:
w_f(f_p)
def generate_docker_compose():
t = get_template('docker-compose_template')
dockerfile_content = t.format(
**current_config.get_template_fields4docker_compose()
)
write_file('docker-compose.yml', dockerfile_content)
def generate_docker_file():
t = get_template('Dockerfile_template')
dockerfile_content = t.format(
**current_config.get_template_fields4dockerfile()
)
write_file('Dockerfile', dockerfile_content)
def get_test_cmd():
return \
f'docker: \n' + \
f'\tcd {current_config.source_path} \n' + \
f'\tdocker build -t proxycat:latest . \n' + \
f'\tdocker run -it proxycat:latest -h\n' + \
f'docker-compose\n' + \
f'\tcd {current_config.source_path} \n' + \
f'\tdocker-compose up -d'
def init(args):
global language, current_config
language = args.language.upper()
current_config = configs.get(language)
if __name__ == '__main__':
the_args = get_args()
init(the_args)
generate_docker_file()
generate_docker_compose()
print(get_test_cmd())


@ -1,12 +0,0 @@
FROM python:3.8
WORKDIR /app
COPY . /app
RUN python -m venv /venv
RUN {pip_install_cmd}
EXPOSE 1080
ENTRYPOINT ["/venv/bin/python", "{run_file}"]


@ -1,9 +0,0 @@
services:
app:
build: .
volumes:
- "./config.ini:/app/config.ini"
- "./getip.py:/app/getip.py"
- "./ip.txt:/app/ip.txt"
ports:
- "1080:1080"

342
modules/modules.py Normal file

@ -0,0 +1,342 @@
import asyncio, logging, random, httpx, re, os
from configparser import ConfigParser
from packaging import version
from colorama import Fore
MESSAGES = {
'cn': {
'getting_new_proxy': '正在获取新的代理IP',
'new_proxy_is': '新的代理IP为: {}',
'proxy_check_start': '开始检测代理地址...',
'proxy_check_disabled': '代理检测已禁用',
'valid_proxies': '有效代理地址: {}',
'no_valid_proxies': '没有有效的代理地址',
'proxy_check_failed': '{}代理 {} 检测失败: {}',
'proxy_switch': '切换到新的代理: {}',
'proxy_switch_detail': '已切换代理: {} -> {}',
'proxy_consecutive_fails': '代理 {} 连续失败 {} 次,正在切换新代理',
'proxy_invalid': '代理 {} 已失效,立即切换新代理',
'connection_timeout': '连接超时',
'proxy_invalid_switching': '代理地址失效,切换代理地址',
'data_transfer_timeout': '数据传输超时,正在重试...',
'connection_reset': '连接被重置',
'transfer_cancelled': '传输被取消',
'data_transfer_error': '数据传输错误: {}',
'unsupported_protocol': '不支持的协议请求: {}',
'client_error': '客户端处理出错: {}',
'response_write_error': '响应写入错误: {}',
'server_closing': '服务器正在关闭...',
'program_interrupted': '程序被用户中断',
'multiple_proxy_fail': '多次尝试获取有效代理失败,退出程序',
'current_proxy': '当前代理',
'next_switch': '下次切换',
'seconds': '',
'no_proxies_available': '没有可用的代理',
'proxy_file_not_found': '代理文件不存在: {}',
'auth_not_set': '未设置 (无需认证)',
'public_account': '公众号',
'blog': '博客',
'proxy_mode': '代理轮换模式',
'cycle': '循环',
'load_balance': '负载均衡',
'single_round': '单轮',
'proxy_interval': '代理更换时间',
'default_auth': '默认账号密码',
'local_http': '本地监听地址 (HTTP)',
'local_socks5': '本地监听地址 (SOCKS5)',
'star_project': '开源项目求 Star',
'client_request_error': '客户端请求错误: {}',
'client_handle_error': '客户端处理错误: {}',
'proxy_invalid_switch': '代理无效,切换代理',
'request_fail_retry': '请求失败,重试剩余次数: {}',
'request_error': '请求错误: {}',
'user_interrupt': '用户中断程序',
'new_version_found': '发现新版本!',
'visit_quark': '请访问 https://pan.quark.cn/s/39b4b5674570 获取最新版本。',
'visit_github': '请访问 https://github.com/honmashironeko/ProxyCat 获取最新版本。',
'visit_baidu': '请访问 https://pan.baidu.com/s/1C9LVC9aiaQeYFSj_2mWH1w?pwd=13r5 获取最新版本。',
'latest_version': '当前版本已是最新',
'version_info_not_found': '无法在响应中找到版本信息',
'update_check_error': '检查更新时发生错误: {}',
'unauthorized_ip': '未授权的IP尝试访问: {}',
'client_cancelled': '客户端连接已取消',
'socks5_connection_error': 'SOCKS5连接错误: {}',
'connect_timeout': '连接超时',
'connection_reset': '连接被重置',
'transfer_cancelled': '传输已取消',
'client_request_error': '客户端请求处理错误: {}',
'unsupported_protocol': '不支持的协议: {}',
'proxy_invalid_switch': '代理无效,正在切换',
'request_retry': '请求失败,重试中 (剩余{}次)',
'request_error': '请求过程中出错: {}',
'response_write_error': '写入响应时出错: {}',
'consecutive_failures': '检测到连续代理失败: {}',
'invalid_proxy': '当前代理无效: {}',
'proxy_switched': '已从代理 {} 切换到 {}'
},
'en': {
'getting_new_proxy': 'Getting new proxy IP',
'new_proxy_is': 'New proxy IP is: {}',
'proxy_check_start': 'Starting proxy check...',
'proxy_check_disabled': 'Proxy check is disabled',
'valid_proxies': 'Valid proxies: {}',
'no_valid_proxies': 'No valid proxies found',
'proxy_check_failed': '{} proxy {} check failed: {}',
'proxy_switch': 'Switching to new proxy: {}',
'proxy_switch_detail': 'Switched proxy: {} -> {}',
'proxy_consecutive_fails': 'Proxy {} failed {} times consecutively, switching to new proxy',
'proxy_invalid': 'Proxy {} is invalid, switching immediately',
'connection_timeout': 'Connection timeout',
'proxy_invalid_switching': 'Proxy invalid, switching to new proxy',
'data_transfer_timeout': 'Data transfer timeout, retrying...',
'connection_reset': 'Connection reset',
'transfer_cancelled': 'Transfer cancelled',
'data_transfer_error': 'Data transfer error: {}',
'unsupported_protocol': 'Unsupported protocol request: {}',
'client_error': 'Client handling error: {}',
'response_write_error': 'Response write error: {}',
'server_closing': 'Server is closing...',
'program_interrupted': 'Program interrupted by user',
'multiple_proxy_fail': 'Multiple attempts to get valid proxy failed, exiting',
'current_proxy': 'Current Proxy',
'next_switch': 'Next Switch',
'seconds': 's',
'no_proxies_available': 'No proxies available',
'proxy_file_not_found': 'Proxy file not found: {}',
'auth_not_set': 'Not set (No authentication required)',
'public_account': 'WeChat Public Number',
'blog': 'Blog',
'proxy_mode': 'Proxy Rotation Mode',
'cycle': 'Cycle',
'load_balance': 'Load Balance',
'single_round': 'Single Round',
'proxy_interval': 'Proxy Change Interval',
'default_auth': 'Default Username and Password',
'local_http': 'Local Listening Address (HTTP)',
'local_socks5': 'Local Listening Address (SOCKS5)',
'star_project': 'Star the Project',
'client_request_error': 'Client request error: {}',
'client_handle_error': 'Client handling error: {}',
'proxy_invalid_switch': 'Proxy invalid, switching proxy',
'request_fail_retry': 'Request failed, retrying remaining times: {}',
'request_error': 'Request error: {}',
'user_interrupt': 'User interrupted the program',
'new_version_found': 'New version found!',
'visit_quark': 'Please visit https://pan.quark.cn/s/39b4b5674570 to get the latest version.',
'visit_github': 'Please visit https://github.com/honmashironeko/ProxyCat to get the latest version.',
'visit_baidu': 'Please visit https://pan.baidu.com/s/1C9LVC9aiaQeYFSj_2mWH1w?pwd=13r5 to get the latest version.',
'latest_version': 'You are using the latest version',
'version_info_not_found': 'Version information not found in the response',
'update_check_error': 'Error occurred while checking for updates: {}',
'unauthorized_ip': 'Unauthorized IP attempt: {}',
'client_cancelled': 'Client connection cancelled',
'socks5_connection_error': 'SOCKS5 connection error: {}',
'connect_timeout': 'Connection timeout',
'connection_reset': 'Connection reset',
'transfer_cancelled': 'Transfer cancelled',
'data_transfer_error': 'Data transfer error: {}',
'client_request_error': 'Client request handling error: {}',
'unsupported_protocol': 'Unsupported protocol: {}',
'proxy_invalid_switch': 'Proxy invalid, switching',
'request_retry': 'Request failed, retrying ({} left)',
'request_error': 'Error during request: {}',
'response_write_error': 'Error writing response: {}',
'consecutive_failures': 'Consecutive proxy failures detected for {}',
'invalid_proxy': 'Current proxy is invalid: {}',
'proxy_switched': 'Switched from proxy {} to {}'
}
}
def get_message(key, lang='cn', *args):
try:
return MESSAGES[lang][key].format(*args) if args else MESSAGES[lang][key]
except KeyError:
return MESSAGES['cn'][key] if key in MESSAGES['cn'] else key
def print_banner(config):
language = config.get('language', 'cn').lower()
has_auth = config.get('username') and config.get('password')
auth_info = f"{config.get('username')}:{config.get('password')}" if has_auth else get_message('auth_not_set', language)
http_addr = f"http://{auth_info}@127.0.0.1:{config.get('port')}" if has_auth else f"http://127.0.0.1:{config.get('port')}"
socks5_addr = f"socks5://{auth_info}@127.0.0.1:{config.get('port')}" if has_auth else f"socks5://127.0.0.1:{config.get('port')}"
banner_info = [
(get_message('public_account', language), '樱花庄的本间白猫'),
(get_message('blog', language), 'https://y.shironekosan.cn'),
(get_message('proxy_mode', language), get_message('cycle', language) if config.get('mode') == 'cycle' else get_message('load_balance', language) if config.get('mode') == 'load_balance' else get_message('single_round', language)),
(get_message('proxy_interval', language), f"{config.get('interval')}{get_message('seconds', language)}"),
(get_message('default_auth', language), auth_info),
(get_message('local_http', language), http_addr),
(get_message('local_socks5', language), socks5_addr),
(get_message('star_project', language), 'https://github.com/honmashironeko/ProxyCat'),
]
print(f"{Fore.MAGENTA}{'=' * 55}")
for key, value in banner_info:
print(f"{Fore.YELLOW}{key}: {Fore.GREEN}{value}")
print(f"{Fore.MAGENTA}{'=' * 55}\n")
logo1 = r"""
|\ _,,,---,,_ by 本间白猫
ZZZzz /,`.-'`' -. ;-;;,_
|,4- ) )-,_. ,\ ( `'-'
'---''(_/--' `-'\_) ProxyCat
"""
logo2 = r"""
* ,MMM8&&&. *
MMMM88&&&&& .
MMMM88&&&&&&&
* MMM88&&&&&&&&
MMM88&&&&&&&&
'MMM88&&&&&&'
'MMM8&&&' *
/\/|_ __/\\
/ -\ /- ~\ . '
\ =_YT_ = /
/==*(` `\ ~ \ ProxyCat
/ \ / `\ by 本间白猫
| | ) ~ (
/ \ / ~ \\
\ / \~ ~/
_/\_/\_/\__ _/_/\_/\__~__/_/\_/\_/\_/\_/\_
| | | | ) ) | | | (( | | | | | |
| | | |( ( | | | \\ | | | | | |
| | | | )_) | | | |))| | | | | |
| | | | | | | | (/ | | | | | |
| | | | | | | | | | | | | | |
"""
logo3 = r"""
/\_/\ _
/`` \ / )
|n n |__ ( (
=(Y =.'` `\ \ \\
{`"` \ ) )
{ / |/ /
\\ ,( / /
ProxyCat) ) /-'\ ,_.' by 本间白猫
(,(,/ ((,,/
"""
logo4 = r"""
.-o=o-.
, /=o=o=o=\ .--.
_|\|=o=O=o=O=| \\
__.' a`\=o=o=o=(`\ /
'. a 4/`|.-""'`\ \ ;'`) .---.
\ .' / .--' |_.' / .-._)
by 本间白猫 `) _.' / /`-.__.' /
ProxyCat `'-.____; /'-.___.-'
`\"""`
"""
logos_list = [logo1, logo2, logo3, logo4]
def logos():
selected_logo = random.choice(logos_list)
print(selected_logo)
DEFAULT_CONFIG = {
'port': '1080',
'mode': 'cycle',
'interval': '300',
'username': 'neko',
'password': '123456',
'use_getip': 'False',
'proxy_file': 'ip.txt',
'check_proxies': 'True',
'whitelist_file': '',
'blacklist_file': '',
'ip_auth_priority': 'whitelist',
'language': 'cn'
}
def load_config(config_file='config/config.ini'):
config = ConfigParser()
config.read(config_file, encoding='utf-8')
settings = {}
if config.has_section('SETTINGS'):
settings.update(dict(config.items('SETTINGS')))
for key in ['proxy_file', 'whitelist_file', 'blacklist_file']:
if key in settings and settings[key]:
config_dir = os.path.dirname(config_file)
settings[key] = os.path.join(config_dir, settings[key])
return {**DEFAULT_CONFIG, **settings}
def load_ip_list(file_path):
if not file_path or not os.path.exists(file_path):
return set()
with open(file_path, 'r') as f:
return {line.strip() for line in f if line.strip()}
async def check_proxy(proxy):
proxy_type = proxy.split('://')[0]
check_funcs = {
'http': check_http_proxy,
'https': check_https_proxy,
'socks5': check_socks_proxy
}
if proxy_type not in check_funcs:
return False
try:
return await check_funcs[proxy_type](proxy)
except Exception as e:
logging.error(f"{proxy_type.upper()}代理 {proxy} 检测失败: {e}")
return False
async def check_http_proxy(proxy):
async with httpx.AsyncClient(proxies={'http://': proxy}, timeout=10) as client:
response = await client.get('http://www.baidu.com')
return response.status_code == 200
async def check_https_proxy(proxy):
async with httpx.AsyncClient(proxies={'https://': proxy}, timeout=10) as client:
response = await client.get('https://www.baidu.com')
return response.status_code == 200
async def check_socks_proxy(proxy):
proxy_type, proxy_addr = proxy.split('://')
proxy_host, proxy_port = proxy_addr.split(':')
proxy_port = int(proxy_port)
try:
reader, writer = await asyncio.wait_for(asyncio.open_connection(proxy_host, proxy_port), timeout=5)
writer.write(b'\x05\x01\x00')
await writer.drain()
response = await asyncio.wait_for(reader.readexactly(2), timeout=5)
writer.close()
await writer.wait_closed()
return response == b'\x05\x00'
except Exception:
return False
async def check_proxies(proxies):
valid_proxies = []
for proxy in proxies:
if await check_proxy(proxy):
valid_proxies.append(proxy)
return valid_proxies
async def check_for_updates(language='cn'):
try:
async with httpx.AsyncClient() as client:
response = await asyncio.wait_for(client.get("https://y.shironekosan.cn/1.html"), timeout=10)
response.raise_for_status()
content = response.text
match = re.search(r'<p>(ProxyCat-V\d+\.\d+)</p>', content)
if match:
latest_version = match.group(1)
CURRENT_VERSION = "ProxyCat-V1.9.1"
if version.parse(latest_version.split('-V')[1]) > version.parse(CURRENT_VERSION.split('-V')[1]):
print(f"{Fore.YELLOW}{get_message('new_version_found', language)} 当前版本: {CURRENT_VERSION}, 最新版本: {latest_version}")
print(f"{Fore.YELLOW}{get_message('visit_quark', language)}")
print(f"{Fore.YELLOW}{get_message('visit_github', language)}")
print(f"{Fore.YELLOW}{get_message('visit_baidu', language)}")
else:
print(f"{Fore.GREEN}{get_message('latest_version', language)} ({CURRENT_VERSION})")
else:
print(f"{Fore.RED}{get_message('version_info_not_found', language)}")
except Exception as e:
print(f"{Fore.RED}{get_message('update_check_error', language, e)}")

515
modules/proxyserver.py Normal file

@ -0,0 +1,515 @@
import asyncio, httpx, logging, re, socket, struct, time, base64, random
from modules.modules import get_message, load_ip_list
from itertools import cycle
from config import getip
def load_proxies(file_path='ip.txt'):
with open(file_path, 'r') as file:
return [line.strip() for line in file if '://' in line]
def validate_proxy(proxy):
pattern = re.compile(r'^(?P<scheme>socks5|http|https)://(?P<host>[^:]+):(?P<port>\d+)$')
return pattern.match(proxy) is not None
class AsyncProxyServer:
def __init__(self, config):
self.config = config
self.username = self.config['username'].strip()
self.password = self.config['password'].strip()
self.auth_required = bool(self.username and self.password)
self.mode = self.config['mode']
self.interval = int(self.config['interval'])
self.use_getip = self.config.get('use_getip', 'False').lower() == 'true'
self.proxy_file = self.config['proxy_file']
self.language = self.config.get('language', 'cn').lower()
self.whitelist = load_ip_list(config.get('whitelist_file', ''))
self.blacklist = load_ip_list(config.get('blacklist_file', ''))
self.ip_auth_priority = config.get('ip_auth_priority', 'whitelist')
if not self.use_getip:
self.proxies = self._load_file_proxies()
self.proxy_cycle = cycle(self.proxies)
self.current_proxy = next(self.proxy_cycle) if self.proxies else "No proxies available"
else:
self.proxies = []
self.proxy_cycle = None
self.current_proxy = None
self.last_switch_time = time.time()
self.rate_limiter = asyncio.Queue(maxsize=3000)
self.proxy_failed = False
self.proxy_fail_count = 0
self.max_fail_count = 2
async def get_next_proxy(self):
if self.mode == 'load_balance':
return random.choice(self.proxies)
elif self.mode == 'custom':
return await self.custom_proxy_switch()
if time.time() - self.last_switch_time >= self.interval:
await self.get_proxy()
if self.use_getip and not self.current_proxy:
self.current_proxy = await self._load_getip_proxy()
return self.current_proxy
async def _load_getip_proxy(self):
valid_proxies = []
for _ in range(4):
new_ip = getip.newip()
if validate_proxy(new_ip):
valid_proxies.append(new_ip)
break
        else:
            logging.error("Failed to get a valid proxy after multiple attempts")
            exit(1)
return valid_proxies[0]
def _load_file_proxies(self):
try:
with open(self.proxy_file, 'r') as file:
proxies = [line.strip() for line in file if '://' in line]
valid_proxies = [p for p in proxies if validate_proxy(p)]
if not valid_proxies:
logging.error("No valid proxies found in the file")
exit(1)
return valid_proxies
except FileNotFoundError:
logging.error(f"Proxy file not found: {self.proxy_file}")
exit(1)
async def get_proxy(self):
if self.use_getip:
self.current_proxy = getip.newip()
else:
self.current_proxy = next(self.proxy_cycle)
self.last_switch_time = time.time()
logging.info(f"Switched to proxy: {self.current_proxy}")
async def custom_proxy_switch(self):
return self.proxies[0] if self.proxies else "No proxies available"
def time_until_next_switch(self):
return float('inf') if self.mode == 'load_balance' else max(0, self.interval - (time.time() - self.last_switch_time))
async def acquire(self):
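        # Simple throttle: briefly occupy a slot in the bounded queue (maxsize 3000) so bursts cannot exceed the queue capacity.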
await self.rate_limiter.put(None)
await asyncio.sleep(0.001)
self.rate_limiter.get_nowait()
def check_ip_auth(self, ip):
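        # Whitelist-priority: listed IPs pass, blacklisted IPs are rejected, others are allowed only when no whitelist is configured; blacklist-priority reverses the check order.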
if self.ip_auth_priority == 'whitelist':
if self.whitelist and ip in self.whitelist:
return True
if self.blacklist and ip in self.blacklist:
return False
return not self.whitelist
else:
if self.blacklist and ip in self.blacklist:
return False
if self.whitelist and ip in self.whitelist:
return True
return not self.blacklist
async def handle_client(self, reader, writer):
try:
client_ip = writer.get_extra_info('peername')[0]
if not self.check_ip_auth(client_ip):
logging.warning(get_message('unauthorized_ip', self.language, client_ip))
writer.close()
await writer.wait_closed()
return
first_byte = await reader.read(1)
if not first_byte:
return
if (first_byte == b'\x05'):
await self.handle_socks5_connection(reader, writer)
else:
await self._handle_client_impl(reader, writer, first_byte)
except asyncio.CancelledError:
logging.info(get_message('client_cancelled', self.language))
except Exception as e:
logging.error(get_message('client_error', self.language, e))
finally:
writer.close()
await writer.wait_closed()
async def handle_socks5_connection(self, reader, writer):
try:
nmethods = ord(await reader.readexactly(1))
await reader.readexactly(nmethods)
writer.write(b'\x05\x02' if self.auth_required else b'\x05\x00')
await writer.drain()
if self.auth_required:
auth_version = await reader.readexactly(1)
if auth_version != b'\x01':
writer.close()
return
ulen = ord(await reader.readexactly(1))
username = await reader.readexactly(ulen)
plen = ord(await reader.readexactly(1))
password = await reader.readexactly(plen)
if username.decode() != self.username or password.decode() != self.password:
writer.write(b'\x01\x01')
await writer.drain()
writer.close()
return
writer.write(b'\x01\x00')
await writer.drain()
version, cmd, _, atyp = struct.unpack('!BBBB', await reader.readexactly(4))
if cmd != 1:
writer.write(b'\x05\x07\x00\x01\x00\x00\x00\x00\x00\x00')
await writer.drain()
writer.close()
return
if atyp == 1:
dst_addr = socket.inet_ntoa(await reader.readexactly(4))
elif atyp == 3:
addr_len = ord(await reader.readexactly(1))
dst_addr = (await reader.readexactly(addr_len)).decode()
elif atyp == 4:
dst_addr = socket.inet_ntop(socket.AF_INET6, await reader.readexactly(16))
else:
writer.write(b'\x05\x08\x00\x01\x00\x00\x00\x00\x00\x00')
await writer.drain()
writer.close()
return
dst_port = struct.unpack('!H', await reader.readexactly(2))[0]
proxy = await self.get_next_proxy()
proxy_type, proxy_addr = proxy.split('://')
proxy_auth, proxy_host_port = self._split_proxy_auth(proxy_addr)
proxy_host, proxy_port = proxy_host_port.split(':')
proxy_port = int(proxy_port)
remote_reader, remote_writer = await asyncio.open_connection(proxy_host, proxy_port)
if proxy_type == 'socks5':
await self._initiate_socks5(remote_reader, remote_writer, dst_addr, dst_port)
elif proxy_type in ['http', 'https']:
await self._initiate_http(remote_reader, remote_writer, dst_addr, dst_port, proxy_auth)
writer.write(b'\x05\x00\x00\x01\x00\x00\x00\x00\x00\x00')
await writer.drain()
await asyncio.gather(
self._pipe(reader, remote_writer),
self._pipe(remote_reader, writer)
)
except Exception as e:
logging.error(get_message('socks5_connection_error', self.language, e))
writer.write(b'\x05\x01\x00\x01\x00\x00\x00\x00\x00\x00')
await writer.drain()
async def _initiate_socks5(self, remote_reader, remote_writer, dst_addr, dst_port):
remote_writer.write(b'\x05\x01\x00')
await remote_writer.drain()
await remote_reader.readexactly(2)
remote_writer.write(b'\x05\x01\x00' + (b'\x03' + len(dst_addr).to_bytes(1, 'big') + dst_addr.encode() if isinstance(dst_addr, str) else b'\x01' + socket.inet_aton(dst_addr)) + struct.pack('!H', dst_port))
await remote_writer.drain()
await remote_reader.readexactly(10)
async def _initiate_http(self, remote_reader, remote_writer, dst_addr, dst_port, proxy_auth):
connect_request = f'CONNECT {dst_addr}:{dst_port} HTTP/1.1\r\nHost: {dst_addr}:{dst_port}\r\n'
if proxy_auth:
connect_request += f'Proxy-Authorization: Basic {base64.b64encode(proxy_auth.encode()).decode()}\r\n'
connect_request += '\r\n'
remote_writer.write(connect_request.encode())
await remote_writer.drain()
while True:
line = await remote_reader.readline()
if line == b'\r\n':
break
async def _pipe(self, reader, writer):
try:
buffer_size = 32768
while True:
try:
data = await asyncio.wait_for(reader.read(buffer_size), timeout=30)
if not data:
break
writer.write(data)
await writer.drain()
except asyncio.TimeoutError:
logging.warning(get_message('data_transfer_timeout', self.language))
continue
except ConnectionResetError:
logging.error(get_message('connection_reset', self.language))
break
except asyncio.CancelledError:
logging.info(get_message('transfer_cancelled', self.language))
except Exception as e:
logging.error(get_message('data_transfer_error', self.language, e))
finally:
try:
writer.close()
await writer.wait_closed()
except:
pass
async def _handle_client_impl(self, reader, writer, first_byte):
try:
request_line = first_byte + await reader.readline()
if not request_line:
return
try:
method, path, _ = request_line.decode('utf-8', errors='ignore').split()
except ValueError:
return
headers = {}
while True:
line = await reader.readline()
if line == b'\r\n':
break
if line == b'':
return
try:
name, value = line.decode('utf-8', errors='ignore').strip().split(': ', 1)
headers[name.lower()] = value
except ValueError:
continue
if self.auth_required and not self._authenticate(headers):
writer.write(b'HTTP/1.1 407 Proxy Authentication Required\r\nProxy-Authenticate: Basic realm="Proxy"\r\n\r\n')
await writer.drain()
return
if method == 'CONNECT':
await self._handle_connect(path, reader, writer)
else:
await self._handle_request(method, path, headers, reader, writer)
except asyncio.CancelledError:
raise
except Exception as e:
logging.error(get_message('client_request_error', self.language, e))
def _authenticate(self, headers):
if not self.auth_required:
return True
auth = headers.get('proxy-authorization')
if not auth:
return False
try:
scheme, credentials = auth.split()
if scheme.lower() != 'basic':
return False
username, password = base64.b64decode(credentials).decode().split(':')
return username == self.username and password == self.password
except:
return False
async def _handle_connect(self, path, reader, writer):
try:
host, port = path.split(':')
port = int(port)
except ValueError:
writer.write(b'HTTP/1.1 400 Bad Request\r\n\r\n')
await writer.drain()
return
proxy = await self.get_next_proxy()
proxy_type, proxy_addr = proxy.split('://')
proxy_auth, proxy_host_port = self._split_proxy_auth(proxy_addr)
proxy_host, proxy_port = proxy_host_port.split(':')
proxy_port = int(proxy_port)
try:
remote_reader, remote_writer = await asyncio.wait_for(asyncio.open_connection(proxy_host, proxy_port), timeout=10)
if proxy_type == 'http':
connect_headers = [f'CONNECT {host}:{port} HTTP/1.1', f'Host: {host}:{port}']
if proxy_auth:
auth_header = f'Proxy-Authorization: Basic {base64.b64encode(proxy_auth.encode()).decode()}'
connect_headers.append(auth_header)
connect_request = '\r\n'.join(connect_headers) + '\r\n\r\n'
remote_writer.write(connect_request.encode())
await remote_writer.drain()
response = await remote_reader.readline()
if not response.startswith(b'HTTP/1.1 200'):
raise Exception("Bad Gateway")
while (await remote_reader.readline()) != b'\r\n':
pass
elif proxy_type == 'socks5':
remote_writer.write(b'\x05\x01\x00')
await remote_writer.drain()
if (await remote_reader.read(2))[1] == 0:
remote_writer.write(b'\x05\x01\x00\x03' + len(host).to_bytes(1, 'big') + host.encode() + port.to_bytes(2, 'big'))
await remote_writer.drain()
if (await remote_reader.read(10))[1] != 0:
raise Exception("Bad Gateway")
else:
raise Exception("Unsupported proxy type")
writer.write(b'HTTP/1.1 200 Connection Established\r\n\r\n')
await writer.drain()
await asyncio.gather(
self._pipe(reader, remote_writer),
self._pipe(remote_reader, writer)
)
except asyncio.TimeoutError:
logging.error("Connection timeout")
writer.write(b'HTTP/1.1 504 Gateway Timeout\r\n\r\n')
await writer.drain()
        except Exception:
            logging.error(get_message('proxy_invalid_switching', self.language))
if not self.proxy_failed:
self.proxy_failed = True
await self.get_proxy()
else:
self.proxy_failed = False
def _split_proxy_auth(self, proxy_addr):
match = re.match(r'((?P<username>.+?):(?P<password>.+?)@)?(?P<host>.+)', proxy_addr)
if match:
username = match.group('username')
password = match.group('password')
host = match.group('host')
if username and password:
return f"{username}:{password}", host
return None, proxy_addr
async def _handle_request(self, method, path, headers, reader, writer):
if not path.startswith(('http://', 'https://')):
logging.warning(get_message('unsupported_protocol', self.language, path))
writer.write(b'HTTP/1.1 400 Bad Request\r\nContent-Length: 0\r\n\r\n')
await writer.drain()
return
body = await reader.read()
retry_count = 2
while retry_count > 0:
try:
proxy = await self.get_next_proxy()
proxy_type, proxy_addr = proxy.split('://')
proxy_auth, proxy_host_port = self._split_proxy_auth(proxy_addr)
client_kwargs = {
"limits": httpx.Limits(
max_keepalive_connections=100,
max_connections=1000,
keepalive_expiry=30
),
"timeout": httpx.Timeout(30.0),
"follow_redirects": True
}
if proxy_type in ['http', 'https']:
client_kwargs["proxies"] = {
"http://": f"{proxy_type}://{proxy_host_port}",
"https://": f"{proxy_type}://{proxy_host_port}"
}
elif proxy_type == 'socks5':
client_kwargs["transport"] = httpx.AsyncHTTPTransport(
proxy=f"{proxy_type}://{proxy_host_port}",
verify=False
)
if proxy_auth:
headers['Proxy-Authorization'] = f'Basic {base64.b64encode(proxy_auth.encode()).decode()}'
async with httpx.AsyncClient(**client_kwargs) as client:
async with client.stream(
method,
path,
headers=headers,
content=body,
timeout=30.0
) as response:
await self._write_response(writer, response)
self.proxy_fail_count = 0
return
except (httpx.TimeoutException, httpx.ConnectTimeout, httpx.ConnectError):
logging.warning(get_message('request_retry', self.language, retry_count-1))
await self.handle_proxy_failure()
retry_count -= 1
if retry_count > 0:
await asyncio.sleep(1)
continue
except Exception as e:
logging.error(get_message('request_error', self.language, e))
await self.handle_proxy_failure()
break
writer.write(b'HTTP/1.1 502 Bad Gateway\r\n\r\n')
await writer.drain()
async def _write_response(self, writer, response):
try:
status_line = f'HTTP/1.1 {response.status_code} {response.reason_phrase}\r\n'
writer.write(status_line.encode('utf-8', errors='ignore'))
writer.write(b'Transfer-Encoding: chunked\r\n')
for name, value in response.headers.items():
if name.lower() not in ('transfer-encoding', 'connection'):
writer.write(f'{name}: {value}\r\n'.encode('utf-8', errors='ignore'))
writer.write(b'\r\n')
await writer.drain()
async for chunk in response.aiter_bytes(chunk_size=32768):
if asyncio.current_task().cancelled():
raise asyncio.CancelledError
writer.write(f'{len(chunk):X}\r\n'.encode('utf-8', errors='ignore'))
writer.write(chunk)
writer.write(b'\r\n')
await writer.drain()
writer.write(b'0\r\n\r\n')
await writer.drain()
except Exception as e:
logging.error(get_message('response_write_error', self.language, e))
raise
async def check_current_proxy(self):
try:
proxy = self.current_proxy
proxy_type = proxy.split('://')[0]
async with httpx.AsyncClient(
proxies={f"{proxy_type}://": proxy},
timeout=10,
verify=False
) as client:
response = await client.get('https://www.baidu.com')
return response.status_code == 200
except Exception:
return False
async def handle_proxy_failure(self):
if await self.check_current_proxy():
self.proxy_fail_count += 1
if self.proxy_fail_count >= self.max_fail_count:
logging.warning(get_message('consecutive_failures', self.language, self.current_proxy))
await self.force_switch_proxy()
else:
logging.error(get_message('invalid_proxy', self.language, self.current_proxy))
await self.force_switch_proxy()
async def force_switch_proxy(self):
self.proxy_failed = True
self.proxy_fail_count = 0
old_proxy = self.current_proxy
await self.get_proxy()
self.last_switch_time = time.time()
logging.info(get_message('proxy_switched', self.language, old_proxy, self.current_proxy))