From e3c70c5944cdc59a3a4d0c7dea91abde27099ee6 Mon Sep 17 00:00:00 2001 From: AiShell <96778509+SunZhimin2021@users.noreply.github.com> Date: Thu, 1 May 2025 09:32:47 +0800 Subject: [PATCH] Add files via upload MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 通过MCP使用kali的方法 --- mcpforpentest/kali_server.py | 572 +++++++++++++++++++++++++++++++++++ mcpforpentest/mcp_server.py | 417 +++++++++++++++++++++++++ 2 files changed, 989 insertions(+) create mode 100644 mcpforpentest/kali_server.py create mode 100644 mcpforpentest/mcp_server.py diff --git a/mcpforpentest/kali_server.py b/mcpforpentest/kali_server.py new file mode 100644 index 0000000..db5be7e --- /dev/null +++ b/mcpforpentest/kali_server.py @@ -0,0 +1,572 @@ +#!/usr/bin/env python3 + +# This script connect the MCP AI agent to Kali Linux terminal and API Server. + +# some of the code here was inspired from https://github.com/whit3rabbit0/project_astro , be sure to check them out + +import argparse +import json +import logging +import os +import subprocess +import sys +import traceback +import threading +from typing import Dict, Any +from flask import Flask, request, jsonify + +# Configure logging +logging.basicConfig( + level=logging.INFO, + format="%(asctime)s [%(levelname)s] %(message)s", + handlers=[ + logging.StreamHandler(sys.stdout) + ] +) +logger = logging.getLogger(__name__) + +# Configuration +API_PORT = int(os.environ.get("API_PORT", 5000)) +DEBUG_MODE = os.environ.get("DEBUG_MODE", "0").lower() in ("1", "true", "yes", "y") +COMMAND_TIMEOUT = 180 # 5 minutes default timeout + +app = Flask(__name__) + +class CommandExecutor: + """Class to handle command execution with better timeout management""" + + def __init__(self, command: str, timeout: int = COMMAND_TIMEOUT): + self.command = command + self.timeout = timeout + self.process = None + self.stdout_data = "" + self.stderr_data = "" + self.stdout_thread = None + self.stderr_thread = None + self.return_code = None + self.timed_out = False + + def _read_stdout(self): + """Thread function to continuously read stdout""" + for line in iter(self.process.stdout.readline, ''): + self.stdout_data += line + + def _read_stderr(self): + """Thread function to continuously read stderr""" + for line in iter(self.process.stderr.readline, ''): + self.stderr_data += line + + def execute(self) -> Dict[str, Any]: + """Execute the command and handle timeout gracefully""" + logger.info(f"Executing command: {self.command}") + + try: + self.process = subprocess.Popen( + self.command, + shell=True, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + text=True, + bufsize=1 # Line buffered + ) + + # Start threads to read output continuously + self.stdout_thread = threading.Thread(target=self._read_stdout) + self.stderr_thread = threading.Thread(target=self._read_stderr) + self.stdout_thread.daemon = True + self.stderr_thread.daemon = True + self.stdout_thread.start() + self.stderr_thread.start() + + # Wait for the process to complete or timeout + try: + self.return_code = self.process.wait(timeout=self.timeout) + # Process completed, join the threads + self.stdout_thread.join() + self.stderr_thread.join() + except subprocess.TimeoutExpired: + # Process timed out but we might have partial results + self.timed_out = True + logger.warning(f"Command timed out after {self.timeout} seconds. Terminating process.") + + # Try to terminate gracefully first + self.process.terminate() + try: + self.process.wait(timeout=5) # Give it 5 seconds to terminate + except subprocess.TimeoutExpired: + # Force kill if it doesn't terminate + logger.warning("Process not responding to termination. Killing.") + self.process.kill() + + # Update final output + self.return_code = -1 + + # Always consider it a success if we have output, even with timeout + success = True if self.timed_out and (self.stdout_data or self.stderr_data) else (self.return_code == 0) + + return { + "stdout": self.stdout_data, + "stderr": self.stderr_data, + "return_code": self.return_code, + "success": success, + "timed_out": self.timed_out, + "partial_results": self.timed_out and (self.stdout_data or self.stderr_data) + } + + except Exception as e: + logger.error(f"Error executing command: {str(e)}") + logger.error(traceback.format_exc()) + return { + "stdout": self.stdout_data, + "stderr": f"Error executing command: {str(e)}\n{self.stderr_data}", + "return_code": -1, + "success": False, + "timed_out": False, + "partial_results": bool(self.stdout_data or self.stderr_data) + } + + +def execute_command(command: str) -> Dict[str, Any]: + """ + Execute a shell command and return the result + + Args: + command: The command to execute + + Returns: + A dictionary containing the stdout, stderr, and return code + """ + executor = CommandExecutor(command) + return executor.execute() + + +@app.route("/api/command", methods=["POST"]) +def generic_command(): + """Execute any command provided in the request.""" + try: + params = request.json + command = params.get("command", "") + + if not command: + logger.warning("Command endpoint called without command parameter") + return jsonify({ + "error": "Command parameter is required" + }), 400 + + result = execute_command(command) + return jsonify(result) + except Exception as e: + logger.error(f"Error in command endpoint: {str(e)}") + logger.error(traceback.format_exc()) + return jsonify({ + "error": f"Server error: {str(e)}" + }), 500 + + +@app.route("/api/tools/nmap", methods=["POST"]) +def nmap(): + """Execute nmap scan with the provided parameters.""" + try: + params = request.json + target = params.get("target", "") + scan_type = params.get("scan_type", "-sCV") + ports = params.get("ports", "") + additional_args = params.get("additional_args", "-T4 -Pn") + + if not target: + logger.warning("Nmap called without target parameter") + return jsonify({ + "error": "Target parameter is required" + }), 400 + + command = f"nmap {scan_type}" + + if ports: + command += f" -p {ports}" + + if additional_args: + # Basic validation for additional args - more sophisticated validation would be better + command += f" {additional_args}" + + command += f" {target}" + + result = execute_command(command) + return jsonify(result) + except Exception as e: + logger.error(f"Error in nmap endpoint: {str(e)}") + logger.error(traceback.format_exc()) + return jsonify({ + "error": f"Server error: {str(e)}" + }), 500 + +@app.route("/api/tools/gobuster", methods=["POST"]) +def gobuster(): + """Execute gobuster with the provided parameters.""" + try: + params = request.json + url = params.get("url", "") + mode = params.get("mode", "dir") + wordlist = params.get("wordlist", "/usr/share/wordlists/dirb/common.txt") + additional_args = params.get("additional_args", "") + + if not url: + logger.warning("Gobuster called without URL parameter") + return jsonify({ + "error": "URL parameter is required" + }), 400 + + # Validate mode + if mode not in ["dir", "dns", "fuzz", "vhost"]: + logger.warning(f"Invalid gobuster mode: {mode}") + return jsonify({ + "error": f"Invalid mode: {mode}. Must be one of: dir, dns, fuzz, vhost" + }), 400 + + command = f"gobuster {mode} -u {url} -w {wordlist}" + + if additional_args: + command += f" {additional_args}" + + result = execute_command(command) + return jsonify(result) + except Exception as e: + logger.error(f"Error in gobuster endpoint: {str(e)}") + logger.error(traceback.format_exc()) + return jsonify({ + "error": f"Server error: {str(e)}" + }), 500 + +@app.route("/api/tools/dirb", methods=["POST"]) +def dirb(): + """Execute dirb with the provided parameters.""" + try: + params = request.json + url = params.get("url", "") + wordlist = params.get("wordlist", "/usr/share/wordlists/dirb/common.txt") + additional_args = params.get("additional_args", "") + + if not url: + logger.warning("Dirb called without URL parameter") + return jsonify({ + "error": "URL parameter is required" + }), 400 + + command = f"dirb {url} {wordlist}" + + if additional_args: + command += f" {additional_args}" + + result = execute_command(command) + return jsonify(result) + except Exception as e: + logger.error(f"Error in dirb endpoint: {str(e)}") + logger.error(traceback.format_exc()) + return jsonify({ + "error": f"Server error: {str(e)}" + }), 500 + +@app.route("/api/tools/nikto", methods=["POST"]) +def nikto(): + """Execute nikto with the provided parameters.""" + try: + params = request.json + target = params.get("target", "") + additional_args = params.get("additional_args", "") + + if not target: + logger.warning("Nikto called without target parameter") + return jsonify({ + "error": "Target parameter is required" + }), 400 + + command = f"nikto -h {target}" + + if additional_args: + command += f" {additional_args}" + + result = execute_command(command) + return jsonify(result) + except Exception as e: + logger.error(f"Error in nikto endpoint: {str(e)}") + logger.error(traceback.format_exc()) + return jsonify({ + "error": f"Server error: {str(e)}" + }), 500 + +@app.route("/api/tools/sqlmap", methods=["POST"]) +def sqlmap(): + """Execute sqlmap with the provided parameters.""" + try: + params = request.json + url = params.get("url", "") + data = params.get("data", "") + additional_args = params.get("additional_args", "") + + if not url: + logger.warning("SQLMap called without URL parameter") + return jsonify({ + "error": "URL parameter is required" + }), 400 + + command = f"sqlmap -u {url} --batch" + + if data: + command += f" --data=\"{data}\"" + + if additional_args: + command += f" {additional_args}" + + result = execute_command(command) + return jsonify(result) + except Exception as e: + logger.error(f"Error in sqlmap endpoint: {str(e)}") + logger.error(traceback.format_exc()) + return jsonify({ + "error": f"Server error: {str(e)}" + }), 500 + +@app.route("/api/tools/metasploit", methods=["POST"]) +def metasploit(): + """Execute metasploit module with the provided parameters.""" + try: + params = request.json + module = params.get("module", "") + options = params.get("options", {}) + + if not module: + logger.warning("Metasploit called without module parameter") + return jsonify({ + "error": "Module parameter is required" + }), 400 + + # Format options for Metasploit + options_str = "" + for key, value in options.items(): + options_str += f" {key}={value}" + + # Create an MSF resource script + resource_content = f"use {module}\n" + for key, value in options.items(): + resource_content += f"set {key} {value}\n" + resource_content += "exploit\n" + + # Save resource script to a temporary file + resource_file = "/tmp/mcp_msf_resource.rc" + with open(resource_file, "w") as f: + f.write(resource_content) + + command = f"msfconsole -q -r {resource_file}" + result = execute_command(command) + + # Clean up the temporary file + try: + os.remove(resource_file) + except Exception as e: + logger.warning(f"Error removing temporary resource file: {str(e)}") + + return jsonify(result) + except Exception as e: + logger.error(f"Error in metasploit endpoint: {str(e)}") + logger.error(traceback.format_exc()) + return jsonify({ + "error": f"Server error: {str(e)}" + }), 500 + +@app.route("/api/tools/hydra", methods=["POST"]) +def hydra(): + """Execute hydra with the provided parameters.""" + try: + params = request.json + target = params.get("target", "") + service = params.get("service", "") + username = params.get("username", "") + username_file = params.get("username_file", "") + password = params.get("password", "") + password_file = params.get("password_file", "") + additional_args = params.get("additional_args", "") + + if not target or not service: + logger.warning("Hydra called without target or service parameter") + return jsonify({ + "error": "Target and service parameters are required" + }), 400 + + if not (username or username_file) or not (password or password_file): + logger.warning("Hydra called without username/password parameters") + return jsonify({ + "error": "Username/username_file and password/password_file are required" + }), 400 + + command = f"hydra -t 4" + + if username: + command += f" -l {username}" + elif username_file: + command += f" -L {username_file}" + + if password: + command += f" -p {password}" + elif password_file: + command += f" -P {password_file}" + + if additional_args: + command += f" {additional_args}" + + command += f" {target} {service}" + + result = execute_command(command) + return jsonify(result) + except Exception as e: + logger.error(f"Error in hydra endpoint: {str(e)}") + logger.error(traceback.format_exc()) + return jsonify({ + "error": f"Server error: {str(e)}" + }), 500 + +@app.route("/api/tools/john", methods=["POST"]) +def john(): + """Execute john with the provided parameters.""" + try: + params = request.json + hash_file = params.get("hash_file", "") + wordlist = params.get("wordlist", "/usr/share/wordlists/rockyou.txt") + format_type = params.get("format", "") + additional_args = params.get("additional_args", "") + + if not hash_file: + logger.warning("John called without hash_file parameter") + return jsonify({ + "error": "Hash file parameter is required" + }), 400 + + command = f"john" + + if format_type: + command += f" --format={format_type}" + + if wordlist: + command += f" --wordlist={wordlist}" + + if additional_args: + command += f" {additional_args}" + + command += f" {hash_file}" + + result = execute_command(command) + return jsonify(result) + except Exception as e: + logger.error(f"Error in john endpoint: {str(e)}") + logger.error(traceback.format_exc()) + return jsonify({ + "error": f"Server error: {str(e)}" + }), 500 + +@app.route("/api/tools/wpscan", methods=["POST"]) +def wpscan(): + """Execute wpscan with the provided parameters.""" + try: + params = request.json + url = params.get("url", "") + additional_args = params.get("additional_args", "") + + if not url: + logger.warning("WPScan called without URL parameter") + return jsonify({ + "error": "URL parameter is required" + }), 400 + + command = f"wpscan --url {url}" + + if additional_args: + command += f" {additional_args}" + + result = execute_command(command) + return jsonify(result) + except Exception as e: + logger.error(f"Error in wpscan endpoint: {str(e)}") + logger.error(traceback.format_exc()) + return jsonify({ + "error": f"Server error: {str(e)}" + }), 500 + +@app.route("/api/tools/enum4linux", methods=["POST"]) +def enum4linux(): + """Execute enum4linux with the provided parameters.""" + try: + params = request.json + target = params.get("target", "") + additional_args = params.get("additional_args", "-a") + + if not target: + logger.warning("Enum4linux called without target parameter") + return jsonify({ + "error": "Target parameter is required" + }), 400 + + command = f"enum4linux {additional_args} {target}" + + result = execute_command(command) + return jsonify(result) + except Exception as e: + logger.error(f"Error in enum4linux endpoint: {str(e)}") + logger.error(traceback.format_exc()) + return jsonify({ + "error": f"Server error: {str(e)}" + }), 500 + + +# Health check endpoint +@app.route("/health", methods=["GET"]) +def health_check(): + """Health check endpoint.""" + # Check if essential tools are installed + essential_tools = ["nmap", "gobuster", "dirb", "nikto"] + tools_status = {} + + for tool in essential_tools: + try: + result = execute_command(f"which {tool}") + tools_status[tool] = result["success"] + except: + tools_status[tool] = False + + all_essential_tools_available = all(tools_status.values()) + + return jsonify({ + "status": "healthy", + "message": "Kali Linux Tools API Server is running", + "tools_status": tools_status, + "all_essential_tools_available": all_essential_tools_available + }) + +@app.route("/mcp/capabilities", methods=["GET"]) +def get_capabilities(): + # Return tool capabilities similar to our existing MCP server + pass + +@app.route("/mcp/tools/kali_tools/", methods=["POST"]) +def execute_tool(tool_name): + # Direct tool execution without going through the API server + pass + +def parse_args(): + """Parse command line arguments.""" + parser = argparse.ArgumentParser(description="Run the Kali Linux API Server") + parser.add_argument("--debug", action="store_true", help="Enable debug mode") + parser.add_argument("--port", type=int, default=API_PORT, help=f"Port for the API server (default: {API_PORT})") + return parser.parse_args() + +if __name__ == "__main__": + args = parse_args() + + # Set configuration from command line arguments + if args.debug: + DEBUG_MODE = True + os.environ["DEBUG_MODE"] = "1" + logger.setLevel(logging.DEBUG) + + if args.port != API_PORT: + API_PORT = args.port + + logger.info(f"Starting Kali Linux Tools API Server on port {API_PORT}") + app.run(host="0.0.0.0", port=API_PORT, debug=DEBUG_MODE) diff --git a/mcpforpentest/mcp_server.py b/mcpforpentest/mcp_server.py new file mode 100644 index 0000000..dba3691 --- /dev/null +++ b/mcpforpentest/mcp_server.py @@ -0,0 +1,417 @@ +#!/usr/bin/env python3 + +# This script connect the MCP AI agent to Kali Linux terminal and API Server. + +# some of the code here was inspired from https://github.com/whit3rabbit0/project_astro , be sure to check them out + +import sys +import os +import argparse +import logging +from typing import Dict, Any, Optional +import requests + +from mcp.server.fastmcp import FastMCP + +# Configure logging +logging.basicConfig( + level=logging.INFO, + format="%(asctime)s [%(levelname)s] %(message)s", + handlers=[ + logging.StreamHandler(sys.stdout) + ] +) +logger = logging.getLogger(__name__) + +# Default configuration +DEFAULT_KALI_SERVER = "http://localhost:5000" # change to your linux IP +DEFAULT_REQUEST_TIMEOUT = 300 # 5 minutes default timeout for API requests + +class KaliToolsClient: + """Client for communicating with the Kali Linux Tools API Server""" + + def __init__(self, server_url: str, timeout: int = DEFAULT_REQUEST_TIMEOUT): + """ + Initialize the Kali Tools Client + + Args: + server_url: URL of the Kali Tools API Server + timeout: Request timeout in seconds + """ + self.server_url = server_url.rstrip("/") + self.timeout = timeout + logger.info(f"Initialized Kali Tools Client connecting to {server_url}") + + def safe_get(self, endpoint: str, params: Optional[Dict[str, Any]] = None) -> Dict[str, Any]: + """ + Perform a GET request with optional query parameters. + + Args: + endpoint: API endpoint path (without leading slash) + params: Optional query parameters + + Returns: + Response data as dictionary + """ + if params is None: + params = {} + + url = f"{self.server_url}/{endpoint}" + + try: + logger.debug(f"GET {url} with params: {params}") + response = requests.get(url, params=params, timeout=self.timeout) + response.raise_for_status() + return response.json() + except requests.exceptions.RequestException as e: + logger.error(f"Request failed: {str(e)}") + return {"error": f"Request failed: {str(e)}", "success": False} + except Exception as e: + logger.error(f"Unexpected error: {str(e)}") + return {"error": f"Unexpected error: {str(e)}", "success": False} + + def safe_post(self, endpoint: str, json_data: Dict[str, Any]) -> Dict[str, Any]: + """ + Perform a POST request with JSON data. + + Args: + endpoint: API endpoint path (without leading slash) + json_data: JSON data to send + + Returns: + Response data as dictionary + """ + url = f"{self.server_url}/{endpoint}" + + try: + logger.debug(f"POST {url} with data: {json_data}") + response = requests.post(url, json=json_data, timeout=self.timeout) + response.raise_for_status() + return response.json() + except requests.exceptions.RequestException as e: + logger.error(f"Request failed: {str(e)}") + return {"error": f"Request failed: {str(e)}", "success": False} + except Exception as e: + logger.error(f"Unexpected error: {str(e)}") + return {"error": f"Unexpected error: {str(e)}", "success": False} + + def execute_command(self, command: str) -> Dict[str, Any]: + """ + Execute a generic command on the Kali server + + Args: + command: Command to execute + + Returns: + Command execution results + """ + return self.safe_post("api/command", {"command": command}) + + def check_health(self) -> Dict[str, Any]: + """ + Check the health of the Kali Tools API Server + + Returns: + Health status information + """ + return self.safe_get("health") + +def setup_mcp_server(kali_client: KaliToolsClient) -> FastMCP: + """ + Set up the MCP server with all tool functions + + Args: + kali_client: Initialized KaliToolsClient + + Returns: + Configured FastMCP instance + """ + mcp = FastMCP("kali-mcp") + + @mcp.tool() + def nmap_scan(target: str, scan_type: str = "-sV", ports: str = "", additional_args: str = "") -> Dict[str, Any]: + """ + Execute an Nmap scan against a target. + + Args: + target: The IP address or hostname to scan + scan_type: Scan type (e.g., -sV for version detection) + ports: Comma-separated list of ports or port ranges + additional_args: Additional Nmap arguments + + Returns: + Scan results + """ + data = { + "target": target, + "scan_type": scan_type, + "ports": ports, + "additional_args": additional_args + } + return kali_client.safe_post("api/tools/nmap", data) + + @mcp.tool() + def gobuster_scan(url: str, mode: str = "dir", wordlist: str = "/usr/share/wordlists/dirb/common.txt", additional_args: str = "") -> Dict[str, Any]: + """ + Execute Gobuster to find directories, DNS subdomains, or virtual hosts. + + Args: + url: The target URL + mode: Scan mode (dir, dns, fuzz, vhost) + wordlist: Path to wordlist file + additional_args: Additional Gobuster arguments + + Returns: + Scan results + """ + data = { + "url": url, + "mode": mode, + "wordlist": wordlist, + "additional_args": additional_args + } + return kali_client.safe_post("api/tools/gobuster", data) + + @mcp.tool() + def dirb_scan(url: str, wordlist: str = "/usr/share/wordlists/dirb/common.txt", additional_args: str = "") -> Dict[str, Any]: + """ + Execute Dirb web content scanner. + + Args: + url: The target URL + wordlist: Path to wordlist file + additional_args: Additional Dirb arguments + + Returns: + Scan results + """ + data = { + "url": url, + "wordlist": wordlist, + "additional_args": additional_args + } + return kali_client.safe_post("api/tools/dirb", data) + + @mcp.tool() + def nikto_scan(target: str, additional_args: str = "") -> Dict[str, Any]: + """ + Execute Nikto web server scanner. + + Args: + target: The target URL or IP + additional_args: Additional Nikto arguments + + Returns: + Scan results + """ + data = { + "target": target, + "additional_args": additional_args + } + return kali_client.safe_post("api/tools/nikto", data) + + @mcp.tool() + def sqlmap_scan(url: str, data: str = "", additional_args: str = "") -> Dict[str, Any]: + """ + Execute SQLmap SQL injection scanner. + + Args: + url: The target URL + data: POST data string + additional_args: Additional SQLmap arguments + + Returns: + Scan results + """ + post_data = { + "url": url, + "data": data, + "additional_args": additional_args + } + return kali_client.safe_post("api/tools/sqlmap", post_data) + + @mcp.tool() + def metasploit_run(module: str, options: Dict[str, Any] = {}) -> Dict[str, Any]: + """ + Execute a Metasploit module. + + Args: + module: The Metasploit module path + options: Dictionary of module options + + Returns: + Module execution results + """ + data = { + "module": module, + "options": options + } + return kali_client.safe_post("api/tools/metasploit", data) + + @mcp.tool() + def hydra_attack( + target: str, + service: str, + username: str = "", + username_file: str = "", + password: str = "", + password_file: str = "", + additional_args: str = "" + ) -> Dict[str, Any]: + """ + Execute Hydra password cracking tool. + + Args: + target: Target IP or hostname + service: Service to attack (ssh, ftp, http-post-form, etc.) + username: Single username to try + username_file: Path to username file + password: Single password to try + password_file: Path to password file + additional_args: Additional Hydra arguments + + Returns: + Attack results + """ + data = { + "target": target, + "service": service, + "username": username, + "username_file": username_file, + "password": password, + "password_file": password_file, + "additional_args": additional_args + } + return kali_client.safe_post("api/tools/hydra", data) + + @mcp.tool() + def john_crack( + hash_file: str, + wordlist: str = "/usr/share/wordlists/rockyou.txt", + format_type: str = "", + additional_args: str = "" + ) -> Dict[str, Any]: + """ + Execute John the Ripper password cracker. + + Args: + hash_file: Path to file containing hashes + wordlist: Path to wordlist file + format_type: Hash format type + additional_args: Additional John arguments + + Returns: + Cracking results + """ + data = { + "hash_file": hash_file, + "wordlist": wordlist, + "format": format_type, + "additional_args": additional_args + } + return kali_client.safe_post("api/tools/john", data) + + @mcp.tool() + def wpscan_analyze(url: str, additional_args: str = "") -> Dict[str, Any]: + """ + Execute WPScan WordPress vulnerability scanner. + + Args: + url: The target WordPress URL + additional_args: Additional WPScan arguments + + Returns: + Scan results + """ + data = { + "url": url, + "additional_args": additional_args + } + return kali_client.safe_post("api/tools/wpscan", data) + + @mcp.tool() + def enum4linux_scan(target: str, additional_args: str = "-a") -> Dict[str, Any]: + """ + Execute Enum4linux Windows/Samba enumeration tool. + + Args: + target: The target IP or hostname + additional_args: Additional enum4linux arguments + + Returns: + Enumeration results + """ + data = { + "target": target, + "additional_args": additional_args + } + return kali_client.safe_post("api/tools/enum4linux", data) + + @mcp.tool() + def server_health() -> Dict[str, Any]: + """ + Check the health status of the Kali API server. + + Returns: + Server health information + """ + return kali_client.check_health() + + @mcp.tool() + def execute_command(command: str) -> Dict[str, Any]: + """ + Execute an arbitrary command on the Kali server. + + Args: + command: The command to execute + + Returns: + Command execution results + """ + return kali_client.execute_command(command) + + return mcp + +def parse_args(): + """Parse command line arguments.""" + parser = argparse.ArgumentParser(description="Run the Kali MCP Client") + parser.add_argument("--server", type=str, default=DEFAULT_KALI_SERVER, + help=f"Kali API server URL (default: {DEFAULT_KALI_SERVER})") + parser.add_argument("--timeout", type=int, default=DEFAULT_REQUEST_TIMEOUT, + help=f"Request timeout in seconds (default: {DEFAULT_REQUEST_TIMEOUT})") + parser.add_argument("--debug", action="store_true", help="Enable debug logging") + return parser.parse_args() + +def main(): + """Main entry point for the MCP server.""" + args = parse_args() + + # Configure logging based on debug flag + if args.debug: + logger.setLevel(logging.DEBUG) + logger.debug("Debug logging enabled") + + # Initialize the Kali Tools client + kali_client = KaliToolsClient(args.server, args.timeout) + + # Check server health and log the result + health = kali_client.check_health() + if "error" in health: + logger.warning(f"Unable to connect to Kali API server at {args.server}: {health['error']}") + logger.warning("MCP server will start, but tool execution may fail") + else: + logger.info(f"Successfully connected to Kali API server at {args.server}") + logger.info(f"Server health status: {health['status']}") + if not health.get("all_essential_tools_available", False): + logger.warning("Not all essential tools are available on the Kali server") + missing_tools = [tool for tool, available in health.get("tools_status", {}).items() if not available] + if missing_tools: + logger.warning(f"Missing tools: {', '.join(missing_tools)}") + + # Set up and run the MCP server + mcp = setup_mcp_server(kali_client) + logger.info("Starting Kali MCP server") + mcp.run() + +if __name__ == "__main__": + main()