diff --git a/mcpforpentest/kali_server.py b/mcpforpentest/kali_server.py index db5be7e..966c085 100644 --- a/mcpforpentest/kali_server.py +++ b/mcpforpentest/kali_server.py @@ -2,8 +2,6 @@ # This script connect the MCP AI agent to Kali Linux terminal and API Server. -# some of the code here was inspired from https://github.com/whit3rabbit0/project_astro , be sure to check them out - import argparse import json import logging @@ -12,7 +10,10 @@ import subprocess import sys import traceback import threading -from typing import Dict, Any +import tempfile +import signal +import time +from typing import Dict, Any, Optional, Tuple from flask import Flask, request, jsonify # Configure logging @@ -28,14 +29,14 @@ logger = logging.getLogger(__name__) # Configuration API_PORT = int(os.environ.get("API_PORT", 5000)) DEBUG_MODE = os.environ.get("DEBUG_MODE", "0").lower() in ("1", "true", "yes", "y") -COMMAND_TIMEOUT = 180 # 5 minutes default timeout +COMMAND_TIMEOUT = 180 # 3 minutes default timeout app = Flask(__name__) class CommandExecutor: """Class to handle command execution with better timeout management""" - def __init__(self, command: str, timeout: int = COMMAND_TIMEOUT): + def __init__(self, command: str, timeout: int = COMMAND_TIMEOUT, interactive: bool = False): self.command = command self.timeout = timeout self.process = None @@ -45,21 +46,51 @@ class CommandExecutor: self.stderr_thread = None self.return_code = None self.timed_out = False + self.interactive = interactive + + # Check if expect is installed - needed for complex interactive commands + if self.interactive and "expect" in command: + try: + check_expect = subprocess.run(["which", "expect"], + stdout=subprocess.PIPE, + stderr=subprocess.PIPE) + if check_expect.returncode != 0: + logger.warning("'expect' command not found, installing it...") + # Try to install expect + install_cmd = subprocess.run(["apt-get", "install", "-y", "expect"], + stdout=subprocess.PIPE, + stderr=subprocess.PIPE) + if install_cmd.returncode != 0: + logger.error("Failed to install 'expect'. Some interactive commands may not work properly.") + except Exception as e: + logger.error(f"Error checking/installing 'expect': {str(e)}") def _read_stdout(self): """Thread function to continuously read stdout""" for line in iter(self.process.stdout.readline, ''): self.stdout_data += line + if DEBUG_MODE: + logger.debug(f"STDOUT: {line.strip()}") def _read_stderr(self): """Thread function to continuously read stderr""" for line in iter(self.process.stderr.readline, ''): self.stderr_data += line + if DEBUG_MODE: + logger.debug(f"STDERR: {line.strip()}") def execute(self) -> Dict[str, Any]: """Execute the command and handle timeout gracefully""" logger.info(f"Executing command: {self.command}") - + + # For interactive tools, use a different approach with pseudo-TTY + if self.interactive: + return self._execute_interactive() + else: + return self._execute_non_interactive() + + def _execute_non_interactive(self) -> Dict[str, Any]: + """Execute non-interactive command""" try: self.process = subprocess.Popen( self.command, @@ -124,21 +155,99 @@ class CommandExecutor: "timed_out": False, "partial_results": bool(self.stdout_data or self.stderr_data) } + + def _execute_interactive(self) -> Dict[str, Any]: + """Execute command that requires a TTY/PTY""" + temp_output = tempfile.NamedTemporaryFile(delete=False, mode='w+') + temp_name = temp_output.name + temp_output.close() + + try: + # Create command that runs in a PTY and redirects output to our temp file + pty_command = f"script -q -c '{self.command}' {temp_name}" + + # Execute the command + process = subprocess.Popen( + pty_command, + shell=True, + preexec_fn=os.setsid # Create new process group for proper kill + ) + + # Wait for timeout or completion + try: + return_code = process.wait(timeout=self.timeout) + timed_out = False + except subprocess.TimeoutExpired: + logger.warning(f"Interactive command timed out after {self.timeout} seconds") + # Kill the entire process group + os.killpg(os.getpgid(process.pid), signal.SIGTERM) + time.sleep(1) + try: + os.killpg(os.getpgid(process.pid), signal.SIGKILL) + except: + pass + return_code = -1 + timed_out = True + + # Read the captured output + with open(temp_name, 'r') as f: + output = f.read() + + # Determine success based on output presence even if timed out + success = True if timed_out and output else (return_code == 0) + + return { + "stdout": output, + "stderr": "", # All output is captured to stdout with the script command + "return_code": return_code, + "success": success, + "timed_out": timed_out, + "partial_results": timed_out and bool(output) + } + + except Exception as e: + logger.error(f"Error executing interactive command: {str(e)}") + logger.error(traceback.format_exc()) + return { + "stdout": "", + "stderr": f"Error executing interactive command: {str(e)}", + "return_code": -1, + "success": False, + "timed_out": False, + "partial_results": False + } + finally: + # Clean up temp file + try: + os.unlink(temp_name) + except: + pass -def execute_command(command: str) -> Dict[str, Any]: +def execute_command(command: str, interactive: bool = False) -> Dict[str, Any]: """ Execute a shell command and return the result Args: command: The command to execute + interactive: Whether the command requires a TTY Returns: A dictionary containing the stdout, stderr, and return code """ - executor = CommandExecutor(command) + executor = CommandExecutor(command, interactive=interactive) return executor.execute() +# Helper function to determine if a command should be run in interactive mode +def needs_interactive_mode(command: str) -> bool: + """Determine if a command likely needs an interactive TTY session""" + interactive_tools = [ + "msfconsole", "sqlmap", "hydra", "wpscan", "metasploit", + "meterpreter", "postgresql", "mysql", "ssh", "ftp", "telnet" + ] + + return any(tool in command for tool in interactive_tools) + @app.route("/api/command", methods=["POST"]) def generic_command(): @@ -146,6 +255,7 @@ def generic_command(): try: params = request.json command = params.get("command", "") + force_interactive = params.get("interactive", False) if not command: logger.warning("Command endpoint called without command parameter") @@ -153,7 +263,11 @@ def generic_command(): "error": "Command parameter is required" }), 400 - result = execute_command(command) + # Determine if command needs interactive mode + interactive = force_interactive or needs_interactive_mode(command) + logger.info(f"Running command in {'interactive' if interactive else 'non-interactive'} mode") + + result = execute_command(command, interactive=interactive) return jsonify(result) except Exception as e: logger.error(f"Error in command endpoint: {str(e)}") @@ -316,7 +430,8 @@ def sqlmap(): if additional_args: command += f" {additional_args}" - result = execute_command(command) + # SQLMap definitely needs interactive mode + result = execute_command(command, interactive=True) return jsonify(result) except Exception as e: logger.error(f"Error in sqlmap endpoint: {str(e)}") @@ -332,6 +447,8 @@ def metasploit(): params = request.json module = params.get("module", "") options = params.get("options", {}) + commands = params.get("commands", []) + timeout = params.get("timeout", COMMAND_TIMEOUT) if not module: logger.warning("Metasploit called without module parameter") @@ -339,24 +456,74 @@ def metasploit(): "error": "Module parameter is required" }), 400 - # Format options for Metasploit - options_str = "" - for key, value in options.items(): - options_str += f" {key}={value}" - - # Create an MSF resource script + # Create a more sophisticated MSF resource script resource_content = f"use {module}\n" + + # Set module options for key, value in options.items(): resource_content += f"set {key} {value}\n" - resource_content += "exploit\n" - # Save resource script to a temporary file - resource_file = "/tmp/mcp_msf_resource.rc" + # Run the exploit + resource_content += "exploit -z\n" # -z means don't interact with the session + + # Additional post-exploit commands if provided + for cmd in commands: + resource_content += f"{cmd}\n" + + # Handle sessions more gracefully + resource_content += "sessions -l\n" # List all sessions + resource_content += "sleep 3\n" # Give it time to complete + resource_content += "sessions -K\n" # Kill all sessions before exit + resource_content += "exit\n" # Exit msfconsole + + # Save resource script to a temporary file with unique name + resource_file = f"/tmp/mcp_msf_resource_{os.getpid()}_{int(time.time())}.rc" with open(resource_file, "w") as f: f.write(resource_content) - command = f"msfconsole -q -r {resource_file}" - result = execute_command(command) + logger.info(f"Created Metasploit resource script: {resource_file}") + logger.info(f"Resource script content:\n{resource_content}") + + # Use expect script to handle the metasploit console automatically + # This creates a command that will automatically handle interactive prompts + expect_script = f""" +timeout {timeout} expect -c ' + set timeout {timeout} + spawn msfconsole -q -r {resource_file} + expect {{ + "exploit completed" {{ + sleep 5 + send "exit\\r" + exp_continue + }} + "session 1 opened" {{ + sleep 5 + send "\\r" + exp_continue + }} + "command shell session" {{ + sleep 3 + send "exit\\r" + exp_continue + }} + "meterpreter >" {{ + send "exit\\r" + exp_continue + }} + timeout {{ + puts "TIMEOUT OCCURRED" + exit 1 + }} + eof {{ + exit 0 + }} + }} +' +""" + + # Execute the metasploit command with the expect script + command = expect_script + result = execute_command(command, interactive=True) # Clean up the temporary file try: @@ -414,7 +581,8 @@ def hydra(): command += f" {target} {service}" - result = execute_command(command) + # Hydra might benefit from interactive mode for certain services + result = execute_command(command, interactive=True) return jsonify(result) except Exception as e: logger.error(f"Error in hydra endpoint: {str(e)}") @@ -475,12 +643,13 @@ def wpscan(): "error": "URL parameter is required" }), 400 - command = f"wpscan --url {url}" + command = f"wpscan --url {url} --no-banner" if additional_args: command += f" {additional_args}" - result = execute_command(command) + # WPScan needs interactive mode as it displays progress bars + result = execute_command(command, interactive=True) return jsonify(result) except Exception as e: logger.error(f"Error in wpscan endpoint: {str(e)}") @@ -539,21 +708,13 @@ def health_check(): "all_essential_tools_available": all_essential_tools_available }) -@app.route("/mcp/capabilities", methods=["GET"]) -def get_capabilities(): - # Return tool capabilities similar to our existing MCP server - pass - -@app.route("/mcp/tools/kali_tools/", methods=["POST"]) -def execute_tool(tool_name): - # Direct tool execution without going through the API server - pass - def parse_args(): """Parse command line arguments.""" parser = argparse.ArgumentParser(description="Run the Kali Linux API Server") parser.add_argument("--debug", action="store_true", help="Enable debug mode") parser.add_argument("--port", type=int, default=API_PORT, help=f"Port for the API server (default: {API_PORT})") + parser.add_argument("--timeout", type=int, default=COMMAND_TIMEOUT, + help=f"Default command timeout in seconds (default: {COMMAND_TIMEOUT})") return parser.parse_args() if __name__ == "__main__": @@ -568,5 +729,26 @@ if __name__ == "__main__": if args.port != API_PORT: API_PORT = args.port + # Update default timeout if specified + if args.timeout != COMMAND_TIMEOUT: + COMMAND_TIMEOUT = args.timeout + logger.info(f"Command timeout set to {COMMAND_TIMEOUT} seconds") + + # Check for essential dependencies + try: + dependencies = ["script", "expect"] + for dep in dependencies: + check = subprocess.run(["which", dep], stdout=subprocess.PIPE, stderr=subprocess.PIPE) + if check.returncode != 0: + logger.warning(f"'{dep}' command not found, installing required dependencies...") + install = subprocess.run(["apt-get", "update"], stdout=subprocess.PIPE, stderr=subprocess.PIPE) + install = subprocess.run(["apt-get", "install", "-y", "expect", "bsdutils"], + stdout=subprocess.PIPE, stderr=subprocess.PIPE) + if install.returncode != 0: + logger.error(f"Failed to install required dependencies. Some features may not work properly.") + break + except Exception as e: + logger.error(f"Error checking dependencies: {str(e)}") + logger.info(f"Starting Kali Linux Tools API Server on port {API_PORT}") app.run(host="0.0.0.0", port=API_PORT, debug=DEBUG_MODE)