# ProxyCat/ProxyCat_v1.3.py
from http.server import BaseHTTPRequestHandler, HTTPServer
from concurrent.futures import ThreadPoolExecutor
from httpx import AsyncClient, Limits
import multiprocessing
import threading
import logoprint
import argparse
import asyncio
import logging
import socket
import select
import base64
import getip
import socks
import httpx
import time
import random
logging.basicConfig(level=logging.INFO)
proxy_index, rotate_mode, rotate_interval = 0, 'cycle', 60
proxy_fail_count = {}
cache = {}
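# Proxy list file format (inferred from the parsing below): one proxy per line as
# protocol://host:port, e.g. "http://127.0.0.1:8080" or "socks5://127.0.0.1:7890".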
def load_proxies(file_path='ip.txt'):
    with open(file_path, 'r') as file:
        proxies = [line.strip().split('://') for line in file]
    return [(p[0], *p[1].split(':')) for p in proxies if len(p) == 2]
def rotate_proxies(proxies, interval):
    global proxy_index
    while True:
        time.sleep(interval)
        if args.k:
            proxies.clear()
            proxies.extend(get_proxy_from_getip())
            proxy_index = (proxy_index + 1) % len(proxies)
        else:
            if rotate_mode == 'cycle':
                proxy_index = (proxy_index + 1) % len(proxies)
            elif rotate_mode == 'once' and proxy_index < len(proxies) - 1:
                proxy_index += 1
        logging.info(f"Switched to proxy: {proxies[proxy_index]}")
def get_proxy_from_getip():
    proxy = getip.newip()
    protocol, host_port = proxy.split('://')
    host, port = host_port.split(':')
    return [(protocol, host, port)]
proxies = []
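# Request handler: plain HTTP(S) requests are forwarded through the upstream proxy via
# httpx, while CONNECT tunnels are relayed over raw sockets (HTTP CONNECT or SOCKS5 via
# PySocks), with per-proxy failure counting and rotation on repeated errors.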
class ProxyHTTPRequestHandler(BaseHTTPRequestHandler):
    def __init__(self, *args, **kwargs):
        self.client = AsyncClient(http2=True, timeout=httpx.Timeout(30.0, read=30.0), limits=Limits(max_connections=500))
        self.tunnel_established = False
        super().__init__(*args, **kwargs)

    def _update_proxy(self):
        global proxy_index, proxy_fail_count
        if rotate_interval == 0:
            if args.k:
                proxies.clear()
                proxies.extend(get_proxy_from_getip())
            proxy_index = random.randint(0, len(proxies) - 1)
            logging.info(f"Switched to proxy: {proxies[proxy_index]}")
        protocol, host, port = proxies[proxy_index]
        self.proxy_dict = {"http://": f"{protocol}://{host}:{port}", "https://": f"{protocol}://{host}:{port}"}
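    # Proxy-Authorization is expected as HTTP Basic auth, i.e. a header of the form
    #   Proxy-Authorization: Basic <base64 of "username:password">
    # matching the credentials configured via the -up option.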
    def _authenticate(self):
        auth = self.headers.get('Proxy-Authorization')
        if not auth:
            self.send_response(407)
            self.send_header('Proxy-Authenticate', 'Basic realm="Proxy"')
            self.end_headers()
            return False
        auth = auth.split()
        if len(auth) != 2:
            self.send_error(400, message="Invalid authentication format")
            return False
        if auth[0].lower() != 'basic':
            self.send_error(400, message="Unsupported authentication method")
            return False
        auth = auth[1].encode('utf-8')
        auth = base64.b64decode(auth).decode('utf-8')
        username, password = auth.split(':', 1)
        if username != self.server.username or password != self.server.password:
            self.send_error(403, message="Invalid credentials")
            return False
        return True

    def do_GET(self):
        if self._authenticate():
            self._proxy_request()

    def do_POST(self):
        if self._authenticate():
            self._proxy_request()

    def do_CONNECT(self):
        if self._authenticate():
            self._tunnel_request()
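    # Note: request-level proxies= / stream=True (and data= for a raw byte body) rely on
    # an older httpx API; recent httpx releases configure proxies on the client and use
    # client.stream() or client.send(request, stream=True) for streaming. The httpx
    # version this script targets is not pinned here, so treat this as an assumption.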
    def _proxy_request(self):
        self._update_proxy()
        data = self.rfile.read(int(self.headers['Content-Length'])) if 'Content-Length' in self.headers else None
        headers = {key: val for key, val in self.headers.items()}
        headers['Connection'] = 'keep-alive'

        async def handle():
            for attempt in range(3):
                try:
                    logging.info(f"Handling request: {self.command} {self.path}")
                    response = await self.client.request(self.command, self.path, headers=headers, data=data,
                                                         proxies=self.proxy_dict, stream=True)
                    self.send_response(response.status_code)
                    self.send_headers(response)
                    await self.forward_content(response)
                    break
                except (httpx.RequestError, socket.timeout, OSError) as e:
                    logging.error(f"Request failed: {e}")
                    self._handle_proxy_failure()
                    if attempt == 2:
                        self.send_error(500, message=str(e))
        asyncio.run(handle())
    def _handle_proxy_failure(self):
        global proxy_index, proxy_fail_count
        current_proxy = proxies[proxy_index]
        if current_proxy in proxy_fail_count:
            proxy_fail_count[current_proxy] += 1
        else:
            proxy_fail_count[current_proxy] = 1
        if proxy_fail_count[current_proxy] >= 3:
            logging.info(f"Proxy {current_proxy} failed 3 times, switching to another proxy")
            original_proxy_index = proxy_index
            if args.k:
                new_proxies = get_proxy_from_getip()
                if new_proxies:
                    proxies.clear()
                    proxies.extend(new_proxies)
                    proxy_index = random.randint(0, len(proxies) - 1)
                    logging.info(f"Switched to new proxy: {proxies[proxy_index]}")
                else:
                    logging.warning("Could not obtain a new proxy, keeping the current one")
                    proxy_index = original_proxy_index
            else:
                if rotate_mode == 'cycle':
                    proxy_index = random.randint(0, len(proxies) - 1)
                elif rotate_mode == 'once' and proxy_index < len(proxies) - 1:
                    proxy_index += 1
                logging.info(f"Switched to proxy: {proxies[proxy_index]}")
            proxy_fail_count[current_proxy] = 0
    def send_headers(self, response):
        for key, value in response.headers.items():
            if key.lower() != 'connection':
                self.send_header(key, value)
        self.send_header('Connection', 'keep-alive')
        self.end_headers()

    async def forward_content(self, response):
        async for chunk in response.aiter_bytes(chunk_size=4096):
            if chunk:
                self.wfile.write(chunk)
                self.wfile.flush()
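    # CONNECT handling: the requested host:port is reached through the currently selected
    # upstream proxy, the tunnel is acknowledged with "200 Connection Established", and
    # bytes are then relayed in both directions until either side closes.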
    def _tunnel_request(self):
        self._update_proxy()
        host, port = self.path.split(':')
        try:
            if not self.tunnel_established:
                remote_socket = self._connect_via_proxy(host, int(port))
                self.tunnel_established = True
                self.send_response(200, 'Connection Established')
                self.send_header('Connection', 'keep-alive')
                self.end_headers()
                self._forward_data(self.connection, remote_socket)
            else:
                self.send_response(200, 'Connection Already Established')
                self.send_header('Connection', 'keep-alive')
                self.end_headers()
        except (socket.error, Exception) as e:
            logging.error(f"Tunnel request failed: {e}")
            self._handle_proxy_failure()
            self.send_error(502, message=str(e))

    def _connect_via_proxy(self, host, port):
        protocol = proxies[proxy_index][0]
        if protocol == 'http':
            return self._connect_via_http_proxy(host, port)
        elif protocol == 'socks5':
            return self._connect_via_socks5_proxy(host, port)
        else:
            raise Exception("Unsupported proxy type")
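    # The HTTP CONNECT handshake with the upstream proxy looks roughly like:
    #   -> CONNECT target.example.com:443 HTTP/1.1
    #      Host: target.example.com:443
    #   <- HTTP/1.1 200 Connection established
    # The success check below matches the literal phrase "200 Connection established";
    # upstream proxies that reply with a different reason phrase (e.g. "200 OK") would
    # be rejected by this check.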
    def _connect_via_http_proxy(self, host, port):
        proxy_connect = f"CONNECT {host}:{port} HTTP/1.1\r\nHost: {host}:{port}\r\nConnection: keep-alive\r\n\r\n"
        remote_socket = socket.create_connection((proxies[proxy_index][1], int(proxies[proxy_index][2])))
        remote_socket.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, 1024 * 4096)
        remote_socket.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, 1024 * 4096)
        remote_socket.sendall(proxy_connect.encode())
        response = remote_socket.recv(4096).decode('utf-8')
        if '200 Connection established' not in response:
            raise Exception(f"Failed to establish a connection through the HTTP proxy: {response}")
        return remote_socket

    def _connect_via_socks5_proxy(self, host, port):
        remote_socket = socks.socksocket()
        remote_socket.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, 1024 * 4096)
        remote_socket.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, 1024 * 4096)
        remote_socket.set_proxy(socks.SOCKS5, proxies[proxy_index][1], int(proxies[proxy_index][2]))
        remote_socket.connect((host, port))
        return remote_socket
    def _forward_data(self, client_socket, remote_socket):
        try:
            sockets = [client_socket, remote_socket]
            while True:
                read_sockets, _, error_sockets = select.select(sockets, [], sockets, 10)
                if error_sockets:
                    logging.warning("Socket error, terminating connection")
                    break
                for sock in read_sockets:
                    other_sock = client_socket if sock is remote_socket else remote_socket
                    data = sock.recv(4096)
                    if not data:
                        return
                    other_sock.sendall(data)
        finally:
            client_socket.close()
            remote_socket.close()
def run(server_class=HTTPServer, handler_class=ProxyHTTPRequestHandler, port=1080, mode='cycle', interval=60, username='neko', password='123456', use_getip=False):
    global rotate_mode, rotate_interval, proxies, args
    rotate_mode, rotate_interval = mode, interval
    if use_getip:
        proxies = get_proxy_from_getip()
    else:
        proxies = load_proxies()
    if proxies:
        logging.info(f"Initial proxy: {proxies[proxy_index]}")
    else:
        logging.error("No proxies were loaded")
        return
    server_address, max_workers = ('', port), multiprocessing.cpu_count() * 5
    executor = ThreadPoolExecutor(max_workers=max_workers)
    server = server_class(server_address, handler_class)
    server.username = username
    server.password = password
    if interval > 0:
        thread = threading.Thread(target=rotate_proxies, args=(proxies, interval))
        thread.daemon = True
        thread.start()
    serve_requests(server, executor)
def serve_requests(server, executor):
    try:
        while True:
            request, client_address = server.get_request()
            executor.submit(server.process_request, request, client_address)
    except KeyboardInterrupt:
        pass
    finally:
        executor.shutdown(wait=True)
        server.server_close()
def print_icpscan_banner(port, mode, interval, username, password):
    mode = 'cycling' if mode == 'cycle' else 'single pass'
    print("--------------------------------------------------------")
    print("WeChat Official Account: 樱花庄的本间白猫")
    print("Blog: https://y.shironekosan.cn")
    print("Github: https://github.com/honmashironeko/ProxyCat")
    print("Gitcode: https://gitcode.com/honmashironeko/ProxyCat")
    print("--------------------------------------------------------")
    print(f"Listening port: {port}, rotation mode: {mode}, rotation interval: {interval}s")
    print(f"Default credentials: {username}:{password}")
if __name__ == '__main__':
    parser = argparse.ArgumentParser(description=logoprint.logos())
    parser.add_argument('-p', type=int, default=1080, help='Listening port')
    parser.add_argument('-m', default='cycle', help="Proxy rotation mode: 'cycle' loops through the list, 'once' stops at the last proxy")
    parser.add_argument('-t', type=int, default=60, help='Proxy rotation interval in seconds; 0 switches the proxy on every request')
    parser.add_argument('-up', default='neko:123456', help='Credentials in the form username:password')
    parser.add_argument('-k', action='store_true', help='Fetch proxies via the getip module')
    args = parser.parse_args()
    username, password = args.up.split(':', 1)
    print_icpscan_banner(port=args.p, mode=args.m, interval=args.t, username=username, password=password)
    run(port=args.p, mode=args.m, interval=args.t, username=username, password=password, use_getip=args.k)