mirror of
https://github.com/SunZhimin2021/AIPentest.git
synced 2025-11-05 10:53:16 +00:00
Update mcp_server.py
修改通道控制,加上交互式Shell
This commit is contained in:
parent
e3c70c5944
commit
c2c2ae101d
@ -8,7 +8,7 @@ import sys
|
|||||||
import os
|
import os
|
||||||
import argparse
|
import argparse
|
||||||
import logging
|
import logging
|
||||||
from typing import Dict, Any, Optional
|
from typing import Dict, Any, Optional, List, Union
|
||||||
import requests
|
import requests
|
||||||
|
|
||||||
from mcp.server.fastmcp import FastMCP
|
from mcp.server.fastmcp import FastMCP
|
||||||
@ -95,17 +95,40 @@ class KaliToolsClient:
|
|||||||
logger.error(f"Unexpected error: {str(e)}")
|
logger.error(f"Unexpected error: {str(e)}")
|
||||||
return {"error": f"Unexpected error: {str(e)}", "success": False}
|
return {"error": f"Unexpected error: {str(e)}", "success": False}
|
||||||
|
|
||||||
def execute_command(self, command: str) -> Dict[str, Any]:
|
def execute_command(self, command: str, interactive: bool = False) -> Dict[str, Any]:
|
||||||
"""
|
"""
|
||||||
Execute a generic command on the Kali server
|
Execute a generic command on the Kali server
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
command: Command to execute
|
command: Command to execute
|
||||||
|
interactive: Whether the command requires interactive mode
|
||||||
|
|
||||||
Returns:
|
Returns:
|
||||||
Command execution results
|
Command execution results
|
||||||
"""
|
"""
|
||||||
return self.safe_post("api/command", {"command": command})
|
return self.safe_post("api/command", {"command": command, "interactive": interactive})
|
||||||
|
|
||||||
|
def execute_interactive_tool(self, tool: str, commands: List[str] = None, timeout: int = None) -> Dict[str, Any]:
|
||||||
|
"""
|
||||||
|
Execute an interactive tool with a sequence of commands
|
||||||
|
|
||||||
|
Args:
|
||||||
|
tool: The tool to execute (e.g., msfconsole, sqlmap)
|
||||||
|
commands: List of commands to execute in the interactive session
|
||||||
|
timeout: Custom timeout for this interactive session
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Interactive session results
|
||||||
|
"""
|
||||||
|
data = {
|
||||||
|
"command": tool,
|
||||||
|
"interactive": True
|
||||||
|
}
|
||||||
|
|
||||||
|
if timeout is not None:
|
||||||
|
data["timeout"] = timeout
|
||||||
|
|
||||||
|
return self.safe_post("api/command", data)
|
||||||
|
|
||||||
def check_health(self) -> Dict[str, Any]:
|
def check_health(self) -> Dict[str, Any]:
|
||||||
"""
|
"""
|
||||||
@ -230,23 +253,72 @@ def setup_mcp_server(kali_client: KaliToolsClient) -> FastMCP:
|
|||||||
}
|
}
|
||||||
return kali_client.safe_post("api/tools/sqlmap", post_data)
|
return kali_client.safe_post("api/tools/sqlmap", post_data)
|
||||||
|
|
||||||
|
|
||||||
@mcp.tool()
|
@mcp.tool()
|
||||||
def metasploit_run(module: str, options: Dict[str, Any] = {}) -> Dict[str, Any]:
|
def msfconsole_interactive(resource_script: str = None, commands: List[str] = None, timeout: int = None) -> Dict[str, Any]:
|
||||||
"""
|
"""
|
||||||
Execute a Metasploit module.
|
Start an interactive Metasploit console session.
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
module: The Metasploit module path
|
resource_script: Optional path to MSF resource script
|
||||||
options: Dictionary of module options
|
commands: List of commands to execute in MSF console
|
||||||
|
timeout: Custom timeout for this MSF console session
|
||||||
|
|
||||||
Returns:
|
Returns:
|
||||||
Module execution results
|
MSF console session results
|
||||||
"""
|
"""
|
||||||
data = {
|
base_command = "msfconsole -q"
|
||||||
"module": module,
|
|
||||||
"options": options
|
if resource_script:
|
||||||
}
|
if not os.path.exists(resource_script):
|
||||||
return kali_client.safe_post("api/tools/metasploit", data)
|
return {
|
||||||
|
"error": f"Resource script file not found: {resource_script}",
|
||||||
|
"success": False
|
||||||
|
}
|
||||||
|
base_command += f" -r {resource_script}"
|
||||||
|
|
||||||
|
# Create a temporary resource script if commands are provided
|
||||||
|
temp_resource_file = None
|
||||||
|
if commands and not resource_script:
|
||||||
|
try:
|
||||||
|
import tempfile
|
||||||
|
temp_resource_file = tempfile.NamedTemporaryFile(delete=False, mode='w+', suffix='.rc')
|
||||||
|
for cmd in commands:
|
||||||
|
temp_resource_file.write(f"{cmd}\n")
|
||||||
|
temp_resource_file.close()
|
||||||
|
base_command += f" -r {temp_resource_file.name}"
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"Error creating temporary resource file: {str(e)}")
|
||||||
|
return {
|
||||||
|
"error": f"Failed to create resource script: {str(e)}",
|
||||||
|
"success": False
|
||||||
|
}
|
||||||
|
|
||||||
|
try:
|
||||||
|
result = kali_client.execute_command(base_command, interactive=True)
|
||||||
|
|
||||||
|
# Clean up temporary file if created
|
||||||
|
if temp_resource_file and os.path.exists(temp_resource_file.name):
|
||||||
|
try:
|
||||||
|
os.unlink(temp_resource_file.name)
|
||||||
|
except:
|
||||||
|
pass
|
||||||
|
|
||||||
|
return result
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"Error in msfconsole interactive: {str(e)}")
|
||||||
|
|
||||||
|
# Clean up on error too
|
||||||
|
if temp_resource_file and os.path.exists(temp_resource_file.name):
|
||||||
|
try:
|
||||||
|
os.unlink(temp_resource_file.name)
|
||||||
|
except:
|
||||||
|
pass
|
||||||
|
|
||||||
|
return {
|
||||||
|
"error": f"MSF console execution failed: {str(e)}",
|
||||||
|
"success": False
|
||||||
|
}
|
||||||
|
|
||||||
@mcp.tool()
|
@mcp.tool()
|
||||||
def hydra_attack(
|
def hydra_attack(
|
||||||
@ -358,17 +430,91 @@ def setup_mcp_server(kali_client: KaliToolsClient) -> FastMCP:
|
|||||||
return kali_client.check_health()
|
return kali_client.check_health()
|
||||||
|
|
||||||
@mcp.tool()
|
@mcp.tool()
|
||||||
def execute_command(command: str) -> Dict[str, Any]:
|
def execute_command(command: str, interactive: bool = False) -> Dict[str, Any]:
|
||||||
"""
|
"""
|
||||||
Execute an arbitrary command on the Kali server.
|
Execute an arbitrary command on the Kali server.
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
command: The command to execute
|
command: The command to execute
|
||||||
|
interactive: Whether the command requires interactive mode
|
||||||
|
|
||||||
Returns:
|
Returns:
|
||||||
Command execution results
|
Command execution results
|
||||||
"""
|
"""
|
||||||
return kali_client.execute_command(command)
|
return kali_client.execute_command(command, interactive)
|
||||||
|
|
||||||
|
@mcp.tool()
|
||||||
|
def run_interactive(tool: str, commands: List[str] = None, timeout: int = None) -> Dict[str, Any]:
|
||||||
|
"""
|
||||||
|
Run a tool in interactive mode.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
tool: The tool to run (e.g., msfconsole, sqlmap, hydra)
|
||||||
|
commands: List of commands to send to the interactive session
|
||||||
|
timeout: Custom timeout for this interactive session
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Interactive session results
|
||||||
|
"""
|
||||||
|
# Special handling for known interactive tools
|
||||||
|
if tool.startswith("msfconsole"):
|
||||||
|
return msfconsole_interactive(commands=commands, timeout=timeout)
|
||||||
|
elif tool.startswith("sqlmap"):
|
||||||
|
# Add --batch to make it non-interactive but still use the interactive executor
|
||||||
|
if not "--batch" in tool:
|
||||||
|
tool += " --batch"
|
||||||
|
|
||||||
|
return kali_client.execute_interactive_tool(tool, commands, timeout)
|
||||||
|
|
||||||
|
@mcp.tool()
|
||||||
|
def sql_interactive(url: str, data: str = "", commands: List[str] = None, timeout: int = None) -> Dict[str, Any]:
|
||||||
|
"""
|
||||||
|
Run SQLMap in interactive mode with more options.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
url: Target URL
|
||||||
|
data: Optional POST data
|
||||||
|
commands: Additional SQLMap commands/options
|
||||||
|
timeout: Custom timeout for this session
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
SQLMap session results
|
||||||
|
"""
|
||||||
|
command = f"sqlmap -u {url} --batch"
|
||||||
|
|
||||||
|
if data:
|
||||||
|
command += f" --data=\"{data}\""
|
||||||
|
|
||||||
|
return kali_client.execute_command(command, interactive=True)
|
||||||
|
|
||||||
|
@mcp.tool()
|
||||||
|
def create_metasploit_resource(commands: List[str], output_file: str) -> Dict[str, Any]:
|
||||||
|
"""
|
||||||
|
Create a Metasploit resource script.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
commands: List of MSF commands to include in the script
|
||||||
|
output_file: Path to save the resource script
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Result of script creation
|
||||||
|
"""
|
||||||
|
try:
|
||||||
|
with open(output_file, 'w') as f:
|
||||||
|
for cmd in commands:
|
||||||
|
f.write(f"{cmd}\n")
|
||||||
|
|
||||||
|
return {
|
||||||
|
"success": True,
|
||||||
|
"message": f"Created Metasploit resource script at {output_file}",
|
||||||
|
"file": output_file
|
||||||
|
}
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"Error creating resource script: {str(e)}")
|
||||||
|
return {
|
||||||
|
"success": False,
|
||||||
|
"error": f"Failed to create resource script: {str(e)}"
|
||||||
|
}
|
||||||
|
|
||||||
return mcp
|
return mcp
|
||||||
|
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user