"""
logs_interpreter.py

Parse log files, call the CBORG model to diagnose root causes of failures
(or confirm success), and output its analysis.
"""
|
|
import os
import sys
import argparse

try:
    from openai import OpenAI
except ImportError:
    print("Please install openai (pip install openai)")
    sys.exit(1)
|
|
def parse_args():
    parser = argparse.ArgumentParser(
        description="Analyze run logs and ask CBORG model for root-cause analysis"
    )
    parser.add_argument(
        "--log_dir", default=".",
        help="Directory containing .txt log files (default: current directory)"
    )
    parser.add_argument(
        "--model", default="lbl/cborg-deepthought",
        help="CBORG model to use (default: lbl/cborg-deepthought)"
    )
    parser.add_argument(
        "--output", default=None,
        help="File to write the model's analysis (default: <log_dir>/logs_analysis.txt)"
    )
    return parser.parse_args()
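
# Example invocation (a sketch; the run directory and model name below are
# illustrative, not produced or required by this script):
#   export CBORG_API_KEY=...          # or OPENAI_API_KEY
#   python logs_interpreter.py --log_dir runs/exp1 --model lbl/cborg-deepthought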
|
|
def gather_logs(log_dir):
    """Collect .txt logs under log_dir (or its logs/ subdirectory), grouped
    by rule name, plus any Snakemake logs, into a single text blob."""
    # Prefer a logs/ subdirectory when one exists.
    if os.path.isdir(os.path.join(log_dir, 'logs')):
        log_base = os.path.join(log_dir, 'logs')
    else:
        log_base = log_dir

    files = [f for f in sorted(os.listdir(log_base)) if f.endswith('.txt')]
    groups = {}
    for fname in files:
        # Group files by rule name: "rule_attempt.txt" -> "rule".
        if '_' in fname:
            base = fname.rsplit('_', 1)[0]
        else:
            base = fname.rsplit('.', 1)[0]
        groups.setdefault(base, []).append(fname)
    segments = []

    for base, flist in groups.items():
        segments.append(f"=== Log group: {base} ===")
        for fname in flist:
            # Read from the same directory that was listed above.
            path = os.path.join(log_base, fname)
            try:
                with open(path, 'r') as f:
                    content = f.read().strip()
            except Exception as e:
                content = f"<could not read: {e}>"
            segments.append(f"-- {fname} --\n{content}")
        segments.append("")

    # Also include Snakemake's own logs if present.
    candidates = [os.path.join(log_dir, 'snakemake_log'),
                  os.path.join(log_dir, '.snakemake', 'log')]
    for sn_dir in candidates:
        if os.path.isdir(sn_dir):
            for fname in sorted(os.listdir(sn_dir)):
                if fname.endswith('.log'):
                    path = os.path.join(sn_dir, fname)
                    try:
                        with open(path, 'r') as f:
                            content = f.read().strip()
                    except Exception as e:
                        content = f"<could not read: {e}>"
                    segments.append(f"=== Snakemake Log File: {fname} ===")
                    segments.append(content)
                    segments.append("")
    return "\n".join(segments)
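
# Directory layout assumed by gather_logs (inferred from the grouping logic;
# the file names below are illustrative, not produced by this script):
#   <log_dir>/logs/preprocess_1.txt   -> group "preprocess"
#   <log_dir>/logs/preprocess_2.txt
#   <log_dir>/logs/scores_1.txt       -> group "scores"
#   <log_dir>/snakemake_log/*.log or <log_dir>/.snakemake/log/*.log (optional)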
|
|
def call_cborg(prompt, model):
    """Send the prompt to a CBORG model through its OpenAI-compatible API
    and return the model's reply as a string."""
    api_key = os.getenv("CBORG_API_KEY") or os.getenv("OPENAI_API_KEY")
    if not api_key:
        print("Error: CBORG_API_KEY or OPENAI_API_KEY environment variable not set.")
        sys.exit(1)

    cborg_url = os.getenv("CBORG_API_URL", "https://api.cborg.lbl.gov")
    client = OpenAI(api_key=api_key, base_url=cborg_url)

    response = client.chat.completions.create(
        model=model,
        messages=[
            {"role": "system", "content": "You are a log root-cause analyzer. Provide a concise diagnosis."},
            {"role": "user", "content": prompt},
        ],
        temperature=0.2,
    )

    # Be defensive about the response shape: some endpoints return
    # message.content, others a bare text field.
    choice = response.choices[0]
    content = None
    if hasattr(choice, 'message') and choice.message:
        content = getattr(choice.message, 'content', None)
    if content is None and hasattr(choice, 'text'):
        content = choice.text
    if content is None:
        content = ''
    return content.strip()
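
# Because CBORG exposes an OpenAI-compatible endpoint, call_cborg can be
# exercised standalone. A minimal sketch, assuming CBORG_API_KEY is set and
# the log path below exists (it is illustrative only):
#   reply = call_cborg("Diagnose this log:\n" + open("logs/scores_1.txt").read(),
#                      "lbl/cborg-deepthought")
#   print(reply)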
|
|
def main():
    args = parse_args()

    # Top-level run directories (a multi-run layout keeps per-run logs/ subdirs).
    runs = [d for d in sorted(os.listdir(args.log_dir))
            if os.path.isdir(os.path.join(args.log_dir, d)) and d != '.snakemake']

    log_folder = os.path.join(args.log_dir, 'logs') if os.path.isdir(os.path.join(args.log_dir, 'logs')) else args.log_dir

    if runs and os.path.isdir(os.path.join(args.log_dir, runs[0], 'logs')):
        # Multi-run layout: gather each run's logs under its own header.
        combined = []
        for run in runs:
            combined.append(f"=== Run: {run} ===")
            run_log_dir = os.path.join(args.log_dir, run, 'logs')
            combined.append(gather_logs(run_log_dir))

        root_snake = os.path.join(args.log_dir, '.snakemake', 'log')
        if os.path.isdir(root_snake):
            combined.append("=== Root Snakemake Logs ===")
            for fname in sorted(os.listdir(root_snake)):
                if fname.endswith('.log'):
                    path = os.path.join(root_snake, fname)
                    try:
                        content = open(path).read().strip()
                    except Exception:
                        content = "<could not read>"
                    combined.append(f"-- {fname} --\n{content}")
        logs = "\n\n".join(combined)
    else:
        # Single-run layout: logs live directly in log_folder.
        logs = gather_logs(log_folder)

        # Prepend a plain file listing so the model sees every log name.
        try:
            entries = sorted(f for f in os.listdir(log_folder) if f.endswith('.txt'))
            listing = "=== Logs directory files (txt) ===\n" + "\n".join(entries) + "\n\n"
        except Exception:
            listing = ""
        logs = listing + logs

    if not logs:
        print(f"No log files found in {args.log_dir}")
        sys.exit(0)

    # If a stats.csv is present, prepend its raw content and parse the
    # per-step success flags from its data row.
    stats_row = {}
    stats_file = os.path.join(args.log_dir, 'stats.csv')
    if os.path.isfile(stats_file):
        try:
            with open(stats_file, 'r') as sf:
                stats_content = sf.read().strip()
        except Exception as e:
            stats_content = f"<could not read stats.csv: {e}>"

        logs = f"=== Stats Summary ===\n{stats_content}\n\n" + logs

        try:
            with open(stats_file, 'r') as sf:
                content = sf.read().strip()
            lines = content.split('\n')

            # The data row is the first line flagged with a leading "* ".
            data_line = None
            for line in lines:
                if line.strip().startswith('* '):
                    data_line = line.strip()[2:]
                    break

            if data_line:
                parts = [part.strip() for part in data_line.split(',')]
                if len(parts) >= 16:
                    stats_row = {
                        'step 1 success?': parts[1],
                        'step 2 success?': parts[6],
                        'step 3 success?': parts[11],
                    }
        except Exception as e:
            print(f"Warning: Could not parse CSV: {e}")
            stats_row = {}
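
    # Assumed stats.csv shape (inferred from the indices used above; the
    # column contents are illustrative): a data row flagged with "* " whose
    # comma-separated fields carry at least 16 entries, e.g.
    #   * run_id, True, ..., True, ..., True, ...
    # with parts[1], parts[6], and parts[11] holding the step 1/2/3 flags.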
|
    # Map each pipeline step to the Snakemake rules that implement it.
    step_rules = {
        '1': ['create_numpy', 'insert_root_summary', 'preprocess', 'summarize_root'],
        '2': ['scores'],
        '3': ['categorization'],
    }

    entries = []
    try:
        entries = sorted(f for f in os.listdir(log_folder) if f.endswith('.txt'))
    except Exception:
        pass
|
    filtered = []

    # Per-step pass/fail summary from stats.csv.
    filtered.append("=== STEP STATUS FROM STATS.CSV ===")
    for step, rules in step_rules.items():
        key = f'step {step} success?'
        status = stats_row.get(key, 'Unknown').strip()
        filtered.append(f"Step {step}: {status}")
    filtered.append("")

    # For every step not marked 'True', pull in the logs for its rules.
    failed_steps = []
    for step, rules in step_rules.items():
        key = f'step {step} success?'
        if stats_row.get(key, '').lower() != 'true':
            failed_steps.append(step)
            filtered.append(f"=== FAILED STEP {step} LOGS ===")

            for rule in rules:
                filtered.append(f"--- Rule: {rule} ---")
                matched = [f for f in entries if f.startswith(rule + '_')]
                if matched:
                    for fname in matched:
                        path = os.path.join(log_folder, fname)
                        try:
                            content = open(path).read().strip()
                            # Keep long logs manageable: head plus tail only.
                            if len(content) > 5000:
                                lines = content.split('\n')
                                content = '\n'.join(lines[:100]) + "\n...[TRUNCATED]...\n" + '\n'.join(lines[-50:])
                        except Exception as e:
                            content = f"<could not read: {e}>"
                        filtered.append(f"Log file: {fname}")
                        filtered.append(content)
                else:
                    filtered.append("No log files found for this rule.")
            filtered.append("")
|
    # Pull error/warning lines out of Snakemake's own execution logs.
    snakemake_dir = os.path.join(args.log_dir, 'snakemake_log')
    if os.path.isdir(snakemake_dir):
        filtered.append("=== SNAKEMAKE EXECUTION LOGS ===")
        for fname in sorted(os.listdir(snakemake_dir)):
            if fname.endswith('.log'):
                path = os.path.join(snakemake_dir, fname)
                try:
                    content = open(path).read().strip()
                    lines = content.split('\n')
                    important_lines = []
                    for line in lines:
                        if any(keyword in line.lower() for keyword in ['error', 'exception', 'failed', 'warning', 'killed']):
                            important_lines.append(line)
                    if important_lines:
                        filtered.append(f"Snakemake log: {fname} (errors/warnings only)")
                        filtered.append('\n'.join(important_lines[-20:]))
                    else:
                        filtered.append(f"Snakemake log: {fname} - No errors detected")
                except Exception as e:
                    filtered.append(f"<could not read {fname}: {e}>")
        filtered.append("")

    logs += "\n".join(filtered)
|
    prompt = f"""You are analyzing a machine learning pipeline failure. Your task is to diagnose root causes by examining three sources:

1) stats.csv: Shows pass/fail status for 3 steps:
   - Step 1 (Data Preparation): create_numpy, insert_root_summary, preprocess, summarize_root
   - Step 2 (Scoring): scores
   - Step 3 (Categorization): categorization

2) Individual .txt logs in logs/: Contain detailed execution output for each rule attempt
3) Snakemake logs: Show workflow execution status and any workflow-level errors

ANALYSIS REQUIREMENTS:
Create a diagnostic report using this format for each step:

------
Step X (Category of failure)
------
Rule: [rule_name]
------
Status: [Pass/Fail from stats.csv] | [Snakemake execution status]
------
Root Cause Analysis: [detailed analysis]
------

For each failed step (False in stats.csv):
- Examine ALL relevant .txt log files for that step's rules
- Look for specific error messages, exceptions, or failure indicators
- Identify the probable root cause (e.g., missing files, API failures, memory issues, logic errors, syntax errors)
- If logs show success messages but stats.csv shows failure, investigate the discrepancy
- Categorize the failure type (Data/API/Logic/Infrastructure/Other)

For passed steps (True in stats.csv):
- Simply mark as "OK" in Root Cause Analysis

After the report, provide:
1. Overall Status: SUCCESS or FAILURE, using the same format as above.
2. Primary Failure Category (if applicable): Data/API/Logic/Infrastructure/Other
3. Recommended Next Steps

DATA TO ANALYZE:
{logs}
"""
|
    analysis = call_cborg(prompt, args.model)

    if not analysis or not analysis.strip():
        analysis = (
            "Warning: CBORG model returned no analysis.\n"
            "Below is the prompt sent to the model for debugging:\n\n" + prompt
        )
|
    output_file = args.output or os.path.join(args.log_dir, 'logs_analysis.txt')
    try:
        with open(output_file, 'w') as f:
            f.write(analysis + "\n")
        print(f"Analysis written to {output_file}")
    except Exception as e:
        print(f"Error writing analysis to {output_file}: {e}")


if __name__ == "__main__":
    main()