import ast
import base64
import json
import logging
import os
import pickle
import re
import time
from collections import defaultdict
from datetime import datetime, timedelta
from typing import Optional

import gradio as gr
import requests
from retrying import retry

# Set up logging
logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

# Suppress warnings
os.environ["HF_HUB_DISABLE_SYMLINKS_WARNING"] = "1"

# Modal Mistral-7B API endpoint
MODAL_API = os.getenv("MODAL_API", "https://rithvickkumar27--mistral-7b-api-analyze.modal.run")
NVD_API_KEY = os.getenv("NVD_API_KEY")
if not NVD_API_KEY:
    logger.error("NVD_API_KEY not set in environment variables, API queries will fail")


# Retry decorator for Mistral-7B API
@retry(stop_max_attempt_number=3, wait_fixed=2000)
def call_mistral_llm(prompt):
    """Send *prompt* to the Mistral-7B endpoint at ``MODAL_API`` and return its reply.

    Args:
        prompt: Text posted as ``{"prompt": prompt}``; only its first 100
            characters are written to the debug log.

    Returns:
        The ``"response"`` field of the JSON reply, or
        ``"LLM error: No response"`` when that field is absent. A non-200
        status or an ``"error"`` field in the reply is reported as a
        ``"Modal API error: ..."`` string instead of being raised.

    Raises:
        requests.exceptions.RequestException: On transport failure. The
            exception is logged and re-raised so the ``@retry`` decorator
            can re-attempt the call (up to 3 attempts, 2000 ms apart).
    """
    logger.debug("Sending prompt to Mistral-7B API: %s...", prompt[:100])
    try:
        response = requests.post(MODAL_API, json={"prompt": prompt}, timeout=120)
        logger.debug("Mistral-7B API response status: %s", response.status_code)

        if response.status_code != 200:
            logger.error("Mistral API error: Status %s", response.status_code)
            return f"Modal API error: Status {response.status_code}"

        data = response.json()
        if "error" in data:
            logger.error("Mistral API error: %s", data['error'])
            return f"Modal API error: {data['error']}"

        response_text = data.get("response", "LLM error: No response")
        logger.info("Mistral-7B response received successfully")
        return response_text
    except requests.exceptions.RequestException as e:
        logger.error("Mistral API request failed: %s", e)
        raise


# NVD API query with caching
def query_nvd(keywords):
    """Search the NVD CVE 2.0 API for *keywords*, with a 24-hour on-disk cache.

    Results are cached as pickle files under ``corpus/cache``, one file per
    keyword string.

    Args:
        keywords: Search string passed to the API as ``keywordSearch``.

    Returns:
        A list of ``"<CVE id>: <first description>"`` strings (at most 10 are
        requested), or ``None`` when the request fails, is rate-limited
        (HTTP 429) or returns any other non-200 status.
    """
    cache_dir = "corpus/cache"
    os.makedirs(cache_dir, exist_ok=True)
    # Keep the cache file inside cache_dir: path separators and other unusual
    # characters in the keywords are mapped to "_" (spaces already were).
    safe_name = re.sub(r"[^A-Za-z0-9._-]", "_", keywords)
    cache_file = os.path.join(cache_dir, f"{safe_name}.pkl")

    # Check cache
    if os.path.exists(cache_file):
        try:
            with open(cache_file, "rb") as f:
                # NOTE(security): pickle.load can execute arbitrary code if the
                # cache file is tampered with; only safe while corpus/cache is
                # writable by this application alone.
                cached_data = pickle.load(f)
            if time.time() - cached_data["timestamp"] < 86400:  # Cache valid for 24 hours
                logger.debug("Using cached NVD data for: %s", keywords)
                return cached_data["results"]
        except Exception as e:
            # Best effort: an unreadable or malformed cache entry falls
            # through to a live query.
            logger.warning("Cache read error: %s", e)

    # Query NVD API
    url = "https://services.nvd.nist.gov/rest/json/cves/2.0"
    params = {"keywordSearch": keywords, "resultsPerPage": 10}
    # Only send the header when a key is configured.
    headers = {"apiKey": NVD_API_KEY} if NVD_API_KEY else {}
    try:
        response = requests.get(url, params=params, headers=headers, timeout=10)
        if response.status_code == 429:
            logger.error("NVD rate limit exceeded")
            return None
        if response.status_code != 200:
            logger.error("NVD API error: Status %s", response.status_code)
            return None

        data = response.json()
        results = []
        for item in data.get("vulnerabilities", []):
            try:
                cve = item["cve"]
                results.append(f"{cve['id']}: {cve['descriptions'][0]['value']}")
            except (KeyError, IndexError, TypeError):
                # One malformed entry must not discard the remaining results.
                logger.debug("Skipping malformed NVD entry")
    except Exception as e:
        logger.error("NVD API request failed: %s", e)
        return None

    # Save to cache; a failed write must not throw away the fetched results.
    try:
        with open(cache_file, "wb") as f:
            pickle.dump({"timestamp": time.time(), "results": results}, f)
    except (OSError, pickle.PicklingError) as e:
        logger.warning("Cache write error: %s", e)

    logger.info("Fetched %d CVEs from NVD for: %s", len(results), keywords)
    return results


# Modules whose import is reported by analyze_python_code.
_SUSPICIOUS_MODULES = frozenset({'urllib', 'urllib.request', 'requests'})
# URL containing one of the trigger words anywhere after the scheme.
_SUSPICIOUS_URL_RE = re.compile(r'http[s]?://.*(evil|malicious|bad)[^\s]*', re.IGNORECASE)


# Basic Python code analysis
def analyze_python_code(content: str) -> Optional[dict]:
    """Scan Python source text for a small set of suspicious constructs.

    The source is parsed with ``ast`` (never executed) and checked for
    ``base64.b64decode(...)`` calls, ``exec(...)`` calls, imports of
    ``urllib`` / ``urllib.request`` / ``requests`` (both ``import x`` and
    ``from x import ...``), and string literals holding a URL that contains
    "evil", "malicious" or "bad".

    Args:
        content: Python source code to inspect.

    Returns:
        A "Malware Detected" result dict whose ``"details"`` list holds one
        message per finding, or ``None`` when nothing is found or the source
        cannot be parsed.
    """
    try:
        tree = ast.parse(content)
    except (SyntaxError, ValueError, RecursionError):
        # ValueError: NUL bytes on older Pythons; RecursionError: pathological
        # nesting. Both are just "unparseable input" here.
        logger.warning("Invalid Python syntax in file")
        return None

    suspicious_patterns = []
    for node in ast.walk(tree):
        if isinstance(node, ast.Call):
            func = node.func
            if isinstance(func, ast.Attribute):
                if (func.attr == 'b64decode'
                        and isinstance(func.value, ast.Name)
                        and func.value.id == 'base64'):
                    suspicious_patterns.append("Base64 decoding detected")
            elif isinstance(func, ast.Name) and func.id == 'exec':
                suspicious_patterns.append("Dynamic code execution (exec)")
        elif isinstance(node, ast.Import):
            for alias in node.names:
                if alias.name in _SUSPICIOUS_MODULES:
                    suspicious_patterns.append(f"Suspicious import: {alias.name}")
        elif isinstance(node, ast.ImportFrom):
            # For "from X import Y" the module is X; the aliases are only Y.
            if node.module in _SUSPICIOUS_MODULES:
                suspicious_patterns.append(f"Suspicious import: {node.module}")
            # Alias names are still checked so previously flagged input stays flagged.
            for alias in node.names:
                if alias.name in _SUSPICIOUS_MODULES:
                    suspicious_patterns.append(f"Suspicious import: {alias.name}")
        elif isinstance(node, ast.Constant) and isinstance(node.value, str):
            # ast.Constant covers all string literals; the deprecated ast.Str
            # alias no longer exists on current Python versions.
            if _SUSPICIOUS_URL_RE.search(node.value):
                suspicious_patterns.append(f"Suspicious URL: {node.value}")

    if not suspicious_patterns:
        return None
    return {
        "classification": "Malware Detected",
        "severity": "Critical",
        "mitigation": "Quarantine file, run antivirus, block suspicious URLs",
        "confidence": 0.9,
        "details": suspicious_patterns,
    }


# NOTE(review): dsatp_parse_log below continues past this point in the file and
# is left exactly as found (still collapsed onto wrapped lines).
# Enhanced DSATP log parsing def dsatp_parse_log(text: str) -> dict: log = text.lower() lines = log.split('\n') detected_threats = [] # Comprehensive threat dictionary threats = { "compromised": {"classification": "System Compromise", "severity": "Critical", "mitigation": "Isolate process, run port scan, reset credentials"}, "unauthorized": {"classification": "Unauthorized Access", "severity": "High", "mitigation": "Quarantine MAC address, review access logs"}, "high cpu": {"classification": "Resource Abuse", "severity": "Medium", "mitigation": "Check for crypto-miner or DoS, limit resource usage"}, "inbound traffic": {"classification": "Network Intrusion", "severity": "Medium", "mitigation": "Block closed ports, enable firewall rules"}, "firmware mismatch": {"classification": "Firmware Vulnerability", "severity": "High", "mitigation": "Validate OTA or rollback, update firmware"}, "ddos": {"classification": "DDoS Attack", "severity": "Critical", "mitigation": "Rate-limit traffic, enable DDoS protection"}, "phishing": {"classification": "Phishing Attempt", "severity": "High", "mitigation": "Block malicious URLs, educate users"}, "sql injection": {"classification": "SQL Injection", "severity": "Critical", "mitigation": "Sanitize inputs, patch database"}, "xss": {"classification": "Cross-Site Scripting", "severity": "High", "mitigation": "Escape HTML, update web apps"}, "privilege escalation": {"classification": "Privilege Escalation", "severity": "Critical", "mitigation": "Patch vulnerabilities, restrict permissions"}, "trojan": {"classification": "Malware Detected", "severity": "Critical", "mitigation": "Quarantine 
file, run antivirus"}, "ransomware": {"classification": "Malware Detected", "severity": "Critical", "mitigation": "Quarantine file, run antivirus, restore from backup"}, "heuristic.behavior.suspicious": {"classification": "Suspicious Activity", "severity": "High", "mitigation": "Monitor process, run memory scan"}, "malicious": {"classification": "Malware Detected", "severity": "Critical", "mitigation": "Quarantine file, run antivirus, block malicious URLs"}, "ufw block": {"classification": "Network Intrusion", "severity": "High", "mitigation": "Investigate blocked IP, strengthen firewall rules"}, "sudo": {"classification": "Privilege Escalation", "severity": "High", "mitigation": "Audit user permissions, review sudo logs"}, "reverse ssh": {"classification": "Persistence Mechanism", "severity": "Critical", "mitigation": "Disable unauthorized SSH services, inspect network connections"}, "failed password": {"classification": "Brute-Force Attempt", "severity": "Critical", "mitigation": "Block suspicious IPs, disable password-based SSH, enable fail2ban"}, "invalid user": {"classification": "Brute-Force Attempt", "severity": "Critical", "mitigation": "Block suspicious IPs, disable password-based SSH, enable fail2ban"} } # Advanced threat detection failed_attempts = defaultdict(list) suspicious_terms = [ "failed password", "invalid user", "error", "denied", "malformed packet", "flood", "syn flood", "http flood", "suspicious url", "script tag", "sqlmap", "union select", "escalation attempt", "rootkit", "yara_match", "wget", "curl", "bash", "sh", "payload", "ufw", "sudo", "root", "ssh", "cron", "systemd", "base64", "exec" ] ip_pattern = r'\b\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}\b' timestamp_pattern = r'\w{3}\s+\d{1,2}\s+\d{2}:\d{2}:\d{2}' url_pattern = r'(https?://[^\s]+)' sql_pattern = r'(union\s+select|select\s+.*\s+from|drop\s+table)' xss_pattern = r'(