mirror of
https://github.com/ChinaRan0/DeepSeekSelfTool.git
synced 2025-07-11 08:53:35 +00:00
commit
c25326d636
@ -1,11 +1,13 @@
|
||||
import os
|
||||
import re
|
||||
import json
|
||||
import time
|
||||
import requests
|
||||
from PyQt5 import QtCore, QtGui, QtWidgets
|
||||
from PyQt5.QtCore import QThread, pyqtSignal
|
||||
from config import OLLAMA_API_URL, OLLAMA_MODEL # 用户自定义配置
|
||||
|
||||
|
||||
class CyberTextEdit(QtWidgets.QTextEdit):
|
||||
def __init__(self, parent=None):
|
||||
super().__init__(parent)
|
||||
@ -21,9 +23,30 @@ class CyberTextEdit(QtWidgets.QTextEdit):
|
||||
}
|
||||
""")
|
||||
|
||||
|
||||
def seconds_utils(seconds):
|
||||
# 定义各个时间单位的秒数
|
||||
units = {
|
||||
'年': 365 * 86400,
|
||||
'月': 30 * 86400,
|
||||
'天': 86400,
|
||||
'小时': 3600,
|
||||
'分钟': 60,
|
||||
'秒': 1
|
||||
}
|
||||
time_str = []
|
||||
for unit, unit_seconds in units.items():
|
||||
count = seconds // unit_seconds
|
||||
if count > 0:
|
||||
time_str.append(f"{count}{unit}")
|
||||
seconds %= unit_seconds
|
||||
return "".join(time_str)
|
||||
|
||||
|
||||
class HackerWorker(QThread):
|
||||
analysis_complete = pyqtSignal(str)
|
||||
progress_update = pyqtSignal(str)
|
||||
button_text_update = pyqtSignal(str)
|
||||
|
||||
def __init__(self, files_content):
|
||||
super().__init__()
|
||||
@ -31,8 +54,10 @@ class HackerWorker(QThread):
|
||||
|
||||
def run(self):
|
||||
full_report = []
|
||||
for filepath, content in self.files_content.items():
|
||||
self.progress_update.emit(f"🔍 Analyzing {os.path.basename(filepath)}...")
|
||||
start_time = time.time()
|
||||
for index, (filepath, content) in enumerate(self.files_content.items(), start=1):
|
||||
self.progress_update.emit(
|
||||
f"🔍 Analyzing {os.path.basename(filepath)} ({index}/{len(self.files_content)})...")
|
||||
|
||||
prompt = f"""【强制指令】你是一个专业的安全审计AI,请按以下要求分析代码:
|
||||
|
||||
@ -72,23 +97,34 @@ class HackerWorker(QThread):
|
||||
result = json.loads(response.text)["response"]
|
||||
result = re.sub(r'<think>.*?</think>', '', result, flags=re.DOTALL)
|
||||
full_report.append(f"📄 文件:{filepath}\n{result}\n{'━' * 50}")
|
||||
# 预测剩余时间:
|
||||
pass_time = int((time.time() - start_time) / index * (len(self.files_content) - index))
|
||||
self.button_text_update.emit(f"⌛预计剩余{seconds_utils(pass_time)}")
|
||||
except Exception as e:
|
||||
full_report.append(f"❌ 错误:处理文件 {filepath} 时发生错误\n{str(e)}")
|
||||
# 预测剩余时间:
|
||||
pass_time = int((time.time() - start_time) / index * (len(self.files_content) - index))
|
||||
self.button_text_update.emit(f"⌛预计剩余{seconds_utils(pass_time)}")
|
||||
|
||||
self.analysis_complete.emit("\n".join(full_report))
|
||||
self.button_text_update.emit("🚨 启动扫描协议")
|
||||
|
||||
|
||||
class WebshellWorker(QThread):
|
||||
detection_complete = pyqtSignal(str)
|
||||
progress_update = pyqtSignal(str)
|
||||
button_text_update = pyqtSignal(str)
|
||||
|
||||
def __init__(self, files_content):
|
||||
super().__init__()
|
||||
self.files_content = files_content
|
||||
|
||||
def run(self):
|
||||
start_time = time.time()
|
||||
detection_results = []
|
||||
for filepath, content in self.files_content.items():
|
||||
self.progress_update.emit(f"🕵️ 扫描 {os.path.basename(filepath)}...")
|
||||
for index, (filepath, content) in enumerate(self.files_content.items(), start=1):
|
||||
self.progress_update.emit(
|
||||
f"🕵️ 扫描 {os.path.basename(filepath)} ({index}/{len(self.files_content)})...")
|
||||
|
||||
prompt = f"""【Webshell检测指令】请严格按以下步骤分析代码:
|
||||
|
||||
@ -121,10 +157,18 @@ class WebshellWorker(QThread):
|
||||
result = json.loads(response.text)["response"]
|
||||
result = re.sub(r'<think>.*?</think>', '', result, flags=re.DOTALL)
|
||||
detection_results.append(f"📁 {filepath}\n{result}\n{'━' * 50}")
|
||||
# 预测剩余时间:
|
||||
pass_time = int((time.time() - start_time) / index * (len(self.files_content) - index))
|
||||
self.button_text_update.emit(f"⌛预计剩余{seconds_utils(pass_time)}")
|
||||
except Exception as e:
|
||||
detection_results.append(f"❌ 错误:{filepath}\n{str(e)}")
|
||||
# 预测剩余时间:
|
||||
pass_time = int((time.time() - start_time) / index * (len(self.files_content) - index))
|
||||
self.button_text_update.emit(f"⌛预计剩余{seconds_utils(pass_time)}")
|
||||
|
||||
self.detection_complete.emit("\n".join(detection_results))
|
||||
self.button_text_update.emit("🚨 启动扫描协议")
|
||||
|
||||
|
||||
class CyberScanner(QtWidgets.QMainWindow):
|
||||
def __init__(self):
|
||||
@ -286,14 +330,17 @@ class CyberScanner(QtWidgets.QMainWindow):
|
||||
worker = HackerWorker(self.files_content)
|
||||
init_msg = "🚀 启动深度代码分析协议..."
|
||||
complete_signal = worker.analysis_complete
|
||||
worker.button_text_update.connect(self.update_button_text)
|
||||
else:
|
||||
worker = WebshellWorker(self.files_content)
|
||||
init_msg = "🕵️ 启动Webshell检测协议..."
|
||||
complete_signal = worker.detection_complete
|
||||
worker.button_text_update.connect(self.update_button_text)
|
||||
|
||||
self.scan_thread = worker
|
||||
self.scan_thread.progress_update.connect(self.update_status)
|
||||
complete_signal.connect(self.show_results)
|
||||
self.scan_thread.button_text_update.connect(self.update_button_text)
|
||||
self.scan_thread.start()
|
||||
|
||||
self.btn_scan.setEnabled(False)
|
||||
@ -338,6 +385,11 @@ class CyberScanner(QtWidgets.QMainWindow):
|
||||
|
||||
self.result_display.append(report)
|
||||
self.status_bar.showMessage("✅ 扫描完成")
|
||||
self.btn_scan.setEnabled(True)
|
||||
|
||||
def update_button_text(self, new_text):
|
||||
self.btn_scan.setText(new_text)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
# 保持源文本的核心内容不变
|
||||
|
Loading…
x
Reference in New Issue
Block a user