""" Wikidata service for finding historical events. Queries Wikidata for events at specific coordinates and times. """ from __future__ import annotations import hashlib import json import math import os import ssl import time from dataclasses import dataclass, field from datetime import datetime from pathlib import Path from typing import Any, Dict, List, Optional, Tuple import urllib.request import urllib.parse import urllib.error # Note: We use urllib with SSL fallback to avoid permission issues with requests/certifi USE_REQUESTS = False # Cache configuration ROOT_DIR = Path(__file__).resolve().parent.parent CACHE_DIR = ROOT_DIR / "data" / "wikidata_cache" CACHE_DIR.mkdir(parents=True, exist_ok=True) CACHE_TTL_SECONDS = 86400 * 7 # 7 days WIKIDATA_SPARQL_ENDPOINT = "https://query.wikidata.org/sparql" # Rate limiting _last_request_time: float = 0.0 MIN_REQUEST_INTERVAL = 1.5 # seconds between requests @dataclass class WikidataEvent: """Represents a historical event from Wikidata.""" qid: str name: str description: str = "" year: Optional[int] = None month: Optional[int] = None day: Optional[int] = None lat: Optional[float] = None lon: Optional[float] = None participants: List[str] = field(default_factory=list) location_name: str = "" event_type: str = "" wikipedia_url: str = "" image_url: str = "" distance_km: float = 0.0 year_delta: int = 0 confidence: float = 0.0 source: str = "wikidata" def to_dict(self) -> Dict[str, Any]: return { "qid": self.qid, "name": self.name, "description": self.description, "year": self.year, "month": self.month, "day": self.day, "lat": self.lat, "lon": self.lon, "participants": self.participants, "location_name": self.location_name, "event_type": self.event_type, "wikipedia_url": self.wikipedia_url, "image_url": self.image_url, "distance_km": self.distance_km, "year_delta": self.year_delta, "confidence": self.confidence, "source": self.source, # Compatibility fields for existing code "summary": self.description, "narrative": self.description, "actors": self.participants, "themes": [self.event_type] if self.event_type else [], "artifacts": [], "visual_motifs": [], "facets": {"type": self.event_type}, "sources": [{"label": "Wikidata", "url": f"https://www.wikidata.org/wiki/{self.qid}"}], "match_confidence": self.confidence, } def _cache_key(lat: float, lon: float, year: int, radius_km: float) -> str: """Generate a cache key for the query parameters.""" raw = f"{lat:.2f}_{lon:.2f}_{year}_{radius_km:.0f}" return hashlib.md5(raw.encode()).hexdigest() def _get_cached(cache_key: str) -> Optional[List[Dict]]: """Retrieve cached results if they exist and aren't expired.""" cache_file = CACHE_DIR / f"{cache_key}.json" if not cache_file.exists(): return None try: with open(cache_file, "r", encoding="utf-8") as f: data = json.load(f) cached_time = data.get("timestamp", 0) if time.time() - cached_time > CACHE_TTL_SECONDS: cache_file.unlink(missing_ok=True) return None return data.get("events", []) except (json.JSONDecodeError, OSError): return None def _save_cache(cache_key: str, events: List[Dict]) -> None: """Save results to cache.""" cache_file = CACHE_DIR / f"{cache_key}.json" try: with open(cache_file, "w", encoding="utf-8") as f: json.dump({"timestamp": time.time(), "events": events}, f, ensure_ascii=False) except OSError: pass def _rate_limit() -> None: """Ensure we don't exceed Wikidata rate limits.""" global _last_request_time elapsed = time.time() - _last_request_time if elapsed < MIN_REQUEST_INTERVAL: time.sleep(MIN_REQUEST_INTERVAL - elapsed) _last_request_time = time.time() def _haversine(lat1: float, lon1: float, lat2: float, lon2: float) -> float: """Calculate distance in km between two points.""" R = 6371.0 phi1, phi2 = math.radians(lat1), math.radians(lat2) dphi = math.radians(lat2 - lat1) dlam = math.radians(lon2 - lon1) a = math.sin(dphi / 2) ** 2 + math.cos(phi1) * math.cos(phi2) * math.sin(dlam / 2) ** 2 return R * 2 * math.atan2(math.sqrt(a), math.sqrt(max(0, 1 - a))) def _build_sparql_query(lat: float, lon: float, year: int, radius_km: float, limit: int = 20) -> str: """ Build SPARQL query for historical events near coordinates and year. This query searches for: - Events (Q1656682) that occurred at a location - Battles, treaties, revolutions, etc. - Events with point in time or start time within the year range """ # Wikidata uses negative years for BCE year_start = year - 15 year_end = year + 15 # Convert radius to degrees (rough approximation) degree_radius = radius_km / 111.0 query = f""" SELECT DISTINCT ?event ?eventLabel ?eventDescription ?date ?coord ?locationLabel ?participantLabel ?typeLabel ?article ?image WHERE {{ # Find events with coordinates ?event wdt:P31/wdt:P279* wd:Q1656682 . # instance of event or subclass # Get coordinates - either direct or via location OPTIONAL {{ ?event wdt:P625 ?directCoord . }} OPTIONAL {{ ?event wdt:P276 ?location . ?location wdt:P625 ?locationCoord . }} BIND(COALESCE(?directCoord, ?locationCoord) AS ?coord) # Filter by coordinate bounding box FILTER(BOUND(?coord)) BIND(geof:latitude(?coord) AS ?lat) BIND(geof:longitude(?coord) AS ?lon) FILTER(?lat >= {lat - degree_radius} && ?lat <= {lat + degree_radius}) FILTER(?lon >= {lon - degree_radius} && ?lon <= {lon + degree_radius}) # Get date OPTIONAL {{ ?event wdt:P585 ?pointInTime . }} OPTIONAL {{ ?event wdt:P580 ?startTime . }} BIND(COALESCE(?pointInTime, ?startTime) AS ?date) # Filter by year range FILTER(BOUND(?date)) FILTER(YEAR(?date) >= {year_start} && YEAR(?date) <= {year_end}) # Optional: participants OPTIONAL {{ ?event wdt:P710 ?participant . }} # Optional: event type OPTIONAL {{ ?event wdt:P31 ?type . }} # Optional: Wikipedia article OPTIONAL {{ ?article schema:about ?event ; schema:isPartOf . }} # Optional: image OPTIONAL {{ ?event wdt:P18 ?image . }} # Location label OPTIONAL {{ ?event wdt:P276 ?loc . }} SERVICE wikibase:label {{ bd:serviceParam wikibase:language "en,de,fr,es,it" . }} }} ORDER BY ABS(YEAR(?date) - {year}) LIMIT {limit} """ return query def _build_fallback_query(lat: float, lon: float, year: int, limit: int = 15) -> str: """ Simpler fallback query that searches for any notable events in the year range. Uses text search and broader event types. """ year_start = year - 20 year_end = year + 20 query = f""" SELECT DISTINCT ?event ?eventLabel ?eventDescription ?date ?coord ?locationLabel ?article WHERE {{ # Broader event types VALUES ?eventType {{ wd:Q178561 # battle wd:Q131569 # treaty wd:Q7278 # political revolution wd:Q8076 # assassination wd:Q3882219 # coronation wd:Q1318295 # military offensive wd:Q2001676 # massacre wd:Q18669875 # historical event wd:Q13418847 # historical period wd:Q3024240 # historical event }} ?event wdt:P31 ?eventType . # Date filter OPTIONAL {{ ?event wdt:P585 ?pointInTime . }} OPTIONAL {{ ?event wdt:P580 ?startTime . }} BIND(COALESCE(?pointInTime, ?startTime) AS ?date) FILTER(BOUND(?date)) FILTER(YEAR(?date) >= {year_start} && YEAR(?date) <= {year_end}) # Get coordinates via location OPTIONAL {{ ?event wdt:P276 ?location . ?location wdt:P625 ?coord . }} OPTIONAL {{ ?event wdt:P625 ?directCoord . }} BIND(COALESCE(?coord, ?directCoord) AS ?finalCoord) # Wikipedia article OPTIONAL {{ ?article schema:about ?event ; schema:isPartOf . }} SERVICE wikibase:label {{ bd:serviceParam wikibase:language "en" . }} }} ORDER BY ABS(YEAR(?date) - {year}) LIMIT {limit} """ return query def _execute_sparql(query: str) -> Optional[Dict]: """Execute SPARQL query against Wikidata endpoint.""" _rate_limit() headers = { "Accept": "application/sparql-results+json", "User-Agent": "Meridian-Historical-App/1.0 (https://github.com/meridian; contact@example.com)" } # Try using requests library first (better SSL handling) if USE_REQUESTS: try: response = requests.get( WIKIDATA_SPARQL_ENDPOINT, params={"query": query}, headers=headers, timeout=30, verify=certifi.where(), ) response.raise_for_status() return response.json() except Exception as e: print(f"[Wikidata] SPARQL query failed (requests): {e}") # Fall through to urllib fallback # Fallback to urllib with SSL context params = urllib.parse.urlencode({"query": query}) url = f"{WIKIDATA_SPARQL_ENDPOINT}?{params}" try: # Create SSL context that doesn't verify certificates (fallback) ssl_context = ssl.create_default_context() ssl_context.check_hostname = False ssl_context.verify_mode = ssl.CERT_NONE req = urllib.request.Request(url, headers=headers) with urllib.request.urlopen(req, timeout=30, context=ssl_context) as response: return json.loads(response.read().decode("utf-8")) except (urllib.error.URLError, urllib.error.HTTPError, json.JSONDecodeError, TimeoutError) as e: print(f"[Wikidata] SPARQL query failed (urllib): {e}") return None def _parse_wikidata_date(date_str: str) -> Tuple[Optional[int], Optional[int], Optional[int]]: """Parse Wikidata date string to year, month, day.""" if not date_str: return None, None, None # Handle BCE dates (negative years) # Wikidata format: -0044-03-15T00:00:00Z for 44 BCE try: if date_str.startswith("-"): # BCE date parts = date_str[1:].split("T")[0].split("-") year = -int(parts[0]) month = int(parts[1]) if len(parts) > 1 else None day = int(parts[2]) if len(parts) > 2 else None return year, month, day else: # CE date parts = date_str.split("T")[0].split("-") year = int(parts[0]) month = int(parts[1]) if len(parts) > 1 else None day = int(parts[2]) if len(parts) > 2 else None return year, month, day except (ValueError, IndexError): return None, None, None def _parse_coordinates(coord_str: str) -> Tuple[Optional[float], Optional[float]]: """Parse Wikidata coordinate string to lat, lon.""" if not coord_str: return None, None # Format: Point(lon lat) try: if coord_str.startswith("Point("): inner = coord_str[6:-1] lon_str, lat_str = inner.split() return float(lat_str), float(lon_str) except (ValueError, IndexError): pass return None, None def _results_to_events( results: Dict, query_lat: float, query_lon: float, query_year: int, ) -> List[WikidataEvent]: """Convert SPARQL results to WikidataEvent objects.""" events_map: Dict[str, WikidataEvent] = {} bindings = results.get("results", {}).get("bindings", []) for binding in bindings: # Extract QID event_uri = binding.get("event", {}).get("value", "") if not event_uri: continue qid = event_uri.split("/")[-1] # Get or create event if qid not in events_map: name = binding.get("eventLabel", {}).get("value", "Unknown Event") description = binding.get("eventDescription", {}).get("value", "") # Parse date date_str = binding.get("date", {}).get("value", "") year, month, day = _parse_wikidata_date(date_str) # Parse coordinates coord_str = binding.get("coord", {}).get("value", "") lat, lon = _parse_coordinates(coord_str) # Location name location_name = binding.get("locationLabel", {}).get("value", "") # Event type event_type = binding.get("typeLabel", {}).get("value", "") # Wikipedia URL wikipedia_url = binding.get("article", {}).get("value", "") # Image URL image_url = binding.get("image", {}).get("value", "") # Calculate distance and confidence distance_km = 0.0 if lat is not None and lon is not None: distance_km = _haversine(query_lat, query_lon, lat, lon) year_delta = abs((year or query_year) - query_year) # Confidence scoring confidence = 0.7 if distance_km < 50: confidence += 0.15 elif distance_km < 150: confidence += 0.1 if year_delta == 0: confidence += 0.15 elif year_delta <= 5: confidence += 0.1 if wikipedia_url: confidence += 0.05 confidence = min(confidence, 0.98) events_map[qid] = WikidataEvent( qid=qid, name=name, description=description, year=year, month=month, day=day, lat=lat, lon=lon, location_name=location_name, event_type=event_type, wikipedia_url=wikipedia_url, image_url=image_url, distance_km=round(distance_km, 2), year_delta=year_delta, confidence=round(confidence, 3), ) # Add participant if present participant = binding.get("participantLabel", {}).get("value", "") if participant and participant not in events_map[qid].participants: events_map[qid].participants.append(participant) # Sort by relevance (lower distance + year_delta = better) events = list(events_map.values()) events.sort(key=lambda e: e.distance_km + e.year_delta * 5 - e.confidence * 20) return events def search_events_by_geo_time( lat: float, lon: float, year: int, radius_km: float = 300.0, limit: int = 10, use_cache: bool = True, ) -> List[Dict]: """ Search Wikidata for historical events near coordinates and year. Args: lat: Latitude lon: Longitude year: Target year (negative for BCE) radius_km: Search radius in kilometers limit: Maximum number of results use_cache: Whether to use cached results Returns: List of event dictionaries compatible with existing code """ # Check cache first cache_key = _cache_key(lat, lon, year, radius_km) if use_cache: cached = _get_cached(cache_key) if cached is not None: print(f"[Wikidata] Cache hit for {lat:.2f}, {lon:.2f}, {year}") return cached[:limit] print(f"[Wikidata] Querying for events near {lat:.2f}, {lon:.2f}, year {year}") # Try primary query first query = _build_sparql_query(lat, lon, year, radius_km, limit * 2) results = _execute_sparql(query) events: List[WikidataEvent] = [] if results: events = _results_to_events(results, lat, lon, year) # If no results, try fallback query if not events: print("[Wikidata] Primary query returned no results, trying fallback...") fallback_query = _build_fallback_query(lat, lon, year, limit * 2) fallback_results = _execute_sparql(fallback_query) if fallback_results: events = _results_to_events(fallback_results, lat, lon, year) # Convert to dicts and cache event_dicts = [e.to_dict() for e in events[:limit]] if use_cache and event_dicts: _save_cache(cache_key, event_dicts) print(f"[Wikidata] Found {len(event_dicts)} events") return event_dicts def get_event_detail(qid: str) -> Optional[Dict]: """ Fetch detailed information about a specific Wikidata event. Args: qid: Wikidata QID (e.g., "Q784") Returns: Event dictionary with full details, or None if not found """ query = f""" SELECT ?event ?eventLabel ?eventDescription ?date ?coord ?locationLabel ?participantLabel ?typeLabel ?article ?image ?causeLabel ?effectLabel WHERE {{ BIND(wd:{qid} AS ?event) OPTIONAL {{ ?event wdt:P585 ?pointInTime . }} OPTIONAL {{ ?event wdt:P580 ?startTime . }} BIND(COALESCE(?pointInTime, ?startTime) AS ?date) OPTIONAL {{ ?event wdt:P625 ?coord . }} OPTIONAL {{ ?event wdt:P276 ?location . ?location wdt:P625 ?locCoord . }} OPTIONAL {{ ?event wdt:P710 ?participant . }} OPTIONAL {{ ?event wdt:P31 ?type . }} OPTIONAL {{ ?event wdt:P828 ?cause . }} OPTIONAL {{ ?event wdt:P1542 ?effect . }} OPTIONAL {{ ?event wdt:P18 ?image . }} OPTIONAL {{ ?article schema:about ?event ; schema:isPartOf . }} SERVICE wikibase:label {{ bd:serviceParam wikibase:language "en" . }} }} LIMIT 50 """ results = _execute_sparql(query) if not results: return None events = _results_to_events(results, 0, 0, 0) if events: event = events[0] event_dict = event.to_dict() # Extract causes and effects from results bindings = results.get("results", {}).get("bindings", []) causes = set() effects = set() for binding in bindings: cause = binding.get("causeLabel", {}).get("value", "") effect = binding.get("effectLabel", {}).get("value", "") if cause: causes.add(cause) if effect: effects.add(effect) event_dict["causes"] = list(causes) event_dict["effects"] = list(effects) event_dict["relationships"] = { "causes": list(causes), "consequences": list(effects), } return event_dict return None def search_events_by_name(name: str, limit: int = 5) -> List[Dict]: """ Search Wikidata for events by name. Args: name: Event name to search for limit: Maximum results Returns: List of matching events """ # Escape special characters escaped_name = name.replace('"', '\\"') query = f""" SELECT DISTINCT ?event ?eventLabel ?eventDescription ?date ?coord ?locationLabel ?article WHERE {{ ?event wdt:P31/wdt:P279* wd:Q1656682 . ?event rdfs:label ?label . FILTER(LANG(?label) = "en") FILTER(CONTAINS(LCASE(?label), LCASE("{escaped_name}"))) OPTIONAL {{ ?event wdt:P585 ?pointInTime . }} OPTIONAL {{ ?event wdt:P580 ?startTime . }} BIND(COALESCE(?pointInTime, ?startTime) AS ?date) OPTIONAL {{ ?event wdt:P625 ?coord . }} OPTIONAL {{ ?event wdt:P276 ?location . ?location wdt:P625 ?locCoord . }} OPTIONAL {{ ?article schema:about ?event ; schema:isPartOf . }} SERVICE wikibase:label {{ bd:serviceParam wikibase:language "en" . }} }} LIMIT {limit} """ results = _execute_sparql(query) if not results: return [] events = _results_to_events(results, 0, 0, 0) return [e.to_dict() for e in events] # MCP-compatible function exports def mcp_search_historical_events( latitude: float, longitude: float, year: int, radius_km: float = 300.0, limit: int = 10, ) -> Dict: """ MCP tool: Search for historical events by coordinates and year. This queries Wikidata for events that occurred near the specified location and time, returning structured event data suitable for prompt generation. """ try: events = search_events_by_geo_time( lat=latitude, lon=longitude, year=year, radius_km=radius_km, limit=limit, ) return { "success": True, "query": { "latitude": latitude, "longitude": longitude, "year": year, "radius_km": radius_km, }, "count": len(events), "events": events, "source": "wikidata", } except Exception as e: return { "success": False, "error": str(e), } def mcp_get_event_by_qid(qid: str) -> Dict: """ MCP tool: Get detailed information about a Wikidata event. """ try: event = get_event_detail(qid) if event: return { "success": True, "event": event, } return { "success": False, "error": f"Event {qid} not found", } except Exception as e: return { "success": False, "error": str(e), }