financial-task-env / eval_lora.py
bpHigh's picture
eval_lora: fix truncation drop-direction bug + add subprocess preflight
b1c7959
#!/usr/bin/env python3
"""In-process eval for LoRA-adapter models against the office-document env.
Mirror of inference.py but with two key differences:
1. **Loads base model + LoRA via transformers/peft** instead of hitting an
external API. Lets us eval models that no Inference Provider hosts
(i.e., our own SFT'd Qwen2.5-Coder-3B + LoRA adapters).
2. **Instantiates `FinancialEnvironment` directly** instead of connecting
over WebSocket. Cuts WS overhead and is the same code path GRPO will
use later (rollouts in-process).
Multi-adapter mode is supported — pass a comma-separated list to
`--adapters` and the script evals each in turn (loading base once,
wrapping/unwrapping the LoRA between iterations). Pass `none` as an
adapter to evaluate the unmodified base model.
Output structure (mirrors inference.py):
runs/eval_lora_<timestamp>/<adapter_slug>/
results.json summary + per-task records
summary.csv flat table for plotting
trajectories/<id>.jsonl
log.txt mirrored stdout
Designed for HF Jobs (1× L40S 48 GB, ~$1.80/hr, ~15-20 min for 22 eval
tasks × 2 adapters = ~$0.50).
Example:
# Local (CUDA box):
python eval_lora.py \\
--adapters bpHigh/qwen3b-office-sft-kimi,bpHigh/qwen3b-office-sft-kimi-long \\
--split eval --output-dir runs/sft_eval
# HF Jobs (cleanest for users without GPUs):
hf jobs run --flavor l40sx1 --timeout 4h --secrets HF_TOKEN \\
pytorch/pytorch:2.6.0-cuda12.4-cudnn9-devel \\
bash -c "<git clone + pip install + python eval_lora.py ...>"
"""
from __future__ import annotations
import argparse
import csv
import gc
import json
import os
import sys
import textwrap
import time
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, List, Optional
# Anchor everything on this file's directory so the script works from any
# CWD. The repo root is what makes `inference`, `models` and
# `server.financial_environment` importable below, and it is also the default
# parent for `runs/` in main(). `server/` is prepended as well — presumably so
# modules inside server/ can import their siblings by bare name (confirm).
REPO_ROOT = Path(__file__).resolve().parent
sys.path.insert(0, str(REPO_ROOT))
sys.path.insert(0, str(REPO_ROOT / "server"))
# ---------------------------------------------------------------------------
# Re-use helpers from inference.py so the eval surface is identical
# ---------------------------------------------------------------------------
from inference import ( # noqa: E402
SYSTEM_PROMPTS,
extract_action,
load_tasks,
select_tasks,
log_start,
log_step,
log_end,
Tee,
model_slug,
)
from server.financial_environment import FinancialEnvironment # noqa: E402
from models import FinancialAction # noqa: E402
# ---------------------------------------------------------------------------
# Model loading
# ---------------------------------------------------------------------------
def load_base_and_tokenizer(base_model_id: str):
    """Load the tokenizer and base causal LM once, ready for LoRA wrapping.

    Precision ladder: bf16 on CUDA GPUs that support it, fp16 on other CUDA
    GPUs, fp32 on CPU. On CUDA the weights are placed with
    ``device_map="auto"``. The model is switched to eval mode.

    Args:
        base_model_id: HF Hub repo id or local path of the base model.

    Returns:
        ``(tokenizer, model)``.
    """
    import torch
    from transformers import AutoModelForCausalLM, AutoTokenizer

    print(f"Loading tokenizer: {base_model_id}")
    tokenizer = AutoTokenizer.from_pretrained(base_model_id, use_fast=True)
    if tokenizer.pad_token is None:
        # generate() needs a pad id; fall back to EOS when none is defined.
        tokenizer.pad_token = tokenizer.eos_token
    # Overflowing prompts must lose their OLDEST tokens (system prompt, early
    # turns), never the newest env feedback. With the default right-side
    # truncation the "your code failed: ..." message is the first thing
    # dropped, leaving the model blind to its error and looping at temp=0.
    tokenizer.truncation_side = "left"

    has_cuda = torch.cuda.is_available()
    if has_cuda and torch.cuda.is_bf16_supported():
        dtype = torch.bfloat16
    elif has_cuda:
        dtype = torch.float16
    else:
        dtype = torch.float32

    print(f"Loading base model: {base_model_id}")
    precision_name = str(dtype).split('.')[-1]
    print(f" precision: {precision_name} cuda={has_cuda}")
    load_kwargs = dict(
        torch_dtype=dtype,
        device_map=("auto" if has_cuda else None),
        attn_implementation="sdpa",
    )
    model = AutoModelForCausalLM.from_pretrained(base_model_id, **load_kwargs)
    model.eval()
    return tokenizer, model
def preflight_check(
    libs: tuple[str, ...] = ("openpyxl", "docx", "pptx", "PIL", "rapidfuzz"),
) -> bool:
    """Verify the env's code-execution subprocess can import the agent's libs.

    The env spawns a fresh subprocess per code step via
    ``subprocess.run([sys.executable, '-c', code])``. If pip installed into a
    different Python than ``sys.executable``, every code step fails with
    ImportError and the model loops indefinitely. This runs ONE tiny probe in
    exactly that kind of subprocess before any expensive eval starts.

    Args:
        libs: import names to probe. The default covers the office-document
            stack the agent is expected to use.

    Returns:
        True only if the probe subprocess exited with status 0 AND reported an
        explicit ``<lib>: OK`` line for every entry in ``libs``. A crashed,
        timed-out, or non-zero-exit probe counts as failure. Checking only for
        "FAIL" in stdout would wrongly pass those cases.
    """
    import subprocess

    # Library names travel via argv so the probe source stays a constant
    # (no quoting/escaping of names into code).
    probe = textwrap.dedent("""
        import sys
        print(f'PY={sys.executable}')
        for lib in sys.argv[1:]:
            try:
                __import__(lib)
                print(f'{lib}: OK')
            except Exception as e:
                print(f'{lib}: FAIL {type(e).__name__}: {e}')
    """).strip()
    # Import name -> pip distribution name, where they differ.
    pip_names = {"docx": "python-docx", "pptx": "python-pptx", "PIL": "Pillow"}

    print("\n=== Preflight: env subprocess library check ===")
    try:
        r = subprocess.run(
            [sys.executable, "-c", probe, *libs],
            capture_output=True, text=True, timeout=30,
        )
    except Exception as e:  # top-level boundary: report and fail the preflight
        print(f"  preflight subprocess crashed: {e}")
        return False
    print(r.stdout)
    if r.stderr.strip():
        print("STDERR:", r.stderr)

    # Collect libs that printed an explicit "<lib>: OK" line; anything else
    # (FAIL line, or no line at all because the probe died) is missing.
    ok_libs = set()
    for line in r.stdout.splitlines():
        name, sep, status = line.partition(": ")
        if sep and status == "OK":
            ok_libs.add(name)
    missing = [lib for lib in libs if lib not in ok_libs]

    if r.returncode != 0 or missing:
        if r.returncode != 0:
            print(f"⚠ Preflight subprocess exited with code {r.returncode}.")
        if missing:
            print(f"⚠ Some libraries are missing in the subprocess: "
                  f"{', '.join(missing)}. Install with:")
            print(f"  pip install {' '.join(pip_names.get(m, m) for m in missing)}")
        print("  Eval will fail every code step until this is fixed.")
        return False
    print("✓ All required libraries import cleanly in subprocess.\n")
    return True
def attach_lora(base_model, adapter_id_or_path: str):
    """Return ``base_model`` wrapped in a ``PeftModel`` carrying one LoRA adapter.

    Args:
        base_model: the already-loaded base model to wrap.
        adapter_id_or_path: passed straight to ``PeftModel.from_pretrained``
            (an HF Hub repo id or a local adapter directory).

    Returns:
        The PEFT wrapper, switched to eval mode.
    """
    from peft import PeftModel

    print(f" Attaching LoRA adapter: {adapter_id_or_path}")
    wrapped = PeftModel.from_pretrained(base_model, adapter_id_or_path)
    wrapped.eval()  # inference only: disables dropout in the LoRA branches
    return wrapped
def detach_lora(peft_model):
    """Strip the LoRA adapter from ``peft_model`` and return the bare base model.

    Primary path: ``PeftModel.unload()``, which returns the underlying base
    model with the LoRA modules removed. Its result is returned unchanged and
    is NOT unwrapped further. For HF causal-LM classes, ``.model`` is
    typically the backbone without the LM head, so unwrapping it would
    silently break every later generation.

    Fallback (only if ``unload()`` raises): unwrap the wrapper chain by hand
    (``peft_model.base_model`` and then its ``.model``). That path does not
    call any LoRA-removal API, so the returned model may still carry the
    injected adapter layers. A warning is printed because a subsequently
    attached adapter could then stack on top of the old one.

    Finally, runs ``gc.collect()`` and (best effort) ``torch.cuda.empty_cache()``.

    Args:
        peft_model: the wrapper returned by ``attach_lora``.

    Returns:
        The base model to wrap with the next adapter.
    """
    try:
        base = peft_model.unload()
    except Exception as exc:  # best effort: fall back to manual unwrapping
        print(f"  [WARN] PeftModel.unload() failed ({type(exc).__name__}: {exc}); "
              "falling back to manual unwrap. LoRA layers may remain injected "
              "and contaminate the next adapter's eval.")
        base = getattr(peft_model, "base_model", None) or peft_model
        # PeftModel.base_model is the LoRA tuner wrapper; its .model is the
        # original base model.
        if hasattr(base, "model"):
            base = base.model
    del peft_model
    gc.collect()
    try:
        import torch
        if torch.cuda.is_available():
            torch.cuda.empty_cache()
    except Exception:
        pass  # cache flush is an optimisation only; never fail a detach over it
    return base
# ---------------------------------------------------------------------------
# Generation
# ---------------------------------------------------------------------------
def generate_response(tokenizer, model, messages: List[Dict[str, str]],
                      max_new_tokens: int, temperature: float,
                      max_input_tokens: int = 28000) -> str:
    """Render ``messages`` with the chat template, generate one reply, decode it.

    Args:
        tokenizer: HF tokenizer exposing ``apply_chat_template``/``decode``.
        model: causal LM (optionally PEFT-wrapped) exposing ``generate`` and
            ``device``.
        messages: chat history as ``{"role": ..., "content": ...}`` dicts.
        max_new_tokens: generation budget for this assistant turn.
        temperature: ``<= 0`` selects pure greedy decoding. ``> 0`` samples
            with ``max(temperature, 0.01)`` and ``top_p=0.95``.
        max_input_tokens: prompt budget in tokens. The default of 28K leaves
            room for generation inside Qwen2.5-Coder-3B's 32K context;
            trajectories grow long after ~5 steps of feedback. Longer prompts
            keep only their NEWEST ``max_input_tokens`` tokens: the system
            prompt and early turns go first, so the latest env feedback, the
            only signal that lets the agent recover from errors, always
            survives. Values ``<= 0`` disable truncation.

    Returns:
        The newly generated text only (prompt excluded), with special tokens
        removed and surrounding whitespace stripped.
    """
    import torch

    prompt = tokenizer.apply_chat_template(
        messages, tokenize=False, add_generation_prompt=True,
    )
    # The chat template already emitted every special token it needs; don't
    # let the tokenizer prepend another BOS (a no-op for Qwen, a duplicated
    # BOS for Llama-style tokenizers).
    enc = tokenizer(prompt, return_tensors="pt", add_special_tokens=False)
    input_ids = enc["input_ids"]
    attention_mask = enc.get("attention_mask")

    # Left-truncate explicitly instead of relying on the tokenizer's
    # truncation_side having been set at load time.
    if max_input_tokens > 0 and input_ids.shape[1] > max_input_tokens:
        input_ids = input_ids[:, -max_input_tokens:]
        if attention_mask is not None:
            attention_mask = attention_mask[:, -max_input_tokens:]

    gen_kwargs: Dict[str, Any] = {
        "input_ids": input_ids.to(model.device),
        "max_new_tokens": max_new_tokens,
        "pad_token_id": tokenizer.pad_token_id,
        "eos_token_id": tokenizer.eos_token_id,
    }
    if attention_mask is not None:
        gen_kwargs["attention_mask"] = attention_mask.to(model.device)
    if temperature > 0.0:
        gen_kwargs.update(
            do_sample=True, temperature=max(temperature, 0.01), top_p=0.95,
        )
    else:
        # Greedy: sampling-only knobs (temperature/top_p) have no effect here,
        # and transformers warns about them, so don't pass them at all.
        gen_kwargs["do_sample"] = False

    with torch.inference_mode():
        out = model.generate(**gen_kwargs)
    new_tokens = out[0, input_ids.shape[1]:]
    return tokenizer.decode(new_tokens, skip_special_tokens=True).strip()
# ---------------------------------------------------------------------------
# Per-task eval (in-process — direct env, no WebSocket)
# ---------------------------------------------------------------------------
def run_task_inproc(
    tokenizer, model, task: Dict[str, Any],
    *, max_steps: int, max_new_tokens: int, temperature: float,
    traj_dir: Path, model_name: str,
) -> Dict[str, Any]:
    """Play one task against an in-process ``FinancialEnvironment``.

    The agent loop runs for at most ``max_steps`` steps. Each step does
    generate_response -> extract_action -> env.step and feeds the step's env
    feedback back as a user turn. When two or fewer steps remain, the turn
    also carries a "submit now" nudge. The loop stops early when the env
    reports ``done``, the model returns an empty reply, or ``env.step`` raises.
    Any other exception (including from the env constructor or ``reset``) is
    recorded as this task's error instead of propagating, so one bad task
    never aborts the whole eval run.

    Side effects: emits log_start/log_step/log_end lines and writes
    ``<traj_dir>/<task_id>.jsonl`` (one JSON object per executed step, action
    and feedback capped at 4000 chars).

    Returns:
        A flat result record with task_id, family, primary_tag, split, score,
        success, steps, elapsed_s, step_rewards and error. ``score`` is the
        reward of the terminating step (0 if the episode never finished).
        ``score`` and ``step_rewards`` are both clamped into [0.001, 0.999].
        ``success`` means the unclamped final reward was >= 0.5.
    """
    import traceback

    task_id = task["id"]
    family = task.get("family", "xlsx")
    log_start(task=task_id, family=family, model=model_name)
    rewards: List[float] = []
    trajectory: List[Dict[str, Any]] = []
    final_score = 0.0
    success = False
    error_msg: Optional[str] = None
    task_start = time.time()
    env = None
    try:
        # Constructed inside the try so a failing constructor is reported as
        # this task's error rather than killing every remaining task/adapter.
        env = FinancialEnvironment()
        obs = env.reset(task_id=task_id)
        sys_prompt = SYSTEM_PROMPTS.get(family, SYSTEM_PROMPTS["xlsx"])
        messages = [
            {"role": "system", "content": sys_prompt},
            {"role": "user", "content": (
                f"{obs.task_description}\n\n"
                f"Source file path: {obs.source_file}\n"
                f"File family: {family}\n"
                f"Task type: {obs.task_type}\n\n"
                f"{obs.feedback}"
            )},
        ]
        for step_num in range(1, max_steps + 1):
            response = generate_response(
                tokenizer, model, messages,
                max_new_tokens=max_new_tokens,
                temperature=temperature,
            )
            if not response:
                error_msg = "empty_response"
                break
            action_type, content = extract_action(response)
            messages.append({"role": "assistant", "content": response})
            try:
                action = FinancialAction(action_type=action_type, content=content)
                obs = env.step(action)
            except Exception as e:
                error_msg = f"env.step failed: {e}"
                break
            reward = float(obs.reward or 0)
            done = bool(obs.done)
            step_feedback = obs.feedback or ""
            rewards.append(reward)
            trajectory.append({
                "step": step_num,
                "action_type": action_type,
                "action_content": content[:4000],
                "reward": reward,
                "done": done,
                "feedback": step_feedback[:4000],
            })
            log_step(
                step=step_num,
                action=f"[{action_type}] {content}",
                reward=reward, done=done, error=None,
            )
            if done:
                final_score = reward
                success = final_score >= 0.5
                break
            remaining = max_steps - step_num
            urgency = ""
            if remaining <= 2:
                urgency = f"\n\n⚠ Only {remaining} step(s) remaining! You MUST submit now."
                if obs.task_type == "QA":
                    urgency += " Use: SUBMIT_ANSWER: <your answer>"
                else:
                    urgency += f" Save the file and use: SUBMIT_FILE: {obs.source_file}"
            messages.append({"role": "user", "content": (
                f"Code execution result (step {step_num}/{max_steps}):\n"
                f"{step_feedback}\n\n"
                f"Source file: {obs.source_file}{urgency}"
            )})
    except Exception as exc:
        error_msg = str(exc)
        print(f"[DEBUG] {task_id} crashed: {exc}")
        # Printed (not sent to stderr) so the traceback lands in the teed log.
        print(traceback.format_exc())
    finally:
        if env is not None:
            try:
                env.close()
            except Exception:
                pass  # best-effort cleanup; the task outcome is already decided
    final_score = max(0.001, min(0.999, final_score))
    rewards = [max(0.001, min(0.999, r)) for r in rewards]
    log_end(success=success, steps=len(trajectory), score=final_score, rewards=rewards)
    traj_path = traj_dir / f"{task_id}.jsonl"
    with open(traj_path, "w", encoding="utf-8") as f:
        f.writelines(json.dumps(entry) + "\n" for entry in trajectory)
    return {
        "task_id": task_id,
        "family": family,
        "primary_tag": task.get("primary_tag", ""),
        "split": task.get("split", "train"),
        "score": final_score,
        "success": success,
        "steps": len(trajectory),
        "elapsed_s": round(time.time() - task_start, 2),
        "step_rewards": rewards,
        "error": error_msg,
    }
# ---------------------------------------------------------------------------
# Per-adapter eval
# ---------------------------------------------------------------------------
def eval_one_adapter(
    *, tokenizer, model, adapter_label: str, tasks: List[dict],
    out_dir: Path, max_steps: int, max_new_tokens: int, temperature: float,
) -> Dict[str, Any]:
    """Run every task against an already-loaded model and persist the results.

    Writes, inside ``out_dir`` (created if needed):
      * ``results.json``: the returned summary dict, including per-task records;
      * ``summary.csv``: one flat row per task, for plotting;
      * ``trajectories/<task_id>.jsonl``: written by ``run_task_inproc``.

    Returns:
        Summary dict with model, n_tasks, avg_score, success_rate,
        total_elapsed_s, by_family ({family: {"n", "avg"}}) and results.
    """
    out_dir.mkdir(parents=True, exist_ok=True)
    traj_dir = out_dir / "trajectories"
    traj_dir.mkdir(parents=True, exist_ok=True)
    print(f"\n{'#' * 70}")
    print(f"# Evaluating: {adapter_label}")
    print(f"# Output : {out_dir}")
    print(f"# Tasks : {len(tasks)}")
    print(f"{'#' * 70}\n")
    results: List[Dict[str, Any]] = []
    overall_start = time.time()
    for i, task in enumerate(tasks, 1):
        # `or ""` also covers a primary_tag key that is present but None.
        tag = (task.get("primary_tag") or "")[:40]
        print(f"\n{'=' * 70}")
        print(f"[{i}/{len(tasks)}] {task['id']} "
              f"({task.get('family')}, {tag})")
        print(f"{'=' * 70}")
        result = run_task_inproc(
            tokenizer, model, task,
            max_steps=max_steps,
            max_new_tokens=max_new_tokens,
            temperature=temperature,
            traj_dir=traj_dir,
            model_name=adapter_label,
        )
        results.append(result)
        print(f" -> {task['id']} score={result['score']:.3f} "
              f"steps={result['steps']} elapsed={result['elapsed_s']:.1f}s")
    total_elapsed = time.time() - overall_start
    if results:
        avg = sum(r["score"] for r in results) / len(results)
        success_rate = sum(1 for r in results if r["success"]) / len(results)
    else:
        avg = success_rate = 0.0
    by_family: Dict[str, List[float]] = {}
    for r in results:
        by_family.setdefault(r["family"], []).append(r["score"])
    summary = {
        "model": adapter_label,
        "n_tasks": len(results),
        "avg_score": round(avg, 4),
        "success_rate": round(success_rate, 4),
        "total_elapsed_s": round(total_elapsed, 2),
        "by_family": {fam: {
            "n": len(scores), "avg": round(sum(scores) / len(scores), 4),
        } for fam, scores in by_family.items()},
        "results": results,
    }
    # Explicit UTF-8: task tags / error strings may be non-ASCII and the
    # platform default encoding is not guaranteed to handle them.
    with open(out_dir / "results.json", "w", encoding="utf-8") as f:
        json.dump(summary, f, indent=2)
    with open(out_dir / "summary.csv", "w", newline="", encoding="utf-8") as f:
        w = csv.writer(f)
        w.writerow(["task_id", "family", "primary_tag", "split",
                    "score", "success", "steps", "elapsed_s", "error"])
        for r in results:
            w.writerow([r["task_id"], r["family"], r["primary_tag"], r["split"],
                        r["score"], r["success"], r["steps"], r["elapsed_s"],
                        r.get("error") or ""])
    print(f"\n{'=' * 70}")
    print(f"OVERALL [{adapter_label}] avg={avg:.3f} success_rate={success_rate:.0%} "
          f"n={len(results)} elapsed={total_elapsed:.0f}s")
    for fam in sorted(by_family):
        scores = by_family[fam]
        print(f" {fam}: avg={sum(scores) / len(scores):.3f} n={len(scores)}")
    print(f"{'=' * 70}\n")
    return summary
# ---------------------------------------------------------------------------
# CLI
# ---------------------------------------------------------------------------
def parse_args(argv: Optional[List[str]] = None):
    """Parse command-line flags for the LoRA eval driver.

    Args:
        argv: argument list to parse. ``None`` (the default) means
            ``sys.argv[1:]``, so existing no-argument callers are unaffected.
            Passing a list makes the parser usable from tests and notebooks.

    Returns:
        The ``argparse.Namespace`` consumed by ``main`` and
        ``inference.select_tasks``.
    """
    p = argparse.ArgumentParser(
        description="Evaluate a base model and/or LoRA adapters in-process "
                    "against the office-document environment.",
    )
    p.add_argument("--base-model", default="Qwen/Qwen2.5-Coder-3B-Instruct",
                   help="base model loaded once and wrapped with each adapter")
    p.add_argument("--adapters", required=True,
                   help="comma-separated list of LoRA adapters (HF repo IDs "
                        "or local paths). Pass 'none' as an entry to also "
                        "evaluate the bare base model.")
    p.add_argument("--split", choices=["train", "eval", "all"], default="eval",
                   help="task split to evaluate")
    p.add_argument("--family", choices=["xlsx", "docx", "pptx", "all"], default="all",
                   help="restrict to one file family")
    p.add_argument("--limit", type=int, default=0,
                   help="task-count limit (interpreted by inference.select_tasks)")
    p.add_argument("--task-ids", default="",
                   help="explicit task selection (interpreted by inference.select_tasks)")
    p.add_argument("--output-dir", default="",
                   help="parent dir; per-adapter subdirs created underneath. "
                        "Default: runs/eval_lora_<timestamp>/")
    p.add_argument("--max-steps", type=int, default=15,
                   help="maximum environment steps per task")
    p.add_argument("--max-new-tokens", type=int, default=2048,
                   help="generation budget per assistant turn")
    p.add_argument("--temperature", type=float, default=0.0,
                   help="0 = greedy decoding; > 0 samples with top_p=0.95")
    return p.parse_args(argv)
def main() -> int:
    """CLI entry point: preflight, select tasks, load the base once, eval each adapter.

    Per-adapter results go to ``<output-dir>/<adapter_tag>/``, with stdout
    mirrored into ``log.txt`` there. A cross-adapter comparison is printed
    and saved to ``<output-dir>/cross_summary.json``.

    Returns:
        Process exit code: 1 if the preflight fails or no tasks are selected,
        0 otherwise.
    """
    args = parse_args()

    # Output parent dir
    if args.output_dir:
        parent_out = Path(args.output_dir)
    else:
        ts = datetime.now().strftime("%Y%m%d_%H%M%S")
        parent_out = REPO_ROOT / "runs" / f"eval_lora_{ts}"
    parent_out.mkdir(parents=True, exist_ok=True)

    # Preflight: confirm the env's subprocess can import every library the
    # agent will need. If this fails, ALL code steps will error and the eval
    # is wasted. Better to crash fast than burn an hour on ImportError loops.
    if not preflight_check():
        print("\nABORT: preflight failed. Fix the missing libs and re-run.",
              file=sys.stderr)
        return 1

    # Select tasks BEFORE the expensive model load, so a bad
    # --split/--family/--task-ids combination fails in seconds.
    all_tasks = load_tasks()
    tasks = select_tasks(args, all_tasks)
    if not tasks:
        print("ERROR: no tasks selected (check --split / --family / --task-ids)",
              file=sys.stderr)
        return 1
    print(f"\nSelected {len(tasks)} tasks")

    # Tokenizer + base model: loaded ONCE, reused across adapters
    tokenizer, base_model = load_base_and_tokenizer(args.base_model)

    adapters = [a.strip() for a in args.adapters.split(",") if a.strip()]
    print(f"Will evaluate {len(adapters)} adapter(s): {adapters}")
    overall_summaries: Dict[str, Dict[str, Any]] = {}
    for i, adapter in enumerate(adapters):
        is_base = adapter.lower() in ("none", "base", "")
        adapter_label = args.base_model if is_base else adapter
        adapter_tag = "base" if is_base else model_slug(adapter)
        if adapter_tag in overall_summaries:
            # Entries mapping to the same tag (e.g. "none" and "base") would
            # otherwise overwrite each other's output dir and summary row.
            adapter_tag = f"{adapter_tag}_{i}"
        out_dir = parent_out / adapter_tag
        out_dir.mkdir(parents=True, exist_ok=True)

        prev_stdout = sys.stdout
        with open(out_dir / "log.txt", "w", encoding="utf-8") as log_file:
            sys.stdout = Tee(sys.__stdout__, log_file)
            try:
                # Wrap base in PeftModel (or use base directly)
                if is_base:
                    eval_model = base_model
                else:
                    eval_model = attach_lora(base_model, adapter)
                summary = eval_one_adapter(
                    tokenizer=tokenizer,
                    model=eval_model,
                    adapter_label=adapter_label,
                    tasks=tasks,
                    out_dir=out_dir,
                    max_steps=args.max_steps,
                    max_new_tokens=args.max_new_tokens,
                    temperature=args.temperature,
                )
                overall_summaries[adapter_tag] = {
                    "label": adapter_label,
                    "avg_score": summary["avg_score"],
                    "success_rate": summary["success_rate"],
                    "by_family": summary["by_family"],
                }
                # Detach + free GPU memory for the next adapter; drop our own
                # reference to the wrapper too so it can actually be collected.
                if not is_base:
                    base_model = detach_lora(eval_model)
                del eval_model
            finally:
                # Restore stdout before the log file closes, even on failure,
                # so nothing is ever written to a closed file.
                sys.stdout = prev_stdout

    # Cross-adapter comparison (printed + saved)
    print(f"\n{'=' * 70}")
    print("CROSS-ADAPTER COMPARISON")
    print(f"{'=' * 70}")
    print(f"{'adapter':40s} avg succ% xlsx docx pptx")
    for tag, info in overall_summaries.items():
        bf = info["by_family"]
        print(f" {tag:38s} {info['avg_score']:.3f} "
              f"{info['success_rate']:.0%} "
              f"{bf.get('xlsx', {}).get('avg', 0):.3f} "
              f"{bf.get('docx', {}).get('avg', 0):.3f} "
              f"{bf.get('pptx', {}).get('avg', 0):.3f}")
    with open(parent_out / "cross_summary.json", "w", encoding="utf-8") as f:
        json.dump(overall_summaries, f, indent=2)
    print(f"\nResults written to: {parent_out}")
    return 0
if __name__ == "__main__":
raise SystemExit(main())