Add files via upload

通过MCP使用kali的方法
This commit is contained in:
AiShell 2025-05-01 09:32:47 +08:00 committed by GitHub
parent 4ce3d540e3
commit e3c70c5944
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 989 additions and 0 deletions

View File

@ -0,0 +1,572 @@
#!/usr/bin/env python3
# This script connect the MCP AI agent to Kali Linux terminal and API Server.
# some of the code here was inspired from https://github.com/whit3rabbit0/project_astro , be sure to check them out
import argparse
import json
import logging
import os
import subprocess
import sys
import traceback
import threading
from typing import Dict, Any
from flask import Flask, request, jsonify
# Configure logging
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(levelname)s] %(message)s",
handlers=[
logging.StreamHandler(sys.stdout)
]
)
logger = logging.getLogger(__name__)
# Configuration
API_PORT = int(os.environ.get("API_PORT", 5000))
DEBUG_MODE = os.environ.get("DEBUG_MODE", "0").lower() in ("1", "true", "yes", "y")
COMMAND_TIMEOUT = 180 # 5 minutes default timeout
app = Flask(__name__)
class CommandExecutor:
"""Class to handle command execution with better timeout management"""
def __init__(self, command: str, timeout: int = COMMAND_TIMEOUT):
self.command = command
self.timeout = timeout
self.process = None
self.stdout_data = ""
self.stderr_data = ""
self.stdout_thread = None
self.stderr_thread = None
self.return_code = None
self.timed_out = False
def _read_stdout(self):
"""Thread function to continuously read stdout"""
for line in iter(self.process.stdout.readline, ''):
self.stdout_data += line
def _read_stderr(self):
"""Thread function to continuously read stderr"""
for line in iter(self.process.stderr.readline, ''):
self.stderr_data += line
def execute(self) -> Dict[str, Any]:
"""Execute the command and handle timeout gracefully"""
logger.info(f"Executing command: {self.command}")
try:
self.process = subprocess.Popen(
self.command,
shell=True,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
bufsize=1 # Line buffered
)
# Start threads to read output continuously
self.stdout_thread = threading.Thread(target=self._read_stdout)
self.stderr_thread = threading.Thread(target=self._read_stderr)
self.stdout_thread.daemon = True
self.stderr_thread.daemon = True
self.stdout_thread.start()
self.stderr_thread.start()
# Wait for the process to complete or timeout
try:
self.return_code = self.process.wait(timeout=self.timeout)
# Process completed, join the threads
self.stdout_thread.join()
self.stderr_thread.join()
except subprocess.TimeoutExpired:
# Process timed out but we might have partial results
self.timed_out = True
logger.warning(f"Command timed out after {self.timeout} seconds. Terminating process.")
# Try to terminate gracefully first
self.process.terminate()
try:
self.process.wait(timeout=5) # Give it 5 seconds to terminate
except subprocess.TimeoutExpired:
# Force kill if it doesn't terminate
logger.warning("Process not responding to termination. Killing.")
self.process.kill()
# Update final output
self.return_code = -1
# Always consider it a success if we have output, even with timeout
success = True if self.timed_out and (self.stdout_data or self.stderr_data) else (self.return_code == 0)
return {
"stdout": self.stdout_data,
"stderr": self.stderr_data,
"return_code": self.return_code,
"success": success,
"timed_out": self.timed_out,
"partial_results": self.timed_out and (self.stdout_data or self.stderr_data)
}
except Exception as e:
logger.error(f"Error executing command: {str(e)}")
logger.error(traceback.format_exc())
return {
"stdout": self.stdout_data,
"stderr": f"Error executing command: {str(e)}\n{self.stderr_data}",
"return_code": -1,
"success": False,
"timed_out": False,
"partial_results": bool(self.stdout_data or self.stderr_data)
}
def execute_command(command: str) -> Dict[str, Any]:
"""
Execute a shell command and return the result
Args:
command: The command to execute
Returns:
A dictionary containing the stdout, stderr, and return code
"""
executor = CommandExecutor(command)
return executor.execute()
@app.route("/api/command", methods=["POST"])
def generic_command():
"""Execute any command provided in the request."""
try:
params = request.json
command = params.get("command", "")
if not command:
logger.warning("Command endpoint called without command parameter")
return jsonify({
"error": "Command parameter is required"
}), 400
result = execute_command(command)
return jsonify(result)
except Exception as e:
logger.error(f"Error in command endpoint: {str(e)}")
logger.error(traceback.format_exc())
return jsonify({
"error": f"Server error: {str(e)}"
}), 500
@app.route("/api/tools/nmap", methods=["POST"])
def nmap():
"""Execute nmap scan with the provided parameters."""
try:
params = request.json
target = params.get("target", "")
scan_type = params.get("scan_type", "-sCV")
ports = params.get("ports", "")
additional_args = params.get("additional_args", "-T4 -Pn")
if not target:
logger.warning("Nmap called without target parameter")
return jsonify({
"error": "Target parameter is required"
}), 400
command = f"nmap {scan_type}"
if ports:
command += f" -p {ports}"
if additional_args:
# Basic validation for additional args - more sophisticated validation would be better
command += f" {additional_args}"
command += f" {target}"
result = execute_command(command)
return jsonify(result)
except Exception as e:
logger.error(f"Error in nmap endpoint: {str(e)}")
logger.error(traceback.format_exc())
return jsonify({
"error": f"Server error: {str(e)}"
}), 500
@app.route("/api/tools/gobuster", methods=["POST"])
def gobuster():
"""Execute gobuster with the provided parameters."""
try:
params = request.json
url = params.get("url", "")
mode = params.get("mode", "dir")
wordlist = params.get("wordlist", "/usr/share/wordlists/dirb/common.txt")
additional_args = params.get("additional_args", "")
if not url:
logger.warning("Gobuster called without URL parameter")
return jsonify({
"error": "URL parameter is required"
}), 400
# Validate mode
if mode not in ["dir", "dns", "fuzz", "vhost"]:
logger.warning(f"Invalid gobuster mode: {mode}")
return jsonify({
"error": f"Invalid mode: {mode}. Must be one of: dir, dns, fuzz, vhost"
}), 400
command = f"gobuster {mode} -u {url} -w {wordlist}"
if additional_args:
command += f" {additional_args}"
result = execute_command(command)
return jsonify(result)
except Exception as e:
logger.error(f"Error in gobuster endpoint: {str(e)}")
logger.error(traceback.format_exc())
return jsonify({
"error": f"Server error: {str(e)}"
}), 500
@app.route("/api/tools/dirb", methods=["POST"])
def dirb():
"""Execute dirb with the provided parameters."""
try:
params = request.json
url = params.get("url", "")
wordlist = params.get("wordlist", "/usr/share/wordlists/dirb/common.txt")
additional_args = params.get("additional_args", "")
if not url:
logger.warning("Dirb called without URL parameter")
return jsonify({
"error": "URL parameter is required"
}), 400
command = f"dirb {url} {wordlist}"
if additional_args:
command += f" {additional_args}"
result = execute_command(command)
return jsonify(result)
except Exception as e:
logger.error(f"Error in dirb endpoint: {str(e)}")
logger.error(traceback.format_exc())
return jsonify({
"error": f"Server error: {str(e)}"
}), 500
@app.route("/api/tools/nikto", methods=["POST"])
def nikto():
"""Execute nikto with the provided parameters."""
try:
params = request.json
target = params.get("target", "")
additional_args = params.get("additional_args", "")
if not target:
logger.warning("Nikto called without target parameter")
return jsonify({
"error": "Target parameter is required"
}), 400
command = f"nikto -h {target}"
if additional_args:
command += f" {additional_args}"
result = execute_command(command)
return jsonify(result)
except Exception as e:
logger.error(f"Error in nikto endpoint: {str(e)}")
logger.error(traceback.format_exc())
return jsonify({
"error": f"Server error: {str(e)}"
}), 500
@app.route("/api/tools/sqlmap", methods=["POST"])
def sqlmap():
"""Execute sqlmap with the provided parameters."""
try:
params = request.json
url = params.get("url", "")
data = params.get("data", "")
additional_args = params.get("additional_args", "")
if not url:
logger.warning("SQLMap called without URL parameter")
return jsonify({
"error": "URL parameter is required"
}), 400
command = f"sqlmap -u {url} --batch"
if data:
command += f" --data=\"{data}\""
if additional_args:
command += f" {additional_args}"
result = execute_command(command)
return jsonify(result)
except Exception as e:
logger.error(f"Error in sqlmap endpoint: {str(e)}")
logger.error(traceback.format_exc())
return jsonify({
"error": f"Server error: {str(e)}"
}), 500
@app.route("/api/tools/metasploit", methods=["POST"])
def metasploit():
"""Execute metasploit module with the provided parameters."""
try:
params = request.json
module = params.get("module", "")
options = params.get("options", {})
if not module:
logger.warning("Metasploit called without module parameter")
return jsonify({
"error": "Module parameter is required"
}), 400
# Format options for Metasploit
options_str = ""
for key, value in options.items():
options_str += f" {key}={value}"
# Create an MSF resource script
resource_content = f"use {module}\n"
for key, value in options.items():
resource_content += f"set {key} {value}\n"
resource_content += "exploit\n"
# Save resource script to a temporary file
resource_file = "/tmp/mcp_msf_resource.rc"
with open(resource_file, "w") as f:
f.write(resource_content)
command = f"msfconsole -q -r {resource_file}"
result = execute_command(command)
# Clean up the temporary file
try:
os.remove(resource_file)
except Exception as e:
logger.warning(f"Error removing temporary resource file: {str(e)}")
return jsonify(result)
except Exception as e:
logger.error(f"Error in metasploit endpoint: {str(e)}")
logger.error(traceback.format_exc())
return jsonify({
"error": f"Server error: {str(e)}"
}), 500
@app.route("/api/tools/hydra", methods=["POST"])
def hydra():
"""Execute hydra with the provided parameters."""
try:
params = request.json
target = params.get("target", "")
service = params.get("service", "")
username = params.get("username", "")
username_file = params.get("username_file", "")
password = params.get("password", "")
password_file = params.get("password_file", "")
additional_args = params.get("additional_args", "")
if not target or not service:
logger.warning("Hydra called without target or service parameter")
return jsonify({
"error": "Target and service parameters are required"
}), 400
if not (username or username_file) or not (password or password_file):
logger.warning("Hydra called without username/password parameters")
return jsonify({
"error": "Username/username_file and password/password_file are required"
}), 400
command = f"hydra -t 4"
if username:
command += f" -l {username}"
elif username_file:
command += f" -L {username_file}"
if password:
command += f" -p {password}"
elif password_file:
command += f" -P {password_file}"
if additional_args:
command += f" {additional_args}"
command += f" {target} {service}"
result = execute_command(command)
return jsonify(result)
except Exception as e:
logger.error(f"Error in hydra endpoint: {str(e)}")
logger.error(traceback.format_exc())
return jsonify({
"error": f"Server error: {str(e)}"
}), 500
@app.route("/api/tools/john", methods=["POST"])
def john():
"""Execute john with the provided parameters."""
try:
params = request.json
hash_file = params.get("hash_file", "")
wordlist = params.get("wordlist", "/usr/share/wordlists/rockyou.txt")
format_type = params.get("format", "")
additional_args = params.get("additional_args", "")
if not hash_file:
logger.warning("John called without hash_file parameter")
return jsonify({
"error": "Hash file parameter is required"
}), 400
command = f"john"
if format_type:
command += f" --format={format_type}"
if wordlist:
command += f" --wordlist={wordlist}"
if additional_args:
command += f" {additional_args}"
command += f" {hash_file}"
result = execute_command(command)
return jsonify(result)
except Exception as e:
logger.error(f"Error in john endpoint: {str(e)}")
logger.error(traceback.format_exc())
return jsonify({
"error": f"Server error: {str(e)}"
}), 500
@app.route("/api/tools/wpscan", methods=["POST"])
def wpscan():
"""Execute wpscan with the provided parameters."""
try:
params = request.json
url = params.get("url", "")
additional_args = params.get("additional_args", "")
if not url:
logger.warning("WPScan called without URL parameter")
return jsonify({
"error": "URL parameter is required"
}), 400
command = f"wpscan --url {url}"
if additional_args:
command += f" {additional_args}"
result = execute_command(command)
return jsonify(result)
except Exception as e:
logger.error(f"Error in wpscan endpoint: {str(e)}")
logger.error(traceback.format_exc())
return jsonify({
"error": f"Server error: {str(e)}"
}), 500
@app.route("/api/tools/enum4linux", methods=["POST"])
def enum4linux():
"""Execute enum4linux with the provided parameters."""
try:
params = request.json
target = params.get("target", "")
additional_args = params.get("additional_args", "-a")
if not target:
logger.warning("Enum4linux called without target parameter")
return jsonify({
"error": "Target parameter is required"
}), 400
command = f"enum4linux {additional_args} {target}"
result = execute_command(command)
return jsonify(result)
except Exception as e:
logger.error(f"Error in enum4linux endpoint: {str(e)}")
logger.error(traceback.format_exc())
return jsonify({
"error": f"Server error: {str(e)}"
}), 500
# Health check endpoint
@app.route("/health", methods=["GET"])
def health_check():
"""Health check endpoint."""
# Check if essential tools are installed
essential_tools = ["nmap", "gobuster", "dirb", "nikto"]
tools_status = {}
for tool in essential_tools:
try:
result = execute_command(f"which {tool}")
tools_status[tool] = result["success"]
except:
tools_status[tool] = False
all_essential_tools_available = all(tools_status.values())
return jsonify({
"status": "healthy",
"message": "Kali Linux Tools API Server is running",
"tools_status": tools_status,
"all_essential_tools_available": all_essential_tools_available
})
@app.route("/mcp/capabilities", methods=["GET"])
def get_capabilities():
# Return tool capabilities similar to our existing MCP server
pass
@app.route("/mcp/tools/kali_tools/<tool_name>", methods=["POST"])
def execute_tool(tool_name):
# Direct tool execution without going through the API server
pass
def parse_args():
"""Parse command line arguments."""
parser = argparse.ArgumentParser(description="Run the Kali Linux API Server")
parser.add_argument("--debug", action="store_true", help="Enable debug mode")
parser.add_argument("--port", type=int, default=API_PORT, help=f"Port for the API server (default: {API_PORT})")
return parser.parse_args()
if __name__ == "__main__":
args = parse_args()
# Set configuration from command line arguments
if args.debug:
DEBUG_MODE = True
os.environ["DEBUG_MODE"] = "1"
logger.setLevel(logging.DEBUG)
if args.port != API_PORT:
API_PORT = args.port
logger.info(f"Starting Kali Linux Tools API Server on port {API_PORT}")
app.run(host="0.0.0.0", port=API_PORT, debug=DEBUG_MODE)

417
mcpforpentest/mcp_server.py Normal file
View File

@ -0,0 +1,417 @@
#!/usr/bin/env python3
# This script connect the MCP AI agent to Kali Linux terminal and API Server.
# some of the code here was inspired from https://github.com/whit3rabbit0/project_astro , be sure to check them out
import sys
import os
import argparse
import logging
from typing import Dict, Any, Optional
import requests
from mcp.server.fastmcp import FastMCP
# Configure logging
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(levelname)s] %(message)s",
handlers=[
logging.StreamHandler(sys.stdout)
]
)
logger = logging.getLogger(__name__)
# Default configuration
DEFAULT_KALI_SERVER = "http://localhost:5000" # change to your linux IP
DEFAULT_REQUEST_TIMEOUT = 300 # 5 minutes default timeout for API requests
class KaliToolsClient:
"""Client for communicating with the Kali Linux Tools API Server"""
def __init__(self, server_url: str, timeout: int = DEFAULT_REQUEST_TIMEOUT):
"""
Initialize the Kali Tools Client
Args:
server_url: URL of the Kali Tools API Server
timeout: Request timeout in seconds
"""
self.server_url = server_url.rstrip("/")
self.timeout = timeout
logger.info(f"Initialized Kali Tools Client connecting to {server_url}")
def safe_get(self, endpoint: str, params: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
"""
Perform a GET request with optional query parameters.
Args:
endpoint: API endpoint path (without leading slash)
params: Optional query parameters
Returns:
Response data as dictionary
"""
if params is None:
params = {}
url = f"{self.server_url}/{endpoint}"
try:
logger.debug(f"GET {url} with params: {params}")
response = requests.get(url, params=params, timeout=self.timeout)
response.raise_for_status()
return response.json()
except requests.exceptions.RequestException as e:
logger.error(f"Request failed: {str(e)}")
return {"error": f"Request failed: {str(e)}", "success": False}
except Exception as e:
logger.error(f"Unexpected error: {str(e)}")
return {"error": f"Unexpected error: {str(e)}", "success": False}
def safe_post(self, endpoint: str, json_data: Dict[str, Any]) -> Dict[str, Any]:
"""
Perform a POST request with JSON data.
Args:
endpoint: API endpoint path (without leading slash)
json_data: JSON data to send
Returns:
Response data as dictionary
"""
url = f"{self.server_url}/{endpoint}"
try:
logger.debug(f"POST {url} with data: {json_data}")
response = requests.post(url, json=json_data, timeout=self.timeout)
response.raise_for_status()
return response.json()
except requests.exceptions.RequestException as e:
logger.error(f"Request failed: {str(e)}")
return {"error": f"Request failed: {str(e)}", "success": False}
except Exception as e:
logger.error(f"Unexpected error: {str(e)}")
return {"error": f"Unexpected error: {str(e)}", "success": False}
def execute_command(self, command: str) -> Dict[str, Any]:
"""
Execute a generic command on the Kali server
Args:
command: Command to execute
Returns:
Command execution results
"""
return self.safe_post("api/command", {"command": command})
def check_health(self) -> Dict[str, Any]:
"""
Check the health of the Kali Tools API Server
Returns:
Health status information
"""
return self.safe_get("health")
def setup_mcp_server(kali_client: KaliToolsClient) -> FastMCP:
"""
Set up the MCP server with all tool functions
Args:
kali_client: Initialized KaliToolsClient
Returns:
Configured FastMCP instance
"""
mcp = FastMCP("kali-mcp")
@mcp.tool()
def nmap_scan(target: str, scan_type: str = "-sV", ports: str = "", additional_args: str = "") -> Dict[str, Any]:
"""
Execute an Nmap scan against a target.
Args:
target: The IP address or hostname to scan
scan_type: Scan type (e.g., -sV for version detection)
ports: Comma-separated list of ports or port ranges
additional_args: Additional Nmap arguments
Returns:
Scan results
"""
data = {
"target": target,
"scan_type": scan_type,
"ports": ports,
"additional_args": additional_args
}
return kali_client.safe_post("api/tools/nmap", data)
@mcp.tool()
def gobuster_scan(url: str, mode: str = "dir", wordlist: str = "/usr/share/wordlists/dirb/common.txt", additional_args: str = "") -> Dict[str, Any]:
"""
Execute Gobuster to find directories, DNS subdomains, or virtual hosts.
Args:
url: The target URL
mode: Scan mode (dir, dns, fuzz, vhost)
wordlist: Path to wordlist file
additional_args: Additional Gobuster arguments
Returns:
Scan results
"""
data = {
"url": url,
"mode": mode,
"wordlist": wordlist,
"additional_args": additional_args
}
return kali_client.safe_post("api/tools/gobuster", data)
@mcp.tool()
def dirb_scan(url: str, wordlist: str = "/usr/share/wordlists/dirb/common.txt", additional_args: str = "") -> Dict[str, Any]:
"""
Execute Dirb web content scanner.
Args:
url: The target URL
wordlist: Path to wordlist file
additional_args: Additional Dirb arguments
Returns:
Scan results
"""
data = {
"url": url,
"wordlist": wordlist,
"additional_args": additional_args
}
return kali_client.safe_post("api/tools/dirb", data)
@mcp.tool()
def nikto_scan(target: str, additional_args: str = "") -> Dict[str, Any]:
"""
Execute Nikto web server scanner.
Args:
target: The target URL or IP
additional_args: Additional Nikto arguments
Returns:
Scan results
"""
data = {
"target": target,
"additional_args": additional_args
}
return kali_client.safe_post("api/tools/nikto", data)
@mcp.tool()
def sqlmap_scan(url: str, data: str = "", additional_args: str = "") -> Dict[str, Any]:
"""
Execute SQLmap SQL injection scanner.
Args:
url: The target URL
data: POST data string
additional_args: Additional SQLmap arguments
Returns:
Scan results
"""
post_data = {
"url": url,
"data": data,
"additional_args": additional_args
}
return kali_client.safe_post("api/tools/sqlmap", post_data)
@mcp.tool()
def metasploit_run(module: str, options: Dict[str, Any] = {}) -> Dict[str, Any]:
"""
Execute a Metasploit module.
Args:
module: The Metasploit module path
options: Dictionary of module options
Returns:
Module execution results
"""
data = {
"module": module,
"options": options
}
return kali_client.safe_post("api/tools/metasploit", data)
@mcp.tool()
def hydra_attack(
target: str,
service: str,
username: str = "",
username_file: str = "",
password: str = "",
password_file: str = "",
additional_args: str = ""
) -> Dict[str, Any]:
"""
Execute Hydra password cracking tool.
Args:
target: Target IP or hostname
service: Service to attack (ssh, ftp, http-post-form, etc.)
username: Single username to try
username_file: Path to username file
password: Single password to try
password_file: Path to password file
additional_args: Additional Hydra arguments
Returns:
Attack results
"""
data = {
"target": target,
"service": service,
"username": username,
"username_file": username_file,
"password": password,
"password_file": password_file,
"additional_args": additional_args
}
return kali_client.safe_post("api/tools/hydra", data)
@mcp.tool()
def john_crack(
hash_file: str,
wordlist: str = "/usr/share/wordlists/rockyou.txt",
format_type: str = "",
additional_args: str = ""
) -> Dict[str, Any]:
"""
Execute John the Ripper password cracker.
Args:
hash_file: Path to file containing hashes
wordlist: Path to wordlist file
format_type: Hash format type
additional_args: Additional John arguments
Returns:
Cracking results
"""
data = {
"hash_file": hash_file,
"wordlist": wordlist,
"format": format_type,
"additional_args": additional_args
}
return kali_client.safe_post("api/tools/john", data)
@mcp.tool()
def wpscan_analyze(url: str, additional_args: str = "") -> Dict[str, Any]:
"""
Execute WPScan WordPress vulnerability scanner.
Args:
url: The target WordPress URL
additional_args: Additional WPScan arguments
Returns:
Scan results
"""
data = {
"url": url,
"additional_args": additional_args
}
return kali_client.safe_post("api/tools/wpscan", data)
@mcp.tool()
def enum4linux_scan(target: str, additional_args: str = "-a") -> Dict[str, Any]:
"""
Execute Enum4linux Windows/Samba enumeration tool.
Args:
target: The target IP or hostname
additional_args: Additional enum4linux arguments
Returns:
Enumeration results
"""
data = {
"target": target,
"additional_args": additional_args
}
return kali_client.safe_post("api/tools/enum4linux", data)
@mcp.tool()
def server_health() -> Dict[str, Any]:
"""
Check the health status of the Kali API server.
Returns:
Server health information
"""
return kali_client.check_health()
@mcp.tool()
def execute_command(command: str) -> Dict[str, Any]:
"""
Execute an arbitrary command on the Kali server.
Args:
command: The command to execute
Returns:
Command execution results
"""
return kali_client.execute_command(command)
return mcp
def parse_args():
"""Parse command line arguments."""
parser = argparse.ArgumentParser(description="Run the Kali MCP Client")
parser.add_argument("--server", type=str, default=DEFAULT_KALI_SERVER,
help=f"Kali API server URL (default: {DEFAULT_KALI_SERVER})")
parser.add_argument("--timeout", type=int, default=DEFAULT_REQUEST_TIMEOUT,
help=f"Request timeout in seconds (default: {DEFAULT_REQUEST_TIMEOUT})")
parser.add_argument("--debug", action="store_true", help="Enable debug logging")
return parser.parse_args()
def main():
"""Main entry point for the MCP server."""
args = parse_args()
# Configure logging based on debug flag
if args.debug:
logger.setLevel(logging.DEBUG)
logger.debug("Debug logging enabled")
# Initialize the Kali Tools client
kali_client = KaliToolsClient(args.server, args.timeout)
# Check server health and log the result
health = kali_client.check_health()
if "error" in health:
logger.warning(f"Unable to connect to Kali API server at {args.server}: {health['error']}")
logger.warning("MCP server will start, but tool execution may fail")
else:
logger.info(f"Successfully connected to Kali API server at {args.server}")
logger.info(f"Server health status: {health['status']}")
if not health.get("all_essential_tools_available", False):
logger.warning("Not all essential tools are available on the Kali server")
missing_tools = [tool for tool, available in health.get("tools_status", {}).items() if not available]
if missing_tools:
logger.warning(f"Missing tools: {', '.join(missing_tools)}")
# Set up and run the MCP server
mcp = setup_mcp_server(kali_client)
logger.info("Starting Kali MCP server")
mcp.run()
if __name__ == "__main__":
main()