# Rithvickkr
# init 7
# f0ada1f
import gradio as gr
import requests
import os
import re
import ast
from collections import defaultdict
from datetime import datetime, timedelta
import json
import time
import logging
from retrying import retry
import base64
import pickle
# Logging: root logger at DEBUG, each record prefixed with time and level.
logging.basicConfig(
    format='%(asctime)s - %(levelname)s - %(message)s',
    level=logging.DEBUG,
)
logger = logging.getLogger(__name__)

# Suppress warnings
os.environ["HF_HUB_DISABLE_SYMLINKS_WARNING"] = "1"

# Modal Mistral-7B API endpoint (overridable through the MODAL_API variable)
MODAL_API = os.environ.get(
    "MODAL_API",
    "https://rithvickkumar27--mistral-7b-api-analyze.modal.run",
)

# NVD key is optional at import time; its absence is only reported here.
NVD_API_KEY = os.environ.get("NVD_API_KEY")
if not NVD_API_KEY:
    logger.error("NVD_API_KEY not set in environment variables, API queries will fail")
# Mistral-7B call, wrapped by the retrying decorator
# (stop_max_attempt_number=3, wait_fixed=2000).
@retry(stop_max_attempt_number=3, wait_fixed=2000)
def call_mistral_llm(prompt):
    """POST *prompt* to ``MODAL_API`` and return the model's text reply.

    Returns the ``"response"`` field of the JSON body on HTTP 200. If the
    body carries an ``"error"`` field, or the status is not 200, a
    ``"Modal API error: ..."`` string is returned instead of raising.

    Raises:
        requests.exceptions.RequestException: logged and re-raised so the
            ``retry`` decorator can see the failure.
    """
    logger.debug(f"Sending prompt to Mistral-7B API: {prompt[:100]}...")
    try:
        reply = requests.post(MODAL_API, json={"prompt": prompt}, timeout=120)
        status = reply.status_code
        logger.debug(f"Mistral-7B API response status: {status}")

        # Anything other than 200 is reported to the caller as text.
        if status != 200:
            logger.error(f"Mistral API error: Status {status}")
            return f"Modal API error: Status {status}"

        payload = reply.json()
        if "error" in payload:
            api_error = payload['error']
            logger.error(f"Mistral API error: {api_error}")
            return f"Modal API error: {api_error}"

        answer = payload.get("response", "LLM error: No response")
        logger.info("Mistral-7B response received successfully")
        return answer
    except requests.exceptions.RequestException as exc:
        logger.error(f"Mistral API request failed: {exc}")
        raise
def _nvd_description(cve):
    """Return the English description of an NVD ``cve`` object, if any.

    Falls back to the first listed description, then to a placeholder, so a
    record with an empty ``descriptions`` list cannot raise.
    """
    descriptions = cve.get("descriptions") or []
    for entry in descriptions:
        if entry.get("lang") == "en":
            return entry.get("value", "")
    if descriptions:
        return descriptions[0].get("value", "")
    return "No description available"


# NVD API query with caching
def query_nvd(keywords):
    """Search the NVD CVE API for *keywords*, using a 24-hour on-disk cache.

    The cache is best effort: a failure to create, read or write it is
    logged and never discards results that were fetched successfully.

    Args:
        keywords: Search terms sent as the ``keywordSearch`` parameter.

    Returns:
        A list of ``"<CVE id>: <description>"`` strings (possibly empty), or
        ``None`` when the request fails or the API answers with a non-200
        status.
    """
    cache_dir = "corpus/cache"
    # Only word characters and hyphens reach the file name, so keywords can
    # never point outside cache_dir.
    safe_name = re.sub(r"[^\w\-]+", "_", keywords)
    cache_file = f"{cache_dir}/{safe_name}.pkl"

    try:
        os.makedirs(cache_dir, exist_ok=True)
    except OSError as e:
        logger.warning(f"Cache directory unavailable: {e}")

    # Check cache
    if os.path.exists(cache_file):
        try:
            with open(cache_file, "rb") as f:
                # NOTE(security): pickle executes code on load. This is only
                # acceptable because these files are written by this function;
                # never point the cache directory at untrusted content.
                cached_data = pickle.load(f)
            if time.time() - cached_data["timestamp"] < 86400:  # Cache valid for 24 hours
                logger.debug(f"Using cached NVD data for: {keywords}")
                return cached_data["results"]
        except Exception as e:
            logger.warning(f"Cache read error: {e}")

    # Query NVD API
    try:
        url = "https://services.nvd.nist.gov/rest/json/cves/2.0"
        params = {"keywordSearch": keywords, "resultsPerPage": 10}
        headers = {"apiKey": NVD_API_KEY} if NVD_API_KEY else {}
        response = requests.get(url, params=params, headers=headers, timeout=10)
        if response.status_code == 429:
            logger.error("NVD rate limit exceeded")
            return None
        if response.status_code != 200:
            logger.error(f"NVD API error: Status {response.status_code}")
            return None
        data = response.json()
        results = []
        for item in data.get("vulnerabilities", []):
            cve = item.get("cve") or {}
            if "id" not in cve:
                continue  # malformed record: skip it instead of failing the whole query
            results.append(f"{cve['id']}: {_nvd_description(cve)}")
    except Exception as e:
        logger.error(f"NVD API request failed: {e}")
        return None

    # Save to cache; a write failure must not throw away the fetched results.
    try:
        with open(cache_file, "wb") as f:
            pickle.dump({"timestamp": time.time(), "results": results}, f)
    except OSError as e:
        logger.warning(f"Cache write error: {e}")

    logger.info(f"Fetched {len(results)} CVEs from NVD for: {keywords}")
    return results
# Basic Python code analysis
def analyze_python_code(content: str) -> "dict | None":
    """Statically inspect Python source for a few malware indicators.

    The source is parsed with ``ast`` (never executed) and flagged when it
    contains ``base64.b64decode(...)`` calls, ``exec(...)`` calls, imports of
    ``urllib`` / ``urllib.request`` / ``requests`` (both ``import x`` and
    ``from x import y`` forms), or string literals holding a URL that
    contains "evil", "malicious" or "bad".

    Args:
        content: Python source text.

    Returns:
        A "Malware Detected" result dict whose ``details`` list names every
        indicator found, or ``None`` when nothing was flagged or the source
        could not be parsed.
    """
    try:
        tree = ast.parse(content)
    except (SyntaxError, ValueError):
        # ValueError covers sources ast.parse rejects outright (e.g. NUL bytes
        # on older interpreters).
        logger.warning("Invalid Python syntax in file")
        return None

    suspicious_imports = ('urllib', 'urllib.request', 'requests')
    url_re = re.compile(r'http[s]?://.*(evil|malicious|bad)[^\s]*', re.IGNORECASE)
    suspicious_patterns = []

    for node in ast.walk(tree):
        if isinstance(node, ast.Call):
            func = node.func
            if (
                isinstance(func, ast.Attribute)
                and func.attr == 'b64decode'
                and isinstance(func.value, ast.Name)
                and func.value.id == 'base64'
            ):
                suspicious_patterns.append("Base64 decoding detected")
            elif isinstance(func, ast.Name) and func.id == 'exec':
                suspicious_patterns.append("Dynamic code execution (exec)")
        elif isinstance(node, (ast.Import, ast.ImportFrom)):
            imported = [alias.name for alias in node.names]
            # For "from pkg import name" the package lives in node.module, not
            # in the alias list, so it has to be checked explicitly.
            if isinstance(node, ast.ImportFrom) and node.module:
                imported.insert(0, node.module)
            for name in imported:
                if name in suspicious_imports:
                    suspicious_patterns.append(f"Suspicious import: {name}")
        elif isinstance(node, ast.Constant) and isinstance(node.value, str):
            # ast.Constant is the only string-literal node on supported
            # interpreters; the deprecated ast.Str alias is not used.
            if url_re.search(node.value):
                suspicious_patterns.append(f"Suspicious URL: {node.value}")

    if not suspicious_patterns:
        return None
    return {
        "classification": "Malware Detected",
        "severity": "Critical",
        "mitigation": "Quarantine file, run antivirus, block suspicious URLs",
        "confidence": 0.9,
        "details": suspicious_patterns
    }
# Enhanced DSATP log parsing
def dsatp_parse_log(text: str) -> dict:
    """Scan log text for known attack signatures and return the worst finding.

    The text is lower-cased and examined line by line with a set of regular
    expressions (YARA-style match lines, script downloads, firewall blocks,
    sudo-to-root, reverse SSH, phishing URLs, SQL injection, XSS). Failed
    logins are grouped per source IP and promoted to a brute-force or DDoS
    finding when enough of them fall inside a short time window. If no
    signature fires, the text is checked for plain keywords from the threat
    dictionary, and finally for a high count of generically suspicious terms.

    Args:
        text: Log text or a free-form alert message. ``None`` is treated as
            an empty string.

    Returns:
        A dict with ``classification``, ``severity``, ``mitigation`` and
        ``confidence`` for the highest-ranked finding (severity first, then
        confidence) plus ``all_threats`` listing every finding. When nothing
        is found the classification is ``"No Threat"`` and ``all_threats``
        is empty.
    """
    log = (text or "").lower()
    lines = log.split('\n')
    detected_threats = []
    # Comprehensive threat dictionary
    threats = {
        "compromised": {"classification": "System Compromise", "severity": "Critical", "mitigation": "Isolate process, run port scan, reset credentials"},
        "unauthorized": {"classification": "Unauthorized Access", "severity": "High", "mitigation": "Quarantine MAC address, review access logs"},
        "high cpu": {"classification": "Resource Abuse", "severity": "Medium", "mitigation": "Check for crypto-miner or DoS, limit resource usage"},
        "inbound traffic": {"classification": "Network Intrusion", "severity": "Medium", "mitigation": "Block closed ports, enable firewall rules"},
        "firmware mismatch": {"classification": "Firmware Vulnerability", "severity": "High", "mitigation": "Validate OTA or rollback, update firmware"},
        "ddos": {"classification": "DDoS Attack", "severity": "Critical", "mitigation": "Rate-limit traffic, enable DDoS protection"},
        "phishing": {"classification": "Phishing Attempt", "severity": "High", "mitigation": "Block malicious URLs, educate users"},
        "sql injection": {"classification": "SQL Injection", "severity": "Critical", "mitigation": "Sanitize inputs, patch database"},
        "xss": {"classification": "Cross-Site Scripting", "severity": "High", "mitigation": "Escape HTML, update web apps"},
        "privilege escalation": {"classification": "Privilege Escalation", "severity": "Critical", "mitigation": "Patch vulnerabilities, restrict permissions"},
        "trojan": {"classification": "Malware Detected", "severity": "Critical", "mitigation": "Quarantine file, run antivirus"},
        "ransomware": {"classification": "Malware Detected", "severity": "Critical", "mitigation": "Quarantine file, run antivirus, restore from backup"},
        "heuristic.behavior.suspicious": {"classification": "Suspicious Activity", "severity": "High", "mitigation": "Monitor process, run memory scan"},
        "malicious": {"classification": "Malware Detected", "severity": "Critical", "mitigation": "Quarantine file, run antivirus, block malicious URLs"},
        "ufw block": {"classification": "Network Intrusion", "severity": "High", "mitigation": "Investigate blocked IP, strengthen firewall rules"},
        "sudo": {"classification": "Privilege Escalation", "severity": "High", "mitigation": "Audit user permissions, review sudo logs"},
        "reverse ssh": {"classification": "Persistence Mechanism", "severity": "Critical", "mitigation": "Disable unauthorized SSH services, inspect network connections"},
        "failed password": {"classification": "Brute-Force Attempt", "severity": "Critical", "mitigation": "Block suspicious IPs, disable password-based SSH, enable fail2ban"},
        "invalid user": {"classification": "Brute-Force Attempt", "severity": "Critical", "mitigation": "Block suspicious IPs, disable password-based SSH, enable fail2ban"}
    }
    # Keywords that are NOT used for the plain-keyword fallback: each has a
    # stricter dedicated detector below, and a lone occurrence (one sudo call,
    # one failed login) is routine rather than a threat.
    pattern_only_keywords = {"sudo", "failed password", "invalid user"}
    # A bare keyword hit is weaker evidence than a signature match, so it is
    # reported with a lower confidence than the pattern-based findings.
    keyword_confidence = 0.8
    # Syslog timestamps carry no year; a fixed one is supplied so parsing is
    # unambiguous. Only differences between timestamps are used.
    reference_year = 2025

    # Advanced threat detection
    failed_attempts = defaultdict(list)
    suspicious_terms = [
        "failed password", "invalid user", "error", "denied", "malformed packet",
        "flood", "syn flood", "http flood", "suspicious url", "script tag",
        "sqlmap", "union select", "escalation attempt", "rootkit", "yara_match",
        "wget", "curl", "bash", "sh", "payload", "ufw", "sudo", "root", "ssh",
        "cron", "systemd", "base64", "exec"
    ]
    ip_pattern = r'\b\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}\b'
    timestamp_pattern = r'\w{3}\s+\d{1,2}\s+\d{2}:\d{2}:\d{2}'
    url_pattern = r'(https?://[^\s]+)'
    sql_pattern = r'(union\s+select|select\s+.*\s+from|drop\s+table)'
    xss_pattern = r'(<script>|on\w+\s*=|javascript:)'
    yara_pattern = r'yara_match:\s*([\w\.]+)\s+detected'
    script_pattern = r'(wget|curl)\s+.*(http[s]?://[^\s]+)\s*.*\.(sh|bash|exe)'
    sudo_pattern = r'sudo:.*user=root\s*;'
    ssh_pattern = r'reverse\s+ssh|tunnel\s+service'
    malicious_url_pattern = r'http[s]?://.*(malicious|payload|evil)[^\s]*'

    for line in lines:
        # YARA match detection
        yara_match = re.search(yara_pattern, line)
        if yara_match:
            threat_type = yara_match.group(1).lower()
            if "trojan" in threat_type:
                detected_threats.append(threats["trojan"] | {"confidence": 0.95})
            elif "ransomware" in threat_type:
                detected_threats.append(threats["ransomware"] | {"confidence": 0.95})
            elif "heuristic" in threat_type:
                detected_threats.append(threats["heuristic.behavior.suspicious"] | {"confidence": 0.85})
        # Malicious script download
        if re.search(script_pattern, line, re.IGNORECASE) or re.search(malicious_url_pattern, line, re.IGNORECASE):
            detected_threats.append(threats["malicious"] | {"confidence": 0.95})
        # Base64 and exec detection
        if "base64.b64decode" in line and "exec" in line:
            detected_threats.append(threats["malicious"] | {"confidence": 0.95})
        # Firewall block
        if re.search(r'ufw\s+block', line, re.IGNORECASE):
            ip_match = re.search(ip_pattern, line)
            ip = ip_match.group() if ip_match else "unknown IP"
            detected_threats.append(threats["ufw block"] | {"mitigation": f"Investigate blocked IP {ip}, strengthen firewall rules", "confidence": 0.9})
        # Privilege escalation
        if re.search(sudo_pattern, line, re.IGNORECASE):
            detected_threats.append(threats["sudo"] | {"confidence": 0.85})
        # Reverse SSH tunnel
        if re.search(ssh_pattern, line, re.IGNORECASE):
            detected_threats.append(threats["reverse ssh"] | {"confidence": 0.95})
        # Brute-force detection: collect failed-login timestamps per source IP
        if "failed password" in line or "invalid user" in line:
            ip_match = re.search(ip_pattern, line)
            time_match = re.search(timestamp_pattern, line)
            if ip_match and time_match:
                try:
                    timestamp = datetime.strptime(
                        f"{reference_year} {time_match.group()}", "%Y %b %d %H:%M:%S"
                    )
                except ValueError:
                    # Unparsable timestamp: drop only this sample. The checks
                    # below must still run for the line.
                    timestamp = None
                if timestamp is not None:
                    failed_attempts[ip_match.group()].append(timestamp)
        # Phishing, SQL Injection, XSS
        if re.search(url_pattern, line) and "phishing" in line:
            detected_threats.append(threats["phishing"] | {"confidence": 0.85})
        if re.search(sql_pattern, line, re.IGNORECASE):
            detected_threats.append(threats["sql injection"] | {"confidence": 0.9})
        if re.search(xss_pattern, line, re.IGNORECASE):
            detected_threats.append(threats["xss"] | {"confidence": 0.85})

    # Brute-force and DDoS detection
    for ip, timestamps in failed_attempts.items():
        if len(timestamps) >= 5:
            time_window = max(timestamps) - min(timestamps)
            if time_window <= timedelta(seconds=60):
                detected_threats.append({
                    "classification": "Brute-Force Attempt",
                    "severity": "Critical",
                    "mitigation": f"Block IP {ip}, disable password-based SSH, enable fail2ban",
                    "confidence": 0.95
                })
            elif len(timestamps) >= 10 and time_window <= timedelta(minutes=5):
                detected_threats.append({
                    "classification": "DDoS Attack",
                    "severity": "Critical",
                    "mitigation": f"Rate-limit traffic from {ip}, enable DDoS protection",
                    "confidence": 0.9
                })

    # Keyword fallback: short alerts such as "System compromised!" match no
    # signature above, so consult the threat dictionary directly. Used only
    # when nothing else fired, so signature-based results are unaffected.
    if not detected_threats:
        for keyword, threat in threats.items():
            if keyword in pattern_only_keywords:
                continue
            if keyword in log:
                detected_threats.append(threat | {"confidence": keyword_confidence})

    # Suspicious activity
    suspicious_count = sum(line.count(term) for term in suspicious_terms for line in lines)
    if suspicious_count >= 5 and not detected_threats:
        detected_threats.append({
            "classification": "Suspicious Activity",
            "severity": "Medium",
            "mitigation": "Review logs for anomalies, monitor affected IPs",
            "confidence": 0.75
        })

    # Select the highest-severity threat
    if detected_threats:
        severity_order = {"Critical": 3, "High": 2, "Medium": 1, "Safe": 0}
        highest_threat = max(detected_threats, key=lambda x: (severity_order.get(x["severity"], 0), x["confidence"]))
        logger.info(f"Detected threats: {len(detected_threats)}, Selected: {highest_threat}")
        return highest_threat | {"all_threats": detected_threats}
    logger.info("No threats detected")
    return {"classification": "No Threat", "severity": "Safe", "mitigation": "None", "confidence": 0.5, "all_threats": []}
# YARA rule set used by dsatp_yara_scan. Quoted strings are literal text;
# $sudo is written as a regular expression (/.../) because it contains ".*".
_DSATP_YARA_RULES = """
rule BruteForceLog {
strings:
$failed = "failed password" nocase
$invalid = "invalid user" nocase
$denied = "denied" nocase
condition:
($failed and #failed >= 3) or ($invalid and #invalid >= 3) or ($denied and #denied >= 3)
}
rule MalwareLog {
strings:
$mirai = "mirai" nocase
$botnet = "botnet" nocase
$exploit = "exploit" nocase
$rootkit = "rootkit" nocase
$trojan = "trojan" nocase
$ransomware = "ransomware" nocase
$wget = "wget" nocase
$curl = "curl" nocase
$payload = "payload" nocase
$malicious = "malicious" nocase
$evil = "evil" nocase
condition:
any of them
}
rule PythonMalware {
strings:
$base64 = "base64.b64decode" nocase
$exec = "exec" nocase
$urllib = "urllib.request" nocase
$requests = "requests.get" nocase
$url = "http://" nocase
$url2 = "https://" nocase
$evil_url = "evil.com" nocase
condition:
($base64 and $exec) or ($urllib and ($url or $url2 or $evil_url)) or ($requests and ($url or $url2 or $evil_url))
}
rule SuspiciousBehavior {
strings:
$heuristic = "heuristic" nocase
$suspicious = "suspicious" nocase
$ufw = "ufw block" nocase
condition:
any of them
}
rule PhishingLog {
strings:
$phish = "phishing" nocase
$url = "http://" nocase
$url2 = "https://" nocase
condition:
$phish or ($url and #url >= 2) or ($url2 and #url2 >= 2)
}
rule DDoSLog {
strings:
$flood = "flood" nocase
$syn = "syn" nocase
$http_flood = "http flood" nocase
condition:
any of them
}
rule SQLInjectionLog {
strings:
$sql = "union select" nocase
$sql2 = "drop table" nocase
condition:
any of them
}
rule XSSLog {
strings:
$xss = "<script>" nocase
$xss2 = "javascript:" nocase
condition:
any of them
}
rule PersistenceLog {
strings:
$ssh = "reverse ssh" nocase
$tunnel = "tunnel service" nocase
$sudo = /sudo:.*user=root/ nocase
condition:
any of them
}
"""

_DSATP_MALWARE_THREAT = {
    "classification": "Malware Detected",
    "severity": "Critical",
    "mitigation": "Quarantine file, run antivirus",
    "confidence": 0.9
}

# Finding reported for each matching rule name.
_DSATP_RULE_THREATS = {
    "BruteForceLog": {
        "classification": "Brute-Force Attempt",
        "severity": "Critical",
        "mitigation": "Block suspicious IPs, disable password-based SSH, enable fail2ban",
        "confidence": 0.95
    },
    "MalwareLog": _DSATP_MALWARE_THREAT,
    "PythonMalware": _DSATP_MALWARE_THREAT,
    "SuspiciousBehavior": {
        "classification": "Suspicious Activity",
        "severity": "High",
        "mitigation": "Monitor process, run memory scan",
        "confidence": 0.85
    },
    "PhishingLog": {
        "classification": "Phishing Attempt",
        "severity": "High",
        "mitigation": "Block malicious URLs, educate users",
        "confidence": 0.85
    },
    "DDoSLog": {
        "classification": "DDoS Attack",
        "severity": "Critical",
        "mitigation": "Rate-limit traffic, enable DDoS protection",
        "confidence": 0.9
    },
    "SQLInjectionLog": {
        "classification": "SQL Injection",
        "severity": "Critical",
        "mitigation": "Sanitize inputs, patch database",
        "confidence": 0.9
    },
    "XSSLog": {
        "classification": "Cross-Site Scripting",
        "severity": "High",
        "mitigation": "Escape HTML, update web apps",
        "confidence": 0.85
    },
    "PersistenceLog": {
        "classification": "Persistence Mechanism",
        "severity": "Critical",
        "mitigation": "Disable unauthorized SSH services, inspect network connections",
        "confidence": 0.95
    },
}


# Enhanced DSATP YARA scanning
def dsatp_yara_scan(file_path: str) -> dict:
    """Scan the file at *file_path* and return the highest-ranked finding.

    ``.py`` files are first checked with ``analyze_python_code``; if that
    flags anything its result is returned directly. Otherwise the file is
    matched against the built-in YARA rules and each matching rule is mapped
    to a finding.

    Args:
        file_path: Path of the file to scan.

    Returns:
        A dict with ``classification``, ``severity``, ``mitigation`` and
        ``confidence`` for the top finding (severity first, then confidence)
        plus ``all_threats``; a "No Malware" result when no rule matches; or
        a dict carrying an ``error`` key when scanning fails for any reason
        (including the ``yara`` module being unavailable).
    """
    try:
        # Analyze Python code if it's a .py file (extension checked
        # case-insensitively); other files are not read here at all.
        if file_path.lower().endswith('.py'):
            with open(file_path, 'r', encoding='utf-8', errors='ignore') as f:
                content = f.read()
            python_analysis = analyze_python_code(content)
            if python_analysis:
                return python_analysis | {"all_threats": [python_analysis]}

        import yara
        rules = yara.compile(source=_DSATP_YARA_RULES)
        matches = rules.match(file_path)
        # Copy each template so callers can never mutate the shared table.
        detected_threats = [
            dict(_DSATP_RULE_THREATS[match.rule])
            for match in matches
            if match.rule in _DSATP_RULE_THREATS
        ]

        # Select the highest-severity threat
        if detected_threats:
            severity_order = {"Critical": 3, "High": 2, "Medium": 1, "Safe": 0}
            highest_threat = max(detected_threats, key=lambda x: (severity_order.get(x["severity"], 0), x["confidence"]))
            logger.info(f"YARA scan detected threats: {len(detected_threats)}, Selected: {highest_threat}")
            return highest_threat | {"all_threats": detected_threats}
        logger.info("YARA scan: No threats detected")
        return {
            "classification": "No Malware",
            "severity": "Safe",
            "mitigation": "None",
            "confidence": 0.7,
            "all_threats": []
        }
    except Exception as e:
        logger.error(f"YARA scan error: {e}")
        return {"error": str(e), "severity": "Unknown", "mitigation": "Check file format", "all_threats": []}
# Chatbot function
def chatbot_response(user_input, file, history, state):
    """Analyse a message or uploaded file and build every UI output.

    An uploaded file is scanned with ``dsatp_yara_scan``; otherwise the text
    goes through ``dsatp_parse_log``. The result is enriched with NVD context
    (when ``NVD_API_KEY`` is set) and an LLM briefing, with a templated
    fallback if the LLM call fails.

    Args:
        user_input: Text typed by the user; ``None`` is treated as empty.
        file: Uploaded file, either an object exposing ``.name`` or a plain
            path string; falsy when nothing was uploaded.
        history: Chat history as a list of role/content dicts, or ``None``.
        state: Previous threat-summary dict. It is not read; a fresh summary
            is returned in its place.

    Returns:
        ``(updated_history, scan_result, state, threat_card_content,
        history_table)``.
    """
    if history is None:
        history = []
    input_text = user_input or ""
    scan_result = None
    all_threats = []
    start_time = time.time()
    if file:
        # Accept both file objects with a .name attribute and bare path strings.
        file_path = getattr(file, "name", file)
        try:
            # Decode leniently so an odd byte cannot abort the scan, and close
            # the handle deterministically.
            with open(file_path, "r", encoding="utf-8", errors="ignore") as fh:
                input_text = fh.read()
            scan_result = dsatp_yara_scan(file_path)
        except Exception as e:
            scan_result = {"error": f"File error: {e}", "severity": "Unknown", "mitigation": "Check file", "all_threats": []}
    else:
        scan_result = dsatp_parse_log(input_text)
    all_threats = scan_result.get("all_threats", [])
    context_str = "No relevant vulnerabilities found."
    if NVD_API_KEY:
        try:
            # Map classification to precise keywords for relevant CVEs
            threat_keywords = {
                "Brute-Force Attempt": "brute force ssh login attempt authentication failure openssh password attack cwe-287 cwe-307",
                "Malware Detected": "malware trojan ransomware payload malicious script backdoor virus cwe-94 cwe-506 cwe-119 code injection python",
                "Network Intrusion": "firewall intrusion ufw network attack port scan unauthorized access cwe-284",
                "Privilege Escalation": "privilege escalation sudo root unauthorized access cwe-269 cwe-250",
                "Persistence Mechanism": "ssh tunnel reverse ssh persistence backdoor remote access cwe-284",
                "System Compromise": "compromise breach unauthorized access cwe-284",
                "Unauthorized Access": "unauthorized access login failure cwe-287",
                "Resource Abuse": "resource abuse crypto-miner denial of service cwe-400",
                "Firmware Vulnerability": "firmware vulnerability iot cwe-119",
                "DDoS Attack": "ddos denial of service network flood cwe-400",
                "Phishing Attempt": "phishing malicious url social engineering cwe-601",
                "SQL Injection": "sql injection database attack cwe-89",
                "Cross-Site Scripting": "xss cross-site scripting web attack cwe-79",
                "Suspicious Activity": "suspicious activity anomaly heuristic cwe-693"
            }
            classification = scan_result.get("classification", "unknown")
            keywords = threat_keywords.get(classification, "security threat").replace(',', '')
            nvd_results = query_nvd(keywords)
            if nvd_results:
                context_items = []
                seen_cves = set()
                for result in nvd_results[:3]:
                    # CVE sequence numbers have four or more digits
                    # (e.g. CVE-2014-0160), so do not require five.
                    cve_matches = re.findall(r'CVE-\d{4}-\d{4,}', result)
                    if cve_matches and cve_matches[0] not in seen_cves:
                        if any(keyword.lower() in result.lower() for keyword in keywords.split()):
                            context_items.append(result)
                            seen_cves.add(cve_matches[0])
                context_str = "\n\n".join(context_items) if context_items else "No relevant vulnerabilities found for this threat."
            logger.debug(f"NVD query: {keywords}, Results: {len(nvd_results) if nvd_results else 0}")
        except Exception as e:
            logger.error(f"NVD error: {e}")
            context_str = f"Context error: {e}"
    if "error" not in scan_result:
        other_threats_summary = ""
        if len(all_threats) > 1:
            # scan_result is the primary finding plus an "all_threats" key, so
            # it never equals a list entry as-is; compare without that key.
            primary = {k: v for k, v in scan_result.items() if k != "all_threats"}
            summary_lines = []
            for t in all_threats:
                if t == primary:
                    continue
                entry = f"- {t['classification']} (Severity: {t['severity']}, Confidence: {t['confidence']:.1f})"
                if entry not in summary_lines:  # identical lines add nothing
                    summary_lines.append(entry)
            if summary_lines:
                other_threats_summary = "\nOther detected threats include:\n" + "\n".join(summary_lines)
        prompt = f"""
CLASSIFICATION: {scan_result['classification']}
SEVERITY: {scan_result['severity']}
CONFIDENCE: {scan_result['confidence']:.1f}
THREAT DETAILS: {other_threats_summary}
IMMEDIATE RESPONSE REQUIRED:
THREAT ASSESSMENT: Analyze this security incident with precision. Provide tactical recommendations and threat intelligence.
DIRECTIVE: Generate a concise, actionable security briefing. Focus on:
1. Primary threat vector and attack methodology
2. Immediate containment procedures
3. Risk escalation potential
4. Technical countermeasures
RESPONSE FORMAT: Direct, technical, no pleasantries. Use cybersecurity terminology. Prioritize critical actions first.
CONTEXT: Real-time threat detected in production environment. Response time critical.
EXECUTE ANALYSIS:"""
        try:
            llm_response = call_mistral_llm(prompt)
        except Exception as e:
            logger.warning(f"Mistral-7B failed after retries: {e}, using fallback response")
            llm_response = f"A {scan_result['severity'].lower()} {scan_result['classification'].lower()} was detected. Recommended actions: {scan_result['mitigation']}." + other_threats_summary
        response = f"**Classification**: {scan_result['classification']}\n**Severity**: {scan_result['severity']}\n**Mitigation**: {scan_result['mitigation']}\n**Confidence**: {scan_result['confidence']:.1f}\n\n{llm_response}\n\n**Context**:\n{context_str}"
    else:
        response = f"**Error**: {scan_result['error']}\n**Context**:\n{context_str}"
    updated_history = history + [
        {"role": "user", "content": input_text or "File uploaded"},
        {"role": "assistant", "content": response}
    ]
    # Update state
    state = {
        "classification": scan_result.get("classification", "Awaiting Data"),
        "severity": scan_result.get("severity", "Unknown"),
        "mitigation": scan_result.get("mitigation", "None"),
        "confidence": scan_result.get("confidence", 0.0),
        "timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    }
    # Generate threat card
    threat_card_content = f"""
<div class="threat-card" id="threat-card">
<p><strong>Classification:</strong> <span id="classification">{state['classification']}</span></p>
<p><strong>Severity:</strong> <span class="severity-label severity-{state['severity'].lower()}" id="severity">{state['severity']}</span></p>
<p><strong>Mitigation:</strong> <span id="mitigation">{state['mitigation']}</span></p>
<p><strong>Confidence:</strong> <span id="confidence">{state['confidence']:.1f}</span></p>
<p><strong>Timestamp:</strong> <span id="timestamp">{state['timestamp']}</span></p>
</div>
"""
    # Update threat history (last 10 entries).
    # NOTE(review): the list lives on the function object, so it is shared by
    # every caller in this process rather than kept per session - confirm
    # that is intended.
    if not hasattr(chatbot_response, "threat_history"):
        chatbot_response.threat_history = []
    chatbot_response.threat_history.append(state)
    if len(chatbot_response.threat_history) > 10:
        chatbot_response.threat_history.pop(0)
    # Generate threat history table, newest entry first
    history_table = "<table style='width:100%; border-collapse: collapse;'><tr><th>Timestamp</th><th>Classification</th><th>Severity</th><th>Confidence</th></tr>"
    for entry in reversed(chatbot_response.threat_history):
        history_table += f"<tr><td>{entry['timestamp']}</td><td>{entry['classification']}</td><td>{entry['severity']}</td><td>{entry['confidence']:.1f}</td></tr>"
    history_table += "</table>"
    logger.info(f"Response generated in {time.time() - start_time:.2f} seconds")
    return updated_history, scan_result, state, threat_card_content, history_table
# Gradio interface
# Builds the Blocks app bound to `demo`. The `css` argument is the page's
# custom stylesheet; the layout and event wiring follow after the string.
# NOTE(review): the emoji in the labels below look mis-encoded (UTF-8 bytes
# decoded with a single-byte codepage) - confirm the file's encoding.
with gr.Blocks(css="""
@import url('https://fonts.googleapis.com/css2?family=Orbitron:wght@400;700;900&family=Rajdhani:wght@300;400;600;700&display=swap');
body {
background:
radial-gradient(circle at 20% 50%, rgba(120, 119, 198, 0.3) 0%, transparent 50%),
radial-gradient(circle at 80% 20%, rgba(255, 119, 48, 0.3) 0%, transparent 50%),
radial-gradient(circle at 40% 80%, rgba(120, 119, 198, 0.2) 0%, transparent 50%),
linear-gradient(135deg, #0a0a0a 0%, #1a1a2e 25%, #16213e 50%, #0f0f0f 100%);
color: #00ff41;
font-family: 'Rajdhani', monospace;
margin: 0;
padding: 20px;
min-height: 100vh;
background-attachment: fixed;
position: relative;
overflow-x: hidden;
}
body::before {
content: '';
position: fixed;
top: 0;
left: 0;
right: 0;
bottom: 0;
background:
repeating-linear-gradient(
90deg,
transparent,
transparent 100px,
rgba(0, 255, 65, 0.03) 100px,
rgba(0, 255, 65, 0.03) 101px
),
repeating-linear-gradient(
0deg,
transparent,
transparent 100px,
rgba(0, 255, 65, 0.03) 100px,
rgba(0, 255, 65, 0.03) 101px
);
pointer-events: none;
z-index: 1;
}
.gradio-container {
position: relative;
z-index: 2;
}
.threat-card {
background:
linear-gradient(135deg,
rgba(0, 0, 0, 0.9) 0%,
rgba(13, 13, 13, 0.95) 50%,
rgba(0, 0, 0, 0.9) 100%
);
color: #00ff41;
padding: 24px;
border-radius: 16px;
border: 2px solid rgba(0, 255, 65, 0.3);
backdrop-filter: blur(20px);
box-shadow:
0 0 30px rgba(0, 255, 65, 0.2),
inset 0 1px 0 rgba(0, 255, 65, 0.1),
0 8px 32px rgba(0, 0, 0, 0.5);
margin-bottom: 24px;
transition: all 0.4s cubic-bezier(0.4, 0, 0.2, 1);
animation: threatCardGlow 3s ease-in-out infinite alternate, fadeIn 0.8s ease-out;
position: relative;
overflow: hidden;
}
.threat-card::before {
content: '';
position: absolute;
top: 0;
left: -100%;
width: 100%;
height: 100%;
background: linear-gradient(90deg, transparent, rgba(0, 255, 65, 0.1), transparent);
transition: left 0.8s ease;
}
.threat-card:hover::before {
left: 100%;
}
@keyframes threatCardGlow {
0% {
box-shadow:
0 0 20px rgba(0, 255, 65, 0.2),
inset 0 1px 0 rgba(0, 255, 65, 0.1),
0 8px 32px rgba(0, 0, 0, 0.5);
}
100% {
box-shadow:
0 0 40px rgba(0, 255, 65, 0.4),
inset 0 1px 0 rgba(0, 255, 65, 0.2),
0 12px 40px rgba(0, 0, 0, 0.6);
}
}
@keyframes fadeIn {
from {
opacity: 0;
transform: translateY(20px) scale(0.95);
}
to {
opacity: 1;
transform: translateY(0) scale(1);
}
}
.threat-card:hover {
transform: translateY(-8px);
border-color: rgba(0, 255, 65, 0.6);
box-shadow:
0 0 50px rgba(0, 255, 65, 0.4),
0 20px 60px rgba(0, 0, 0, 0.7);
}
.severity-critical {
background: linear-gradient(45deg, #ff0040, #cc0033);
color: #ffffff;
padding: 8px 16px;
border-radius: 8px;
font-weight: 700;
font-family: 'Orbitron', monospace;
display: inline-block;
text-transform: uppercase;
letter-spacing: 1px;
border: 1px solid #ff0040;
box-shadow: 0 0 20px rgba(255, 0, 64, 0.5);
animation: criticalPulse 1.5s ease-in-out infinite;
}
@keyframes criticalPulse {
0%, 100% { box-shadow: 0 0 20px rgba(255, 0, 64, 0.5); }
50% { box-shadow: 0 0 30px rgba(255, 0, 64, 0.8); }
}
.severity-high {
background: linear-gradient(45deg, #ff8c00, #ff6600);
color: #ffffff;
padding: 8px 16px;
border-radius: 8px;
font-weight: 700;
font-family: 'Orbitron', monospace;
text-transform: uppercase;
letter-spacing: 1px;
border: 1px solid #ff8c00;
box-shadow: 0 0 15px rgba(255, 140, 0, 0.4);
}
.severity-medium {
background: linear-gradient(45deg, #ffaa00, #ff9900);
color: #000000;
padding: 8px 16px;
border-radius: 8px;
font-weight: 700;
font-family: 'Orbitron', monospace;
text-transform: uppercase;
letter-spacing: 1px;
border: 1px solid #ffaa00;
box-shadow: 0 0 15px rgba(255, 170, 0, 0.4);
}
.severity-safe {
background: linear-gradient(45deg, #00ff41, #00cc33);
color: #000000;
padding: 8px 16px;
border-radius: 8px;
font-weight: 700;
font-family: 'Orbitron', monospace;
text-transform: uppercase;
letter-spacing: 1px;
border: 1px solid #00ff41;
box-shadow: 0 0 15px rgba(0, 255, 65, 0.4);
}
.severity-unknown {
background: linear-gradient(45deg, #666666, #555555);
color: #ffffff;
padding: 8px 16px;
border-radius: 8px;
font-weight: 700;
font-family: 'Orbitron', monospace;
text-transform: uppercase;
letter-spacing: 1px;
border: 1px solid #666666;
box-shadow: 0 0 15px rgba(102, 102, 102, 0.4);
}
.gr-button {
background: linear-gradient(45deg, #00ff41, #00cc33, #008f2f);
color: #000000;
border: 2px solid #00ff41;
padding: 14px 28px;
border-radius: 12px;
font-size: 16px;
font-weight: 700;
font-family: 'Orbitron', monospace;
text-transform: uppercase;
letter-spacing: 2px;
transition: all 0.3s cubic-bezier(0.4, 0, 0.2, 1);
cursor: pointer;
position: relative;
overflow: hidden;
box-shadow:
0 0 20px rgba(0, 255, 65, 0.3),
0 4px 15px rgba(0, 0, 0, 0.3);
}
.gr-button::before {
content: '';
position: absolute;
top: 0;
left: -100%;
width: 100%;
height: 100%;
background: linear-gradient(90deg, transparent, rgba(255, 255, 255, 0.2), transparent);
transition: left 0.6s ease;
}
.gr-button:hover::before {
left: 100%;
}
.gr-button:hover {
background: linear-gradient(45deg, #ff0040, #cc0033, #990026);
border-color: #ff0040;
color: #ffffff;
transform: translateY(-3px);
box-shadow:
0 0 30px rgba(255, 0, 64, 0.5),
0 8px 25px rgba(0, 0, 0, 0.4);
}
.gr-button:active {
transform: translateY(-1px);
}
.gr-button:disabled {
background: linear-gradient(45deg, #333333, #222222);
border-color: #444444;
color: #666666;
cursor: not-allowed;
box-shadow: none;
}
.gr-button:disabled::after {
content: " SCANNING...";
animation: scanPulse 1.2s infinite;
}
@keyframes scanPulse {
0% { opacity: 1; }
50% { opacity: 0.4; }
100% { opacity: 1; }
}
.gr-textbox, .gr-file {
background: rgba(0, 0, 0, 0.8);
color: #00ff41;
border: 2px solid rgba(0, 255, 65, 0.3);
border-radius: 12px;
padding: 14px;
font-family: 'Rajdhani', monospace;
font-size: 16px;
transition: all 0.3s ease;
box-shadow: inset 0 2px 10px rgba(0, 0, 0, 0.5);
}
.gr-textbox:focus, .gr-file:focus {
border-color: #00ff41;
outline: none;
box-shadow:
inset 0 2px 10px rgba(0, 0, 0, 0.5),
0 0 20px rgba(0, 255, 65, 0.3);
background: rgba(0, 0, 0, 0.9);
}
.gr-chatbot {
background: rgba(0, 0, 0, 0.85);
border: 2px solid rgba(0, 255, 65, 0.2);
border-radius: 16px;
padding: 20px;
box-shadow:
inset 0 2px 20px rgba(0, 0, 0, 0.5),
0 0 30px rgba(0, 255, 65, 0.1);
backdrop-filter: blur(15px);
}
.gr-chatbot .message {
border-radius: 12px;
padding: 14px 18px;
margin: 10px;
max-width: 80%;
animation: messageSlide 0.4s ease-out;
border: 1px solid rgba(0, 255, 65, 0.2);
backdrop-filter: blur(10px);
}
@keyframes messageSlide {
from {
opacity: 0;
transform: translateY(15px) scale(0.95);
}
to {
opacity: 1;
transform: translateY(0) scale(1);
}
}
.gr-chatbot .user {
background: linear-gradient(135deg, rgba(0, 255, 65, 0.2), rgba(0, 200, 50, 0.3));
color: #00ff41;
margin-left: auto;
border-color: rgba(0, 255, 65, 0.4);
font-family: 'Rajdhani', monospace;
}
.gr-chatbot .assistant {
background: linear-gradient(135deg, rgba(0, 0, 0, 0.8), rgba(20, 20, 20, 0.9));
color: #00ff41;
margin-right: auto;
border-color: rgba(0, 255, 65, 0.3);
font-family: 'Rajdhani', monospace;
}
table {
background: rgba(0, 0, 0, 0.85);
border: 2px solid rgba(0, 255, 65, 0.3);
border-radius: 12px;
padding: 20px;
margin-top: 20px;
width: 100%;
border-collapse: separate;
border-spacing: 0;
font-family: 'Rajdhani', monospace;
box-shadow:
inset 0 2px 20px rgba(0, 0, 0, 0.5),
0 0 20px rgba(0, 255, 65, 0.1);
}
th, td {
padding: 14px;
text-align: left;
border-bottom: 1px solid rgba(0, 255, 65, 0.2);
color: #00ff41;
font-weight: 600;
}
th {
background: rgba(0, 255, 65, 0.1);
font-weight: 700;
color: #00ff41;
text-transform: uppercase;
letter-spacing: 1px;
font-family: 'Orbitron', monospace;
}
tr:hover {
background: rgba(0, 255, 65, 0.05);
}
tr:last-child td {
border-bottom: none;
}
.text-center {
text-align: center;
font-size: 36px;
font-weight: 900;
margin-bottom: 30px;
color: #00ff41;
font-family: 'Orbitron', monospace;
text-transform: uppercase;
letter-spacing: 4px;
text-shadow:
0 0 10px rgba(0, 255, 65, 0.8),
0 0 20px rgba(0, 255, 65, 0.5),
0 0 40px rgba(0, 255, 65, 0.3);
animation: titleGlow 2s ease-in-out infinite alternate;
}
@keyframes titleGlow {
from {
text-shadow:
0 0 10px rgba(0, 255, 65, 0.8),
0 0 20px rgba(0, 255, 65, 0.5),
0 0 40px rgba(0, 255, 65, 0.3);
}
to {
text-shadow:
0 0 15px rgba(0, 255, 65, 1),
0 0 30px rgba(0, 255, 65, 0.7),
0 0 60px rgba(0, 255, 65, 0.5);
}
}
h3 {
color: #00ff41;
font-family: 'Orbitron', monospace;
font-weight: 700;
text-transform: uppercase;
letter-spacing: 2px;
border-bottom: 2px solid rgba(0, 255, 65, 0.3);
padding-bottom: 8px;
margin-bottom: 16px;
}
.gr-json {
background: rgba(0, 0, 0, 0.85);
border: 2px solid rgba(0, 255, 65, 0.3);
border-radius: 12px;
color: #00ff41;
font-family: 'Rajdhani', monospace;
}
/* Scrollbar styling */
::-webkit-scrollbar {
width: 12px;
}
::-webkit-scrollbar-track {
background: rgba(0, 0, 0, 0.5);
border-radius: 6px;
}
::-webkit-scrollbar-thumb {
background: linear-gradient(180deg, #00ff41, #00cc33);
border-radius: 6px;
border: 2px solid rgba(0, 0, 0, 0.5);
}
::-webkit-scrollbar-thumb:hover {
background: linear-gradient(180deg, #00cc33, #008f2f);
}
/* Mobile responsiveness */
@media (max-width: 768px) {
.gr-row { flex-direction: column; }
.threat-card, table, .gr-chatbot { padding: 16px; }
.gr-button { padding: 12px 20px; font-size: 14px; }
.gr-chatbot .message { max-width: 95%; }
.text-center { font-size: 24px; letter-spacing: 2px; }
body { padding: 10px; }
}
/* Loading animation for better UX */
.loading {
position: relative;
overflow: hidden;
}
.loading::after {
content: '';
position: absolute;
top: 0;
left: -100%;
width: 100%;
height: 100%;
background: linear-gradient(90deg, transparent, rgba(0, 255, 65, 0.2), transparent);
animation: loadingSweep 1.5s infinite;
}
@keyframes loadingSweep {
0% { left: -100%; }
100% { left: 100%; }
}
""") as demo:
    # Page title, styled through the "text-center" class from the stylesheet.
    gr.Markdown(
        """
# πŸ›‘οΈ AI CYBERSECURITY AGENT πŸ›‘οΈ
SECURE YOUR SYSTEMS WITH REAL-TIME DETECTION OF ALL THREATS
""",
        elem_classes="text-center"
    )
    # Holds the latest threat-summary dict; chatbot_response returns its
    # replacement as the third output.
    state = gr.State(value={"classification": "AWAITING DATA", "severity": "Unknown", "mitigation": "None", "confidence": 0.0, "timestamp": ""})
    with gr.Row():
        # Left column: conversation, text/file inputs and the scan button.
        with gr.Column(scale=2):
            chatbot = gr.Chatbot(label="πŸ”’ SECURITY ANALYST TERMINAL", type="messages", height=500)
            user_input = gr.Textbox(placeholder=">>> ENTER LOG DATA OR SECURITY ALERT (e.g., 'System compromised!', 'Trojan detected')", lines=3)
            file_input = gr.File(label="πŸ“ UPLOAD THREAT FILE (.txt/.log/.py)", file_types=[".txt", ".log", ".py"])
            submit_btn = gr.Button("πŸ” INITIATE SCAN")
        # Right column: raw scan result, current threat card and history table.
        with gr.Column(scale=1):
            gr.Markdown("### πŸ“Š THREAT ANALYSIS RESULTS")
            output_json = gr.JSON(label="Scan Data")
            gr.Markdown("### ⚠️ CURRENT THREAT STATUS")
            # Placeholder card shown until the first scan; uses the same
            # element ids/classes as the card chatbot_response generates.
            threat_card = gr.Markdown(
                """
<div class="threat-card" id="threat-card">
<p><strong>CLASSIFICATION:</strong> <span id="classification">AWAITING DATA</span></p>
<p><strong>SEVERITY:</strong> <span class="severity-label severity-unknown" id="severity">UNKNOWN</span></p>
<p><strong>MITIGATION:</strong> <span id="mitigation">NONE</span></p>
<p><strong>CONFIDENCE:</strong> <span id="confidence">0.0</span></p>
<p><strong>TIMESTAMP:</strong> <span id="timestamp">-</span></p>
</div>
""",
                visible=True
            )
            gr.Markdown("### πŸ“ˆ THREAT HISTORY LOG")
            # Placeholder history table shown until the first scan.
            history_table = gr.Markdown(
                """
<table style='width:100%; border-collapse: collapse;'>
<tr><th>TIMESTAMP</th><th>CLASSIFICATION</th><th>SEVERITY</th><th>CONFIDENCE</th></tr>
<tr><td colspan='4'>NO THREATS DETECTED YET</td></tr>
</table>
""",
                visible=True
            )
    # Wire the button: inputs follow chatbot_response's parameter order
    # (user_input, file, history, state) and outputs follow its return tuple
    # (history, scan result, state, threat card HTML, history table HTML).
    submit_btn.click(
        fn=chatbot_response,
        inputs=[user_input, file_input, chatbot, state],
        outputs=[chatbot, output_json, state, threat_card, history_table]
    )
# Launch the app only when this file is run as a script (not on import).
# mcp_server=True is passed through to Gradio's launch(); presumably it also
# exposes the app through Gradio's MCP server - confirm against the installed
# Gradio version.
if __name__ == "__main__":
    demo.launch(mcp_server=True)