AIPentest/Tools/llmsysadmin.py
AiShell 2ea72ab5c4
Update llmsysadmin.py
modify modelname from .env
2025-03-20 14:10:40 +08:00

308 lines
10 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# -*- coding: utf-8 -*-
import os
import subprocess
import platform
import json
import yaml
from typing import Dict, Optional, Tuple, List
from enum import Enum
from dataclasses import dataclass
from openai import OpenAI # deepseek也用openai接口
import sys
import locale
from dotenv import load_dotenv
load_dotenv()
api_key = os.getenv("api_key")
base_url=os.getenv('base_url')
modelname=os.getenv('modelname')
class RequestType(Enum):
COMMAND = "A"
CONFIG = "B"
@dataclass
class SystemInfo:
os_type: str
os_version: str
user_privileges: bool
current_user: str
@dataclass
class CommandResponse:
analysis: str
commands: List[str]
comments: List[str]
verification: List[str]
warnings: List[str]
class SystemAdminAssistant:
def __init__(self, ):
"""初始化系统管理助手"""
#openai.api_key = api_key
self.system_info = self._collect_system_info()
self.deepseek_client = OpenAI(api_key=api_key,base_url=base_url)
def _collect_system_info(self) -> SystemInfo:
"""收集系统信息"""
os_type = platform.system()
os_version = ""
current_user = os.getenv('USER', os.getenv('USERNAME', 'unknown'))
try:
if os_type == "Linux":
os_version = subprocess.check_output(['uname', '-a'], text=True).strip()
elif os_type == "Darwin": # macOS
os_version = subprocess.check_output(['sw_vers', '-productVersion'], text=True).strip()
elif os_type == "Windows":
os_version = platform.version()
# 检查是否有管理员权限
if os_type in ["Linux", "Darwin"]:
is_admin = os.getuid() == 0
else: # Windows
import ctypes
is_admin = ctypes.windll.shell32.IsUserAnAdmin() != 0
except subprocess.SubprocessError:
os_version = "Unknown"
is_admin = False
return SystemInfo(
os_type=os_type,
os_version=os_version,
user_privileges=is_admin,
current_user=current_user
)
def _get_ai_response(self, user_request: str) -> Optional[CommandResponse]:
"""从Deepseek获取系统管理建议"""
system_prompt = f"""你是一个专业的系统管理员助手。
系统信息:
- 操作系统:{self.system_info.os_type}
- 版本:{self.system_info.os_version}
- 当前用户:{self.system_info.current_user}
- 管理员权限:{'' if self.system_info.user_privileges else ''}
请针对用户需求提供专业的系统管理建议,包括:
1. 需求分析
2. 具体命令或配置修改方案
3. 命令注释
4. 验证命令
5. 安全警告
以JSON格式返回包含以下字段
analysis: 需求分析
commands: 命令列表
comments: 注释列表
verification: 验证命令列表
warnings: 安全警告列表
直接返回内容,不要包含```json以便解析
"""
try:
response = self.deepseek_client.chat.completions.create(
model=modelname, # 阿里云用的是模型名称deepseek官网用的是deepseek-chat
messages=[
{"role": "system", "content": system_prompt},
{"role": "user", "content": user_request}
],
temperature=0
)
print(response.choices[0].message.content)
response_text = response.choices[0].message.content.strip()
response_data = json.loads(response_text)
return CommandResponse(
analysis=response_data["analysis"],
commands=response_data["commands"],
comments=response_data["comments"],
verification=response_data["verification"],
warnings=response_data["warnings"]
)
except Exception as e:
print(f"调用AI API时出错{e}")
return None
def _check_command_safety(self, command: str) -> List[str]:
"""检查命令的安全性"""
warnings = []
dangerous_commands = ['rm -rf', 'mkfs', 'format'] #可继续自行添加
# 检查危险命令
for dangerous_cmd in dangerous_commands:
if dangerous_cmd in command.lower():
warnings.append(f"警告:检测到危险命令 '{dangerous_cmd}',请谨慎执行")
# 检查是否需要root权限
if 'sudo' in command:
warnings.append("警告此命令需要root权限")
return warnings
def _execute_command(self, command: str) -> Tuple[bool, str]:
"""执行命令并返回结果"""
try:
result = subprocess.run(
command,
shell=True,
text=True,
capture_output=True
)
if result.returncode == 0:
return True, result.stdout
else:
return False, f"错误: {result.stderr}"
except subprocess.SubprocessError as e:
return False, f"执行出错: {str(e)}"
def _handle_config_file(self, file_path: str, changes: Dict) -> Tuple[bool, str]:
"""处理配置文件修改"""
try:
with open(file_path, 'r') as f:
content = f.read()
# 根据文件扩展名选择解析器
ext = os.path.splitext(file_path)[1].lower()
if ext in ['.json']:
data = json.loads(content)
elif ext in ['.yaml', '.yml']:
data = yaml.safe_load(content)
else:
return False, "不支持的配置文件格式"
# 应用更改
for key, value in changes.items():
data[key] = value
# 保存更改
with open(file_path, 'w') as f:
if ext in ['.json']:
json.dump(data, f, indent=4)
elif ext in ['.yaml', '.yml']:
yaml.safe_dump(data, f)
return True, "配置文件更新成功"
except Exception as e:
return False, f"配置文件处理错误: {str(e)}"
def process_request(self, user_request: str):
"""处理用户请求的主函数"""
print("\n" + "="*50)
print(f"系统信息:")
print(f"操作系统: {self.system_info.os_type}")
print(f"系统版本: {self.system_info.os_version}")
print(f"当前用户: {self.system_info.current_user}")
print(f"管理员权限: {'' if self.system_info.user_privileges else ''}")
print("="*50)
# 获取AI响应
response = self._get_ai_response(user_request)
if not response:
print("无法获取处理建议")
return
# 显示需求分析
print(f"\n需求分析:\n{response.analysis}")
# 显示命令和注释
print("\n推荐方案:")
for cmd, comment in zip(response.commands, response.comments):
print(f"\n# {comment}")
print(cmd)
# 检查命令安全性
warnings = self._check_command_safety(cmd)
if warnings:
print("\n安全警告:")
for warning in warnings:
print(f"- {warning}")
# 显示验证命令
if response.verification:
print("\n验证命令:")
for verify_cmd in response.verification:
print(verify_cmd)
# 显示其他警告
if response.warnings:
print("\n其他注意事项:")
for warning in response.warnings:
print(f"- {warning}")
# 请求用户确认
confirmation = input("\n是否执行这些命令?([Y]/n): ").lower().strip()
if confirmation in ['y', 'yes', '']:
print("\n开始执行命令...")
for cmd in response.commands:
print(f"\n执行: {cmd}")
success, output = self._execute_command(cmd)
if success:
print("执行成功!输出:")
print(output)
else:
print("执行失败!错误:")
print(output)
break
# 执行验证命令
if success and response.verification:
print("\n执行验证命令...")
for verify_cmd in response.verification:
print(f"\n验证: {verify_cmd}")
success, output = self._execute_command(verify_cmd)
print(output)
else:
print("\n已取消执行")
def safe_input(prompt):
print(prompt, end='', flush=True)
input_bytes = sys.stdin.buffer.readline()
try:
# 尝试用系统编码正常解码
return input_bytes.decode(sys.stdin.encoding).strip()
except UnicodeDecodeError:
# 替换无法解码的字节
return input_bytes.decode(sys.stdin.encoding, errors='ignore').strip()
def main():
"""主程序"""
# 设置默认编码
if sys.platform.startswith('win'):
# Windows 系统
sys.stdin.reconfigure(encoding='utf-8')
sys.stdout.reconfigure(encoding='utf-8')
else:
# Linux/Unix 系统
sys.stdin.reconfigure(encoding=locale.getpreferredencoding())
sys.stdout.reconfigure(encoding=locale.getpreferredencoding())
# 然后正常使用 input
#user_request = input("请输入您的系统管理需求 (输入 'quit' 退出): ")
assistant = SystemAdminAssistant()
while True:
print("\n" + "="*50)
user_request = safe_input("请输入您的系统管理需求 (输入 'quit' 退出): ")
if user_request.lower() == 'quit':
break
assistant.process_request(user_request)
if __name__ == "__main__":
main()