""" RTS Game Web Server - FastAPI + WebSocket Optimized for HuggingFace Spaces with Docker Features: - Real-time multiplayer RTS gameplay - AI tactical analysis via Qwen2.5 LLM - Multi-language support (EN/FR/ZH-TW) - Red Alert-style mechanics """ from fastapi import FastAPI, WebSocket, WebSocketDisconnect from fastapi.responses import HTMLResponse, FileResponse from fastapi.staticfiles import StaticFiles from fastapi.middleware.cors import CORSMiddleware import asyncio import json import random import time from typing import Dict, List, Optional, Set, Any from dataclasses import dataclass, asdict from enum import Enum import uuid # Import localization and AI systems from localization import LOCALIZATION from ai_analysis import get_ai_analyzer, get_model_download_status from nl_to_mcp_translator import translate_nl_to_mcp # Add NL translation import # Game Constants TILE_SIZE = 40 MAP_WIDTH = 96 MAP_HEIGHT = 72 VIEW_WIDTH = 48 VIEW_HEIGHT = 27 # Initialize FastAPI app app = FastAPI(title="RTS Game", version="1.0.0") # CORS middleware app.add_middleware( CORSMiddleware, allow_origins=["*"], allow_credentials=True, allow_methods=["*"], allow_headers=["*"], ) from backend.constants import ( UnitType, BuildingType, UNIT_COSTS, BUILDING_COSTS, POWER_PRODUCTION, POWER_CONSUMPTION, LOW_POWER_THRESHOLD, LOW_POWER_PRODUCTION_FACTOR, HARVESTER_CAPACITY, HARVEST_AMOUNT_PER_ORE, HARVEST_AMOUNT_PER_GEM, HQ_BUILD_RADIUS_TILES, ALLOW_MULTIPLE_SAME_BUILDING, ) class TerrainType(str, Enum): GRASS = "grass" ORE = "ore" GEM = "gem" WATER = "water" # Production Requirements - Critical for gameplay! PRODUCTION_REQUIREMENTS = { UnitType.INFANTRY: BuildingType.BARRACKS, UnitType.TANK: BuildingType.WAR_FACTORY, UnitType.ARTILLERY: BuildingType.WAR_FACTORY, UnitType.HELICOPTER: BuildingType.WAR_FACTORY, UnitType.HARVESTER: BuildingType.HQ, # Harvester needs HQ, NOT Refinery! } ## Costs, power system, and harvesting constants are imported above # Data Classes @dataclass class Position: x: float y: float def distance_to(self, other: 'Position') -> float: return ((self.x - other.x) ** 2 + (self.y - other.y) ** 2) ** 0.5 def to_dict(self): return {"x": self.x, "y": self.y} @dataclass class Unit: id: str type: UnitType player_id: int position: Position health: int max_health: int speed: float damage: int range: float target: Optional[Position] = None target_unit_id: Optional[str] = None target_building_id: Optional[str] = None cargo: int = 0 gathering: bool = False returning: bool = False ore_target: Optional[Position] = None last_attacker_id: Optional[str] = None manual_control: bool = False # True when player gives manual orders manual_order: bool = False # True when player gives manual move/attack order collision_radius: float = 15.0 # Collision detection radius attack_cooldown: int = 0 # Frames until next attack attack_animation: int = 0 # Frames for attack animation (for visual feedback) def to_dict(self): return { "id": self.id, "type": self.type.value, "player_id": self.player_id, "position": self.position.to_dict(), "health": self.health, "max_health": self.max_health, "speed": self.speed, "damage": self.damage, "range": self.range, "target": self.target.to_dict() if self.target else None, "target_unit_id": self.target_unit_id, "target_building_id": self.target_building_id, "cargo": self.cargo, "gathering": self.gathering, "returning": self.returning, "manual_control": self.manual_control, "manual_order": self.manual_order, "collision_radius": self.collision_radius, "attack_cooldown": self.attack_cooldown, "attack_animation": self.attack_animation } @dataclass class Building: id: str type: BuildingType player_id: int position: Position health: int max_health: int production_queue: List[str] production_progress: float target_unit_id: Optional[str] = None # For defense turrets attack_cooldown: int = 0 # For defense turrets attack_animation: int = 0 # For defense turrets def to_dict(self): return { "id": self.id, "type": self.type.value, "player_id": self.player_id, "position": self.position.to_dict(), "health": self.health, "max_health": self.max_health, "production_queue": self.production_queue, "production_progress": self.production_progress, "target_unit_id": self.target_unit_id, "attack_cooldown": self.attack_cooldown, "attack_animation": self.attack_animation } @dataclass class Player: id: int name: str color: str credits: int power: int power_consumption: int is_ai: bool language: str = "en" # Language preference (en, fr, zh-TW) superweapon_charge: int = 0 # 0-1800 ticks (30 seconds at 60 ticks/sec) superweapon_ready: bool = False nuke_preparing: bool = False # True when 'N' key pressed, waiting for target def to_dict(self): return asdict(self) # Game State Manager class GameState: def __init__(self): self.units: Dict[str, Unit] = {} self.buildings: Dict[str, Building] = {} self.players: Dict[int, Player] = {} self.terrain: List[List[TerrainType]] = [] self.fog_of_war: List[List[bool]] = [] self.game_started = False self.game_over = False self.winner: Optional[str] = None # "player" or "enemy" self.tick = 0 self.init_map() self.init_players() def init_map(self): """Initialize terrain with grass, ore, and water""" self.terrain = [[TerrainType.GRASS for _ in range(MAP_WIDTH)] for _ in range(MAP_HEIGHT)] self.fog_of_war = [[True for _ in range(MAP_WIDTH)] for _ in range(MAP_HEIGHT)] # Add ore patches for _ in range(15): ox, oy = random.randint(5, MAP_WIDTH-6), random.randint(5, MAP_HEIGHT-6) for dx in range(-2, 3): for dy in range(-2, 3): if 0 <= ox+dx < MAP_WIDTH and 0 <= oy+dy < MAP_HEIGHT: if random.random() > 0.3: self.terrain[oy+dy][ox+dx] = TerrainType.ORE # Add gem patches (rare) for _ in range(5): gx, gy = random.randint(5, MAP_WIDTH-6), random.randint(5, MAP_HEIGHT-6) for dx in range(-1, 2): for dy in range(-1, 2): if 0 <= gx+dx < MAP_WIDTH and 0 <= gy+dy < MAP_HEIGHT: if random.random() > 0.5: self.terrain[gy+dy][gx+dx] = TerrainType.GEM # Add water bodies for _ in range(8): wx, wy = random.randint(5, MAP_WIDTH-6), random.randint(5, MAP_HEIGHT-6) for dx in range(-3, 4): for dy in range(-3, 4): if 0 <= wx+dx < MAP_WIDTH and 0 <= wy+dy < MAP_HEIGHT: if (dx*dx + dy*dy) < 9: self.terrain[wy+dy][wx+dx] = TerrainType.WATER def init_players(self): """Initialize player 0 (human) and player 1 (AI)""" # Start with power=50 (from HQ), consumption=0 self.players[0] = Player(0, "Player", "#4A90E2", 5000, 50, 0, False) self.players[1] = Player(1, "AI", "#E74C3C", 5000, 50, 0, True) # Create starting HQ for each player hq0_id = str(uuid.uuid4()) self.buildings[hq0_id] = Building( id=hq0_id, type=BuildingType.HQ, player_id=0, position=Position(5 * TILE_SIZE, 5 * TILE_SIZE), health=500, max_health=500, production_queue=[], production_progress=0 ) hq1_id = str(uuid.uuid4()) self.buildings[hq1_id] = Building( id=hq1_id, type=BuildingType.HQ, player_id=1, position=Position((MAP_WIDTH-8) * TILE_SIZE, (MAP_HEIGHT-8) * TILE_SIZE), health=500, max_health=500, production_queue=[], production_progress=0 ) # Starting units for i in range(3): self.create_unit(UnitType.INFANTRY, 0, Position((7+i)*TILE_SIZE, 7*TILE_SIZE)) self.create_unit(UnitType.INFANTRY, 1, Position((MAP_WIDTH-10-i)*TILE_SIZE, (MAP_HEIGHT-10)*TILE_SIZE)) def create_unit(self, unit_type: UnitType, player_id: int, position: Position) -> Unit: """Create a new unit""" unit_stats = { UnitType.INFANTRY: {"health": 100, "speed": 2.0, "damage": 10, "range": 80}, UnitType.TANK: {"health": 200, "speed": 1.5, "damage": 30, "range": 120}, UnitType.HARVESTER: {"health": 150, "speed": 1.0, "damage": 0, "range": 0}, UnitType.HELICOPTER: {"health": 120, "speed": 3.0, "damage": 25, "range": 150}, UnitType.ARTILLERY: {"health": 100, "speed": 1.0, "damage": 50, "range": 200}, } stats = unit_stats[unit_type] unit_id = str(uuid.uuid4()) unit = Unit( id=unit_id, type=unit_type, player_id=player_id, position=position, health=stats["health"], max_health=stats["health"], speed=stats["speed"], damage=stats["damage"], range=stats["range"], target=None, target_unit_id=None ) self.units[unit_id] = unit return unit def create_building(self, building_type: BuildingType, player_id: int, position: Position) -> Building: """Create a new building""" building_stats = { BuildingType.HQ: {"health": 500}, BuildingType.BARRACKS: {"health": 300}, BuildingType.WAR_FACTORY: {"health": 400}, BuildingType.REFINERY: {"health": 250}, BuildingType.POWER_PLANT: {"health": 200}, BuildingType.DEFENSE_TURRET: {"health": 350}, } stats = building_stats[building_type] building_id = str(uuid.uuid4()) building = Building( id=building_id, type=building_type, player_id=player_id, position=position, health=stats["health"], max_health=stats["health"], production_queue=[], production_progress=0 ) self.buildings[building_id] = building return building def calculate_power(self, player_id: int) -> tuple[int, int, str]: """ Calculate power production and consumption for a player. Returns: tuple: (power_production, power_consumption, status) status: 'green' (enough power), 'yellow' (low power), 'red' (no power) """ production = 0 consumption = 0 for building in self.buildings.values(): if building.player_id == player_id: # Add power production production += POWER_PRODUCTION.get(building.type, 0) # Add power consumption consumption += POWER_CONSUMPTION.get(building.type, 0) # Determine status if consumption == 0: status = 'green' elif production >= consumption: status = 'green' elif production >= consumption * LOW_POWER_THRESHOLD: status = 'yellow' else: status = 'red' # Update player power values if player_id in self.players: self.players[player_id].power = production self.players[player_id].power_consumption = consumption return production, consumption, status def to_dict(self): """Convert game state to dictionary for JSON serialization""" return { "tick": self.tick, "game_started": self.game_started, "game_over": self.game_over, "winner": self.winner, "players": {pid: p.to_dict() for pid, p in self.players.items()}, "units": {uid: u.to_dict() for uid, u in self.units.items()}, "buildings": {bid: b.to_dict() for bid, b in self.buildings.items()}, "terrain": [[t.value for t in row] for row in self.terrain], "fog_of_war": self.fog_of_war } # WebSocket Connection Manager class ConnectionManager: def __init__(self): self.active_connections: List[WebSocket] = [] self.game_state = GameState() self.game_loop_task: Optional[asyncio.Task] = None self.ai_analyzer = get_ai_analyzer() self.last_ai_analysis: Dict[str, Any] = {} self.ai_analysis_interval = 30.0 # Analyze every 30 seconds self.last_ai_analysis_time = 0.0 # RED ALERT: Enemy AI state self.ai_last_action_tick = 0 self.ai_action_interval = 120 # Take action every 6 seconds (120 ticks at 20Hz) self.ai_build_plan = [ 'power_plant', 'refinery', 'barracks', 'power_plant', # Second power plant 'war_factory', ] self.ai_build_index = 0 self.ai_unit_cycle = ['infantry', 'infantry', 'tank', 'infantry', 'helicopter'] self.ai_unit_index = 0 async def connect(self, websocket: WebSocket): await websocket.accept() self.active_connections.append(websocket) # Start game loop if not already running if self.game_loop_task is None or self.game_loop_task.done(): self.game_loop_task = asyncio.create_task(self.game_loop()) def disconnect(self, websocket: WebSocket): if websocket in self.active_connections: self.active_connections.remove(websocket) async def broadcast(self, message: dict): """Send message to all connected clients""" disconnected = [] for connection in self.active_connections: try: await connection.send_json(message) except: disconnected.append(connection) # Clean up disconnected clients for conn in disconnected: self.disconnect(conn) async def game_loop(self): """Main game loop - runs at 20 ticks per second""" while self.active_connections: try: # Update game state self.update_game_state() # AI Analysis (periodic) - only if model is available current_time = time.time() if (self.ai_analyzer.model_available and current_time - self.last_ai_analysis_time >= self.ai_analysis_interval): await self.run_ai_analysis() self.last_ai_analysis_time = current_time # Broadcast state to all clients state_dict = self.game_state.to_dict() state_dict['ai_analysis'] = self.last_ai_analysis # Include AI insights # Include model download status so UI can show progress if not self.ai_analyzer.model_available: state_dict['model_download'] = get_model_download_status() await self.broadcast({ "type": "state_update", "state": state_dict }) # 50ms delay = 20 ticks/sec await asyncio.sleep(0.05) except Exception as e: print(f"Game loop error: {e}") await asyncio.sleep(0.1) async def run_ai_analysis(self): """Run AI tactical analysis in background""" # Skip if model not available if not self.ai_analyzer.model_available: # Provide heuristic analysis so panel is never empty player_lang = self.game_state.players.get(0, Player(0, "Player", "#000", 0, 0, 0, False)).language self.last_ai_analysis = self.ai_analyzer._heuristic_analysis(self.game_state.to_dict(), player_lang) return try: # Get player language preference player_lang = self.game_state.players.get(0, Player(0, "Player", "#000", 0, 0, 0, False)).language # Run analysis in thread pool to avoid blocking loop = asyncio.get_event_loop() analysis = await loop.run_in_executor( None, self.ai_analyzer.summarize_combat_situation, self.game_state.to_dict(), player_lang ) self.last_ai_analysis = analysis # Don't print every time to avoid console spam # print(f"🤖 AI Analysis: {analysis.get('summary', '')}") except Exception as e: print(f"⚠️ AI analysis error: {e}") player_lang = self.game_state.players.get(0, Player(0, "Player", "#000", 0, 0, 0, False)).language self.last_ai_analysis = self.ai_analyzer._heuristic_analysis(self.game_state.to_dict(), player_lang) def update_game_state(self): """Update game simulation - Red Alert style!""" self.game_state.tick += 1 # Update superweapon charge (30 seconds = 1800 ticks at 60 ticks/sec) for player in self.game_state.players.values(): if not player.superweapon_ready and player.superweapon_charge < 1800: player.superweapon_charge += 1 if player.superweapon_charge >= 1800: player.superweapon_ready = True # RED ALERT: Calculate power for both players power_prod_p0, power_cons_p0, power_status_p0 = self.game_state.calculate_power(0) power_prod_p1, power_cons_p1, power_status_p1 = self.game_state.calculate_power(1) # Store power status for later use (warning every 5 seconds = 100 ticks at 20Hz) if not hasattr(self, 'last_low_power_warning'): self.last_low_power_warning = 0 if power_status_p0 == 'red' and self.game_state.tick - self.last_low_power_warning > 100: # Send low power warning to player (translated) player_language = self.game_state.players[0].language if 0 in self.game_state.players else "en" message = LOCALIZATION.translate(player_language, "notification.low_power") asyncio.create_task(self.broadcast({ "type": "notification", "message": message, "level": "warning" })) self.last_low_power_warning = self.game_state.tick # RED ALERT: Enemy AI strategic decisions if self.game_state.tick - self.ai_last_action_tick >= self.ai_action_interval: self.update_enemy_ai() self.ai_last_action_tick = self.game_state.tick # Check victory conditions (no HQ = defeat) if not self.game_state.game_over: player_hq_exists = any(b.type == BuildingType.HQ and b.player_id == 0 for b in self.game_state.buildings.values()) enemy_hq_exists = any(b.type == BuildingType.HQ and b.player_id == 1 for b in self.game_state.buildings.values()) if not player_hq_exists and enemy_hq_exists: # Player lost self.game_state.game_over = True self.game_state.winner = "enemy" player_language = self.game_state.players[0].language if 0 in self.game_state.players else "en" winner_name = LOCALIZATION.translate(player_language, "game.winner.enemy") message = LOCALIZATION.translate(player_language, "game.win.banner", winner=winner_name) asyncio.create_task(self.broadcast({ "type": "game_over", "winner": "enemy", "message": message })) elif not enemy_hq_exists and player_hq_exists: # Player won! self.game_state.game_over = True self.game_state.winner = "player" player_language = self.game_state.players[0].language if 0 in self.game_state.players else "en" winner_name = LOCALIZATION.translate(player_language, "game.winner.player") message = LOCALIZATION.translate(player_language, "game.win.banner", winner=winner_name) asyncio.create_task(self.broadcast({ "type": "game_over", "winner": "player", "message": message })) elif not player_hq_exists and not enemy_hq_exists: # Draw (both destroyed simultaneously) self.game_state.game_over = True self.game_state.winner = "draw" asyncio.create_task(self.broadcast({ "type": "game_over", "winner": "draw", "message": "Draw! Both HQs destroyed" })) # Update units for unit in list(self.game_state.units.values()): # RED ALERT: Harvester AI (only if not manually controlled) if unit.type == UnitType.HARVESTER and not unit.manual_control: self.update_harvester(unit) # Don't continue - let it move with the target set by AI # RED ALERT: Auto-defense - if attacked, fight back! (but respect manual orders) if unit.last_attacker_id and unit.last_attacker_id in self.game_state.units: if not unit.target_unit_id and not unit.manual_order: # Not already attacking and no manual order unit.target_unit_id = unit.last_attacker_id # Don't clear movement target if player gave manual move order # RED ALERT: Auto-acquire nearby enemies when idle (but respect manual orders) if not unit.target_unit_id and not unit.target and unit.damage > 0 and not unit.manual_order: nearest_enemy = self.find_nearest_enemy(unit) if nearest_enemy and unit.position.distance_to(nearest_enemy.position) < unit.range * 3: unit.target_unit_id = nearest_enemy.id # Handle combat if unit.target_unit_id: if unit.target_unit_id in self.game_state.units: target = self.game_state.units[unit.target_unit_id] distance = unit.position.distance_to(target.position) if distance <= unit.range: # In range - attack! unit.target = None # Stop moving # Cooldown system - attack every N frames if unit.attack_cooldown <= 0: # Calculate damage based on unit type # Infantry: 5-10 damage per hit, fast attacks (20 frames) # Tank: 30-40 damage per hit, slow attacks (40 frames) # Artillery: 50-60 damage per hit, very slow (60 frames) # Helicopter: 15-20 damage per hit, medium speed (30 frames) damage_multipliers = { UnitType.INFANTRY: (5, 20), # (damage, cooldown) UnitType.TANK: (35, 40), UnitType.ARTILLERY: (55, 60), UnitType.HELICOPTER: (18, 30), UnitType.HARVESTER: (0, 0), } damage, cooldown = damage_multipliers.get(unit.type, (5, 20)) # Apply damage target.health -= damage unit.attack_cooldown = cooldown unit.attack_animation = 10 # 10 frames of attack animation target.last_attacker_id = unit.id # RED ALERT: Track attacker for auto-defense if target.health <= 0: # Target destroyed del self.game_state.units[unit.target_unit_id] unit.target_unit_id = None unit.last_attacker_id = None else: # Move closer unit.target = Position(target.position.x, target.position.y) else: # Target no longer exists unit.target_unit_id = None # Handle building attacks if unit.target_building_id: if unit.target_building_id in self.game_state.buildings: target = self.game_state.buildings[unit.target_building_id] distance = unit.position.distance_to(target.position) if distance <= unit.range: # In range - attack! unit.target = None # Stop moving # Cooldown system - attack every N frames if unit.attack_cooldown <= 0: damage_multipliers = { UnitType.INFANTRY: (5, 20), # (damage, cooldown) UnitType.TANK: (35, 40), UnitType.ARTILLERY: (55, 60), UnitType.HELICOPTER: (18, 30), UnitType.HARVESTER: (0, 0), } damage, cooldown = damage_multipliers.get(unit.type, (5, 20)) # Apply damage to building target.health -= damage unit.attack_cooldown = cooldown unit.attack_animation = 10 # 10 frames of attack animation if target.health <= 0: # Building destroyed del self.game_state.buildings[unit.target_building_id] unit.target_building_id = None else: # Move closer unit.target = Position(target.position.x, target.position.y) else: # Target no longer exists unit.target_building_id = None # Decrease attack cooldown and animation if unit.attack_cooldown > 0: unit.attack_cooldown -= 1 if unit.attack_animation > 0: unit.attack_animation -= 1 # Movement if unit.target: # Move towards target dx = unit.target.x - unit.position.x dy = unit.target.y - unit.position.y dist = (dx*dx + dy*dy) ** 0.5 if dist > 5: unit.position.x += (dx / dist) * unit.speed unit.position.y += (dy / dist) * unit.speed # Apply dispersion after movement self.apply_unit_dispersion(unit) else: unit.target = None unit.manual_order = False # Clear manual order flag when destination reached # If Harvester reached manual destination, resume AI if unit.type == UnitType.HARVESTER and unit.manual_control: unit.manual_control = False # RED ALERT: AI unit behavior (enemy side) if self.game_state.players[unit.player_id].is_ai: self.update_ai_unit(unit) # Update buildings production for building in self.game_state.buildings.values(): # Defense turret auto-attack logic if building.type == BuildingType.DEFENSE_TURRET: turret_range = 300.0 # Defense turret range # Find nearest enemy unit if not building.target_unit_id or building.target_unit_id not in self.game_state.units: min_dist = float('inf') nearest_enemy = None for enemy_unit in self.game_state.units.values(): if enemy_unit.player_id != building.player_id: dist = building.position.distance_to(enemy_unit.position) if dist < turret_range and dist < min_dist: min_dist = dist nearest_enemy = enemy_unit if nearest_enemy: building.target_unit_id = nearest_enemy.id # Attack target if in range if building.target_unit_id and building.target_unit_id in self.game_state.units: target = self.game_state.units[building.target_unit_id] distance = building.position.distance_to(target.position) if distance <= turret_range: # Attack! if building.attack_cooldown <= 0: damage = 20 # Turret damage target.health -= damage building.attack_cooldown = 30 # 30 frames cooldown building.attack_animation = 10 if target.health <= 0: # Target destroyed del self.game_state.units[building.target_unit_id] building.target_unit_id = None else: # Out of range, lose target building.target_unit_id = None # Decrease cooldowns if building.attack_cooldown > 0: building.attack_cooldown -= 1 if building.attack_animation > 0: building.attack_animation -= 1 if building.production_queue: # RED ALERT: Check power status for this building's player _, _, power_status = self.game_state.calculate_power(building.player_id) # Adjust production speed based on power production_speed = 0.01 if power_status == 'red': production_speed *= LOW_POWER_PRODUCTION_FACTOR # 50% speed when low power building.production_progress += production_speed if building.production_progress >= 1.0: # Complete production unit_type = UnitType(building.production_queue.pop(0)) spawn_pos = Position( building.position.x + TILE_SIZE * 2, building.position.y + TILE_SIZE * 2 ) # Find free position near spawn point new_unit = self.game_state.create_unit(unit_type, building.player_id, spawn_pos) if new_unit: free_pos = self.find_free_position_nearby(spawn_pos, new_unit.id) new_unit.position = free_pos building.production_progress = 0 # Endgame checks: elimination-first (after all destructions this tick) if not self.game_state.game_over: p_units = sum(1 for u in self.game_state.units.values() if u.player_id == 0) e_units = sum(1 for u in self.game_state.units.values() if u.player_id == 1) p_buildings = sum(1 for b in self.game_state.buildings.values() if b.player_id == 0) e_buildings = sum(1 for b in self.game_state.buildings.values() if b.player_id == 1) player_language = self.game_state.players[0].language if 0 in self.game_state.players else "en" if p_units == 0 and p_buildings == 0 and (e_units > 0 or e_buildings > 0): # Player eliminated self.game_state.game_over = True self.game_state.winner = "enemy" winner_name = LOCALIZATION.translate(player_language, "game.winner.enemy") message = LOCALIZATION.translate(player_language, "game.win.banner", winner=winner_name) asyncio.create_task(self.broadcast({ "type": "game_over", "winner": "enemy", "message": message })) elif e_units == 0 and e_buildings == 0 and (p_units > 0 or p_buildings > 0): # Enemy eliminated self.game_state.game_over = True self.game_state.winner = "player" winner_name = LOCALIZATION.translate(player_language, "game.winner.player") message = LOCALIZATION.translate(player_language, "game.win.banner", winner=winner_name) asyncio.create_task(self.broadcast({ "type": "game_over", "winner": "player", "message": message })) elif p_units == 0 and p_buildings == 0 and e_units == 0 and e_buildings == 0: # Total annihilation -> draw self.game_state.game_over = True self.game_state.winner = "draw" # Localized draw banner if available try: draw_msg = LOCALIZATION.translate(player_language, "game.draw.banner") except Exception: draw_msg = "Draw!" asyncio.create_task(self.broadcast({ "type": "game_over", "winner": "draw", "message": draw_msg })) def find_nearest_enemy(self, unit: Unit) -> Optional[Unit]: """RED ALERT: Find nearest enemy unit""" min_dist = float('inf') nearest_enemy = None for other_unit in self.game_state.units.values(): if other_unit.player_id != unit.player_id: dist = unit.position.distance_to(other_unit.position) if dist < min_dist: min_dist = dist nearest_enemy = other_unit return nearest_enemy def is_position_occupied(self, position: Position, current_unit_id: str, radius: float = 15.0) -> bool: """Check if a position is occupied by another unit""" for unit_id, unit in self.game_state.units.items(): if unit_id == current_unit_id: continue distance = position.distance_to(unit.position) if distance < (radius + unit.collision_radius): return True return False def find_free_position_nearby(self, position: Position, unit_id: str, max_attempts: int = 16) -> Position: """Find a free position around the given position using spiral search""" # Check if current position is free if not self.is_position_occupied(position, unit_id): return position # Spiral search outward with circular pattern import math for ring in range(1, max_attempts + 1): # Number of positions to test in this ring (8 directions) num_positions = 8 radius = 25.0 * ring # Increase radius with each ring for i in range(num_positions): angle = (i * 360.0 / num_positions) * (math.pi / 180.0) # Convert to radians offset_x = radius * math.cos(angle) offset_y = radius * math.sin(angle) new_pos = Position( position.x + offset_x, position.y + offset_y ) # Keep within map bounds new_pos.x = max(TILE_SIZE, min(MAP_WIDTH * TILE_SIZE - TILE_SIZE, new_pos.x)) new_pos.y = max(TILE_SIZE, min(MAP_HEIGHT * TILE_SIZE - TILE_SIZE, new_pos.y)) if not self.is_position_occupied(new_pos, unit_id): return new_pos # If no free position found, return original (fallback) return position def apply_unit_dispersion(self, unit: Unit): """Apply automatic dispersion to prevent units from overlapping""" if self.is_position_occupied(unit.position, unit.id): new_position = self.find_free_position_nearby(unit.position, unit.id) unit.position = new_position def update_harvester(self, unit: Unit): """RED ALERT: Harvester AI - auto-collect resources!""" # If returning to base with cargo if unit.returning: # Find nearest Refinery or HQ depot = self.find_nearest_depot(unit.player_id, unit.position) if depot: distance = unit.position.distance_to(depot.position) if distance < TILE_SIZE * 2: # Deposit cargo self.game_state.players[unit.player_id].credits += unit.cargo unit.cargo = 0 unit.returning = False unit.gathering = False unit.ore_target = None unit.target = None # Clear target after deposit unit.manual_control = False # Resume AI after deposit else: # Move to depot unit.target = Position(depot.position.x, depot.position.y) else: # No depot - stop returning unit.returning = False return # If at ore patch, harvest it if unit.ore_target: distance = ((unit.position.x - unit.ore_target.x) ** 2 + (unit.position.y - unit.ore_target.y) ** 2) ** 0.5 if distance < TILE_SIZE / 2: # Harvest ore tile_x = int(unit.ore_target.x / TILE_SIZE) tile_y = int(unit.ore_target.y / TILE_SIZE) if (0 <= tile_x < MAP_WIDTH and 0 <= tile_y < MAP_HEIGHT): terrain = self.game_state.terrain[tile_y][tile_x] if terrain == TerrainType.ORE: unit.cargo = min(HARVESTER_CAPACITY, unit.cargo + HARVEST_AMOUNT_PER_ORE) self.game_state.terrain[tile_y][tile_x] = TerrainType.GRASS elif terrain == TerrainType.GEM: unit.cargo = min(HARVESTER_CAPACITY, unit.cargo + HARVEST_AMOUNT_PER_GEM) self.game_state.terrain[tile_y][tile_x] = TerrainType.GRASS unit.ore_target = None unit.gathering = False # If cargo full or nearly full, return if unit.cargo >= HARVESTER_CAPACITY * 0.9: unit.returning = True unit.target = None else: # Move to ore unit.target = unit.ore_target return # FIXED: Always search for ore when idle (not gathering and no ore target) # This ensures Harvester automatically finds ore after spawning or depositing if not unit.gathering and not unit.ore_target: nearest_ore = self.find_nearest_ore(unit.position) if nearest_ore: unit.ore_target = nearest_ore unit.gathering = True unit.target = nearest_ore # If no ore found, clear any residual target to stay idle elif unit.target: unit.target = None def find_nearest_depot(self, player_id: int, position: Position) -> Optional[Building]: """Find nearest Refinery or HQ for harvester""" nearest = None min_dist = float('inf') for building in self.game_state.buildings.values(): if building.player_id == player_id: if building.type in [BuildingType.REFINERY, BuildingType.HQ]: dist = position.distance_to(building.position) if dist < min_dist: min_dist = dist nearest = building return nearest def find_nearest_ore(self, position: Position) -> Optional[Position]: """Find nearest ore or gem tile""" nearest = None min_dist = float('inf') for y in range(MAP_HEIGHT): for x in range(MAP_WIDTH): if self.game_state.terrain[y][x] in [TerrainType.ORE, TerrainType.GEM]: ore_pos = Position(x * TILE_SIZE + TILE_SIZE/2, y * TILE_SIZE + TILE_SIZE/2) dist = position.distance_to(ore_pos) if dist < min_dist: min_dist = dist nearest = ore_pos return nearest def update_ai_unit(self, unit: Unit): """RED ALERT: Enemy AI behavior - aggressive!""" if unit.damage == 0: # Don't attack with harvesters return # Always try to attack nearest enemy if not unit.target_unit_id: nearest_enemy = self.find_nearest_enemy(unit) if nearest_enemy: distance = unit.position.distance_to(nearest_enemy.position) # Attack if within aggro range if distance < 500: # Aggro range unit.target_unit_id = nearest_enemy.id def update_enemy_ai(self): """RED ALERT: Enemy AI strategic decision making""" player_ai = self.game_state.players[1] # 1. Check if AI should build next building if self.ai_build_index < len(self.ai_build_plan): next_building_type = self.ai_build_plan[self.ai_build_index] building_type = BuildingType(next_building_type) cost = BUILDING_COSTS.get(building_type, 0) # Check if AI can afford it if player_ai.credits >= cost: # Check if prerequisites are met (simplified) can_build = True # Check power plant requirement if building_type in [BuildingType.BARRACKS, BuildingType.REFINERY, BuildingType.WAR_FACTORY]: has_power_plant = any( b.type == BuildingType.POWER_PLANT and b.player_id == 1 for b in self.game_state.buildings.values() ) if not has_power_plant: can_build = False # Check barracks requirement for war factory if building_type == BuildingType.WAR_FACTORY: has_barracks = any( b.type == BuildingType.BARRACKS and b.player_id == 1 for b in self.game_state.buildings.values() ) if not has_barracks: can_build = False if can_build: # Find build location near AI HQ ai_hq = next( (b for b in self.game_state.buildings.values() if b.player_id == 1 and b.type == BuildingType.HQ), None ) if ai_hq: # Random offset from HQ offset_x = random.randint(-200, 200) offset_y = random.randint(-200, 200) build_pos = Position( max(TILE_SIZE * 3, min(MAP_WIDTH * TILE_SIZE - TILE_SIZE * 3, ai_hq.position.x + offset_x)), max(TILE_SIZE * 3, min(MAP_HEIGHT * TILE_SIZE - TILE_SIZE * 3, ai_hq.position.y + offset_y)) ) # Deduct credits player_ai.credits -= cost # Create building immediately (simplified - no construction queue for AI) self.game_state.create_building(building_type, 1, build_pos) print(f"🤖 AI built {building_type.value} at {build_pos.x:.0f},{build_pos.y:.0f}") # Move to next building in plan self.ai_build_index += 1 # 2. Produce units if we have production buildings ai_barracks = [ b for b in self.game_state.buildings.values() if b.player_id == 1 and b.type == BuildingType.BARRACKS ] ai_war_factory = [ b for b in self.game_state.buildings.values() if b.player_id == 1 and b.type == BuildingType.WAR_FACTORY ] # Produce infantry from barracks if ai_barracks and len(ai_barracks[0].production_queue) == 0: if player_ai.credits >= UNIT_COSTS[UnitType.INFANTRY]: player_ai.credits -= UNIT_COSTS[UnitType.INFANTRY] ai_barracks[0].production_queue.append(UnitType.INFANTRY.value) print(f"🤖 AI queued Infantry") # Produce vehicles from war factory (cycle through unit types) if ai_war_factory and len(ai_war_factory[0].production_queue) == 0: next_unit_type = self.ai_unit_cycle[self.ai_unit_index] unit_type = UnitType(next_unit_type) cost = UNIT_COSTS.get(unit_type, 0) if player_ai.credits >= cost: player_ai.credits -= cost ai_war_factory[0].production_queue.append(unit_type.value) self.ai_unit_index = (self.ai_unit_index + 1) % len(self.ai_unit_cycle) print(f"🤖 AI queued {unit_type.value}") # 3. Make AI harvesters collect resources for unit in self.game_state.units.values(): if unit.player_id == 1 and unit.type == UnitType.HARVESTER: if not unit.manual_control: # Harvester AI is handled by update_harvester() in main loop pass async def launch_nuke(self, player_id: int, target: Position): """ Launch a nuclear missile at the target location. Args: player_id (int): The ID of the player launching the nuke. target (Position): The target location for the nuke. """ # Check if player has superweapon ready player = self.game_state.players.get(player_id) if not player or not player.superweapon_ready: return {"success": False, "error": "Superweapon not ready"} # Find all units in the target area (5x5 tiles) affected_units = [] for unit in self.game_state.units.values(): if unit.position.x >= target.x - TILE_SIZE and unit.position.x <= target.x + TILE_SIZE and \ unit.position.y >= target.y - TILE_SIZE and unit.position.y <= target.y + TILE_SIZE: affected_units.append(unit) # Find all buildings in the target area affected_buildings = [] for building in self.game_state.buildings.values(): if building.position.x >= target.x - TILE_SIZE and building.position.x <= target.x + TILE_SIZE and \ building.position.y >= target.y - TILE_SIZE and building.position.y <= target.y + TILE_SIZE: affected_buildings.append(building) # Deal damage to units (50% chance to destroy each unit) for unit in affected_units: if random.random() < 0.5: # Destroyed del self.game_state.units[unit.id] else: # Damaged (survived) unit.health = max(1, unit.health - 50) # Destroy buildings (except HQ, which cannot be destroyed by nukes) for building in affected_buildings: if building.type != BuildingType.HQ: del self.game_state.buildings[building.id] # Reset superweapon charge player.superweapon_charge = 0 player.superweapon_ready = False # Broadcast nuke launch event await self.broadcast({ "type": "nuke_launched", "player_id": player_id, "target": target.to_dict(), "affected_units": [u.to_dict() for u in affected_units], "affected_buildings": [b.to_dict() for b in affected_buildings] }) return {"success": True, "message": "Nuclear missile launched"} async def execute_mcp_call(self, mcp_call: dict) -> dict: """Execute an MCP tool call on the game state""" tool = mcp_call.get("tool") args = mcp_call.get("args", {}) try: if tool == "get_game_state": return { "action": "get_game_state", "data": self.game_state.to_dict() } elif tool == "move_units": unit_ids = args.get("unit_ids", []) target_x = args.get("target_x", 0) target_y = args.get("target_y", 0) # Find units by type or ID moved_units = [] for unit_id, unit in self.game_state.units.items(): if unit.player_id == 0: # Player units if unit.type.name.lower() in unit_ids or unit_id in unit_ids: unit.target = Position(target_x, target_y) moved_units.append(unit_id) return { "action": "move_units", "units_moved": len(moved_units), "target": (target_x, target_y) } elif tool == "attack_unit": attacker_ids = args.get("attacker_ids", []) target_id = args.get("target_id", "") # Find target unit target_unit = None for unit_id, unit in self.game_state.units.items(): if unit.player_id == 1 and (unit_id == target_id or str(unit.type).lower() == target_id.lower()): target_unit = unit break if target_unit: # Set attackers to target this unit attackers_set = 0 for unit_id, unit in self.game_state.units.items(): if unit.player_id == 0: # Player units if unit.type.name.lower() in attacker_ids or unit_id in attacker_ids: unit.target_unit_id = target_unit.id attackers_set += 1 return { "action": "attack_unit", "target": target_id, "attackers": attackers_set } else: return { "action": "attack_unit", "error": f"Target unit {target_id} not found" } elif tool == "build_building": building_type = args.get("building_type", "") position_x = args.get("position_x", 0) position_y = args.get("position_y", 0) player_id = args.get("player_id", 0) # Map building type string to enum building_map = { "hq": BuildingType.HQ, "power_plant": BuildingType.POWER_PLANT, "barracks": BuildingType.BARRACKS, "war_factory": BuildingType.WAR_FACTORY, "refinery": BuildingType.REFINERY, "defense_turret": BuildingType.DEFENSE_TURRET } building_enum = building_map.get(building_type.lower()) if building_enum: # Check if player has enough credits player = self.game_state.players.get(player_id) building_cost = { BuildingType.HQ: 0, # Can't build HQ BuildingType.POWER_PLANT: 300, BuildingType.BARRACKS: 500, BuildingType.WAR_FACTORY: 800, BuildingType.REFINERY: 600, BuildingType.DEFENSE_TURRET: 400 } cost = building_cost.get(building_enum, 1000) if player and player.credits >= cost: player.credits -= cost building_id = str(uuid.uuid4()) self.game_state.buildings[building_id] = Building( id=building_id, type=building_enum, player_id=player_id, position=Position(position_x, position_y), health=500, max_health=500, production_queue=[], production_progress=0 ) return { "action": "build_building", "building": building_type, "position": (position_x, position_y), "cost": cost } else: return { "action": "build_building", "error": f"Not enough credits. Need {cost}, have {player.credits if player else 0}" } else: return { "action": "build_building", "error": f"Unknown building type: {building_type}" } elif tool == "get_ai_analysis": language = args.get("language", "fr") # Use the existing AI analysis system from ai_analysis import get_ai_analyzer analyzer = get_ai_analyzer() if analyzer: analysis = analyzer.summarize_combat_situation(self.game_state.to_dict(), language) return { "action": "get_ai_analysis", "analysis": analysis } else: return { "action": "get_ai_analysis", "error": "AI analyzer not available" } else: return { "action": "unknown_tool", "error": f"Unknown MCP tool: {tool}" } except Exception as e: return { "action": "error", "error": str(e) } async def handle_nl_command(self, websocket: WebSocket, command: str): """Handle natural language commands from users""" try: # Translate NL to MCP translation_result = translate_nl_to_mcp(command) if translation_result.get("success"): mcp_call = translation_result["translation"] # Execute the MCP call result = await self.execute_mcp_call(mcp_call) # Send response back to user await websocket.send_json({ "type": "nl_command_response", "original_command": command, "translation": mcp_call, "result": result, "success": True }) else: # Send error response await websocket.send_json({ "type": "nl_command_response", "original_command": command, "error": translation_result.get("error", "Translation failed"), "clarification": translation_result.get("clarification", ""), "success": False }) except Exception as e: await websocket.send_json({ "type": "nl_command_response", "original_command": command, "error": f"Command processing error: {str(e)}", "success": False }) async def handle_message(self, websocket: WebSocket, message: dict): """Handle incoming WebSocket messages""" try: if message["type"] == "nl_command": # Handle natural language command nl_text = message.get("text", "") language = message.get("language", "fr") if not nl_text.strip(): await websocket.send_json({ "type": "nl_command_response", "status": "error", "message": "Empty command received" }) return # Translate natural language to MCP try: from nl_to_mcp_translator import translate_nl_to_mcp mcp_call = translate_nl_to_mcp(nl_text, language) if mcp_call.get("error"): await websocket.send_json({ "type": "nl_command_response", "status": "error", "message": f"Translation error: {mcp_call['error']}", "original_text": nl_text }) return # Execute the MCP call result = await self.execute_mcp_call(mcp_call) # Send response back to client await websocket.send_json({ "type": "nl_command_response", "status": "success", "message": f"Command executed: {result.get('action', 'unknown')}", "result": result, "original_text": nl_text, "translated_call": mcp_call }) # Broadcast game state update to all clients state_dict = self.game_state.to_dict() await self.broadcast({ "type": "state_update", "state": state_dict }) except Exception as e: await websocket.send_json({ "type": "nl_command_response", "status": "error", "message": f"Command execution failed: {str(e)}", "original_text": nl_text }) # Handle other message types here... # ...existing code...