# -*- coding: utf-8 -*-
"""temp_image.ipynb

Automatically generated by Colab.

Original file is located at
    https://colab.research.google.com/drive/1g_LpdbYLQ7dGmAzUiG2X2gPQUsPkDN1D

Pipeline implemented by this script:

1. Load ``unsloth/Qwen3-4B-Base`` with a LoRA adapter and run a short GRPO
   fine-tuning pass on two annotated examples.
2. Ask the user to upload a PDF, extract per-page product records with the
   fine-tuned model and extract/classify the images embedded in each page.
3. Write the de-duplicated records (with embedded images) to an Excel file,
   print a quality report and save/push the fine-tuned model.
"""

# Commented out IPython magic to ensure Python compatibility.
# %%capture
# import os
# os.environ["UNSLOTH_VLLM_STANDBY"] = "1"
#
# # Install packages for Colab
# !pip install --upgrade -qqq uv
# try:
#     import numpy; get_numpy = f"numpy=={numpy.__version__}"
# except:
#     get_numpy = "numpy"
#
# try:
#     import subprocess; is_t4 = "Tesla T4" in str(subprocess.check_output(["nvidia-smi"]))
# except:
#     is_t4 = False
#
# get_vllm, get_triton = ("vllm==0.10.1", "triton==3.2.0") if is_t4 else ("vllm", "triton")
#
# !uv pip install -qqq --upgrade \
#     unsloth {get_vllm} {get_numpy} torchvision bitsandbytes xformers
# !uv pip install -qqq {get_triton}
# !uv pip install transformers==4.55.4
# !uv pip install PyMuPDF xlsxwriter pillow
#
# print("All packages installed successfully!")

from unsloth import FastLanguageModel
import torch
import fitz  # PyMuPDF
import json
import pandas as pd
import os
import re
import xlsxwriter
from PIL import Image, ImageDraw
import io
from collections import defaultdict
from vllm import SamplingParams
from trl import GRPOConfig, GRPOTrainer
from datasets import Dataset
import numpy as np
from google.colab import files
import zipfile
import matplotlib.pyplot as plt

# ---------------------------------------------------------------------------
# Model configuration
# ---------------------------------------------------------------------------
max_seq_length = 2048
lora_rank = 32

print("Loading model...")
model, tokenizer = FastLanguageModel.from_pretrained(
    model_name="unsloth/Qwen3-4B-Base",
    max_seq_length=max_seq_length,
    load_in_4bit=False,
    fast_inference=True,
    max_lora_rank=lora_rank,
    gpu_memory_utilization=0.7,
)

model = FastLanguageModel.get_peft_model(
    model,
    r=lora_rank,
    target_modules=[
        "q_proj", "k_proj", "v_proj", "o_proj",
        "gate_proj", "up_proj", "down_proj",
    ],
    lora_alpha=lora_rank * 2,
    use_gradient_checkpointing="unsloth",
    random_state=3407,
)
print("Model loaded successfully!")

# ---------------------------------------------------------------------------
# PDF upload
# ---------------------------------------------------------------------------
print("Please upload your PDF file:")
uploaded = files.upload()
if not uploaded:
    # Fail with a clear message instead of an IndexError on the lookup below.
    raise RuntimeError("No file was uploaded")

# Get the uploaded file name
pdf_file_path = list(uploaded.keys())[0]
print(f"Uploaded file: {pdf_file_path}")

# Verify the file (case-insensitive so "FILE.PDF" is accepted as well)
if not pdf_file_path.lower().endswith('.pdf'):
    print("Warning: Please ensure you uploaded a PDF file")
else:
    print("PDF file ready for processing!")

# ---------------------------------------------------------------------------
# Prompt and training data
# ---------------------------------------------------------------------------
new_system_prompt = (
    "You are a data extraction assistant. "
    "Extract the item details from the provided text. "
    "Provide the output as a JSON object, where object represents an item "
    "and has the following keys: 'Flag', 'Product Code', 'Description', "
    "'Manufacturer', 'Supplier', 'Material', 'Dimensions', and "
    "'Product Image'. "
    "If a key's value is not found in the text for an item, provide an "
    'empty string "". '
    "If no items are found, return an empty JSON {}. "
    "Do not include any extra text or formatting outside the JSON object. "
    "Include rows with unique Product Code values only. "
    "For the 'Dimensions' field, extract all dimension information found "
    "(e.g., Height, Width, Depth, Diameter, Length) and format them as a "
    "single string of key-value pairs separated by semicolons, like "
    '"Height: [value]; Width: [value]; Diameter: [value]". '
    "If a specific dimension is not available, do not include its "
    "key-value pair in the string. "
    "If we found the data from first page then take those only "
    "If there are any missing details or extra details then include with "
    "it. "
    "Do not include any duplicate data in any key of JSON."
)

# Your existing training data
annotated_data_examples = [
    {
        "prompt": [
            {"role": "system", "content": new_system_prompt},
            {"role": "user", "content": "Text:\nProject Name: Anse La Mouche\nItem Number: GR-AA10\nDescription: Wall Hanging Art Work\nManufacturer: Harper + Wilde\nSupplier: Harper + Wilde\nMaterial/Finish: Hand Rolled Clay Beads, Cuttlefish Bone, Hemp Rope\nDimensions: Height: 300mm; Width: 250mm\nImage:\n[Image Placeholder]\n\nOutput JSON:"},
        ],
        "answer": '[{"Flag": "", "Product Code": "GR-AA10", "Description": "Wall Hanging Art Work", "Manufacturer": "Harper + Wilde", "Supplier": "Harper + Wilde", "Material": "Hand Rolled Clay Beads, Cuttlefish Bone, Hemp Rope", "Dimensions": "Height: 300mm; Width: 250mm", "Product Image": ""}]',
    },
    {
        "prompt": [
            {"role": "system", "content": new_system_prompt},
            {"role": "user", "content": "Text:\nProject Name: Anse La Mouche\nItem Number: GR-AA12\nDescription: Mirror\nManufacturer: By Contractor\nMaterial/Finish: Clear Mirror (GR-GL02), Powder-Coated Black Aluminium Frame (GR-M03)\nDimensions: Height: 1010mm; Width: 600mm; Depth: 40mm\n\nOutput JSON:"},
        ],
        "answer": '[{"Flag": "", "Product Code": "GR-AA12", "Description": "Mirror", "Manufacturer": "By Contractor", "Supplier": "", "Material": "Clear Mirror (GR-GL02), Powder-Coated Black Aluminium Frame (GR-M03)", "Dimensions": "Height: 1010mm; Width: 600mm; Depth: 40mm", "Product Image": ""}]',
    },
]

grpo_training_dataset = Dataset.from_list(annotated_data_examples)
print("Training dataset created!")


# ---------------------------------------------------------------------------
# Reward functions
# ---------------------------------------------------------------------------
def _completion_text(completion):
    """Return the text of the first message of a completion, or None.

    A completion is expected to be a non-empty list whose first element
    contains a 'content' entry; anything else is reported as None.
    """
    if completion and isinstance(completion, list) and len(completion) > 0 and 'content' in completion[0]:
        return completion[0]['content']
    return None


def format_reward(completions, **kwargs):
    """Score each completion on whether it is a parseable JSON list.

    Returns one float per completion: +3.0 for a JSON list, -1.0 for any
    other valid JSON value, -2.0 for invalid JSON or a malformed completion.
    """
    scores = []
    for completion in completions:
        response = _completion_text(completion)
        if response is None:
            scores.append(-2.0)
            continue
        try:
            parsed_response = json.loads(response.strip())
        except json.JSONDecodeError:
            scores.append(-2.0)
            continue
        scores.append(3.0 if isinstance(parsed_response, list) else -1.0)
    return scores


def accuracy_reward(prompts, completions, answer, **kwargs):
    """Score each completion by field-level agreement with the reference.

    Items are compared positionally; for every item the fraction of the
    eight expected keys whose stripped string values match is averaged over
    ``max(len(predicted), len(reference))`` and scaled to 5.0.

    Returns one float per completion: up to +5.0 for a list-vs-list
    comparison, -2.0 when either side is not a list or the completion is
    malformed, -3.0 when JSON parsing fails.
    """
    scores = []
    expected_keys = ['Flag', 'Product Code', 'Description', 'Manufacturer',
                     'Supplier', 'Material', 'Dimensions', 'Product Image']

    for completion, true_answer_str in zip(completions, answer):
        response = _completion_text(completion)
        if response is None:
            scores.append(-2.0)
            continue

        try:
            parsed_response = json.loads(response.strip())
            true_data = json.loads(true_answer_str.strip())
        except json.JSONDecodeError:
            scores.append(-3.0)
            continue

        if not (isinstance(parsed_response, list) and isinstance(true_data, list)):
            scores.append(-2.0)
            continue

        total_items = max(len(parsed_response), len(true_data))
        if total_items == 0:
            # Both lists are empty: a perfect "nothing to extract" answer.
            scores.append(5.0)
            continue

        match_count = 0.0
        for i in range(total_items):
            # Missing or non-dict entries are compared as empty records.
            parsed_item = parsed_response[i] if i < len(parsed_response) and isinstance(parsed_response[i], dict) else {}
            true_item = true_data[i] if i < len(true_data) and isinstance(true_data[i], dict) else {}
            key_matches = sum(
                1 for key in expected_keys
                if str(parsed_item.get(key, "")).strip() == str(true_item.get(key, "")).strip()
            )
            match_count += key_matches / len(expected_keys)

        scores.append(5.0 * (match_count / total_items))
    return scores


# ---------------------------------------------------------------------------
# Quick GRPO training run
# ---------------------------------------------------------------------------
print("Training model... (This may take a few minutes)")

# Jinja templates cannot see Python variables, so the default system prompt
# is embedded as an escaped string literal instead of being referenced by
# name (which would be undefined at render time).
_default_system_literal = json.dumps(new_system_prompt)

chat_template = (
    "{% if messages[0]['role'] == 'system' %}"
    "{{ messages[0]['content'] + eos_token }}"
    "{% set loop_messages = messages[1:] %}"
    "{% else %}"
    "{{ " + _default_system_literal + " + eos_token }}"
    "{% set loop_messages = messages %}"
    "{% endif %}"
    "{% for message in loop_messages %}"
    "{% if message['role'] == 'user' %}"
    "{{ message['content'] }}"
    "{% elif message['role'] == 'assistant' %}"
    "{{ message['content'] + eos_token }}"
    "{% endif %}"
    "{% endfor %}"
)
tokenizer.chat_template = chat_template

vllm_sampling_params = SamplingParams(
    temperature=1.0,
    top_k=50,
    max_tokens=1024,
    stop=[tokenizer.eos_token],
    include_stop_str_in_output=True,
)

training_args = GRPOConfig(
    vllm_sampling_params=vllm_sampling_params,
    temperature=1.0,
    learning_rate=5e-6,
    weight_decay=0.01,
    warmup_ratio=0.1,
    lr_scheduler_type="linear",
    optim="adamw_8bit",
    logging_steps=1,
    per_device_train_batch_size=2,  # Reduced for Colab
    gradient_accumulation_steps=1,
    max_prompt_length=512,
    max_completion_length=512,
    max_steps=10,  # Reduced for quick demo
    save_steps=10,
    report_to="none",
    output_dir="outputs",
)

trainer = GRPOTrainer(
    model=model,
    processing_class=tokenizer,
    reward_funcs=[format_reward, accuracy_reward],
    args=training_args,
    train_dataset=grpo_training_dataset,
)

trainer.train()
model.save_lora("grpo_saved_lora")
print("Model training completed and saved!")


# ---------------------------------------------------------------------------
# Extraction
# ---------------------------------------------------------------------------
def _as_text(value):
    """Return *value* as a string, mapping None to the empty string.

    Model output is free-form JSON, so a field may be null or numeric
    instead of the string the rest of the pipeline expects.
    """
    return "" if value is None else str(value)


class ProductImageExtractor:
    """Extract product records and their images from a PDF.

    Text of every page is sent to the fine-tuned model, which returns JSON
    records; images embedded in the page are saved to disk and linked to a
    product code based on the distance between the image and the text block
    that mentions the code.
    """

    # Pattern for product codes such as "GR-AA10" or "GR-AA10a".
    PRODUCT_CODE_PATTERN = r'\b[A-Z]{2}-[A-Z]{2}\d+[a-z]?\b'
    # Minimum proximity score for an image to count as product-related.
    PROXIMITY_THRESHOLD = 0.2
    # Images smaller than this area are treated as icons/logos.
    MIN_IMAGE_AREA = 3000
    # Height of the header/footer bands that are ignored.
    HEADER_FOOTER_MARGIN = 80
    # Fallback page height: A4 in points.
    DEFAULT_PAGE_HEIGHT = 842

    def __init__(self, pdf_path, model, tokenizer):
        self.pdf_path = pdf_path
        self.model = model
        self.tokenizer = tokenizer
        self.doc = None
        self.lora_request = None
        self.image_save_dir = "extracted_product_images"
        self.load_lora("grpo_saved_lora")
        self.setup_directories()

    def load_lora(self, lora_path):
        """Load trained LoRA adapter; leaves ``lora_request`` None on failure."""
        if not os.path.exists(lora_path):
            print(f"LoRA adapter not found at {lora_path}; continuing without it")
            return
        try:
            self.lora_request = self.model.load_lora(lora_path)
            print(f"LoRA adapter loaded from {lora_path}")
        except Exception as e:
            # Best effort: inference still works without the adapter.
            print(f"Error loading LoRA: {e}")
            self.lora_request = None

    def setup_directories(self):
        """Create necessary directories"""
        os.makedirs(self.image_save_dir, exist_ok=True)
        os.makedirs(f"{self.image_save_dir}/product_images", exist_ok=True)
        os.makedirs(f"{self.image_save_dir}/non_product_images", exist_ok=True)
        print("Directories created for image storage")

    def is_product_related_image(self, image_bbox, text_blocks, page_text,
                                 page_height=DEFAULT_PAGE_HEIGHT):
        """Determine if an image is product-related based on spatial proximity.

        Text blocks are scanned in the order given. The first block that
        mentions a product code and scores above the proximity threshold
        decides the result; it is intentionally not a search for the
        globally closest block.

        Args:
            image_bbox: (x0, y0, x1, y1) of the image.
            text_blocks: sequence of blocks whose first four entries are the
                bounding box and whose fifth entry is the block text.
            page_text: full text of the page, used to find product codes.
            page_height: page height used for the header/footer filter.

        Returns:
            Tuple ``(is_product, product_code, proximity_score)``.
        """
        # Extract product codes from page text
        product_codes = re.findall(self.PRODUCT_CODE_PATTERN, page_text)
        print('--product codes', product_codes)

        if not product_codes:
            return False, None, 0.0

        # Find text blocks containing product codes
        product_text_blocks = []
        for block in text_blocks:
            if len(block) < 5:
                continue
            block_text = block[4]  # Text content
            codes_in_block = [code for code in product_codes if code in block_text]
            if codes_in_block:
                product_text_blocks.append({
                    'bbox': block[:4],  # x0, y0, x1, y1
                    'text': block_text,
                    'codes': codes_in_block,
                })

        if not product_text_blocks:
            return False, None, 0.0

        # Calculate proximity scores
        max_proximity_score = 0.0
        closest_product_code = None

        for block in product_text_blocks:
            print('--product codes block', block['codes'])
            proximity_score = self.calculate_proximity_score(image_bbox, block['bbox'])

            # Immediate return if a high score is found
            if proximity_score > self.PROXIMITY_THRESHOLD:
                is_product = self.additional_filters(image_bbox, proximity_score, page_height)
                return is_product, block['codes'][0], proximity_score

            if proximity_score > max_proximity_score:
                max_proximity_score = proximity_score
                closest_product_code = block['codes'][0]

        # Apply additional filters to the best-found score
        is_product = self.additional_filters(image_bbox, max_proximity_score, page_height)
        return is_product, closest_product_code, max_proximity_score

    def additional_filters(self, image_bbox, max_proximity_score,
                           page_height=DEFAULT_PAGE_HEIGHT):
        """Apply size, header/footer and threshold filters to an image.

        Args:
            image_bbox: (x0, y0, x1, y1) of the image.
            max_proximity_score: best proximity score found for the image.
            page_height: page height; defaults to A4 (842 points).

        Returns:
            True when the image passes every filter.
        """
        image_area = (image_bbox[2] - image_bbox[0]) * (image_bbox[3] - image_bbox[1])

        # Filter out very small images (likely icons/logos)
        if image_area < self.MIN_IMAGE_AREA:
            return False

        # Filter out images in header/footer areas
        margin = self.HEADER_FOOTER_MARGIN
        if image_bbox[1] < margin or image_bbox[3] > page_height - margin:
            return False

        # Consider it product-related if proximity score is above threshold
        return max_proximity_score > self.PROXIMITY_THRESHOLD

    def calculate_proximity_score(self, image_bbox, text_bbox):
        """Calculate proximity score between image and text bounding boxes.

        The score is ``1 - distance / 800`` between the two box centres,
        clamped at 0, so identical centres give 1.0.
        """
        img_center_x = (image_bbox[0] + image_bbox[2]) / 2
        img_center_y = (image_bbox[1] + image_bbox[3]) / 2
        text_center_x = (text_bbox[0] + text_bbox[2]) / 2
        text_center_y = (text_bbox[1] + text_bbox[3]) / 2

        distance = ((img_center_x - text_center_x) ** 2 +
                    (img_center_y - text_center_y) ** 2) ** 0.5

        return max(0, 1 - (distance / 800))

    def extract_and_classify_images(self, page, page_num):
        """Extract images from page and classify as product-related or not.

        Every image is saved as PNG under ``image_save_dir``; failures on a
        single image are reported and do not stop the remaining images.

        Returns:
            Tuple ``(product_images, non_product_images)`` of metadata dicts.
        """
        images = page.get_images(full=True)
        text_blocks = page.get_text("blocks")
        page_text = page.get_text()
        # Use the real page height so non-A4 pages are filtered correctly.
        page_height = page.rect.height

        product_images = []
        non_product_images = []

        for img_index, img_info in enumerate(images):
            xref = img_info[0]
            try:
                # Get image bounding box
                image_list = page.get_image_rects(xref)
                if not image_list:
                    continue
                image_bbox = image_list[0]  # First occurrence

                # Classify image
                is_product, product_code, proximity_score = self.is_product_related_image(
                    image_bbox, text_blocks, page_text, page_height
                )

                # Extract and save image
                pix = fitz.Pixmap(self.doc, xref)
                if pix.n - pix.alpha > 3:  # Handle CMYK images
                    pix = fitz.Pixmap(fitz.csRGB, pix)

                # Generate filename
                if is_product and product_code:
                    category = "product_images"
                    filename = f"page{page_num}_{product_code}_img{img_index+1}.png"
                else:
                    category = "non_product_images"
                    filename = f"page{page_num}_generic_img{img_index+1}.png"

                image_path = os.path.join(self.image_save_dir, category, filename)
                pix.save(image_path)

                image_data = {
                    'path': image_path,
                    'bbox': image_bbox,
                    'product_code': product_code,
                    'proximity_score': proximity_score,
                    'xref': xref,
                    'size': (pix.width, pix.height),
                }

                if is_product:
                    product_images.append(image_data)
                    print(f"✓ Product image: {filename} (Code: {product_code}, Score: {proximity_score:.2f})")
                else:
                    non_product_images.append(image_data)
                    print(f"• Non-product image: {filename}")

                pix = None  # Release memory

            except Exception as e:
                print(f"Error extracting image {img_index+1} on page {page_num}: {e}")

        return product_images, non_product_images

    def merge_product_data(self, first_page_item, additional_item):
        """Merge product data, prioritizing first page data but filling in missing details.

        Returns a new dict; neither argument is modified.
        """
        merged_item = first_page_item.copy()

        # Fill in missing or empty fields from additional item
        for key in ['Flag', 'Description', 'Manufacturer', 'Supplier',
                    'Material', 'Dimensions', 'Product Image']:
            current_text = _as_text(merged_item.get(key)).strip()
            extra_text = _as_text(additional_item.get(key)).strip()
            if not current_text and extra_text:
                merged_item[key] = additional_item[key]
                print(f" → Added missing {key}: {_as_text(additional_item[key])[:50]}...")

        # For image, keep the first occurrence and only fill it in when missing
        if not merged_item.get('Product Image File', '') and additional_item.get('Product Image File', ''):
            merged_item['Product Image File'] = additional_item['Product Image File']
            print(f" → Added missing image: {os.path.basename(additional_item['Product Image File'])}")

        return merged_item

    def _generate_page_items(self, page_text, sampling_params):
        """Run the model on one page of text and return the parsed items.

        Returns a list: the parsed JSON list, a single-element list for a
        JSON object, or an empty list for any other JSON value.

        Raises:
            json.JSONDecodeError: when the model output is not valid JSON.
        """
        messages = [
            {"role": "system", "content": new_system_prompt},
            {"role": "user", "content": f"Text:\n{page_text}\n\nOutput JSON:"},
        ]
        prompt_text = self.tokenizer.apply_chat_template(
            messages,
            add_generation_prompt=False,
            tokenize=False,
        )

        raw_model_output = self.model.fast_generate(
            prompt_text,
            sampling_params=sampling_params,
            lora_request=self.lora_request,
        )[0].outputs[0].text

        cleaned_output = raw_model_output.strip()
        # The stop string may be included in the output; it is not JSON.
        eos_token = self.tokenizer.eos_token
        if eos_token and cleaned_output.endswith(eos_token):
            cleaned_output = cleaned_output[:-len(eos_token)].rstrip()

        parsed_data = json.loads(cleaned_output)
        if isinstance(parsed_data, dict):
            return [parsed_data]
        if isinstance(parsed_data, list):
            return parsed_data
        return []

    def extract_product_data_with_images(self):
        """Main extraction function with duplicate consolidation.

        Returns:
            A list with one record per unique product code, or None when
            the PDF cannot be opened.
        """
        try:
            self.doc = fitz.open(self.pdf_path)
            total_pages = self.doc.page_count  # Store page count before processing
            print(f"Processing PDF: {self.pdf_path}")
            print(f"Total pages: {total_pages}")
        except Exception as e:
            print(f"Error opening PDF: {e}")
            return None

        all_product_images = defaultdict(list)  # Images grouped by product code
        product_data_tracker = {}  # Track products by code to avoid duplicates

        # Setup inference parameters
        sampling_params = SamplingParams(
            temperature=0.1,
            top_p=1.0,
            max_tokens=1024,
            stop=[self.tokenizer.eos_token],
            include_stop_str_in_output=True,
        )

        try:
            for page_num in range(total_pages):
                page = self.doc.load_page(page_num)
                page_text = page.get_text()

                print(f"\n--- Processing page {page_num + 1} ---")

                # Extract and classify images
                product_images, non_product_images = self.extract_and_classify_images(page, page_num + 1)

                # Group product images by product code
                for img_data in product_images:
                    if img_data['product_code']:
                        all_product_images[img_data['product_code']].append(img_data)

                try:
                    # Extract product data using trained model
                    parsed_data = self._generate_page_items(page_text, sampling_params)

                    # Process extracted items and handle duplicates
                    for item in parsed_data:
                        if not isinstance(item, dict):
                            continue

                        product_code = _as_text(item.get('Product Code')).strip()

                        # Skip items without product codes
                        if not product_code:
                            continue

                        # Find best matching image for this product
                        image_path = ""
                        if product_code in all_product_images:
                            best_image = max(
                                all_product_images[product_code],
                                key=lambda x: x['proximity_score']
                            )
                            image_path = best_image['path']

                        # Create complete item record
                        current_item_data = {
                            "pdf_page_number": page_num + 1,
                            "Flag": _as_text(item.get('Flag')),
                            "Product Code": product_code,
                            "Description": _as_text(item.get('Description')),
                            "Manufacturer": _as_text(item.get('Manufacturer')),
                            "Supplier": _as_text(item.get('Supplier')),
                            "Material": _as_text(item.get('Material')),
                            "Dimensions": _as_text(item.get('Dimensions')),
                            "Product Image": _as_text(item.get('Product Image')),
                            "Product Image File": image_path,
                        }

                        # Check if this product code already exists
                        if product_code in product_data_tracker:
                            print(f" ! Duplicate found for {product_code} on page {page_num + 1}")
                            # Merge with existing data (prioritize first occurrence)
                            existing_item = product_data_tracker[product_code]
                            product_data_tracker[product_code] = self.merge_product_data(
                                existing_item, current_item_data
                            )
                        else:
                            # First occurrence of this product code
                            print(f" ✓ New product: {product_code}")
                            if image_path:
                                print(f" → Linked image: {os.path.basename(image_path)}")
                            product_data_tracker[product_code] = current_item_data

                except Exception as e:
                    # Best effort per page: report and continue with the next one.
                    print(f"Error processing page {page_num + 1}: {e}")
        finally:
            # Always release the document, even if a page fails unexpectedly.
            self.doc.close()

        # Convert tracker to final list (this ensures no duplicates)
        final_data = list(product_data_tracker.values())

        print(f"\n=== DEDUPLICATION SUMMARY ===")
        print(f"Unique products found: {len(final_data)}")
        print(f"Pages processed: {total_pages}")

        # Verify no duplicates exist
        product_codes = [item.get('Product Code', '') for item in final_data]
        unique_codes = set(product_codes)
        if len(product_codes) != len(unique_codes):
            print(f"WARNING: Found {len(product_codes) - len(unique_codes)} duplicate entries!")
        else:
            print("✓ No duplicate product codes confirmed")

        return final_data


print("ProductImageExtractor class defined!")

print("Starting extraction process...")

# Initialize extractor
extractor = ProductImageExtractor(pdf_file_path, model, tokenizer)

# Extract data and images
extracted_data = extractor.extract_product_data_with_images()

if extracted_data:
    # Convert to DataFrame for display
    df_results = pd.DataFrame(extracted_data)

    print(f"\n=== EXTRACTION COMPLETED ===")
    print(f"Total items extracted: {len(df_results)}")
    print(f"Items with product images: {len([item for item in extracted_data if item['Product Image File']])}")

    # Display first few results
    print("\n=== SAMPLE RESULTS ===")
    display_columns = ['Product Code', 'Description', 'Manufacturer', 'Product Image File']
    print(df_results[display_columns].head(10).to_string(index=False))
else:
    print("Failed to extract data from PDF")


# ---------------------------------------------------------------------------
# Excel export
# ---------------------------------------------------------------------------
def _calculate_column_width(column_data, column_name, min_width=8, max_width=50):
    """Calculate optimal column width based on content.

    The width is 1.2x the longest cell (or header) text, clamped to
    ``[min_width, max_width]``; an empty column gets ``min_width``.
    """
    if len(column_data) == 0:
        return min_width

    # Get max length of content in this column (header included)
    max_length = max(
        len(str(value)) for value in [column_name] + list(column_data)
    )

    # Apply some padding and limits
    return min(max(max_length * 1.2, min_width), max_width)


def create_excel_with_embedded_images(data, output_filename):
    """Create Excel file with properly embedded and displayed images.

    Args:
        data: list of record dicts; 'Product Image File' holds the path of
            the image to embed in the 'Product Image' column.
        output_filename: path of the .xlsx file to write.
    """
    df = pd.DataFrame(data)

    print(f"Creating Excel file: {output_filename}")

    # Columns with a fixed width, and (min, max) bounds for computed widths.
    fixed_widths = {"Product Image": 20, "Product Image File": 25}
    width_bounds = {
        "Description": (15, 40),
        "Material": (12, 35),
        "Dimensions": (15, 30),
    }

    # Uniform image size settings
    UNIFORM_IMAGE_WIDTH = 120  # pixels
    UNIFORM_IMAGE_HEIGHT = 120  # pixels
    CELL_ROW_HEIGHT = 100  # points (Excel row height)

    # Create Excel writer with xlsxwriter engine
    with pd.ExcelWriter(output_filename, engine='xlsxwriter') as writer:
        df.to_excel(writer, sheet_name='Product Data', index=False)

        workbook = writer.book
        worksheet = writer.sheets['Product Data']

        # Set column widths
        for col_idx, column_name in enumerate(df.columns):
            if column_name in fixed_widths:
                width = fixed_widths[column_name]
            else:
                min_width, max_width = width_bounds.get(column_name, (8, 50))
                width = _calculate_column_width(
                    df[column_name], column_name,
                    min_width=min_width, max_width=max_width,
                )
            worksheet.set_column(col_idx, col_idx, width)
            print(f"Column '{column_name}': width = {width}")

        # Find the image column index; None disables image handling so the
        # cell-formatting loop below still works when a column is missing.
        image_col_index = None
        if "Product Image" in df.columns and "Product Image File" in df.columns:
            image_col_index = df.columns.get_loc("Product Image")

            # Insert images into cells with uniform sizing
            images_inserted = 0
            for row_num in range(1, len(df) + 1):  # Start from row 1 (skip header)
                image_path = df.iloc[row_num - 1]['Product Image File']

                if not (image_path and os.path.exists(image_path)):
                    continue

                try:
                    # Set consistent row height for all image rows
                    worksheet.set_row(row_num, CELL_ROW_HEIGHT)

                    # Get original image dimensions to calculate scaling
                    with Image.open(image_path) as img:
                        original_width, original_height = img.size

                    # Use the smaller scale to maintain aspect ratio while
                    # fitting in the target size
                    uniform_scale = min(
                        UNIFORM_IMAGE_WIDTH / original_width,
                        UNIFORM_IMAGE_HEIGHT / original_height,
                    )

                    # Insert image with uniform scaling
                    worksheet.insert_image(
                        row_num, image_col_index, image_path,
                        {
                            'x_scale': uniform_scale,
                            'y_scale': uniform_scale,
                            'x_offset': 5,  # Small offset from cell border
                            'y_offset': 5,
                            'positioning': 1,  # Move and size with cells
                        }
                    )
                    images_inserted += 1
                    print(f" → Inserted uniform image {images_inserted}: {os.path.basename(image_path)} "
                          f"(scale: {uniform_scale:.2f}, orig: {original_width}x{original_height})")

                except Exception as e:
                    print(f"Error embedding image {image_path}: {e}")

            print(f"\nExcel file created with {images_inserted} uniformly-sized embedded images!")
            print(f"All images scaled to approximately {UNIFORM_IMAGE_WIDTH}x{UNIFORM_IMAGE_HEIGHT} pixels")
        else:
            print("Product Image / Product Image File column not found")

        # Add formatting for better appearance
        header_format = workbook.add_format({
            'bold': True,
            'text_wrap': True,
            'valign': 'top',
            'fg_color': '#D7E4BC',
            'border': 1,
        })

        # Apply header formatting
        for col_num, value in enumerate(df.columns.values):
            worksheet.write(0, col_num, value, header_format)

        # Add text wrapping for content cells
        wrap_format = workbook.add_format({
            'text_wrap': True,
            'valign': 'top',
            'border': 1,
        })

        image_cell_format = workbook.add_format({
            'border': 1,
            'valign': 'top',
        })

        # Apply text wrapping to data cells (excluding image column)
        for row_num in range(1, len(df) + 1):
            for col_num in range(len(df.columns)):
                if col_num == image_col_index:
                    # Image column: empty cell with borders behind the picture
                    worksheet.write(row_num, col_num, '', image_cell_format)
                else:
                    cell_value = df.iloc[row_num - 1, col_num]
                    worksheet.write(row_num, col_num, cell_value, wrap_format)


output_excel = "product_data_with_images.xlsx"

if extracted_data:
    create_excel_with_embedded_images(extracted_data, output_excel)

    # Create summary statistics
    df_results = pd.DataFrame(extracted_data)
    total_items = len(df_results)
    items_with_images = len(df_results[df_results['Product Image File'] != ''])
    unique_products = len(df_results[df_results['Product Code'] != '']['Product Code'].unique())

    print(f"\n=== FINAL SUMMARY ===")
    print(f"Total items extracted: {total_items}")
    print(f"Items with images: {items_with_images}")
    print(f"Unique products: {unique_products}")
    print(f"Images saved in: {extractor.image_save_dir}")
    print(f"Excel file: {output_excel}")

print("Preparing files for download...")

# Create a zip file with all results
# zip_filename = "extraction_results.zip"
# with zipfile.ZipFile(zip_filename, 'w') as zipf:
#     # Add Excel file
#     if os.path.exists("product_data_with_images.xlsx"):
#         zipf.write("product_data_with_images.xlsx")
#     # Add all extracted images
#     if os.path.exists("extracted_product_images"):
#         for root, dirs, files_list in os.walk("extracted_product_images"):
#             for file in files_list:
#                 file_path = os.path.join(root, file)
#                 arcname = os.path.relpath(file_path, ".")
#                 zipf.write(file_path, arcname)
# print(f"Created zip file: {zip_filename}")
# # Download the zip file
# if os.path.exists(zip_filename):
#     files.download(zip_filename)
#     print("Download started! Check your downloads folder.")
# else:
#     print("Error creating zip file")

# Download the Excel file; only when this run actually produced data, so a
# stale file from an earlier run is never reported as a fresh result.
if extracted_data and os.path.exists(output_excel):
    files.download(output_excel)
    print("Excel file download started!")

    print("\nExtraction completed successfully!")
    print("You should now have:")
    print("1. product_data_with_images.xlsx - Excel file with embedded images")
    # print("2. extraction_results.zip - Complete package with all files")
else:
    print("No Excel file was produced; nothing to download")


# ---------------------------------------------------------------------------
# Quality report
# ---------------------------------------------------------------------------
def run_quality_check(extracted_data):
    """Run quality checks on extracted data.

    Prints a report and returns a dict with the keys 'total_records',
    'records_with_codes', 'records_with_images', 'unique_codes' and
    'existing_images'.
    """
    df = pd.DataFrame(extracted_data)

    has_code = df['Product Code'] != ''
    has_description = df['Description'] != ''
    has_image = df['Product Image File'] != ''

    print("=== QUALITY CHECK REPORT ===")

    # Basic statistics
    print(f"Total records: {len(df)}")
    print(f"Records with Product Code: {len(df[has_code])}")
    print(f"Records with Description: {len(df[has_description])}")
    print(f"Records with Images: {len(df[has_image])}")

    # Product code analysis
    product_codes = df[has_code]['Product Code'].tolist()
    unique_codes = set(product_codes)
    print(f"Unique Product Codes: {len(unique_codes)}")

    if product_codes:
        print("Sample Product Codes:", list(unique_codes)[:5])

    # Image file verification
    image_files = df[has_image]['Product Image File'].tolist()
    existing_images = [f for f in image_files if os.path.exists(f)]
    print(f"Image files that exist: {len(existing_images)}/{len(image_files)}")

    # Manufacturer analysis
    manufacturers = df[df['Manufacturer'] != '']['Manufacturer'].unique()
    print(f"Unique Manufacturers: {len(manufacturers)}")

    return {
        'total_records': len(df),
        'records_with_codes': len(df[has_code]),
        'records_with_images': len(df[has_image]),
        'unique_codes': len(unique_codes),
        'existing_images': len(existing_images),
    }


if extracted_data:
    quality_stats = run_quality_check(extracted_data)

# ---------------------------------------------------------------------------
# Save and publish the fine-tuned model
# ---------------------------------------------------------------------------
model_name = "Qwen3_4B_Base_fine_tuned"
model.save_pretrained(model_name)
tokenizer.save_pretrained(model_name)
model.push_to_hub("pragneshr002/Qwen3_4B_Base_fine_tuned")
model.push_to_hub_gguf(model_name, tokenizer, quantization_method="q4_k_m")