"""
Wikidata service for finding historical events.
Queries Wikidata for events at specific coordinates and times.
"""
from __future__ import annotations
import hashlib
import json
import math
import os
import ssl
import time
from dataclasses import dataclass, field
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple
import urllib.request
import urllib.parse
import urllib.error
# Note: We use urllib with SSL fallback to avoid permission issues with requests/certifi
# While this flag is False, _execute_sparql skips its requests-based branch
# and goes straight to urllib.
USE_REQUESTS = False
# Cache configuration
# ROOT_DIR is the directory one level above the one containing this file;
# cached query results are stored as JSON files under
# <ROOT_DIR>/data/wikidata_cache.
ROOT_DIR = Path(__file__).resolve().parent.parent
CACHE_DIR = ROOT_DIR / "data" / "wikidata_cache"
# Import-time side effect: the cache directory is created as soon as this
# module is loaded.
CACHE_DIR.mkdir(parents=True, exist_ok=True)
CACHE_TTL_SECONDS = 86400 * 7 # 7 days
WIKIDATA_SPARQL_ENDPOINT = "https://query.wikidata.org/sparql"
# Rate limiting
# time.time() value recorded by _rate_limit() at the end of its last call;
# 0.0 means no request has been made yet.
_last_request_time: float = 0.0
MIN_REQUEST_INTERVAL = 1.5 # seconds between requests
@dataclass
class WikidataEvent:
    """A single historical event record sourced from Wikidata.

    Alongside the descriptive attributes, the record stores the relevance
    figures (``distance_km``, ``year_delta``, ``confidence``) that were
    computed for the query which produced it.
    """

    qid: str
    name: str
    description: str = ""
    year: Optional[int] = None
    month: Optional[int] = None
    day: Optional[int] = None
    lat: Optional[float] = None
    lon: Optional[float] = None
    participants: List[str] = field(default_factory=list)
    location_name: str = ""
    event_type: str = ""
    wikipedia_url: str = ""
    image_url: str = ""
    distance_km: float = 0.0
    year_delta: int = 0
    confidence: float = 0.0
    source: str = "wikidata"

    def to_dict(self) -> Dict[str, Any]:
        """Return the event as a plain dict.

        The dict contains every dataclass attribute under its own name,
        followed by alias keys (``summary``, ``actors``, ``themes`` ...)
        kept for compatibility with existing consumers.
        """
        native_keys = (
            "qid",
            "name",
            "description",
            "year",
            "month",
            "day",
            "lat",
            "lon",
            "participants",
            "location_name",
            "event_type",
            "wikipedia_url",
            "image_url",
            "distance_km",
            "year_delta",
            "confidence",
            "source",
        )
        payload: Dict[str, Any] = {key: getattr(self, key) for key in native_keys}

        kind = self.event_type
        # Compatibility fields for existing code
        payload.update(
            summary=self.description,
            narrative=self.description,
            actors=self.participants,
            themes=[kind] if kind else [],
            artifacts=[],
            visual_motifs=[],
            facets={"type": kind},
            sources=[{"label": "Wikidata", "url": f"https://www.wikidata.org/wiki/{self.qid}"}],
            match_confidence=self.confidence,
        )
        return payload
def _cache_key(lat: float, lon: float, year: int, radius_km: float) -> str:
    """Derive the cache file stem for a geo/time query.

    Coordinates are rounded to two decimals and the radius to whole
    kilometres before hashing, so queries that round to the same values
    share one cache entry.
    """
    fingerprint = "_".join(
        (
            format(lat, ".2f"),
            format(lon, ".2f"),
            str(year),
            format(radius_km, ".0f"),
        )
    )
    return hashlib.md5(fingerprint.encode()).hexdigest()
def _get_cached(cache_key: str) -> Optional[List[Dict]]:
    """Return the cached event list for *cache_key*, or None on a cache miss.

    A miss is reported when the cache file does not exist, cannot be read or
    decoded, does not have the expected ``{"timestamp": <number>,
    "events": [...]}`` shape, or is older than CACHE_TTL_SECONDS. An expired
    file is deleted on the way out.
    """
    cache_file = CACHE_DIR / f"{cache_key}.json"
    if not cache_file.exists():
        return None
    try:
        with open(cache_file, "r", encoding="utf-8") as f:
            data = json.load(f)
    except (ValueError, OSError):
        # ValueError covers both json.JSONDecodeError and UnicodeDecodeError.
        return None

    # Valid JSON is not necessarily a valid cache entry (e.g. a file that
    # contains `[]` or `null`); treat anything unexpected as a miss instead
    # of letting AttributeError/TypeError escape to the caller.
    if not isinstance(data, dict):
        return None
    cached_time = data.get("timestamp", 0)
    events = data.get("events", [])
    if not isinstance(cached_time, (int, float)) or not isinstance(events, list):
        return None

    if time.time() - cached_time > CACHE_TTL_SECONDS:
        try:
            cache_file.unlink(missing_ok=True)
        except OSError:
            # Removing a stale entry is best-effort; it is ignored either way.
            pass
        return None
    return events
def _save_cache(cache_key: str, events: List[Dict]) -> None:
    """Write *events* to the cache file for *cache_key*, stamped with the current time.

    Caching is best-effort: an OSError while opening or writing the file is
    swallowed and the function simply returns.
    """
    target = CACHE_DIR / f"{cache_key}.json"
    try:
        with target.open("w", encoding="utf-8") as handle:
            record = {"timestamp": time.time(), "events": events}
            json.dump(record, handle, ensure_ascii=False)
    except OSError:
        return
def _rate_limit() -> None:
    """Block until at least MIN_REQUEST_INTERVAL seconds separate two requests.

    Sleeps for the remainder of the interval if the previous call was too
    recent, then records the current time in the module-level
    ``_last_request_time``.
    """
    global _last_request_time
    elapsed = time.time() - _last_request_time
    if elapsed < MIN_REQUEST_INTERVAL:
        # time.time() is wall-clock time and may jump backwards (NTP or manual
        # adjustment), which makes `elapsed` negative. Cap the wait at one
        # interval so a clock step can never stall the caller for longer.
        time.sleep(min(MIN_REQUEST_INTERVAL, MIN_REQUEST_INTERVAL - elapsed))
    _last_request_time = time.time()
def _haversine(lat1: float, lon1: float, lat2: float, lon2: float) -> float:
    """Return the great-circle distance in km between two points given in degrees.

    Uses the haversine formula with an Earth radius of 6371 km.
    """
    earth_radius_km = 6371.0
    lat1_rad = math.radians(lat1)
    lat2_rad = math.radians(lat2)
    half_dlat = math.radians(lat2 - lat1) / 2
    half_dlon = math.radians(lon2 - lon1) / 2

    chord = (
        math.sin(half_dlat) ** 2
        + math.cos(lat1_rad) * math.cos(lat2_rad) * math.sin(half_dlon) ** 2
    )
    # max() guards against 1 - chord dipping below zero through rounding.
    central_angle = 2 * math.atan2(math.sqrt(chord), math.sqrt(max(0, 1 - chord)))
    return earth_radius_km * central_angle
def _build_sparql_query(lat: float, lon: float, year: int, radius_km: float, limit: int = 20) -> str:
    """Build the primary SPARQL query for events near a point in space and time.

    The query selects items that are an instance of event (Q1656682) or of
    one of its subclasses, whose coordinates (their own, or those of their
    P276 location) fall inside a bounding box around (lat, lon), and whose
    point in time (P585) or start time (P580) lies within +/- 15 years of
    *year*. Rows are ordered by year distance from *year*.

    Args:
        lat: Query latitude in degrees.
        lon: Query longitude in degrees.
        year: Target year (negative for BCE).
        radius_km: Half-width of the bounding box in kilometres.
        limit: Maximum number of result rows.

    Returns:
        The SPARQL query text.
    """
    # Wikidata uses negative years for BCE
    year_start = year - 15
    year_end = year + 15

    # Convert radius to degrees (rough approximation: ~111 km per degree of
    # latitude). A degree of longitude shrinks with cos(latitude), so the
    # longitude half-width must be widened accordingly; otherwise the box is
    # too narrow away from the equator and events inside the radius are missed.
    lat_radius = radius_km / 111.0
    cos_lat = max(math.cos(math.radians(lat)), 1e-6)  # avoid division by ~0 at the poles
    lon_radius = min(lat_radius / cos_lat, 180.0)
    # NOTE: a box that crosses the antimeridian (+/-180 degrees) is not wrapped.
    lat_min, lat_max = lat - lat_radius, lat + lat_radius
    lon_min, lon_max = lon - lon_radius, lon + lon_radius

    query = f"""
SELECT DISTINCT ?event ?eventLabel ?eventDescription ?date ?coord ?locationLabel ?participantLabel ?typeLabel ?article ?image
WHERE {{
  # Find events with coordinates
  ?event wdt:P31/wdt:P279* wd:Q1656682 . # instance of event or subclass
  # Get coordinates - either direct or via location
  OPTIONAL {{
    ?event wdt:P625 ?directCoord .
  }}
  OPTIONAL {{
    ?event wdt:P276 ?location .
    ?location wdt:P625 ?locationCoord .
  }}
  BIND(COALESCE(?directCoord, ?locationCoord) AS ?coord)
  # Filter by coordinate bounding box
  FILTER(BOUND(?coord))
  BIND(geof:latitude(?coord) AS ?lat)
  BIND(geof:longitude(?coord) AS ?lon)
  FILTER(?lat >= {lat_min:.6f} && ?lat <= {lat_max:.6f})
  FILTER(?lon >= {lon_min:.6f} && ?lon <= {lon_max:.6f})
  # Get date
  OPTIONAL {{ ?event wdt:P585 ?pointInTime . }}
  OPTIONAL {{ ?event wdt:P580 ?startTime . }}
  BIND(COALESCE(?pointInTime, ?startTime) AS ?date)
  # Filter by year range
  FILTER(BOUND(?date))
  FILTER(YEAR(?date) >= {year_start} && YEAR(?date) <= {year_end})
  # Optional: participants
  OPTIONAL {{ ?event wdt:P710 ?participant . }}
  # Optional: event type
  OPTIONAL {{ ?event wdt:P31 ?type . }}
  # Optional: English Wikipedia article
  OPTIONAL {{
    ?article schema:about ?event ;
             schema:isPartOf <https://en.wikipedia.org/> .
  }}
  # Optional: image
  OPTIONAL {{ ?event wdt:P18 ?image . }}
  # Location label: bind ?location even when the location has no coordinates,
  # so that ?locationLabel can be resolved by the label service.
  OPTIONAL {{ ?event wdt:P276 ?location . }}
  SERVICE wikibase:label {{ bd:serviceParam wikibase:language "en,de,fr,es,it" . }}
}}
ORDER BY ABS(YEAR(?date) - {year})
LIMIT {limit}
"""
    return query
def _build_fallback_query(lat: float, lon: float, year: int, limit: int = 15) -> str:
    """Build a simpler fallback query for notable events around *year*.

    Unlike the primary query this one does not walk the subclass hierarchy
    and applies no geographic filter: it matches items that are a direct
    instance of one of a fixed list of event classes and whose date lies
    within +/- 20 years of *year*. Coordinates are returned when available
    so that the caller can still compute a distance.

    Args:
        lat: Query latitude (currently unused; kept for a uniform signature).
        lon: Query longitude (currently unused; kept for a uniform signature).
        year: Target year (negative for BCE).
        limit: Maximum number of result rows.

    Returns:
        The SPARQL query text.
    """
    year_start = year - 20
    year_end = year + 20
    # NOTE(review): the class QIDs and their inline labels below are carried
    # over unchanged and have not been checked against Wikidata -- confirm.
    query = f"""
SELECT DISTINCT ?event ?eventLabel ?eventDescription ?date ?coord ?locationLabel ?article
WHERE {{
  # Broader event types
  VALUES ?eventType {{
    wd:Q178561 # battle
    wd:Q131569 # treaty
    wd:Q7278 # political revolution
    wd:Q8076 # assassination
    wd:Q3882219 # coronation
    wd:Q1318295 # military offensive
    wd:Q2001676 # massacre
    wd:Q18669875 # historical event
    wd:Q13418847 # historical period
    wd:Q3024240 # historical event
  }}
  ?event wdt:P31 ?eventType .
  # Date filter
  OPTIONAL {{ ?event wdt:P585 ?pointInTime . }}
  OPTIONAL {{ ?event wdt:P580 ?startTime . }}
  BIND(COALESCE(?pointInTime, ?startTime) AS ?date)
  FILTER(BOUND(?date))
  FILTER(YEAR(?date) >= {year_start} && YEAR(?date) <= {year_end})
  # Get coordinates - either direct or via location. The merged value must be
  # bound to ?coord because that is the variable the SELECT clause returns.
  OPTIONAL {{
    ?event wdt:P276 ?location .
    ?location wdt:P625 ?locationCoord .
  }}
  OPTIONAL {{
    ?event wdt:P625 ?directCoord .
  }}
  BIND(COALESCE(?directCoord, ?locationCoord) AS ?coord)
  # English Wikipedia article
  OPTIONAL {{
    ?article schema:about ?event ;
             schema:isPartOf <https://en.wikipedia.org/> .
  }}
  SERVICE wikibase:label {{ bd:serviceParam wikibase:language "en" . }}
}}
ORDER BY ABS(YEAR(?date) - {year})
LIMIT {limit}
"""
    return query
def _execute_sparql(query: str) -> Optional[Dict]:
    """Run *query* against the Wikidata SPARQL endpoint.

    The request is rate limited through _rate_limit(). TLS certificates are
    verified first; only if verification itself fails is the request retried
    without verification, with a warning printed.

    Args:
        query: SPARQL query text.

    Returns:
        The decoded JSON response, or None if the request or decoding failed
        (the failure is printed, not raised).
    """
    _rate_limit()
    headers = {
        "Accept": "application/sparql-results+json",
        "User-Agent": "Meridian-Historical-App/1.0 (https://github.com/meridian; contact@example.com)"
    }
    # Try using requests library first (better SSL handling)
    # NOTE(review): `requests` and `certifi` are not imported in the visible
    # part of this file; if USE_REQUESTS is switched on without importing
    # them, the NameError is caught below and the urllib path is used.
    if USE_REQUESTS:
        try:
            response = requests.get(
                WIKIDATA_SPARQL_ENDPOINT,
                params={"query": query},
                headers=headers,
                timeout=30,
                verify=certifi.where(),
            )
            response.raise_for_status()
            return response.json()
        except Exception as e:
            print(f"[Wikidata] SPARQL query failed (requests): {e}")
            # Fall through to urllib fallback

    params = urllib.parse.urlencode({"query": query})
    url = f"{WIKIDATA_SPARQL_ENDPOINT}?{params}"
    req = urllib.request.Request(url, headers=headers)

    def _fetch(context: ssl.SSLContext) -> Dict:
        """Perform the GET with the given TLS context and decode the JSON body."""
        with urllib.request.urlopen(req, timeout=30, context=context) as response:
            return json.loads(response.read().decode("utf-8"))

    try:
        try:
            # Verified TLS is the normal path.
            return _fetch(ssl.create_default_context())
        except ssl.SSLCertVerificationError as e:
            tls_error = e
        except urllib.error.URLError as e:
            # urlopen wraps handshake failures in URLError; anything that is
            # not a certificate problem is handled by the outer except.
            if not isinstance(e.reason, ssl.SSLCertVerificationError):
                raise
            tls_error = e.reason

        # Fallback for hosts without a usable CA bundle. This disables
        # protection against man-in-the-middle attacks, so say so loudly.
        print(
            f"[Wikidata] TLS certificate verification failed ({tls_error}); "
            "retrying WITHOUT certificate verification"
        )
        insecure_context = ssl.create_default_context()
        insecure_context.check_hostname = False
        insecure_context.verify_mode = ssl.CERT_NONE
        _rate_limit()  # the retry is a second request to the endpoint
        return _fetch(insecure_context)
    except (OSError, ValueError) as e:
        # OSError covers URLError/HTTPError, socket timeouts and connection
        # resets during read(); ValueError covers JSONDecodeError and
        # UnicodeDecodeError.
        print(f"[Wikidata] SPARQL query failed (urllib): {e}")
        return None
def _parse_wikidata_date(date_str: str) -> Tuple[Optional[int], Optional[int], Optional[int]]:
    """Split a Wikidata timestamp into ``(year, month, day)``.

    Month and day are None when the date part does not contain them. An
    empty or unparseable value yields ``(None, None, None)``.
    """
    if not date_str:
        return None, None, None

    # A leading minus marks a BCE year.
    # Wikidata format: -0044-03-15T00:00:00Z for 44 BCE
    is_bce = date_str.startswith("-")
    unsigned = date_str[1:] if is_bce else date_str
    date_part = unsigned.partition("T")[0]
    fields = date_part.split("-")

    try:
        year = int(fields[0])
        month = int(fields[1]) if len(fields) > 1 else None
        day = int(fields[2]) if len(fields) > 2 else None
    except (ValueError, IndexError):
        return None, None, None

    if is_bce:
        year = -year
    return year, month, day
def _parse_coordinates(coord_str: str) -> Tuple[Optional[float], Optional[float]]:
    """Convert a ``Point(lon lat)`` literal into a ``(lat, lon)`` pair.

    Returns ``(None, None)`` for empty input, for text that does not start
    with ``Point(``, or when the contents are not exactly two numbers.
    """
    prefix = "Point("
    if not coord_str or not coord_str.startswith(prefix):
        return None, None

    # Drop the prefix and the closing parenthesis; longitude comes first.
    tokens = coord_str[len(prefix):-1].split()
    try:
        lon_text, lat_text = tokens
        return float(lat_text), float(lon_text)
    except ValueError:
        return None, None
def _results_to_events(
    results: Dict,
    query_lat: float,
    query_lon: float,
    query_year: int,
) -> List[WikidataEvent]:
    """Convert a SPARQL JSON result into scored WikidataEvent objects.

    Rows are grouped by event QID. The first row seen for an event supplies
    its scalar attributes; every row may contribute one more participant.

    Args:
        results: Decoded SPARQL JSON response (``results.bindings`` is read).
        query_lat: Latitude the distance is measured from.
        query_lon: Longitude the distance is measured from.
        query_year: Year the year delta is measured from.

    Returns:
        Events sorted by ``distance_km + 5 * year_delta - 20 * confidence``
        (ascending, i.e. most relevant first).
    """

    def _value(binding: Dict, key: str, default: str = "") -> str:
        """Return the ``value`` of a SPARQL binding cell, or *default* if absent."""
        return binding.get(key, {}).get("value", default)

    events_map: Dict[str, WikidataEvent] = {}
    bindings = results.get("results", {}).get("bindings", [])
    for binding in bindings:
        # Extract QID (last path segment of the entity URI)
        event_uri = _value(binding, "event")
        if not event_uri:
            continue
        qid = event_uri.split("/")[-1]

        event = events_map.get(qid)
        if event is None:
            year, month, day = _parse_wikidata_date(_value(binding, "date"))
            lat, lon = _parse_coordinates(_value(binding, "coord"))
            wikipedia_url = _value(binding, "article")

            has_coords = lat is not None and lon is not None
            has_year = year is not None
            distance_km = _haversine(query_lat, query_lon, lat, lon) if has_coords else 0.0
            # `is not None` rather than truthiness: year 0 is a real value.
            year_delta = abs(year - query_year) if has_year else 0

            # Confidence scoring. Proximity bonuses are only granted when the
            # underlying datum is actually known; an event without coordinates
            # or without a date must not be scored as an exact match.
            confidence = 0.7
            if has_coords:
                if distance_km < 50:
                    confidence += 0.15
                elif distance_km < 150:
                    confidence += 0.1
            if has_year:
                if year_delta == 0:
                    confidence += 0.15
                elif year_delta <= 5:
                    confidence += 0.1
            if wikipedia_url:
                confidence += 0.05
            confidence = min(confidence, 0.98)

            event = WikidataEvent(
                qid=qid,
                name=_value(binding, "eventLabel", "Unknown Event"),
                description=_value(binding, "eventDescription"),
                year=year,
                month=month,
                day=day,
                lat=lat,
                lon=lon,
                location_name=_value(binding, "locationLabel"),
                event_type=_value(binding, "typeLabel"),
                wikipedia_url=wikipedia_url,
                image_url=_value(binding, "image"),
                distance_km=round(distance_km, 2),
                year_delta=year_delta,
                confidence=round(confidence, 3),
            )
            events_map[qid] = event

        # Add participant if present
        participant = _value(binding, "participantLabel")
        if participant and participant not in event.participants:
            event.participants.append(participant)

    # Sort by relevance (lower distance + year_delta = better)
    events = list(events_map.values())
    events.sort(key=lambda e: e.distance_km + e.year_delta * 5 - e.confidence * 20)
    return events
def search_events_by_geo_time(
    lat: float,
    lon: float,
    year: int,
    radius_km: float = 300.0,
    limit: int = 10,
    use_cache: bool = True,
) -> List[Dict]:
    """
    Search Wikidata for historical events near coordinates and year.

    The primary (geo-filtered) query is tried first; if it yields no events,
    the broader fallback query is used. Non-empty results are cached on disk.

    Args:
        lat: Latitude
        lon: Longitude
        year: Target year (negative for BCE)
        radius_km: Search radius in kilometers
        limit: Maximum number of results
        use_cache: Whether to use cached results

    Returns:
        List of event dictionaries compatible with existing code
    """
    # Check cache first. The stored list is already truncated to `limit`, so
    # `limit` has to be part of the key: otherwise a small-limit entry would
    # be served to a later call that asks for more results.
    cache_key = f"{_cache_key(lat, lon, year, radius_km)}_{limit}"
    if use_cache:
        cached = _get_cached(cache_key)
        if cached is not None:
            print(f"[Wikidata] Cache hit for {lat:.2f}, {lon:.2f}, {year}")
            return cached[:limit]

    print(f"[Wikidata] Querying for events near {lat:.2f}, {lon:.2f}, year {year}")

    # Try primary query first; over-fetch because several rows can describe
    # the same event (one per participant, type, ...).
    query = _build_sparql_query(lat, lon, year, radius_km, limit * 2)
    results = _execute_sparql(query)
    events: List[WikidataEvent] = []
    if results:
        events = _results_to_events(results, lat, lon, year)

    # If no results, try fallback query
    if not events:
        print("[Wikidata] Primary query returned no results, trying fallback...")
        fallback_query = _build_fallback_query(lat, lon, year, limit * 2)
        fallback_results = _execute_sparql(fallback_query)
        if fallback_results:
            events = _results_to_events(fallback_results, lat, lon, year)

    # Convert to dicts and cache. Empty results are not cached, so a failed
    # or empty lookup is retried on the next call.
    event_dicts = [e.to_dict() for e in events[:limit]]
    if use_cache and event_dicts:
        _save_cache(cache_key, event_dicts)
    print(f"[Wikidata] Found {len(event_dicts)} events")
    return event_dicts
def get_event_detail(qid: str) -> Optional[Dict]:
    """
    Fetch detailed information about a specific Wikidata event.

    Args:
        qid: Wikidata QID (e.g., "Q784"); surrounding whitespace and letter
            case are ignored.

    Returns:
        Event dictionary with full details plus ``causes``, ``effects`` and
        ``relationships`` keys, or None if *qid* is not a well-formed QID or
        the lookup failed. ``distance_km`` and ``year_delta`` in the result
        are measured against latitude 0, longitude 0 and year 0, because no
        query point exists for a direct lookup.
    """
    # The QID is interpolated into the query text, so accept nothing but
    # "Q" followed by ASCII digits; anything else could inject SPARQL.
    qid = qid.strip().upper() if isinstance(qid, str) else ""
    number = qid[1:]
    if not (qid.startswith("Q") and number.isascii() and number.isdigit()):
        return None

    query = f"""
SELECT ?event ?eventLabel ?eventDescription ?date ?coord ?locationLabel
       ?participantLabel ?typeLabel ?article ?image ?causeLabel ?effectLabel
WHERE {{
  BIND(wd:{qid} AS ?event)
  OPTIONAL {{ ?event wdt:P585 ?pointInTime . }}
  OPTIONAL {{ ?event wdt:P580 ?startTime . }}
  BIND(COALESCE(?pointInTime, ?startTime) AS ?date)
  # Coordinates: the event's own, else those of its location
  OPTIONAL {{ ?event wdt:P625 ?directCoord . }}
  OPTIONAL {{
    ?event wdt:P276 ?location .
    OPTIONAL {{ ?location wdt:P625 ?locCoord . }}
  }}
  BIND(COALESCE(?directCoord, ?locCoord) AS ?coord)
  OPTIONAL {{ ?event wdt:P710 ?participant . }}
  OPTIONAL {{ ?event wdt:P31 ?type . }}
  OPTIONAL {{ ?event wdt:P828 ?cause . }}
  OPTIONAL {{ ?event wdt:P1542 ?effect . }}
  OPTIONAL {{ ?event wdt:P18 ?image . }}
  OPTIONAL {{
    ?article schema:about ?event ;
             schema:isPartOf <https://en.wikipedia.org/> .
  }}
  SERVICE wikibase:label {{ bd:serviceParam wikibase:language "en" . }}
}}
LIMIT 50
"""
    results = _execute_sparql(query)
    if not results:
        return None
    events = _results_to_events(results, 0, 0, 0)
    if not events:
        return None

    event_dict = events[0].to_dict()
    # Extract causes and effects from results
    bindings = results.get("results", {}).get("bindings", [])
    causes = set()
    effects = set()
    for binding in bindings:
        cause = binding.get("causeLabel", {}).get("value", "")
        effect = binding.get("effectLabel", {}).get("value", "")
        if cause:
            causes.add(cause)
        if effect:
            effects.add(effect)
    # Sorted so that the output order is deterministic between runs.
    cause_list = sorted(causes)
    effect_list = sorted(effects)
    event_dict["causes"] = cause_list
    event_dict["effects"] = effect_list
    event_dict["relationships"] = {
        "causes": list(cause_list),
        "consequences": list(effect_list),
    }
    return event_dict
def search_events_by_name(name: str, limit: int = 5) -> List[Dict]:
    """
    Search Wikidata for events by name.

    Matches events whose English label contains *name* (case-insensitive).

    Args:
        name: Event name to search for
        limit: Maximum results

    Returns:
        List of matching events; empty if *name* is blank or the query
        failed. ``distance_km`` and ``year_delta`` in the results are
        measured against latitude 0, longitude 0 and year 0.
    """
    # A blank needle would make CONTAINS() match every event label.
    if not isinstance(name, str) or not name.strip():
        return []

    # Escape for a double-quoted SPARQL string literal. The backslash must be
    # handled first, otherwise a trailing "\" would swallow the closing quote;
    # raw line breaks are not allowed inside the literal either.
    escaped_name = (
        name.replace("\\", "\\\\")
        .replace('"', '\\"')
        .replace("\n", "\\n")
        .replace("\r", "\\r")
    )
    row_limit = int(limit)

    query = f"""
SELECT DISTINCT ?event ?eventLabel ?eventDescription ?date ?coord ?locationLabel ?article
WHERE {{
  ?event wdt:P31/wdt:P279* wd:Q1656682 .
  ?event rdfs:label ?label .
  FILTER(LANG(?label) = "en")
  FILTER(CONTAINS(LCASE(?label), LCASE("{escaped_name}")))
  OPTIONAL {{ ?event wdt:P585 ?pointInTime . }}
  OPTIONAL {{ ?event wdt:P580 ?startTime . }}
  BIND(COALESCE(?pointInTime, ?startTime) AS ?date)
  # Coordinates: the event's own, else those of its location
  OPTIONAL {{ ?event wdt:P625 ?directCoord . }}
  OPTIONAL {{
    ?event wdt:P276 ?location .
    OPTIONAL {{ ?location wdt:P625 ?locCoord . }}
  }}
  BIND(COALESCE(?directCoord, ?locCoord) AS ?coord)
  OPTIONAL {{
    ?article schema:about ?event ;
             schema:isPartOf <https://en.wikipedia.org/> .
  }}
  SERVICE wikibase:label {{ bd:serviceParam wikibase:language "en" . }}
}}
LIMIT {row_limit}
"""
    results = _execute_sparql(query)
    if not results:
        return []
    events = _results_to_events(results, 0, 0, 0)
    return [e.to_dict() for e in events]
# MCP-compatible function exports
def mcp_search_historical_events(
    latitude: float,
    longitude: float,
    year: int,
    radius_km: float = 300.0,
    limit: int = 10,
) -> Dict:
    """
    MCP tool: Search for historical events by coordinates and year.

    Delegates to search_events_by_geo_time and wraps the outcome in a
    response envelope: on success the echoed query, the event count and the
    events themselves; on any exception ``success: False`` plus the error
    text. Exceptions are never propagated to the caller.
    """
    request_echo = {
        "latitude": latitude,
        "longitude": longitude,
        "year": year,
        "radius_km": radius_km,
    }
    try:
        found = search_events_by_geo_time(
            lat=latitude,
            lon=longitude,
            year=year,
            radius_km=radius_km,
            limit=limit,
        )
        response = {
            "success": True,
            "query": request_echo,
            "count": len(found),
            "events": found,
            "source": "wikidata",
        }
    except Exception as exc:
        response = {
            "success": False,
            "error": str(exc),
        }
    return response
def mcp_get_event_by_qid(qid: str) -> Dict:
    """
    MCP tool: Get detailed information about a Wikidata event.

    Delegates to get_event_detail and wraps the outcome in a response
    envelope. A falsy result is reported as "not found"; any exception is
    reported as ``success: False`` with the error text instead of being
    propagated.
    """
    try:
        detail = get_event_detail(qid)
        if not detail:
            return {
                "success": False,
                "error": f"Event {qid} not found",
            }
        return {
            "success": True,
            "event": detail,
        }
    except Exception as exc:
        return {
            "success": False,
            "error": str(exc),
        }