diff --git a/mcpforpentest/mcp_server.py b/mcpforpentest/mcp_server.py index dba3691..33bfa29 100644 --- a/mcpforpentest/mcp_server.py +++ b/mcpforpentest/mcp_server.py @@ -8,7 +8,7 @@ import sys import os import argparse import logging -from typing import Dict, Any, Optional +from typing import Dict, Any, Optional, List, Union import requests from mcp.server.fastmcp import FastMCP @@ -95,17 +95,40 @@ class KaliToolsClient: logger.error(f"Unexpected error: {str(e)}") return {"error": f"Unexpected error: {str(e)}", "success": False} - def execute_command(self, command: str) -> Dict[str, Any]: + def execute_command(self, command: str, interactive: bool = False) -> Dict[str, Any]: """ Execute a generic command on the Kali server Args: command: Command to execute + interactive: Whether the command requires interactive mode Returns: Command execution results """ - return self.safe_post("api/command", {"command": command}) + return self.safe_post("api/command", {"command": command, "interactive": interactive}) + + def execute_interactive_tool(self, tool: str, commands: List[str] = None, timeout: int = None) -> Dict[str, Any]: + """ + Execute an interactive tool with a sequence of commands + + Args: + tool: The tool to execute (e.g., msfconsole, sqlmap) + commands: List of commands to execute in the interactive session + timeout: Custom timeout for this interactive session + + Returns: + Interactive session results + """ + data = { + "command": tool, + "interactive": True + } + + if timeout is not None: + data["timeout"] = timeout + + return self.safe_post("api/command", data) def check_health(self) -> Dict[str, Any]: """ @@ -230,23 +253,72 @@ def setup_mcp_server(kali_client: KaliToolsClient) -> FastMCP: } return kali_client.safe_post("api/tools/sqlmap", post_data) + @mcp.tool() - def metasploit_run(module: str, options: Dict[str, Any] = {}) -> Dict[str, Any]: + def msfconsole_interactive(resource_script: str = None, commands: List[str] = None, timeout: int = None) -> Dict[str, Any]: """ - Execute a Metasploit module. + Start an interactive Metasploit console session. Args: - module: The Metasploit module path - options: Dictionary of module options + resource_script: Optional path to MSF resource script + commands: List of commands to execute in MSF console + timeout: Custom timeout for this MSF console session Returns: - Module execution results + MSF console session results """ - data = { - "module": module, - "options": options - } - return kali_client.safe_post("api/tools/metasploit", data) + base_command = "msfconsole -q" + + if resource_script: + if not os.path.exists(resource_script): + return { + "error": f"Resource script file not found: {resource_script}", + "success": False + } + base_command += f" -r {resource_script}" + + # Create a temporary resource script if commands are provided + temp_resource_file = None + if commands and not resource_script: + try: + import tempfile + temp_resource_file = tempfile.NamedTemporaryFile(delete=False, mode='w+', suffix='.rc') + for cmd in commands: + temp_resource_file.write(f"{cmd}\n") + temp_resource_file.close() + base_command += f" -r {temp_resource_file.name}" + except Exception as e: + logger.error(f"Error creating temporary resource file: {str(e)}") + return { + "error": f"Failed to create resource script: {str(e)}", + "success": False + } + + try: + result = kali_client.execute_command(base_command, interactive=True) + + # Clean up temporary file if created + if temp_resource_file and os.path.exists(temp_resource_file.name): + try: + os.unlink(temp_resource_file.name) + except: + pass + + return result + except Exception as e: + logger.error(f"Error in msfconsole interactive: {str(e)}") + + # Clean up on error too + if temp_resource_file and os.path.exists(temp_resource_file.name): + try: + os.unlink(temp_resource_file.name) + except: + pass + + return { + "error": f"MSF console execution failed: {str(e)}", + "success": False + } @mcp.tool() def hydra_attack( @@ -358,17 +430,91 @@ def setup_mcp_server(kali_client: KaliToolsClient) -> FastMCP: return kali_client.check_health() @mcp.tool() - def execute_command(command: str) -> Dict[str, Any]: + def execute_command(command: str, interactive: bool = False) -> Dict[str, Any]: """ Execute an arbitrary command on the Kali server. Args: command: The command to execute + interactive: Whether the command requires interactive mode Returns: Command execution results """ - return kali_client.execute_command(command) + return kali_client.execute_command(command, interactive) + + @mcp.tool() + def run_interactive(tool: str, commands: List[str] = None, timeout: int = None) -> Dict[str, Any]: + """ + Run a tool in interactive mode. + + Args: + tool: The tool to run (e.g., msfconsole, sqlmap, hydra) + commands: List of commands to send to the interactive session + timeout: Custom timeout for this interactive session + + Returns: + Interactive session results + """ + # Special handling for known interactive tools + if tool.startswith("msfconsole"): + return msfconsole_interactive(commands=commands, timeout=timeout) + elif tool.startswith("sqlmap"): + # Add --batch to make it non-interactive but still use the interactive executor + if not "--batch" in tool: + tool += " --batch" + + return kali_client.execute_interactive_tool(tool, commands, timeout) + + @mcp.tool() + def sql_interactive(url: str, data: str = "", commands: List[str] = None, timeout: int = None) -> Dict[str, Any]: + """ + Run SQLMap in interactive mode with more options. + + Args: + url: Target URL + data: Optional POST data + commands: Additional SQLMap commands/options + timeout: Custom timeout for this session + + Returns: + SQLMap session results + """ + command = f"sqlmap -u {url} --batch" + + if data: + command += f" --data=\"{data}\"" + + return kali_client.execute_command(command, interactive=True) + + @mcp.tool() + def create_metasploit_resource(commands: List[str], output_file: str) -> Dict[str, Any]: + """ + Create a Metasploit resource script. + + Args: + commands: List of MSF commands to include in the script + output_file: Path to save the resource script + + Returns: + Result of script creation + """ + try: + with open(output_file, 'w') as f: + for cmd in commands: + f.write(f"{cmd}\n") + + return { + "success": True, + "message": f"Created Metasploit resource script at {output_file}", + "file": output_file + } + except Exception as e: + logger.error(f"Error creating resource script: {str(e)}") + return { + "success": False, + "error": f"Failed to create resource script: {str(e)}" + } return mcp