#!/usr/bin/env python3
"""
logs_interpreter.py
Parse log files, call the CBORG model to diagnose root causes of failures (or confirm success), and output its analysis.
"""
import os
import sys
import argparse
try:
from openai import OpenAI # type: ignore
except ImportError:
print("Please install openai (pip install openai)")
sys.exit(1)
def parse_args():
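    """Parse command-line options: log directory, CBORG model name, and output file path."""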
parser = argparse.ArgumentParser(
description="Analyze run logs and ask CBORG model for root-cause analysis"
)
parser.add_argument(
"--log_dir", default=".",
help="Directory containing .txt log files (default: current directory)"
)
parser.add_argument(
"--model", default="lbl/cborg-deepthought",
help="CBORG model to use (default: lbl/cborg-deepthought)"
)
parser.add_argument(
"--output", default=None,
help="File to write the model's analysis (default: stdout)"
)
return parser.parse_args()
def gather_logs(log_dir):
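    """Collect .txt log files (grouped by filename prefix) plus any Snakemake .log files
    under log_dir, and return their contents as one annotated text blob."""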
# If logs are under a nested 'logs' directory, use that first
if os.path.isdir(os.path.join(log_dir, 'logs')):
log_base = os.path.join(log_dir, 'logs')
else:
log_base = log_dir
# Group TXT log files by prefix (before the last underscore)
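    # e.g. "preprocess_1.txt" and "preprocess_2.txt" (illustrative names) both fall under group "preprocess"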
files = [f for f in sorted(os.listdir(log_base)) if f.endswith('.txt')]
groups = {}
for fname in files:
if '_' in fname:
base = fname.rsplit('_', 1)[0]
else:
base = fname.rsplit('.', 1)[0]
groups.setdefault(base, []).append(fname)
segments = []
# Assemble grouped log contents
for base, flist in groups.items():
segments.append(f"=== Log group: {base} ===")
for fname in flist:
            # Open from log_base (where the files were listed), not log_dir
            path = os.path.join(log_base, fname)
try:
with open(path, 'r') as f:
content = f.read().strip()
except Exception as e:
content = f"<could not read: {e}>"
segments.append(f"-- {fname} --\n{content}")
segments.append("")
# Include Snakemake run logs from possible locations
# 1) sibling 'snakemake_log' folder
# 2) nested '.snakemake/log' under log_dir
candidates = [os.path.join(log_dir, 'snakemake_log'),
os.path.join(log_dir, '.snakemake', 'log')]
for sn_dir in candidates:
if os.path.isdir(sn_dir):
for fname in sorted(os.listdir(sn_dir)):
if fname.endswith('.log'):
path = os.path.join(sn_dir, fname)
try:
with open(path, 'r') as f:
content = f.read().strip()
except Exception as e:
content = f"<could not read: {e}>"
segments.append(f"=== Snakemake Log File: {fname} ===")
segments.append(content)
segments.append("")
return "\n".join(segments)
def call_cborg(prompt, model):
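    """Send the assembled log prompt to the CBORG (OpenAI-compatible) chat endpoint and
    return the model's analysis as a string.

    The API key is read from CBORG_API_KEY, falling back to OPENAI_API_KEY; the base URL
    defaults to https://api.cborg.lbl.gov unless CBORG_API_URL overrides it.
    """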
api_key = os.getenv("CBORG_API_KEY") or os.getenv("OPENAI_API_KEY")
if not api_key:
print("Error: CBORG_API_KEY or OPENAI_API_KEY environment variable not set.")
sys.exit(1)
# Initialize the CBORG/OpenAI client with the appropriate API endpoint
cborg_url = os.getenv("CBORG_API_URL", "https://api.cborg.lbl.gov")
client = OpenAI(api_key=api_key, base_url=cborg_url)
# Call the chat completions endpoint
response = client.chat.completions.create(
model=model,
messages=[
{"role": "system", "content": "You are a log root-cause analyzer. Provide a concise diagnosis."},
{"role": "user", "content": prompt},
],
temperature=0.2,
)
# Safely extract content
choice = response.choices[0]
content = None
if hasattr(choice, 'message') and choice.message:
content = getattr(choice.message, 'content', None)
if content is None and hasattr(choice, 'text'):
content = choice.text
if content is None:
content = ''
return content.strip()
def main():
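    """Assemble log content (per run if run subdirectories exist, filtered by stats.csv when
    present), request a root-cause analysis from the model, and write it to the output file."""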
args = parse_args()
# If the log_dir contains run subdirectories with their own 'logs' folders, gather per-run
runs = [d for d in sorted(os.listdir(args.log_dir))
if os.path.isdir(os.path.join(args.log_dir, d)) and d != '.snakemake']
    # Determine the folder containing .txt logs (nested 'logs' dir if present, else log_dir itself)
log_folder = os.path.join(args.log_dir, 'logs') if os.path.isdir(os.path.join(args.log_dir, 'logs')) else args.log_dir
if runs and os.path.isdir(os.path.join(args.log_dir, runs[0], 'logs')):
combined = []
for run in runs:
combined.append(f"=== Run: {run} ===")
run_log_dir = os.path.join(args.log_dir, run, 'logs')
combined.append(gather_logs(run_log_dir))
# Include root-level Snakemake logs if present
root_snake = os.path.join(args.log_dir, '.snakemake', 'log')
if os.path.isdir(root_snake):
combined.append("=== Root Snakemake Logs ===")
for fname in sorted(os.listdir(root_snake)):
if fname.endswith('.log'):
path = os.path.join(root_snake, fname)
try:
content = open(path).read().strip()
except Exception:
content = "<could not read>"
combined.append(f"-- {fname} --\n{content}")
logs = "\n\n".join(combined)
else:
# Gather logs from determined log_folder
logs = gather_logs(log_folder)
# Prepend a listing of available .txt files in the log_folder for clarity
try:
entries = sorted(f for f in os.listdir(log_folder) if f.endswith('.txt'))
listing = "=== Logs directory files (txt) ===\n" + "\n".join(entries) + "\n\n"
except Exception:
listing = ""
logs = listing + logs
if not logs:
print(f"No log files found in {args.log_dir}")
sys.exit(0)
# Include stats.csv summary and filter logs for failed steps
stats_file = os.path.join(args.log_dir, 'stats.csv')
if os.path.isfile(stats_file):
try:
with open(stats_file, 'r') as sf:
stats_content = sf.read().strip()
except Exception as e:
stats_content = f"<could not read stats.csv: {e}>"
        # Restart the prompt logs with the stats summary; filtered per-step logs are appended below
logs = f"=== Stats Summary ===\n{stats_content}\n\n"
# Parse CSV to identify failed steps
try:
with open(stats_file, 'r') as sf:
# Read the entire CSV content and parse manually due to potential line wrapping
content = sf.read().strip()
lines = content.split('\n')
# Find the data line (starts with '* ')
data_line = None
for line in lines:
if line.strip().startswith('* '):
data_line = line.strip()[2:] # Remove '* ' prefix
break
if data_line:
# Parse the data manually: model_name, step1_success, step1_time, step1_calls, step1_in, step1_out, step2_success, etc.
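                    # Illustrative stats.csv data line (field values are assumptions, not real output):
                    #   * my-model, True, 12.3, 4, 1000, 200, True, 8.1, 2, 500, 100, False, 0.0, 0, 0, 0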
parts = [part.strip() for part in data_line.split(',')]
if len(parts) >= 16: # Ensure we have enough columns
stats_row = {
'step 1 success?': parts[1], # Index 1: step 1 success
'step 2 success?': parts[6], # Index 6: step 2 success
'step 3 success?': parts[11], # Index 11: step 3 success
}
else:
stats_row = {}
else:
stats_row = {}
except Exception as e:
print(f"Warning: Could not parse CSV: {e}")
stats_row = {}
# Map step numbers to rule prefixes
step_rules = {
'1': ['create_numpy', 'insert_root_summary', 'preprocess', 'summarize_root'],
'2': ['scores'],
'3': ['categorization'],
}
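        # NOTE: these prefixes are assumed to match the workflow's .txt log filenames
        # (matched below as "<rule>_*.txt"); adjust them if the pipeline's rule names change.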
# List available txt entries
entries = []
try:
entries = sorted(f for f in os.listdir(log_folder) if f.endswith('.txt'))
except Exception:
pass
        # Build filtered log segments: pass/fail status for every step, detailed logs only for failed steps
filtered = []
# Always include stats parsing for context
filtered.append("=== STEP STATUS FROM STATS.CSV ===")
for step, rules in step_rules.items():
key = f'step {step} success?'
status = stats_row.get(key, 'Unknown').strip()
filtered.append(f"Step {step}: {status}")
filtered.append("")
# Include logs for failed steps and their associated rules
failed_steps = []
for step, rules in step_rules.items():
key = f'step {step} success?'
if stats_row.get(key, '').lower() != 'true':
failed_steps.append(step)
filtered.append(f"=== FAILED STEP {step} LOGS ===")
for rule in rules:
filtered.append(f"--- Rule: {rule} ---")
matched = [f for f in entries if f.startswith(rule + '_')]
if matched:
for fname in matched:
path = os.path.join(log_folder, fname)
try:
content = open(path).read().strip()
# Truncate very long logs to focus on key parts
if len(content) > 5000:
lines = content.split('\n')
content = '\n'.join(lines[:100]) + "\n...[TRUNCATED]...\n" + '\n'.join(lines[-50:])
except Exception as e:
content = f"<could not read: {e}>"
filtered.append(f"Log file: {fname}")
filtered.append(content)
else:
filtered.append("No log files found for this rule.")
filtered.append("")
# Add Snakemake logs for execution context
snakemake_dir = os.path.join(args.log_dir, 'snakemake_log')
if os.path.isdir(snakemake_dir):
filtered.append("=== SNAKEMAKE EXECUTION LOGS ===")
for fname in sorted(os.listdir(snakemake_dir)):
if fname.endswith('.log'):
path = os.path.join(snakemake_dir, fname)
try:
content = open(path).read().strip()
# Focus on errors and warnings in Snakemake logs
lines = content.split('\n')
important_lines = []
for line in lines:
if any(keyword in line.lower() for keyword in ['error', 'exception', 'failed', 'warning', 'killed']):
important_lines.append(line)
if important_lines:
filtered.append(f"Snakemake log: {fname} (errors/warnings only)")
filtered.append('\n'.join(important_lines[-20:])) # Last 20 error lines
else:
filtered.append(f"Snakemake log: {fname} - No errors detected")
except Exception as e:
filtered.append(f"<could not read {fname}: {e}>")
filtered.append("")
# Append filtered logs
logs += "\n".join(filtered)
# Build prompt: a single f-string literal with embedded logs (no leading newline)
prompt = f"""You are analyzing a machine learning pipeline failure. Your task is to diagnose root causes by examining three sources:
1) stats.csv: Shows pass/fail status for 3 steps:
- Step 1 (Data Preparation): create_numpy, insert_root_summary, preprocess, summarize_root
- Step 2 (Scoring): scores
- Step 3 (Categorization): categorization
2) Individual .txt logs in logs/: Contain detailed execution output for each rule attempt
3) Snakemake logs: Show workflow execution status and any workflow-level errors
ANALYSIS REQUIREMENTS:
Create a diagnostic report using this format for each step:
------
Step X (Category of failure)
------
Rule: [rule_name]
------
Status: [Pass/Fail from stats.csv] | [Snakemake execution status]
------
Root Cause Analysis: [detailed analysis]
------
For each failed step (False in stats.csv):
- Examine ALL relevant .txt log files for that step's rules
- Look for specific error messages, exceptions, or failure indicators
- Identify the probable root cause (e.g., missing files, API failures, memory issues, logic errors, syntax errors)
- If logs show success messages but stats.csv shows failure, investigate this discrepancy
- Categorize the failure type (Data/API/Logic/Infrastructure/Other)
For passed steps (True in stats.csv):
- Simply mark as "OK" in Root Cause Analysis
After the table, provide:
1. Overall Status: SUCCESS or FAILURE, using the same format as above.
2. Primary Failure Category (if applicable): Data/API/Logic/Infrastructure/Other
3. Recommended Next Steps
DATA TO ANALYZE:
{logs}
"""
# DEBUG: Uncomment to see full prompt
# print("=== PROMPT BEING SENT TO CBORG ===")
# print(prompt)
# print("=== END PROMPT ===\n")
analysis = call_cborg(prompt, args.model)
# Fallback if model returns empty
if not analysis or not analysis.strip():
analysis = (
"Warning: CBORG model returned no analysis.\n"
"Below is the prompt sent to the model for debugging:\n\n" + prompt
)
    # Determine output path: user-specified, or logs_analysis.txt under log_dir by default
output_file = args.output or os.path.join(args.log_dir, 'logs_analysis.txt')
try:
with open(output_file, 'w') as f:
f.write(analysis + "\n")
print(f"Analysis written to {output_file}")
except Exception as e:
print(f"Error writing analysis to {output_file}: {e}")
if __name__ == "__main__":
main()