新增支持切换 deepseek api 和本地 ollama api

This commit is contained in:
sheaven 2025-02-02 10:55:14 +08:00
parent 747e06f205
commit 1e37ab3fd4
3 changed files with 84 additions and 169 deletions

View File

@ -10,26 +10,63 @@ from PyQt5.QtGui import QFont, QColor, QPalette, QLinearGradient
import config
import glob
from config import THEMES
# 配置参数(需要用户自行修改)
DEEPSEEK_API_KEY = config.DEEPSEEK_API_KEY
API_ENDPOINT = "https://api.deepseek.com/v1/chat/completions"
os.environ["QT_IM_MODULE"] = "none"
class APIAdapter:
def __init__(self):
self.api_type = config.API_TYPE
if self.api_type == "deepseek":
self.api_key = config.DEEPSEEK_API_KEY
self.api_endpoint = "https://api.deepseek.com/v1/chat/completions"
else: # ollama
self.api_endpoint = config.OLLAMA_API_URL
self.model = config.OLLAMA_MODEL
def chat_completion(self, prompt, temperature=0.3):
try:
if self.api_type == "deepseek":
headers = {
"Authorization": f"Bearer {self.api_key}",
"Content-Type": "application/json"
}
payload = {
"model": "deepseek-chat",
"messages": [{"role": "user", "content": prompt}],
"temperature": temperature
}
response = requests.post(self.api_endpoint, headers=headers, json=payload)
else: # ollama
payload = {
"model": self.model,
"messages": [{"role": "user", "content": prompt}],
"stream": False
}
response = requests.post(self.api_endpoint, json=payload)
response.raise_for_status()
if self.api_type == "deepseek":
return response.json()["choices"][0]["message"]["content"]
else:
content = response.json()["message"]["content"]
# 使用正则表达式移除<think></think>标签及其内容
import re
content = re.sub(r'<think>.*?</think>', '', content, flags=re.DOTALL)
return content.strip()
except Exception as e:
raise Exception(f"API请求错误: {str(e)}")
class AnalysisThread(QThread):
analysis_complete = pyqtSignal(str, bool)
def __init__(self, http_data, parent=None):
super().__init__(parent)
self.http_data = http_data
self.api = APIAdapter()
def run(self):
try:
headers = {
"Authorization": f"Bearer {DEEPSEEK_API_KEY}",
"Content-Type": "application/json"
}
prompt = f"""请进行网络安全分析。请严格按照以下步骤执行:
1. 分析以下HTTP请求的各个组成部分
2. 识别是否存在SQL注入XSSCSRF反序列化文件上传路径遍历OWASPTop10等常见攻击特征
@ -43,16 +80,7 @@ class AnalysisThread(QThread):
HTTP请求数据
{self.http_data}"""
payload = {
"model": "deepseek-chat",
"messages": [{"role": "user", "content": prompt}],
"temperature": 0.3
}
response = requests.post(API_ENDPOINT, headers=headers, json=payload)
response.raise_for_status()
result = response.json()['choices'][0]['message']['content']
result = self.api.chat_completion(prompt)
is_attack = "【分析结果】是" in result
self.analysis_complete.emit(result, is_attack)
@ -65,14 +93,10 @@ class DecodingThread(QThread):
def __init__(self, encoded_str, parent=None):
super().__init__(parent)
self.encoded_str = encoded_str
self.api = APIAdapter()
def run(self):
try:
headers = {
"Authorization": f"Bearer {DEEPSEEK_API_KEY}",
"Content-Type": "application/json"
}
prompt = f"""请完整分析并解码以下字符串,要求:
1. 识别所有可能的编码方式包括嵌套编码
2. 通过自己重新编码确认自己解码正确
@ -86,15 +110,7 @@ class DecodingThread(QThread):
解码过程逐步展示解码步骤
最终结果解码后的明文内容"""
payload = {
"model": "deepseek-chat",
"messages": [{"role": "user", "content": prompt}],
"temperature": 0.3
}
response = requests.post(API_ENDPOINT, headers=headers, json=payload)
response.raise_for_status()
result = response.json()['choices'][0]['message']['content']
result = self.api.chat_completion(prompt)
self.decoding_complete.emit(result)
except Exception as e:
@ -106,14 +122,10 @@ class ProcessAnalysisThread(QThread):
def __init__(self, process_data, parent=None):
super().__init__(parent)
self.process_data = process_data
self.api = APIAdapter()
def run(self):
try:
headers = {
"Authorization": f"Bearer {DEEPSEEK_API_KEY}",
"Content-Type": "application/json"
}
prompt = f"""你是一个Windows/Linux进程分析工程师要求
1. 用户将输出tasklist或者ps aux的结果
2. 帮助用户分析输出你所有认识的进程信息
@ -131,15 +143,7 @@ tasklist或者ps aux的结果{self.process_data}
安全进程的可终止性评估
"""
payload = {
"model": "deepseek-chat",
"messages": [{"role": "user", "content": prompt}],
"temperature": 0.3
}
response = requests.post(API_ENDPOINT, headers=headers, json=payload)
response.raise_for_status()
result = response.json()['choices'][0]['message']['content']
result = self.api.chat_completion(prompt)
self.process_complete.emit(result)
except Exception as e:
@ -151,14 +155,10 @@ class JsAuditThread(QThread):
def __init__(self, js_code, parent=None):
super().__init__(parent)
self.js_code = js_code
self.api = APIAdapter()
def run(self):
try:
headers = {
"Authorization": f"Bearer {DEEPSEEK_API_KEY}",
"Content-Type": "application/json"
}
prompt = f"""请对以下JavaScript代码进行完整的安全审计要求
1. 识别XSSCSRF不安全的DOM操作敏感信息泄露eval使用等安全问题
2. 检查第三方库的安全性和版本漏洞
@ -173,15 +173,7 @@ class JsAuditThread(QThread):
JavaScript代码
{self.js_code}"""
payload = {
"model": "deepseek-chat",
"messages": [{"role": "user", "content": prompt}],
"temperature": 0.3
}
response = requests.post(API_ENDPOINT, headers=headers, json=payload)
response.raise_for_status()
result = response.json()['choices'][0]['message']['content']
result = self.api.chat_completion(prompt)
self.audit_complete.emit(result)
except Exception as e:
@ -193,14 +185,10 @@ class HttpToPythonThread(QThread):
def __init__(self, http_request, parent=None):
super().__init__(parent)
self.http_request = http_request
self.api = APIAdapter()
def run(self):
try:
headers = {
"Authorization": f"Bearer {DEEPSEEK_API_KEY}",
"Content-Type": "application/json"
}
prompt = f"""你是一个专业Python开发助手请将以下HTTP请求转换为规范的Python代码使用requests库。按以下步骤处理
要求
1.用户输入完整请求头包含Content-Type和Authorization
@ -215,15 +203,7 @@ class HttpToPythonThread(QThread):
这是用户输入的内容
{self.http_request}"""
payload = {
"model": "deepseek-chat",
"messages": [{"role": "user", "content": prompt}],
"temperature": 0.3
}
response = requests.post(API_ENDPOINT, headers=headers, json=payload)
response.raise_for_status()
result = response.json()['choices'][0]['message']['content']
result = self.api.chat_completion(prompt)
self.conversion_complete.emit(result)
except Exception as e:
@ -236,14 +216,10 @@ class TextProcessThread(QThread):
super().__init__(parent)
self.source_text = source_text
self.sample_text = sample_text
self.api = APIAdapter()
def run(self):
try:
headers = {
"Authorization": f"Bearer {DEEPSEEK_API_KEY}",
"Content-Type": "application/json"
}
prompt = f"""写python代码请根据提供的样本格式将源文本转换为与样本相同的格式。要求
1. 分析样本文本的结构和格式特征
2. 保持源文本的核心内容不变
@ -259,15 +235,7 @@ class TextProcessThread(QThread):
请直接输出python脚本不要包含任何解释或说明不使用markdown格式"""
payload = {
"model": "deepseek-chat",
"messages": [{"role": "user", "content": prompt}],
"temperature": 0.3
}
response = requests.post(API_ENDPOINT, headers=headers, json=payload)
response.raise_for_status()
result = response.json()['choices'][0]['message']['content']
result = self.api.chat_completion(prompt)
self.process_complete.emit(result)
except Exception as e:
@ -280,14 +248,10 @@ class RegexGenThread(QThread):
super().__init__(parent)
self.source_text = source_text
self.sample_text = sample_text
self.api = APIAdapter()
def run(self):
try:
headers = {
"Authorization": f"Bearer {DEEPSEEK_API_KEY}",
"Content-Type": "application/json"
}
prompt = f"""请根据提供的样本格式,通过源文本生成正则表达式为与样本相同的内容。要求:
1. 分析样本文本的结构和格式特征
2. 保持源文本的核心内容不变
@ -302,33 +266,22 @@ class RegexGenThread(QThread):
请直接输出生成的多个正则表达式不要包含任何解释或说明不要使用markdown格式输出"""
payload = {
"model": "deepseek-chat",
"messages": [{"role": "user", "content": prompt}],
"temperature": 0.3
}
response = requests.post(API_ENDPOINT, headers=headers, json=payload)
response.raise_for_status()
result = response.json()['choices'][0]['message']['content']
result = self.api.chat_completion(prompt)
self.regex_complete.emit(result)
except Exception as e:
self.regex_complete.emit(f"正则表达式生成错误: {str(e)}")
class WebShellAnalysisThread(QThread):
analysis_complete = pyqtSignal(str, bool)
def __init__(self, file_content, parent=None):
super().__init__(parent)
self.file_content = file_content
self.api = APIAdapter()
def run(self):
try:
headers = {
"Authorization": f"Bearer {DEEPSEEK_API_KEY}",
"Content-Type": "application/json"
}
prompt = f"""请分析以下文件内容是否为WebShell或内存马。要求
1. 检查PHP/JSP/ASP等WebShell特征如加密函数执行系统命令文件操作
2. 识别内存马特征如无文件落地进程注入异常网络连接
@ -345,21 +298,13 @@ class WebShellAnalysisThread(QThread):
文件内容
{self.file_content}"""
payload = {
"model": "deepseek-chat",
"messages": [{"role": "user", "content": prompt}],
"temperature": 0.3
}
response = requests.post(API_ENDPOINT, headers=headers, json=payload)
response.raise_for_status()
result = response.json()['choices'][0]['message']['content']
result = self.api.chat_completion(prompt)
is_malicious = "【分析结果】是" in result
self.analysis_complete.emit(result, is_malicious)
except Exception as e:
self.analysis_complete.emit(f"错误发生: {str(e)}", False)
class TranslationThread(QThread):
translation_complete = pyqtSignal(str)
@ -368,14 +313,10 @@ class TranslationThread(QThread):
self.text = text
self.source_lang = source_lang
self.target_lang = target_lang
self.api = APIAdapter()
def run(self):
try:
headers = {
"Authorization": f"Bearer {DEEPSEEK_API_KEY}",
"Content-Type": "application/json"
}
prompt = f"""请将以下文本从{self.source_lang}专业地翻译成{self.target_lang}。要求:
1. 保持技术术语准确性特别是网络安全相关词汇
2. 保留代码格式和变量名
@ -385,15 +326,7 @@ class TranslationThread(QThread):
待翻译内容
{self.text}"""
payload = {
"model": "deepseek-chat",
"messages": [{"role": "user", "content": prompt}],
"temperature": 0.1
}
response = requests.post(API_ENDPOINT, headers=headers, json=payload)
response.raise_for_status()
result = response.json()['choices'][0]['message']['content']
result = self.api.chat_completion(prompt)
self.translation_complete.emit(result)
except Exception as e:
@ -406,6 +339,7 @@ class SourceCodeAuditThread(QThread):
def __init__(self, files, parent=None):
super().__init__(parent)
self.files = files
self.api = APIAdapter()
def run(self):
total_files = len(self.files)
@ -430,11 +364,6 @@ class SourceCodeAuditThread(QThread):
# API请求
try:
headers = {
"Authorization": f"Bearer {DEEPSEEK_API_KEY}",
"Content-Type": "application/json"
}
prompt = f"""请对以下代码文件进行安全审计,要求:
1. 识别SQL注入XSS文件包含命令执行等漏洞
2. 检查不安全的权限设置和敏感信息泄露
@ -448,32 +377,15 @@ class SourceCodeAuditThread(QThread):
文件内容
{content[:3000]}""" # 限制每个文件内容长度
payload = {
"model": "deepseek-chat",
"messages": [{"role": "user", "content": prompt}],
"temperature": 0.3
}
response = requests.post(API_ENDPOINT,
headers=headers,
json=payload,
timeout=30) # 增加超时设置
response.raise_for_status()
result = response.json()['choices'][0]['message']['content']
result = self.api.chat_completion(prompt)
audit_results.append(result)
except requests.exceptions.RequestException as e:
except Exception as e:
audit_results.append(
f"【API请求失败】{os.path.basename(file_path)}\n"
f"错误类型:{type(e).__name__}\n"
f"错误详情:{str(e)}"
)
except KeyError as e:
audit_results.append(
f"【响应解析失败】{os.path.basename(file_path)}\n"
f"错误字段:{str(e)}"
)
except Exception as e: # 全局异常兜底
audit_results.append(
@ -501,20 +413,7 @@ class SourceCodeAuditThread(QThread):
3. 同类漏洞合并显示
4. 不使用markdown格式直接输出文本"""
payload = {
"model": "deepseek-chat",
"messages": [{"role": "user", "content": final_prompt}],
"temperature": 0.3
}
response = requests.post(API_ENDPOINT,
headers=headers,
json=payload,
timeout=60)
response.raise_for_status()
final_result = response.json()['choices'][0]['message']['content']
# 添加原始数据备份
final_result = self.api.chat_completion(final_prompt)
final_result += "\n\n--- 原始数据备份 ---\n" + "\n".join(audit_results)
except Exception as e:

View File

@ -1,5 +1,13 @@
# API配置
API_TYPE="deepseek" # 可选值: "deepseek" 或 "ollama"
# DeepSeek API配置
DEEPSEEK_API_KEY=""
# Ollama API配置
OLLAMA_API_URL="http://localhost:11434/api/chat" # Ollama API地址
OLLAMA_MODEL="deepseek-r1:7b" # Ollama模型名称
# 主题配色方案
THEMES = {
"深色主题": {

View File

@ -1,5 +1,13 @@
# API配置
API_TYPE="deepseek" # 可选值: "deepseek" 或 "ollama"
# DeepSeek API配置
DEEPSEEK_API_KEY=""
# Ollama API配置
OLLAMA_API_URL="http://localhost:11434/api/chat" # Ollama API地址
OLLAMA_MODEL="deepseek-coder" # Ollama模型名称
# 主题配色方案
THEMES = {
"深色主题": {