|
|
|
|
|
|
|
|
|
|
|
import asyncio
|
|
|
import aiohttp
|
|
|
from playwright.async_api import async_playwright, Page
|
|
|
from bs4 import BeautifulSoup
|
|
|
import requests
|
|
|
import tweepy
|
|
|
import pandas as pd
|
|
|
import time
|
|
|
import re
|
|
|
from datetime import datetime, timedelta
|
|
|
from huggingface_hub import InferenceClient
|
|
|
import os
|
|
|
import sys
|
|
|
import random
|
|
|
|
|
|
import sqlite3
|
|
|
import logging
|
|
|
import imaplib
|
|
|
import email
|
|
|
import smtplib
|
|
|
from email.mime.text import MIMEText
|
|
|
import schedule
|
|
|
import hashlib
|
|
|
import json
|
|
|
import threading
|
|
|
import subprocess
|
|
|
from contextlib import contextmanager
|
|
|
from dataclasses import dataclass
|
|
|
from typing import List, Dict, Optional, Tuple
|
|
|
from urllib.parse import urljoin, urlparse
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@dataclass
|
|
|
class PersonalInfo:
|
|
|
prenom: str = "Valentin"
|
|
|
nom: str = "Cora"
|
|
|
email: str = "[email protected]"
|
|
|
email_derivee: str = "[email protected]"
|
|
|
telephone: str = "+41791234567"
|
|
|
adresse: str = "Av Chantemerle 9"
|
|
|
code_postal: str = "1009"
|
|
|
ville: str = "Pully"
|
|
|
pays: str = "Suisse"
|
|
|
|
|
|
@dataclass
|
|
|
class APIConfig:
|
|
|
hf_token: Optional[str] = None
|
|
|
x_token: Optional[str] = None
|
|
|
google_api_key: Optional[str] = None
|
|
|
google_cx: Optional[str] = None
|
|
|
gemini_key: Optional[str] = None
|
|
|
email_app_password: Optional[str] = None
|
|
|
telegram_bot_token: Optional[str] = None
|
|
|
telegram_chat_id: Optional[str] = None
|
|
|
|
|
|
@classmethod
|
|
|
def from_env(cls):
|
|
|
return cls(
|
|
|
hf_token=os.getenv("HF_TOKEN"),
|
|
|
x_token=os.getenv("X_TOKEN"),
|
|
|
google_api_key=os.getenv("GOOGLE_API_KEY"),
|
|
|
google_cx=os.getenv("GOOGLE_CX"),
|
|
|
gemini_key=os.getenv("GEMINI_KEY"),
|
|
|
email_app_password=os.getenv("EMAIL_APP_PASSWORD"),
|
|
|
telegram_bot_token=os.getenv("TELEGRAM_BOT_TOKEN"),
|
|
|
telegram_chat_id=os.getenv("TELEGRAM_CHAT_ID")
|
|
|
)
|
|
|
|
|
|
@dataclass
|
|
|
class Contest:
|
|
|
title: str
|
|
|
url: str
|
|
|
description: str
|
|
|
source: str
|
|
|
deadline: Optional[str] = None
|
|
|
prize: Optional[str] = None
|
|
|
difficulty_score: int = 0
|
|
|
|
|
|
@dataclass
|
|
|
class FormField:
|
|
|
selector: str
|
|
|
field_type: str
|
|
|
label: str
|
|
|
required: bool
|
|
|
current_value: str = ""
|
|
|
ai_context: str = ""
|
|
|
|
|
|
@dataclass
|
|
|
class FormAnalysis:
|
|
|
fields: List[FormField]
|
|
|
complexity_score: int
|
|
|
estimated_success_rate: float
|
|
|
requires_captcha: bool
|
|
|
requires_social_media: bool
|
|
|
form_url: str
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
PERSONAL_INFO = PersonalInfo()
|
|
|
API_CONFIG = APIConfig.from_env()
|
|
|
|
|
|
|
|
|
SITES_CH = [
|
|
|
'https://www.concours.ch/concours/tous',
|
|
|
'https://www.jeu-concours.biz/concours-pays_suisse.html',
|
|
|
'https://www.loisirs.ch/concours/',
|
|
|
'https://www.radin.ch/',
|
|
|
'https://win4win.ch/fr/',
|
|
|
'https://www.concours-suisse.ch/',
|
|
|
'https://corporate.migros.ch/fr/concours',
|
|
|
'https://www.20min.ch/fr/concours-et-jeux',
|
|
|
'https://dein-gewinnspiel.ch/en',
|
|
|
'https://www.myswitzerland.com/fr/planification/vie-pratique/concours/'
|
|
|
]
|
|
|
|
|
|
|
|
|
PROXY_LIST = ["http://20.206.106.192:80", "http://51.15.242.202:8888"]
|
|
|
USER_AGENTS = [
|
|
|
'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
|
|
|
'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
|
|
|
'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36'
|
|
|
]
|
|
|
|
|
|
|
|
|
logging.basicConfig(
|
|
|
level=logging.INFO,
|
|
|
format='%(asctime)s - %(levelname)s - %(message)s',
|
|
|
handlers=[
|
|
|
logging.FileHandler('concours_bot.log'),
|
|
|
logging.StreamHandler()
|
|
|
]
|
|
|
)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class APITester:
|
|
|
def __init__(self, api_config: APIConfig):
|
|
|
self.api_config = api_config
|
|
|
|
|
|
def test_all_apis(self) -> Dict[str, bool]:
|
|
|
"""Teste toutes les APIs configurées"""
|
|
|
results = {}
|
|
|
|
|
|
print("🔍 Test de la configuration des APIs...")
|
|
|
print("=" * 40)
|
|
|
|
|
|
results['gemini'] = self._test_gemini()
|
|
|
results['telegram'] = self._test_telegram()
|
|
|
results['google_search'] = self._test_google_search()
|
|
|
results['huggingface'] = self._test_huggingface()
|
|
|
results['email'] = self._test_email_config()
|
|
|
|
|
|
return results
|
|
|
|
|
|
def _test_gemini(self) -> bool:
|
|
|
"""Test de l'API Gemini - DÉSACTIVÉ"""
|
|
|
print("🤖 Test Gemini API... DÉSACTIVÉ (utilisation d'alternatives locales)")
|
|
|
print("✅ Système de réponses locales activé")
|
|
|
return True
|
|
|
|
|
|
def _test_telegram(self) -> bool:
|
|
|
"""Test du Bot Telegram"""
|
|
|
print("\n📱 Test Telegram Bot...")
|
|
|
|
|
|
if not self.api_config.telegram_bot_token:
|
|
|
print("❌ TELEGRAM_BOT_TOKEN non configuré")
|
|
|
return False
|
|
|
|
|
|
if not self.api_config.telegram_chat_id:
|
|
|
print("❌ TELEGRAM_CHAT_ID non configuré")
|
|
|
return False
|
|
|
|
|
|
try:
|
|
|
url = f"https://api.telegram.org/bot{self.api_config.telegram_bot_token}/getMe"
|
|
|
response = requests.get(url, timeout=10)
|
|
|
|
|
|
if response.status_code == 200:
|
|
|
bot_info = response.json()
|
|
|
print(f"✅ Bot connecté: {bot_info['result']['first_name']}")
|
|
|
|
|
|
|
|
|
message_url = f"https://api.telegram.org/bot{self.api_config.telegram_bot_token}/sendMessage"
|
|
|
test_message = {
|
|
|
'chat_id': self.api_config.telegram_chat_id,
|
|
|
'text': '🎉 Test de configuration réussi ! Votre bot de concours est prêt.'
|
|
|
}
|
|
|
|
|
|
msg_response = requests.post(message_url, json=test_message, timeout=10)
|
|
|
if msg_response.status_code == 200:
|
|
|
print("✅ Message de test envoyé avec succès")
|
|
|
return True
|
|
|
else:
|
|
|
print(f"❌ Erreur envoi message: {msg_response.text}")
|
|
|
return False
|
|
|
else:
|
|
|
print(f"❌ Erreur connexion bot: {response.text}")
|
|
|
return False
|
|
|
|
|
|
except Exception as e:
|
|
|
print(f"❌ Erreur Telegram: {e}")
|
|
|
return False
|
|
|
|
|
|
def _test_google_search(self) -> bool:
|
|
|
"""Test de l'API Google Search"""
|
|
|
print("\n🔍 Test Google Search API...")
|
|
|
|
|
|
if not self.api_config.google_api_key:
|
|
|
print("❌ GOOGLE_API_KEY non configurée")
|
|
|
return False
|
|
|
|
|
|
if not self.api_config.google_cx:
|
|
|
print("❌ GOOGLE_CX non configuré")
|
|
|
return False
|
|
|
|
|
|
try:
|
|
|
url = "https://www.googleapis.com/customsearch/v1"
|
|
|
params = {
|
|
|
"key": self.api_config.google_api_key,
|
|
|
"cx": self.api_config.google_cx,
|
|
|
"q": "concours suisse test",
|
|
|
"num": 1
|
|
|
}
|
|
|
|
|
|
response = requests.get(url, params=params, timeout=10)
|
|
|
|
|
|
if response.status_code == 200:
|
|
|
results = response.json()
|
|
|
if 'items' in results:
|
|
|
print(f"✅ Google Search fonctionne: {len(results['items'])} résultat(s)")
|
|
|
return True
|
|
|
else:
|
|
|
print("⚠️ Google Search configuré mais aucun résultat")
|
|
|
return True
|
|
|
else:
|
|
|
print(f"❌ Erreur Google Search: {response.text}")
|
|
|
return False
|
|
|
|
|
|
except Exception as e:
|
|
|
print(f"❌ Erreur Google Search: {e}")
|
|
|
return False
|
|
|
|
|
|
def _test_huggingface(self) -> bool:
|
|
|
"""Test de l'API HuggingFace"""
|
|
|
print("\n🤗 Test HuggingFace API...")
|
|
|
|
|
|
if not self.api_config.hf_token:
|
|
|
print("❌ HF_TOKEN non configuré")
|
|
|
return False
|
|
|
|
|
|
try:
|
|
|
client = InferenceClient(token=self.api_config.hf_token)
|
|
|
result = client.text_generation(
|
|
|
"Bonjour, comment allez-vous?",
|
|
|
model="gpt2",
|
|
|
max_new_tokens=10
|
|
|
)
|
|
|
print("✅ HuggingFace fonctionne")
|
|
|
return True
|
|
|
except Exception as e:
|
|
|
print(f"❌ Erreur HuggingFace: {e}")
|
|
|
return False
|
|
|
|
|
|
def _test_email_config(self) -> bool:
|
|
|
"""Test de la configuration Email"""
|
|
|
print("\n📧 Test Configuration Email...")
|
|
|
|
|
|
if not self.api_config.email_app_password:
|
|
|
print("⚠️ EMAIL_APP_PASSWORD non configuré (optionnel)")
|
|
|
return False
|
|
|
|
|
|
if len(self.api_config.email_app_password) >= 10:
|
|
|
print("✅ Mot de passe Email configuré")
|
|
|
return True
|
|
|
else:
|
|
|
print("❌ Mot de passe Email semble invalide")
|
|
|
return False
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class SmartLauncher:
|
|
|
def __init__(self):
|
|
|
self.api_config = API_CONFIG
|
|
|
self.api_tester = APITester(self.api_config)
|
|
|
self.api_status = self._check_api_configuration()
|
|
|
|
|
|
def _check_api_configuration(self) -> Dict[str, bool]:
|
|
|
"""Vérifie quelles APIs sont configurées"""
|
|
|
return {
|
|
|
'gemini': bool(self.api_config.gemini_key),
|
|
|
'telegram': bool(self.api_config.telegram_bot_token and self.api_config.telegram_chat_id),
|
|
|
'google_search': bool(self.api_config.google_api_key and self.api_config.google_cx),
|
|
|
'huggingface': bool(self.api_config.hf_token),
|
|
|
'email': bool(self.api_config.email_app_password)
|
|
|
}
|
|
|
|
|
|
def display_status(self):
|
|
|
"""Affiche le statut de configuration"""
|
|
|
print("🔍 DÉTECTION DE LA CONFIGURATION")
|
|
|
print("=" * 40)
|
|
|
|
|
|
descriptions = {
|
|
|
'gemini': "Gemini API (IA intelligente)",
|
|
|
'telegram': "Telegram Bot (Alertes)",
|
|
|
'google_search': "Google Search (Plus de concours)",
|
|
|
'huggingface': "HuggingFace (IA backup)",
|
|
|
'email': "Email Analysis (Détection gains)"
|
|
|
}
|
|
|
|
|
|
configured_count = 0
|
|
|
for api, configured in self.api_status.items():
|
|
|
icon = "✅" if configured else "❌"
|
|
|
desc = descriptions[api]
|
|
|
print(f"{icon} {desc}")
|
|
|
if configured:
|
|
|
configured_count += 1
|
|
|
|
|
|
print(f"\n📊 APIs configurées: {configured_count}/5")
|
|
|
return configured_count
|
|
|
|
|
|
def calculate_effectiveness(self) -> int:
|
|
|
"""Calcule l'efficacité estimée du bot"""
|
|
|
base_effectiveness = 40
|
|
|
|
|
|
if self.api_status['gemini']:
|
|
|
base_effectiveness += 30
|
|
|
if self.api_status['telegram']:
|
|
|
base_effectiveness += 10
|
|
|
if self.api_status['google_search']:
|
|
|
base_effectiveness += 15
|
|
|
if self.api_status['huggingface']:
|
|
|
base_effectiveness += 3
|
|
|
if self.api_status['email']:
|
|
|
base_effectiveness += 2
|
|
|
|
|
|
return min(base_effectiveness, 100)
|
|
|
|
|
|
def show_performance_impact(self):
|
|
|
"""Montre l'impact sur les performances"""
|
|
|
print("\n🎯 IMPACT SUR LES PERFORMANCES:")
|
|
|
print("-" * 40)
|
|
|
|
|
|
if self.api_status['gemini']:
|
|
|
print("✅ Réponses IA personnalisées et intelligentes")
|
|
|
else:
|
|
|
print("⚠️ Utilisation de réponses préprogrammées")
|
|
|
|
|
|
if self.api_status['telegram']:
|
|
|
print("✅ Alertes en temps réel des victoires")
|
|
|
else:
|
|
|
print("⚠️ Pas d'alertes automatiques")
|
|
|
|
|
|
if self.api_status['google_search']:
|
|
|
print("✅ Recherche étendue de concours")
|
|
|
else:
|
|
|
print("⚠️ Scraping limité aux sites directs")
|
|
|
|
|
|
effectiveness = self.calculate_effectiveness()
|
|
|
print(f"\n📈 Efficacité estimée: {effectiveness}%")
|
|
|
|
|
|
def show_setup_guide(self):
|
|
|
"""Affiche le guide de configuration"""
|
|
|
print("\n📖 GUIDE DE CONFIGURATION DES APIs GRATUITES")
|
|
|
print("=" * 50)
|
|
|
|
|
|
print("\n🤖 1. GEMINI API (Google) - PRIORITÉ HAUTE")
|
|
|
print(" • Aller sur: https://aistudio.google.com/")
|
|
|
print(" • Se connecter avec Google")
|
|
|
print(" • Cliquer 'Get API Key'")
|
|
|
print(" • Copier la clé (AIza...)")
|
|
|
print(" • Variable: GEMINI_KEY")
|
|
|
|
|
|
print("\n📱 2. TELEGRAM BOT - PRIORITÉ HAUTE")
|
|
|
print(" • Chercher @BotFather sur Telegram")
|
|
|
print(" • Envoyer: /newbot")
|
|
|
print(" • Suivre les instructions")
|
|
|
print(" • Chercher @userinfobot pour Chat ID")
|
|
|
print(" • Variables: TELEGRAM_BOT_TOKEN, TELEGRAM_CHAT_ID")
|
|
|
|
|
|
print("\n🔍 3. GOOGLE SEARCH API - PRIORITÉ MOYENNE")
|
|
|
print(" • Aller sur: console.cloud.google.com")
|
|
|
print(" • Activer Custom Search API")
|
|
|
print(" • Créer une clé API")
|
|
|
print(" • Créer un moteur de recherche sur: cse.google.com")
|
|
|
print(" • Variables: GOOGLE_API_KEY, GOOGLE_CX")
|
|
|
|
|
|
print("\n🤗 4. HUGGINGFACE TOKEN - PRIORITÉ BASSE")
|
|
|
print(" • Créer compte sur: huggingface.co")
|
|
|
print(" • Aller dans Settings > Tokens")
|
|
|
print(" • Créer un token 'Read'")
|
|
|
print(" • Variable: HF_TOKEN")
|
|
|
|
|
|
print("\n📧 5. EMAIL OUTLOOK - OPTIONNEL")
|
|
|
print(" • Aller sur: outlook.live.com")
|
|
|
print(" • Paramètres > Sécurité")
|
|
|
print(" • Créer mot de passe d'application")
|
|
|
print(" • Variable: EMAIL_APP_PASSWORD")
|
|
|
|
|
|
def launch_options(self):
|
|
|
"""Affiche les options de lancement"""
|
|
|
print("\n🚀 OPTIONS DISPONIBLES:")
|
|
|
print("-" * 30)
|
|
|
print("1️⃣ Lancer le bot avec la configuration actuelle")
|
|
|
print("2️⃣ Tester toutes les APIs configurées")
|
|
|
print("3️⃣ Afficher le guide de configuration")
|
|
|
print("4️⃣ Configuration rapide PowerShell")
|
|
|
print("5️⃣ Quitter")
|
|
|
|
|
|
def show_powershell_config(self):
|
|
|
"""Affiche la configuration PowerShell rapide"""
|
|
|
print("\n⚡ CONFIGURATION RAPIDE POWERSHELL")
|
|
|
print("=" * 40)
|
|
|
print("Copiez-collez ces commandes dans PowerShell (remplacez par vos vraies clés):")
|
|
|
print()
|
|
|
print('$env:GEMINI_KEY="AIzaSyC-VOTRE-CLE-GEMINI"')
|
|
|
print('$env:TELEGRAM_BOT_TOKEN="123456:ABC-VOTRE-TOKEN"')
|
|
|
print('$env:TELEGRAM_CHAT_ID="VOTRE-CHAT-ID"')
|
|
|
print('$env:GOOGLE_API_KEY="AIzaSyD-VOTRE-CLE-GOOGLE"')
|
|
|
print('$env:GOOGLE_CX="VOTRE-SEARCH-ENGINE-ID"')
|
|
|
print('$env:HF_TOKEN="hf_VOTRE-TOKEN"')
|
|
|
print('$env:EMAIL_APP_PASSWORD="VOTRE-MOT-DE-PASSE"')
|
|
|
print()
|
|
|
print("⚠️ Redémarrez votre terminal après configuration")
|
|
|
|
|
|
def main_menu(self):
|
|
|
"""Menu principal intelligent"""
|
|
|
print("🎰 BOT DE CONCOURS SUISSE - LANCEUR INTELLIGENT")
|
|
|
print("=" * 55)
|
|
|
|
|
|
configured_count = self.display_status()
|
|
|
self.show_performance_impact()
|
|
|
|
|
|
if configured_count == 0:
|
|
|
print("\n⚠️ AUCUNE API CONFIGURÉE")
|
|
|
print("Le bot fonctionnera en mode basique uniquement.")
|
|
|
print("Recommandation: Configurez au moins Gemini + Telegram")
|
|
|
|
|
|
elif configured_count < 3:
|
|
|
print("\n💡 CONFIGURATION PARTIELLE")
|
|
|
print("Pour de meilleures performances, configurez plus d'APIs.")
|
|
|
|
|
|
else:
|
|
|
print("\n🎉 EXCELLENTE CONFIGURATION !")
|
|
|
print("Votre bot est prêt pour des performances optimales.")
|
|
|
|
|
|
self.launch_options()
|
|
|
|
|
|
while True:
|
|
|
choice = input("\n🎯 Votre choix (1-5): ").strip()
|
|
|
|
|
|
if choice == "1":
|
|
|
print("\n🎬 Lancement du bot...")
|
|
|
return True
|
|
|
|
|
|
elif choice == "2":
|
|
|
print("\n🧪 Test des APIs...")
|
|
|
results = self.api_tester.test_all_apis()
|
|
|
working_count = sum(results.values())
|
|
|
print(f"\n📊 Résumé: {working_count}/5 APIs fonctionnelles")
|
|
|
|
|
|
if working_count >= 2:
|
|
|
print("🚀 Prêt à lancer le bot !")
|
|
|
else:
|
|
|
print("⚠️ Configurez au moins 2 APIs pour de meilleurs résultats")
|
|
|
|
|
|
elif choice == "3":
|
|
|
self.show_setup_guide()
|
|
|
|
|
|
elif choice == "4":
|
|
|
self.show_powershell_config()
|
|
|
|
|
|
elif choice == "5":
|
|
|
print("👋 À bientôt !")
|
|
|
return False
|
|
|
|
|
|
else:
|
|
|
print("❌ Veuillez choisir 1, 2, 3, 4 ou 5")
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class DatabaseManager:
|
|
|
def __init__(self, db_path: str = 'concours_optimized.sqlite'):
|
|
|
self.db_path = db_path
|
|
|
self.local = threading.local()
|
|
|
self._init_db()
|
|
|
|
|
|
def _get_connection(self):
|
|
|
if not hasattr(self.local, 'conn'):
|
|
|
self.local.conn = sqlite3.connect(self.db_path)
|
|
|
self.local.conn.row_factory = sqlite3.Row
|
|
|
return self.local.conn
|
|
|
|
|
|
@contextmanager
|
|
|
def transaction(self):
|
|
|
conn = self._get_connection()
|
|
|
try:
|
|
|
yield conn
|
|
|
conn.commit()
|
|
|
except Exception:
|
|
|
conn.rollback()
|
|
|
raise
|
|
|
|
|
|
def _init_db(self):
|
|
|
with self.transaction() as conn:
|
|
|
conn.execute('''
|
|
|
CREATE TABLE IF NOT EXISTS participations (
|
|
|
url TEXT PRIMARY KEY,
|
|
|
title TEXT,
|
|
|
source TEXT,
|
|
|
status TEXT,
|
|
|
difficulty_score INTEGER,
|
|
|
success_rate REAL,
|
|
|
date TEXT,
|
|
|
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
|
|
|
)
|
|
|
''')
|
|
|
conn.execute('''
|
|
|
CREATE TABLE IF NOT EXISTS victories (
|
|
|
email_id TEXT PRIMARY KEY,
|
|
|
date TEXT,
|
|
|
lot TEXT,
|
|
|
source TEXT,
|
|
|
confirmed BOOLEAN DEFAULT FALSE,
|
|
|
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
|
|
|
)
|
|
|
''')
|
|
|
conn.execute('''
|
|
|
CREATE TABLE IF NOT EXISTS contest_cache (
|
|
|
url TEXT PRIMARY KEY,
|
|
|
content TEXT,
|
|
|
timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP
|
|
|
)
|
|
|
''')
|
|
|
conn.execute('CREATE INDEX IF NOT EXISTS idx_participations_date ON participations(date)')
|
|
|
conn.execute('CREATE INDEX IF NOT EXISTS idx_cache_timestamp ON contest_cache(timestamp)')
|
|
|
|
|
|
def add_participation(self, contest: Contest, status: str = 'pending', success_rate: float = 0.0):
|
|
|
with self.transaction() as conn:
|
|
|
conn.execute('''
|
|
|
INSERT OR REPLACE INTO participations
|
|
|
(url, title, source, status, difficulty_score, success_rate, date)
|
|
|
VALUES (?, ?, ?, ?, ?, ?, date('now'))
|
|
|
''', (contest.url, contest.title, contest.source, status, contest.difficulty_score, success_rate))
|
|
|
|
|
|
def participation_exists(self, url: str) -> bool:
|
|
|
conn = self._get_connection()
|
|
|
result = conn.execute("SELECT 1 FROM participations WHERE url = ?", (url,)).fetchone()
|
|
|
return result is not None
|
|
|
|
|
|
def add_victory(self, email_id: str, lot: str, source: str):
|
|
|
with self.transaction() as conn:
|
|
|
conn.execute('''
|
|
|
INSERT OR IGNORE INTO victories (email_id, date, lot, source)
|
|
|
VALUES (?, date('now'), ?, ?)
|
|
|
''', (email_id, lot, source))
|
|
|
|
|
|
def get_stats(self) -> Dict:
|
|
|
conn = self._get_connection()
|
|
|
stats = {}
|
|
|
|
|
|
stats['total_participations'] = conn.execute("SELECT COUNT(*) FROM participations").fetchone()[0]
|
|
|
stats['successful_participations'] = conn.execute("SELECT COUNT(*) FROM participations WHERE status='success'").fetchone()[0]
|
|
|
stats['total_victories'] = conn.execute("SELECT COUNT(*) FROM victories").fetchone()[0]
|
|
|
|
|
|
source_stats = conn.execute('''
|
|
|
SELECT source, COUNT(*) as count
|
|
|
FROM participations
|
|
|
GROUP BY source
|
|
|
ORDER BY count DESC
|
|
|
''').fetchall()
|
|
|
stats['by_source'] = {row[0]: row[1] for row in source_stats}
|
|
|
|
|
|
return stats
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class AIEngine:
|
|
|
def __init__(self, api_config: APIConfig):
|
|
|
self.api_config = api_config
|
|
|
self.cache = {}
|
|
|
|
|
|
def generate_response(self, question: str, context: str = "", response_type: str = "qa") -> str:
|
|
|
"""Génère une réponse IA avec fallback entre Gemini et HuggingFace"""
|
|
|
cache_key = hashlib.md5(f"{question}{context}{response_type}".encode()).hexdigest()
|
|
|
|
|
|
if cache_key in self.cache:
|
|
|
return self.cache[cache_key]
|
|
|
|
|
|
response = self._try_gemini(question, context, response_type)
|
|
|
if not response:
|
|
|
response = self._try_huggingface(question, context, response_type)
|
|
|
|
|
|
if not response:
|
|
|
response = self._generate_fallback_response(question, response_type)
|
|
|
|
|
|
self.cache[cache_key] = response
|
|
|
return response
|
|
|
|
|
|
def _try_gemini(self, question: str, context: str, response_type: str) -> Optional[str]:
|
|
|
|
|
|
return None
|
|
|
|
|
|
def _try_huggingface(self, question: str, context: str, response_type: str) -> Optional[str]:
|
|
|
if not self.api_config.hf_token:
|
|
|
return None
|
|
|
|
|
|
try:
|
|
|
client = InferenceClient(token=self.api_config.hf_token)
|
|
|
|
|
|
if response_type == "qa" and context:
|
|
|
result = client.question_answering(
|
|
|
question=question,
|
|
|
context=context,
|
|
|
model="distilbert-base-cased-distilled-squad"
|
|
|
)
|
|
|
return result.answer
|
|
|
else:
|
|
|
prompt = f"Question: {question}\nContexte: {context}\nRéponse:"
|
|
|
result = client.text_generation(
|
|
|
prompt,
|
|
|
model="gpt2",
|
|
|
max_new_tokens=100,
|
|
|
temperature=0.7
|
|
|
)
|
|
|
return result[0]['generated_text'].split('Réponse:')[-1].strip()
|
|
|
|
|
|
except Exception as e:
|
|
|
logging.warning(f"HuggingFace API error: {e}")
|
|
|
return None
|
|
|
|
|
|
def _generate_fallback_response(self, question: str, response_type: str) -> str:
|
|
|
"""Système de réponses intelligentes sans API"""
|
|
|
question_lower = question.lower()
|
|
|
|
|
|
if response_type == "motivation":
|
|
|
|
|
|
if any(word in question_lower for word in ["voyage", "vacances", "séjour"]):
|
|
|
return random.choice([
|
|
|
"J'adore voyager et découvrir de nouveaux horizons. Ce prix serait une opportunité fantastique pour moi de vivre une expérience inoubliable.",
|
|
|
"Voyager est ma passion et ce concours représente le voyage de mes rêves. J'espère avoir la chance de le remporter.",
|
|
|
"En tant que passionné de voyages, ce prix m'offrirait l'occasion parfaite de découvrir de nouveaux paysages et cultures."
|
|
|
])
|
|
|
elif any(word in question_lower for word in ["produit", "cosmétique", "beauté"]):
|
|
|
return random.choice([
|
|
|
"Je suis toujours à la recherche de nouveaux produits de qualité et j'aimerais beaucoup tester cette gamme.",
|
|
|
"Ces produits m'intéressent énormément et je serais ravi de pouvoir les découvrir.",
|
|
|
"J'ai entendu beaucoup de bien de cette marque et j'aimerais avoir l'opportunité de l'essayer."
|
|
|
])
|
|
|
elif any(word in question_lower for word in ["technologie", "smartphone", "ordinateur"]):
|
|
|
return random.choice([
|
|
|
"En tant que passionné de technologie, ce prix m'intéresse beaucoup et m'aiderait dans mes projets.",
|
|
|
"J'ai besoin de ce type d'équipement pour mes études et ce serait formidable de le gagner.",
|
|
|
"La technologie fait partie de ma vie quotidienne et ce prix serait très utile."
|
|
|
])
|
|
|
else:
|
|
|
return random.choice([
|
|
|
"Je participe avec enthousiasme à ce concours car le prix m'intéresse vraiment et correspond à mes besoins.",
|
|
|
"Ce concours m'attire particulièrement et je serais très heureux de remporter ce magnifique prix.",
|
|
|
"J'espère avoir la chance de gagner car ce prix me ferait énormément plaisir.",
|
|
|
"Je suis motivé à participer car cette opportunité pourrait vraiment changer ma journée."
|
|
|
])
|
|
|
|
|
|
elif response_type == "quiz":
|
|
|
|
|
|
if "suisse" in question_lower:
|
|
|
if "capitale" in question_lower:
|
|
|
return "Berne"
|
|
|
elif "langue" in question_lower:
|
|
|
return "Français, Allemand, Italien, Romanche"
|
|
|
elif "monnaie" in question_lower:
|
|
|
return "Franc suisse"
|
|
|
elif "population" in question_lower:
|
|
|
return "8.7 millions"
|
|
|
|
|
|
if "couleur" in question_lower:
|
|
|
return random.choice(["Rouge", "Bleu", "Vert", "Jaune"])
|
|
|
|
|
|
if any(word in question_lower for word in ["combien", "nombre", "quantité"]):
|
|
|
return random.choice(["3", "5", "10", "12", "20"])
|
|
|
|
|
|
if any(word in question_lower for word in ["année", "date", "quand"]):
|
|
|
return random.choice(["2024", "2023", "2025"])
|
|
|
|
|
|
|
|
|
return random.choice(["A", "B", "C", "Oui", "Non", "Vrai", "Faux"])
|
|
|
|
|
|
|
|
|
response_mapping = {
|
|
|
"age": random.choice(["25", "28", "30", "32"]),
|
|
|
"profession": random.choice(["Étudiant", "Employé", "Consultant"]),
|
|
|
"ville": "Pully",
|
|
|
"pays": "Suisse",
|
|
|
"default": "Merci"
|
|
|
}
|
|
|
|
|
|
return response_mapping.get(response_type, response_mapping["default"])
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class IntelligentScraper:
|
|
|
def __init__(self, db_manager: DatabaseManager, cache_duration_hours: int = 6):
|
|
|
self.db = db_manager
|
|
|
self.cache_duration = timedelta(hours=cache_duration_hours)
|
|
|
self.session = None
|
|
|
|
|
|
async def __aenter__(self):
|
|
|
connector = aiohttp.TCPConnector(limit=10, limit_per_host=3)
|
|
|
timeout = aiohttp.ClientTimeout(total=30, connect=10)
|
|
|
self.session = aiohttp.ClientSession(
|
|
|
connector=connector,
|
|
|
timeout=timeout,
|
|
|
headers={'User-Agent': random.choice(USER_AGENTS)}
|
|
|
)
|
|
|
return self
|
|
|
|
|
|
async def __aexit__(self, exc_type, exc_val, exc_tb):
|
|
|
if self.session:
|
|
|
await self.session.close()
|
|
|
|
|
|
async def scrape_all_sources(self) -> List[Contest]:
|
|
|
"""Scrape tous les sites et sources"""
|
|
|
all_contests = []
|
|
|
|
|
|
|
|
|
web_contests = await self._scrape_websites()
|
|
|
all_contests.extend(web_contests)
|
|
|
|
|
|
|
|
|
if API_CONFIG.google_api_key and API_CONFIG.google_cx:
|
|
|
google_contests = await self._scrape_google_search()
|
|
|
all_contests.extend(google_contests)
|
|
|
|
|
|
|
|
|
if API_CONFIG.x_token:
|
|
|
twitter_contests = await self._scrape_twitter()
|
|
|
all_contests.extend(twitter_contests)
|
|
|
|
|
|
|
|
|
unique_contests = self._filter_unique_contests(all_contests)
|
|
|
|
|
|
logging.info(f"Total contests found: {len(all_contests)}, unique new: {len(unique_contests)}")
|
|
|
return unique_contests
|
|
|
|
|
|
async def _scrape_websites(self) -> List[Contest]:
|
|
|
"""Scrape les sites web en parallèle"""
|
|
|
batch_size = 3
|
|
|
all_contests = []
|
|
|
|
|
|
for i in range(0, len(SITES_CH), batch_size):
|
|
|
batch = SITES_CH[i:i + batch_size]
|
|
|
tasks = [self._scrape_single_site(site) for site in batch]
|
|
|
|
|
|
batch_results = await asyncio.gather(*tasks, return_exceptions=True)
|
|
|
|
|
|
for result in batch_results:
|
|
|
if isinstance(result, list):
|
|
|
all_contests.extend(result)
|
|
|
elif isinstance(result, Exception):
|
|
|
logging.error(f"Batch scraping error: {result}")
|
|
|
|
|
|
await asyncio.sleep(2)
|
|
|
|
|
|
return all_contests
|
|
|
|
|
|
async def _scrape_single_site(self, url: str) -> List[Contest]:
|
|
|
"""Scrape un site web spécifique"""
|
|
|
try:
|
|
|
content = await self._fetch_with_retry(url)
|
|
|
if not content:
|
|
|
return []
|
|
|
|
|
|
soup = BeautifulSoup(content, 'html.parser')
|
|
|
contests = self._extract_contests_from_soup(soup, url)
|
|
|
|
|
|
logging.info(f"Found {len(contests)} contests on {url}")
|
|
|
return contests
|
|
|
|
|
|
except Exception as e:
|
|
|
logging.error(f"Error scraping {url}: {e}")
|
|
|
return []
|
|
|
|
|
|
async def _fetch_with_retry(self, url: str, max_retries: int = 3) -> Optional[str]:
|
|
|
"""Fetch avec retry et gestion d'erreurs"""
|
|
|
for attempt in range(max_retries):
|
|
|
try:
|
|
|
proxy = random.choice(PROXY_LIST) if PROXY_LIST else None
|
|
|
async with self.session.get(url, proxy=proxy) as response:
|
|
|
if response.status == 200:
|
|
|
return await response.text()
|
|
|
elif response.status == 429:
|
|
|
wait_time = 2 ** attempt * 5
|
|
|
logging.warning(f"Rate limited on {url}, waiting {wait_time}s")
|
|
|
await asyncio.sleep(wait_time)
|
|
|
else:
|
|
|
logging.warning(f"HTTP {response.status} for {url}")
|
|
|
|
|
|
except Exception as e:
|
|
|
logging.error(f"Attempt {attempt+1} failed for {url}: {e}")
|
|
|
if attempt < max_retries - 1:
|
|
|
await asyncio.sleep(2 ** attempt)
|
|
|
|
|
|
return None
|
|
|
|
|
|
def _extract_contests_from_soup(self, soup: BeautifulSoup, base_url: str) -> List[Contest]:
|
|
|
"""Extrait les concours d'une page HTML"""
|
|
|
contests = []
|
|
|
|
|
|
selectors = [
|
|
|
'.contest', '.concours', '.jeu', '.competition', '.giveaway',
|
|
|
'[data-contest]', '[data-concours]', '.prize', '.lot',
|
|
|
'article[class*="concours"]', '.entry', '.participate'
|
|
|
]
|
|
|
|
|
|
containers = []
|
|
|
for selector in selectors:
|
|
|
containers.extend(soup.select(selector))
|
|
|
|
|
|
if not containers:
|
|
|
containers = soup.find_all('a', href=re.compile(r'concours|jeu|contest|participate', re.I))
|
|
|
|
|
|
for container in containers[:20]:
|
|
|
try:
|
|
|
contest = self._parse_contest_container(container, base_url)
|
|
|
if contest and self._is_valid_contest(contest):
|
|
|
contests.append(contest)
|
|
|
except Exception as e:
|
|
|
logging.debug(f"Error parsing container: {e}")
|
|
|
|
|
|
return contests
|
|
|
|
|
|
def _parse_contest_container(self, container, base_url: str) -> Optional[Contest]:
|
|
|
"""Parse un conteneur de concours"""
|
|
|
title_selectors = ['h1', 'h2', 'h3', '.title', '.titre', '.contest-title']
|
|
|
title = ""
|
|
|
for selector in title_selectors:
|
|
|
title_elem = container.select_one(selector)
|
|
|
if title_elem:
|
|
|
title = title_elem.get_text(strip=True)
|
|
|
break
|
|
|
|
|
|
if not title:
|
|
|
title = container.get_text(strip=True)[:100]
|
|
|
|
|
|
url = ""
|
|
|
link_elem = container if container.name == 'a' else container.find('a')
|
|
|
if link_elem and link_elem.get('href'):
|
|
|
url = urljoin(base_url, link_elem['href'])
|
|
|
|
|
|
description = container.get_text(strip=True)[:500]
|
|
|
deadline = self._extract_deadline(description)
|
|
|
prize = self._extract_prize(description)
|
|
|
|
|
|
if not title or not url:
|
|
|
return None
|
|
|
|
|
|
return Contest(
|
|
|
title=title[:200],
|
|
|
url=url,
|
|
|
description=description,
|
|
|
source=base_url,
|
|
|
deadline=deadline,
|
|
|
prize=prize,
|
|
|
difficulty_score=self._estimate_difficulty(description)
|
|
|
)
|
|
|
|
|
|
def _extract_deadline(self, text: str) -> Optional[str]:
|
|
|
"""Extrait la date limite du texte"""
|
|
|
patterns = [
|
|
|
r"jusqu[\'']?au (\d{1,2}[\/\-]\d{1,2}[\/\-]\d{2,4})",
|
|
|
r"avant le (\d{1,2}[\/\-]\d{1,2}[\/\-]\d{2,4})",
|
|
|
r"fin le (\d{1,2}[\/\-]\d{1,2}[\/\-]\d{2,4})"
|
|
|
]
|
|
|
|
|
|
for pattern in patterns:
|
|
|
match = re.search(pattern, text, re.I)
|
|
|
if match:
|
|
|
return match.group(1)
|
|
|
return None
|
|
|
|
|
|
def _extract_prize(self, text: str) -> Optional[str]:
|
|
|
"""Extrait le prix du texte"""
|
|
|
patterns = [
|
|
|
r"gagne[rz]?\s+([^.!?]{1,50})",
|
|
|
r"prix[:\s]+([^.!?]{1,50})",
|
|
|
r"lot[:\s]+([^.!?]{1,50})",
|
|
|
r"(\d+\s*CHF|\d+\s*euros?|\d+\s*francs?)"
|
|
|
]
|
|
|
|
|
|
for pattern in patterns:
|
|
|
match = re.search(pattern, text, re.I)
|
|
|
if match:
|
|
|
return match.group(1).strip()
|
|
|
return None
|
|
|
|
|
|
def _estimate_difficulty(self, description: str) -> int:
|
|
|
"""Estime la difficulté de participation (0-10)"""
|
|
|
difficulty = 0
|
|
|
|
|
|
if re.search(r'justifi|motivation|pourquoi|essay', description, re.I):
|
|
|
difficulty += 3
|
|
|
if re.search(r'photo|image|créat', description, re.I):
|
|
|
difficulty += 2
|
|
|
if re.search(r'quiz|question|répond', description, re.I):
|
|
|
difficulty += 1
|
|
|
if re.search(r'partag|social|facebook|twitter', description, re.I):
|
|
|
difficulty += 1
|
|
|
if re.search(r'inscription|compte|profil', description, re.I):
|
|
|
difficulty += 1
|
|
|
|
|
|
return min(difficulty, 10)
|
|
|
|
|
|
def _is_valid_contest(self, contest: Contest) -> bool:
|
|
|
"""Valide qu'un concours est légitime"""
|
|
|
swiss_indicators = [
|
|
|
'suisse', 'switzerland', 'ch', 'romandie', 'genève', 'lausanne',
|
|
|
'zurich', 'bern', 'ouvert en suisse', 'résidents suisses'
|
|
|
]
|
|
|
|
|
|
full_text = (contest.title + " " + contest.description).lower()
|
|
|
has_swiss_access = any(indicator in full_text for indicator in swiss_indicators)
|
|
|
|
|
|
excluded_terms = [
|
|
|
'payant', 'payment', 'carte bancaire', 'spam', 'phishing',
|
|
|
'adult', 'casino', 'bitcoin', 'crypto', 'investment'
|
|
|
]
|
|
|
|
|
|
has_excluded = any(term in full_text for term in excluded_terms)
|
|
|
valid_url = contest.url.startswith(('http://', 'https://'))
|
|
|
|
|
|
return has_swiss_access and not has_excluded and valid_url
|
|
|
|
|
|
async def _scrape_google_search(self) -> List[Contest]:
|
|
|
"""Scrape via Google Custom Search API"""
|
|
|
try:
|
|
|
query = "concours gratuit Suisse 2025 site:.ch"
|
|
|
response = requests.get(
|
|
|
"https://www.googleapis.com/customsearch/v1",
|
|
|
params={
|
|
|
"key": API_CONFIG.google_api_key,
|
|
|
"cx": API_CONFIG.google_cx,
|
|
|
"q": query,
|
|
|
"num": 10
|
|
|
},
|
|
|
timeout=10
|
|
|
)
|
|
|
|
|
|
results = response.json().get('items', [])
|
|
|
contests = []
|
|
|
|
|
|
for res in results:
|
|
|
title = res.get('title', '')
|
|
|
url = res.get('link', '')
|
|
|
description = res.get('snippet', '')
|
|
|
|
|
|
contest = Contest(
|
|
|
title=title,
|
|
|
url=url,
|
|
|
description=description,
|
|
|
source='Google Search',
|
|
|
difficulty_score=self._estimate_difficulty(description)
|
|
|
)
|
|
|
|
|
|
if self._is_valid_contest(contest):
|
|
|
contests.append(contest)
|
|
|
|
|
|
logging.info(f"Found {len(contests)} contests via Google Search")
|
|
|
return contests
|
|
|
|
|
|
except Exception as e:
|
|
|
logging.error(f"Google Search API error: {e}")
|
|
|
return []
|
|
|
|
|
|
async def _scrape_twitter(self) -> List[Contest]:
|
|
|
"""Scrape Twitter/X pour les concours"""
|
|
|
try:
|
|
|
client = tweepy.Client(bearer_token=API_CONFIG.x_token)
|
|
|
tweets = client.search_recent_tweets(
|
|
|
query="concours gratuit Suisse lang:fr",
|
|
|
max_results=10
|
|
|
)
|
|
|
|
|
|
contests = []
|
|
|
if tweets.data:
|
|
|
for tweet in tweets.data:
|
|
|
if self._is_swiss_accessible(tweet.text):
|
|
|
contest = Contest(
|
|
|
title=tweet.text[:50] + "...",
|
|
|
url=f"https://x.com/i/status/{tweet.id}",
|
|
|
description=tweet.text,
|
|
|
source='Twitter/X',
|
|
|
difficulty_score=5
|
|
|
)
|
|
|
contests.append(contest)
|
|
|
|
|
|
logging.info(f"Found {len(contests)} contests on Twitter/X")
|
|
|
return contests
|
|
|
|
|
|
except Exception as e:
|
|
|
logging.error(f"Twitter/X API error: {e}")
|
|
|
return []
|
|
|
|
|
|
def _is_swiss_accessible(self, text: str) -> bool:
|
|
|
"""Vérifie si accessible depuis la Suisse"""
|
|
|
swiss_pattern = r"(suisse|ch|romandie|ouvert\s+a\s+la\s+suisse|geneve|lausanne)"
|
|
|
return bool(re.search(swiss_pattern, text.lower(), re.IGNORECASE))
|
|
|
|
|
|
def _filter_unique_contests(self, contests: List[Contest]) -> List[Contest]:
|
|
|
"""Filtre les concours uniques et non déjà traités"""
|
|
|
unique_contests = []
|
|
|
seen_urls = set()
|
|
|
|
|
|
for contest in contests:
|
|
|
if contest.url not in seen_urls and not self.db.participation_exists(contest.url):
|
|
|
unique_contests.append(contest)
|
|
|
seen_urls.add(contest.url)
|
|
|
|
|
|
return unique_contests
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class SmartParticipator:
|
|
|
def __init__(self, db_manager: DatabaseManager, ai_engine: AIEngine):
|
|
|
self.db = db_manager
|
|
|
self.ai = ai_engine
|
|
|
self.personal_info = PERSONAL_INFO
|
|
|
|
|
|
self.field_patterns = {
|
|
|
'prenom': [r'prenom|prénom|first.*name'],
|
|
|
'nom': [r'nom(?!.*prenom)|last.*name|family.*name'],
|
|
|
'email': [r'email|e-mail|courriel'],
|
|
|
'telephone': [r'tel|phone|telephone|téléphone'],
|
|
|
'adresse': [r'adresse|address|rue|street'],
|
|
|
'code_postal': [r'code.postal|zip|postal'],
|
|
|
'ville': [r'ville|city|localité'],
|
|
|
'pays': [r'pays|country|nation'],
|
|
|
'motivation': [r'motivation|pourquoi|why|reason'],
|
|
|
'quiz': [r'question|quiz|réponse|answer']
|
|
|
}
|
|
|
|
|
|
async def participate_in_contest(self, contest: Contest) -> bool:
|
|
|
"""Participe à un concours de manière intelligente"""
|
|
|
async with async_playwright() as p:
|
|
|
browser = await p.chromium.launch(
|
|
|
headless=True,
|
|
|
args=['--no-sandbox', '--disable-blink-features=AutomationControlled']
|
|
|
)
|
|
|
|
|
|
context = await browser.new_context(
|
|
|
user_agent=random.choice(USER_AGENTS),
|
|
|
viewport={'width': 1920, 'height': 1080}
|
|
|
)
|
|
|
|
|
|
page = await context.new_page()
|
|
|
|
|
|
try:
|
|
|
analysis = await self._analyze_form(page, contest.url)
|
|
|
|
|
|
if analysis.estimated_success_rate < 0.3:
|
|
|
logging.warning(f"Low success rate for {contest.url}: {analysis.estimated_success_rate}")
|
|
|
self.db.add_participation(contest, 'skipped_low_success', analysis.estimated_success_rate)
|
|
|
return False
|
|
|
|
|
|
if analysis.requires_captcha:
|
|
|
logging.warning(f"CAPTCHA detected for {contest.url}, skipping")
|
|
|
self.db.add_participation(contest, 'skipped_captcha', analysis.estimated_success_rate)
|
|
|
return False
|
|
|
|
|
|
success = await self._fill_and_submit_form(page, analysis, contest)
|
|
|
|
|
|
status = 'success' if success else 'failed'
|
|
|
self.db.add_participation(contest, status, analysis.estimated_success_rate)
|
|
|
|
|
|
logging.info(f"Participation {'successful' if success else 'failed'} for {contest.title}")
|
|
|
return success
|
|
|
|
|
|
except Exception as e:
|
|
|
logging.error(f"Participation error for {contest.url}: {e}")
|
|
|
self.db.add_participation(contest, 'error', 0.0)
|
|
|
return False
|
|
|
|
|
|
finally:
|
|
|
await browser.close()
|
|
|
|
|
|
async def _analyze_form(self, page: Page, url: str) -> FormAnalysis:
|
|
|
"""Analyse un formulaire de concours"""
|
|
|
try:
|
|
|
await page.goto(url, wait_until='networkidle', timeout=15000)
|
|
|
|
|
|
fields = await self._detect_form_fields(page)
|
|
|
complexity = sum(self._calculate_field_complexity(field) for field in fields)
|
|
|
has_captcha = await self._detect_captcha(page)
|
|
|
has_social_requirements = await self._detect_social_requirements(page)
|
|
|
success_rate = self._estimate_success_rate(fields, complexity, has_captcha)
|
|
|
|
|
|
return FormAnalysis(
|
|
|
fields=fields,
|
|
|
complexity_score=complexity,
|
|
|
estimated_success_rate=success_rate,
|
|
|
requires_captcha=has_captcha,
|
|
|
requires_social_media=has_social_requirements,
|
|
|
form_url=url
|
|
|
)
|
|
|
|
|
|
except Exception as e:
|
|
|
logging.error(f"Form analysis error: {e}")
|
|
|
return FormAnalysis([], 10, 0.0, True, True, url)
|
|
|
|
|
|
async def _detect_form_fields(self, page: Page) -> List[FormField]:
|
|
|
"""Détecte tous les champs de formulaire"""
|
|
|
fields = []
|
|
|
|
|
|
selectors = [
|
|
|
'input[type="text"]', 'input[type="email"]', 'input[type="tel"]',
|
|
|
'input[type="number"]', 'input:not([type])', 'textarea', 'select'
|
|
|
]
|
|
|
|
|
|
for selector in selectors:
|
|
|
elements = await page.query_selector_all(selector)
|
|
|
|
|
|
for element in elements:
|
|
|
try:
|
|
|
field = await self._analyze_single_field(element, page)
|
|
|
if field:
|
|
|
fields.append(field)
|
|
|
except Exception:
|
|
|
continue
|
|
|
|
|
|
return fields
|
|
|
|
|
|
async def _analyze_single_field(self, element, page: Page) -> Optional[FormField]:
|
|
|
"""Analyse un champ individuel"""
|
|
|
try:
|
|
|
field_type = await element.evaluate('el => el.type || el.tagName.toLowerCase()')
|
|
|
name = await element.evaluate('el => el.name || el.id || ""')
|
|
|
placeholder = await element.evaluate('el => el.placeholder || ""')
|
|
|
required = await element.evaluate('el => el.required')
|
|
|
|
|
|
label_text = await self._find_field_label(element, page)
|
|
|
selector = await self._create_unique_selector(element)
|
|
|
|
|
|
return FormField(
|
|
|
selector=selector,
|
|
|
field_type=field_type,
|
|
|
label=label_text,
|
|
|
required=required,
|
|
|
ai_context=f"Name: {name}, Placeholder: {placeholder}, Label: {label_text}"
|
|
|
)
|
|
|
|
|
|
except Exception:
|
|
|
return None
|
|
|
|
|
|
async def _find_field_label(self, element, page: Page) -> str:
|
|
|
"""Trouve le label associé à un champ"""
|
|
|
try:
|
|
|
element_id = await element.evaluate('el => el.id')
|
|
|
if element_id:
|
|
|
label = await page.query_selector(f'label[for="{element_id}"]')
|
|
|
if label:
|
|
|
return await label.inner_text()
|
|
|
|
|
|
parent_label = await element.evaluate('''
|
|
|
el => {
|
|
|
let parent = el.parentElement;
|
|
|
while (parent && parent.tagName !== 'BODY') {
|
|
|
if (parent.tagName === 'LABEL') {
|
|
|
return parent.innerText;
|
|
|
}
|
|
|
parent = parent.parentElement;
|
|
|
}
|
|
|
return '';
|
|
|
}
|
|
|
''')
|
|
|
|
|
|
if parent_label:
|
|
|
return parent_label.strip()
|
|
|
|
|
|
prev_text = await element.evaluate('''
|
|
|
el => {
|
|
|
const prev = el.previousElementSibling;
|
|
|
return prev ? prev.innerText : '';
|
|
|
}
|
|
|
''')
|
|
|
|
|
|
return prev_text.strip()
|
|
|
|
|
|
except Exception:
|
|
|
return ""
|
|
|
|
|
|
async def _create_unique_selector(self, element) -> str:
|
|
|
"""Crée un sélecteur CSS unique"""
|
|
|
element_id = await element.evaluate('el => el.id')
|
|
|
if element_id:
|
|
|
return f'#{element_id}'
|
|
|
|
|
|
name = await element.evaluate('el => el.name')
|
|
|
if name:
|
|
|
return f'[name="{name}"]'
|
|
|
|
|
|
class_name = await element.evaluate('el => el.className')
|
|
|
tag_name = await element.evaluate('el => el.tagName.toLowerCase()')
|
|
|
|
|
|
if class_name:
|
|
|
return f'{tag_name}.{class_name.split()[0]}'
|
|
|
|
|
|
return f'{tag_name}:nth-of-type(1)'
|
|
|
|
|
|
def _calculate_field_complexity(self, field: FormField) -> int:
|
|
|
"""Calcule la complexité d'un champ"""
|
|
|
complexity = 1
|
|
|
|
|
|
if field.field_type == 'textarea':
|
|
|
complexity += 3
|
|
|
elif field.field_type == 'select':
|
|
|
complexity += 2
|
|
|
elif field.required:
|
|
|
complexity += 1
|
|
|
|
|
|
if re.search(r'motivation|justifi|pourquoi', field.label, re.I):
|
|
|
complexity += 3
|
|
|
elif re.search(r'quiz|question', field.label, re.I):
|
|
|
complexity += 2
|
|
|
|
|
|
return complexity
|
|
|
|
|
|
async def _detect_captcha(self, page: Page) -> bool:
|
|
|
"""Détecte la présence de CAPTCHA"""
|
|
|
captcha_selectors = [
|
|
|
'.g-recaptcha', '.h-captcha', '#captcha', '.captcha',
|
|
|
'iframe[src*="recaptcha"]', 'iframe[src*="hcaptcha"]'
|
|
|
]
|
|
|
|
|
|
for selector in captcha_selectors:
|
|
|
element = await page.query_selector(selector)
|
|
|
if element:
|
|
|
return True
|
|
|
|
|
|
return False
|
|
|
|
|
|
async def _detect_social_requirements(self, page: Page) -> bool:
|
|
|
"""Détecte les exigences de réseaux sociaux"""
|
|
|
content = await page.content()
|
|
|
social_patterns = [
|
|
|
r'follow.*us', r'partag.*facebook', r'retweet',
|
|
|
r'like.*page', r'subscribe.*channel'
|
|
|
]
|
|
|
|
|
|
for pattern in social_patterns:
|
|
|
if re.search(pattern, content, re.I):
|
|
|
return True
|
|
|
|
|
|
return False
|
|
|
|
|
|
def _estimate_success_rate(self, fields: List[FormField], complexity: int, has_captcha: bool) -> float:
|
|
|
"""Estime le taux de succès"""
|
|
|
base_rate = 0.8
|
|
|
|
|
|
if has_captcha:
|
|
|
base_rate *= 0.1
|
|
|
|
|
|
if complexity > 15:
|
|
|
base_rate *= 0.4
|
|
|
elif complexity > 10:
|
|
|
base_rate *= 0.6
|
|
|
elif complexity > 5:
|
|
|
base_rate *= 0.8
|
|
|
|
|
|
required_fields = [f for f in fields if f.required]
|
|
|
if len(required_fields) <= 3:
|
|
|
base_rate *= 1.1
|
|
|
|
|
|
return min(base_rate, 1.0)
|
|
|
|
|
|
async def _fill_and_submit_form(self, page: Page, analysis: FormAnalysis, contest: Contest) -> bool:
|
|
|
"""Remplit et soumet le formulaire"""
|
|
|
try:
|
|
|
filled_fields = 0
|
|
|
|
|
|
for field in analysis.fields:
|
|
|
try:
|
|
|
await page.wait_for_selector(field.selector, timeout=3000)
|
|
|
element = await page.query_selector(field.selector)
|
|
|
|
|
|
if not element:
|
|
|
continue
|
|
|
|
|
|
value = await self._generate_field_value(field, contest, page)
|
|
|
if not value:
|
|
|
continue
|
|
|
|
|
|
if field.field_type == 'select':
|
|
|
await self._fill_select_field(element, value, page)
|
|
|
else:
|
|
|
await element.fill(value)
|
|
|
|
|
|
filled_fields += 1
|
|
|
await asyncio.sleep(random.uniform(0.3, 0.8))
|
|
|
|
|
|
except Exception as e:
|
|
|
logging.debug(f"Error filling field {field.selector}: {e}")
|
|
|
continue
|
|
|
|
|
|
submit_success = await self._submit_form(page)
|
|
|
|
|
|
logging.info(f"Filled {filled_fields}/{len(analysis.fields)} fields, submitted: {submit_success}")
|
|
|
return filled_fields > 0 and submit_success
|
|
|
|
|
|
except Exception as e:
|
|
|
logging.error(f"Form filling error: {e}")
|
|
|
return False
|
|
|
|
|
|
async def _generate_field_value(self, field: FormField, contest: Contest, page: Page) -> Optional[str]:
|
|
|
"""Génère une valeur pour un champ"""
|
|
|
field_type = self._identify_field_type(field)
|
|
|
|
|
|
personal_mapping = {
|
|
|
'prenom': self.personal_info.prenom,
|
|
|
'nom': self.personal_info.nom,
|
|
|
'email': self.personal_info.email_derivee,
|
|
|
'telephone': self.personal_info.telephone,
|
|
|
'adresse': self.personal_info.adresse,
|
|
|
'code_postal': self.personal_info.code_postal,
|
|
|
'ville': self.personal_info.ville,
|
|
|
'pays': self.personal_info.pays
|
|
|
}
|
|
|
|
|
|
if field_type in personal_mapping:
|
|
|
return personal_mapping[field_type]
|
|
|
|
|
|
if field_type == 'motivation':
|
|
|
return self.ai.generate_response(
|
|
|
field.label,
|
|
|
contest.description,
|
|
|
"motivation"
|
|
|
)
|
|
|
elif field_type == 'quiz':
|
|
|
return self.ai.generate_response(
|
|
|
field.label,
|
|
|
contest.description,
|
|
|
"quiz"
|
|
|
)
|
|
|
|
|
|
default_values = {
|
|
|
'age': '25',
|
|
|
'genre': 'Monsieur',
|
|
|
'profession': 'Étudiant'
|
|
|
}
|
|
|
|
|
|
for key, value in default_values.items():
|
|
|
if key in field.label.lower():
|
|
|
return value
|
|
|
|
|
|
return None
|
|
|
|
|
|
def _identify_field_type(self, field: FormField) -> str:
|
|
|
"""Identifie le type de champ"""
|
|
|
combined_text = f"{field.ai_context} {field.label}".lower()
|
|
|
|
|
|
for field_type, patterns in self.field_patterns.items():
|
|
|
for pattern in patterns:
|
|
|
if re.search(pattern, combined_text, re.I):
|
|
|
return field_type
|
|
|
|
|
|
return 'unknown'
|
|
|
|
|
|
async def _fill_select_field(self, element, value: str, page: Page):
|
|
|
"""Remplit un champ select"""
|
|
|
try:
|
|
|
options = await element.query_selector_all('option')
|
|
|
|
|
|
for option in options:
|
|
|
option_text = await option.inner_text()
|
|
|
option_value = await option.get_attribute('value')
|
|
|
|
|
|
if (value.lower() in option_text.lower() or
|
|
|
value.lower() in (option_value or "").lower()):
|
|
|
await element.select_option(value=option_value)
|
|
|
return
|
|
|
|
|
|
if options and len(options) > 1:
|
|
|
first_option = await options[1].get_attribute('value')
|
|
|
await element.select_option(value=first_option)
|
|
|
|
|
|
except Exception as e:
|
|
|
logging.debug(f"Select field error: {e}")
|
|
|
|
|
|
async def _submit_form(self, page: Page) -> bool:
|
|
|
"""Soumet le formulaire"""
|
|
|
submit_selectors = [
|
|
|
'input[type="submit"]',
|
|
|
'button[type="submit"]',
|
|
|
'button:has-text("Participer")',
|
|
|
'button:has-text("Envoyer")',
|
|
|
'button:has-text("Valider")',
|
|
|
'.submit-btn',
|
|
|
'.participate-btn'
|
|
|
]
|
|
|
|
|
|
for selector in submit_selectors:
|
|
|
try:
|
|
|
element = await page.query_selector(selector)
|
|
|
if element:
|
|
|
is_visible = await element.is_visible()
|
|
|
is_enabled = await element.is_enabled()
|
|
|
|
|
|
if is_visible and is_enabled:
|
|
|
await element.click()
|
|
|
await page.wait_for_timeout(3000)
|
|
|
return True
|
|
|
|
|
|
except Exception:
|
|
|
continue
|
|
|
|
|
|
return False
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class EmailManager:
|
|
|
def __init__(self, api_config: APIConfig, ai_engine: AIEngine, db_manager: DatabaseManager):
|
|
|
self.api_config = api_config
|
|
|
self.ai = ai_engine
|
|
|
self.db = db_manager
|
|
|
self.personal_info = PERSONAL_INFO
|
|
|
|
|
|
def check_and_analyze_emails(self):
|
|
|
"""Vérifie et analyse les emails pour détecter les victoires"""
|
|
|
if not self.api_config.email_app_password:
|
|
|
logging.warning("Email app password not configured")
|
|
|
return
|
|
|
|
|
|
try:
|
|
|
mail = imaplib.IMAP4_SSL('outlook.office365.com')
|
|
|
mail.login(self.personal_info.email, self.api_config.email_app_password)
|
|
|
mail.select('inbox')
|
|
|
|
|
|
since_date = (datetime.now() - timedelta(days=7)).strftime('%d-%b-%Y')
|
|
|
status, messages = mail.search(None, f'(SINCE "{since_date}")')
|
|
|
|
|
|
victories = []
|
|
|
email_count = 0
|
|
|
|
|
|
for num in messages[0].split()[-20:]:
|
|
|
try:
|
|
|
email_count += 1
|
|
|
_, msg = mail.fetch(num, '(RFC822)')
|
|
|
email_msg = email.message_from_bytes(msg[0][1])
|
|
|
|
|
|
subject = email_msg['Subject'] or ""
|
|
|
from_addr = email_msg['From'] or ""
|
|
|
|
|
|
body = self._extract_email_body(email_msg)
|
|
|
|
|
|
analysis = self.ai.generate_response(
|
|
|
f"Cet email indique-t-il une victoire dans un concours ? Sujet: {subject}",
|
|
|
body[:1000],
|
|
|
"quiz"
|
|
|
)
|
|
|
|
|
|
if 'oui' in analysis.lower() or 'gagne' in analysis.lower():
|
|
|
prize = self._extract_prize_from_email(body, subject)
|
|
|
victories.append({
|
|
|
'email_id': num.decode(),
|
|
|
'date': datetime.now().strftime('%Y-%m-%d'),
|
|
|
'prize': prize,
|
|
|
'source': from_addr,
|
|
|
'subject': subject
|
|
|
})
|
|
|
|
|
|
self.db.add_victory(num.decode(), prize, from_addr)
|
|
|
self._send_victory_alert(prize, from_addr, subject)
|
|
|
|
|
|
logging.info(f"Victory detected: {prize} from {from_addr}")
|
|
|
|
|
|
except Exception as e:
|
|
|
logging.error(f"Error processing email {num}: {e}")
|
|
|
continue
|
|
|
|
|
|
mail.logout()
|
|
|
logging.info(f"Processed {email_count} emails, found {len(victories)} victories")
|
|
|
|
|
|
except Exception as e:
|
|
|
logging.error(f"Email checking error: {e}")
|
|
|
|
|
|
def _extract_email_body(self, email_msg) -> str:
|
|
|
"""Extrait le corps de l'email"""
|
|
|
body = ""
|
|
|
|
|
|
if email_msg.is_multipart():
|
|
|
for part in email_msg.walk():
|
|
|
if part.get_content_type() == 'text/plain':
|
|
|
try:
|
|
|
body = part.get_payload(decode=True).decode('utf-8', errors='ignore')
|
|
|
break
|
|
|
except:
|
|
|
continue
|
|
|
else:
|
|
|
try:
|
|
|
body = email_msg.get_payload(decode=True).decode('utf-8', errors='ignore')
|
|
|
except:
|
|
|
body = str(email_msg.get_payload())
|
|
|
|
|
|
return body
|
|
|
|
|
|
def _extract_prize_from_email(self, body: str, subject: str) -> str:
|
|
|
"""Extrait le prix gagné de l'email"""
|
|
|
text = f"{subject} {body}"
|
|
|
|
|
|
prize_patterns = [
|
|
|
r"vous avez gagné\s+([^.!?\n]{1,100})",
|
|
|
r"prix[:\s]+([^.!?\n]{1,100})",
|
|
|
r"lot[:\s]+([^.!?\n]{1,100})",
|
|
|
r"remporté\s+([^.!?\n]{1,100})",
|
|
|
r"(\d+\s*CHF|\d+\s*euros?|\d+\s*francs?)"
|
|
|
]
|
|
|
|
|
|
for pattern in prize_patterns:
|
|
|
match = re.search(pattern, text, re.I)
|
|
|
if match:
|
|
|
return match.group(1).strip()
|
|
|
|
|
|
return "Prix non spécifié"
|
|
|
|
|
|
def _send_victory_alert(self, prize: str, source: str, subject: str):
|
|
|
"""Envoie une alerte de victoire"""
|
|
|
if not self.api_config.telegram_bot_token or not self.api_config.telegram_chat_id:
|
|
|
return
|
|
|
|
|
|
message = f"""
|
|
|
🎉 **VICTOIRE DÉTECTÉE !**
|
|
|
|
|
|
🏆 **Prix**: {prize}
|
|
|
📧 **Source**: {source}
|
|
|
📋 **Sujet**: {subject}
|
|
|
📅 **Date**: {datetime.now().strftime('%d/%m/%Y %H:%M')}
|
|
|
|
|
|
Félicitations ! 🎊
|
|
|
"""
|
|
|
|
|
|
try:
|
|
|
url = f"https://api.telegram.org/bot{self.api_config.telegram_bot_token}/sendMessage"
|
|
|
payload = {
|
|
|
'chat_id': self.api_config.telegram_chat_id,
|
|
|
'text': message,
|
|
|
'parse_mode': 'Markdown'
|
|
|
}
|
|
|
|
|
|
response = requests.post(url, json=payload, timeout=10)
|
|
|
if response.status_code == 200:
|
|
|
logging.info("Victory alert sent successfully")
|
|
|
else:
|
|
|
logging.error(f"Telegram alert failed: {response.text}")
|
|
|
|
|
|
except Exception as e:
|
|
|
logging.error(f"Telegram alert error: {e}")
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class MonitoringSystem:
|
|
|
def __init__(self, db_manager: DatabaseManager):
|
|
|
self.db = db_manager
|
|
|
|
|
|
def generate_daily_report(self) -> str:
|
|
|
"""Génère un rapport quotidien"""
|
|
|
stats = self.db.get_stats()
|
|
|
|
|
|
today = datetime.now().strftime('%Y-%m-%d')
|
|
|
conn = self.db._get_connection()
|
|
|
|
|
|
today_participations = conn.execute(
|
|
|
"SELECT COUNT(*) FROM participations WHERE date = ?", (today,)
|
|
|
).fetchone()[0]
|
|
|
|
|
|
today_successes = conn.execute(
|
|
|
"SELECT COUNT(*) FROM participations WHERE date = ? AND status = 'success'", (today,)
|
|
|
).fetchone()[0]
|
|
|
|
|
|
success_rate = (today_successes / max(today_participations, 1)) * 100
|
|
|
|
|
|
report = f"""
|
|
|
📊 **RAPPORT QUOTIDIEN - {today}**
|
|
|
|
|
|
🎯 **Aujourd'hui**:
|
|
|
• Participations: {today_participations}
|
|
|
• Succès: {today_successes}
|
|
|
• Taux de succès: {success_rate:.1f}%
|
|
|
|
|
|
📈 **Total**:
|
|
|
• Participations totales: {stats['total_participations']}
|
|
|
• Participations réussies: {stats['successful_participations']}
|
|
|
• Victoires détectées: {stats['total_victories']}
|
|
|
|
|
|
🌐 **Par source**:
|
|
|
"""
|
|
|
|
|
|
for source, count in stats['by_source'].items():
|
|
|
report += f" • {source}: {count}\n"
|
|
|
|
|
|
return report
|
|
|
|
|
|
def send_daily_report(self):
|
|
|
"""Envoie le rapport quotidien"""
|
|
|
if not API_CONFIG.telegram_bot_token or not API_CONFIG.telegram_chat_id:
|
|
|
return
|
|
|
|
|
|
report = self.generate_daily_report()
|
|
|
|
|
|
try:
|
|
|
url = f"https://api.telegram.org/bot{API_CONFIG.telegram_bot_token}/sendMessage"
|
|
|
payload = {
|
|
|
'chat_id': API_CONFIG.telegram_chat_id,
|
|
|
'text': report,
|
|
|
'parse_mode': 'Markdown'
|
|
|
}
|
|
|
|
|
|
response = requests.post(url, json=payload, timeout=10)
|
|
|
if response.status_code == 200:
|
|
|
logging.info("Daily report sent successfully")
|
|
|
else:
|
|
|
logging.error(f"Daily report failed: {response.text}")
|
|
|
|
|
|
except Exception as e:
|
|
|
logging.error(f"Daily report error: {e}")
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class ContestBotOrchestrator:
|
|
|
def __init__(self):
|
|
|
self.db = DatabaseManager()
|
|
|
self.ai = AIEngine(API_CONFIG)
|
|
|
self.scraper = None
|
|
|
self.participator = SmartParticipator(self.db, self.ai)
|
|
|
self.email_manager = EmailManager(API_CONFIG, self.ai, self.db)
|
|
|
self.monitor = MonitoringSystem(self.db)
|
|
|
|
|
|
async def run_full_cycle(self):
|
|
|
"""Execute un cycle complet de scraping et participation"""
|
|
|
logging.info("Starting full contest bot cycle")
|
|
|
|
|
|
try:
|
|
|
|
|
|
async with IntelligentScraper(self.db) as scraper:
|
|
|
self.scraper = scraper
|
|
|
contests = await scraper.scrape_all_sources()
|
|
|
|
|
|
if not contests:
|
|
|
logging.info("No new contests found")
|
|
|
return
|
|
|
|
|
|
|
|
|
contests.sort(key=lambda x: x.difficulty_score)
|
|
|
|
|
|
|
|
|
participation_count = 0
|
|
|
max_daily_participations = 20
|
|
|
|
|
|
for contest in contests[:max_daily_participations]:
|
|
|
try:
|
|
|
|
|
|
if participation_count > 0:
|
|
|
wait_time = random.uniform(30, 120)
|
|
|
logging.info(f"Waiting {wait_time:.0f}s before next participation")
|
|
|
await asyncio.sleep(wait_time)
|
|
|
|
|
|
success = await self.participator.participate_in_contest(contest)
|
|
|
participation_count += 1
|
|
|
|
|
|
if success:
|
|
|
logging.info(f"✅ Successfully participated in: {contest.title}")
|
|
|
else:
|
|
|
logging.warning(f"❌ Failed to participate in: {contest.title}")
|
|
|
|
|
|
|
|
|
if success:
|
|
|
await asyncio.sleep(random.uniform(60, 180))
|
|
|
|
|
|
except Exception as e:
|
|
|
logging.error(f"Error participating in {contest.title}: {e}")
|
|
|
continue
|
|
|
|
|
|
logging.info(f"Participation cycle completed: {participation_count} attempts")
|
|
|
|
|
|
except Exception as e:
|
|
|
logging.error(f"Full cycle error: {e}")
|
|
|
|
|
|
def run_email_check(self):
|
|
|
"""Vérifie les emails pour les victoires"""
|
|
|
try:
|
|
|
logging.info("Checking emails for victories")
|
|
|
self.email_manager.check_and_analyze_emails()
|
|
|
except Exception as e:
|
|
|
logging.error(f"Email check error: {e}")
|
|
|
|
|
|
def run_daily_report(self):
|
|
|
"""Génère et envoie le rapport quotidien"""
|
|
|
try:
|
|
|
logging.info("Generating daily report")
|
|
|
self.monitor.send_daily_report()
|
|
|
except Exception as e:
|
|
|
logging.error(f"Daily report error: {e}")
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def run_bot_cycle():
|
|
|
"""Point d'entrée pour le scheduler"""
|
|
|
bot = ContestBotOrchestrator()
|
|
|
|
|
|
|
|
|
asyncio.run(bot.run_full_cycle())
|
|
|
|
|
|
|
|
|
bot.run_email_check()
|
|
|
|
|
|
def run_daily_report():
|
|
|
"""Point d'entrée pour le rapport quotidien"""
|
|
|
bot = ContestBotOrchestrator()
|
|
|
bot.run_daily_report()
|
|
|
|
|
|
def main():
|
|
|
"""Fonction principale avec launcher intelligent intégré"""
|
|
|
|
|
|
|
|
|
if len(sys.argv) > 1:
|
|
|
if sys.argv[1] == "--run-now":
|
|
|
logging.info("Running immediate cycle")
|
|
|
run_bot_cycle()
|
|
|
return
|
|
|
elif sys.argv[1] == "--test-apis":
|
|
|
|
|
|
tester = APITester(API_CONFIG)
|
|
|
results = tester.test_all_apis()
|
|
|
working_count = sum(results.values())
|
|
|
print(f"\n📊 Résumé: {working_count}/5 APIs fonctionnelles")
|
|
|
return
|
|
|
elif sys.argv[1] == "--scheduler":
|
|
|
|
|
|
logging.info("Starting Contest Bot with scheduler")
|
|
|
|
|
|
schedule.every().day.at("08:00").do(run_bot_cycle)
|
|
|
schedule.every().day.at("14:00").do(run_bot_cycle)
|
|
|
schedule.every().day.at("20:00").do(run_daily_report)
|
|
|
|
|
|
logging.info("Scheduler started. Waiting for scheduled tasks...")
|
|
|
|
|
|
while True:
|
|
|
try:
|
|
|
schedule.run_pending()
|
|
|
time.sleep(60)
|
|
|
except KeyboardInterrupt:
|
|
|
logging.info("Bot stopped by user")
|
|
|
break
|
|
|
except Exception as e:
|
|
|
logging.error(f"Scheduler error: {e}")
|
|
|
time.sleep(300)
|
|
|
return
|
|
|
|
|
|
|
|
|
try:
|
|
|
launcher = SmartLauncher()
|
|
|
should_launch = launcher.main_menu()
|
|
|
|
|
|
if should_launch:
|
|
|
|
|
|
print("\n🤔 Comment voulez-vous lancer le bot ?")
|
|
|
print("1️⃣ Test immédiat (--run-now)")
|
|
|
print("2️⃣ Mode scheduler automatique")
|
|
|
|
|
|
mode_choice = input("\nVotre choix (1/2): ").strip()
|
|
|
|
|
|
if mode_choice == "1":
|
|
|
print("\n🎬 Lancement immédiat...")
|
|
|
run_bot_cycle()
|
|
|
elif mode_choice == "2":
|
|
|
print("\n⏰ Démarrage du scheduler...")
|
|
|
print("Programmation: 08:00, 14:00 (concours) et 20:00 (rapport)")
|
|
|
|
|
|
schedule.every().day.at("08:00").do(run_bot_cycle)
|
|
|
schedule.every().day.at("14:00").do(run_bot_cycle)
|
|
|
schedule.every().day.at("20:00").do(run_daily_report)
|
|
|
|
|
|
print("Scheduler démarré. Utilisez Ctrl+C pour arrêter.")
|
|
|
|
|
|
while True:
|
|
|
try:
|
|
|
schedule.run_pending()
|
|
|
time.sleep(60)
|
|
|
except KeyboardInterrupt:
|
|
|
print("\n👋 Bot arrêté par l'utilisateur")
|
|
|
break
|
|
|
except Exception as e:
|
|
|
logging.error(f"Scheduler error: {e}")
|
|
|
time.sleep(300)
|
|
|
else:
|
|
|
print("❌ Choix invalide")
|
|
|
else:
|
|
|
print("👋 À bientôt !")
|
|
|
|
|
|
except KeyboardInterrupt:
|
|
|
print("\n\n👋 Arrêté par l'utilisateur")
|
|
|
except Exception as e:
|
|
|
print(f"\n❌ Erreur: {e}")
|
|
|
|
|
|
if __name__ == "__main__":
|
|
|
main() |