Spaces:
Runtime error
Runtime error
| import gradio as gr | |
| import requests | |
| import os | |
| import re | |
| import ast | |
| from collections import defaultdict | |
| from datetime import datetime, timedelta | |
| import json | |
| import time | |
| import logging | |
| from retrying import retry | |
| import base64 | |
| import pickle | |
| # Set up logging | |
| logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(levelname)s - %(message)s') | |
| logger = logging.getLogger(__name__) | |
| # Suppress warnings | |
| os.environ["HF_HUB_DISABLE_SYMLINKS_WARNING"] = "1" | |
| # Modal Mistral-7B API endpoint | |
| MODAL_API = os.getenv("MODAL_API", "https://rithvickkumar27--mistral-7b-api-analyze.modal.run") | |
| NVD_API_KEY = os.getenv("NVD_API_KEY") | |
| if not NVD_API_KEY: | |
| logger.error("NVD_API_KEY not set in environment variables, API queries will fail") | |
| # Retry decorator for Mistral-7B API | |
| def call_mistral_llm(prompt): | |
| logger.debug(f"Sending prompt to Mistral-7B API: {prompt[:100]}...") | |
| try: | |
| response = requests.post(MODAL_API, json={"prompt": prompt}, timeout=120) | |
| logger.debug(f"Mistral-7B API response status: {response.status_code}") | |
| if response.status_code == 200: | |
| data = response.json() | |
| if "error" in data: | |
| logger.error(f"Mistral API error: {data['error']}") | |
| return f"Modal API error: {data['error']}" | |
| response_text = data.get("response", "LLM error: No response") | |
| logger.info("Mistral-7B response received successfully") | |
| return response_text | |
| else: | |
| logger.error(f"Mistral API error: Status {response.status_code}") | |
| return f"Modal API error: Status {response.status_code}" | |
| except requests.exceptions.RequestException as e: | |
| logger.error(f"Mistral API request failed: {e}") | |
| raise | |
| # NVD API query with caching | |
| def query_nvd(keywords): | |
| cache_dir = "corpus/cache" | |
| os.makedirs(cache_dir, exist_ok=True) | |
| cache_file = f"{cache_dir}/{keywords.replace(' ', '_')}.pkl" | |
| # Check cache | |
| if os.path.exists(cache_file): | |
| try: | |
| with open(cache_file, "rb") as f: | |
| cached_data = pickle.load(f) | |
| if time.time() - cached_data["timestamp"] < 86400: # Cache valid for 24 hours | |
| logger.debug(f"Using cached NVD data for: {keywords}") | |
| return cached_data["results"] | |
| except Exception as e: | |
| logger.warning(f"Cache read error: {e}") | |
| # Query NVD API | |
| try: | |
| url = "https://services.nvd.nist.gov/rest/json/cves/2.0" | |
| params = {"keywordSearch": keywords, "resultsPerPage": 10} | |
| headers = {"apiKey": NVD_API_KEY} | |
| response = requests.get(url, params=params, headers=headers, timeout=10) | |
| if response.status_code == 200: | |
| data = response.json() | |
| results = [ | |
| f"{item['cve']['id']}: {item['cve']['descriptions'][0]['value']}" | |
| for item in data.get("vulnerabilities", []) | |
| ] | |
| # Save to cache | |
| with open(cache_file, "wb") as f: | |
| pickle.dump({"timestamp": time.time(), "results": results}, f) | |
| logger.info(f"Fetched {len(results)} CVEs from NVD for: {keywords}") | |
| return results | |
| elif response.status_code == 429: | |
| logger.error("NVD rate limit exceeded") | |
| else: | |
| logger.error(f"NVD API error: Status {response.status_code}") | |
| except Exception as e: | |
| logger.error(f"NVD API request failed: {e}") | |
| return None | |
| # Basic Python code analysis | |
| def analyze_python_code(content: str) -> dict: | |
| try: | |
| tree = ast.parse(content) | |
| suspicious_patterns = [] | |
| for node in ast.walk(tree): | |
| if isinstance(node, ast.Call) and isinstance(node.func, ast.Attribute): | |
| if node.func.attr == 'b64decode' and isinstance(node.func.value, ast.Name) and node.func.value.id == 'base64': | |
| suspicious_patterns.append("Base64 decoding detected") | |
| if isinstance(node, ast.Call) and isinstance(node.func, ast.Name) and node.func.id == 'exec': | |
| suspicious_patterns.append("Dynamic code execution (exec)") | |
| if isinstance(node, ast.Import) or isinstance(node, ast.ImportFrom): | |
| for name in (node.names if isinstance(node, ast.Import) else node.names): | |
| if name.name in ['urllib', 'urllib.request', 'requests']: | |
| suspicious_patterns.append(f"Suspicious import: {name.name}") | |
| if isinstance(node, ast.Str) or (isinstance(node, ast.Constant) and isinstance(node.value, str)): | |
| if re.search(r'http[s]?://.*(evil|malicious|bad)[^\s]*', node.value, re.IGNORECASE): | |
| suspicious_patterns.append(f"Suspicious URL: {node.value}") | |
| if suspicious_patterns: | |
| return { | |
| "classification": "Malware Detected", | |
| "severity": "Critical", | |
| "mitigation": "Quarantine file, run antivirus, block suspicious URLs", | |
| "confidence": 0.9, | |
| "details": suspicious_patterns | |
| } | |
| except SyntaxError: | |
| logger.warning("Invalid Python syntax in file") | |
| return None | |
| # Enhanced DSATP log parsing | |
| def dsatp_parse_log(text: str) -> dict: | |
| log = text.lower() | |
| lines = log.split('\n') | |
| detected_threats = [] | |
| # Comprehensive threat dictionary | |
| threats = { | |
| "compromised": {"classification": "System Compromise", "severity": "Critical", "mitigation": "Isolate process, run port scan, reset credentials"}, | |
| "unauthorized": {"classification": "Unauthorized Access", "severity": "High", "mitigation": "Quarantine MAC address, review access logs"}, | |
| "high cpu": {"classification": "Resource Abuse", "severity": "Medium", "mitigation": "Check for crypto-miner or DoS, limit resource usage"}, | |
| "inbound traffic": {"classification": "Network Intrusion", "severity": "Medium", "mitigation": "Block closed ports, enable firewall rules"}, | |
| "firmware mismatch": {"classification": "Firmware Vulnerability", "severity": "High", "mitigation": "Validate OTA or rollback, update firmware"}, | |
| "ddos": {"classification": "DDoS Attack", "severity": "Critical", "mitigation": "Rate-limit traffic, enable DDoS protection"}, | |
| "phishing": {"classification": "Phishing Attempt", "severity": "High", "mitigation": "Block malicious URLs, educate users"}, | |
| "sql injection": {"classification": "SQL Injection", "severity": "Critical", "mitigation": "Sanitize inputs, patch database"}, | |
| "xss": {"classification": "Cross-Site Scripting", "severity": "High", "mitigation": "Escape HTML, update web apps"}, | |
| "privilege escalation": {"classification": "Privilege Escalation", "severity": "Critical", "mitigation": "Patch vulnerabilities, restrict permissions"}, | |
| "trojan": {"classification": "Malware Detected", "severity": "Critical", "mitigation": "Quarantine file, run antivirus"}, | |
| "ransomware": {"classification": "Malware Detected", "severity": "Critical", "mitigation": "Quarantine file, run antivirus, restore from backup"}, | |
| "heuristic.behavior.suspicious": {"classification": "Suspicious Activity", "severity": "High", "mitigation": "Monitor process, run memory scan"}, | |
| "malicious": {"classification": "Malware Detected", "severity": "Critical", "mitigation": "Quarantine file, run antivirus, block malicious URLs"}, | |
| "ufw block": {"classification": "Network Intrusion", "severity": "High", "mitigation": "Investigate blocked IP, strengthen firewall rules"}, | |
| "sudo": {"classification": "Privilege Escalation", "severity": "High", "mitigation": "Audit user permissions, review sudo logs"}, | |
| "reverse ssh": {"classification": "Persistence Mechanism", "severity": "Critical", "mitigation": "Disable unauthorized SSH services, inspect network connections"}, | |
| "failed password": {"classification": "Brute-Force Attempt", "severity": "Critical", "mitigation": "Block suspicious IPs, disable password-based SSH, enable fail2ban"}, | |
| "invalid user": {"classification": "Brute-Force Attempt", "severity": "Critical", "mitigation": "Block suspicious IPs, disable password-based SSH, enable fail2ban"} | |
| } | |
| # Advanced threat detection | |
| failed_attempts = defaultdict(list) | |
| suspicious_terms = [ | |
| "failed password", "invalid user", "error", "denied", "malformed packet", | |
| "flood", "syn flood", "http flood", "suspicious url", "script tag", | |
| "sqlmap", "union select", "escalation attempt", "rootkit", "yara_match", | |
| "wget", "curl", "bash", "sh", "payload", "ufw", "sudo", "root", "ssh", | |
| "cron", "systemd", "base64", "exec" | |
| ] | |
| ip_pattern = r'\b\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}\b' | |
| timestamp_pattern = r'\w{3}\s+\d{1,2}\s+\d{2}:\d{2}:\d{2}' | |
| url_pattern = r'(https?://[^\s]+)' | |
| sql_pattern = r'(union\s+select|select\s+.*\s+from|drop\s+table)' | |
| xss_pattern = r'(<script>|on\w+\s*=|javascript:)' | |
| yara_pattern = r'yara_match:\s*([\w\.]+)\s+detected' | |
| script_pattern = r'(wget|curl)\s+.*(http[s]?://[^\s]+)\s*.*\.(sh|bash|exe)' | |
| sudo_pattern = r'sudo:.*user=root\s*;' | |
| ssh_pattern = r'reverse\s+ssh|tunnel\s+service' | |
| malicious_url_pattern = r'http[s]?://.*(malicious|payload|evil)[^\s]*' | |
| for line in lines: | |
| # YARA match detection | |
| yara_match = re.search(yara_pattern, line) | |
| if yara_match: | |
| threat_type = yara_match.group(1).lower() | |
| if "trojan" in threat_type: | |
| detected_threats.append(threats["trojan"] | {"confidence": 0.95}) | |
| elif "ransomware" in threat_type: | |
| detected_threats.append(threats["ransomware"] | {"confidence": 0.95}) | |
| elif "heuristic" in threat_type: | |
| detected_threats.append(threats["heuristic.behavior.suspicious"] | {"confidence": 0.85}) | |
| # Malicious script download | |
| if re.search(script_pattern, line, re.IGNORECASE) or re.search(malicious_url_pattern, line, re.IGNORECASE): | |
| detected_threats.append(threats["malicious"] | {"confidence": 0.95}) | |
| # Base64 and exec detection | |
| if "base64.b64decode" in line and "exec" in line: | |
| detected_threats.append(threats["malicious"] | {"confidence": 0.95}) | |
| # Firewall block | |
| if re.search(r'ufw\s+block', line, re.IGNORECASE): | |
| ip_match = re.search(ip_pattern, line) | |
| ip = ip_match.group() if ip_match else "unknown IP" | |
| detected_threats.append(threats["ufw block"] | {"mitigation": f"Investigate blocked IP {ip}, strengthen firewall rules", "confidence": 0.9}) | |
| # Privilege escalation | |
| if re.search(sudo_pattern, line, re.IGNORECASE): | |
| detected_threats.append(threats["sudo"] | {"confidence": 0.85}) | |
| # Reverse SSH tunnel | |
| if re.search(ssh_pattern, line, re.IGNORECASE): | |
| detected_threats.append(threats["reverse ssh"] | {"confidence": 0.95}) | |
| # Brute-force detection | |
| if "failed password" in line or "invalid user" in line: | |
| ip_match = re.search(ip_pattern, line) | |
| time_match = re.search(timestamp_pattern, line) | |
| if ip_match and time_match: | |
| ip = ip_match.group() | |
| try: | |
| timestamp = datetime.strptime(time_match.group(), "%b %d %H:%M:%S") | |
| timestamp = timestamp.replace(year=2025) | |
| failed_attempts[ip].append(timestamp) | |
| except ValueError: | |
| continue | |
| # Phishing, SQL Injection, XSS | |
| if re.search(url_pattern, line) and "phishing" in line: | |
| detected_threats.append(threats["phishing"] | {"confidence": 0.85}) | |
| if re.search(sql_pattern, line, re.IGNORECASE): | |
| detected_threats.append(threats["sql injection"] | {"confidence": 0.9}) | |
| if re.search(xss_pattern, line, re.IGNORECASE): | |
| detected_threats.append(threats["xss"] | {"confidence": 0.85}) | |
| # Brute-force and DDoS detection | |
| for ip, timestamps in failed_attempts.items(): | |
| if len(timestamps) >= 5: | |
| time_window = max(timestamps) - min(timestamps) | |
| if time_window <= timedelta(seconds=60): | |
| detected_threats.append({ | |
| "classification": "Brute-Force Attempt", | |
| "severity": "Critical", | |
| "mitigation": f"Block IP {ip}, disable password-based SSH, enable fail2ban", | |
| "confidence": 0.95 | |
| }) | |
| elif len(timestamps) >= 10 and time_window <= timedelta(minutes=5): | |
| detected_threats.append({ | |
| "classification": "DDoS Attack", | |
| "severity": "Critical", | |
| "mitigation": f"Rate-limit traffic from {ip}, enable DDoS protection", | |
| "confidence": 0.9 | |
| }) | |
| # Suspicious activity | |
| suspicious_count = sum(line.count(term) for term in suspicious_terms for line in lines) | |
| if suspicious_count >= 5 and not detected_threats: | |
| detected_threats.append({ | |
| "classification": "Suspicious Activity", | |
| "severity": "Medium", | |
| "mitigation": "Review logs for anomalies, monitor affected IPs", | |
| "confidence": 0.75 | |
| }) | |
| # Select the highest-severity threat | |
| if detected_threats: | |
| severity_order = {"Critical": 3, "High": 2, "Medium": 1, "Safe": 0} | |
| highest_threat = max(detected_threats, key=lambda x: (severity_order.get(x["severity"], 0), x["confidence"])) | |
| logger.info(f"Detected threats: {len(detected_threats)}, Selected: {highest_threat}") | |
| return highest_threat | {"all_threats": detected_threats} | |
| logger.info("No threats detected") | |
| return {"classification": "No Threat", "severity": "Safe", "mitigation": "None", "confidence": 0.5, "all_threats": []} | |
| # Enhanced DSATP YARA scanning | |
| def dsatp_yara_scan(file_path: str) -> dict: | |
| try: | |
| # Read file content for Python analysis | |
| with open(file_path, 'r', encoding='utf-8', errors='ignore') as f: | |
| content = f.read() | |
| # Analyze Python code if it's a .py file | |
| if file_path.endswith('.py'): | |
| python_analysis = analyze_python_code(content) | |
| if python_analysis: | |
| return python_analysis | {"all_threats": [python_analysis]} | |
| import yara | |
| rules = yara.compile(source=""" | |
| rule BruteForceLog { | |
| strings: | |
| $failed = "failed password" nocase | |
| $invalid = "invalid user" nocase | |
| $denied = "denied" nocase | |
| condition: | |
| ($failed and #failed >= 3) or ($invalid and #invalid >= 3) or ($denied and #denied >= 3) | |
| } | |
| rule MalwareLog { | |
| strings: | |
| $mirai = "mirai" nocase | |
| $botnet = "botnet" nocase | |
| $exploit = "exploit" nocase | |
| $rootkit = "rootkit" nocase | |
| $trojan = "trojan" nocase | |
| $ransomware = "ransomware" nocase | |
| $wget = "wget" nocase | |
| $curl = "curl" nocase | |
| $payload = "payload" nocase | |
| $malicious = "malicious" nocase | |
| $evil = "evil" nocase | |
| condition: | |
| any of them | |
| } | |
| rule PythonMalware { | |
| strings: | |
| $base64 = "base64.b64decode" nocase | |
| $exec = "exec" nocase | |
| $urllib = "urllib.request" nocase | |
| $requests = "requests.get" nocase | |
| $url = "http://" nocase | |
| $url2 = "https://" nocase | |
| $evil_url = "evil.com" nocase | |
| condition: | |
| ($base64 and $exec) or ($urllib and ($url or $url2 or $evil_url)) or ($requests and ($url or $url2 or $evil_url)) | |
| } | |
| rule SuspiciousBehavior { | |
| strings: | |
| $heuristic = "heuristic" nocase | |
| $suspicious = "suspicious" nocase | |
| $ufw = "ufw block" nocase | |
| condition: | |
| any of them | |
| } | |
| rule PhishingLog { | |
| strings: | |
| $phish = "phishing" nocase | |
| $url = "http://" nocase | |
| $url2 = "https://" nocase | |
| condition: | |
| $phish or ($url and #url >= 2) or ($url2 and #url2 >= 2) | |
| } | |
| rule DDoSLog { | |
| strings: | |
| $flood = "flood" nocase | |
| $syn = "syn" nocase | |
| $http_flood = "http flood" nocase | |
| condition: | |
| any of them | |
| } | |
| rule SQLInjectionLog { | |
| strings: | |
| $sql = "union select" nocase | |
| $sql2 = "drop table" nocase | |
| condition: | |
| any of them | |
| } | |
| rule XSSLog { | |
| strings: | |
| $xss = "<script>" nocase | |
| $xss2 = "javascript:" nocase | |
| condition: | |
| any of them | |
| } | |
| rule PersistenceLog { | |
| strings: | |
| $ssh = "reverse ssh" nocase | |
| $tunnel = "tunnel service" nocase | |
| $sudo = "sudo:.*user=root" nocase | |
| condition: | |
| any of them | |
| } | |
| """) | |
| matches = rules.match(file_path) | |
| detected_threats = [] | |
| if matches: | |
| for match in matches: | |
| if match.rule == "BruteForceLog": | |
| detected_threats.append({ | |
| "classification": "Brute-Force Attempt", | |
| "severity": "Critical", | |
| "mitigation": "Block suspicious IPs, disable password-based SSH, enable fail2ban", | |
| "confidence": 0.95 | |
| }) | |
| elif match.rule == "MalwareLog" or match.rule == "PythonMalware": | |
| detected_threats.append({ | |
| "classification": "Malware Detected", | |
| "severity": "Critical", | |
| "mitigation": "Quarantine file, run antivirus", | |
| "confidence": 0.9 | |
| }) | |
| elif match.rule == "SuspiciousBehavior": | |
| detected_threats.append({ | |
| "classification": "Suspicious Activity", | |
| "severity": "High", | |
| "mitigation": "Monitor process, run memory scan", | |
| "confidence": 0.85 | |
| }) | |
| elif match.rule == "PhishingLog": | |
| detected_threats.append({ | |
| "classification": "Phishing Attempt", | |
| "severity": "High", | |
| "mitigation": "Block malicious URLs, educate users", | |
| "confidence": 0.85 | |
| }) | |
| elif match.rule == "DDoSLog": | |
| detected_threats.append({ | |
| "classification": "DDoS Attack", | |
| "severity": "Critical", | |
| "mitigation": "Rate-limit traffic, enable DDoS protection", | |
| "confidence": 0.9 | |
| }) | |
| elif match.rule == "SQLInjectionLog": | |
| detected_threats.append({ | |
| "classification": "SQL Injection", | |
| "severity": "Critical", | |
| "mitigation": "Sanitize inputs, patch database", | |
| "confidence": 0.9 | |
| }) | |
| elif match.rule == "XSSLog": | |
| detected_threats.append({ | |
| "classification": "Cross-Site Scripting", | |
| "severity": "High", | |
| "mitigation": "Escape HTML, update web apps", | |
| "confidence": 0.85 | |
| }) | |
| elif match.rule == "PersistenceLog": | |
| detected_threats.append({ | |
| "classification": "Persistence Mechanism", | |
| "severity": "Critical", | |
| "mitigation": "Disable unauthorized SSH services, inspect network connections", | |
| "confidence": 0.95 | |
| }) | |
| # Select the highest-severity threat | |
| if detected_threats: | |
| severity_order = {"Critical": 3, "High": 2, "Medium": 1, "Safe": 0} | |
| highest_threat = max(detected_threats, key=lambda x: (severity_order.get(x["severity"], 0), x["confidence"])) | |
| logger.info(f"YARA scan detected threats: {len(detected_threats)}, Selected: {highest_threat}") | |
| return highest_threat | {"all_threats": detected_threats} | |
| logger.info("YARA scan: No threats detected") | |
| return { | |
| "classification": "No Malware", | |
| "severity": "Safe", | |
| "mitigation": "None", | |
| "confidence": 0.7, | |
| "all_threats": [] | |
| } | |
| except Exception as e: | |
| logger.error(f"YARA scan error: {e}") | |
| return {"error": str(e), "severity": "Unknown", "mitigation": "Check file format", "all_threats": []} | |
| # Chatbot function | |
| def chatbot_response(user_input, file, history, state): | |
| if history is None: | |
| history = [] | |
| input_text = user_input | |
| scan_result = None | |
| all_threats = [] | |
| start_time = time.time() | |
| if file: | |
| try: | |
| input_text = open(file.name, "r").read() | |
| scan_result = dsatp_yara_scan(file.name) | |
| except Exception as e: | |
| scan_result = {"error": f"File error: {e}", "severity": "Unknown", "mitigation": "Check file", "all_threats": []} | |
| else: | |
| scan_result = dsatp_parse_log(input_text) | |
| all_threats = scan_result.get("all_threats", []) | |
| context_str = "No relevant vulnerabilities found." | |
| if NVD_API_KEY: | |
| try: | |
| # Map classification to precise keywords for relevant CVEs | |
| threat_keywords = { | |
| "Brute-Force Attempt": "brute force ssh login attempt authentication failure openssh password attack cwe-287 cwe-307", | |
| "Malware Detected": "malware trojan ransomware payload malicious script backdoor virus cwe-94 cwe-506 cwe-119 code injection python", | |
| "Network Intrusion": "firewall intrusion ufw network attack port scan unauthorized access cwe-284", | |
| "Privilege Escalation": "privilege escalation sudo root unauthorized access cwe-269 cwe-250", | |
| "Persistence Mechanism": "ssh tunnel reverse ssh persistence backdoor remote access cwe-284", | |
| "System Compromise": "compromise breach unauthorized access cwe-284", | |
| "Unauthorized Access": "unauthorized access login failure cwe-287", | |
| "Resource Abuse": "resource abuse crypto-miner denial of service cwe-400", | |
| "Firmware Vulnerability": "firmware vulnerability iot cwe-119", | |
| "DDoS Attack": "ddos denial of service network flood cwe-400", | |
| "Phishing Attempt": "phishing malicious url social engineering cwe-601", | |
| "SQL Injection": "sql injection database attack cwe-89", | |
| "Cross-Site Scripting": "xss cross-site scripting web attack cwe-79", | |
| "Suspicious Activity": "suspicious activity anomaly heuristic cwe-693" | |
| } | |
| classification = scan_result.get("classification", "unknown") | |
| keywords = threat_keywords.get(classification, "security threat").replace(',', '') | |
| nvd_results = query_nvd(keywords) | |
| if nvd_results: | |
| context_items = [] | |
| seen_cves = set() | |
| for result in nvd_results[:3]: | |
| cve_matches = re.findall(r'CVE-\d{4}-\d{5,7}', result) | |
| if cve_matches and cve_matches[0] not in seen_cves: | |
| if any(keyword.lower() in result.lower() for keyword in keywords.split()): | |
| context_items.append(result) | |
| seen_cves.add(cve_matches[0]) | |
| context_str = "\n\n".join(context_items) if context_items else "No relevant vulnerabilities found for this threat." | |
| logger.debug(f"NVD query: {keywords}, Results: {len(nvd_results) if nvd_results else 0}") | |
| except Exception as e: | |
| logger.error(f"NVD error: {e}") | |
| context_str = f"Context error: {e}" | |
| if "error" not in scan_result: | |
| other_threats_summary = "" | |
| if len(all_threats) > 1: | |
| other_threats = [t for t in all_threats if t != scan_result] | |
| other_threats_summary = "\nOther detected threats include:\n" + "\n".join( | |
| [f"- {t['classification']} (Severity: {t['severity']}, Confidence: {t['confidence']:.1f})" for t in other_threats] | |
| ) | |
| prompt = f""" | |
| CLASSIFICATION: {scan_result['classification']} | |
| SEVERITY: {scan_result['severity']} | |
| CONFIDENCE: {scan_result['confidence']:.1f} | |
| THREAT DETAILS: {other_threats_summary} | |
| IMMEDIATE RESPONSE REQUIRED: | |
| THREAT ASSESSMENT: Analyze this security incident with precision. Provide tactical recommendations and threat intelligence. | |
| DIRECTIVE: Generate a concise, actionable security briefing. Focus on: | |
| 1. Primary threat vector and attack methodology | |
| 2. Immediate containment procedures | |
| 3. Risk escalation potential | |
| 4. Technical countermeasures | |
| RESPONSE FORMAT: Direct, technical, no pleasantries. Use cybersecurity terminology. Prioritize critical actions first. | |
| CONTEXT: Real-time threat detected in production environment. Response time critical. | |
| EXECUTE ANALYSIS:""" | |
| try: | |
| llm_response = call_mistral_llm(prompt) | |
| except Exception as e: | |
| logger.warning(f"Mistral-7B failed after retries: {e}, using fallback response") | |
| llm_response = f"A {scan_result['severity'].lower()} {scan_result['classification'].lower()} was detected. Recommended actions: {scan_result['mitigation']}." + other_threats_summary | |
| response = f"**Classification**: {scan_result['classification']}\n**Severity**: {scan_result['severity']}\n**Mitigation**: {scan_result['mitigation']}\n**Confidence**: {scan_result['confidence']:.1f}\n\n{llm_response}\n\n**Context**:\n{context_str}" | |
| else: | |
| response = f"**Error**: {scan_result['error']}\n**Context**:\n{context_str}" | |
| updated_history = history + [ | |
| {"role": "user", "content": input_text or "File uploaded"}, | |
| {"role": "assistant", "content": response} | |
| ] | |
| # Update state | |
| state = { | |
| "classification": scan_result.get("classification", "Awaiting Data"), | |
| "severity": scan_result.get("severity", "Unknown"), | |
| "mitigation": scan_result.get("mitigation", "None"), | |
| "confidence": scan_result.get("confidence", 0.0), | |
| "timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S") | |
| } | |
| # Generate threat card | |
| threat_card_content = f""" | |
| <div class="threat-card" id="threat-card"> | |
| <p><strong>Classification:</strong> <span id="classification">{state['classification']}</span></p> | |
| <p><strong>Severity:</strong> <span class="severity-label severity-{state['severity'].lower()}" id="severity">{state['severity']}</span></p> | |
| <p><strong>Mitigation:</strong> <span id="mitigation">{state['mitigation']}</span></p> | |
| <p><strong>Confidence:</strong> <span id="confidence">{state['confidence']:.1f}</span></p> | |
| <p><strong>Timestamp:</strong> <span id="timestamp">{state['timestamp']}</span></p> | |
| </div> | |
| """ | |
| # Update threat history | |
| if not hasattr(chatbot_response, "threat_history"): | |
| chatbot_response.threat_history = [] | |
| chatbot_response.threat_history.append(state) | |
| if len(chatbot_response.threat_history) > 10: | |
| chatbot_response.threat_history.pop(0) | |
| # Generate threat history table | |
| history_table = "<table style='width:100%; border-collapse: collapse;'><tr><th>Timestamp</th><th>Classification</th><th>Severity</th><th>Confidence</th></tr>" | |
| for entry in reversed(chatbot_response.threat_history): | |
| history_table += f"<tr><td>{entry['timestamp']}</td><td>{entry['classification']}</td><td>{entry['severity']}</td><td>{entry['confidence']:.1f}</td></tr>" | |
| history_table += "</table>" | |
| logger.info(f"Response generated in {time.time() - start_time:.2f} seconds") | |
| return updated_history, scan_result, state, threat_card_content, history_table | |
| # Gradio interface | |
| with gr.Blocks(css=""" | |
| @import url('https://fonts.googleapis.com/css2?family=Orbitron:wght@400;700;900&family=Rajdhani:wght@300;400;600;700&display=swap'); | |
| body { | |
| background: | |
| radial-gradient(circle at 20% 50%, rgba(120, 119, 198, 0.3) 0%, transparent 50%), | |
| radial-gradient(circle at 80% 20%, rgba(255, 119, 48, 0.3) 0%, transparent 50%), | |
| radial-gradient(circle at 40% 80%, rgba(120, 119, 198, 0.2) 0%, transparent 50%), | |
| linear-gradient(135deg, #0a0a0a 0%, #1a1a2e 25%, #16213e 50%, #0f0f0f 100%); | |
| color: #00ff41; | |
| font-family: 'Rajdhani', monospace; | |
| margin: 0; | |
| padding: 20px; | |
| min-height: 100vh; | |
| background-attachment: fixed; | |
| position: relative; | |
| overflow-x: hidden; | |
| } | |
| body::before { | |
| content: ''; | |
| position: fixed; | |
| top: 0; | |
| left: 0; | |
| right: 0; | |
| bottom: 0; | |
| background: | |
| repeating-linear-gradient( | |
| 90deg, | |
| transparent, | |
| transparent 100px, | |
| rgba(0, 255, 65, 0.03) 100px, | |
| rgba(0, 255, 65, 0.03) 101px | |
| ), | |
| repeating-linear-gradient( | |
| 0deg, | |
| transparent, | |
| transparent 100px, | |
| rgba(0, 255, 65, 0.03) 100px, | |
| rgba(0, 255, 65, 0.03) 101px | |
| ); | |
| pointer-events: none; | |
| z-index: 1; | |
| } | |
| .gradio-container { | |
| position: relative; | |
| z-index: 2; | |
| } | |
| .threat-card { | |
| background: | |
| linear-gradient(135deg, | |
| rgba(0, 0, 0, 0.9) 0%, | |
| rgba(13, 13, 13, 0.95) 50%, | |
| rgba(0, 0, 0, 0.9) 100% | |
| ); | |
| color: #00ff41; | |
| padding: 24px; | |
| border-radius: 16px; | |
| border: 2px solid rgba(0, 255, 65, 0.3); | |
| backdrop-filter: blur(20px); | |
| box-shadow: | |
| 0 0 30px rgba(0, 255, 65, 0.2), | |
| inset 0 1px 0 rgba(0, 255, 65, 0.1), | |
| 0 8px 32px rgba(0, 0, 0, 0.5); | |
| margin-bottom: 24px; | |
| transition: all 0.4s cubic-bezier(0.4, 0, 0.2, 1); | |
| animation: threatCardGlow 3s ease-in-out infinite alternate, fadeIn 0.8s ease-out; | |
| position: relative; | |
| overflow: hidden; | |
| } | |
| .threat-card::before { | |
| content: ''; | |
| position: absolute; | |
| top: 0; | |
| left: -100%; | |
| width: 100%; | |
| height: 100%; | |
| background: linear-gradient(90deg, transparent, rgba(0, 255, 65, 0.1), transparent); | |
| transition: left 0.8s ease; | |
| } | |
| .threat-card:hover::before { | |
| left: 100%; | |
| } | |
| @keyframes threatCardGlow { | |
| 0% { | |
| box-shadow: | |
| 0 0 20px rgba(0, 255, 65, 0.2), | |
| inset 0 1px 0 rgba(0, 255, 65, 0.1), | |
| 0 8px 32px rgba(0, 0, 0, 0.5); | |
| } | |
| 100% { | |
| box-shadow: | |
| 0 0 40px rgba(0, 255, 65, 0.4), | |
| inset 0 1px 0 rgba(0, 255, 65, 0.2), | |
| 0 12px 40px rgba(0, 0, 0, 0.6); | |
| } | |
| } | |
| @keyframes fadeIn { | |
| from { | |
| opacity: 0; | |
| transform: translateY(20px) scale(0.95); | |
| } | |
| to { | |
| opacity: 1; | |
| transform: translateY(0) scale(1); | |
| } | |
| } | |
| .threat-card:hover { | |
| transform: translateY(-8px); | |
| border-color: rgba(0, 255, 65, 0.6); | |
| box-shadow: | |
| 0 0 50px rgba(0, 255, 65, 0.4), | |
| 0 20px 60px rgba(0, 0, 0, 0.7); | |
| } | |
| .severity-critical { | |
| background: linear-gradient(45deg, #ff0040, #cc0033); | |
| color: #ffffff; | |
| padding: 8px 16px; | |
| border-radius: 8px; | |
| font-weight: 700; | |
| font-family: 'Orbitron', monospace; | |
| display: inline-block; | |
| text-transform: uppercase; | |
| letter-spacing: 1px; | |
| border: 1px solid #ff0040; | |
| box-shadow: 0 0 20px rgba(255, 0, 64, 0.5); | |
| animation: criticalPulse 1.5s ease-in-out infinite; | |
| } | |
| @keyframes criticalPulse { | |
| 0%, 100% { box-shadow: 0 0 20px rgba(255, 0, 64, 0.5); } | |
| 50% { box-shadow: 0 0 30px rgba(255, 0, 64, 0.8); } | |
| } | |
| .severity-high { | |
| background: linear-gradient(45deg, #ff8c00, #ff6600); | |
| color: #ffffff; | |
| padding: 8px 16px; | |
| border-radius: 8px; | |
| font-weight: 700; | |
| font-family: 'Orbitron', monospace; | |
| text-transform: uppercase; | |
| letter-spacing: 1px; | |
| border: 1px solid #ff8c00; | |
| box-shadow: 0 0 15px rgba(255, 140, 0, 0.4); | |
| } | |
| .severity-medium { | |
| background: linear-gradient(45deg, #ffaa00, #ff9900); | |
| color: #000000; | |
| padding: 8px 16px; | |
| border-radius: 8px; | |
| font-weight: 700; | |
| font-family: 'Orbitron', monospace; | |
| text-transform: uppercase; | |
| letter-spacing: 1px; | |
| border: 1px solid #ffaa00; | |
| box-shadow: 0 0 15px rgba(255, 170, 0, 0.4); | |
| } | |
| .severity-safe { | |
| background: linear-gradient(45deg, #00ff41, #00cc33); | |
| color: #000000; | |
| padding: 8px 16px; | |
| border-radius: 8px; | |
| font-weight: 700; | |
| font-family: 'Orbitron', monospace; | |
| text-transform: uppercase; | |
| letter-spacing: 1px; | |
| border: 1px solid #00ff41; | |
| box-shadow: 0 0 15px rgba(0, 255, 65, 0.4); | |
| } | |
| .severity-unknown { | |
| background: linear-gradient(45deg, #666666, #555555); | |
| color: #ffffff; | |
| padding: 8px 16px; | |
| border-radius: 8px; | |
| font-weight: 700; | |
| font-family: 'Orbitron', monospace; | |
| text-transform: uppercase; | |
| letter-spacing: 1px; | |
| border: 1px solid #666666; | |
| box-shadow: 0 0 15px rgba(102, 102, 102, 0.4); | |
| } | |
| .gr-button { | |
| background: linear-gradient(45deg, #00ff41, #00cc33, #008f2f); | |
| color: #000000; | |
| border: 2px solid #00ff41; | |
| padding: 14px 28px; | |
| border-radius: 12px; | |
| font-size: 16px; | |
| font-weight: 700; | |
| font-family: 'Orbitron', monospace; | |
| text-transform: uppercase; | |
| letter-spacing: 2px; | |
| transition: all 0.3s cubic-bezier(0.4, 0, 0.2, 1); | |
| cursor: pointer; | |
| position: relative; | |
| overflow: hidden; | |
| box-shadow: | |
| 0 0 20px rgba(0, 255, 65, 0.3), | |
| 0 4px 15px rgba(0, 0, 0, 0.3); | |
| } | |
| .gr-button::before { | |
| content: ''; | |
| position: absolute; | |
| top: 0; | |
| left: -100%; | |
| width: 100%; | |
| height: 100%; | |
| background: linear-gradient(90deg, transparent, rgba(255, 255, 255, 0.2), transparent); | |
| transition: left 0.6s ease; | |
| } | |
| .gr-button:hover::before { | |
| left: 100%; | |
| } | |
| .gr-button:hover { | |
| background: linear-gradient(45deg, #ff0040, #cc0033, #990026); | |
| border-color: #ff0040; | |
| color: #ffffff; | |
| transform: translateY(-3px); | |
| box-shadow: | |
| 0 0 30px rgba(255, 0, 64, 0.5), | |
| 0 8px 25px rgba(0, 0, 0, 0.4); | |
| } | |
| .gr-button:active { | |
| transform: translateY(-1px); | |
| } | |
| .gr-button:disabled { | |
| background: linear-gradient(45deg, #333333, #222222); | |
| border-color: #444444; | |
| color: #666666; | |
| cursor: not-allowed; | |
| box-shadow: none; | |
| } | |
| .gr-button:disabled::after { | |
| content: " SCANNING..."; | |
| animation: scanPulse 1.2s infinite; | |
| } | |
| @keyframes scanPulse { | |
| 0% { opacity: 1; } | |
| 50% { opacity: 0.4; } | |
| 100% { opacity: 1; } | |
| } | |
| .gr-textbox, .gr-file { | |
| background: rgba(0, 0, 0, 0.8); | |
| color: #00ff41; | |
| border: 2px solid rgba(0, 255, 65, 0.3); | |
| border-radius: 12px; | |
| padding: 14px; | |
| font-family: 'Rajdhani', monospace; | |
| font-size: 16px; | |
| transition: all 0.3s ease; | |
| box-shadow: inset 0 2px 10px rgba(0, 0, 0, 0.5); | |
| } | |
| .gr-textbox:focus, .gr-file:focus { | |
| border-color: #00ff41; | |
| outline: none; | |
| box-shadow: | |
| inset 0 2px 10px rgba(0, 0, 0, 0.5), | |
| 0 0 20px rgba(0, 255, 65, 0.3); | |
| background: rgba(0, 0, 0, 0.9); | |
| } | |
| .gr-chatbot { | |
| background: rgba(0, 0, 0, 0.85); | |
| border: 2px solid rgba(0, 255, 65, 0.2); | |
| border-radius: 16px; | |
| padding: 20px; | |
| box-shadow: | |
| inset 0 2px 20px rgba(0, 0, 0, 0.5), | |
| 0 0 30px rgba(0, 255, 65, 0.1); | |
| backdrop-filter: blur(15px); | |
| } | |
| .gr-chatbot .message { | |
| border-radius: 12px; | |
| padding: 14px 18px; | |
| margin: 10px; | |
| max-width: 80%; | |
| animation: messageSlide 0.4s ease-out; | |
| border: 1px solid rgba(0, 255, 65, 0.2); | |
| backdrop-filter: blur(10px); | |
| } | |
| @keyframes messageSlide { | |
| from { | |
| opacity: 0; | |
| transform: translateY(15px) scale(0.95); | |
| } | |
| to { | |
| opacity: 1; | |
| transform: translateY(0) scale(1); | |
| } | |
| } | |
| .gr-chatbot .user { | |
| background: linear-gradient(135deg, rgba(0, 255, 65, 0.2), rgba(0, 200, 50, 0.3)); | |
| color: #00ff41; | |
| margin-left: auto; | |
| border-color: rgba(0, 255, 65, 0.4); | |
| font-family: 'Rajdhani', monospace; | |
| } | |
| .gr-chatbot .assistant { | |
| background: linear-gradient(135deg, rgba(0, 0, 0, 0.8), rgba(20, 20, 20, 0.9)); | |
| color: #00ff41; | |
| margin-right: auto; | |
| border-color: rgba(0, 255, 65, 0.3); | |
| font-family: 'Rajdhani', monospace; | |
| } | |
| table { | |
| background: rgba(0, 0, 0, 0.85); | |
| border: 2px solid rgba(0, 255, 65, 0.3); | |
| border-radius: 12px; | |
| padding: 20px; | |
| margin-top: 20px; | |
| width: 100%; | |
| border-collapse: separate; | |
| border-spacing: 0; | |
| font-family: 'Rajdhani', monospace; | |
| box-shadow: | |
| inset 0 2px 20px rgba(0, 0, 0, 0.5), | |
| 0 0 20px rgba(0, 255, 65, 0.1); | |
| } | |
| th, td { | |
| padding: 14px; | |
| text-align: left; | |
| border-bottom: 1px solid rgba(0, 255, 65, 0.2); | |
| color: #00ff41; | |
| font-weight: 600; | |
| } | |
| th { | |
| background: rgba(0, 255, 65, 0.1); | |
| font-weight: 700; | |
| color: #00ff41; | |
| text-transform: uppercase; | |
| letter-spacing: 1px; | |
| font-family: 'Orbitron', monospace; | |
| } | |
| tr:hover { | |
| background: rgba(0, 255, 65, 0.05); | |
| } | |
| tr:last-child td { | |
| border-bottom: none; | |
| } | |
| .text-center { | |
| text-align: center; | |
| font-size: 36px; | |
| font-weight: 900; | |
| margin-bottom: 30px; | |
| color: #00ff41; | |
| font-family: 'Orbitron', monospace; | |
| text-transform: uppercase; | |
| letter-spacing: 4px; | |
| text-shadow: | |
| 0 0 10px rgba(0, 255, 65, 0.8), | |
| 0 0 20px rgba(0, 255, 65, 0.5), | |
| 0 0 40px rgba(0, 255, 65, 0.3); | |
| animation: titleGlow 2s ease-in-out infinite alternate; | |
| } | |
| @keyframes titleGlow { | |
| from { | |
| text-shadow: | |
| 0 0 10px rgba(0, 255, 65, 0.8), | |
| 0 0 20px rgba(0, 255, 65, 0.5), | |
| 0 0 40px rgba(0, 255, 65, 0.3); | |
| } | |
| to { | |
| text-shadow: | |
| 0 0 15px rgba(0, 255, 65, 1), | |
| 0 0 30px rgba(0, 255, 65, 0.7), | |
| 0 0 60px rgba(0, 255, 65, 0.5); | |
| } | |
| } | |
| h3 { | |
| color: #00ff41; | |
| font-family: 'Orbitron', monospace; | |
| font-weight: 700; | |
| text-transform: uppercase; | |
| letter-spacing: 2px; | |
| border-bottom: 2px solid rgba(0, 255, 65, 0.3); | |
| padding-bottom: 8px; | |
| margin-bottom: 16px; | |
| } | |
| .gr-json { | |
| background: rgba(0, 0, 0, 0.85); | |
| border: 2px solid rgba(0, 255, 65, 0.3); | |
| border-radius: 12px; | |
| color: #00ff41; | |
| font-family: 'Rajdhani', monospace; | |
| } | |
| /* Scrollbar styling */ | |
| ::-webkit-scrollbar { | |
| width: 12px; | |
| } | |
| ::-webkit-scrollbar-track { | |
| background: rgba(0, 0, 0, 0.5); | |
| border-radius: 6px; | |
| } | |
| ::-webkit-scrollbar-thumb { | |
| background: linear-gradient(180deg, #00ff41, #00cc33); | |
| border-radius: 6px; | |
| border: 2px solid rgba(0, 0, 0, 0.5); | |
| } | |
| ::-webkit-scrollbar-thumb:hover { | |
| background: linear-gradient(180deg, #00cc33, #008f2f); | |
| } | |
| /* Mobile responsiveness */ | |
| @media (max-width: 768px) { | |
| .gr-row { flex-direction: column; } | |
| .threat-card, table, .gr-chatbot { padding: 16px; } | |
| .gr-button { padding: 12px 20px; font-size: 14px; } | |
| .gr-chatbot .message { max-width: 95%; } | |
| .text-center { font-size: 24px; letter-spacing: 2px; } | |
| body { padding: 10px; } | |
| } | |
| /* Loading animation for better UX */ | |
| .loading { | |
| position: relative; | |
| overflow: hidden; | |
| } | |
| .loading::after { | |
| content: ''; | |
| position: absolute; | |
| top: 0; | |
| left: -100%; | |
| width: 100%; | |
| height: 100%; | |
| background: linear-gradient(90deg, transparent, rgba(0, 255, 65, 0.2), transparent); | |
| animation: loadingSweep 1.5s infinite; | |
| } | |
| @keyframes loadingSweep { | |
| 0% { left: -100%; } | |
| 100% { left: 100%; } | |
| } | |
| """) as demo: | |
| gr.Markdown( | |
| """ | |
| # π‘οΈ AI CYBERSECURITY AGENT π‘οΈ | |
| SECURE YOUR SYSTEMS WITH REAL-TIME DETECTION OF ALL THREATS | |
| """, | |
| elem_classes="text-center" | |
| ) | |
| state = gr.State(value={"classification": "AWAITING DATA", "severity": "Unknown", "mitigation": "None", "confidence": 0.0, "timestamp": ""}) | |
| with gr.Row(): | |
| with gr.Column(scale=2): | |
| chatbot = gr.Chatbot(label="π SECURITY ANALYST TERMINAL", type="messages", height=500) | |
| user_input = gr.Textbox(placeholder=">>> ENTER LOG DATA OR SECURITY ALERT (e.g., 'System compromised!', 'Trojan detected')", lines=3) | |
| file_input = gr.File(label="π UPLOAD THREAT FILE (.txt/.log/.py)", file_types=[".txt", ".log", ".py"]) | |
| submit_btn = gr.Button("π INITIATE SCAN") | |
| with gr.Column(scale=1): | |
| gr.Markdown("### π THREAT ANALYSIS RESULTS") | |
| output_json = gr.JSON(label="Scan Data") | |
| gr.Markdown("### β οΈ CURRENT THREAT STATUS") | |
| threat_card = gr.Markdown( | |
| """ | |
| <div class="threat-card" id="threat-card"> | |
| <p><strong>CLASSIFICATION:</strong> <span id="classification">AWAITING DATA</span></p> | |
| <p><strong>SEVERITY:</strong> <span class="severity-label severity-unknown" id="severity">UNKNOWN</span></p> | |
| <p><strong>MITIGATION:</strong> <span id="mitigation">NONE</span></p> | |
| <p><strong>CONFIDENCE:</strong> <span id="confidence">0.0</span></p> | |
| <p><strong>TIMESTAMP:</strong> <span id="timestamp">-</span></p> | |
| </div> | |
| """, | |
| visible=True | |
| ) | |
| gr.Markdown("### π THREAT HISTORY LOG") | |
| history_table = gr.Markdown( | |
| """ | |
| <table style='width:100%; border-collapse: collapse;'> | |
| <tr><th>TIMESTAMP</th><th>CLASSIFICATION</th><th>SEVERITY</th><th>CONFIDENCE</th></tr> | |
| <tr><td colspan='4'>NO THREATS DETECTED YET</td></tr> | |
| </table> | |
| """, | |
| visible=True | |
| ) | |
| submit_btn.click( | |
| fn=chatbot_response, | |
| inputs=[user_input, file_input, chatbot, state], | |
| outputs=[chatbot, output_json, state, threat_card, history_table] | |
| ) | |
| # Launch the app | |
| if __name__ == "__main__": | |
| demo.launch(mcp_server=True) |