#!/usr/bin/env python3
"""
logs_interpreter.py

Parse log files, call the CBORG model to diagnose root causes of failures
(or confirm success), and output its analysis.
"""

import os
import sys
import argparse

try:
    from openai import OpenAI  # type: ignore
except ImportError:
    print("Please install openai (pip install openai)")
    sys.exit(1)


def parse_args():
    parser = argparse.ArgumentParser(
        description="Analyze run logs and ask a CBORG model for root-cause analysis"
    )
    parser.add_argument(
        "--log_dir",
        default=".",
        help="Directory containing .txt log files (default: current directory)",
    )
    parser.add_argument(
        "--model",
        default="lbl/cborg-deepthought",
        help="CBORG model to use (default: lbl/cborg-deepthought)",
    )
    parser.add_argument(
        "--output",
        default=None,
        help="File to write the model's analysis (default: <log_dir>/logs_analysis.txt)",
    )
    return parser.parse_args()


def gather_logs(log_dir):
    # If logs are under a nested 'logs' directory, use that first
    if os.path.isdir(os.path.join(log_dir, 'logs')):
        log_base = os.path.join(log_dir, 'logs')
    else:
        log_base = log_dir

    # Group .txt log files by prefix (the part before the last underscore)
    files = [f for f in sorted(os.listdir(log_base)) if f.endswith('.txt')]
    groups = {}
    for fname in files:
        if '_' in fname:
            base = fname.rsplit('_', 1)[0]
        else:
            base = fname.rsplit('.', 1)[0]
        groups.setdefault(base, []).append(fname)

    # Assemble grouped log contents
    segments = []
    for base, flist in groups.items():
        segments.append(f"=== Log group: {base} ===")
        for fname in flist:
            # Open files from log_base, where they were listed (not log_dir)
            path = os.path.join(log_base, fname)
            try:
                with open(path, 'r') as f:
                    content = f.read().strip()
            except Exception as e:
                content = f"[Error reading {fname}: {e}]"
            segments.append(f"-- {fname} --\n{content}")
        segments.append("")

    # Include Snakemake run logs from possible locations:
    # 1) a sibling 'snakemake_log' folder
    # 2) a nested '.snakemake/log' under log_dir
    candidates = [
        os.path.join(log_dir, 'snakemake_log'),
        os.path.join(log_dir, '.snakemake', 'log'),
    ]
    for sn_dir in candidates:
        if os.path.isdir(sn_dir):
            for fname in sorted(os.listdir(sn_dir)):
                if fname.endswith('.log'):
                    path = os.path.join(sn_dir, fname)
                    try:
                        with open(path, 'r') as f:
                            content = f.read().strip()
                    except Exception as e:
                        content = f"[Error reading {fname}: {e}]"
                    segments.append(f"=== Snakemake Log File: {fname} ===")
                    segments.append(content)
                    segments.append("")

    return "\n".join(segments)
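
# Illustrative sketch of the grouping behavior above (filenames are hypothetical):
# a logs/ directory containing preprocess_1.txt, preprocess_2.txt, and scores_1.txt
# is grouped by the prefix before the last underscore, so gather_logs() emits:
#
#   === Log group: preprocess ===
#   -- preprocess_1.txt --
#   <file contents>
#   -- preprocess_2.txt --
#   <file contents>
#
#   === Log group: scores ===
#   -- scores_1.txt --
#   <file contents>
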
def call_cborg(prompt, model):
    api_key = os.getenv("CBORG_API_KEY") or os.getenv("OPENAI_API_KEY")
    if not api_key:
        print("Error: CBORG_API_KEY or OPENAI_API_KEY environment variable not set.")
        sys.exit(1)

    # Initialize the OpenAI-compatible client against the CBORG API endpoint
    cborg_url = os.getenv("CBORG_API_URL", "https://api.cborg.lbl.gov")
    client = OpenAI(api_key=api_key, base_url=cborg_url)

    # Call the chat completions endpoint
    response = client.chat.completions.create(
        model=model,
        messages=[
            {"role": "system",
             "content": "You are a log root-cause analyzer. Provide a concise diagnosis."},
            {"role": "user", "content": prompt},
        ],
        temperature=0.2,
    )

    # Safely extract the response content across client versions
    choice = response.choices[0]
    content = None
    if hasattr(choice, 'message') and choice.message:
        content = getattr(choice.message, 'content', None)
    if content is None and hasattr(choice, 'text'):
        content = choice.text
    if content is None:
        content = ''
    return content.strip()
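
# Minimal usage sketch for call_cborg, assuming CBORG_API_KEY (or OPENAI_API_KEY)
# is exported and the default endpoint is reachable; the prompt text is illustrative:
#
#   diagnosis = call_cborg("Diagnose this failure:\n<log excerpt>", "lbl/cborg-deepthought")
#   print(diagnosis)
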
def main():
    args = parse_args()

    # If log_dir contains run subdirectories with their own 'logs' folders, gather per run
    runs = [
        d for d in sorted(os.listdir(args.log_dir))
        if os.path.isdir(os.path.join(args.log_dir, d)) and d != '.snakemake'
    ]

    # Determine the folder containing .txt logs (a nested 'logs' dir if present)
    log_folder = (
        os.path.join(args.log_dir, 'logs')
        if os.path.isdir(os.path.join(args.log_dir, 'logs'))
        else args.log_dir
    )

    if runs and os.path.isdir(os.path.join(args.log_dir, runs[0], 'logs')):
        combined = []
        for run in runs:
            combined.append(f"=== Run: {run} ===")
            run_log_dir = os.path.join(args.log_dir, run, 'logs')
            combined.append(gather_logs(run_log_dir))
        # Include root-level Snakemake logs if present
        root_snake = os.path.join(args.log_dir, '.snakemake', 'log')
        if os.path.isdir(root_snake):
            combined.append("=== Root Snakemake Logs ===")
            for fname in sorted(os.listdir(root_snake)):
                if fname.endswith('.log'):
                    path = os.path.join(root_snake, fname)
                    try:
                        content = open(path).read().strip()
                    except Exception:
                        content = ""
                    combined.append(f"-- {fname} --\n{content}")
        logs = "\n\n".join(combined)
    else:
        # Gather logs from the determined log folder
        logs = gather_logs(log_folder)
        # Prepend a listing of the available .txt files for clarity
        try:
            entries = sorted(f for f in os.listdir(log_folder) if f.endswith('.txt'))
            listing = "=== Logs directory files (txt) ===\n" + "\n".join(entries) + "\n\n"
        except Exception:
            listing = ""
        logs = listing + logs

    if not logs:
        print(f"No log files found in {args.log_dir}")
        sys.exit(0)

    # If stats.csv is present, rebuild the prompt logs around it: a stats summary
    # first, then only the logs relevant to failed steps (built below as 'filtered')
    stats_file = os.path.join(args.log_dir, 'stats.csv')
    if os.path.isfile(stats_file):
        try:
            with open(stats_file, 'r') as sf:
                stats_content = sf.read().strip()
        except Exception as e:
            stats_content = f"[Error reading stats.csv: {e}]"
        logs = f"=== Stats Summary ===\n{stats_content}\n\n"

        # Parse the CSV to identify failed steps
        stats_row = {}
        try:
            with open(stats_file, 'r') as sf:
                # Read the entire CSV content and parse manually due to potential line wrapping
                content = sf.read().strip()
            # Find the data line (starts with '* ')
            data_line = None
            for line in content.split('\n'):
                if line.strip().startswith('* '):
                    data_line = line.strip()[2:]  # remove the '* ' prefix
                    break
            if data_line:
                # Columns: model_name, step1_success, step1_time, step1_calls,
                # step1_in, step1_out, step2_success, etc.
                parts = [part.strip() for part in data_line.split(',')]
                if len(parts) >= 16:  # ensure we have enough columns
                    stats_row = {
                        'step 1 success?': parts[1],   # index 1: step 1 success
                        'step 2 success?': parts[6],   # index 6: step 2 success
                        'step 3 success?': parts[11],  # index 11: step 3 success
                    }
        except Exception as e:
            print(f"Warning: Could not parse CSV: {e}")
            stats_row = {}

        # Map step numbers to rule prefixes
        step_rules = {
            '1': ['create_numpy', 'insert_root_summary', 'preprocess', 'summarize_root'],
            '2': ['scores'],
            '3': ['categorization'],
        }

        # List the available .txt entries
        entries = []
        try:
            entries = sorted(f for f in os.listdir(log_folder) if f.endswith('.txt'))
        except Exception:
            pass

        # Build filtered log segments; always include the parsed step status for context
        filtered = []
        filtered.append("=== STEP STATUS FROM STATS.CSV ===")
        for step in step_rules:
            key = f'step {step} success?'
            status = stats_row.get(key, 'Unknown').strip()
            filtered.append(f"Step {step}: {status}")
        filtered.append("")

        # Include logs for failed steps and their associated rules
        failed_steps = []
        for step, rules in step_rules.items():
            key = f'step {step} success?'
            if stats_row.get(key, '').lower() != 'true':
                failed_steps.append(step)
                filtered.append(f"=== FAILED STEP {step} LOGS ===")
                for rule in rules:
                    filtered.append(f"--- Rule: {rule} ---")
                    matched = [f for f in entries if f.startswith(rule + '_')]
                    if matched:
                        for fname in matched:
                            path = os.path.join(log_folder, fname)
                            try:
                                content = open(path).read().strip()
                                # Truncate very long logs to focus on the key parts
                                if len(content) > 5000:
                                    lines = content.split('\n')
                                    content = (
                                        '\n'.join(lines[:100])
                                        + "\n...[TRUNCATED]...\n"
                                        + '\n'.join(lines[-50:])
                                    )
                            except Exception as e:
                                content = f"[Error reading {fname}: {e}]"
                            filtered.append(f"Log file: {fname}")
                            filtered.append(content)
                    else:
                        filtered.append("No log files found for this rule.")
                filtered.append("")

        # Add Snakemake logs for execution context
        snakemake_dir = os.path.join(args.log_dir, 'snakemake_log')
        if os.path.isdir(snakemake_dir):
            filtered.append("=== SNAKEMAKE EXECUTION LOGS ===")
            for fname in sorted(os.listdir(snakemake_dir)):
                if fname.endswith('.log'):
                    path = os.path.join(snakemake_dir, fname)
                    try:
                        content = open(path).read().strip()
                        # Keep only error/warning lines from Snakemake logs
                        important_lines = [
                            line for line in content.split('\n')
                            if any(keyword in line.lower() for keyword in
                                   ['error', 'exception', 'failed', 'warning', 'killed'])
                        ]
                        if important_lines:
                            filtered.append(f"Snakemake log: {fname} (errors/warnings only)")
                            filtered.append('\n'.join(important_lines[-20:]))  # last 20 such lines
                        else:
                            filtered.append(f"Snakemake log: {fname} - No errors detected")
                    except Exception as e:
                        filtered.append(f"[Error reading {fname}: {e}]")
            filtered.append("")

        # Append the filtered logs
        logs += "\n".join(filtered)

    # Build the prompt: a single f-string literal with embedded logs (no leading newline)
    prompt = f"""You are analyzing a machine learning pipeline failure.
Your task is to diagnose root causes by examining three sources:

1) stats.csv: Shows pass/fail status for 3 steps:
   - Step 1 (Data Preparation): create_numpy, insert_root_summary, preprocess, summarize_root
   - Step 2 (Scoring): scores
   - Step 3 (Categorization): categorization

2) Individual .txt logs in logs/: Contain detailed execution output for each rule attempt

3) Snakemake logs: Show workflow execution status and any workflow-level errors

ANALYSIS REQUIREMENTS:

Create a diagnostic report using this format for each step:
------ Step X (Category of failure) ------
Rule: [rule_name]
------ Status: [Pass/Fail from stats.csv] | [Snakemake execution status] ------
Root Cause Analysis: [detailed analysis]
------

For each failed step (False in stats.csv):
- Examine ALL relevant .txt log files for that step's rules
- Look for specific error messages, exceptions, or failure indicators
- Identify the probable root cause (e.g., missing files, API failures, memory issues, logic errors, syntax errors)
- If logs show success messages but stats.csv shows failure, investigate this discrepancy
- Categorize the failure type (Data/API/Logic/Infrastructure/Other)

For passed steps (True in stats.csv):
- Simply mark as "OK" in Root Cause Analysis

After the table, provide:
1. Overall Status: SUCCESS or FAILURE, using a similar format as above.
2. Primary Failure Category (if applicable): Data/API/Logic/Infrastructure/Other
3. Recommended Next Steps

DATA TO ANALYZE:
{logs}
"""

    # DEBUG: Uncomment to see the full prompt
    # print("=== PROMPT BEING SENT TO CBORG ===")
    # print(prompt)
    # print("=== END PROMPT ===\n")

    analysis = call_cborg(prompt, args.model)

    # Fall back to echoing the prompt if the model returns nothing
    if not analysis or not analysis.strip():
        analysis = (
            "Warning: CBORG model returned no analysis.\n"
            "Below is the prompt sent to the model for debugging:\n\n" + prompt
        )

    # Write the analysis to the user-specified file, or by default to
    # logs_analysis.txt inside the log directory
    output_file = args.output or os.path.join(args.log_dir, 'logs_analysis.txt')
    try:
        with open(output_file, 'w') as f:
            f.write(analysis + "\n")
        print(f"Analysis written to {output_file}")
    except Exception as e:
        print(f"Error writing analysis to {output_file}: {e}")
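
# Sketch of the stats.csv data line that main() expects (all values hypothetical;
# the parser only relies on the '* ' prefix, comma separation, at least 16 columns,
# and the success flags at indices 1, 6, and 11):
#
#   * my_model, True, 12.3, 4, 1000, 200, True, 8.1, 2, 500, 120, False, 3.2, 1, 300, 80
#
# Here steps 1 and 2 passed (parts[1], parts[6]) and step 3 failed (parts[11]).
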
if __name__ == "__main__":
    main()
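
# Example invocation (paths are hypothetical; an API key must be set):
#   CBORG_API_KEY=... python logs_interpreter.py --log_dir runs/exp1 \
#       --model lbl/cborg-deepthought --output exp1_analysis.txt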