Add files via upload

Ky0toFu 2025-02-17 14:38:25 +08:00 committed by GitHub
parent 7cf6f9acf3
commit 4fd654df10
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
17 changed files with 110 additions and 57 deletions


@@ -14,8 +14,13 @@ Mirror Flowers is an AI-based code security audit tool that can automatically detect
## Changelog
### 2025-02-17
- Fixed issues with saving and using the API configuration
- Fixed the display of AI analysis results in the frontend
- Improved the JSON response format so the frontend can correctly parse and display the AI analysis suggestions (an illustrative sketch of the response shape follows below)
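For reference, the structure the frontend now receives mirrors what `_call_ai_api` builds further down in this diff. The sketch below is illustrative only; every field value is a placeholder, not real analysis output:

```python
# Illustrative example of the response consumed by the frontend.
# Keys match the structure built in _call_ai_api; values are made up.
example_response = {
    "status": "success",
    "analysis": {
        "raw_text": "{ ...formatted JSON returned by the model... }",
        "summary": {
            # taken from impact_analysis.dangerous_function.severity in the model output
            "risk_level": "high",
            "vulnerability_count": 2,
        },
        "vulnerabilities": [
            {"type": "sql_injection", "severity": "high", "description": "evidence text"},
        ],
        "recommendations": [
            {"issue": "sql_injection", "solution": "use parameterized queries"},
        ],
    },
}
```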
### 2024-02-14
### 2025-02-14
- Improved dependency management:
  - Updated the setup.py configuration
  - Restructured requirements.txt
@@ -36,7 +41,7 @@ Mirror Flowers is an AI-based code security audit tool that can automatically detect
- Improved the ability to understand code context
- Increased the accuracy of vulnerability analysis
### 2024-02-11
### 2025-02-11
- Refined the Python code analysis features
- Added full dependency analysis, with tracking of import relationships and aliases (see the sketch after this list)
- Enhanced function-call analysis, with call tracking for class methods and instance methods
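The import/alias tracking mentioned above can be pictured with a minimal sketch using Python's `ast` module. This is only an illustration of the idea, not the project's actual implementation:

```python
import ast

def collect_imports(source: str) -> dict[str, str]:
    """Map local names (including aliases) to the modules or objects they refer to."""
    imports: dict[str, str] = {}
    tree = ast.parse(source)
    for node in ast.walk(tree):
        if isinstance(node, ast.Import):
            for alias in node.names:
                # "import numpy as np" -> {"np": "numpy"}
                imports[alias.asname or alias.name] = alias.name
        elif isinstance(node, ast.ImportFrom):
            for alias in node.names:
                # "from os import path as p" -> {"p": "os.path"}
                # (node.module can be None for relative imports; ignored in this sketch)
                imports[alias.asname or alias.name] = f"{node.module}.{alias.name}"
    return imports

print(collect_imports("import numpy as np\nfrom os import path as p"))
# {'np': 'numpy', 'p': 'os.path'}
```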


@@ -15,8 +15,8 @@ class LogLevel(str, Enum):
class Settings(BaseSettings):
    # API configuration
    OPENAI_API_KEY: str = os.getenv("OPENAI_API_KEY", "")
    OPENAI_API_BASE: str = os.getenv("OPENAI_API_BASE", "https://api.siliconflow.cn/v1")
    OPENAI_MODEL: str = os.getenv("OPENAI_MODEL", "01-ai/Yi-1.5-34B-Chat-16K")
    OPENAI_API_BASE: str = os.getenv("OPENAI_API_BASE", "")
    OPENAI_MODEL: str = os.getenv("OPENAI_MODEL", "")

    # Logging configuration
    LOG_LEVEL: LogLevel = os.getenv("LOG_LEVEL", LogLevel.INFO)
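Since the defaults for `OPENAI_API_BASE` and `OPENAI_MODEL` are now empty, the values have to be supplied either through environment variables or through the `config/api_config.json` file that the analyzer (further down in this diff) checks first. A minimal sketch with placeholder values only:

```python
import json
import os
from pathlib import Path

# Option 1: environment variables, picked up by Settings via os.getenv.
# All values below are placeholders, not real credentials.
os.environ["OPENAI_API_KEY"] = "sk-your-key"
os.environ["OPENAI_API_BASE"] = "https://api.example.com/v1"
os.environ["OPENAI_MODEL"] = "example/model-name"

# Option 2: config/api_config.json, which the analyzer reads before falling back to Settings.
Path("config").mkdir(exist_ok=True)
Path("config/api_config.json").write_text(json.dumps({
    "api_key": "sk-your-key",
    "api_base": "https://api.example.com/v1",
    "model": "example/model-name",
}, indent=2))
```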


@@ -358,6 +358,28 @@ class CoreAnalyzer:
    async def _ai_verify_suspicious(self, suspicious_files: List[Dict[str, Any]]) -> Dict[str, Any]:
        """Use AI to verify suspicious code"""
        # Read the API settings from the config file
        config_path = Path(__file__).parent.parent.parent / "config" / "api_config.json"
        if config_path.exists():
            with open(config_path, "r") as f:
                config = json.load(f)
                api_key = config.get("api_key")
                api_base = config.get("api_base")
                model = config.get("model")
        else:
            api_key = settings.OPENAI_API_KEY
            api_base = settings.OPENAI_API_BASE
            model = settings.OPENAI_MODEL

        if not api_key:
            raise ValueError("API key is not configured")

        # Create the OpenAI client
        client = AsyncOpenAI(
            api_key=api_key,
            base_url=api_base
        )

        results = {}
        for file_info in suspicious_files:
@@ -410,49 +432,10 @@ class CoreAnalyzer:
}}"""
# 调用AI API
client = AsyncOpenAI(
api_key=settings.OPENAI_API_KEY,
base_url=settings.OPENAI_API_BASE
)
response = await client.chat.completions.create(
model=settings.OPENAI_MODEL,
messages=[
{"role": "system", "content": "你是一个专业的代码安全审计专家。请以JSON格式返回分析结果。"},
{"role": "user", "content": prompt}
],
temperature=0.2
)
# 解析AI响应
ai_response = response.choices[0].message.content
# 确保返回的是有效的JSON
try:
json_response = json.loads(ai_response)
except json.JSONDecodeError:
# 如果不是有效的JSON尝试提取JSON部分
json_match = re.search(r'```json\s*(.*?)\s*```', ai_response, re.DOTALL)
if json_match:
json_response = json.loads(json_match.group(1))
else:
raise ValueError("无法解析AI响应为JSON格式")
results[file_path] = {
"issues": file_info.get('issues', []),
"similar_code": [],
"ai_analysis": {
"status": "success",
"analysis": {
"raw_text": json.dumps(json_response, ensure_ascii=False, indent=2),
"summary": {
"risk_level": json_response.get("impact_analysis", {}).get("dangerous_function", {}).get("severity", "unknown"),
"vulnerability_count": len(file_info.get('issues', []))
},
"vulnerabilities": [],
"recommendations": []
}
}
"ai_analysis": await self._call_ai_api(prompt)
}
except Exception as e:
@@ -652,15 +635,32 @@ class CoreAnalyzer:
    async def _call_ai_api(self, prompt: str) -> Dict:
        """Call the AI API to analyze the code"""
        try:
            # Read the API settings from the config file
            config_path = Path(__file__).parent.parent.parent / "config" / "api_config.json"
            if config_path.exists():
                with open(config_path, "r") as f:
                    config = json.load(f)
                    api_key = config.get("api_key")
                    api_base = config.get("api_base")
                    model = config.get("model")
            else:
                # If the config file does not exist, use environment variables or defaults
                api_key = settings.OPENAI_API_KEY
                api_base = settings.OPENAI_API_BASE
                model = settings.OPENAI_MODEL

            if not api_key:
                raise ValueError("API key is not configured")

            # Create the OpenAI client
            client = AsyncOpenAI(
                api_key=settings.OPENAI_API_KEY,
                base_url=settings.OPENAI_API_BASE
                api_key=api_key,
                base_url=api_base
            )

            # Call the API
            response = await client.chat.completions.create(
                model=settings.OPENAI_MODEL,
                model=model,
                messages=[
                    {"role": "system", "content": "You are a professional code security audit expert. Please analyze the security issues in the code and provide remediation advice."},
                    {"role": "user", "content": prompt}
@@ -672,17 +672,65 @@ class CoreAnalyzer:
            # Get the response text
            analysis_text = response.choices[0].message.content

            # Build the standard response format
            return {
                "status": "success",
                "analysis": {
                    "raw_text": analysis_text,
                    "summary": self._extract_summary(analysis_text),
                    "vulnerabilities": self._extract_vulnerabilities(analysis_text),
                    "recommendations": self._extract_recommendations(analysis_text)

            # Try to extract JSON content from the response text
            try:
                # If the response is a markdown code block, pull the JSON content out of it
                if analysis_text.startswith('```') and analysis_text.endswith('```'):
                    # Strip the markdown code block markers
                    json_str = analysis_text.split('\n', 1)[1].rsplit('\n', 1)[0]
                    if json_str.startswith('json'):
                        json_str = json_str[4:].strip()
                    analysis_data = json.loads(json_str)
                else:
                    # Otherwise try to parse the JSON directly
                    analysis_data = json.loads(analysis_text)

                # Build the response format expected by the frontend
                return {
                    "status": "success",
                    "analysis": {
                        # Convert the object into a formatted JSON string
                        "raw_text": json.dumps(analysis_data, ensure_ascii=False, indent=2),
                        "summary": {
                            "risk_level": analysis_data.get("impact_analysis", {})
                                .get("dangerous_function", {})
                                .get("severity", "unknown"),
                            "vulnerability_count": len(analysis_data.get("vulnerability_confirmation", {}))
                        },
                        "vulnerabilities": [
                            {
                                "type": vuln_type,
                                "severity": vuln_data.get("severity", "unknown"),
                                "description": vuln_data.get("evidence", "")
                            }
                            for vuln_type, vuln_data in analysis_data.get("vulnerability_confirmation", {}).items()
                        ],
                        "recommendations": [
                            {
                                "issue": rec_type,
                                "solution": (
                                    f"{rec_data.get('code_level_fix', '')}\n"
                                    f"{rec_data.get('secure_coding_practices', '')}"
                                ).strip()
                            }
                            for rec_type, rec_data in analysis_data.get("remediation_suggestions", {}).items()
                        ]
                    }
                }
            }
            except json.JSONDecodeError as e:
                logger.error(f"JSON parsing failed: {str(e)}")
                # If JSON parsing fails, return the raw text
                return {
                    "status": "success",
                    "analysis": {
                        "raw_text": analysis_text,
                        "summary": self._extract_summary(analysis_text),
                        "vulnerabilities": self._extract_vulnerabilities(analysis_text),
                        "recommendations": self._extract_recommendations(analysis_text)
                    }
                }
        except Exception as e:
            logger.error(f"AI API call failed: {str(e)}")
            return {