import sys import os from typing import Dict, List, Optional, Tuple from bs4 import BeautifulSoup import json import requests import re sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '..'))) from common.llmcaller import LLMCaller from common.sshconnection import SSHConnection import time import logging from dotenv import load_dotenv load_dotenv() deepseek_key=os.getenv('DEEPSEEK_API_KEY') logger = logging.getLogger(__name__) logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', handlers=[ logging.FileHandler('bandit.log'), logging.StreamHandler() # 同时输出到控制台 ] ) class RecordFileManager: """记录文件管理类,用于追加和读取最后一条记录""" def __init__(self, filepath: str): """ 初始化记录文件管理器 Args: filepath: 记录文件路径 """ self.filepath = filepath # 确保目录存在 directory = os.path.dirname(filepath) if directory and not os.path.exists(directory): os.makedirs(directory) def append(self, n: int, content: str) -> bool: """ 添加一条记录到文件末尾 Args: n: 数字部分 content: 内容部分 Returns: bool: 是否成功添加 """ try: with open(self.filepath, 'a', encoding='utf-8') as f: f.write(f"{n} {content}\n") return True except Exception as e: print(f"添加记录失败: {e}") return False def load_last_record(self) -> Optional[Tuple[int, str]]: """ 读取最后一条记录 Returns: Optional[Tuple[int, str]]: (数字, 内容) 或 None(如果文件不存在或为空) """ try: # 如果文件不存在,返回None if not os.path.exists(self.filepath): return None with open(self.filepath, 'r', encoding='utf-8') as f: lines = f.readlines() # 如果文件为空,返回None if not lines: return None # 获取最后一行,去除换行符 last_line = lines[-1].strip() # 如果最后一行为空,向前查找非空行 for line in reversed(lines): line = line.strip() if line: last_line = line break else: # 所有行都为空 return None # 解析最后一行 parts = last_line.split(' ', 1) # 只分割第一个空格 if len(parts) >= 2: try: n = int(parts[0]) content = parts[1] return (n, content) except ValueError: print(f"解析数字失败: {parts[0]}") return None else: print(f"记录格式错误: {last_line}") return None except Exception as e: print(f"读取记录失败: {e}") return None class BanditLevelFetcher: """获取Bandit级别信息""" def __init__(self): self.base_url = "https://overthewire.org/wargames/bandit" def get_level_info(self, level: int) -> Tuple[str, str]: """获取指定级别的目标和帮助信息""" try: if level == 0: url = f"{self.base_url}/bandit0.html" else: url = f"{self.base_url}/bandit{level}.html" response = requests.get(url, timeout=10) response.raise_for_status() soup = BeautifulSoup(response.content, 'html.parser') # 查找Level Goal goal = "" goal_header = soup.find('h2', string='Level Goal') if goal_header: goal_content = goal_header.find_next_sibling('p') if goal_content: goal = goal_content.get_text().strip() # 查找Commands you may need to solve this level help_info = "" help_header = soup.find('h2', string='Commands you may need to solve this level') if help_header: help_content = help_header.find_next_sibling('p') if help_content: help_info = help_content.get_text().strip() # 如果没有找到标准格式,尝试其他方式 if not goal: # 尝试查找包含关键词的段落 paragraphs = soup.find_all('p') for p in paragraphs: text = p.get_text().lower() if 'password' in text or 'level' in text or 'goal' in text: goal = p.get_text().strip() break return goal, help_info except Exception as e: logger.error(f"获取级别 {level} 信息失败: {e}") return "", "" def getcommand(level:int,goal:str,help_info:str,userprompt:str): systemprompt=f""" 你是一个Linux专家,熟悉使用Linux,包括命令行的各种技巧,正在解决OverTheWire Bandit第{level}级挑战。 游戏规则: 1. 每个级别都有一个目标,你需要通过执行一系列命令来完成目标。最终目标是在系统中的某个文件中找到一个password. 1. 每个级别都有一个帮助信息,帮助信息中会包含一些有用的信息,比如可能用到的系统命令。 本关目标: {goal} 帮助信息,可能用到的系统命令: {help_info} 游戏的难点是如何找到文件并读取文件内容。有时候文件是特殊字符,需要使用技巧来读取,有时候文件藏在很多目录里,需要技巧才能找到。你要发挥聪明才智才能完成任务。 游戏的方法是: 我会登录系统,并执行初始命令 ls -l,然后把所有的结果返回给你。 你来输出下一个命令,我执行后,把结果返回给你。 只输出可以直接执行的命令,不要输出任何解释,以便程序执行。 有的文件里密码可能有说明,有些文件则只包括一个字符串,那这个字符串就是password 直到你告诉我目标完成。目标完成的标志是:password 必须取得真正的密码才能完成,真正的密码肯定不是'password' 如果部分命令返回异常,可以要求重新执行。 """ openai_llm = LLMCaller('deepseek', api_key=deepseek_key) response = openai_llm.call( system_prompt=systemprompt, user_prompt=userprompt, temperature=0.7 ) print(response) return response def main(): recorder = RecordFileManager("records.txt") bandit_level_fetcher = BanditLevelFetcher() sshclient=SSHConnection() lastpassword="" hostname=f"bandit.labs.overthewire.org" last_record = recorder.load_last_record() if last_record: n, lastpassword = last_record print(f"最后一条记录: {n} {lastpassword}") else: n=0 for i in range(n,33): if i==0: lastpassword="bandit0" username=f"bandit{i}" print(f"正在获取级别 {i} 的信息...") logger.info(f"正在获取级别 {i} 的信息...") goal,info = bandit_level_fetcher.get_level_info(i+1) #print(goal,info) logger.info(f"level {i} goal: {goal}") if not sshclient.connect(username,lastpassword,hostname=hostname): logger.error(f"level {i} connect failed") break result=sshclient.execute_command("ls -l") userprompt=f""" command: ls -l return value"{result} """ while True: command=getcommand(i,goal,info,userprompt) print(command) logger.info(f"level {i} command: {command}") match = re.search(r"(.*?)", command) if match: lastpassword = match.group(1).strip() print(f"get password: {lastpassword}") recorder.append(i+1,lastpassword) sshclient.close() logger.info(f"level {i} get password: {lastpassword}") break result=sshclient.execute_command(command) print(result) userprompt+=f""" command: {command} return value: {result} """ print(userprompt) logger.info(f"level {i} userprompt: {userprompt}") if __name__ == "__main__": main() # 使用示例