""" PDF/Image to PPTX Converter based on dots.ocr 리팩토링 버전: 전역 모델 로드 + SDPA + 분리된 inference 함수 """ import io import json import math import os import sys import tempfile import shutil import gc import glob import traceback import argparse from pathlib import Path from typing import List, Optional, Tuple, Dict, Any import time as _time # ============================================================ # Flash Attention import 차단 (버전 충돌 방지) # ============================================================ sys.modules['flash_attn'] = None sys.modules['flash_attn.flash_attn_interface'] = None sys.modules['flash_attn.bert_padding'] = None # nest_asyncio는 스크립트 실행 시 불필요하며 Python 3.13/uvicorn과 충돌할 수 있어 주석 처리 # import nest_asyncio # nest_asyncio.apply() import gradio as gr # spaces는 HuggingFace Spaces 전용이므로 안전하게 처리 try: import spaces IS_SPACES = True except ImportError: IS_SPACES = False # spaces 모듈이 없어도 작동하도록 더미 클래스 생성 class spaces: class GPU: def __init__(self, *args, **kwargs): pass def __call__(self, func): return func # ============================================================ # Mock 모드 설정 (명령줄 인자 파싱) - torch import 전에 실행 # ============================================================ def parse_args(): parser = argparse.ArgumentParser(description="PDF/Image to PPTX Converter") parser.add_argument( "--mock", action="store_true", help="Mock mode: 실제 모델/API 호출 없이 UI만 테스트" ) return parser.parse_args() args = parse_args() MOCK_MODE = args.mock # Mock 모드가 아닐 때만 torch 및 관련 모듈 import if not MOCK_MODE: import torch from huggingface_hub import snapshot_download from transformers import AutoModelForCausalLM, AutoProcessor from qwen_vl_utils import process_vision_info else: # Mock 모드일 때는 더미 객체 생성 torch = None snapshot_download = None AutoModelForCausalLM = None AutoProcessor = None process_vision_info = None from PIL import Image, ImageDraw, ImageFont # pptx imports try: from pptx import Presentation except ImportError as exc: raise ImportError("python-pptx is required.") from exc # dots_ocr utils (PDF 로드용) from dots_ocr.utils.doc_utils import load_images_from_pdf from dots_ocr.utils.consts import MIN_PIXELS, MAX_PIXELS, IMAGE_FACTOR from dots_ocr.utils.pptx_generator import build_pptx_from_results # ============================================================ # Mock Inference 함수 # ============================================================ def create_mock_response(): """Mock 모드용 더미 JSON 응답 생성""" test_json_path = Path(__file__).parent / "test.json" if test_json_path.exists(): try: with open(test_json_path, 'r', encoding='utf-8') as f: sample_data = json.load(f) if sample_data and len(sample_data) > 0: layout_result = sample_data[0].get('layout_result', []) return json.dumps(layout_result, ensure_ascii=False) except Exception as e: print(f"⚠️ test.json 로드 실패: {e}") # 기본 더미 데이터 default_mock = [ { "bbox": [100, 100, 500, 200], "category": "Title", "text": "# Sample Title for UI Testing" }, { "bbox": [100, 250, 800, 400], "category": "Text", "text": "This is a sample text extracted from the document for UI testing purposes." }, { "bbox": [100, 450, 600, 650], "category": "Picture" }, { "bbox": [100, 700, 900, 850], "category": "Text", "text": "More sample content to test the UI layout and rendering." } ] return json.dumps(default_mock, ensure_ascii=False) _MOCK_RESPONSE = create_mock_response() if MOCK_MODE else None # ============================================================ # 상수 정의 # ============================================================ MODEL_ID = "rednote-hilab/dots.ocr" MODEL_PATH = "./models/dots-ocr-local" OUTPUT_DIR = os.path.abspath("./outputs") # 절대 경로로 설정 os.makedirs(OUTPUT_DIR, exist_ok=True) PROMPT_MAPPING = { "레이아웃 + 텍스트": "layout_text", "레이아웃": "layout_only", } DEFAULT_PROMPT_KEY = "레이아웃 + 텍스트" # 품질 모드 설정 QUALITY_MODES = { "고성능 (느림, 정확함)": 4194304, "균형 (추천)": 2073600, "고속 (빠름, 품질 낮음)": 1048576 } DEFAULT_QUALITY_KEY = "균형 (추천)" # 레이아웃 + 텍스트 추출 프롬프트 LAYOUT_PROMPT = """Please output the layout information from the PDF image, including each layout element's bbox, its category, and the corresponding text content within the bbox. 1. Bbox format: [x1, y1, x2, y2] 2. Layout Categories: The possible categories are ['Caption', 'Footnote', 'Formula', 'List-item', 'Page-footer', 'Page-header', 'Picture', 'Section-header', 'Table', 'Text', 'Title']. 3. Text Extraction & Formatting Rules: - Picture: For the 'Picture' category, the text field should be omitted. - Formula: Format its text as LaTeX. - Table: Format its text as HTML. - All Others (Text, Title, etc.): Format their text as Markdown. 4. Constraints: - The output text must be the original text from the image, with no translation. - All layout elements must be sorted according to human reading order. - Do NOT extract repetitive background patterns (e.g., "ROLE ITEMS // ROLE ITEMS" or repeated watermarks) as text. Treat them as 'Picture' or ignore them. 5. Final Output: The entire output must be a single JSON object. """ # 레이아웃만 추출 프롬프트 LAYOUT_ONLY_PROMPT = """Please output the layout information from the PDF image, including each layout element's bbox and its category. 1. Bbox format: [x1, y1, x2, y2] 2. Layout Categories: The possible categories are ['Caption', 'Footnote', 'Formula', 'List-item', 'Page-footer', 'Page-header', 'Picture', 'Section-header', 'Table', 'Text', 'Title']. 3. Constraints: - All layout elements must be sorted according to human reading order. - Do NOT extract repetitive background patterns (e.g., "ROLE ITEMS // ROLE ITEMS" or repeated watermarks) as text. 5. Final Output: The entire output must be a single JSON object. """ # 카테고리별 색상 CATEGORY_COLORS = { 'Caption': '#FF6B6B', 'Footnote': '#4ECDC4', 'Formula': '#45B7D1', 'List-item': '#96CEB4', 'Page-footer': '#FFEAA7', 'Page-header': '#DDA0DD', 'Picture': '#FFD93D', 'Section-header': '#6C5CE7', 'Table': '#FD79A8', 'Text': '#74B9FF', 'Title': '#E17055' } # ============================================================ # 폰트 크기 관련 상수 및 유틸리티 # ============================================================ # (pptx_generator로 이동됨) # ============================================================ # 임시파일 정리 # ============================================================ def cleanup_old_files(max_age_hours: float = 1.0): """지정된 시간보다 오래된 임시파일 삭제""" try: # 1. 시스템 임시 폴더 정리 temp_dir = tempfile.gettempdir() patterns = ["dotsocr_*", "preview_*", "*_layout.pptx", "dotsocr_results_*"] threshold = _time.time() - (max_age_hours * 3600) deleted_count = 0 for p in patterns: files = glob.glob(os.path.join(temp_dir, p)) for f in files: try: if os.path.getmtime(f) < threshold: os.remove(f) deleted_count += 1 except Exception: pass # 2. outputs 폴더 정리 if os.path.exists(OUTPUT_DIR): output_files = glob.glob(os.path.join(OUTPUT_DIR, "*")) for f in output_files: try: if os.path.getmtime(f) < threshold: os.remove(f) deleted_count += 1 except Exception: pass if deleted_count > 0: print(f"🧹 Cleanup: {deleted_count}개 오래된 임시파일 삭제") except Exception as e: print(f"Cleanup failed: {e}") cleanup_old_files(max_age_hours=0) # 앱 시작 시 정리 # ============================================================ # 유틸리티 함수들 # ============================================================ def round_by_factor(number: int, factor: int) -> int: """Returns the closest integer to 'number' that is divisible by 'factor'.""" return round(number / factor) * factor def smart_resize( height: int, width: int, factor: int = IMAGE_FACTOR, min_pixels: int = MIN_PIXELS, max_pixels: int = MAX_PIXELS, ) -> Tuple[int, int]: """이미지 크기를 모델 요구사항에 맞게 조정""" if max(height, width) / min(height, width) > 200: raise ValueError(f"Aspect ratio too extreme: {max(height, width) / min(height, width)}") h_bar = max(factor, round_by_factor(height, factor)) w_bar = max(factor, round_by_factor(width, factor)) if h_bar * w_bar > max_pixels: beta = math.sqrt((height * width) / max_pixels) h_bar = round_by_factor(height / beta, factor) w_bar = round_by_factor(width / beta, factor) elif h_bar * w_bar < min_pixels: beta = math.sqrt(min_pixels / (height * width)) h_bar = round_by_factor(height * beta, factor) w_bar = round_by_factor(width * beta, factor) return h_bar, w_bar def fetch_image(image_input, min_pixels: int = None, max_pixels: int = None) -> Image.Image: """이미지 로드 및 리사이즈""" if isinstance(image_input, str): image = Image.open(image_input).convert('RGB') elif isinstance(image_input, Image.Image): image = image_input.convert('RGB') else: raise ValueError(f"Invalid image input type: {type(image_input)}") if min_pixels is not None or max_pixels is not None: min_pixels = min_pixels or MIN_PIXELS max_pixels = max_pixels or MAX_PIXELS height, width = smart_resize( image.height, image.width, factor=IMAGE_FACTOR, min_pixels=min_pixels, max_pixels=max_pixels ) image = image.resize((width, height), Image.LANCZOS) return image def draw_layout_on_image(image: Image.Image, layout_data: List[Dict]) -> Image.Image: """레이아웃 바운딩 박스를 이미지에 그리기""" img_copy = image.copy() draw = ImageDraw.Draw(img_copy) try: try: font = ImageFont.truetype("/usr/share/fonts/truetype/dejavu/DejaVuSans-Bold.ttf", 12) except Exception: font = ImageFont.load_default() for item in layout_data: if 'bbox' in item and 'category' in item: bbox = item['bbox'] category = item['category'] color = CATEGORY_COLORS.get(category, '#000000') draw.rectangle(bbox, outline=color, width=2) label = category label_bbox = draw.textbbox((0, 0), label, font=font) label_width = label_bbox[2] - label_bbox[0] label_height = label_bbox[3] - label_bbox[1] label_x = bbox[0] label_y = max(0, bbox[1] - label_height - 2) draw.rectangle( [label_x, label_y, label_x + label_width + 4, label_y + label_height + 2], fill=color ) draw.text((label_x + 2, label_y + 1), label, fill='white', font=font) except Exception as e: print(f"Error drawing layout: {e}") return img_copy def extract_json_from_text(text: str) -> Optional[str]: """ 텍스트에서 JSON 부분만 추출 모델 출력이 다음과 같은 형태일 수 있음: - ```json\n[...]\n``` - Here is the output:\n[...] - [...] (순수 JSON) """ import re if not text or not text.strip(): return None text = text.strip() # 1. 이미 순수 JSON인 경우 ([ 또는 { 로 시작) if text.startswith('[') or text.startswith('{'): return text # 2. 마크다운 코드 블록에서 추출: ```json ... ``` 또는 ``` ... ``` code_block_pattern = r'```(?:json)?\s*\n?([\s\S]*?)\n?```' matches = re.findall(code_block_pattern, text) if matches: for match in matches: match = match.strip() if match.startswith('[') or match.startswith('{'): return match # 3. [ 또는 { 로 시작하는 JSON 블록 찾기 json_pattern = r'(\[[\s\S]*\]|\{[\s\S]*\})' matches = re.findall(json_pattern, text) if matches: # 가장 긴 매치 반환 (가장 완전한 JSON일 가능성) return max(matches, key=len) return None def filter_noise_text(layout_data: List[Dict]) -> List[Dict]: """ 의미 없는 반복 텍스트나 노이즈를 필터링 예: '// ROLE ITEMS // ROLE ITEMS ...' """ import re import zlib cleaned_data = [] for item in layout_data: text = item.get('text', '') category = item.get('category', '') # 1. 텍스트가 없으면 패스 (Picture 등) if not text or not isinstance(text, str): cleaned_data.append(item) continue # 2. 압축률 기반 반복 패턴 감지 (매우 강력함) # 정상 텍스트는 압축률(compressed/original)이 보통 0.5 이상 # 단순 반복 텍스트는 0.1~0.2 수준으로 나옴 if len(text) > 50: compressed = zlib.compress(text.encode('utf-8')) ratio = len(compressed) / len(text) if ratio < 0.2: print(f" 🗑️ 반복 패턴 감지(압축률 {ratio:.2f}): {text[:30]}...") item['text'] = "" cleaned_data.append(item) continue # 3. 기존 단순 반복 체크 (보완: 공백 제거 후 체크) # "ROLE ITEMS // ROLE ITEMS //" -> "ROLEITEMS//ROLEITEMS//" clean_text = text.replace(" ", "") if len(clean_text) > 50: # 앞 10글자가 뒤에도 계속 나오는지 체크 prefix = clean_text[:10] if clean_text.count(prefix) >= 4: print(f" 🗑️ 단순 반복 감지: {text[:30]}...") item['text'] = "" cleaned_data.append(item) continue # 4. 슬래시, 파이프 등이 과도하게 많은 경우 필터링 special_chars = len(re.findall(r'[/|\\-]', text)) if len(text) > 20 and (special_chars / len(text)) > 0.4: item['text'] = "" cleaned_data.append(item) continue cleaned_data.append(item) return cleaned_data def repair_json_string(json_str: str) -> str: """ 잘못된 이스케이프 시퀀스가 포함된 JSON 문자열을 수리 Invalid \\escape: line 1 column 1796 (char 1795) 같은 오류 해결 """ import re # 1. 역슬래시 단독 사용을 이중 역슬래시로 변경 (이스케이프 문자 제외) # JSON에서 유효한 이스케이프: \", \\, \/, \b, \f, \n, \r, \t, \uXXXX # 그 외의 \는 \\로 이스케이프해야 함 # 먼저 모든 역슬래시를 찾아서 유효한지 검사하는 대신, # 문제가 되는 패턴(윈도우 경로, LaTeX 등)을 찾아서 수정 # LaTeX 수식에서 자주 사용되는 역슬래시 패턴 수정 # 예: \frac -> \\frac, \sigma -> \\sigma # 하지만 이미 \\frac으로 되어 있을 수도 있으므로 주의 # 전략: JSON 파싱이 실패하는 원인인 '유효하지 않은 이스케이프'만 타겟팅 # 하지만 정규식으로 완벽하게 처리하기 어려우므로, # Python의 raw string 처럼 역슬래시를 2개로 늘리는 단순한 방법을 시도 # 단, \", \\, \n 등은 유지해야 함 # 방법: replace로 하나씩 처리하기엔 경우의 수가 많음. # 정규식으로 '이스케이프되지 않은 역슬래시'를 찾아서 두 배로 만듦 # (역슬래시) 뒤에 ["\/bfnrtu] 가 오지 않는 경우를 찾음 pattern = r'\\(?![\\"/bfnrtu])' return re.sub(pattern, r'\\\\', json_str) def layoutjson2md(image: Image.Image, layout_data: List[Dict], text_key: str = 'text') -> str: """Layout JSON을 Markdown으로 변환 (Picture는 실제 이미지 임베딩)""" import base64 from io import BytesIO markdown_lines = [] try: sorted_items = sorted( layout_data, key=lambda x: (x.get('bbox', [0, 0, 0, 0])[1], x.get('bbox', [0, 0, 0, 0])[0]) ) for item in sorted_items: category = item.get('category', '') text = item.get(text_key, '') bbox = item.get('bbox', []) if category == 'Picture': # 이미지 영역 추출 및 base64 임베딩 if bbox and len(bbox) == 4: try: x1, y1, x2, y2 = bbox x1, y1 = max(0, int(x1)), max(0, int(y1)) x2, y2 = min(image.width, int(x2)), min(image.height, int(y2)) if x2 > x1 and y2 > y1: cropped_img = image.crop((x1, y1, x2, y2)) buffer = BytesIO() cropped_img.save(buffer, format='PNG') img_data = base64.b64encode(buffer.getvalue()).decode() markdown_lines.append(f"![Image](data:image/png;base64,{img_data})\n") else: markdown_lines.append("![Image](Image region detected)\n") except Exception as e: print(f"Error processing image region: {e}") markdown_lines.append("![Image](Image detected)\n") else: markdown_lines.append("![Image](Image detected)\n") elif not text: continue elif category == 'Title': markdown_lines.append(f"# {text}\n") elif category == 'Section-header': markdown_lines.append(f"## {text}\n") elif category == 'Text': markdown_lines.append(f"{text}\n") elif category == 'List-item': markdown_lines.append(f"- {text}\n") elif category == 'Table': if text.strip().startswith('<'): markdown_lines.append(f"{text}\n") else: markdown_lines.append(f"**Table:** {text}\n") elif category == 'Formula': if text.strip().startswith('$') or '\\' in text: markdown_lines.append(f"$$\n{text}\n$$\n") else: markdown_lines.append(f"**Formula:** {text}\n") elif category == 'Caption': markdown_lines.append(f"*{text}*\n") elif category == 'Footnote': markdown_lines.append(f"^{text}^\n") elif category not in ['Page-header', 'Page-footer']: markdown_lines.append(f"{text}\n") markdown_lines.append("") except Exception as e: print(f"Error converting to markdown: {e}") return str(layout_data) return "\n".join(markdown_lines) # ============================================================ # 전역 모델 로드 (앱 시작 시 1회만) # ============================================================ if MOCK_MODE: print("=" * 60) print("🎭 MOCK 모드 활성화") print(" 실제 모델/API 호출 없이 UI만 테스트합니다.") print("=" * 60) model = None processor = None device = "cpu" else: print("=" * 60) print("📥 모델 다운로드 중...") print(f" 모델 ID: {MODEL_ID}") print(f" 저장 경로: {MODEL_PATH}") try: snapshot_download( repo_id=MODEL_ID, local_dir=MODEL_PATH, local_dir_use_symlinks=False, ) print("✅ 모델 다운로드 완료!") except Exception as e: print(f"⚠️ 모델 다운로드 중 오류 (캐시 사용 시도): {e}") print("🔄 모델 로딩 중...") model = AutoModelForCausalLM.from_pretrained( MODEL_PATH, attn_implementation="sdpa", # Flash Attention 대신 SDPA 사용 torch_dtype=torch.bfloat16, device_map="auto", trust_remote_code=True ) # Processor 로드 (transformers==4.44.2에서는 정상 작동) processor = AutoProcessor.from_pretrained( MODEL_PATH, trust_remote_code=True ) # video_processor가 없으면 image_processor로 대체 (qwen_vl_utils 호환성) if not hasattr(processor, "video_processor") or processor.video_processor is None: if hasattr(processor, "image_processor"): processor.video_processor = processor.image_processor print(" video_processor를 image_processor로 대체 설정") device = "cuda" if torch.cuda.is_available() else "cpu" print(f"✅ 모델 로딩 완료!") print(f" Device: {device}") if torch.cuda.is_available(): print(f" GPU: {torch.cuda.get_device_name(0)}") print("=" * 60) # ============================================================ # GPU 추론 함수 (@spaces.GPU는 여기만!) # ============================================================ def _real_inference(image: Image.Image, prompt: str, max_new_tokens: int = 4096) -> str: """실제 GPU 추론 함수""" try: messages = [ { "role": "user", "content": [ {"type": "image", "image": image}, {"type": "text", "text": prompt} ] } ] text = processor.apply_chat_template( messages, tokenize=False, add_generation_prompt=True ) image_inputs, video_inputs = process_vision_info(messages) inputs = processor( text=[text], images=image_inputs, videos=video_inputs, # video_inputs 추가 padding=True, return_tensors="pt", ) inputs = inputs.to(device) with torch.no_grad(): generated_ids = model.generate( **inputs, max_new_tokens=max_new_tokens, do_sample=False, ) generated_ids_trimmed = [ out_ids[len(in_ids):] for in_ids, out_ids in zip(inputs.input_ids, generated_ids) ] output_text = processor.batch_decode( generated_ids_trimmed, skip_special_tokens=True, clean_up_tokenization_spaces=False ) # [추가] 사용한 텐서 즉시 삭제 및 메모리 정리 del inputs, generated_ids, generated_ids_trimmed gc.collect() if torch.cuda.is_available(): torch.cuda.empty_cache() return output_text[0] if output_text else "" except Exception as e: print(f"❌ 추론 오류: {e}") traceback.print_exc() return f"Error: {str(e)}" def _mock_inference(image: Image.Image, prompt: str, max_new_tokens: int = 4096) -> str: """Mock 추론 함수 - 실제 API 호출 없이 더미 데이터 반환""" print(f" 🎭 [MOCK] 추론 시작 (실제 API 호출 없음)") print(f" 📝 프롬프트: {prompt[:50]}...") import time time.sleep(0.5) # 로딩 시뮬레이션 print(f" ✅ [MOCK] 추론 완료") return _MOCK_RESPONSE # inference 함수 선택 if MOCK_MODE: @spaces.GPU(duration=90) if IS_SPACES else lambda f: f def inference(image: Image.Image, prompt: str, max_new_tokens: int = 4096) -> str: return _mock_inference(image, prompt, max_new_tokens) else: @spaces.GPU(duration=90) def inference(image: Image.Image, prompt: str, max_new_tokens: int = 4096) -> str: return _real_inference(image, prompt, max_new_tokens) # ============================================================ # 이미지 처리 함수 (GPU 데코레이터 없음) # ============================================================ def process_single_image( image: Image.Image, prompt_mode: str = "layout_text", max_pixels: int = MIN_PIXELS ) -> Dict[str, Any]: """단일 이미지 처리""" try: # 이미지 리사이즈 processed_img = fetch_image(image, min_pixels=MIN_PIXELS, max_pixels=max_pixels) # 프롬프트 선택 prompt = LAYOUT_PROMPT if prompt_mode == "layout_text" else LAYOUT_ONLY_PROMPT # GPU 추론 호출 print(f" 🔍 추론 시작...") raw_output = inference(processed_img, prompt) print(f" ✅ 추론 완료") result = { 'original_image': image, 'raw_output': raw_output, 'processed_image': image, 'layout_result': [], # 항상 List[Dict] 타입 유지 'markdown_content': None } # JSON 파싱 시도 try: # raw_output에서 JSON 부분 추출 json_text = extract_json_from_text(raw_output) if json_text: try: layout_data = json.loads(json_text) except json.JSONDecodeError as e: # 1차 실패 시 수리 시도 (Invalid escape 등) print(f" ⚠️ 1차 JSON 파싱 실패 ({e}), 수리 시도...") repaired_json = repair_json_string(json_text) layout_data = json.loads(repaired_json) print(" ✅ JSON 수리 및 파싱 성공") # layout_data가 리스트인지 확인 if isinstance(layout_data, list): # [추가] 노이즈 필터링 적용 layout_data = filter_noise_text(layout_data) result['layout_result'] = layout_data elif isinstance(layout_data, dict): # 단일 객체면 리스트로 감싸기 layout_data = [layout_data] layout_data = filter_noise_text(layout_data) result['layout_result'] = layout_data else: raise ValueError(f"Unexpected layout_data type: {type(layout_data)}") # 바운딩 박스 시각화 result['processed_image'] = draw_layout_on_image(image, result['layout_result']) # Markdown 변환 result['markdown_content'] = layoutjson2md(image, result['layout_result'], text_key='text') else: print(" ⚠️ JSON 추출 실패, raw output 사용") print(f" 📝 Raw output 미리보기: {raw_output[:200]}...") result['markdown_content'] = raw_output except (json.JSONDecodeError, ValueError) as e: print(f" ⚠️ JSON 파싱 실패: {e}") print(f" 📝 Raw output 미리보기: {raw_output[:200]}...") result['markdown_content'] = raw_output return result except Exception as e: print(f"❌ 이미지 처리 오류: {e}") traceback.print_exc() return { 'original_image': image, 'raw_output': f"Error: {str(e)}", 'processed_image': image, 'layout_result': [], # 항상 List[Dict] 타입 유지 'markdown_content': f"Error: {str(e)}" } # ============================================================ # PPTX 생성 함수들 # ============================================================ # (pptx_generator로 이동됨) def save_results( all_results: List[Dict], file_stem: str, include_background: bool, images: List[Image.Image], use_dark_mode: bool = False, show_border: bool = False ) -> Tuple[Optional[str], Optional[str], List[str], List[List[Dict]]]: """결과 저장 로직 분리 (부분 저장 지원용)""" try: timestamp = int(_time.time()) safe_prefix = file_stem.encode('ascii', 'ignore').decode().strip() or 'document' # 1. PPTX 생성 pptx_path = None if include_background or any(r.get('layout_result') for r in all_results): pptx_filename = f"{safe_prefix}_{timestamp}.pptx" pptx_path = os.path.join(OUTPUT_DIR, pptx_filename) # 처리된 페이지만큼의 배경 이미지 준비 processed_count = len(all_results) background_images = images[:processed_count] if include_background else [] page_count, box_count = build_pptx_from_results( all_results, background_images, Path(pptx_path), use_dark_mode=use_dark_mode, show_border=show_border ) print(f"📊 PPTX 저장: {page_count}페이지, {box_count}개 텍스트박스") # 2. JSON 저장 json_filename = f"{safe_prefix}_{timestamp}.json" json_path = os.path.join(OUTPUT_DIR, json_filename) json_data = [r.get('layout_result') or [] for r in all_results] with open(json_path, 'w', encoding='utf-8') as f: json.dump(json_data, f, ensure_ascii=False, indent=2) # 3. 레이아웃 이미지 저장 layout_img_paths = [] if all_results: for i, res in enumerate(all_results): proc_img = res.get('processed_image') if proc_img: img_filename = f"layout_{safe_prefix}_{timestamp}_{i}.jpg" img_path = os.path.join(OUTPUT_DIR, img_filename) proc_img.save(img_path, "JPEG", quality=90) layout_img_paths.append(img_path) return pptx_path, json_path, layout_img_paths, json_data except Exception as e: print(f"⚠️ 결과 저장 중 오류: {e}") traceback.print_exc() return None, None, [], [] # ============================================================ # 메인 처리 함수 (GPU 데코레이터 없음!) # ============================================================ def process_document( file, prompt_mode: str, quality_mode: str, include_background: bool, use_dark_mode: bool, show_border: bool, ) -> Tuple[Optional[str], Optional[str], Optional[str], str, Any, str]: """문서 처리 메인 함수""" if file is None: return ( None, None, None, "*파일을 업로드하세요.*", [], "파일을 업로드하세요." ) file_path = file.name file_ext = Path(file_path).suffix.lower() file_stem = Path(file_path).stem # 품질 모드 설정값 가져오기 target_max_pixels = QUALITY_MODES.get(quality_mode, 2073600) print("=" * 60) print(f"🚀 변환 시작: {file_stem}") print(f" 프롬프트 모드: {prompt_mode}") print(f" 품질 모드: {quality_mode} ({target_max_pixels} pixels)") print(f" 배경 포함: {include_background}") print(f" 스타일 옵션: 다크모드={use_dark_mode}, 테두리={show_border}") print("=" * 60) # 프롬프트 모드 변환 real_prompt_mode = PROMPT_MAPPING.get(prompt_mode, "layout_text") # 변수 초기화 (예외 발생 시 참조 가능하도록) all_results = [] all_markdown = [] images = [] try: # 이미지 로드 if file_ext == ".pdf": print("📄 PDF 로딩 중...") images = load_images_from_pdf(file_path, dpi=200) print(f" {len(images)}개 페이지 로드 완료") elif file_ext in [".jpg", ".jpeg", ".png"]: images = [Image.open(file_path).convert("RGB")] print("🖼️ 이미지 로드 완료") else: return ( None, None, None, "*지원하지 않는 파일 형식입니다.*", [], f"지원하지 않는 형식: {file_ext}" ) if not images: return ( None, None, None, "*이미지를 로드할 수 없습니다.*", [], "이미지 로드 실패" ) # 각 페이지 처리 # [안전장치] 페이지 수 제한 (최대 10페이지) MAX_PAGES = 10 if len(images) > MAX_PAGES: print(f"⚠️ 페이지 수가 너무 많습니다 ({len(images)}장). 안정성을 위해 앞 {MAX_PAGES}장만 처리합니다.") images = images[:MAX_PAGES] for i, img in enumerate(images): print(f"\n📄 [{i+1}/{len(images)}] 페이지 처리 중...") result = process_single_image(img, real_prompt_mode, max_pixels=target_max_pixels) all_results.append(result) if result.get('markdown_content'): all_markdown.append(f"## 페이지 {i+1}\n\n{result['markdown_content']}") # [추가] 페이지 처리 후 명시적 메모리 정리 (루프 내부) print(f" 🧹 페이지 {i+1} 처리 후 메모리 정리...") gc.collect() if not MOCK_MODE and torch is not None and torch.cuda.is_available(): torch.cuda.empty_cache() print(f"\n✅ 모든 페이지 처리 완료!") # 결과 저장 (정상 완료 시) pptx_path, json_path, layout_img_paths, json_data = save_results( all_results, file_stem, include_background, images, use_dark_mode=use_dark_mode, show_border=show_border ) # Markdown 결과 combined_markdown = "\n\n---\n\n".join(all_markdown) if all_markdown else "*추출된 텍스트가 없습니다.*" # 요약 summary = f"✅ 완료: {len(images)}페이지 처리" # 가비지 컬렉션 gc.collect() if not MOCK_MODE and torch is not None and torch.cuda.is_available(): torch.cuda.empty_cache() cleanup_old_files(max_age_hours=1.0) return ( pptx_path, json_path, layout_img_paths, combined_markdown, json_data, summary ) except Exception as e: print(f"\n❌❌❌ 오류 발생 ❌❌❌") print(f"에러: {type(e).__name__}: {e}") traceback.print_exc() # [부분 저장 시도] if all_results: print(f"⚠️ 에러 발생! 현재까지 처리된 {len(all_results)}페이지 결과를 저장합니다...") pptx_path, json_path, layout_img_paths, json_data = save_results( all_results, file_stem, include_background, images, use_dark_mode=use_dark_mode, show_border=show_border ) combined_markdown = "\n\n---\n\n".join(all_markdown) if all_markdown else f"*처리 도중 오류 발생: {str(e)}*" summary = f"⚠️ 부분 완료: {len(all_results)}/{len(images)} 페이지 처리 중 중단됨 ({str(e)})" return ( pptx_path, json_path, layout_img_paths, combined_markdown, json_data, summary ) return ( None, None, None, f"*오류: {str(e)}*", [], f"처리 실패: {type(e).__name__}: {e}" ) # ============================================================ # 페이지 네비게이션 함수들 # ============================================================ def load_preview_images(file) -> Tuple[List[Image.Image], int]: """업로드된 파일에서 미리보기 이미지 로드""" if file is None: return [], 0 file_path = file.name ext = Path(file_path).suffix.lower() images = [] if ext == ".pdf": try: images = load_images_from_pdf(file_path, dpi=150) except Exception as e: print(f"PDF 미리보기 로드 실패: {e}") elif ext in [".jpg", ".jpeg", ".png"]: try: images = [Image.open(file_path).convert("RGB")] except Exception as e: print(f"이미지 로드 실패: {e}") return images, len(images) def update_preview(file, current_idx): """파일 업로드 시 미리보기 업데이트""" images, total = load_preview_images(file) if not images: return None, 0, 0, "파일을 업로드하세요." current_idx = 0 preview_img = images[current_idx] status = f"페이지 {current_idx + 1} / {total}" return preview_img, current_idx, total, status def navigate_page(file, current_idx, total_pages, direction): """이전/다음 페이지 네비게이션""" if total_pages == 0: return None, 0, "파일을 업로드하세요." if direction == "prev": current_idx = max(0, current_idx - 1) else: current_idx = min(total_pages - 1, current_idx + 1) images, _ = load_preview_images(file) if images and 0 <= current_idx < len(images): preview_img = images[current_idx] status = f"페이지 {current_idx + 1} / {total_pages}" return preview_img, current_idx, status return None, current_idx, "이미지 로드 실패" # ============================================================ # Gradio UI # ============================================================ custom_css = """ .upload-box { border: 2px dashed #a8b4c4 !important; border-radius: 12px !important; background: #f8fafc !important; } .nav-btn { min-width: 120px !important; } .preview-label { color: #6366f1 !important; font-weight: 600 !important; } """ with gr.Blocks(title="PDF/Image to PPTX Converter") as demo: # 상태 변수 current_page_idx = gr.State(0) total_pages = gr.State(0) gr.Markdown( f""" # 🔍 PDF/Image to PPTX Converter PDF 또는 이미지를 업로드하면 레이아웃과 텍스트를 추출하고 PPTX로 변환합니다. (dots.ocr 기반) **⚡ Created by Hyunsang Joo!** {"🎭 **MOCK MODE** - UI 테스트 모드" if MOCK_MODE else ""} """ ) with gr.Row(): # ==================== 왼쪽 컬럼 ==================== with gr.Column(scale=1): file_input = gr.File( label="📄 Upload Image or PDF", file_types=[".pdf", ".jpg", ".jpeg", ".png"], file_count="single", elem_classes=["upload-box"] ) gr.Markdown("**🖼️ Preview**", elem_classes=["preview-label"]) preview_image = gr.Image( label="Preview", type="pil", height=400, show_label=False, interactive=False ) page_status = gr.Markdown("파일을 업로드하세요.") with gr.Row(): prev_btn = gr.Button("◀ Previous", elem_classes=["nav-btn"]) next_btn = gr.Button("Next ▶", elem_classes=["nav-btn"]) with gr.Accordion("⚙️ 설정", open=False): with gr.Row(): prompt_mode = gr.Dropdown( label="프롬프트 모드", choices=list(PROMPT_MAPPING.keys()), value=DEFAULT_PROMPT_KEY, ) quality_mode = gr.Dropdown( label="품질 모드 (속도 vs 정확도)", choices=list(QUALITY_MODES.keys()), value=DEFAULT_QUALITY_KEY, ) include_background = gr.Checkbox( label="PPTX에 배경 이미지 포함", value=True ) with gr.Row(): use_dark_mode = gr.Checkbox( label="다크 모드 (검정 배경/흰 글씨)", value=False ) show_border = gr.Checkbox( label="텍스트 박스 테두리 표시", value=False ) run_btn = gr.Button("🚀 변환 실행", variant="primary", size="lg") # ==================== 오른쪽 컬럼 (탭) ==================== with gr.Column(scale=2): with gr.Tabs(): with gr.TabItem("🖼️ Processed Image"): processed_image = gr.Gallery( label="Image with Layout Detection", type="filepath", height=500, show_label=True, interactive=False, columns=[2], # 2열 그리드로 보기 rows=[2], object_fit="contain" ) with gr.TabItem("📝 Extracted Content"): extracted_content = gr.Markdown( label="Extracted Text", value="*변환 실행 후 추출된 텍스트가 여기에 표시됩니다.*" ) with gr.TabItem("📋 Layout JSON"): layout_json = gr.JSON( label="Layout JSON", value=[] # 일관된 List 타입 ) gr.Markdown("### 📥 결과 파일 다운로드") with gr.Row(): pptx_output = gr.File(label="PPTX 파일", type="filepath", interactive=False) json_output = gr.File(label="JSON 파일", type="filepath", interactive=False) log_output = gr.Markdown(label="로그", value="") # ==================== 이벤트 핸들러 ==================== file_input.change( fn=update_preview, inputs=[file_input, current_page_idx], outputs=[preview_image, current_page_idx, total_pages, page_status] ) prev_btn.click( fn=lambda f, idx, total: navigate_page(f, idx, total, "prev"), inputs=[file_input, current_page_idx, total_pages], outputs=[preview_image, current_page_idx, page_status] ) next_btn.click( fn=lambda f, idx, total: navigate_page(f, idx, total, "next"), inputs=[file_input, current_page_idx, total_pages], outputs=[preview_image, current_page_idx, page_status] ) run_btn.click( fn=process_document, inputs=[file_input, prompt_mode, quality_mode, include_background, use_dark_mode, show_border], outputs=[pptx_output, json_output, processed_image, extracted_content, layout_json, log_output] ) if __name__ == "__main__": port = int(os.getenv("PORT", "7860")) demo.queue(max_size=10).launch( server_name="0.0.0.0", server_port=port, ssr_mode=False, show_error=True, css=custom_css, allowed_paths=[OUTPUT_DIR] # [중요] 출력 폴더 접근 허용 )