import gradio as gr
import json
import os
from pathlib import Path
from typing import List, Dict, Any
import google.generativeai as genai
import PyPDF2
import traceback
# ==============================================================
# API Configuration - read from the environment; never commit a
# real key to source control.
# ==============================================================
GEMINI_API_KEY = os.environ.get("GEMINI_API_KEY", "")  # e.g. export GEMINI_API_KEY="..."
# ==============================================================
# Extraction prompt
# ==============================================================
EXTRACTION_PROMPT = """You are an expert shipping-document data extractor with OCR capabilities.
Carefully analyze ALL text content from PDFs, images, and documents.

CRITICAL: Look at both the text AND the visual layout of documents. Sometimes important data
is in tables, handwritten notes, stamps, or poorly scanned areas.

Extract and structure the data as valid JSON only (no markdown, no commentary):
{
  "poNumber": string | null,
  "shipFrom": string | null,
  "carrierType": string | null,
  "originCarrier": string | null,
  "railCarNumber": string | null,
  "totalQuantity": number | null,
  "totalUnits": string | null,
  "attachments": [string],
  "accountName": string | null,
  "inventories": {
    "items": [
      {
        "quantityShipped": number | null,
        "inventoryUnits": string | null,
        "pcs": number | null,
        "productName": string | null,
        "productCode": string | null,
        "product": {
          "category": string | null,
          "defaultUnits": string | null,
          "unit": number | null,
          "pcs": number | null,
          "mbf": number | null,
          "sf": number | null,
          "pcsHeight": number | null,
          "pcsWidth": number | null,
          "pcsLength": number | null
        },
        "customFields": [string]
      }
    ]
  }
}

EXTRACTION RULES:
1. Extract ALL product line items - create one inventory item per product
2. Parse dimensions: "2X6X14" → pcsHeight=2, pcsWidth=6, pcsLength=14
3. Convert BF to MBF: BF ÷ 1000
4. customFields format: "Key||Value" (e.g., "Mill||Tolko")
5. Look for: PO numbers, shipping info, quantities, product codes, dimensions
6. Check headers, footers, stamps, handwritten notes, and table cells
7. If multiple documents, consolidate all items into one JSON
8. Return null for missing fields
9. attachments should list all provided filenames

Return ONLY valid JSON matching this exact structure."""
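
# --------------------------------------------------------------
# Illustrative helpers for extraction rules 2 and 3 above. This is a
# minimal sketch for spot-checking the model's output locally; the model
# itself performs this parsing. parse_dimensions and bf_to_mbf are not
# part of the pipeline and assume sizes are written like "2X6X14".
# --------------------------------------------------------------
def parse_dimensions(size: str):
    """Split an 'HxWxL' string such as '2X6X14' into (height, width, length)."""
    parts = size.lower().split("x")
    if len(parts) != 3:
        return None
    try:
        return tuple(float(p) for p in parts)
    except ValueError:
        return None

def bf_to_mbf(board_feet: float) -> float:
    """Rule 3: convert board feet to thousand board feet (BF / 1000)."""
    return board_feet / 1000.0

# Example: parse_dimensions("2X6X14") -> (2.0, 6.0, 14.0); bf_to_mbf(4500) -> 4.5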
def extract_text_from_pdf(pdf_path: str) -> str:
    """Extract text from a PDF, tagging each page; returns an error string on failure."""
    try:
        with open(pdf_path, 'rb') as file:
            pdf_reader = PyPDF2.PdfReader(file)
            text = ""
            for page_num, page in enumerate(pdf_reader.pages):
                page_text = page.extract_text()
                if page_text:
                    text += f"\n--- Page {page_num + 1} ---\n{page_text}"
            return text if text.strip() else "No text extracted from PDF"
    except Exception as e:
        return f"Error extracting PDF text: {str(e)}"
def process_files_for_gemini(files: List[str]) -> Dict[str, Any]:
    """Read each file, collect text locally, and upload binary formats to Gemini."""
    processed_data = {
        "text_content": "",
        "file_objects": [],
        "attachments": [],
        "file_info": []
    }
    if not files:
        return processed_data
    for file_path in files:
        if not os.path.exists(file_path):
            continue
        file_name = Path(file_path).name
        file_ext = Path(file_path).suffix.lower()
        processed_data["attachments"].append(file_name)
        processed_data["file_info"].append(f"File: {file_name} (Type: {file_ext})")
        try:
            # PDFs: extract text locally and also upload for visual analysis
            if file_ext == '.pdf':
                text = extract_text_from_pdf(file_path)
                processed_data["text_content"] += f"\n\n=== {file_name} ===\n{text}"
                uploaded_file = genai.upload_file(file_path)
                processed_data["file_objects"].append(uploaded_file)
            # Images: upload directly; Gemini handles the OCR
            elif file_ext in ['.jpg', '.jpeg', '.png', '.gif', '.bmp', '.webp']:
                uploaded_file = genai.upload_file(file_path)
                processed_data["file_objects"].append(uploaded_file)
                processed_data["text_content"] += f"\n\n=== {file_name} (Image) ===\n[Image uploaded for visual analysis]"
            # Plain-text formats
            elif file_ext in ['.txt', '.csv']:
                with open(file_path, 'r', encoding='utf-8', errors='ignore') as f:
                    text = f.read()
                processed_data["text_content"] += f"\n\n=== {file_name} ===\n{text}"
            # Word documents (basic text extraction via python-docx)
            elif file_ext in ['.doc', '.docx']:
                try:
                    import docx
                    doc = docx.Document(file_path)
                    text = "\n".join(para.text for para in doc.paragraphs)
                    processed_data["text_content"] += f"\n\n=== {file_name} ===\n{text}"
                except ImportError:
                    processed_data["text_content"] += f"\n\n=== {file_name} ===\n[Word document - install python-docx for text extraction]"
                except Exception as e:
                    processed_data["text_content"] += f"\n\n=== {file_name} ===\nError reading Word doc: {str(e)}"
        except Exception as e:
            processed_data["text_content"] += f"\n\n=== {file_name} ===\nError processing: {str(e)}"
    return processed_data
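
# For reference, uploading a single hypothetical "bol_123.pdf" yields
# roughly this shape (values abbreviated):
# {
#     "text_content": "\n\n=== bol_123.pdf ===\n--- Page 1 ---\nPO ...",
#     "file_objects": [<google.generativeai File handle>],
#     "attachments": ["bol_123.pdf"],
#     "file_info": ["File: bol_123.pdf (Type: .pdf)"]
# }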
def extract_with_gemini(processed_data: Dict[str, Any], api_key: str, model_name: str = "gemini-2.0-flash-exp") -> Dict[str, Any]:
    """Extract structured data with Gemini, combining extracted text and uploaded files."""
    if not api_key or api_key.strip() == "":
        return {
            "success": False,
            "error": "Gemini API key not provided"
        }
    try:
        # Configure Gemini and select a vision-capable model
        genai.configure(api_key=api_key)
        model = genai.GenerativeModel(model_name)
        # Build the multimodal prompt: instructions, extracted text, then files
        content_parts = [
            EXTRACTION_PROMPT,
            f"\n\nDOCUMENT CONTEXT:\n{processed_data['text_content']}\n",
            f"\nATTACHMENTS: {json.dumps(processed_data['attachments'])}\n",
            "\nNow analyze the uploaded files carefully (including visual content) and extract the data as JSON:"
        ]
        content_parts.extend(processed_data["file_objects"])
        # Low temperature keeps the JSON output deterministic
        generation_config = genai.types.GenerationConfig(
            temperature=0.2,
            max_output_tokens=8000,
        )
        response = model.generate_content(
            content_parts,
            generation_config=generation_config
        )
        response_text = response.text.strip()
        # Strip markdown code fences if the model added them
        if response_text.startswith("```json"):
            response_text = response_text[7:]
        elif response_text.startswith("```"):
            response_text = response_text[3:]
        if response_text.endswith("```"):
            response_text = response_text[:-3]
        response_text = response_text.strip()
        extracted_data = json.loads(response_text)
        return {
            "success": True,
            "data": extracted_data,
            "raw_response": response_text,
            "files_processed": len(processed_data["file_objects"])
        }
    except json.JSONDecodeError as e:
        return {
            "success": False,
            "error": f"JSON parsing error: {str(e)}",
            "raw_response": response.text if 'response' in locals() else "No response",
            "suggestion": "The AI returned non-JSON text. Try again or check the raw response."
        }
    except Exception as e:
        return {
            "success": False,
            "error": f"Extraction error: {str(e)}",
            "traceback": traceback.format_exc()
        }
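
# --------------------------------------------------------------
# Minimal headless smoke test (illustrative sketch, not wired into the
# UI). "sample_bol.pdf" is a hypothetical filename; any local document
# works. Requires GEMINI_API_KEY to be set in the environment.
# --------------------------------------------------------------
def _smoke_test(paths: List[str]) -> None:
    """Run the extraction pipeline without Gradio and print the result."""
    data = process_files_for_gemini(paths)
    result = extract_with_gemini(data, GEMINI_API_KEY)
    if result.get("success"):
        print(json.dumps(result["data"], indent=2))
    else:
        print("Extraction failed:", result.get("error"))

# Example: _smoke_test(["sample_bol.pdf"])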
def process_documents(files):
    """Main Gradio handler: returns (status, json_string, preview)."""
    if not files:
        return "❌ Error: Please upload at least one file", "{}", "No files provided"
    # Use the module-level API key and default model
    api_key = GEMINI_API_KEY
    model_choice = "gemini-2.0-flash-exp"
    if not api_key or api_key.strip() == "":
        return "❌ Error: GEMINI_API_KEY is not configured", "{}", "API key missing"
    try:
        # Gradio may hand back file objects or plain paths
        file_paths = [f.name if hasattr(f, 'name') else f for f in files]
        status_msg = f"📂 Processing {len(file_paths)} file(s)...\n"
        processed_data = process_files_for_gemini(file_paths)
        status_msg += f"✅ Files loaded: {', '.join(processed_data['attachments'])}\n"
        status_msg += "🤖 Extracting data with Gemini AI...\n"
        result = extract_with_gemini(processed_data, api_key, model_choice)
        if result.get("success"):
            json_output = json.dumps(result["data"], indent=2)
            status_msg += f"✅ Extraction successful! Processed {result.get('files_processed', 0)} file(s).\n"
            display_text = "=== EXTRACTED DATA ===\n\n" + json_output
            return status_msg, json_output, display_text
        else:
            error_msg = f"❌ Extraction failed:\n{result.get('error', 'Unknown error')}\n"
            if 'suggestion' in result:
                error_msg += f"\n💡 {result['suggestion']}\n"
            if 'traceback' in result:
                error_msg += f"\nDebug info:\n{result['traceback'][:500]}"
            raw_resp = result.get('raw_response', 'No response')
            return error_msg, "{}", f"Raw Response:\n{raw_resp[:1000]}"
    except Exception as e:
        error_msg = f"❌ Unexpected error: {str(e)}\n{traceback.format_exc()[:500]}"
        return error_msg, "{}", error_msg
# ==============================================================
# Gradio Interface
# ==============================================================
def create_interface():
    with gr.Blocks(theme=gr.themes.Soft(), title="Document Data Extractor") as demo:
        gr.Markdown("""
        # 📦 Shipping Document Data Extractor
        Upload PDFs, images, Word docs, or text files to extract structured shipping data using Google Gemini AI.

        **Supported formats:** PDF, JPG, PNG, DOCX, TXT, CSV
        """)
        with gr.Row():
            with gr.Column(scale=2):
                file_input = gr.File(
                    label="📁 Upload Documents",
                    file_count="multiple",
                    file_types=[".pdf", ".jpg", ".jpeg", ".png", ".gif", ".bmp", ".txt", ".csv", ".doc", ".docx"]
                )
                submit_btn = gr.Button("🚀 Extract Data", variant="primary", size="lg")
            with gr.Column(scale=3):
                status_output = gr.Textbox(
                    label="📊 Status",
                    lines=4,
                    max_lines=8
                )
                json_output = gr.Code(
                    label="📋 JSON Output (Copy this)",
                    language="json",
                    lines=15
                )
                display_output = gr.Textbox(
                    label="👁️ Preview",
                    lines=10,
                    max_lines=15
                )
        gr.Markdown("""
        ### 💡 Tips:
        - Upload multiple files for batch processing
        - For images: ensure text is clear and well-lit
        - For PDFs: both text-based and scanned PDFs work
        - The AI will analyze visual content even if text extraction fails
        """)
        # Wire the button to the processing function
        submit_btn.click(
            fn=process_documents,
            inputs=[file_input],
            outputs=[status_output, json_output, display_output]
        )
        # Examples (registered only if the sample file actually exists)
        if os.path.exists("example1.pdf"):
            gr.Examples(
                examples=[
                    [["example1.pdf"]],
                ],
                inputs=[file_input],
                label="Example Usage"
            )
    return demo

if __name__ == "__main__":
    demo = create_interface()
    demo.launch(
        server_name="0.0.0.0",
        server_port=7860,
        share=False
    )