mirror of
https://github.com/SunZhimin2021/AIPentest.git
synced 2025-11-05 10:53:16 +00:00
755 lines
27 KiB
Python
755 lines
27 KiB
Python
#!/usr/bin/env python3
|
|
|
|
# This script connects the MCP AI agent to the Kali Linux terminal and API server.
|
|
|
|
import argparse
import json
import logging
import os
import shlex
import signal
import subprocess
import sys
import tempfile
import threading
import time
import traceback
from typing import Dict, Any, Optional, Tuple

from flask import Flask, request, jsonify
|
|
|
|
# Logging setup: timestamped records are written to stdout.
_stdout_handler = logging.StreamHandler(sys.stdout)
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s",
    handlers=[_stdout_handler],
)
logger = logging.getLogger(__name__)

# Runtime configuration; the API_PORT and DEBUG_MODE environment variables
# override the built-in defaults.
_TRUTHY_FLAGS = ("1", "true", "yes", "y")
API_PORT = int(os.environ.get("API_PORT", 5000))
DEBUG_MODE = os.environ.get("DEBUG_MODE", "0").lower() in _TRUTHY_FLAGS
COMMAND_TIMEOUT = 180  # default per-command timeout in seconds (3 minutes)
|
|
|
|
# Flask application object; the @app.route decorators in this file register
# their handlers on it, and the __main__ block starts it with app.run().
app = Flask(__name__)
|
|
|
|
class CommandExecutor:
    """Run one shell command with a timeout and capture its output.

    Non-interactive commands run through the shell with stdout/stderr piped
    and drained by background threads, so output produced before a timeout is
    still returned. Interactive commands run under ``script`` so they get a
    pseudo-terminal; their combined output is captured through a temp file.

    Args:
        command: Shell command line to run.
        timeout: Seconds to wait before the command is terminated. ``None``
            (the default) uses the module-level ``COMMAND_TIMEOUT`` as it is
            when the executor is created.
        interactive: Run the command under a pseudo-terminal.
    """

    def __init__(self, command: str, timeout: Optional[int] = None, interactive: bool = False):
        self.command = command
        # Looked up at call time rather than bound as a default argument, so a
        # COMMAND_TIMEOUT changed after import (e.g. by --timeout) is honoured.
        self.timeout = COMMAND_TIMEOUT if timeout is None else timeout
        self.process = None
        self.stdout_data = ""
        self.stderr_data = ""
        self.stdout_thread = None
        self.stderr_thread = None
        self.return_code = None
        self.timed_out = False
        self.interactive = interactive

        # Check if expect is installed - needed for complex interactive commands
        if self.interactive and "expect" in command:
            self._ensure_expect_installed()

    @staticmethod
    def _ensure_expect_installed() -> None:
        """Best-effort check for ``expect``; try an apt-get install if missing."""
        try:
            check_expect = subprocess.run(["which", "expect"],
                                          stdout=subprocess.PIPE,
                                          stderr=subprocess.PIPE)
            if check_expect.returncode != 0:
                logger.warning("'expect' command not found, installing it...")
                # Try to install expect
                install_cmd = subprocess.run(["apt-get", "install", "-y", "expect"],
                                             stdout=subprocess.PIPE,
                                             stderr=subprocess.PIPE)
                if install_cmd.returncode != 0:
                    logger.error("Failed to install 'expect'. Some interactive commands may not work properly.")
        except Exception as e:
            # Deliberately non-fatal: the command is still attempted afterwards.
            logger.error(f"Error checking/installing 'expect': {str(e)}")

    def _read_stdout(self):
        """Thread function to continuously read stdout"""
        for line in iter(self.process.stdout.readline, ''):
            self.stdout_data += line
            if DEBUG_MODE:
                logger.debug(f"STDOUT: {line.strip()}")

    def _read_stderr(self):
        """Thread function to continuously read stderr"""
        for line in iter(self.process.stderr.readline, ''):
            self.stderr_data += line
            if DEBUG_MODE:
                logger.debug(f"STDERR: {line.strip()}")

    def execute(self) -> Dict[str, Any]:
        """Execute the command and handle timeout gracefully.

        Returns:
            A dict with the keys ``stdout``, ``stderr``, ``return_code``,
            ``success``, ``timed_out`` and ``partial_results``.
        """
        logger.info(f"Executing command: {self.command}")

        # For interactive tools, use a different approach with pseudo-TTY
        if self.interactive:
            return self._execute_interactive()
        return self._execute_non_interactive()

    def _execute_non_interactive(self) -> Dict[str, Any]:
        """Execute non-interactive command"""
        try:
            self.process = subprocess.Popen(
                self.command,
                shell=True,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                text=True,
                # Undecodable bytes would otherwise kill the reader thread and
                # leave the pipe undrained.
                errors="replace",
                bufsize=1  # Line buffered
            )

            # Start threads to read output continuously
            self.stdout_thread = threading.Thread(target=self._read_stdout)
            self.stderr_thread = threading.Thread(target=self._read_stderr)
            self.stdout_thread.daemon = True
            self.stderr_thread.daemon = True
            self.stdout_thread.start()
            self.stderr_thread.start()

            # Wait for the process to complete or timeout
            try:
                self.return_code = self.process.wait(timeout=self.timeout)
                # Process completed, join the threads
                self.stdout_thread.join()
                self.stderr_thread.join()
            except subprocess.TimeoutExpired:
                # Process timed out but we might have partial results
                self.timed_out = True
                logger.warning(f"Command timed out after {self.timeout} seconds. Terminating process.")

                # Try to terminate gracefully first
                self.process.terminate()
                try:
                    self.process.wait(timeout=5)  # Give it 5 seconds to terminate
                except subprocess.TimeoutExpired:
                    # Force kill if it doesn't terminate
                    logger.warning("Process not responding to termination. Killing.")
                    self.process.kill()
                    self.process.wait()  # reap the killed process

                # Let the readers drain what is already buffered. Bounded,
                # because a surviving grandchild may keep the pipes open.
                self.stdout_thread.join(timeout=1)
                self.stderr_thread.join(timeout=1)

                # Update final output
                self.return_code = -1

            has_output = bool(self.stdout_data or self.stderr_data)
            # Always consider it a success if we have output, even with timeout
            success = has_output if self.timed_out else (self.return_code == 0)

            return {
                "stdout": self.stdout_data,
                "stderr": self.stderr_data,
                "return_code": self.return_code,
                "success": success,
                "timed_out": self.timed_out,
                # Must be a real bool: `a and (b or c)` would return the
                # captured output string itself.
                "partial_results": self.timed_out and has_output
            }

        except Exception as e:
            logger.error(f"Error executing command: {str(e)}")
            logger.error(traceback.format_exc())
            return {
                "stdout": self.stdout_data,
                "stderr": f"Error executing command: {str(e)}\n{self.stderr_data}",
                "return_code": -1,
                "success": False,
                "timed_out": False,
                "partial_results": bool(self.stdout_data or self.stderr_data)
            }

    @staticmethod
    def _kill_process_group(process) -> None:
        """SIGTERM the child's process group, then SIGKILL it, and reap the child."""
        # The child was started in its own session, so its pid is the group id.
        pgid = process.pid
        try:
            os.killpg(pgid, signal.SIGTERM)
        except ProcessLookupError:
            pass
        try:
            process.wait(timeout=1)
        except subprocess.TimeoutExpired:
            pass
        try:
            os.killpg(pgid, signal.SIGKILL)
        except ProcessLookupError:
            pass  # the whole group is already gone
        process.wait()

    def _execute_interactive(self) -> Dict[str, Any]:
        """Execute command that requires a TTY/PTY"""
        temp_output = tempfile.NamedTemporaryFile(delete=False, mode='w+')
        temp_name = temp_output.name
        temp_output.close()

        try:
            # Run the command in a PTY via `script`, which records the session
            # to the temp file. An argument list is used instead of a shell
            # string so that quotes inside the command cannot break the
            # invocation.
            process = subprocess.Popen(
                ["script", "-q", "-c", self.command, temp_name],
                start_new_session=True  # own process group, for a clean group kill
            )

            # Wait for timeout or completion
            try:
                return_code = process.wait(timeout=self.timeout)
                timed_out = False
            except subprocess.TimeoutExpired:
                logger.warning(f"Interactive command timed out after {self.timeout} seconds")
                # Kill the entire process group
                self._kill_process_group(process)
                return_code = -1
                timed_out = True

            # Read the captured output; terminal captures may contain bytes
            # that are not valid text, which must not discard the whole result.
            with open(temp_name, 'r', errors='replace') as f:
                output = f.read()

            # Determine success based on output presence even if timed out
            success = bool(output) if timed_out else (return_code == 0)

            return {
                "stdout": output,
                "stderr": "",  # All output is captured to stdout with the script command
                "return_code": return_code,
                "success": success,
                "timed_out": timed_out,
                "partial_results": timed_out and bool(output)
            }

        except Exception as e:
            logger.error(f"Error executing interactive command: {str(e)}")
            logger.error(traceback.format_exc())
            return {
                "stdout": "",
                "stderr": f"Error executing interactive command: {str(e)}",
                "return_code": -1,
                "success": False,
                "timed_out": False,
                "partial_results": False
            }
        finally:
            # Clean up temp file
            try:
                os.unlink(temp_name)
            except OSError:
                pass
|
|
|
|
|
|
def execute_command(command: str, interactive: bool = False,
                    timeout: Optional[int] = None) -> Dict[str, Any]:
    """
    Execute a shell command and return the result

    Args:
        command: The command to execute
        interactive: Whether the command requires a TTY
        timeout: Seconds to wait before the command is terminated. When
            omitted, the current module-level COMMAND_TIMEOUT is used.

    Returns:
        A dictionary containing the stdout, stderr, and return code
    """
    # Read COMMAND_TIMEOUT at call time so that a value changed after import
    # (the --timeout command line option) actually takes effect.
    effective_timeout = COMMAND_TIMEOUT if timeout is None else timeout
    executor = CommandExecutor(command, timeout=effective_timeout, interactive=interactive)
    return executor.execute()
|
|
|
|
# Heuristic used to decide whether a command should be run under a pseudo-TTY
def needs_interactive_mode(command: str) -> bool:
    """Return True when the command text mentions a tool that likely needs a TTY.

    The check is a plain substring match against a fixed list of tool names.
    """
    tty_tool_names = (
        "msfconsole", "sqlmap", "hydra", "wpscan", "metasploit",
        "meterpreter", "postgresql", "mysql", "ssh", "ftp", "telnet",
    )

    for tool_name in tty_tool_names:
        if tool_name in command:
            return True
    return False
|
|
|
|
|
|
@app.route("/api/command", methods=["POST"])
def generic_command():
    """Execute any command provided in the request.

    Expects a JSON object with:
        command: Shell command line to run (required, non-empty string).
        interactive: Optional flag forcing pseudo-TTY execution.

    Returns the execution result as JSON, 400 for a missing/invalid body or
    command, and 500 for unexpected server errors.
    """
    try:
        # silent=True yields None instead of raising on a missing or malformed
        # JSON body, so bad requests get a 400 rather than a 500.
        params = request.get_json(silent=True)
        if not isinstance(params, dict):
            logger.warning("Command endpoint called without a JSON object body")
            return jsonify({
                "error": "Request body must be a JSON object"
            }), 400

        command = params.get("command", "")
        force_interactive = params.get("interactive", False)

        if not command or not isinstance(command, str):
            logger.warning("Command endpoint called without command parameter")
            return jsonify({
                "error": "Command parameter is required"
            }), 400

        # Determine if command needs interactive mode
        interactive = bool(force_interactive) or needs_interactive_mode(command)
        logger.info(f"Running command in {'interactive' if interactive else 'non-interactive'} mode")

        result = execute_command(command, interactive=interactive)
        return jsonify(result)
    except Exception as e:
        logger.error(f"Error in command endpoint: {str(e)}")
        logger.error(traceback.format_exc())
        return jsonify({
            "error": f"Server error: {str(e)}"
        }), 500
|
|
|
|
|
|
@app.route("/api/tools/nmap", methods=["POST"])
def nmap():
    """Execute nmap scan with the provided parameters.

    Expects a JSON object with:
        target: Host(s) to scan (required).
        scan_type: Scan flags (default "-sCV").
        ports: Optional port specification passed to -p.
        additional_args: Extra arguments (default "-T4 -Pn").

    All values are inserted into the shell command line as given.
    Returns the execution result as JSON, or 400/500 with an "error" field.
    """
    try:
        # silent=True yields None instead of raising on a missing or malformed
        # JSON body, so bad requests get a 400 rather than a 500.
        params = request.get_json(silent=True)
        if not isinstance(params, dict):
            logger.warning("Nmap called without a JSON object body")
            return jsonify({
                "error": "Request body must be a JSON object"
            }), 400

        target = params.get("target", "")
        scan_type = params.get("scan_type", "-sCV")
        ports = params.get("ports", "")
        additional_args = params.get("additional_args", "-T4 -Pn")

        if not target:
            logger.warning("Nmap called without target parameter")
            return jsonify({
                "error": "Target parameter is required"
            }), 400

        command = f"nmap {scan_type}"

        if ports:
            command += f" -p {ports}"

        if additional_args:
            # NOTE: additional args are passed through without validation
            command += f" {additional_args}"

        command += f" {target}"

        result = execute_command(command)
        return jsonify(result)
    except Exception as e:
        logger.error(f"Error in nmap endpoint: {str(e)}")
        logger.error(traceback.format_exc())
        return jsonify({
            "error": f"Server error: {str(e)}"
        }), 500
|
|
|
|
@app.route("/api/tools/gobuster", methods=["POST"])
def gobuster():
    """Execute gobuster with the provided parameters.

    Expects a JSON object with:
        url: Target URL (required).
        mode: One of dir, dns, fuzz, vhost (default "dir").
        wordlist: Wordlist path (default dirb common.txt).
        additional_args: Extra arguments appended as given.

    Returns the execution result as JSON, or 400/500 with an "error" field.
    """
    try:
        # silent=True yields None instead of raising on a missing or malformed
        # JSON body, so bad requests get a 400 rather than a 500.
        params = request.get_json(silent=True)
        if not isinstance(params, dict):
            logger.warning("Gobuster called without a JSON object body")
            return jsonify({
                "error": "Request body must be a JSON object"
            }), 400

        url = params.get("url", "")
        mode = params.get("mode", "dir")
        wordlist = params.get("wordlist", "/usr/share/wordlists/dirb/common.txt")
        additional_args = params.get("additional_args", "")

        if not url:
            logger.warning("Gobuster called without URL parameter")
            return jsonify({
                "error": "URL parameter is required"
            }), 400

        # Validate mode
        if mode not in ["dir", "dns", "fuzz", "vhost"]:
            logger.warning(f"Invalid gobuster mode: {mode}")
            return jsonify({
                "error": f"Invalid mode: {mode}. Must be one of: dir, dns, fuzz, vhost"
            }), 400

        # Quote the URL: characters such as & ; ? would otherwise be
        # interpreted by the shell and truncate or split the command.
        command = f"gobuster {mode} -u {shlex.quote(str(url))} -w {wordlist}"

        if additional_args:
            command += f" {additional_args}"

        result = execute_command(command)
        return jsonify(result)
    except Exception as e:
        logger.error(f"Error in gobuster endpoint: {str(e)}")
        logger.error(traceback.format_exc())
        return jsonify({
            "error": f"Server error: {str(e)}"
        }), 500
|
|
|
|
@app.route("/api/tools/dirb", methods=["POST"])
def dirb():
    """Execute dirb with the provided parameters.

    Expects a JSON object with:
        url: Target URL (required).
        wordlist: Wordlist path (default dirb common.txt).
        additional_args: Extra arguments appended as given.

    Returns the execution result as JSON, or 400/500 with an "error" field.
    """
    try:
        # silent=True yields None instead of raising on a missing or malformed
        # JSON body, so bad requests get a 400 rather than a 500.
        params = request.get_json(silent=True)
        if not isinstance(params, dict):
            logger.warning("Dirb called without a JSON object body")
            return jsonify({
                "error": "Request body must be a JSON object"
            }), 400

        url = params.get("url", "")
        wordlist = params.get("wordlist", "/usr/share/wordlists/dirb/common.txt")
        additional_args = params.get("additional_args", "")

        if not url:
            logger.warning("Dirb called without URL parameter")
            return jsonify({
                "error": "URL parameter is required"
            }), 400

        # Quote the URL: characters such as & ; ? would otherwise be
        # interpreted by the shell and truncate or split the command.
        command = f"dirb {shlex.quote(str(url))} {wordlist}"

        if additional_args:
            command += f" {additional_args}"

        result = execute_command(command)
        return jsonify(result)
    except Exception as e:
        logger.error(f"Error in dirb endpoint: {str(e)}")
        logger.error(traceback.format_exc())
        return jsonify({
            "error": f"Server error: {str(e)}"
        }), 500
|
|
|
|
@app.route("/api/tools/nikto", methods=["POST"])
def nikto():
    """Execute nikto with the provided parameters.

    Expects a JSON object with:
        target: Host to scan, passed to -h (required).
        additional_args: Extra arguments appended as given.

    Returns the execution result as JSON, or 400/500 with an "error" field.
    """
    try:
        # silent=True yields None instead of raising on a missing or malformed
        # JSON body, so bad requests get a 400 rather than a 500.
        params = request.get_json(silent=True)
        if not isinstance(params, dict):
            logger.warning("Nikto called without a JSON object body")
            return jsonify({
                "error": "Request body must be a JSON object"
            }), 400

        target = params.get("target", "")
        additional_args = params.get("additional_args", "")

        if not target:
            logger.warning("Nikto called without target parameter")
            return jsonify({
                "error": "Target parameter is required"
            }), 400

        command = f"nikto -h {target}"

        if additional_args:
            command += f" {additional_args}"

        result = execute_command(command)
        return jsonify(result)
    except Exception as e:
        logger.error(f"Error in nikto endpoint: {str(e)}")
        logger.error(traceback.format_exc())
        return jsonify({
            "error": f"Server error: {str(e)}"
        }), 500
|
|
|
|
@app.route("/api/tools/sqlmap", methods=["POST"])
def sqlmap():
    """Execute sqlmap with the provided parameters.

    Expects a JSON object with:
        url: Target URL (required).
        data: Optional request body passed to --data.
        additional_args: Extra arguments appended as given.

    Returns the execution result as JSON, or 400/500 with an "error" field.
    """
    try:
        # silent=True yields None instead of raising on a missing or malformed
        # JSON body, so bad requests get a 400 rather than a 500.
        params = request.get_json(silent=True)
        if not isinstance(params, dict):
            logger.warning("SQLMap called without a JSON object body")
            return jsonify({
                "error": "Request body must be a JSON object"
            }), 400

        url = params.get("url", "")
        data = params.get("data", "")
        additional_args = params.get("additional_args", "")

        if not url:
            logger.warning("SQLMap called without URL parameter")
            return jsonify({
                "error": "URL parameter is required"
            }), 400

        # Quote the URL: an unquoted "&" between query parameters would make
        # the shell background sqlmap and drop the rest of the command.
        command = f"sqlmap -u {shlex.quote(str(url))} --batch"

        if data:
            # shlex.quote also protects quotes, "$" and "&" inside the payload
            command += f" --data={shlex.quote(str(data))}"

        if additional_args:
            command += f" {additional_args}"

        # SQLMap definitely needs interactive mode
        result = execute_command(command, interactive=True)
        return jsonify(result)
    except Exception as e:
        logger.error(f"Error in sqlmap endpoint: {str(e)}")
        logger.error(traceback.format_exc())
        return jsonify({
            "error": f"Server error: {str(e)}"
        }), 500
|
|
|
|
@app.route("/api/tools/metasploit", methods=["POST"])
def metasploit():
    """Execute metasploit module with the provided parameters.

    Expects a JSON object with:
        module: Module path to "use" (required).
        options: Object of option name -> value, each emitted as "set".
        commands: List of extra console commands run after the exploit.
        timeout: Seconds allowed for the run (default COMMAND_TIMEOUT).

    A resource script and an expect script are written to temporary files,
    msfconsole is driven through expect, and both files are removed afterwards.
    Returns the execution result as JSON, or 400/500 with an "error" field.
    """
    try:
        # silent=True yields None instead of raising on a missing or malformed
        # JSON body, so bad requests get a 400 rather than a 500.
        params = request.get_json(silent=True)
        if not isinstance(params, dict):
            logger.warning("Metasploit called without a JSON object body")
            return jsonify({
                "error": "Request body must be a JSON object"
            }), 400

        module = params.get("module", "")
        options = params.get("options", {})
        commands = params.get("commands", [])

        if not module:
            logger.warning("Metasploit called without module parameter")
            return jsonify({
                "error": "Module parameter is required"
            }), 400

        if not isinstance(options, dict) or not isinstance(commands, list):
            logger.warning("Metasploit called with invalid options/commands types")
            return jsonify({
                "error": "Options must be an object and commands must be a list"
            }), 400

        # The timeout is interpolated into a command line, so it must be a
        # plain positive integer.
        try:
            timeout = int(params.get("timeout", COMMAND_TIMEOUT))
        except (TypeError, ValueError):
            timeout = 0
        if timeout <= 0:
            logger.warning("Metasploit called with invalid timeout parameter")
            return jsonify({
                "error": "Timeout must be a positive integer"
            }), 400

        # Build the MSF resource script
        resource_lines = [f"use {module}"]
        # Set module options
        resource_lines.extend(f"set {key} {value}" for key, value in options.items())
        # Run the exploit; -z means don't interact with the session
        resource_lines.append("exploit -z")
        # Additional post-exploit commands if provided
        resource_lines.extend(f"{cmd}" for cmd in commands)
        resource_lines += [
            "sessions -l",  # List all sessions
            "sleep 3",      # Give it time to complete
            "sessions -K",  # Kill all sessions before exit
            "exit",         # Exit msfconsole
        ]
        resource_content = "\n".join(resource_lines) + "\n"

        resource_file = None
        expect_file = None
        try:
            # tempfile picks an unpredictable, exclusively-created name, unlike
            # a pid/timestamp name in /tmp which can collide or be pre-created.
            with tempfile.NamedTemporaryFile("w", prefix="mcp_msf_resource_",
                                             suffix=".rc", delete=False) as f:
                resource_file = f.name
                f.write(resource_content)

            logger.info(f"Created Metasploit resource script: {resource_file}")
            logger.info(f"Resource script content:\n{resource_content}")

            # Expect script that answers msfconsole's interactive prompts
            expect_script = "\n".join([
                f"set timeout {timeout}",
                f"spawn msfconsole -q -r {resource_file}",
                "expect {",
                '    "exploit completed" {',
                "        sleep 5",
                '        send "exit\\r"',
                "        exp_continue",
                "    }",
                '    "session 1 opened" {',
                "        sleep 5",
                '        send "\\r"',
                "        exp_continue",
                "    }",
                '    "command shell session" {',
                "        sleep 3",
                '        send "exit\\r"',
                "        exp_continue",
                "    }",
                '    "meterpreter >" {',
                '        send "exit\\r"',
                "        exp_continue",
                "    }",
                "    timeout {",
                '        puts "TIMEOUT OCCURRED"',
                "        exit 1",
                "    }",
                "    eof {",
                "        exit 0",
                "    }",
                "}",
                "",
            ])

            # The script goes into a file instead of `expect -c '...'`: the
            # interactive executor wraps the command in quotes of its own, so
            # an inline single-quoted script would be cut apart by the shell.
            with tempfile.NamedTemporaryFile("w", prefix="mcp_msf_expect_",
                                             suffix=".exp", delete=False) as f:
                expect_file = f.name
                f.write(expect_script)

            command = f"timeout {timeout} expect -f {expect_file}"

            # Give the executor slightly longer than the inner timeout so that
            # `timeout`/expect end the run first and report it themselves.
            executor = CommandExecutor(command, timeout=timeout + 10, interactive=True)
            result = executor.execute()
        finally:
            # Clean up the temporary files even if execution failed
            for path in (resource_file, expect_file):
                if path is None:
                    continue
                try:
                    os.remove(path)
                except OSError as e:
                    logger.warning(f"Error removing temporary resource file: {str(e)}")

        return jsonify(result)
    except Exception as e:
        logger.error(f"Error in metasploit endpoint: {str(e)}")
        logger.error(traceback.format_exc())
        return jsonify({
            "error": f"Server error: {str(e)}"
        }), 500
|
|
|
|
@app.route("/api/tools/hydra", methods=["POST"])
def hydra():
    """Execute hydra with the provided parameters.

    Expects a JSON object with:
        target, service: What to attack (both required).
        username or username_file: Single login (-l) or login list (-L).
        password or password_file: Single password (-p) or list (-P).
        additional_args: Extra arguments appended as given.

    Returns the execution result as JSON, or 400/500 with an "error" field.
    """
    try:
        # silent=True yields None instead of raising on a missing or malformed
        # JSON body, so bad requests get a 400 rather than a 500.
        params = request.get_json(silent=True)
        if not isinstance(params, dict):
            logger.warning("Hydra called without a JSON object body")
            return jsonify({
                "error": "Request body must be a JSON object"
            }), 400

        target = params.get("target", "")
        service = params.get("service", "")
        username = params.get("username", "")
        username_file = params.get("username_file", "")
        password = params.get("password", "")
        password_file = params.get("password_file", "")
        additional_args = params.get("additional_args", "")

        if not target or not service:
            logger.warning("Hydra called without target or service parameter")
            return jsonify({
                "error": "Target and service parameters are required"
            }), 400

        if not (username or username_file) or not (password or password_file):
            logger.warning("Hydra called without username/password parameters")
            return jsonify({
                "error": "Username/username_file and password/password_file are required"
            }), 400

        command = "hydra -t 4"

        # Credentials are quoted: spaces or shell metacharacters ($, !, &, ...)
        # in a login or password would otherwise be mangled by the shell.
        if username:
            command += f" -l {shlex.quote(str(username))}"
        elif username_file:
            command += f" -L {username_file}"

        if password:
            command += f" -p {shlex.quote(str(password))}"
        elif password_file:
            command += f" -P {password_file}"

        if additional_args:
            command += f" {additional_args}"

        command += f" {target} {service}"

        # Hydra might benefit from interactive mode for certain services
        result = execute_command(command, interactive=True)
        return jsonify(result)
    except Exception as e:
        logger.error(f"Error in hydra endpoint: {str(e)}")
        logger.error(traceback.format_exc())
        return jsonify({
            "error": f"Server error: {str(e)}"
        }), 500
|
|
|
|
@app.route("/api/tools/john", methods=["POST"])
def john():
    """Execute john with the provided parameters.

    Expects a JSON object with:
        hash_file: File containing the hashes (required).
        wordlist: Wordlist path (default rockyou.txt).
        format: Optional hash format passed to --format.
        additional_args: Extra arguments appended as given.

    Returns the execution result as JSON, or 400/500 with an "error" field.
    """
    try:
        # silent=True yields None instead of raising on a missing or malformed
        # JSON body, so bad requests get a 400 rather than a 500.
        params = request.get_json(silent=True)
        if not isinstance(params, dict):
            logger.warning("John called without a JSON object body")
            return jsonify({
                "error": "Request body must be a JSON object"
            }), 400

        hash_file = params.get("hash_file", "")
        wordlist = params.get("wordlist", "/usr/share/wordlists/rockyou.txt")
        format_type = params.get("format", "")
        additional_args = params.get("additional_args", "")

        if not hash_file:
            logger.warning("John called without hash_file parameter")
            return jsonify({
                "error": "Hash file parameter is required"
            }), 400

        command = "john"

        if format_type:
            command += f" --format={format_type}"

        if wordlist:
            command += f" --wordlist={wordlist}"

        if additional_args:
            command += f" {additional_args}"

        command += f" {hash_file}"

        result = execute_command(command)
        return jsonify(result)
    except Exception as e:
        logger.error(f"Error in john endpoint: {str(e)}")
        logger.error(traceback.format_exc())
        return jsonify({
            "error": f"Server error: {str(e)}"
        }), 500
|
|
|
|
@app.route("/api/tools/wpscan", methods=["POST"])
def wpscan():
    """Execute wpscan with the provided parameters.

    Expects a JSON object with:
        url: Target URL (required).
        additional_args: Extra arguments appended as given.

    Returns the execution result as JSON, or 400/500 with an "error" field.
    """
    try:
        # silent=True yields None instead of raising on a missing or malformed
        # JSON body, so bad requests get a 400 rather than a 500.
        params = request.get_json(silent=True)
        if not isinstance(params, dict):
            logger.warning("WPScan called without a JSON object body")
            return jsonify({
                "error": "Request body must be a JSON object"
            }), 400

        url = params.get("url", "")
        additional_args = params.get("additional_args", "")

        if not url:
            logger.warning("WPScan called without URL parameter")
            return jsonify({
                "error": "URL parameter is required"
            }), 400

        # Quote the URL: characters such as & ; ? would otherwise be
        # interpreted by the shell and truncate or split the command.
        command = f"wpscan --url {shlex.quote(str(url))} --no-banner"

        if additional_args:
            command += f" {additional_args}"

        # WPScan needs interactive mode as it displays progress bars
        result = execute_command(command, interactive=True)
        return jsonify(result)
    except Exception as e:
        logger.error(f"Error in wpscan endpoint: {str(e)}")
        logger.error(traceback.format_exc())
        return jsonify({
            "error": f"Server error: {str(e)}"
        }), 500
|
|
|
|
@app.route("/api/tools/enum4linux", methods=["POST"])
def enum4linux():
    """Execute enum4linux with the provided parameters.

    Expects a JSON object with:
        target: Host to enumerate (required).
        additional_args: Arguments placed before the target (default "-a").

    Returns the execution result as JSON, or 400/500 with an "error" field.
    """
    try:
        # silent=True yields None instead of raising on a missing or malformed
        # JSON body, so bad requests get a 400 rather than a 500.
        params = request.get_json(silent=True)
        if not isinstance(params, dict):
            logger.warning("Enum4linux called without a JSON object body")
            return jsonify({
                "error": "Request body must be a JSON object"
            }), 400

        target = params.get("target", "")
        additional_args = params.get("additional_args", "-a")

        if not target:
            logger.warning("Enum4linux called without target parameter")
            return jsonify({
                "error": "Target parameter is required"
            }), 400

        command = f"enum4linux {additional_args} {target}"

        result = execute_command(command)
        return jsonify(result)
    except Exception as e:
        logger.error(f"Error in enum4linux endpoint: {str(e)}")
        logger.error(traceback.format_exc())
        return jsonify({
            "error": f"Server error: {str(e)}"
        }), 500
|
|
|
|
|
|
# Health check endpoint
@app.route("/health", methods=["GET"])
def health_check():
    """Health check endpoint.

    Reports, per essential tool, whether `which <tool>` succeeds, plus a
    combined flag. The "status" field is always "healthy" when the server is
    able to answer at all.
    """
    # Check if essential tools are installed
    essential_tools = ["nmap", "gobuster", "dirb", "nikto"]
    tools_status = {}

    for tool in essential_tools:
        try:
            result = execute_command(f"which {tool}")
            tools_status[tool] = result["success"]
        except Exception as e:
            # A failed probe counts as "tool missing". A bare `except:` here
            # would also swallow SystemExit and KeyboardInterrupt.
            logger.warning(f"Error checking availability of {tool}: {str(e)}")
            tools_status[tool] = False

    all_essential_tools_available = all(tools_status.values())

    return jsonify({
        "status": "healthy",
        "message": "Kali Linux Tools API Server is running",
        "tools_status": tools_status,
        "all_essential_tools_available": all_essential_tools_available
    })
|
|
|
|
def parse_args():
    """Build the command line parser and return the parsed arguments.

    Options: --debug (flag), --port (int, defaults to API_PORT) and
    --timeout (int, defaults to COMMAND_TIMEOUT).
    """
    port_help = f"Port for the API server (default: {API_PORT})"
    timeout_help = f"Default command timeout in seconds (default: {COMMAND_TIMEOUT})"

    cli = argparse.ArgumentParser(description="Run the Kali Linux API Server")
    cli.add_argument("--debug", action="store_true", help="Enable debug mode")
    cli.add_argument("--port", type=int, default=API_PORT, help=port_help)
    cli.add_argument("--timeout", type=int, default=COMMAND_TIMEOUT, help=timeout_help)

    parsed = cli.parse_args()
    return parsed
|
|
|
|
if __name__ == "__main__":
    # Script entry point: apply command line overrides, make sure the helper
    # binaries exist, then start the Flask server.
    args = parse_args()

    # Command line arguments take precedence over the module defaults
    if args.debug:
        DEBUG_MODE = True
        os.environ["DEBUG_MODE"] = "1"
        logger.setLevel(logging.DEBUG)

    if args.port != API_PORT:
        API_PORT = args.port

    # Only announce the timeout when it differs from the default
    if args.timeout != COMMAND_TIMEOUT:
        COMMAND_TIMEOUT = args.timeout
        logger.info(f"Command timeout set to {COMMAND_TIMEOUT} seconds")

    # `script` and `expect` are required by the interactive execution path
    try:
        for dep in ("script", "expect"):
            probe = subprocess.run(["which", dep], capture_output=True)
            if probe.returncode == 0:
                continue

            logger.warning(f"'{dep}' command not found, installing required dependencies...")
            subprocess.run(["apt-get", "update"], capture_output=True)
            install = subprocess.run(
                ["apt-get", "install", "-y", "expect", "bsdutils"],
                capture_output=True,
            )
            if install.returncode != 0:
                logger.error(f"Failed to install required dependencies. Some features may not work properly.")
            # One install attempt covers both dependencies, so stop probing
            break
    except Exception as e:
        logger.error(f"Error checking dependencies: {str(e)}")

    logger.info(f"Starting Kali Linux Tools API Server on port {API_PORT}")
    app.run(host="0.0.0.0", port=API_PORT, debug=DEBUG_MODE)
|