Update kali_server.py

修改通道控制,加上交互式通道
This commit is contained in:
AiShell 2025-05-03 15:08:06 +08:00 committed by GitHub
parent c2c2ae101d
commit ea00812dc0
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

View File

@ -2,8 +2,6 @@
# This script connect the MCP AI agent to Kali Linux terminal and API Server.
# some of the code here was inspired from https://github.com/whit3rabbit0/project_astro , be sure to check them out
import argparse
import json
import logging
@ -12,7 +10,10 @@ import subprocess
import sys
import traceback
import threading
from typing import Dict, Any
import tempfile
import signal
import time
from typing import Dict, Any, Optional, Tuple
from flask import Flask, request, jsonify
# Configure logging
@ -28,14 +29,14 @@ logger = logging.getLogger(__name__)
# Configuration
API_PORT = int(os.environ.get("API_PORT", 5000))
DEBUG_MODE = os.environ.get("DEBUG_MODE", "0").lower() in ("1", "true", "yes", "y")
COMMAND_TIMEOUT = 180 # 5 minutes default timeout
COMMAND_TIMEOUT = 180 # 3 minutes default timeout
app = Flask(__name__)
class CommandExecutor:
"""Class to handle command execution with better timeout management"""
def __init__(self, command: str, timeout: int = COMMAND_TIMEOUT):
def __init__(self, command: str, timeout: int = COMMAND_TIMEOUT, interactive: bool = False):
self.command = command
self.timeout = timeout
self.process = None
@ -45,21 +46,51 @@ class CommandExecutor:
self.stderr_thread = None
self.return_code = None
self.timed_out = False
self.interactive = interactive
# Check if expect is installed - needed for complex interactive commands
if self.interactive and "expect" in command:
try:
check_expect = subprocess.run(["which", "expect"],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
if check_expect.returncode != 0:
logger.warning("'expect' command not found, installing it...")
# Try to install expect
install_cmd = subprocess.run(["apt-get", "install", "-y", "expect"],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
if install_cmd.returncode != 0:
logger.error("Failed to install 'expect'. Some interactive commands may not work properly.")
except Exception as e:
logger.error(f"Error checking/installing 'expect': {str(e)}")
def _read_stdout(self):
"""Thread function to continuously read stdout"""
for line in iter(self.process.stdout.readline, ''):
self.stdout_data += line
if DEBUG_MODE:
logger.debug(f"STDOUT: {line.strip()}")
def _read_stderr(self):
"""Thread function to continuously read stderr"""
for line in iter(self.process.stderr.readline, ''):
self.stderr_data += line
if DEBUG_MODE:
logger.debug(f"STDERR: {line.strip()}")
def execute(self) -> Dict[str, Any]:
"""Execute the command and handle timeout gracefully"""
logger.info(f"Executing command: {self.command}")
# For interactive tools, use a different approach with pseudo-TTY
if self.interactive:
return self._execute_interactive()
else:
return self._execute_non_interactive()
def _execute_non_interactive(self) -> Dict[str, Any]:
"""Execute non-interactive command"""
try:
self.process = subprocess.Popen(
self.command,
@ -125,20 +156,98 @@ class CommandExecutor:
"partial_results": bool(self.stdout_data or self.stderr_data)
}
def _execute_interactive(self) -> Dict[str, Any]:
"""Execute command that requires a TTY/PTY"""
temp_output = tempfile.NamedTemporaryFile(delete=False, mode='w+')
temp_name = temp_output.name
temp_output.close()
def execute_command(command: str) -> Dict[str, Any]:
try:
# Create command that runs in a PTY and redirects output to our temp file
pty_command = f"script -q -c '{self.command}' {temp_name}"
# Execute the command
process = subprocess.Popen(
pty_command,
shell=True,
preexec_fn=os.setsid # Create new process group for proper kill
)
# Wait for timeout or completion
try:
return_code = process.wait(timeout=self.timeout)
timed_out = False
except subprocess.TimeoutExpired:
logger.warning(f"Interactive command timed out after {self.timeout} seconds")
# Kill the entire process group
os.killpg(os.getpgid(process.pid), signal.SIGTERM)
time.sleep(1)
try:
os.killpg(os.getpgid(process.pid), signal.SIGKILL)
except:
pass
return_code = -1
timed_out = True
# Read the captured output
with open(temp_name, 'r') as f:
output = f.read()
# Determine success based on output presence even if timed out
success = True if timed_out and output else (return_code == 0)
return {
"stdout": output,
"stderr": "", # All output is captured to stdout with the script command
"return_code": return_code,
"success": success,
"timed_out": timed_out,
"partial_results": timed_out and bool(output)
}
except Exception as e:
logger.error(f"Error executing interactive command: {str(e)}")
logger.error(traceback.format_exc())
return {
"stdout": "",
"stderr": f"Error executing interactive command: {str(e)}",
"return_code": -1,
"success": False,
"timed_out": False,
"partial_results": False
}
finally:
# Clean up temp file
try:
os.unlink(temp_name)
except:
pass
def execute_command(command: str, interactive: bool = False) -> Dict[str, Any]:
"""
Execute a shell command and return the result
Args:
command: The command to execute
interactive: Whether the command requires a TTY
Returns:
A dictionary containing the stdout, stderr, and return code
"""
executor = CommandExecutor(command)
executor = CommandExecutor(command, interactive=interactive)
return executor.execute()
# Helper function to determine if a command should be run in interactive mode
def needs_interactive_mode(command: str) -> bool:
"""Determine if a command likely needs an interactive TTY session"""
interactive_tools = [
"msfconsole", "sqlmap", "hydra", "wpscan", "metasploit",
"meterpreter", "postgresql", "mysql", "ssh", "ftp", "telnet"
]
return any(tool in command for tool in interactive_tools)
@app.route("/api/command", methods=["POST"])
def generic_command():
@ -146,6 +255,7 @@ def generic_command():
try:
params = request.json
command = params.get("command", "")
force_interactive = params.get("interactive", False)
if not command:
logger.warning("Command endpoint called without command parameter")
@ -153,7 +263,11 @@ def generic_command():
"error": "Command parameter is required"
}), 400
result = execute_command(command)
# Determine if command needs interactive mode
interactive = force_interactive or needs_interactive_mode(command)
logger.info(f"Running command in {'interactive' if interactive else 'non-interactive'} mode")
result = execute_command(command, interactive=interactive)
return jsonify(result)
except Exception as e:
logger.error(f"Error in command endpoint: {str(e)}")
@ -316,7 +430,8 @@ def sqlmap():
if additional_args:
command += f" {additional_args}"
result = execute_command(command)
# SQLMap definitely needs interactive mode
result = execute_command(command, interactive=True)
return jsonify(result)
except Exception as e:
logger.error(f"Error in sqlmap endpoint: {str(e)}")
@ -332,6 +447,8 @@ def metasploit():
params = request.json
module = params.get("module", "")
options = params.get("options", {})
commands = params.get("commands", [])
timeout = params.get("timeout", COMMAND_TIMEOUT)
if not module:
logger.warning("Metasploit called without module parameter")
@ -339,24 +456,74 @@ def metasploit():
"error": "Module parameter is required"
}), 400
# Format options for Metasploit
options_str = ""
for key, value in options.items():
options_str += f" {key}={value}"
# Create an MSF resource script
# Create a more sophisticated MSF resource script
resource_content = f"use {module}\n"
# Set module options
for key, value in options.items():
resource_content += f"set {key} {value}\n"
resource_content += "exploit\n"
# Save resource script to a temporary file
resource_file = "/tmp/mcp_msf_resource.rc"
# Run the exploit
resource_content += "exploit -z\n" # -z means don't interact with the session
# Additional post-exploit commands if provided
for cmd in commands:
resource_content += f"{cmd}\n"
# Handle sessions more gracefully
resource_content += "sessions -l\n" # List all sessions
resource_content += "sleep 3\n" # Give it time to complete
resource_content += "sessions -K\n" # Kill all sessions before exit
resource_content += "exit\n" # Exit msfconsole
# Save resource script to a temporary file with unique name
resource_file = f"/tmp/mcp_msf_resource_{os.getpid()}_{int(time.time())}.rc"
with open(resource_file, "w") as f:
f.write(resource_content)
command = f"msfconsole -q -r {resource_file}"
result = execute_command(command)
logger.info(f"Created Metasploit resource script: {resource_file}")
logger.info(f"Resource script content:\n{resource_content}")
# Use expect script to handle the metasploit console automatically
# This creates a command that will automatically handle interactive prompts
expect_script = f"""
timeout {timeout} expect -c '
set timeout {timeout}
spawn msfconsole -q -r {resource_file}
expect {{
"exploit completed" {{
sleep 5
send "exit\\r"
exp_continue
}}
"session 1 opened" {{
sleep 5
send "\\r"
exp_continue
}}
"command shell session" {{
sleep 3
send "exit\\r"
exp_continue
}}
"meterpreter >" {{
send "exit\\r"
exp_continue
}}
timeout {{
puts "TIMEOUT OCCURRED"
exit 1
}}
eof {{
exit 0
}}
}}
'
"""
# Execute the metasploit command with the expect script
command = expect_script
result = execute_command(command, interactive=True)
# Clean up the temporary file
try:
@ -414,7 +581,8 @@ def hydra():
command += f" {target} {service}"
result = execute_command(command)
# Hydra might benefit from interactive mode for certain services
result = execute_command(command, interactive=True)
return jsonify(result)
except Exception as e:
logger.error(f"Error in hydra endpoint: {str(e)}")
@ -475,12 +643,13 @@ def wpscan():
"error": "URL parameter is required"
}), 400
command = f"wpscan --url {url}"
command = f"wpscan --url {url} --no-banner"
if additional_args:
command += f" {additional_args}"
result = execute_command(command)
# WPScan needs interactive mode as it displays progress bars
result = execute_command(command, interactive=True)
return jsonify(result)
except Exception as e:
logger.error(f"Error in wpscan endpoint: {str(e)}")
@ -539,21 +708,13 @@ def health_check():
"all_essential_tools_available": all_essential_tools_available
})
@app.route("/mcp/capabilities", methods=["GET"])
def get_capabilities():
# Return tool capabilities similar to our existing MCP server
pass
@app.route("/mcp/tools/kali_tools/<tool_name>", methods=["POST"])
def execute_tool(tool_name):
# Direct tool execution without going through the API server
pass
def parse_args():
"""Parse command line arguments."""
parser = argparse.ArgumentParser(description="Run the Kali Linux API Server")
parser.add_argument("--debug", action="store_true", help="Enable debug mode")
parser.add_argument("--port", type=int, default=API_PORT, help=f"Port for the API server (default: {API_PORT})")
parser.add_argument("--timeout", type=int, default=COMMAND_TIMEOUT,
help=f"Default command timeout in seconds (default: {COMMAND_TIMEOUT})")
return parser.parse_args()
if __name__ == "__main__":
@ -568,5 +729,26 @@ if __name__ == "__main__":
if args.port != API_PORT:
API_PORT = args.port
# Update default timeout if specified
if args.timeout != COMMAND_TIMEOUT:
COMMAND_TIMEOUT = args.timeout
logger.info(f"Command timeout set to {COMMAND_TIMEOUT} seconds")
# Check for essential dependencies
try:
dependencies = ["script", "expect"]
for dep in dependencies:
check = subprocess.run(["which", dep], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
if check.returncode != 0:
logger.warning(f"'{dep}' command not found, installing required dependencies...")
install = subprocess.run(["apt-get", "update"], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
install = subprocess.run(["apt-get", "install", "-y", "expect", "bsdutils"],
stdout=subprocess.PIPE, stderr=subprocess.PIPE)
if install.returncode != 0:
logger.error(f"Failed to install required dependencies. Some features may not work properly.")
break
except Exception as e:
logger.error(f"Error checking dependencies: {str(e)}")
logger.info(f"Starting Kali Linux Tools API Server on port {API_PORT}")
app.run(host="0.0.0.0", port=API_PORT, debug=DEBUG_MODE)