# -*- coding: utf-8 -*- import os import subprocess import platform import json import yaml from typing import Dict, Optional, Tuple, List from enum import Enum from dataclasses import dataclass from openai import OpenAI # deepseek也用openai接口 import sys import locale from dotenv import load_dotenv load_dotenv() api_key = os.getenv("api_key") base_url=os.getenv('base_url') modelname=os.getenv('modelname') class RequestType(Enum): COMMAND = "A" CONFIG = "B" @dataclass class SystemInfo: os_type: str os_version: str user_privileges: bool current_user: str @dataclass class CommandResponse: analysis: str commands: List[str] comments: List[str] verification: List[str] warnings: List[str] class SystemAdminAssistant: def __init__(self, ): """初始化系统管理助手""" #openai.api_key = api_key self.system_info = self._collect_system_info() self.deepseek_client = OpenAI(api_key=api_key,base_url=base_url) def _collect_system_info(self) -> SystemInfo: """收集系统信息""" os_type = platform.system() os_version = "" current_user = os.getenv('USER', os.getenv('USERNAME', 'unknown')) try: if os_type == "Linux": os_version = subprocess.check_output(['uname', '-a'], text=True).strip() elif os_type == "Darwin": # macOS os_version = subprocess.check_output(['sw_vers', '-productVersion'], text=True).strip() elif os_type == "Windows": os_version = platform.version() # 检查是否有管理员权限 if os_type in ["Linux", "Darwin"]: is_admin = os.getuid() == 0 else: # Windows import ctypes is_admin = ctypes.windll.shell32.IsUserAnAdmin() != 0 except subprocess.SubprocessError: os_version = "Unknown" is_admin = False return SystemInfo( os_type=os_type, os_version=os_version, user_privileges=is_admin, current_user=current_user ) def _get_ai_response(self, user_request: str) -> Optional[CommandResponse]: """从Deepseek获取系统管理建议""" system_prompt = f"""你是一个专业的系统管理员助手。 系统信息: - 操作系统:{self.system_info.os_type} - 版本:{self.system_info.os_version} - 当前用户:{self.system_info.current_user} - 管理员权限:{'是' if self.system_info.user_privileges else '否'} 请针对用户需求提供专业的系统管理建议,包括: 1. 需求分析 2. 具体命令或配置修改方案 3. 命令注释 4. 验证命令 5. 安全警告 以JSON格式返回,包含以下字段: analysis: 需求分析 commands: 命令列表 comments: 注释列表 verification: 验证命令列表 warnings: 安全警告列表 直接返回内容,不要包含```json,以便解析 """ try: response = self.deepseek_client.chat.completions.create( model=modelname, # 阿里云用的是模型名称,deepseek官网用的是deepseek-chat messages=[ {"role": "system", "content": system_prompt}, {"role": "user", "content": user_request} ], temperature=0 ) print(response.choices[0].message.content) response_text = response.choices[0].message.content.strip() response_data = json.loads(response_text) return CommandResponse( analysis=response_data["analysis"], commands=response_data["commands"], comments=response_data["comments"], verification=response_data["verification"], warnings=response_data["warnings"] ) except Exception as e: print(f"调用AI API时出错:{e}") return None def _check_command_safety(self, command: str) -> List[str]: """检查命令的安全性""" warnings = [] dangerous_commands = ['rm -rf', 'mkfs', 'format'] #可继续自行添加 # 检查危险命令 for dangerous_cmd in dangerous_commands: if dangerous_cmd in command.lower(): warnings.append(f"警告:检测到危险命令 '{dangerous_cmd}',请谨慎执行") # 检查是否需要root权限 if 'sudo' in command: warnings.append("警告:此命令需要root权限") return warnings def _execute_command(self, command: str) -> Tuple[bool, str]: """执行命令并返回结果""" try: result = subprocess.run( command, shell=True, text=True, capture_output=True ) if result.returncode == 0: return True, result.stdout else: return False, f"错误: {result.stderr}" except subprocess.SubprocessError as e: return False, f"执行出错: {str(e)}" def _handle_config_file(self, file_path: str, changes: Dict) -> Tuple[bool, str]: """处理配置文件修改""" try: with open(file_path, 'r') as f: content = f.read() # 根据文件扩展名选择解析器 ext = os.path.splitext(file_path)[1].lower() if ext in ['.json']: data = json.loads(content) elif ext in ['.yaml', '.yml']: data = yaml.safe_load(content) else: return False, "不支持的配置文件格式" # 应用更改 for key, value in changes.items(): data[key] = value # 保存更改 with open(file_path, 'w') as f: if ext in ['.json']: json.dump(data, f, indent=4) elif ext in ['.yaml', '.yml']: yaml.safe_dump(data, f) return True, "配置文件更新成功" except Exception as e: return False, f"配置文件处理错误: {str(e)}" def process_request(self, user_request: str): """处理用户请求的主函数""" print("\n" + "="*50) print(f"系统信息:") print(f"操作系统: {self.system_info.os_type}") print(f"系统版本: {self.system_info.os_version}") print(f"当前用户: {self.system_info.current_user}") print(f"管理员权限: {'是' if self.system_info.user_privileges else '否'}") print("="*50) # 获取AI响应 response = self._get_ai_response(user_request) if not response: print("无法获取处理建议") return # 显示需求分析 print(f"\n需求分析:\n{response.analysis}") # 显示命令和注释 print("\n推荐方案:") for cmd, comment in zip(response.commands, response.comments): print(f"\n# {comment}") print(cmd) # 检查命令安全性 warnings = self._check_command_safety(cmd) if warnings: print("\n安全警告:") for warning in warnings: print(f"- {warning}") # 显示验证命令 if response.verification: print("\n验证命令:") for verify_cmd in response.verification: print(verify_cmd) # 显示其他警告 if response.warnings: print("\n其他注意事项:") for warning in response.warnings: print(f"- {warning}") # 请求用户确认 confirmation = input("\n是否执行这些命令?([Y]/n): ").lower().strip() if confirmation in ['y', 'yes', '']: print("\n开始执行命令...") for cmd in response.commands: print(f"\n执行: {cmd}") success, output = self._execute_command(cmd) if success: print("执行成功!输出:") print(output) else: print("执行失败!错误:") print(output) break # 执行验证命令 if success and response.verification: print("\n执行验证命令...") for verify_cmd in response.verification: print(f"\n验证: {verify_cmd}") success, output = self._execute_command(verify_cmd) print(output) else: print("\n已取消执行") def safe_input(prompt): print(prompt, end='', flush=True) input_bytes = sys.stdin.buffer.readline() try: # 尝试用系统编码正常解码 return input_bytes.decode(sys.stdin.encoding).strip() except UnicodeDecodeError: # 替换无法解码的字节 return input_bytes.decode(sys.stdin.encoding, errors='ignore').strip() def main(): """主程序""" # 设置默认编码 if sys.platform.startswith('win'): # Windows 系统 sys.stdin.reconfigure(encoding='utf-8') sys.stdout.reconfigure(encoding='utf-8') else: # Linux/Unix 系统 sys.stdin.reconfigure(encoding=locale.getpreferredencoding()) sys.stdout.reconfigure(encoding=locale.getpreferredencoding()) # 然后正常使用 input #user_request = input("请输入您的系统管理需求 (输入 'quit' 退出): ") assistant = SystemAdminAssistant() while True: print("\n" + "="*50) user_request = safe_input("请输入您的系统管理需求 (输入 'quit' 退出): ") if user_request.lower() == 'quit': break assistant.process_request(user_request) if __name__ == "__main__": main()