| """ | |
| Content deduplication component for the web crawler. | |
| Provides functionality to detect duplicate pages efficiently | |
| 1. Exact content hashing | |
| 2. Shingling and MinHash for near-duplicate detection | |
| 3. SimHash for fuzzy matching | |
| """ | |
| import hashlib | |
| import logging | |
| import time | |
| from typing import Set, List, Dict, Tuple, Optional, Union | |
| import random | |
| import numpy as np | |
| from collections import defaultdict | |
| import re | |
| import config | |
| # Configure logging | |
| logging.basicConfig( | |
| level=getattr(logging, config.LOG_LEVEL), | |
| format=config.LOG_FORMAT | |
| ) | |
| logger = logging.getLogger(__name__) | |


class ContentDeduplicator:
    """
    Content deduplication using multiple techniques:
    - Exact match (MD5 hash)
    - Near-duplicate detection (MinHash)
    - Fuzzy matching (SimHash)
    """

    def __init__(self):
        """Initialize the deduplicator"""
        # Exact content hashing
        self.content_hashes = set()  # Set of MD5 hashes seen so far
        self.content_hash_map = {}   # URL -> MD5 content hash

        # MinHash parameters
        self.num_hashes = 100
        self.minhash_signatures = {}           # URL -> MinHash signature
        self.minhash_bands = defaultdict(set)  # band_id -> set of URLs
        self.band_size = 5     # Hash values per LSH band (20 bands in total)
        self.shingle_size = 3  # k-shingles of 3 consecutive tokens

        # SimHash parameters
        self.simhash_dim = 64
        self.simhash_values = {}     # URL -> SimHash value
        self.hamming_threshold = 3   # Maximum Hamming distance for similarity

        # Cache of previously computed duplicates for quick lookups
        self.duplicate_cache = {}    # URL -> set of duplicate URLs

        # Token preprocessing
        self.token_pattern = re.compile(r'\w+')
        self.stop_words = set(['the', 'and', 'a', 'to', 'of', 'in', 'is', 'that', 'for', 'on', 'with'])

        # Statistics
        self.stats = {
            'exact_duplicates': 0,
            'near_duplicates': 0,
            'fuzzy_duplicates': 0,
            'processing_time': 0,
            'total_documents': 0,
        }

    def is_duplicate(self, url: str, content: str) -> Tuple[bool, Optional[str]]:
        """
        Check if content is a duplicate

        Args:
            url: URL of the page
            content: Page content

        Returns:
            (is_duplicate, duplicate_url): Tuple indicating whether the content is a
            duplicate and, if so, which URL it duplicates
        """
        start_time = time.time()

        # Check exact match first (fastest)
        content_hash = self._hash_content(content)
        if content_hash in self.content_hashes:
            self.stats['exact_duplicates'] += 1
            self.stats['processing_time'] += time.time() - start_time

            # Find the URL with the same hash
            for existing_url, existing_hash in self._get_hash_map().items():
                if existing_hash == content_hash and existing_url != url:
                    logger.debug(f"Exact duplicate detected: {url} duplicates {existing_url}")
                    return True, existing_url
            return True, None

        # Check cache for quick lookup
        if url in self.duplicate_cache:
            duplicate_url = next(iter(self.duplicate_cache[url]))
            logger.debug(f"Duplicate found in cache: {url} duplicates {duplicate_url}")
            return True, duplicate_url

        # Only perform more expensive checks if configured to do so
        if config.NEAR_DUPLICATE_DETECTION:
            # Check for near-duplicates using MinHash
            near_duplicate = self._check_minhash(url, content)
            if near_duplicate:
                self.stats['near_duplicates'] += 1
                self.stats['processing_time'] += time.time() - start_time
                logger.debug(f"Near-duplicate detected: {url} is similar to {near_duplicate}")
                self._add_to_duplicate_cache(url, near_duplicate)
                return True, near_duplicate

        if config.FUZZY_DUPLICATE_DETECTION:
            # Check for fuzzy matches using SimHash
            fuzzy_duplicate = self._check_simhash(url, content)
            if fuzzy_duplicate:
                self.stats['fuzzy_duplicates'] += 1
                self.stats['processing_time'] += time.time() - start_time
                logger.debug(f"Fuzzy duplicate detected: {url} is similar to {fuzzy_duplicate}")
                self._add_to_duplicate_cache(url, fuzzy_duplicate)
                return True, fuzzy_duplicate

        # Not a duplicate, add to index
        self._add_to_index(url, content, content_hash)
        self.stats['total_documents'] += 1
        self.stats['processing_time'] += time.time() - start_time
        return False, None

    def _add_to_duplicate_cache(self, url: str, duplicate_url: str) -> None:
        """Add URL to duplicate cache for faster lookups"""
        if url not in self.duplicate_cache:
            self.duplicate_cache[url] = set()
        self.duplicate_cache[url].add(duplicate_url)

        # Also add reverse relationship
        if duplicate_url not in self.duplicate_cache:
            self.duplicate_cache[duplicate_url] = set()
        self.duplicate_cache[duplicate_url].add(url)

    def _get_hash_map(self) -> Dict[str, str]:
        """Get mapping of URLs to their MD5 content hashes"""
        return dict(self.content_hash_map)

    def _hash_content(self, content: str) -> str:
        """Create MD5 hash of content"""
        return hashlib.md5(content.encode('utf-8')).hexdigest()

    def _preprocess_content(self, content: str) -> List[str]:
        """
        Preprocess content for tokenization:
        1. Remove HTML tags
        2. Convert to lowercase
        3. Extract tokens
        4. Remove stop words
        """
        # Remove HTML tags
        content = re.sub(r'<[^>]+>', ' ', content)

        # Tokenize
        tokens = self.token_pattern.findall(content.lower())

        # Remove stop words
        tokens = [token for token in tokens if token not in self.stop_words]

        return tokens
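
    # Example for _preprocess_content (illustrative): the input
    # "<p>The quick brown fox!</p>" becomes ['quick', 'brown', 'fox'] --
    # tags are stripped, text is lowercased and tokenized, and 'the' is
    # dropped as a stop word.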

    def _add_to_index(self, url: str, content: str, content_hash: Optional[str] = None) -> None:
        """
        Add content to the deduplication index

        Args:
            url: URL of the page
            content: Page content
            content_hash: Optional pre-computed hash
        """
        # Add exact hash
        if content_hash is None:
            content_hash = self._hash_content(content)
        self.content_hashes.add(content_hash)
        self.content_hash_map[url] = content_hash

        # Add MinHash signature
        if config.NEAR_DUPLICATE_DETECTION:
            signature = self._compute_minhash(content)
            self.minhash_signatures[url] = signature

            # Add to LSH bands
            for i in range(0, self.num_hashes, self.band_size):
                band = tuple(signature[i:i + self.band_size])
                band_id = hash(band)
                self.minhash_bands[band_id].add(url)

        # Add SimHash
        if config.FUZZY_DUPLICATE_DETECTION:
            simhash_value = self._compute_simhash(content)
            self.simhash_values[url] = simhash_value
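
    # LSH note: with num_hashes=100 and band_size=5 the signature splits into
    # 20 bands of 5 hash values each. By the standard LSH analysis, two
    # documents with Jaccard similarity s share at least one band (and so
    # become comparison candidates) with probability 1 - (1 - s^5)^20,
    # an S-curve whose threshold sits near s ~= 0.55.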

    def _create_shingles(self, tokens: List[str], k: int = 3) -> Set[str]:
        """
        Create k-shingles from tokens

        Args:
            tokens: List of tokens
            k: Size of shingles

        Returns:
            Set of shingles
        """
        return set(' '.join(tokens[i:i + k]) for i in range(len(tokens) - k + 1))
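
    # Example for _create_shingles (illustrative):
    # _create_shingles(['quick', 'brown', 'fox', 'jumps'], k=3) returns
    # {'quick brown fox', 'brown fox jumps'}.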

    def _compute_minhash(self, content: str) -> List[int]:
        """
        Compute MinHash signature for content

        Args:
            content: Page content

        Returns:
            MinHash signature (list of hash values)
        """
        tokens = self._preprocess_content(content)
        shingles = self._create_shingles(tokens, self.shingle_size)

        max_hash = 2**32 - 1

        # Initialize signature with the maximum possible hash value
        signature = [max_hash] * self.num_hashes

        # For each shingle, apply a family of hash functions and keep the minimum values
        for shingle in shingles:
            # Base hash of the shingle, reused by every hash function in the family
            shingle_hash = hash(shingle)
            for i in range(self.num_hashes):
                # Simple linear hash function: (a*x + b) mod c
                a = i + 1  # Different 'a' for each hash function
                b = i * i  # Different 'b' for each hash function
                hash_value = (a * shingle_hash + b) % max_hash
                # Keep the minimum hash value
                signature[i] = min(signature[i], hash_value)

        return signature

    def _check_minhash(self, url: str, content: str) -> Optional[str]:
        """
        Check for near-duplicates using MinHash and LSH

        Args:
            url: URL of the page
            content: Page content

        Returns:
            URL of duplicate page if found, None otherwise
        """
        # Compute MinHash signature
        signature = self._compute_minhash(content)

        # Check each band for potential matches
        candidate_urls = set()
        for i in range(0, self.num_hashes, self.band_size):
            band = tuple(signature[i:i + self.band_size])
            band_id = hash(band)

            # Get URLs that share this band
            if band_id in self.minhash_bands:
                candidate_urls.update(self.minhash_bands[band_id])

        # Check estimated Jaccard similarity with candidates
        for candidate_url in candidate_urls:
            if candidate_url == url:
                continue

            candidate_signature = self.minhash_signatures[candidate_url]
            similarity = self._jaccard_similarity(signature, candidate_signature)

            if similarity >= config.SIMILARITY_THRESHOLD:
                return candidate_url

        return None

    def _jaccard_similarity(self, sig1: List[int], sig2: List[int]) -> float:
        """
        Estimate Jaccard similarity from MinHash signatures

        Args:
            sig1: First signature
            sig2: Second signature

        Returns:
            Estimated Jaccard similarity (0-1)
        """
        if len(sig1) != len(sig2):
            raise ValueError("Signatures must have the same length")

        # Count matching hash values
        matches = sum(1 for i in range(len(sig1)) if sig1[i] == sig2[i])

        # Estimate similarity
        return matches / len(sig1)
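
    # Example for _jaccard_similarity (illustrative): signatures [8, 3, 7, 5]
    # and [8, 9, 7, 1] agree in two of four positions, giving an estimate of
    # 0.5. The fraction of agreeing positions is an unbiased estimator of the
    # Jaccard similarity of the underlying shingle sets.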

    def _compute_simhash(self, content: str) -> int:
        """
        Compute SimHash for content

        Args:
            content: Page content

        Returns:
            SimHash value
        """
        tokens = self._preprocess_content(content)

        # Initialize vector
        v = [0] * self.simhash_dim

        # For each token, compute hash and update vector
        for token in tokens:
            # Compute hash of token
            token_hash = hashlib.md5(token.encode('utf-8')).digest()

            # Convert to binary representation
            token_bits = ''.join(format(byte, '08b') for byte in token_hash)

            # Use first self.simhash_dim bits
            token_bits = token_bits[:self.simhash_dim]

            # Update vector
            for i, bit in enumerate(token_bits):
                if bit == '1':
                    v[i] += 1
                else:
                    v[i] -= 1

        # Create fingerprint
        fingerprint = 0
        for i, val in enumerate(v):
            if val > 0:
                fingerprint |= (1 << i)

        return fingerprint

    def _check_simhash(self, url: str, content: str) -> Optional[str]:
        """
        Check for fuzzy duplicates using SimHash

        Args:
            url: URL of the page
            content: Page content

        Returns:
            URL of duplicate page if found, None otherwise
        """
        # Compute SimHash
        simhash_value = self._compute_simhash(content)

        # Compare with existing SimHash values
        for existing_url, existing_simhash in self.simhash_values.items():
            if existing_url == url:
                continue

            # Calculate Hamming distance
            hamming_distance = bin(simhash_value ^ existing_simhash).count('1')

            if hamming_distance <= self.hamming_threshold:
                return existing_url

        return None
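
    # Note on _check_simhash: XOR-ing two fingerprints leaves a 1 bit wherever
    # they disagree, so counting '1' characters in the binary string gives the
    # Hamming distance. The lookup scans every stored fingerprint (O(n) per
    # check); production systems typically bucket SimHash values by bit blocks
    # to prune comparisons, which is beyond the scope of this component.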

    def clear(self) -> None:
        """Clear all indexes and caches"""
        self.content_hashes.clear()
        self.content_hash_map.clear()
        self.minhash_signatures.clear()
        self.minhash_bands.clear()
        self.simhash_values.clear()
        self.duplicate_cache.clear()

        # Reset statistics
        self.stats = {
            'exact_duplicates': 0,
            'near_duplicates': 0,
            'fuzzy_duplicates': 0,
            'processing_time': 0,
            'total_documents': 0,
        }

    def get_stats(self) -> Dict[str, Union[int, float]]:
        """Get deduplication statistics"""
        stats_copy = self.stats.copy()

        # Calculate total duplicates
        total_duplicates = (self.stats['exact_duplicates'] +
                            self.stats['near_duplicates'] +
                            self.stats['fuzzy_duplicates'])
        stats_copy['total_duplicates'] = total_duplicates

        # 'total_documents' counts only unique documents added to the index,
        # so averages and percentages are taken over everything processed
        total_processed = self.stats['total_documents'] + total_duplicates

        # Calculate average processing time
        if total_processed > 0:
            stats_copy['avg_processing_time'] = self.stats['processing_time'] / total_processed
        else:
            stats_copy['avg_processing_time'] = 0

        # Calculate duplicate percentage
        if total_processed > 0:
            stats_copy['duplicate_percentage'] = (total_duplicates / total_processed) * 100
        else:
            stats_copy['duplicate_percentage'] = 0

        return stats_copy
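

if __name__ == '__main__':
    # Minimal usage sketch (not part of the crawler pipeline). It assumes the
    # project's `config` module defines the settings referenced above
    # (LOG_LEVEL, LOG_FORMAT, NEAR_DUPLICATE_DETECTION, FUZZY_DUPLICATE_DETECTION,
    # SIMILARITY_THRESHOLD); the URLs and documents below are illustrative only.
    dedup = ContentDeduplicator()

    original = "<p>The quick brown fox jumps over the lazy dog near the old river bank.</p>"
    near_copy = "<p>The quick brown fox jumps over the lazy dog near the old river bend.</p>"

    print(dedup.is_duplicate("http://example.com/a", original))   # (False, None) -- first sighting
    print(dedup.is_duplicate("http://example.com/b", original))   # (True, 'http://example.com/a') -- exact match
    print(dedup.is_duplicate("http://example.com/c", near_copy))  # may be reported as a near/fuzzy duplicate
    print(dedup.get_stats())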