# ============================================================================
# Logging System
# ============================================================================
# NOTE(review): this module runs under Jython inside Burp Suite; it assumes
# the file-top imports (burp, javax.swing, java.awt.Color, SwingUtilities,
# and the implicit `java` package) are in scope -- confirm at file top.

class LogLevel:
    """Numeric log-level constants plus display helpers."""
    DEBUG = 0
    INFO = 1
    WARN = 2
    ERROR = 3

    @staticmethod
    def to_string(level):
        """Convert a numeric log level to its display name ("UNKNOWN" if out of range)."""
        levels = ["DEBUG", "INFO", "WARN", "ERROR"]
        if 0 <= level < len(levels):
            return levels[level]
        return "UNKNOWN"

    @staticmethod
    def to_color(level):
        """Get the Swing color used to render a log level (black for unknown levels)."""
        colors = {
            LogLevel.DEBUG: Color(128, 128, 128),  # Gray
            LogLevel.INFO: Color(0, 128, 0),       # Green
            LogLevel.WARN: Color(255, 165, 0),     # Orange
            LogLevel.ERROR: Color(255, 0, 0)       # Red
        }
        return colors.get(level, Color.BLACK)


class Logger:
    """Leveled logger that writes to Burp's stdout and mirrors into a JTextArea.

    The UI copy is auto-truncated so the text area never grows past
    ``max_lines``; all Swing mutations are marshalled onto the EDT via
    SwingUtilities.invokeLater.
    """

    def __init__(self, log_area, stdout):
        self.log_area = log_area        # JTextArea shown in the "Logs" tab (may be None)
        self.stdout = stdout            # PrintWriter wrapping Burp's stdout
        self.log_level = LogLevel.INFO  # minimum level that gets emitted
        self.max_lines = 1000           # cap on lines kept in the UI log
        self.line_count = 0             # approximate line count of the UI log

    def set_log_level(self, level):
        """Set the minimum log level to display."""
        self.log_level = level

    def log(self, message, level=LogLevel.INFO):
        """Log a message at the given level; silently dropped below the threshold."""
        if level < self.log_level:
            return

        # A fresh SimpleDateFormat per call is deliberate: SimpleDateFormat is
        # not thread-safe and log() may be hit from worker threads.
        timestamp = java.text.SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(java.util.Date())
        level_str = LogLevel.to_string(level)
        formatted = "[{}] [{}] {}".format(timestamp, level_str, message)

        # Always echo to stdout so logs survive even without the UI tab.
        self.stdout.println(formatted)

        # Mirror into the UI when a text area is attached.
        if hasattr(self, 'log_area') and self.log_area:
            self._append_to_log_area(formatted)

    def _append_to_log_area(self, message):
        """Append a line to the UI log on the EDT, truncating when over max_lines."""
        def append():
            try:
                self.log_area.append(message + "\n")
                self.line_count += 1

                if self.line_count > self.max_lines:
                    text = self.log_area.getText()
                    lines = text.split("\n")
                    kept = lines[-self.max_lines:]
                    self.log_area.setText("\n".join(kept))
                    # FIX: the text ends with "\n", so split() leaves a
                    # trailing "" element; count only real lines instead of
                    # assuming exactly max_lines survived.
                    self.line_count = sum(1 for ln in kept if ln)

                # Keep the view pinned to the newest entry.
                self.log_area.setCaretPosition(self.log_area.getDocument().getLength())
            except Exception as e:
                self.stdout.println("[-] Error appending to log area: {}".format(str(e)))

        SwingUtilities.invokeLater(append)

    def debug(self, message):
        """Log a debug-level message."""
        self.log(message, LogLevel.DEBUG)

    def info(self, message):
        """Log an info-level message."""
        self.log(message, LogLevel.INFO)

    def warn(self, message):
        """Log a warning-level message."""
        self.log(message, LogLevel.WARN)

    def error(self, message):
        """Log an error-level message."""
        self.log(message, LogLevel.ERROR)

    def clear(self):
        """Clear the UI log area and reset the line counter."""
        if hasattr(self, 'log_area') and self.log_area:
            # FIX: reset the counter inside the EDT runnable so it is
            # serialized with any append() calls already queued; the original
            # reset it immediately, letting queued appends corrupt the count.
            def do_clear():
                self.log_area.setText("")
                self.line_count = 0
            SwingUtilities.invokeLater(do_clear)
        else:
            self.line_count = 0
# ============================================================================
# Content Truncation Utilities
# ============================================================================

class ContentTruncator:
    """Smart content truncation that preserves HTTP message structure."""

    @staticmethod
    def smart_truncate(content, max_length, content_name="content"):
        """
        Intelligently truncate content while preserving structure.

        Keeps HTTP headers intact and shows the head and tail of the body
        when the content looks like an HTTP message; otherwise keeps the
        head and tail of the raw text.

        Args:
            content: Content to truncate (str, bytes, or bytearray).
            max_length: Maximum allowed length in characters.
            content_name: Name for logging purposes (currently unused).

        Returns:
            Truncated content with an inline truncation notice.
        """
        if not content:
            return ""

        # Normalize to text before measuring.
        if isinstance(content, (bytes, bytearray)):
            try:
                content = content.decode('utf-8', errors='ignore')
            except Exception:
                content = str(content)
        else:
            content = str(content)

        original_length = len(content)
        if original_length <= max_length:
            return content

        # Preserve HTTP structure (headers + partial body) when present.
        headers_end = content.find("\r\n\r\n")
        if headers_end != -1:
            headers = content[:headers_end + 4]
            body = content[headers_end + 4:]
            header_size = len(headers)
            truncation_marker = "\n\n[!!! CONTENT TRUNCATED !!!]\n[Original size: {} chars, showing first {} chars of body]\n[Configure 'Max Request/Response Length' in settings to show more]\n\n".format(
                original_length,
                max(0, max_length - header_size - 200)
            )

            available_body_space = max_length - header_size - len(truncation_marker)

            if available_body_space > 100:
                # Keep the beginning and the end of the body.
                keep_size = available_body_space // 2
                truncated_body = body[:keep_size] + "\n\n... [middle content omitted] ...\n\n" + body[-keep_size:]
                return headers + truncation_marker + truncated_body
            if available_body_space > 0:
                return headers + truncation_marker + body[:available_body_space]
            # FIX: the original sliced body[:available_body_space] even when
            # the space was NEGATIVE; a negative slice bound keeps almost the
            # entire body and blows far past max_length. Fall back to plain
            # truncation instead.
            return ContentTruncator.truncate_with_marker(content, max_length)

        # No HTTP structure: keep head and tail around a marker.
        keep_size = (max_length - 200) // 2
        if keep_size <= 0:
            # FIX: with keep_size == 0 the original's content[-keep_size:]
            # (i.e. content[-0:]) returned the WHOLE string; degrade to
            # simple truncation for tiny budgets.
            return ContentTruncator.truncate_with_marker(content, max_length)

        truncation_marker = "\n\n[!!! CONTENT TRUNCATED !!!]\n[Original size: {} chars, showing first and last {} chars]\n[Configure 'Max Request/Response Length' in settings to show more]\n\n".format(
            original_length,
            keep_size
        )
        return content[:keep_size] + truncation_marker + content[-keep_size:]

    @staticmethod
    def truncate_with_marker(content, max_length):
        """Simple head truncation with a clear trailing marker."""
        if not content or len(content) <= max_length:
            return content

        original_length = len(content)
        marker = "\n\n[!!! TRUNCATED - Original length: {} chars !!!]\n".format(original_length)
        usable_length = max(0, max_length - len(marker))  # clamp: marker may exceed budget

        return content[:usable_length] + marker

# ============================================================================
# SSL Context Handler
# ============================================================================

class TrustAllSSLContext:
    """Factory for an SSL context that skips all certificate validation."""

    def __init__(self):
        pass

    @staticmethod
    def create():
        """Return an SSLContext with hostname checks and cert verification off."""
        trust_all_context = ssl.create_default_context()
        trust_all_context.check_hostname = False
        trust_all_context.verify_mode = ssl.CERT_NONE
        return trust_all_context

# ============================================================================
# Multi-Provider API Adapter Architecture
# ============================================================================

class APIProvider:
    """Abstract base class for LLM API providers.

    Subclasses supply the provider-specific pieces (request body, auth
    headers, response parsing, model listing); this base class owns the
    shared transport: urllib2 for direct / HTTP(S)-proxied calls and Java's
    URLConnection for SOCKS5, plus optional SSL-verification bypass.
    """
    __metaclass__ = ABCMeta  # Jython/Python-2 style ABC declaration

    def __init__(self, api_key, api_url, model, timeout=60):
        self.api_key = api_key
        self.api_url = api_url
        self.model = model
        self.timeout = timeout        # seconds
        self.disable_ssl = False      # skip certificate validation when True
        self.log_callback = None      # optional callable(message)
        # Proxy settings (inactive until set_proxy() is called)
        self.proxy_enabled = False
        self.proxy_type = "HTTP"
        self.proxy_host = ""
        self.proxy_port = ""
        self.proxy_username = ""
        self.proxy_password = ""

    def set_proxy(self, proxy_type, host, port, username="", password=""):
        """Configure proxy settings; SOCKS5 also sets JVM-wide properties."""
        self.proxy_enabled = True
        self.proxy_type = proxy_type.upper()
        self.proxy_host = host
        self.proxy_port = port
        self.proxy_username = username
        self.proxy_password = password
        self.log("[+] Proxy configured: {}://{}:{}".format(proxy_type, host, port))

        # SOCKS5 traffic goes through Java's URLConnection, which reads the
        # JVM system properties, so they must be installed immediately.
        if self.proxy_type == "SOCKS5":
            self._setup_java_proxy()

    def _setup_java_proxy(self):
        """Install JVM system properties for the SOCKS5 proxy (process-wide)."""
        try:
            System.setProperty("socksProxyHost", self.proxy_host)
            System.setProperty("socksProxyPort", str(self.proxy_port))

            if self.proxy_username and self.proxy_password:
                System.setProperty("java.net.socks.username", self.proxy_username)
                System.setProperty("java.net.socks.password", self.proxy_password)

            self.log("[+] Java SOCKS5 proxy properties set: {}:{}".format(self.proxy_host, self.proxy_port))
        except Exception as e:
            self.log("[-] Error setting Java proxy properties: {}".format(str(e)))

    def _make_request_with_socks_proxy(self, url, data=None, headers=None):
        """Make an HTTP(S) request through the SOCKS5 proxy via Java URLConnection.

        POST when ``data`` is given, GET otherwise. Returns the response
        body as text; raises on HTTP status >= 400 or transport failure.
        """
        from java.net import URL, InetSocketAddress
        from java.net import Proxy as JavaProxy
        from java.io import InputStreamReader, BufferedReader

        self.log("[+] Using Java URLConnection with SOCKS5 proxy: {}:{}".format(self.proxy_host, self.proxy_port))

        try:
            sock_addr = InetSocketAddress(self.proxy_host, int(self.proxy_port))
            proxy = JavaProxy(JavaProxy.Type.SOCKS, sock_addr)

            java_url = URL(url)
            conn = java_url.openConnection(proxy)

            if headers:
                for key, value in headers.items():
                    conn.setRequestProperty(key, value)

            conn.setConnectTimeout(int(self.timeout * 1000))
            conn.setReadTimeout(int(self.timeout * 1000))

            if data:
                conn.setDoOutput(True)
                conn.setRequestMethod("POST")
                out = conn.getOutputStream()
                out.write(data)
                out.close()

            response_code = conn.getResponseCode()
            if response_code >= 400:
                self.log("[-] HTTP Error {}: {}".format(response_code, conn.getResponseMessage()))
                raise Exception("HTTP Error {}: {}".format(response_code, conn.getResponseMessage()))

            # FIX: collect lines and re-join with '\n' -- the original
            # concatenated readLine() results directly, which silently
            # deleted every newline in the response body (and was O(n^2)).
            reader = BufferedReader(InputStreamReader(conn.getInputStream()))
            chunks = []
            line = reader.readLine()
            while line is not None:
                chunks.append(line)
                line = reader.readLine()
            reader.close()

            return "\n".join(chunks)

        except Exception as e:
            self.log("[-] SOCKS5 proxy error: {}".format(str(e)))
            raise Exception("SOCKS5 proxy error: {}".format(str(e)))

    def _create_proxy_opener(self):
        """Create a urllib2 opener with HTTP/HTTPS proxy support, or None."""
        if not self.proxy_enabled:
            return None

        proxy_protocol = self.proxy_type.lower()

        # Embed credentials in the proxy URL when provided.
        if self.proxy_username and self.proxy_password:
            proxy_url = "{}://{}:{}@{}:{}".format(
                proxy_protocol,
                self.proxy_username,
                self.proxy_password,
                self.proxy_host,
                self.proxy_port
            )
        else:
            proxy_url = "{}://{}:{}".format(
                proxy_protocol,
                self.proxy_host,
                self.proxy_port
            )

        self.log("[+] Using {} proxy: {}:{}".format(proxy_protocol.upper(), self.proxy_host, self.proxy_port))

        proxy_handler = urllib2.ProxyHandler({
            'http': proxy_url,
            'https': proxy_url
        })

        if self.disable_ssl:
            ssl_context = TrustAllSSLContext.create()
            https_handler = urllib2.HTTPSHandler(context=ssl_context)
            return urllib2.build_opener(proxy_handler, https_handler)
        return urllib2.build_opener(proxy_handler)

    def set_log_callback(self, callback):
        """Set the logging callback function."""
        self.log_callback = callback

    def log(self, message):
        """Forward a message to the log callback, if one is set."""
        if self.log_callback:
            self.log_callback(message)

    @abstractmethod
    def build_request_data(self, prompt, max_tokens):
        """Build the request payload -- format differs per provider."""
        pass

    @abstractmethod
    def build_headers(self):
        """Build the request headers -- authentication differs per provider."""
        pass

    @abstractmethod
    def parse_response(self, response_data):
        """Parse the response payload -- format differs per provider."""
        pass

    @abstractmethod
    def get_models_url(self):
        """Get the models-list URL, or None when unsupported."""
        pass

    def send_request(self, prompt, max_tokens):
        """Common request flow: serialize, send (direct/proxied), parse."""
        try:
            data = self.build_request_data(prompt, max_tokens)
            json_data = json.dumps(data).encode('utf-8')

            headers = self.build_headers()

            # SOCKS5 must go through Java URLConnection; urllib2 SOCKS
            # support is absent in Jython.
            if self.proxy_enabled and self.proxy_type == "SOCKS5":
                response_text = self._make_request_with_socks_proxy(self.api_url, json_data, headers)
                result = json.loads(response_text)
                return self.parse_response(result)

            request = urllib2.Request(url=self.api_url, data=json_data, headers=headers)

            opener = self._create_proxy_opener()
            if opener:
                response = opener.open(request, timeout=self.timeout)
            elif self.disable_ssl:
                self.log("[*] SSL certificate validation is disabled")
                ssl_context = TrustAllSSLContext.create()
                response = urllib2.urlopen(request, context=ssl_context, timeout=self.timeout)
            else:
                response = urllib2.urlopen(request, timeout=self.timeout)

            # NOTE(review): under Jython/Python 2 read() returns str, so
            # str() is a no-op; this would mangle bytes under Python 3.
            response_data = response.read()
            response_text = str(response_data)
            result = json.loads(response_text)

            return self.parse_response(result)

        except Exception as e:
            raise Exception("Error calling {} API: {}".format(self.__class__.__name__, str(e)))

    def fetch_models(self):
        """Fetch the available models list; fall back to defaults on failure."""
        try:
            models_url = self.get_models_url()
            if not models_url:
                return self.get_default_models()

            if self.proxy_enabled and self.proxy_type == "SOCKS5":
                headers = self.build_headers()
                response_text = self._make_request_with_socks_proxy(models_url, None, headers)
                result = json.loads(response_text)
                return self.parse_models_response(result)

            # Use the full authentication headers to fetch the model list.
            headers = self.build_headers()
            request = urllib2.Request(url=models_url, headers=headers)

            opener = self._create_proxy_opener()
            if opener:
                response = opener.open(request, timeout=self.timeout)
            elif self.disable_ssl:
                ssl_context = TrustAllSSLContext.create()
                response = urllib2.urlopen(request, context=ssl_context, timeout=self.timeout)
            else:
                response = urllib2.urlopen(request, timeout=self.timeout)

            response_data = response.read()
            response_text = str(response_data)
            result = json.loads(response_text)

            return self.parse_models_response(result)

        except Exception as e:
            self.log("[-] Error fetching models: {}".format(str(e)))
            return self.get_default_models()

    @abstractmethod
    def parse_models_response(self, response_data):
        """Parse the models-list response into a list of model names."""
        pass

    @abstractmethod
    def get_default_models(self):
        """Fallback model list used when fetching fails."""
        pass


class OpenAIProvider(APIProvider):
    """OpenAI-compatible API provider (OpenAI, DeepSeek, Ollama, etc.)."""

    def build_request_data(self, prompt, max_tokens):
        """Build a chat-completions payload with a single user message."""
        return {
            "model": self.model,
            "messages": [{"role": "user", "content": prompt}],
            "max_tokens": max_tokens
        }

    def build_headers(self):
        """Build Bearer-auth headers; Host is derived from the API URL."""
        host = self.api_url.split("://")[-1].split("/")[0]
        return {
            "Host": host,
            "Content-Type": "application/json",
            "Authorization": "Bearer " + self.api_key,
            "Accept": "*/*",
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36",
            "Accept-Language": "zh-CN"
        }

    def parse_response(self, response_data):
        """Extract the first choice's content plus token usage (zeros if absent)."""
        return {
            "content": response_data["choices"][0]["message"]["content"],
            "usage": response_data.get("usage", {
                "prompt_tokens": 0,
                "completion_tokens": 0,
                "total_tokens": 0
            })
        }

    def get_models_url(self):
        # NOTE(review): assumes the configured URL ends in
        # /v1/chat/completions; otherwise the URL is returned unchanged.
        return self.api_url.replace("/v1/chat/completions", "/v1/models")

    def parse_models_response(self, response_data):
        """Return the ids from the 'data' array, or defaults when missing."""
        if 'data' in response_data:
            return [model.get('id') for model in response_data['data'] if model.get('id')]
        return self.get_default_models()

    def get_default_models(self):
        return ["gpt-4o", "gpt-4-turbo", "gpt-3.5-turbo"]


class GeminiProvider(APIProvider):
    """Google Gemini API provider (URL-parameter ?key= authentication)."""

    def __init__(self, api_key, model="gemini-pro", timeout=60):
        # The endpoint embeds the model name, so the URL is derived here.
        api_url = "https://generativelanguage.googleapis.com/v1beta/models/{}:generateContent".format(model)
        super(GeminiProvider, self).__init__(api_key, api_url, model, timeout)

    def build_request_data(self, prompt, max_tokens):
        """Build a generateContent payload."""
        return {
            "contents": [{"parts": [{"text": prompt}]}],
            "generationConfig": {
                "maxOutputTokens": max_tokens,
                "temperature": 0.7
            }
        }

    def build_headers(self):
        # No x-goog-api-key header: Gemini authenticates via the ?key=
        # URL parameter, not header authentication.
        return {
            "Content-Type": "application/json",
            "Accept": "*/*",
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36"
        }

    def parse_response(self, response_data):
        """Extract the first candidate's text and map usageMetadata to token counts."""
        try:
            content = response_data["candidates"][0]["content"]["parts"][0]["text"]
            usage_metadata = response_data.get("usageMetadata", {})

            return {
                "content": content,
                "usage": {
                    "prompt_tokens": usage_metadata.get("promptTokenCount", 0),
                    "completion_tokens": usage_metadata.get("candidatesTokenCount", 0),
                    "total_tokens": usage_metadata.get("totalTokenCount", 0)
                }
            }
        except (KeyError, IndexError) as e:
            raise Exception("Failed to parse Gemini response: {}".format(str(e)))

    def send_request(self, prompt, max_tokens):
        """Override the base flow to append the ?key= URL parameter."""
        try:
            url_with_key = "https://generativelanguage.googleapis.com/v1beta/models/{}:generateContent?key={}".format(
                self.model, self.api_key)

            data = self.build_request_data(prompt, max_tokens)
            json_data = json.dumps(data).encode('utf-8')

            headers = {
                "Content-Type": "application/json",
                "Accept": "*/*",
                "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36"
            }

            # SOCKS5 goes through Java URLConnection.
            if self.proxy_enabled and self.proxy_type == "SOCKS5":
                response_text = self._make_request_with_socks_proxy(url_with_key, json_data, headers)
                result = json.loads(response_text)
                return self.parse_response(result)

            request = urllib2.Request(url=url_with_key, data=json_data, headers=headers)

            opener = self._create_proxy_opener()
            if opener:
                response = opener.open(request, timeout=self.timeout)
            elif self.disable_ssl:
                ssl_context = TrustAllSSLContext.create()
                response = urllib2.urlopen(request, context=ssl_context, timeout=self.timeout)
            else:
                response = urllib2.urlopen(request, timeout=self.timeout)

            response_data = response.read()
            response_text = str(response_data)
            result = json.loads(response_text)

            return self.parse_response(result)

        except Exception as e:
            raise Exception("Error calling Gemini API: {}".format(str(e)))

    def get_models_url(self):
        # pageSize raises the per-page cap so the full model list is reachable.
        return "https://generativelanguage.googleapis.com/v1beta/models?key={}&pageSize=100".format(self.api_key)

    def fetch_models(self):
        """Fetch all Gemini models, following nextPageToken pagination."""
        try:
            all_models = []
            page_token = None

            while True:
                models_url = self.get_models_url()

                # Append the continuation token from the previous page.
                if page_token:
                    models_url += "&pageToken={}".format(page_token)

                self.log("[+] Gemini models URL: {}".format(models_url[:80] + "..."))

                if self.proxy_enabled and self.proxy_type == "SOCKS5":
                    response_text = self._make_request_with_socks_proxy(models_url, None, {})
                else:
                    # Official API requires:
                    # GET https://generativelanguage.googleapis.com/v1beta/models
                    request = urllib2.Request(url=models_url)
                    request.get_method = lambda: 'GET'  # force GET explicitly

                    opener = self._create_proxy_opener()
                    if opener:
                        response = opener.open(request, timeout=self.timeout)
                    elif self.disable_ssl:
                        self.log("[*] SSL verification disabled for Gemini")
                        ssl_context = TrustAllSSLContext.create()
                        response = urllib2.urlopen(request, context=ssl_context, timeout=self.timeout)
                    else:
                        response = urllib2.urlopen(request, timeout=self.timeout)

                    response_data = response.read()
                    response_text = str(response_data)
                    self.log("[+] Gemini API 响应长度: {} 字节".format(len(response_text)))

                result = json.loads(response_text)
                models = self.parse_models_response(result)
                all_models.extend(models)

                # Stop when the API reports no further pages.
                page_token = result.get('nextPageToken')
                if not page_token:
                    break

            return all_models if all_models else self.get_default_models()

        except urllib2.HTTPError as e:
            self.log("[-] HTTP 错误 {}: {}".format(e.code, str(e)))
            self.log("[*] 使用默认模型列表")
            return self.get_default_models()
        except urllib2.URLError as e:
            self.log("[-] Gemini API 网络错误: {}".format(str(e)))
            self.log("[*] 使用默认模型列表")
            return self.get_default_models()
        except Exception as e:
            self.log("[-] Gemini获取模型失败: {}".format(str(e)))
            self.log("[*] 使用默认模型列表")
            return self.get_default_models()

    def parse_models_response(self, response_data):
        """Keep non-deprecated models that support generateContent."""
        try:
            if 'models' in response_data:
                models = []
                all_models_count = len(response_data['models'])
                self.log("[+] Gemini API 返回了 {} 个模型(总数)".format(all_models_count))

                for model in response_data['models']:
                    model_name = model.get('name', '')
                    # Strip the "models/" resource prefix.
                    if model_name.startswith('models/'):
                        model_name = model_name[7:]

                    supported_methods = model.get('supportedGenerationMethods', [])

                    # Skip deprecated models.
                    deprecation_info = model.get('deprecationInfo', {})
                    if deprecation_info:
                        self.log("[*] 跳过已弃用模型: {}".format(model_name))
                        continue

                    # Only keep models that support generateContent.
                    if model_name and 'generateContent' in supported_methods:
                        models.append(model_name)
                        self.log("[+] 发现可用模型: {}".format(model_name))

                self.log("[+] 筛选后可用的 Gemini 模型数量: {}".format(len(models)))
                return models
            else:
                self.log("[-] 响应中没有 'models' 字段")
                return self.get_default_models()
        except Exception as e:
            self.log("[-] 解析Gemini模型列表出错: {}".format(str(e)))
            return self.get_default_models()

    def get_default_models(self):
        return ["gemini-pro", "gemini-pro-vision", "gemini-1.5-pro", "gemini-1.5-flash"]


class ClaudeProvider(APIProvider):
    """Anthropic Claude API provider (x-api-key header authentication)."""

    def __init__(self, api_key, model="claude-3-sonnet-20240229", api_url="https://api.anthropic.com/v1/messages", timeout=60):
        super(ClaudeProvider, self).__init__(api_key, api_url, model, timeout)
        self.anthropic_version = "2023-06-01"  # required anthropic-version header value

    def build_request_data(self, prompt, max_tokens):
        """Build a Messages API payload with a single user message."""
        return {
            "model": self.model,
            "messages": [{"role": "user", "content": prompt}],
            "max_tokens": max_tokens
        }

    def build_headers(self):
        """Build x-api-key / anthropic-version headers."""
        return {
            "Content-Type": "application/json",
            "x-api-key": self.api_key,
            "anthropic-version": self.anthropic_version,
            "Accept": "*/*",
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36"
        }

    def parse_response(self, response_data):
        """Extract the first content block's text; total tokens = input + output."""
        try:
            content = response_data["content"][0]["text"]
            usage = response_data.get("usage", {})

            return {
                "content": content,
                "usage": {
                    "prompt_tokens": usage.get("input_tokens", 0),
                    "completion_tokens": usage.get("output_tokens", 0),
                    "total_tokens": usage.get("input_tokens", 0) + usage.get("output_tokens", 0)
                }
            }
        except (KeyError, IndexError) as e:
            raise Exception("Failed to parse Claude response: {}".format(str(e)))

    def get_models_url(self):
        # Claude exposes no public models-list endpoint here; defaults are used.
        return None

    def parse_models_response(self, response_data):
        return self.get_default_models()

    def get_default_models(self):
        return [
            "claude-3-opus-20240229",
            "claude-3-sonnet-20240229",
            "claude-3-haiku-20240307",
            "claude-2.1"
        ]


class ProviderFactory:
    """Factory for creating API provider instances by provider-type name."""

    # OpenAI-compatible providers all share OpenAIProvider.
    PROVIDER_TYPES = {
        "OpenAI": OpenAIProvider,
        "Gemini": GeminiProvider,
        "Claude": ClaudeProvider,
        "DeepSeek": OpenAIProvider,
        "Ollama": OpenAIProvider,
        "Custom": OpenAIProvider
    }

    @staticmethod
    def create_provider(provider_type, api_key, api_url, model, timeout=60):
        """Create an API provider instance.

        Raises:
            ValueError: if ``provider_type`` is not a supported type.
        """
        provider_class = ProviderFactory.PROVIDER_TYPES.get(provider_type)

        if not provider_class:
            raise ValueError("Unsupported provider type: {}".format(provider_type))

        # Gemini derives its URL from the model; Claude takes a custom
        # argument order -- both need dedicated construction.
        if provider_type == "Gemini":
            return GeminiProvider(api_key, model, timeout)
        elif provider_type == "Claude":
            return ClaudeProvider(api_key, model, api_url, timeout)
        else:
            return provider_class(api_key, api_url, model, timeout)

    @staticmethod
    def get_provider_types():
        """Get all supported provider type names."""
        return list(ProviderFactory.PROVIDER_TYPES.keys())

    @staticmethod
    def get_default_config(provider_type):
        """Get the default URL/model configuration for a provider type."""
        configs = {
            "OpenAI": {
                "api_url": "https://api.openai.com/v1/chat/completions",
                "model": "gpt-4o",
                "requires_url": True
            },
            "Gemini": {
                "api_url": "",
                "model": "gemini-pro",
                "requires_url": False
            },
            "Claude": {
                "api_url": "https://api.anthropic.com/v1/messages",
                "model": "claude-3-sonnet-20240229",
                "requires_url": True
            },
            "DeepSeek": {
                "api_url": "https://api.deepseek.com/v1/chat/completions",
                "model": "deepseek-chat",
                "requires_url": True
            },
            "Ollama": {
                "api_url": "http://localhost:11434/v1/chat/completions",
                "model": "llama2",
                "requires_url": True
            },
            "Custom": {
                "api_url": "https://api.example.com/v1/chat/completions",
                "model": "gpt-3.5-turbo",
                "requires_url": True
            }
        }
        return configs.get(provider_type, configs["Custom"])
"https://api.example.com/v1/chat/completions", + "model": "gpt-3.5-turbo", + "requires_url": True + } + } + return configs.get(provider_type, configs["Custom"]) + +# ============================================================================ +# Burp Extension +# ============================================================================ + class BurpExtender(IBurpExtender, IContextMenuFactory, IScannerCheck, ITab): def registerExtenderCallbacks(self, callbacks): self._callbacks = callbacks self._helpers = callbacks.getHelpers() self.stdout = PrintWriter(callbacks.getStdout(), True) + # Default configuration + self.provider_type = "OpenAI" self.api_key = "Please enter your API key" self.api_url = "https://api.openai.com/v1/chat/completions" - self.model = "Please select or enter the model name to use" + self.model = "gpt-4o" self.max_tokens = 3072 - self.timeout_seconds = 60 # 设置超时时间 - self.disable_ssl_verification = False # 默认启用SSL验证 - - # 添加默认长度限制 + self.timeout_seconds = 60 + self.disable_ssl_verification = False self.max_request_length = 1000 self.max_response_length = 2000 - # 创建线程池 + # Proxy configuration + self.enable_proxy = False + self.proxy_type = "HTTP" + self.proxy_host = "127.0.0.1" + self.proxy_port = "10809" + self.proxy_username = "" + self.proxy_password = "" + self.executor = Executors.newCachedThreadPool() - # 创建主标签页面板 self.tab = JTabbedPane() - # 创建日志面板 self.log_panel = JPanel(BorderLayout()) self.log_area = JTextArea() self.log_area.setEditable(False) log_scroll = JScrollPane(self.log_area) self.log_panel.add(log_scroll, BorderLayout.CENTER) - # 创建配置面板 config_panel = self.create_config_panel() - - # 创建结果面板 results_panel = self.create_results_panel() - # 添加标签页 self.tab.addTab("Configuration", config_panel) self.tab.addTab("Analysis Results", results_panel) - self.tab.addTab("Logs", self.log_panel) # 添加日志标签页 + self.tab.addTab("Logs", self.log_panel) - # 注册扩展 callbacks.setExtensionName("Enhanced BurpGPT") 
callbacks.registerContextMenuFactory(self) callbacks.registerScannerCheck(self) callbacks.addSuiteTab(self) - # 需要添加debug变量初始化 - self.debug = True # 或从配置中读取 + self.debug = True - # 设置默认的prompt模板,只使用ASCII字符 - default_prompt = """Answer in Chinese.Please analyze this HTTP request and response: + default_prompt = """Answer in Chinese. Please analyze this HTTP request and response: Request: {REQUEST} @@ -101,7 +911,6 @@ Please identify any security issues and suggest fixes.""" self.prompt_area.setText(default_prompt) - # 配置SSL System.setProperty("jsse.enableSNIExtension", "false") System.setProperty("https.protocols", "TLSv1.2") System.setProperty("javax.net.ssl.trustStore", "") @@ -111,17 +920,40 @@ Please identify any security issues and suggest fixes.""" config_panel = JPanel(GridBagLayout()) constraints = GridBagConstraints() constraints.fill = GridBagConstraints.HORIZONTAL - constraints.insets = Insets(5, 10, 5, 10) # 上、左、下、右的边距 + constraints.insets = Insets(5, 10, 5, 10) - # API设置面板 api_panel = JPanel(GridBagLayout()) api_panel.setBorder(BorderFactory.createTitledBorder("API Settings")) - # API URL配置 + # Provider Type Selection constraints.gridx = 0 constraints.gridy = 0 constraints.gridwidth = 1 constraints.weightx = 0.2 + api_panel.add(JLabel("Provider Type:"), constraints) + + constraints.gridx = 1 + constraints.weightx = 0.8 + self.provider_combo = JComboBox(ProviderFactory.get_provider_types()) + self.provider_combo.setSelectedItem(self.provider_type) + self.provider_combo.setToolTipText("Select API provider type") + + def on_provider_change(event): + selected = str(self.provider_combo.getSelectedItem()) + config = ProviderFactory.get_default_config(selected) + self.url_field.setText(config["api_url"]) + self.url_field.setEnabled(config["requires_url"]) + self.model_combo.removeAllItems() + self.model_combo.addItem(config["model"]) + self.log("[*] Provider changed to: {}".format(selected)) + + self.provider_combo.addActionListener(on_provider_change) + 
api_panel.add(self.provider_combo, constraints) + + # API URL + constraints.gridx = 0 + constraints.gridy = 1 + constraints.weightx = 0.2 api_panel.add(JLabel("API URL:"), constraints) constraints.gridx = 1 @@ -130,9 +962,9 @@ Please identify any security issues and suggest fixes.""" self.url_field.setToolTipText("Enter the API endpoint URL") api_panel.add(self.url_field, constraints) - # API Key配置 + # API Key constraints.gridx = 0 - constraints.gridy = 1 + constraints.gridy = 2 constraints.weightx = 0.2 api_panel.add(JLabel("API Key:"), constraints) @@ -142,23 +974,20 @@ Please identify any security issues and suggest fixes.""" self.key_field.setToolTipText("Enter your API key") api_panel.add(self.key_field, constraints) - # Model配置区域改造 + # Model constraints.gridx = 0 - constraints.gridy = 2 + constraints.gridy = 3 constraints.weightx = 0.2 api_panel.add(JLabel("Model:"), constraints) - # 创建模型选择的组合框 - from javax.swing import JComboBox self.model_combo = JComboBox() - self.model_combo.setEditable(True) # 允许手动输入 + self.model_combo.setEditable(True) self.model_combo.setToolTipText("Select or enter the model name to use") + self.model_combo.addItem(self.model) - # 创建获取模型列表的按钮 fetch_models_button = JButton("Fetch Models") fetch_models_button.setToolTipText("Fetch available models from API") - # 创建模型选择面板 model_panel = JPanel(BorderLayout()) model_panel.add(self.model_combo, BorderLayout.CENTER) model_panel.add(fetch_models_button, BorderLayout.EAST) @@ -167,23 +996,94 @@ Please identify any security issues and suggest fixes.""" constraints.weightx = 0.8 api_panel.add(model_panel, constraints) - # 添加SSL验证复选框 + # SSL Options constraints.gridx = 0 - constraints.gridy = 3 + constraints.gridy = 4 constraints.weightx = 0.2 api_panel.add(JLabel("SSL Options:"), constraints) constraints.gridx = 1 constraints.weightx = 0.8 self.disable_ssl_check = JCheckBox("Disable SSL Certificate Validation", self.disable_ssl_verification) - self.disable_ssl_check.setToolTipText("Enable this 
if you encounter SSL certificate issues (Not recommended for production use)") + self.disable_ssl_check.setToolTipText("Enable this if you encounter SSL certificate issues") api_panel.add(self.disable_ssl_check, constraints) - # 限制设置面板 + # Proxy panel + from javax.swing import JPasswordField + proxy_panel = JPanel(GridBagLayout()) + proxy_panel.setBorder(BorderFactory.createTitledBorder("Proxy Settings")) + + constraints.gridx = 0 + constraints.gridy = 0 + constraints.weightx = 0.2 + proxy_panel.add(JLabel("Enable Proxy:"), constraints) + + constraints.gridx = 1 + constraints.weightx = 0.8 + self.enable_proxy_check = JCheckBox("Use proxy for API requests", self.enable_proxy) + self.enable_proxy_check.setToolTipText("Enable if you need to use a proxy") + proxy_panel.add(self.enable_proxy_check, constraints) + + constraints.gridx = 0 + constraints.gridy = 1 + constraints.weightx = 0.2 + proxy_panel.add(JLabel("Proxy Type:"), constraints) + + constraints.gridx = 1 + constraints.weightx = 0.8 + self.proxy_type_combo = JComboBox(["HTTP", "HTTPS", "SOCKS5"]) + self.proxy_type_combo.setSelectedItem(self.proxy_type) + self.proxy_type_combo.setToolTipText("Select proxy type ") + proxy_panel.add(self.proxy_type_combo, constraints) + + constraints.gridx = 0 + constraints.gridy = 2 + constraints.weightx = 0.2 + proxy_panel.add(JLabel("Proxy Host:"), constraints) + + constraints.gridx = 1 + constraints.weightx = 0.8 + self.proxy_host_field = JTextField(self.proxy_host, 20) + self.proxy_host_field.setToolTipText("Proxy server address (e.g., 127.0.0.1)") + proxy_panel.add(self.proxy_host_field, constraints) + + constraints.gridx = 0 + constraints.gridy = 3 + constraints.weightx = 0.2 + proxy_panel.add(JLabel("Proxy Port:"), constraints) + + constraints.gridx = 1 + constraints.weightx = 0.8 + self.proxy_port_field = JTextField(self.proxy_port, 10) + self.proxy_port_field.setToolTipText("Proxy port ") + proxy_panel.add(self.proxy_port_field, constraints) + + constraints.gridx = 0 + 
constraints.gridy = 4 + constraints.weightx = 0.2 + proxy_panel.add(JLabel("Username (optional):"), constraints) + + constraints.gridx = 1 + constraints.weightx = 0.8 + self.proxy_username_field = JTextField(self.proxy_username, 20) + self.proxy_username_field.setToolTipText("Leave empty if no authentication required") + proxy_panel.add(self.proxy_username_field, constraints) + + constraints.gridx = 0 + constraints.gridy = 5 + constraints.weightx = 0.2 + proxy_panel.add(JLabel("Password (optional):"), constraints) + + constraints.gridx = 1 + constraints.weightx = 0.8 + self.proxy_password_field = JPasswordField(self.proxy_password, 20) + self.proxy_password_field.setToolTipText("Leave empty if no authentication required") + proxy_panel.add(self.proxy_password_field, constraints) + + # Limits panel limits_panel = JPanel(GridBagLayout()) limits_panel.setBorder(BorderFactory.createTitledBorder("Limits & Timeouts")) - # Timeout配置 constraints.gridx = 0 constraints.gridy = 0 constraints.weightx = 0.2 @@ -195,7 +1095,6 @@ Please identify any security issues and suggest fixes.""" self.timeout_field.setToolTipText("Maximum time to wait for API response") limits_panel.add(self.timeout_field, constraints) - # 请求长度限制 constraints.gridx = 0 constraints.gridy = 1 constraints.weightx = 0.2 @@ -207,7 +1106,6 @@ Please identify any security issues and suggest fixes.""" self.req_length_field.setToolTipText("Maximum length of request content to analyze") limits_panel.add(self.req_length_field, constraints) - # 响应长度限制 constraints.gridx = 0 constraints.gridy = 2 constraints.weightx = 0.2 @@ -219,7 +1117,7 @@ Please identify any security issues and suggest fixes.""" self.resp_length_field.setToolTipText("Maximum length of response content to analyze") limits_panel.add(self.resp_length_field, constraints) - # Prompt模板面板 + # Prompt panel prompt_panel = JPanel(GridBagLayout()) prompt_panel.setBorder(BorderFactory.createTitledBorder("Prompt Template")) @@ -230,84 +1128,96 @@ Please identify 
any security issues and suggest fixes.""" self.prompt_area = JTextArea(5, 40) self.prompt_area.setLineWrap(True) self.prompt_area.setWrapStyleWord(True) - self.prompt_area.setToolTipText("Template for analysis prompt. Use {URL}, {METHOD}, {REQUEST}, {RESPONSE} as placeholders") + self.prompt_area.setToolTipText("Template for analysis prompt") prompt_scroll = JScrollPane(self.prompt_area) prompt_panel.add(prompt_scroll, constraints) - # 按钮面板 + # Button panel button_panel = JPanel() button_panel.setBorder(BorderFactory.createEmptyBorder(10, 0, 10, 0)) def fetch_models(event): try: - # 验证必要的配置 - api_url = self.url_field.getText() + provider_type = str(self.provider_combo.getSelectedItem()) api_key = self.key_field.getText() + api_url = self.url_field.getText() + model = str(self.model_combo.getSelectedItem()) if self.model_combo.getSelectedItem() else "gpt-4o" - if not api_url or not api_key: - JOptionPane.showMessageDialog(None, "Please enter API URL and API Key first!") + if not api_key: + JOptionPane.showMessageDialog(None, "Please enter API Key first!") return - # 构造models API URL - models_url = api_url.replace("/v1/chat/completions", "/v1/models") + self.log("[+] Fetching models from {} provider".format(provider_type)) - # 从配置中获取Host - host = models_url.split("://")[-1].split("/")[0] + # Create progress dialog + progress_dialog = JDialog() + progress_dialog.setTitle("Fetching Models...") + progress_dialog.setSize(300, 100) + progress_dialog.setLocationRelativeTo(None) + progress_dialog.setLayout(BorderLayout()) - # 请求头 - headers = { - "Host": host, - "Authorization": "Bearer " + api_key, - "Accept": "*/*", - "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/108.0.5359.215", - "Accept-Language": "zh-CN" - } + progress_bar = JProgressBar() + progress_bar.setIndeterminate(True) + label = JLabel("Fetching available models...", SwingUtilities.CENTER) + progress_dialog.add(label, BorderLayout.NORTH) + 
progress_dialog.add(progress_bar, BorderLayout.CENTER) - self.log("[+] Fetching models from {}".format(models_url)) - - # 创建请求 - request = urllib2.Request( - url=models_url, - headers=headers - ) - - # 获取SSL验证设置 - disable_ssl = self.disable_ssl_check.isSelected() - - # 发送请求 - if disable_ssl: - self.log("[*] SSL certificate validation is disabled for models fetch") - # 创建自定义的SSL上下文 - ssl_context = TrustAllSSLContext.create() - # 使用自定义SSL上下文发送请求 - response = urllib2.urlopen(request, context=ssl_context, timeout=self.timeout_seconds) - else: - # 标准请求方式,使用默认SSL验证 - response = urllib2.urlopen(request, timeout=self.timeout_seconds) - - response_data = response.read() - response_text = str(response_data) - - # 解析JSON响应 - models_data = json.loads(response_text) - - if 'data' in models_data: - # 清除现有的模型列表 - self.model_combo.removeAllItems() + # Create async task for fetching models + class FetchModelsTask(Runnable): + def __init__(self, outer): + self.outer = outer - # 添加新的模型 - for model in models_data['data']: - model_id = model.get('id') - if model_id: - self.model_combo.addItem(model_id) - - self.log("[+] Successfully fetched {} models".format(len(models_data['data']))) - JOptionPane.showMessageDialog(None, "Successfully fetched models!") - else: - raise Exception("Invalid response format") + def run(self): + try: + SwingUtilities.invokeLater(lambda: progress_dialog.setVisible(True)) + + provider = ProviderFactory.create_provider( + provider_type, api_key, api_url, model, self.outer.timeout_seconds + ) + provider.set_log_callback(self.outer.log) + provider.disable_ssl = self.outer.disable_ssl_check.isSelected() + + # Configure proxy if enabled + if self.outer.enable_proxy_check.isSelected(): + provider.set_proxy( + str(self.outer.proxy_type_combo.getSelectedItem()), + self.outer.proxy_host_field.getText(), + self.outer.proxy_port_field.getText(), + self.outer.proxy_username_field.getText(), + String(self.outer.proxy_password_field.getPassword()) + ) + + models = 
provider.fetch_models() + + def update_models_ui(): + try: + progress_dialog.dispose() + + if models: + self.outer.model_combo.removeAllItems() + for model_name in models: + self.outer.model_combo.addItem(model_name) + self.outer.log("[+] Successfully fetched {} models".format(len(models))) + JOptionPane.showMessageDialog(None, "Successfully fetched {} models!".format(len(models))) + else: + raise Exception("No models returned") + except Exception as e: + error_msg = "Error updating models: {}".format(str(e)) + self.outer.log("[-] " + error_msg) + JOptionPane.showMessageDialog(None, error_msg) + + SwingUtilities.invokeLater(update_models_ui) + + except Exception as e: + error_msg = "Error fetching models: {}".format(str(e)) + self.outer.log("[-] " + error_msg) + SwingUtilities.invokeLater(lambda: progress_dialog.dispose()) + SwingUtilities.invokeLater(lambda: JOptionPane.showMessageDialog(None, error_msg)) + + Thread(FetchModelsTask(self)).start() except Exception as e: - error_msg = "Error fetching models: {}".format(str(e)) + error_msg = "Error initializing fetch: {}".format(str(e)) self.log("[-] " + error_msg) JOptionPane.showMessageDialog(None, error_msg) @@ -315,29 +1225,65 @@ Please identify any security issues and suggest fixes.""" def save_config(event): try: - # 更新配置值 + self.provider_type = str(self.provider_combo.getSelectedItem()) self.api_url = self.url_field.getText() self.api_key = self.key_field.getText() - self.model = str(self.model_combo.getSelectedItem()) # 从组合框获取选中的模型 + self.model = str(self.model_combo.getSelectedItem()) self.timeout_seconds = int(self.timeout_field.getText()) self.max_request_length = int(self.req_length_field.getText()) self.max_response_length = int(self.resp_length_field.getText()) - self.disable_ssl_verification = self.disable_ssl_check.isSelected() # 获取SSL验证设置 + self.disable_ssl_verification = self.disable_ssl_check.isSelected() - # 验证配置 - if not self.api_url or not self.api_key or not self.model: - 
JOptionPane.showMessageDialog(None, "API URL, API Key and Model cannot be empty!") + # Save proxy settings + self.enable_proxy = self.enable_proxy_check.isSelected() + self.proxy_type = str(self.proxy_type_combo.getSelectedItem()) + self.proxy_host = self.proxy_host_field.getText() + self.proxy_port = self.proxy_port_field.getText() + self.proxy_username = self.proxy_username_field.getText() + self.proxy_password = String(self.proxy_password_field.getPassword()) + + # Validate required fields + if not self.api_key or not self.model: + JOptionPane.showMessageDialog(None, "API Key and Model cannot be empty!") return + + # Warn if using Custom provider with example URL + if self.provider_type == "Custom": + if not self.api_url or self.api_url.strip() == "": + JOptionPane.showMessageDialog(None, "Warning: API URL cannot be empty for Custom provider!") + return + + if "api.example.com" in self.api_url: + result = JOptionPane.showConfirmDialog(None, + "You are using the example API URL.\n\n" + + "Please replace it with your actual API endpoint.\n\n" + + "Do you want to save anyway?", + "Example URL Detected", + JOptionPane.YES_NO_OPTION, + JOptionPane.WARNING_MESSAGE) + if result != JOptionPane.YES_OPTION: + return + + if self.model == "gpt-3.5-turbo": + result = JOptionPane.showConfirmDialog(None, + "You are using the default model name 'gpt-3.5-turbo'.\n\n" + + "Make sure this model exists in your custom API.\n\n" + + "Do you want to save anyway?", + "Default Model Name", + JOptionPane.YES_NO_OPTION, + JOptionPane.WARNING_MESSAGE) + if result != JOptionPane.YES_OPTION: + return - # 更新成功提示 JOptionPane.showMessageDialog(None, "Configuration saved successfully!") - # 记录更新到日志 self.log("[+] Configuration updated:") + self.log(" - Provider: {}".format(self.provider_type)) self.log(" - API URL: {}".format(self.api_url)) self.log(" - Model: {}".format(self.model)) - self.log(" - API Key: {}".format("*" * len(self.api_key))) self.log(" - SSL Verification: {}".format("Disabled" 
if self.disable_ssl_verification else "Enabled")) + self.log(" - Proxy: {}".format("Enabled ({}://{}:{})".format( + self.proxy_type, self.proxy_host, self.proxy_port) if self.enable_proxy else "Disabled")) except Exception as e: JOptionPane.showMessageDialog(None, "Error saving configuration: " + str(e)) @@ -348,15 +1294,21 @@ Please identify any security issues and suggest fixes.""" "Are you sure you want to reset all settings to default values?", "Confirm Reset", JOptionPane.YES_NO_OPTION) == JOptionPane.YES_OPTION: + self.provider_combo.setSelectedItem("OpenAI") self.url_field.setText("https://api.openai.com/v1/chat/completions") self.key_field.setText("Please enter your API key") self.model_combo.removeAllItems() self.model_combo.addItem("gpt-4o") - self.model_combo.setSelectedItem("gpt-4o") self.timeout_field.setText("60") self.req_length_field.setText("1000") self.resp_length_field.setText("2000") - self.disable_ssl_check.setSelected(False) # 重置SSL验证设置 + self.disable_ssl_check.setSelected(False) + self.enable_proxy_check.setSelected(False) + self.proxy_type_combo.setSelectedItem("HTTP") + self.proxy_host_field.setText("127.0.0.1") + self.proxy_port_field.setText("10809") + self.proxy_username_field.setText("") + self.proxy_password_field.setText("") self.prompt_area.setText(self.get_default_prompt()) save_button = JButton("Save Configuration") @@ -368,10 +1320,10 @@ Please identify any security issues and suggest fixes.""" reset_button.addActionListener(reset_config) button_panel.add(save_button) - button_panel.add(Box.createHorizontalStrut(10)) # 添加间距 + button_panel.add(Box.createHorizontalStrut(10)) button_panel.add(reset_button) - # 将所有面板添加到主配置面板 + # Assemble panels main_constraints = GridBagConstraints() main_constraints.fill = GridBagConstraints.HORIZONTAL main_constraints.insets = Insets(5, 5, 5, 5) @@ -381,14 +1333,17 @@ Please identify any security issues and suggest fixes.""" config_panel.add(api_panel, main_constraints) main_constraints.gridy = 1 - 
config_panel.add(limits_panel, main_constraints) + config_panel.add(proxy_panel, main_constraints) main_constraints.gridy = 2 + config_panel.add(limits_panel, main_constraints) + + main_constraints.gridy = 3 main_constraints.weighty = 1.0 main_constraints.fill = GridBagConstraints.BOTH config_panel.add(prompt_panel, main_constraints) - main_constraints.gridy = 3 + main_constraints.gridy = 4 main_constraints.weighty = 0.0 main_constraints.fill = GridBagConstraints.HORIZONTAL config_panel.add(button_panel, main_constraints) @@ -412,28 +1367,23 @@ Please identify any security issues and suggest fixes.""" def create_results_panel(self): results_panel = JPanel(BorderLayout()) - # 创建分割面板 split_pane = JSplitPane(JSplitPane.VERTICAL_SPLIT) split_pane.setDividerLocation(200) - # 创建工具栏面板 toolbar = JPanel() clear_button = JButton("Clear Results") search_field = JTextField(20) search_button = JButton("Search") export_button = JButton("Export Results") - # 创建列表模型和JList用于显示分析历史 self.list_model = DefaultListModel() self.analysis_list = JList(self.list_model) analysis_scroll = JScrollPane(self.analysis_list) - # 创建详细结果面板 self.results_area = JTextArea() self.results_area.setEditable(False) results_scroll = JScrollPane(self.results_area) - # 添加列表选择监听器 class SelectionListener(ListSelectionListener): def __init__(self, outer): self.outer = outer @@ -446,11 +1396,8 @@ Please identify any security issues and suggest fixes.""" self.outer.display_result_details(result) self.analysis_list.addListSelectionListener(SelectionListener(self)) - - # 初始化结果存储列表 self.analysis_results = [] - # 添加按钮事件 def clear_results(event): self.list_model.clear() self.analysis_results = [] @@ -472,21 +1419,14 @@ Please identify any security issues and suggest fixes.""" if self.analysis_results: timestamp = java.text.SimpleDateFormat("yyyyMMdd_HHmmss").format(java.util.Date()) filename = "gpt_analysis_{}.txt".format(timestamp) - - # 使用更简单的文件写入方式 from java.io import FileWriter, BufferedWriter - writer = 
BufferedWriter(FileWriter(filename)) - try: - # 写入导出信息头 writer.write("Enhanced BurpGPT Analysis Report\n") writer.write("=" * 50 + "\n") writer.write("Export Time: {}\n".format(timestamp)) writer.write("Total Results: {}\n".format(len(self.analysis_results))) writer.write("=" * 50 + "\n\n") - - # 遍历每个分析结果 for index, result in enumerate(self.analysis_results, 1): writer.write("Analysis #{}\n".format(index)) writer.write("-" * 30 + "\n") @@ -496,29 +1436,21 @@ Please identify any security issues and suggest fixes.""" writer.write("-" * 30 + "\n") writer.write(result.response) writer.write("\n" + "=" * 50 + "\n\n") - JOptionPane.showMessageDialog(None, "Results exported to {}".format(filename)) - - # 在日志中记录导出信息 - self.log("[+] Exported {} analysis results to {}".format( - len(self.analysis_results), filename)) - + self.log("[+] Exported {} analysis results to {}".format(len(self.analysis_results), filename)) finally: writer.close() - except Exception as e: error_msg = "Export failed: {}".format(str(e)) JOptionPane.showMessageDialog(None, error_msg) self.log("[-] " + error_msg) export_button.addActionListener(export_results) - # 添加组件到工具栏 toolbar.add(clear_button) toolbar.add(search_field) toolbar.add(search_button) toolbar.add(export_button) - # 组装面板 split_pane.setTopComponent(analysis_scroll) split_pane.setBottomComponent(results_scroll) @@ -529,26 +1461,21 @@ Please identify any security issues and suggest fixes.""" def send_to_gpt(self, invocation): try: - # 添加时间戳检查,防止短时间内重复触发 current_time = System.currentTimeMillis() - if hasattr(self, '_last_trigger_time') and (current_time - self._last_trigger_time < 1000): # 1秒内不重复触发 + if hasattr(self, '_last_trigger_time') and (current_time - self._last_trigger_time < 1000): self.log("[*] Ignoring duplicate trigger") return self._last_trigger_time = current_time - self.log("[+] Send to GPT method called - Thread: " + str(Thread.currentThread().getName())) + self.log("[+] Send to GPT method called") http_msgs = 
invocation.getSelectedMessages() self.log("[+] Selected messages: {}".format(len(http_msgs))) if http_msgs and len(http_msgs) == 1: msg = http_msgs[0] - request = msg.getRequest() - response = msg.getResponse() url = msg.getUrl().toString() - self.log("[+] Processing URL: {}".format(url)) - # 创建进度对话框 progress_dialog = JDialog() progress_dialog.setTitle("Analyzing...") progress_dialog.setSize(300, 100) @@ -561,201 +1488,126 @@ Please identify any security issues and suggest fixes.""" progress_dialog.add(label, BorderLayout.NORTH) progress_dialog.add(progress_bar, BorderLayout.CENTER) - # 创建一个实现Runnable接口的类 class AsyncTask(Runnable): def __init__(self, outer): self.outer = outer def run(self): + response_time = 0 try: - self.outer.log("[+] AsyncTask started - Thread: " + str(Thread.currentThread().getName())) + self.outer.log("[+] AsyncTask started") SwingUtilities.invokeLater(lambda: progress_dialog.setVisible(True)) - self.outer.log("[+] Creating GPT request") - gpt_request = GPTRequest(self.outer._helpers, msg, self.outer.model, self.outer.max_tokens) + self.outer.log("[+] Creating GPT request with truncation limits") + gpt_request = GPTRequest( + self.outer._helpers, + msg, + self.outer.model, + self.outer.max_tokens, + self.outer.max_request_length, + self.outer.max_response_length, + self.outer.log + ) gpt_request.set_prompt(self.outer.prompt_area.getText()) - self.outer.log("[+] Sending request to GPT API") + self.outer.log("[+] Sending request to API") - # 发送请求 + # Measure response time + start_time = System.currentTimeMillis() gpt_response = self.outer.call_gpt_api(gpt_request) + end_time = System.currentTimeMillis() + response_time = end_time - start_time - self.outer.log("[+] Received response from GPT API") + self.outer.log("[+] Received response from API (took {:.2f}s)".format(response_time / 1000.0)) def update_ui(): try: - self.outer.log("[+] Updating UI - Thread: " + str(Thread.currentThread().getName())) - # 关闭进度对话框 + self.outer.log("[+] Updating 
UI") progress_dialog.dispose() - if isinstance(gpt_response, GPTResponse): - content = gpt_response.get_content() - usage = gpt_response.get_token_usage() - + if isinstance(gpt_response, dict): + content = gpt_response.get("content", "") + usage = gpt_response.get("usage", {}) if content: - self.outer.update_results(url, content, usage) + self.outer.update_results(url, content, usage, response_time) else: - self.outer.update_results(url, "No valid analysis result received.", { - "prompt_tokens": 0, - "completion_tokens": 0, - "total_tokens": 0 - }) + self.outer.update_results(url, "No valid analysis result received.", {}, response_time) else: error_msg = "Error: {}".format(str(gpt_response)) - self.outer.update_results(url, error_msg, { - "prompt_tokens": 0, - "completion_tokens": 0, - "total_tokens": 0 - }) + self.outer.update_results(url, error_msg, {}, response_time) self.outer.log("[+] UI updated successfully") - except Exception as e: self.outer.log("[-] Error in update_ui: {}".format(str(e))) - # 确保在EDT线程中更新UI SwingUtilities.invokeLater(update_ui) - except Exception as e: - self.outer.log("[-] Error in run_async: {}".format(str(e))) + self.outer.log("[-] Error in AsyncTask: {}".format(str(e))) SwingUtilities.invokeLater(lambda: progress_dialog.dispose()) - SwingUtilities.invokeLater(lambda: self.outer.results_area.append( - "\n[-] Error: {}\n\n".format(str(e)))) + SwingUtilities.invokeLater(lambda: JOptionPane.showMessageDialog(None, "Error: {}".format(str(e)))) - # 使用正确的方式创建和启动线程 - self.log("[+] Starting AsyncTask thread") Thread(AsyncTask(self)).start() - else: self.log("[-] No message selected or multiple messages selected") - except Exception as e: self.log("[-] Error in send_to_gpt: {}".format(str(e))) + def call_gpt_api(self, gpt_request): + """Call GPT API using the provider adapter""" + try: + self.log("[+] Using provider: {}".format(self.provider_type)) + + provider = ProviderFactory.create_provider( + self.provider_type, + self.api_key, + 
self.api_url, + self.model, + self.timeout_seconds + ) + provider.set_log_callback(self.log) + provider.disable_ssl = self.disable_ssl_verification + + # Configure proxy if enabled + if self.enable_proxy: + provider.set_proxy( + self.proxy_type, + self.proxy_host, + self.proxy_port, + self.proxy_username, + self.proxy_password + ) + + result = provider.send_request(gpt_request.prompt, self.max_tokens) + return result + + except Exception as e: + self.log("[-] Error calling API: {}".format(str(e))) + raise Exception("Error calling API: {}".format(str(e))) + def truncate_content(self, content, max_length): - """智能截断内容,保留头部信息和部分正文""" if not content: return "" - try: content_str = self._helpers.bytesToString(content) except: - # 如果helpers方法失败,回退到str() content_str = str(content) if len(content_str) <= max_length: return content_str - - # 分离头部和正文 + headers_end = content_str.find("\r\n\r\n") if headers_end == -1: - # 没有找到头部分隔符,直接截断 return content_str[:max_length] + "\n... (content truncated)" - + headers = content_str[:headers_end] body = content_str[headers_end+4:] - - # 计算剩余可用长度 - remaining_length = max_length - len(headers) - 50 # 预留50字符给提示信息 + remaining_length = max_length - len(headers) - 50 if remaining_length <= 0: - # 如果头部已经超过限制,只保留部分头部 return content_str[:max_length] + "\n... (content truncated)" - - # 截断正文 + truncated_body = body[:remaining_length] - - return "{}\r\n\r\n{}\n... 
(content truncated, total length: {})".format( - headers, - truncated_body, - len(content_str) - ) - - def build_prompt(self, request, response): - """使用截断后的请求和响应构建提示词""" - truncated_request = self.truncate_content(request, self.max_request_length) - truncated_response = self.truncate_content(response, self.max_response_length) - - return self.prompt_area.getText().format( - truncated_request, - truncated_response - ) - - def call_gpt_api(self, gpt_request): - try: - # 构造请求数据,使用当前配置的模型名称 - data = { - "model": self.model, # 使用配置中的模型 - "messages": [ - { - "role": "user", - "content": gpt_request.prompt - } - ], - "max_tokens": self.max_tokens - } - - # 转换为JSON - json_data = json.dumps(data).encode('utf-8') - - # 从配置中获取Host - host = self.api_url.split("://")[-1].split("/")[0] - - # 请求头 - headers = { - "Host": host, - "Content-Type": "application/json", - "Authorization": "Bearer " + self.api_key, # 使用配置中的API key - "Accept": "*/*", - "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/108.0.5359.215", - "Accept-Language": "zh-CN" - } - - self.log("[+] Sending request to {}".format(self.api_url)) # 使用配置中的API URL - - # 创建请求 - request = urllib2.Request( - url=self.api_url, # 使用配置中的API URL - data=json_data, - headers=headers - ) - - # 处理SSL验证 - if self.disable_ssl_verification: - self.log("[*] SSL certificate validation is disabled") - # 创建自定义的SSL上下文 - ssl_context = TrustAllSSLContext.create() - # 使用自定义SSL上下文发送请求 - response = urllib2.urlopen(request, context=ssl_context, timeout=self.timeout_seconds) - else: - # 标准请求方式,使用默认SSL验证 - response = urllib2.urlopen(request, timeout=self.timeout_seconds) - - # 读取原始响应数据 - response_data = response.read() - - try: - # 修改这里:直接使用str()而不是尝试各种解码方式 - response_text = str(response_data) - - # 解析JSON - result = json.loads(response_text) - return GPTResponse(result) - - except Exception as decode_error: - # 如果解析失败,打印原始数据信息 - self.log("[-] Raw response: " + str(response_data)) - raise 
decode_error - - except urllib2.HTTPError as e: - error_body = e.read() - # 修改这里:改用str()替代decode - error_text = str(error_body) - self.log("[-] HTTP Error Response: " + error_text) - raise Exception("Error calling GPT API: " + str(e)) - except Exception as e: - self.log("[-] Error: " + str(e)) - raise Exception("Error calling GPT API: " + str(e)) + return "{}\r\n\r\n{}\n... (content truncated, total length: {})".format(headers, truncated_body, len(content_str)) def getTabCaption(self): return "GPT Analysis" @@ -771,119 +1623,137 @@ Please identify any security issues and suggest fixes.""" return menu_list def log(self, message): - # 输出到Burp的标准输出 self.stdout.println(message) - # 输出到日志面板 if hasattr(self, 'log_area'): SwingUtilities.invokeLater(lambda: self.log_area.append(message + "\n")) SwingUtilities.invokeLater(lambda: self.log_area.setCaretPosition(self.log_area.getDocument().getLength())) - def update_results(self, url, content, usage): - """更新结果显示""" + def update_results(self, url, content, usage, response_time=0): timestamp = java.text.SimpleDateFormat("HH:mm:ss").format(java.util.Date()) - - # 创建新的分析结果对象 - result = AnalysisResult(timestamp, url, content) - - # 添加到结果列表 + result = AnalysisResult(timestamp, url, content, usage, response_time) self.analysis_results.append(result) - # 更新列表显示 - self.list_model.addElement("[{}] {}".format(timestamp, url)) + # Include token count in list display + tokens_str = " [{}T]".format(result.total_tokens) if result.total_tokens > 0 else "" + self.list_model.addElement("[{}] {}{}".format(timestamp, url, tokens_str)) - # 选中新添加的项 last_index = self.list_model.size() - 1 self.analysis_list.setSelectedIndex(last_index) self.analysis_list.ensureIndexIsVisible(last_index) + + # Log token usage + if result.total_tokens > 0: + self.log("[+] Token usage: {} total ({} prompt + {} completion)".format( + result.total_tokens, result.prompt_tokens, result.completion_tokens)) def display_result_details(self, result): - """显示选中结果的详细信息""" 
self.results_area.setText("") self.results_area.append("="*50 + "\n") self.results_area.append("Analysis Time: {}\n".format(result.time)) self.results_area.append("Target URL: {}\n".format(result.url)) + + # Display token usage statistics + if result.total_tokens > 0: + self.results_area.append("\n[Token Usage]\n") + self.results_area.append(" Prompt Tokens: {}\n".format(result.prompt_tokens)) + self.results_area.append(" Completion Tokens: {}\n".format(result.completion_tokens)) + self.results_area.append(" Total Tokens: {}\n".format(result.total_tokens)) + + # Display performance metrics + if result.response_time > 0: + self.results_area.append(" Response Time: {:.2f}s\n".format(result.response_time / 1000.0)) + self.results_area.append("-"*50 + "\n") self.results_area.append(result.response + "\n") self.results_area.append("="*50 + "\n") self.results_area.setCaretPosition(0) + class GPTRequest: - def __init__(self, helpers, http_message, model, max_prompt_size): + def __init__(self, helpers, http_message, model, max_tokens, max_request_length=1000, max_response_length=2000, log_callback=None): try: - # 获取请求信息 request_info = helpers.analyzeRequest(http_message) - - # 获取基本信息 self.url = str(http_message.getUrl()) self.method = str(request_info.getMethod()) - # 获取请求和响应 + # Get raw request and response request_bytes = http_message.getRequest() - self.request = helpers.bytesToString(request_bytes) - + raw_request = helpers.bytesToString(request_bytes) response_bytes = http_message.getResponse() - self.response = helpers.bytesToString(response_bytes) if response_bytes else "" + raw_response = helpers.bytesToString(response_bytes) if response_bytes else "" + + # Apply smart truncation to request and response + self.request = ContentTruncator.smart_truncate(raw_request, max_request_length, "request") + self.response = ContentTruncator.smart_truncate(raw_response, max_response_length, "response") + + # Log truncation info if callback provided + if log_callback: + if 
len(raw_request) > max_request_length: + log_callback("[*] Request truncated: {} chars -> {} chars".format(len(raw_request), max_request_length)) + if len(raw_response) > max_response_length: + log_callback("[*] Response truncated: {} chars -> {} chars".format(len(raw_response), max_response_length)) self.model = model - self.max_prompt_size = max_prompt_size + self.max_tokens = max_tokens self.prompt = None - + self.log_callback = log_callback except Exception as e: raise Exception("Error initializing GPTRequest: " + str(e)) def set_prompt(self, prompt_template): + """Build prompt from template with variable substitution""" try: - # 构建提示词 prompt = prompt_template - - # 替换占位符 prompt = prompt.replace("{URL}", self.url) prompt = prompt.replace("{METHOD}", self.method) prompt = prompt.replace("{REQUEST}", self.request) prompt = prompt.replace("{RESPONSE}", self.response) - # 截断过长的内容 - if len(prompt) > self.max_prompt_size: - prompt = prompt[:self.max_prompt_size] - + # Store the final prompt self.prompt = prompt - return prompt + # Log final prompt size + if self.log_callback: + self.log_callback("[*] Final prompt size: {} characters".format(len(prompt))) + + return prompt except Exception as e: raise Exception("Error setting prompt: " + str(e)) - def log(self, message): - if hasattr(self, '_callbacks'): - stdout = self._callbacks.getStdout() - if stdout: - writer = PrintWriter(stdout, True) - writer.println(message) - -class GPTResponse: - def __init__(self, raw_response): - self.raw_response = raw_response - self.choices = raw_response.get("choices", []) - self.usage = raw_response.get("usage", {}) - - def get_content(self): - if self.choices and len(self.choices) > 0: - return self.choices[0]["message"]["content"] - return None - - def get_token_usage(self): - return { - "prompt_tokens": self.usage.get("prompt_tokens", 0), - "completion_tokens": self.usage.get("completion_tokens", 0), - "total_tokens": self.usage.get("total_tokens", 0) - } class AnalysisResult: - 
def __init__(self, time, url, response): + """Enhanced analysis result with token usage tracking""" + + def __init__(self, time, url, response, usage=None, response_time=0): self.time = time self.url = url self.response = response self.severity = "Information" self.notes = "" + # Token usage statistics + if usage is None: + usage = {} + self.prompt_tokens = usage.get("prompt_tokens", 0) + self.completion_tokens = usage.get("completion_tokens", 0) + self.total_tokens = usage.get("total_tokens", 0) + + # Performance metrics + self.response_time = response_time # in milliseconds + + def get_tokens_display(self): + """Get formatted token usage string""" + return "Tokens: {} prompt + {} completion = {} total".format( + self.prompt_tokens, + self.completion_tokens, + self.total_tokens + ) + + def get_performance_display(self): + """Get formatted performance string""" + if self.response_time > 0: + return "Response time: {:.2f}s".format(self.response_time / 1000.0) + return "" + def __str__(self): return "[{}] {}".format(self.time, self.url)