2025-05-26 15:57:15 +08:00

259 lines
8.9 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import sys
import os
from typing import Dict, List, Optional, Tuple
from bs4 import BeautifulSoup
import json
import requests
import re
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))
from common.llmcaller import LLMCaller
from common.sshconnection import SSHConnection
import time
import logging
from dotenv import load_dotenv
load_dotenv()
deepseek_key=os.getenv('DEEPSEEK_API_KEY')
logger = logging.getLogger(__name__)
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler('bandit.log'),
logging.StreamHandler() # 同时输出到控制台
]
)
class RecordFileManager:
"""记录文件管理类,用于追加和读取最后一条记录"""
def __init__(self, filepath: str):
"""
初始化记录文件管理器
Args:
filepath: 记录文件路径
"""
self.filepath = filepath
# 确保目录存在
directory = os.path.dirname(filepath)
if directory and not os.path.exists(directory):
os.makedirs(directory)
def append(self, n: int, content: str) -> bool:
"""
添加一条记录到文件末尾
Args:
n: 数字部分
content: 内容部分
Returns:
bool: 是否成功添加
"""
try:
with open(self.filepath, 'a', encoding='utf-8') as f:
f.write(f"{n} {content}\n")
return True
except Exception as e:
print(f"添加记录失败: {e}")
return False
def load_last_record(self) -> Optional[Tuple[int, str]]:
"""
读取最后一条记录
Returns:
Optional[Tuple[int, str]]: (数字, 内容) 或 None如果文件不存在或为空
"""
try:
# 如果文件不存在返回None
if not os.path.exists(self.filepath):
return None
with open(self.filepath, 'r', encoding='utf-8') as f:
lines = f.readlines()
# 如果文件为空返回None
if not lines:
return None
# 获取最后一行,去除换行符
last_line = lines[-1].strip()
# 如果最后一行为空,向前查找非空行
for line in reversed(lines):
line = line.strip()
if line:
last_line = line
break
else:
# 所有行都为空
return None
# 解析最后一行
parts = last_line.split(' ', 1) # 只分割第一个空格
if len(parts) >= 2:
try:
n = int(parts[0])
content = parts[1]
return (n, content)
except ValueError:
print(f"解析数字失败: {parts[0]}")
return None
else:
print(f"记录格式错误: {last_line}")
return None
except Exception as e:
print(f"读取记录失败: {e}")
return None
class BanditLevelFetcher:
"""获取Bandit级别信息"""
def __init__(self):
self.base_url = "https://overthewire.org/wargames/bandit"
def get_level_info(self, level: int) -> Tuple[str, str]:
"""获取指定级别的目标和帮助信息"""
try:
if level == 0:
url = f"{self.base_url}/bandit0.html"
else:
url = f"{self.base_url}/bandit{level}.html"
response = requests.get(url, timeout=10)
response.raise_for_status()
soup = BeautifulSoup(response.content, 'html.parser')
# 查找Level Goal
goal = ""
goal_header = soup.find('h2', string='Level Goal')
if goal_header:
goal_content = goal_header.find_next_sibling('p')
if goal_content:
goal = goal_content.get_text().strip()
# 查找Commands you may need to solve this level
help_info = ""
help_header = soup.find('h2', string='Commands you may need to solve this level')
if help_header:
help_content = help_header.find_next_sibling('p')
if help_content:
help_info = help_content.get_text().strip()
# 如果没有找到标准格式,尝试其他方式
if not goal:
# 尝试查找包含关键词的段落
paragraphs = soup.find_all('p')
for p in paragraphs:
text = p.get_text().lower()
if 'password' in text or 'level' in text or 'goal' in text:
goal = p.get_text().strip()
break
return goal, help_info
except Exception as e:
logger.error(f"获取级别 {level} 信息失败: {e}")
return "", ""
def getcommand(level:int,goal:str,help_info:str,userprompt:str):
systemprompt=f"""
你是一个Linux专家熟悉使用Linux,包括命令行的各种技巧正在解决OverTheWire Bandit第{level}级挑战。
游戏规则:
1. 每个级别都有一个目标你需要通过执行一系列命令来完成目标。最终目标是在系统中的某个文件中找到一个password.
1. 每个级别都有一个帮助信息,帮助信息中会包含一些有用的信息,比如可能用到的系统命令。
本关目标: {goal}
帮助信息,可能用到的系统命令: {help_info}
游戏的难点是如何找到文件并读取文件内容。有时候文件是特殊字符,需要使用技巧来读取,有时候文件藏在很多目录里,需要技巧才能找到。你要发挥聪明才智才能完成任务。
游戏的方法是:
我会登录系统,并执行初始命令 ls -l,然后把所有的结果返回给你。
你来输出下一个命令,我执行后,把结果返回给你。
只输出可以直接执行的命令,不要输出任何解释,以便程序执行。
有的文件里密码可能有说明有些文件则只包括一个字符串那这个字符串就是password
直到你告诉我目标完成。目标完成的标志是:<goal_done>password</goal_done>
必须取得真正的密码才能完成,真正的密码肯定不是'password'
如果部分命令返回异常,可以要求重新执行。
"""
openai_llm = LLMCaller('deepseek', api_key=deepseek_key)
response = openai_llm.call(
system_prompt=systemprompt,
user_prompt=userprompt,
temperature=0.7
)
print(response)
return response
def main():
recorder = RecordFileManager("records.txt")
bandit_level_fetcher = BanditLevelFetcher()
sshclient=SSHConnection()
lastpassword=""
hostname=f"bandit.labs.overthewire.org"
last_record = recorder.load_last_record()
if last_record:
n, lastpassword = last_record
print(f"最后一条记录: {n} {lastpassword}")
else:
n=0
for i in range(n,33):
if i==0:
lastpassword="bandit0"
username=f"bandit{i}"
print(f"正在获取级别 {i} 的信息...")
logger.info(f"正在获取级别 {i} 的信息...")
goal,info = bandit_level_fetcher.get_level_info(i+1)
#print(goal,info)
logger.info(f"level {i} goal: {goal}")
if not sshclient.connect(username,lastpassword,hostname=hostname):
logger.error(f"level {i} connect failed")
break
result=sshclient.execute_command("ls -l")
userprompt=f"""
command: ls -l
return value"{result}
"""
while True:
command=getcommand(i,goal,info,userprompt)
print(command)
logger.info(f"level {i} command: {command}")
match = re.search(r"<goal_done>(.*?)</goal_done>", command)
if match:
lastpassword = match.group(1).strip()
print(f"get password: {lastpassword}")
recorder.append(i+1,lastpassword)
sshclient.close()
logger.info(f"level {i} get password: {lastpassword}")
break
result=sshclient.execute_command(command)
print(result)
userprompt+=f"""
command: {command}
return value: {result}
"""
print(userprompt)
logger.info(f"level {i} userprompt: {userprompt}")
if __name__ == "__main__":
main()
# 使用示例