|
|
|
|
|
|
|
|
|
|
|
import asyncio
|
|
|
from playwright.async_api import async_playwright, Page
|
|
|
from bs4 import BeautifulSoup
|
|
|
import requests
|
|
|
import pandas as pd
|
|
|
import time
|
|
|
import re
|
|
|
from datetime import datetime, timedelta
|
|
|
import os
|
|
|
import sys
|
|
|
import random
|
|
|
import sqlite3
|
|
|
import logging
|
|
|
import schedule
|
|
|
import hashlib
|
|
|
import json
|
|
|
import threading
|
|
|
from contextlib import contextmanager
|
|
|
from dataclasses import dataclass
|
|
|
from typing import List, Dict, Optional, Tuple
|
|
|
from urllib.parse import urljoin, urlparse
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@dataclass
class PersonalInfo:
    """Identity and contact details typed into contest entry forms.

    Every field carries a default, so ``PersonalInfo()`` yields a complete,
    ready-to-use profile.
    """

    # Given name and family name.
    prenom: str = "Valentin"
    nom: str = "Cora"

    # NOTE(review): both e-mail defaults are the literal text "[email protected]",
    # which looks like a redacted placeholder -- confirm the real addresses.
    email: str = "[email protected]"
    email_derivee: str = "[email protected]"

    telephone: str = "+41791234567"

    # Postal address, split into street / postcode / town / country.
    adresse: str = "Av Chantemerle 9"
    code_postal: str = "1009"
    ville: str = "Pully"
    pays: str = "Suisse"
|
|
|
|
|
|
@dataclass
class Contest:
    """One contest listing found by the scraper.

    ``title``, ``url``, ``description`` and ``source`` are mandatory; the
    remaining fields are optional details with neutral defaults.
    """

    title: str
    url: str
    description: str
    # URL of the listing page the contest was found on.
    source: str

    # Text snippets pulled out of the description, when present.
    deadline: Optional[str] = None
    prize: Optional[str] = None

    # Heuristic effort estimate; higher means harder to enter.
    difficulty_score: int = 0
|
|
|
|
|
|
@dataclass
class FormField:
    """A single input detected on a contest form."""

    # CSS selector used to find the element again when filling the form.
    selector: str
    # Kind of control, e.g. "text", "email", "textarea".
    field_type: str
    # Human-readable label text associated with the control ("" if none found).
    label: str
    required: bool

    current_value: str = ""
    # Free-text hints (name, placeholder, label) used to guess what to enter.
    ai_context: str = ""
|
|
|
|
|
|
@dataclass
class FormAnalysis:
    """Outcome of inspecting a contest page before attempting to enter it."""

    # Every fillable control detected on the page.
    fields: List["FormField"]
    # Sum of the per-field complexity scores.
    complexity_score: int
    # Heuristic probability (0.0 - 1.0) that an automated entry goes through.
    estimated_success_rate: float
    requires_captcha: bool
    requires_social_media: bool
    # URL of the page that was analysed.
    form_url: str
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# Module-wide default profile; read by the response engine and the form filler.
PERSONAL_INFO = PersonalInfo()
|
|
|
|
|
|
|
|
|
# Listing pages that are scraped for contests, in crawl order.
SITES_CH = [
    "https://www.concours.ch/concours/tous",
    "https://www.jeu-concours.biz/concours-pays_suisse.html",
    "https://www.loisirs.ch/concours/",
    "https://www.radin.ch/",
    "https://win4win.ch/fr/",
    "https://www.concours-suisse.ch/",
    "https://corporate.migros.ch/fr/concours",
    "https://www.20min.ch/fr/concours-et-jeux",
    "https://dein-gewinnspiel.ch/en",
    "https://www.myswitzerland.com/fr/planification/vie-pratique/concours/",
]
|
|
|
|
|
|
|
|
|
# Desktop Chrome user-agent strings; one is picked at random per session/context.
USER_AGENTS = [
    (
        "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 "
        "(KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36"
    ),
    (
        "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 "
        "(KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36"
    ),
    (
        "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 "
        "(KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36"
    ),
]
|
|
|
|
|
|
|
|
|
# Root logger setup: INFO and above go to a UTF-8 log file and to a stream
# handler (which writes to stderr by default).
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(levelname)s - %(message)s",
    handlers=[
        logging.FileHandler("concours_bot_sans_api.log", encoding="utf-8"),
        logging.StreamHandler(),
    ],
)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class DatabaseManager:
    """SQLite store for contest participations and recorded victories.

    A separate connection is opened lazily for each thread and kept in a
    ``threading.local``. The schema is created when the manager is built.
    """

    def __init__(self, db_path: str = 'concours_sans_api.sqlite'):
        """Remember the database path and make sure the tables exist."""
        self.db_path = db_path
        self.local = threading.local()
        self._init_db()

    def _get_connection(self):
        """Return the calling thread's connection, opening it on first use."""
        try:
            return self.local.conn
        except AttributeError:
            connection = sqlite3.connect(self.db_path)
            # Rows behave like both tuples and mappings.
            connection.row_factory = sqlite3.Row
            self.local.conn = connection
            return connection

    @contextmanager
    def transaction(self):
        """Yield the thread's connection; commit on success, roll back on error."""
        connection = self._get_connection()
        try:
            yield connection
            connection.commit()
        except Exception:
            connection.rollback()
            raise

    def _init_db(self):
        """Create the tables and the date index if they do not exist yet."""
        schema = (
            '''
            CREATE TABLE IF NOT EXISTS participations (
                url TEXT PRIMARY KEY,
                title TEXT,
                source TEXT,
                status TEXT,
                difficulty_score INTEGER,
                success_rate REAL,
                date TEXT,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            )
            ''',
            '''
            CREATE TABLE IF NOT EXISTS victories (
                email_id TEXT PRIMARY KEY,
                date TEXT,
                lot TEXT,
                source TEXT,
                confirmed BOOLEAN DEFAULT FALSE,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            )
            ''',
            'CREATE INDEX IF NOT EXISTS idx_participations_date ON participations(date)',
        )
        with self.transaction() as connection:
            for statement in schema:
                connection.execute(statement)

    def add_participation(self, contest: "Contest", status: str = 'pending', success_rate: float = 0.0):
        """Insert (or overwrite, keyed by URL) the participation row for *contest*."""
        row = (
            contest.url,
            contest.title,
            contest.source,
            status,
            contest.difficulty_score,
            success_rate,
        )
        with self.transaction() as connection:
            connection.execute(
                '''
                INSERT OR REPLACE INTO participations
                (url, title, source, status, difficulty_score, success_rate, date)
                VALUES (?, ?, ?, ?, ?, ?, date('now'))
                ''',
                row,
            )

    def participation_exists(self, url: str) -> bool:
        """Tell whether a participation row is already stored for *url*."""
        cursor = self._get_connection().execute(
            "SELECT 1 FROM participations WHERE url = ?", (url,)
        )
        return cursor.fetchone() is not None

    def get_stats(self) -> Dict:
        """Return overall counters plus the number of participations per source."""
        connection = self._get_connection()

        def scalar(query: str):
            # First column of the first row of a single-value query.
            return connection.execute(query).fetchone()[0]

        per_source = connection.execute(
            '''
            SELECT source, COUNT(*) as count
            FROM participations
            GROUP BY source
            ORDER BY count DESC
            '''
        ).fetchall()

        return {
            'total_participations': scalar("SELECT COUNT(*) FROM participations"),
            'successful_participations': scalar(
                "SELECT COUNT(*) FROM participations WHERE status='success'"
            ),
            'total_victories': scalar("SELECT COUNT(*) FROM victories"),
            'by_source': {source: count for source, count in per_source},
        }
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class LocalResponseEngine:
    """Rule-based answer generator for contest forms; needs no external API.

    Answers come from canned French sentences and a very small knowledge
    base. Each answer is memoised per (question, context, response_type), so
    asking the same thing twice returns the same text even where a random
    choice is involved.

    Keywords are matched on word boundaries rather than as raw substrings.
    Plain substring tests made "is" fire inside "suisse", "age" inside
    "message" and "soin" inside "besoin", which sent questions down the wrong
    branch.
    """

    # (keyword stems looked up in the contest description, candidate sentences)
    _MOTIVATIONS = (
        (
            ("voyage", "vacances", "séjour", "destination"),
            (
                "J'adore voyager et découvrir de nouveaux horizons. Ce prix serait une opportunité fantastique pour moi de vivre une expérience inoubliable en Suisse ou ailleurs.",
                "Voyager est ma passion et ce concours représente le voyage de mes rêves. J'espère avoir la chance de le remporter pour découvrir de nouveaux paysages.",
                "En tant que passionné de voyages, ce prix m'offrirait l'occasion parfaite de découvrir de nouveaux lieux et cultures, ce qui m'enrichirait énormément.",
            ),
        ),
        (
            ("produit", "cosmétique", "beauté", "soin"),
            (
                "Je suis toujours à la recherche de nouveaux produits de qualité et j'aimerais beaucoup tester cette gamme qui semble très prometteuse.",
                "Ces produits m'intéressent énormément et je serais ravi de pouvoir les découvrir et partager mon expérience avec mes proches.",
                "J'ai entendu beaucoup de bien de cette marque et j'aimerais avoir l'opportunité de l'essayer pour me faire ma propre opinion.",
            ),
        ),
        (
            ("technologie", "smartphone", "ordinateur", "électronique"),
            (
                "En tant que passionné de technologie, ce prix m'intéresse beaucoup et m'aiderait dans mes projets personnels et professionnels.",
                "J'ai besoin de ce type d'équipement pour mes études et mes loisirs, ce serait formidable de le gagner dans ce concours.",
                "La technologie fait partie de ma vie quotidienne et ce prix serait très utile pour mes activités créatives.",
            ),
        ),
        (
            ("nourriture", "restaurant", "gastronomie", "cuisine"),
            (
                "J'adore découvrir de nouvelles saveurs et expériences culinaires. Ce prix me permettrait de vivre un moment gastronomique exceptionnel.",
                "La cuisine est une de mes passions et ce concours m'offrirait l'opportunité de découvrir de nouveaux goûts et techniques.",
                "En tant qu'amateur de bonne cuisine, je serais ravi de remporter ce prix pour explorer de nouvelles expériences gastronomiques.",
            ),
        ),
    )

    # Used when the description matches none of the themes above.
    _DEFAULT_MOTIVATIONS = (
        "Je participe avec enthousiasme à ce concours car le prix m'intéresse vraiment et correspond parfaitement à mes centres d'intérêt actuels.",
        "Ce concours m'attire particulièrement et je serais très heureux de remporter ce magnifique prix qui me ferait énormément plaisir.",
        "J'espère avoir la chance de gagner car ce prix représente une belle opportunité pour moi et ma famille.",
        "Je suis motivé à participer car cette opportunité pourrait vraiment améliorer mon quotidien de manière positive.",
    )

    def __init__(self):
        """Start with an empty answer cache and the built-in knowledge base."""
        self.cache = {}
        self.knowledge_base = {
            "suisse": {
                "capitale": "Berne",
                "langues": "Français, Allemand, Italien, Romanche",
                "monnaie": "Franc suisse",
                "population": "8.7 millions",
                "villes": ["Zurich", "Genève", "Bâle", "Lausanne", "Berne", "Winterthour"],
                "cantons": ["Vaud", "Genève", "Valais", "Fribourg", "Neuchâtel", "Jura"],
            },
            "general": {
                "couleurs": ["Rouge", "Bleu", "Vert", "Jaune", "Orange", "Violet", "Rose", "Noir", "Blanc"],
                "nombres": ["1", "2", "3", "4", "5", "10", "12", "15", "20", "25", "50", "100"],
                "annees": ["2023", "2024", "2025"],
            },
        }

    @staticmethod
    def _has_word(text: str, words) -> bool:
        """Return True if any of *words* occurs in *text* as a whole word."""
        pattern = r"\b(?:" + "|".join(re.escape(word) for word in words) + r")\b"
        return re.search(pattern, text) is not None

    @staticmethod
    def _has_stem(text: str, stems) -> bool:
        """Return True if any word of *text* starts with one of *stems*.

        Prefix matching keeps plurals and inflections working ("ville"
        matches "villes") without matching in the middle of a word.
        """
        pattern = r"\b(?:" + "|".join(re.escape(stem) for stem in stems) + ")"
        return re.search(pattern, text) is not None

    def generate_response(self, question: str, context: str = "", response_type: str = "qa") -> str:
        """Return an answer for *question*, memoised per input triple.

        Args:
            question: Label or question text of the form field.
            context: Surrounding text (the contest description).
            response_type: "motivation", "quiz", or anything else for a
                generic answer.
        """
        # A separator keeps ("ab", "c") and ("a", "bc") from sharing a key.
        raw_key = "\x1f".join((question, context, response_type))
        cache_key = hashlib.md5(raw_key.encode()).hexdigest()

        if cache_key not in self.cache:
            self.cache[cache_key] = self._generate_intelligent_response(
                question, context, response_type
            )
        return self.cache[cache_key]

    def _generate_intelligent_response(self, question: str, context: str, response_type: str) -> str:
        """Lower-case the inputs and dispatch on *response_type*."""
        question_lower = question.lower()
        context_lower = context.lower()

        if response_type == "motivation":
            return self._generate_motivation(question_lower, context_lower)
        if response_type == "quiz":
            return self._generate_quiz_answer(question_lower, context_lower)
        return self._generate_general_response(question_lower, context_lower)

    def _generate_motivation(self, question: str, context: str) -> str:
        """Pick a motivation sentence matching the theme found in *context*."""
        for stems, sentences in self._MOTIVATIONS:
            if self._has_stem(context, stems):
                return random.choice(sentences)
        return random.choice(self._DEFAULT_MOTIVATIONS)

    def _generate_quiz_answer(self, question: str, context: str) -> str:
        """Guess a quiz answer from keywords in the (lower-cased) *question*."""
        swiss = self.knowledge_base["suisse"]
        general = self.knowledge_base["general"]

        if "suisse" in question:
            if self._has_stem(question, ["capital"]):
                return swiss["capitale"]
            if self._has_stem(question, ["langue", "language"]):
                return random.choice(["Français", "Allemand", "Italien", "Romanche"])
            # "franc" must be a whole word, otherwise "français" would match.
            if self._has_stem(question, ["monnaie", "currency"]) or self._has_word(
                question, ["franc", "francs"]
            ):
                return swiss["monnaie"]
            if self._has_stem(question, ["population", "habitant"]):
                return swiss["population"]
            if self._has_stem(question, ["ville", "city", "cities"]):
                return random.choice(swiss["villes"])
            if self._has_stem(question, ["canton"]):
                return random.choice(swiss["cantons"])

        if self._has_stem(question, ["couleur", "color"]):
            return random.choice(general["couleurs"])

        if self._has_stem(question, ["combien", "nombre", "quantité", "how many"]):
            return random.choice(general["nombres"])

        if self._has_stem(question, ["année", "date", "quand", "when", "year"]):
            return random.choice(general["annees"])

        # Short function words are only meaningful as complete words.
        if self._has_word(question, ["est-ce", "is", "are", "do", "does"]):
            return random.choice(["Oui", "Non"])

        if self._has_stem(question, ["vrai", "faux", "true", "false"]):
            return random.choice(["Vrai", "Faux"])

        # Nothing recognised: fall back to a multiple-choice style guess.
        return random.choice(["A", "B", "C", "D", "1", "2", "3"])

    def _generate_general_response(self, question: str, context: str) -> str:
        """Answer simple profile questions; default to a polite "Merci"."""
        if self._has_word(question, ["age", "âge"]):
            return random.choice(["25", "28", "30", "32"])
        if self._has_stem(question, ["profession", "métier"]):
            return random.choice(["Étudiant", "Employé", "Consultant", "Développeur"])
        if self._has_stem(question, ["ville"]):
            return PERSONAL_INFO.ville
        if self._has_word(question, ["pays"]):
            return PERSONAL_INFO.pays
        return "Merci"
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class LocalScraper:
    """Collects contest listings from the pages in ``SITES_CH``.

    Pages are downloaded with a ``requests`` session and parsed with
    BeautifulSoup. Use it as an async context manager so the HTTP session is
    opened and closed around the scrape::

        async with LocalScraper(db) as scraper:
            contests = await scraper.scrape_all_sources()
    """

    # Swiss markers searched in the lower-cased title + description. "ch" only
    # counts as a standalone token: as a plain substring it matched nearly any
    # French text ("chance", "chaque", ...).
    _SWISS_TEXT_RE = re.compile(
        r"\b(?:suisse|switzerland|romandie|genève|lausanne|zurich|bern)|\bch\b"
    )

    # Any of these substrings disqualifies a contest.
    _EXCLUDED_TERMS = (
        'payant', 'payment', 'carte bancaire', 'spam', 'phishing',
        'adult', 'casino', 'bitcoin', 'crypto', 'investment',
    )

    # (pattern searched in the description, points added to the difficulty)
    _DIFFICULTY_RULES = (
        (r'justifi|motivation|pourquoi|essay', 3),
        (r'photo|image|créat', 2),
        (r'quiz|question|répond', 1),
        (r'partag|social|facebook|twitter', 1),
        (r'inscription|compte|profil', 1),
    )

    def __init__(self, db_manager: "DatabaseManager"):
        """Keep the database handle; the HTTP session is created on entry."""
        self.db = db_manager
        self.session = None

    async def __aenter__(self):
        """Open the HTTP session with a randomly chosen user agent."""
        self.session = requests.Session()
        self.session.headers.update({'User-Agent': random.choice(USER_AGENTS)})
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Close the HTTP session if one was opened."""
        if self.session:
            self.session.close()

    async def scrape_all_sources(self) -> List["Contest"]:
        """Scrape every site and return the contests not yet seen or stored."""
        all_contests = list(await self._scrape_websites())
        unique_contests = self._filter_unique_contests(all_contests)

        logging.info(f"Total contests found: {len(all_contests)}, unique new: {len(unique_contests)}")
        return unique_contests

    async def _scrape_websites(self) -> List["Contest"]:
        """Scrape ``SITES_CH`` in batches of three concurrent requests."""
        batch_size = 3
        all_contests = []

        for start in range(0, len(SITES_CH), batch_size):
            if start:
                # Pause between batches only, not after the last one.
                await asyncio.sleep(2)

            batch = SITES_CH[start:start + batch_size]
            batch_results = await asyncio.gather(
                *(self._scrape_single_site(site) for site in batch),
                return_exceptions=True,
            )

            for result in batch_results:
                if isinstance(result, list):
                    all_contests.extend(result)
                elif isinstance(result, Exception):
                    logging.error(f"Batch scraping error: {result}")

        return all_contests

    async def _scrape_single_site(self, url: str) -> List["Contest"]:
        """Download and parse one listing page; return [] on any failure."""
        try:
            content = await self._fetch_with_retry(url)
            if not content:
                return []

            soup = BeautifulSoup(content, 'html.parser')
            contests = self._extract_contests_from_soup(soup, url)

            logging.info(f"Found {len(contests)} contests on {url}")
            return contests

        except Exception as e:
            logging.error(f"Error scraping {url}: {e}")
            return []

    async def _fetch_with_retry(self, url: str, max_retries: int = 3) -> Optional[str]:
        """Return the body of *url*, or None once all attempts have failed.

        The blocking ``requests`` call runs in the default executor so that
        the sites of a batch are really fetched concurrently instead of
        stalling the event loop one after another.

        Retry policy: network errors and 5xx responses back off ``2**attempt``
        seconds, HTTP 429 backs off ``5 * 2**attempt`` seconds, and other 4xx
        responses give up at once because repeating the request cannot help.
        """
        loop = asyncio.get_running_loop()

        for attempt in range(max_retries):
            is_last = attempt == max_retries - 1
            try:
                # NOTE(review): the session is shared by up to three worker
                # threads; requests does not document Session as thread-safe.
                response = await loop.run_in_executor(
                    None, lambda: self.session.get(url, timeout=30)
                )
            except Exception as e:
                logging.error(f"Attempt {attempt+1} failed for {url}: {e}")
                if not is_last:
                    await asyncio.sleep(2 ** attempt)
                continue

            status = response.status_code
            if status == 200:
                return response.text

            if status == 429:
                if is_last:
                    logging.warning(f"Rate limited on {url}, giving up")
                else:
                    wait_time = 2 ** attempt * 5
                    logging.warning(f"Rate limited on {url}, waiting {wait_time}s")
                    await asyncio.sleep(wait_time)
                continue

            logging.warning(f"HTTP {status} for {url}")
            if 400 <= status < 500:
                return None
            if not is_last:
                await asyncio.sleep(2 ** attempt)

        return None

    def _extract_contests_from_soup(self, soup: "BeautifulSoup", base_url: str) -> List["Contest"]:
        """Find contest-looking elements in *soup* and turn them into contests.

        At most 20 distinct elements per page are parsed.
        """
        selectors = [
            '.contest', '.concours', '.jeu', '.competition', '.giveaway',
            '[data-contest]', '[data-concours]', '.prize', '.lot',
            'article[class*="concours"]', '.entry', '.participate',
        ]

        # An element can match several selectors; keep it once so duplicates
        # do not use up the per-page limit.
        containers = []
        seen_nodes = set()
        for selector in selectors:
            for node in soup.select(selector):
                if id(node) not in seen_nodes:
                    seen_nodes.add(id(node))
                    containers.append(node)

        if not containers:
            # Fallback: any link whose target looks contest-related.
            containers = soup.find_all('a', href=re.compile(r'concours|jeu|contest|participate', re.I))

        contests = []
        for container in containers[:20]:
            try:
                contest = self._parse_contest_container(container, base_url)
                if contest and self._is_valid_contest(contest):
                    contests.append(contest)
            except Exception as e:
                logging.debug(f"Error parsing container: {e}")

        return contests

    def _parse_contest_container(self, container, base_url: str) -> Optional["Contest"]:
        """Build a Contest from one HTML element, or None if it has no title or link."""
        title = ""
        for selector in ('h1', 'h2', 'h3', '.title', '.titre', '.contest-title'):
            title_elem = container.select_one(selector)
            if title_elem:
                title = title_elem.get_text(strip=True)
                break
        if not title:
            # No heading: use the start of the element's own text.
            title = container.get_text(strip=True)[:100]

        link_elem = container if container.name == 'a' else container.find('a')
        href = link_elem.get('href') if link_elem else None
        if not title or not href:
            return None

        description = container.get_text(strip=True)[:500]

        return Contest(
            title=title[:200],
            url=urljoin(base_url, href),
            description=description,
            source=base_url,
            deadline=self._extract_deadline(description),
            prize=self._extract_prize(description),
            difficulty_score=self._estimate_difficulty(description),
        )

    def _extract_deadline(self, text: str) -> Optional[str]:
        """Return the first date that follows a French deadline phrase, if any."""
        date = r"(\d{1,2}[\/\-]\d{1,2}[\/\-]\d{2,4})"
        patterns = [
            # Accept both the ASCII and the typographic apostrophe.
            r"jusqu['’]?au " + date,
            r"avant le " + date,
            r"fin le " + date,
        ]

        for pattern in patterns:
            match = re.search(pattern, text, re.I)
            if match:
                return match.group(1)
        return None

    def _extract_prize(self, text: str) -> Optional[str]:
        """Return the first prize-like snippet found in *text*, if any."""
        patterns = [
            r"gagne[rz]?\s+([^.!?]{1,50})",
            r"prix[:\s]+([^.!?]{1,50})",
            r"lot[:\s]+([^.!?]{1,50})",
            r"(\d+\s*CHF|\d+\s*euros?|\d+\s*francs?)",
        ]

        for pattern in patterns:
            match = re.search(pattern, text, re.I)
            if match:
                return match.group(1).strip()
        return None

    def _estimate_difficulty(self, description: str) -> int:
        """Score how demanding the entry looks, from 0 up to a cap of 10."""
        difficulty = sum(
            points
            for pattern, points in self._DIFFICULTY_RULES
            if re.search(pattern, description, re.I)
        )
        return min(difficulty, 10)

    @staticmethod
    def _is_swiss_host(link: str) -> bool:
        """Return True if *link* points at a host under the ``.ch`` domain."""
        try:
            host = urlparse(link or "").hostname or ""
        except ValueError:
            return False
        return host.lower().endswith(".ch")

    def _is_valid_contest(self, contest: "Contest") -> bool:
        """Keep contests that look Swiss, contain no excluded term and have an http(s) URL.

        A contest counts as Swiss when its text mentions a Swiss marker or
        when its URL or source page is on a ``.ch`` host.
        """
        if not contest.url.startswith(('http://', 'https://')):
            return False

        full_text = (contest.title + " " + contest.description).lower()
        if any(term in full_text for term in self._EXCLUDED_TERMS):
            return False

        if self._SWISS_TEXT_RE.search(full_text):
            return True
        return self._is_swiss_host(contest.url) or self._is_swiss_host(contest.source)

    def _filter_unique_contests(self, contests: List["Contest"]) -> List["Contest"]:
        """Drop duplicate URLs and contests already recorded in the database."""
        unique_contests = []
        seen_urls = set()

        for contest in contests:
            if contest.url in seen_urls or self.db.participation_exists(contest.url):
                continue
            unique_contests.append(contest)
            seen_urls.add(contest.url)

        return unique_contests
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class SmartParticipator:
|
|
|
def __init__(self, db_manager: DatabaseManager, response_engine: LocalResponseEngine):
|
|
|
self.db = db_manager
|
|
|
self.response_engine = response_engine
|
|
|
self.personal_info = PERSONAL_INFO
|
|
|
|
|
|
self.field_patterns = {
|
|
|
'prenom': [r'prenom|prénom|first.*name'],
|
|
|
'nom': [r'nom(?!.*prenom)|last.*name|family.*name'],
|
|
|
'email': [r'email|e-mail|courriel'],
|
|
|
'telephone': [r'tel|phone|telephone|téléphone'],
|
|
|
'adresse': [r'adresse|address|rue|street'],
|
|
|
'code_postal': [r'code.postal|zip|postal'],
|
|
|
'ville': [r'ville|city|localité'],
|
|
|
'pays': [r'pays|country|nation'],
|
|
|
'motivation': [r'motivation|pourquoi|why|reason'],
|
|
|
'quiz': [r'question|quiz|réponse|answer']
|
|
|
}
|
|
|
|
|
|
async def participate_in_contest(self, contest: Contest) -> bool:
|
|
|
"""Participe à un concours de manière intelligente"""
|
|
|
async with async_playwright() as p:
|
|
|
browser = await p.chromium.launch(
|
|
|
headless=True,
|
|
|
args=['--no-sandbox', '--disable-blink-features=AutomationControlled']
|
|
|
)
|
|
|
|
|
|
context = await browser.new_context(
|
|
|
user_agent=random.choice(USER_AGENTS),
|
|
|
viewport={'width': 1920, 'height': 1080}
|
|
|
)
|
|
|
|
|
|
page = await context.new_page()
|
|
|
|
|
|
try:
|
|
|
|
|
|
analysis = await self._analyze_form(page, contest.url)
|
|
|
|
|
|
if analysis.estimated_success_rate < 0.3:
|
|
|
logging.warning(f"Low success rate for {contest.url}: {analysis.estimated_success_rate}")
|
|
|
self.db.add_participation(contest, 'skipped_low_success', analysis.estimated_success_rate)
|
|
|
return False
|
|
|
|
|
|
if analysis.requires_captcha:
|
|
|
logging.warning(f"CAPTCHA detected for {contest.url}, skipping")
|
|
|
self.db.add_participation(contest, 'skipped_captcha', analysis.estimated_success_rate)
|
|
|
return False
|
|
|
|
|
|
|
|
|
success = await self._fill_and_submit_form(page, analysis, contest)
|
|
|
|
|
|
status = 'success' if success else 'failed'
|
|
|
self.db.add_participation(contest, status, analysis.estimated_success_rate)
|
|
|
|
|
|
logging.info(f"Participation {'successful' if success else 'failed'} for {contest.title}")
|
|
|
return success
|
|
|
|
|
|
except Exception as e:
|
|
|
logging.error(f"Participation error for {contest.url}: {e}")
|
|
|
self.db.add_participation(contest, 'error', 0.0)
|
|
|
return False
|
|
|
|
|
|
finally:
|
|
|
await browser.close()
|
|
|
|
|
|
async def _analyze_form(self, page: Page, url: str) -> FormAnalysis:
|
|
|
"""Analyse un formulaire de concours"""
|
|
|
try:
|
|
|
await page.goto(url, wait_until='networkidle', timeout=15000)
|
|
|
|
|
|
|
|
|
fields = await self._detect_form_fields(page)
|
|
|
|
|
|
|
|
|
complexity = sum(self._calculate_field_complexity(field) for field in fields)
|
|
|
|
|
|
|
|
|
has_captcha = await self._detect_captcha(page)
|
|
|
has_social_requirements = await self._detect_social_requirements(page)
|
|
|
|
|
|
|
|
|
success_rate = self._estimate_success_rate(fields, complexity, has_captcha)
|
|
|
|
|
|
return FormAnalysis(
|
|
|
fields=fields,
|
|
|
complexity_score=complexity,
|
|
|
estimated_success_rate=success_rate,
|
|
|
requires_captcha=has_captcha,
|
|
|
requires_social_media=has_social_requirements,
|
|
|
form_url=url
|
|
|
)
|
|
|
|
|
|
except Exception as e:
|
|
|
logging.error(f"Form analysis error: {e}")
|
|
|
return FormAnalysis([], 10, 0.0, True, True, url)
|
|
|
|
|
|
async def _detect_form_fields(self, page: Page) -> List[FormField]:
|
|
|
"""Détecte tous les champs de formulaire"""
|
|
|
fields = []
|
|
|
|
|
|
selectors = [
|
|
|
'input[type="text"]', 'input[type="email"]', 'input[type="tel"]',
|
|
|
'input[type="number"]', 'input:not([type])', 'textarea', 'select'
|
|
|
]
|
|
|
|
|
|
for selector in selectors:
|
|
|
elements = await page.query_selector_all(selector)
|
|
|
|
|
|
for element in elements:
|
|
|
try:
|
|
|
field = await self._analyze_single_field(element, page)
|
|
|
if field:
|
|
|
fields.append(field)
|
|
|
except Exception:
|
|
|
continue
|
|
|
|
|
|
return fields
|
|
|
|
|
|
async def _analyze_single_field(self, element, page: Page) -> Optional[FormField]:
|
|
|
"""Analyse un champ individuel"""
|
|
|
try:
|
|
|
field_type = await element.evaluate('el => el.type || el.tagName.toLowerCase()')
|
|
|
name = await element.evaluate('el => el.name || el.id || ""')
|
|
|
placeholder = await element.evaluate('el => el.placeholder || ""')
|
|
|
required = await element.evaluate('el => el.required')
|
|
|
|
|
|
|
|
|
label_text = await self._find_field_label(element, page)
|
|
|
|
|
|
|
|
|
selector = await self._create_unique_selector(element)
|
|
|
|
|
|
return FormField(
|
|
|
selector=selector,
|
|
|
field_type=field_type,
|
|
|
label=label_text,
|
|
|
required=required,
|
|
|
ai_context=f"Name: {name}, Placeholder: {placeholder}, Label: {label_text}"
|
|
|
)
|
|
|
|
|
|
except Exception:
|
|
|
return None
|
|
|
|
|
|
async def _find_field_label(self, element, page: Page) -> str:
|
|
|
"""Trouve le label associé à un champ"""
|
|
|
try:
|
|
|
|
|
|
element_id = await element.evaluate('el => el.id')
|
|
|
if element_id:
|
|
|
label = await page.query_selector(f'label[for="{element_id}"]')
|
|
|
if label:
|
|
|
return await label.inner_text()
|
|
|
|
|
|
|
|
|
parent_label = await element.evaluate('''
|
|
|
el => {
|
|
|
let parent = el.parentElement;
|
|
|
while (parent && parent.tagName !== 'BODY') {
|
|
|
if (parent.tagName === 'LABEL') {
|
|
|
return parent.innerText;
|
|
|
}
|
|
|
parent = parent.parentElement;
|
|
|
}
|
|
|
return '';
|
|
|
}
|
|
|
''')
|
|
|
|
|
|
if parent_label:
|
|
|
return parent_label.strip()
|
|
|
|
|
|
|
|
|
prev_text = await element.evaluate('''
|
|
|
el => {
|
|
|
const prev = el.previousElementSibling;
|
|
|
return prev ? prev.innerText : '';
|
|
|
}
|
|
|
''')
|
|
|
|
|
|
return prev_text.strip()
|
|
|
|
|
|
except Exception:
|
|
|
return ""
|
|
|
|
|
|
async def _create_unique_selector(self, element) -> str:
|
|
|
"""Crée un sélecteur CSS unique"""
|
|
|
|
|
|
element_id = await element.evaluate('el => el.id')
|
|
|
if element_id:
|
|
|
return f'#{element_id}'
|
|
|
|
|
|
name = await element.evaluate('el => el.name')
|
|
|
if name:
|
|
|
return f'[name="{name}"]'
|
|
|
|
|
|
class_name = await element.evaluate('el => el.className')
|
|
|
tag_name = await element.evaluate('el => el.tagName.toLowerCase()')
|
|
|
|
|
|
if class_name:
|
|
|
return f'{tag_name}.{class_name.split()[0]}'
|
|
|
|
|
|
|
|
|
return f'{tag_name}:nth-of-type(1)'
|
|
|
|
|
|
def _calculate_field_complexity(self, field: FormField) -> int:
|
|
|
"""Calcule la complexité d'un champ"""
|
|
|
complexity = 1
|
|
|
|
|
|
if field.field_type == 'textarea':
|
|
|
complexity += 3
|
|
|
elif field.field_type == 'select':
|
|
|
complexity += 2
|
|
|
elif field.required:
|
|
|
complexity += 1
|
|
|
|
|
|
if re.search(r'motivation|justifi|pourquoi', field.label, re.I):
|
|
|
complexity += 3
|
|
|
elif re.search(r'quiz|question', field.label, re.I):
|
|
|
complexity += 2
|
|
|
|
|
|
return complexity
|
|
|
|
|
|
async def _detect_captcha(self, page: Page) -> bool:
|
|
|
"""Détecte la présence de CAPTCHA"""
|
|
|
captcha_selectors = [
|
|
|
'.g-recaptcha', '.h-captcha', '#captcha', '.captcha',
|
|
|
'iframe[src*="recaptcha"]', 'iframe[src*="hcaptcha"]'
|
|
|
]
|
|
|
|
|
|
for selector in captcha_selectors:
|
|
|
element = await page.query_selector(selector)
|
|
|
if element:
|
|
|
return True
|
|
|
|
|
|
return False
|
|
|
|
|
|
async def _detect_social_requirements(self, page: Page) -> bool:
|
|
|
"""Détecte les exigences de réseaux sociaux"""
|
|
|
content = await page.content()
|
|
|
social_patterns = [
|
|
|
r'follow.*us', r'partag.*facebook', r'retweet',
|
|
|
r'like.*page', r'subscribe.*channel'
|
|
|
]
|
|
|
|
|
|
for pattern in social_patterns:
|
|
|
if re.search(pattern, content, re.I):
|
|
|
return True
|
|
|
|
|
|
return False
|
|
|
|
|
|
def _estimate_success_rate(self, fields: List[FormField], complexity: int, has_captcha: bool) -> float:
|
|
|
"""Estime le taux de succès"""
|
|
|
base_rate = 0.8
|
|
|
|
|
|
if has_captcha:
|
|
|
base_rate *= 0.1
|
|
|
|
|
|
if complexity > 15:
|
|
|
base_rate *= 0.4
|
|
|
elif complexity > 10:
|
|
|
base_rate *= 0.6
|
|
|
elif complexity > 5:
|
|
|
base_rate *= 0.8
|
|
|
|
|
|
required_fields = [f for f in fields if f.required]
|
|
|
if len(required_fields) <= 3:
|
|
|
base_rate *= 1.1
|
|
|
|
|
|
return min(base_rate, 1.0)
|
|
|
|
|
|
async def _fill_and_submit_form(self, page: Page, analysis: FormAnalysis, contest: Contest) -> bool:
|
|
|
"""Remplit et soumet le formulaire"""
|
|
|
try:
|
|
|
filled_fields = 0
|
|
|
|
|
|
for field in analysis.fields:
|
|
|
try:
|
|
|
|
|
|
await page.wait_for_selector(field.selector, timeout=3000)
|
|
|
element = await page.query_selector(field.selector)
|
|
|
|
|
|
if not element:
|
|
|
continue
|
|
|
|
|
|
|
|
|
value = await self._generate_field_value(field, contest, page)
|
|
|
if not value:
|
|
|
continue
|
|
|
|
|
|
|
|
|
if field.field_type == 'select':
|
|
|
await self._fill_select_field(element, value, page)
|
|
|
else:
|
|
|
await element.fill(value)
|
|
|
|
|
|
filled_fields += 1
|
|
|
await asyncio.sleep(random.uniform(0.3, 0.8))
|
|
|
|
|
|
except Exception as e:
|
|
|
logging.debug(f"Error filling field {field.selector}: {e}")
|
|
|
continue
|
|
|
|
|
|
|
|
|
submit_success = await self._submit_form(page)
|
|
|
|
|
|
logging.info(f"Filled {filled_fields}/{len(analysis.fields)} fields, submitted: {submit_success}")
|
|
|
return filled_fields > 0 and submit_success
|
|
|
|
|
|
except Exception as e:
|
|
|
logging.error(f"Form filling error: {e}")
|
|
|
return False
|
|
|
|
|
|
async def _generate_field_value(self, field: FormField, contest: Contest, page: Page) -> Optional[str]:
|
|
|
"""Génère une valeur pour un champ"""
|
|
|
|
|
|
field_type = self._identify_field_type(field)
|
|
|
|
|
|
|
|
|
personal_mapping = {
|
|
|
'prenom': self.personal_info.prenom,
|
|
|
'nom': self.personal_info.nom,
|
|
|
'email': self.personal_info.email_derivee,
|
|
|
'telephone': self.personal_info.telephone,
|
|
|
'adresse': self.personal_info.adresse,
|
|
|
'code_postal': self.personal_info.code_postal,
|
|
|
'ville': self.personal_info.ville,
|
|
|
'pays': self.personal_info.pays
|
|
|
}
|
|
|
|
|
|
if field_type in personal_mapping:
|
|
|
return personal_mapping[field_type]
|
|
|
|
|
|
|
|
|
if field_type == 'motivation':
|
|
|
return self.response_engine.generate_response(
|
|
|
field.label,
|
|
|
contest.description,
|
|
|
"motivation"
|
|
|
)
|
|
|
elif field_type == 'quiz':
|
|
|
return self.response_engine.generate_response(
|
|
|
field.label,
|
|
|
contest.description,
|
|
|
"quiz"
|
|
|
)
|
|
|
|
|
|
|
|
|
default_values = {
|
|
|
'age': '25',
|
|
|
'genre': 'Monsieur',
|
|
|
'profession': 'Étudiant'
|
|
|
}
|
|
|
|
|
|
for key, value in default_values.items():
|
|
|
if key in field.label.lower():
|
|
|
return value
|
|
|
|
|
|
return None
|
|
|
|
|
|
def _identify_field_type(self, field: FormField) -> str:
|
|
|
"""Identifie le type de champ"""
|
|
|
combined_text = f"{field.ai_context} {field.label}".lower()
|
|
|
|
|
|
for field_type, patterns in self.field_patterns.items():
|
|
|
for pattern in patterns:
|
|
|
if re.search(pattern, combined_text, re.I):
|
|
|
return field_type
|
|
|
|
|
|
return 'unknown'
|
|
|
|
|
|
async def _fill_select_field(self, element, value: str, page: Page):
    """Select the ``<option>`` of a ``<select>`` element that matches ``value``.

    An option matches when ``value`` (case-insensitive, surrounding
    whitespace ignored) is contained in the option's visible text or in its
    ``value`` attribute. The first matching option is selected. When nothing
    matches and the select has more than one option, the second option is
    chosen (presumably because the first is usually a placeholder such as
    "Choisir..." - TODO confirm against the target sites).

    Options are selected through their element handle rather than through
    their ``value`` attribute, so options that carry no ``value`` attribute
    can be selected too.

    Args:
        element: Handle of the ``<select>`` element.
        value: Desired answer; an empty value skips matching and goes
            straight to the fallback.
        page: Not used by this method; kept so the signature stays stable.

    Errors are logged at debug level and otherwise ignored (best effort).
    """
    try:
        options = await element.query_selector_all('option')

        needle = (value or "").strip().lower()
        # An empty needle is a substring of everything and would always pick
        # the first option, so only search when there is something to find.
        if needle:
            for option in options:
                option_text = (await option.inner_text()) or ""
                option_value = (await option.get_attribute('value')) or ""

                if needle in option_text.lower() or needle in option_value.lower():
                    await element.select_option(element=option)
                    return

        # Fallback: no match, take the second entry when there is one.
        if len(options) > 1:
            await element.select_option(element=options[1])

    except Exception as e:
        logging.debug(f"Select field error: {e}")
|
|
|
|
|
|
async def _submit_form(self, page: Page) -> bool:
    """Click the first usable submit control on the page.

    A fixed list of selectors is tried in priority order. For each selector
    every matching element is inspected, not just the first one, so a hidden
    or disabled match does not mask a usable one further down the page. The
    first element that is both visible and enabled is clicked, then the page
    is given 3000 ms to react.

    Args:
        page: Page that contains the form.

    Returns:
        ``True`` as soon as one control has been clicked, ``False`` when no
        visible and enabled control was found for any selector.
    """
    submit_selectors = (
        'input[type="submit"]',
        'button[type="submit"]',
        'button:has-text("Participer")',
        'button:has-text("Envoyer")',
        'button:has-text("Valider")',
        '.submit-btn',
        '.participate-btn',
    )

    for selector in submit_selectors:
        try:
            candidates = await page.query_selector_all(selector)
        except Exception as e:
            logging.debug(f"Submit selector lookup failed for {selector!r}: {e}")
            continue

        for element in candidates:
            try:
                if not (await element.is_visible() and await element.is_enabled()):
                    continue
                await element.click()
            except Exception as e:
                logging.debug(f"Submit attempt failed for {selector!r}: {e}")
                continue

            # The click went through; a failure while waiting must not lead
            # to clicking a second control and submitting twice.
            try:
                await page.wait_for_timeout(3000)
            except Exception as e:
                logging.debug(f"Post-submit wait failed: {e}")
            return True

    return False
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class ContestBotOrchestrator:
    """Tie the database, response engine, scraper and participator together.

    One instance drives a scrape -> rank -> participate cycle
    (``run_full_cycle``) and can render a plain-text report from the
    participation records (``generate_report``).
    """

    def __init__(self, max_daily_participations: int = 15):
        """Create the collaborating components.

        Args:
            max_daily_participations: Maximum number of contests attempted
                by one call to ``run_full_cycle``. The limit is applied per
                cycle, not per calendar day.
        """
        self.db = DatabaseManager()
        self.response_engine = LocalResponseEngine()
        self.scraper = None
        self.participator = SmartParticipator(self.db, self.response_engine)
        self.max_daily_participations = max_daily_participations

    async def run_full_cycle(self):
        """Scrape all sources, then enter the easiest contests found.

        Contests are sorted by ascending ``difficulty_score`` and at most
        ``self.max_daily_participations`` of them are attempted. Randomised
        pauses are inserted between attempts: 30-120 s before every attempt
        after the first counted one, plus a 60-180 s cool-down after each
        success that is followed by another contest.

        Errors are logged rather than raised: a failure on one contest moves
        on to the next, and any other failure ends the cycle.
        """
        logging.info("Starting full contest bot cycle (sans API)")

        try:
            # The scraper is only needed to collect contests; the participator
            # is built from the database and response engine alone, so the
            # scraper context is closed before participation starts.
            async with LocalScraper(self.db) as scraper:
                self.scraper = scraper
                contests = await scraper.scrape_all_sources()

            if not contests:
                logging.info("No new contests found")
                return

            # Easiest contests first.
            contests.sort(key=lambda x: x.difficulty_score)
            selected = contests[:self.max_daily_participations]

            participation_count = 0
            for position, contest in enumerate(selected, start=1):
                try:
                    if participation_count > 0:
                        wait_time = random.uniform(30, 120)
                        logging.info(f"Waiting {wait_time:.0f}s before next participation")
                        await asyncio.sleep(wait_time)

                    success = await self.participator.participate_in_contest(contest)
                    participation_count += 1

                    if success:
                        logging.info(f"SUCCESS: Successfully participated in: {contest.title}")
                        # Cool down only when another contest follows; pausing
                        # after the final one would just delay the cycle's end.
                        if position < len(selected):
                            await asyncio.sleep(random.uniform(60, 180))
                    else:
                        logging.warning(f"FAILED: Failed to participate in: {contest.title}")

                except Exception as e:
                    logging.error(f"Error participating in {contest.title}: {e}")
                    continue

            logging.info(f"Participation cycle completed: {participation_count} attempts")

        except Exception as e:
            logging.error(f"Full cycle error: {e}")

    def generate_report(self) -> str:
        """Build the daily plain-text report.

        Combines today's counts, read from the ``participations`` table, with
        the overall figures returned by ``self.db.get_stats()`` (keys
        ``total_participations``, ``successful_participations`` and
        ``by_source``).

        Returns:
            The report text (French, as shown to the user).
        """
        stats = self.db.get_stats()

        today = datetime.now().strftime('%Y-%m-%d')
        # NOTE(review): the connection is not closed here; whether it is owned
        # or cached by the database manager cannot be told from this class.
        conn = self.db._get_connection()

        today_participations = conn.execute(
            "SELECT COUNT(*) FROM participations WHERE date = ?", (today,)
        ).fetchone()[0]

        today_successes = conn.execute(
            "SELECT COUNT(*) FROM participations WHERE date = ? AND status = 'success'", (today,)
        ).fetchone()[0]

        # max(..., 1) avoids a division by zero on days without participation.
        success_rate = (today_successes / max(today_participations, 1)) * 100

        header_lines = [
            "",
            f"📊 RAPPORT QUOTIDIEN - {today}",
            "",
            "🎯 Aujourd'hui:",
            f"• Participations: {today_participations}",
            f"• Succès: {today_successes}",
            f"• Taux de succès: {success_rate:.1f}%",
            "",
            "📈 Total:",
            f"• Participations totales: {stats['total_participations']}",
            f"• Participations réussies: {stats['successful_participations']}",
            "",
            "🌐 Par source:",
            "",
        ]
        report = "\n".join(header_lines)

        report += "".join(
            f" • {source}: {count}\n" for source, count in stats['by_source'].items()
        )

        return report
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def run_bot_cycle():
    """Scheduler entry point: run one full cycle, then print its report.

    On Windows the proactor event-loop policy is installed first. A fresh
    orchestrator is created for every call.
    """
    on_windows = sys.platform.startswith('win')
    if on_windows:
        asyncio.set_event_loop_policy(asyncio.WindowsProactorEventLoopPolicy())

    orchestrator = ContestBotOrchestrator()

    # Scrape and participate; blocks until the whole cycle has finished.
    asyncio.run(orchestrator.run_full_cycle())

    print(orchestrator.generate_report())
|
|
|
|
|
|
def main():
    """Main entry point: print the banner, then run once or start the scheduler.

    With ``--run-now`` as the first command-line argument a single cycle is
    executed and the function returns. Otherwise cycles are scheduled daily
    at 08:00 and 14:00, one cycle is run immediately, and the scheduler is
    polled every 60 seconds until interrupted from the keyboard.
    """
    banner = (
        "🎰 BOT DE CONCOURS SUISSE - VERSION SANS API",
        "=" * 50,
        "✅ Système de réponses locales activé",
        "✅ Scraping direct des sites web",
        "✅ Aucune API externe requise",
        "",
    )
    for line in banner:
        print(line)

    if sys.platform.startswith('win'):
        asyncio.set_event_loop_policy(asyncio.WindowsProactorEventLoopPolicy())

    # One-shot mode.
    if sys.argv[1:2] == ["--run-now"]:
        logging.info("Running immediate cycle")
        run_bot_cycle()
        return

    logging.info("Starting Contest Bot with scheduler")
    for slot in ("08:00", "14:00"):
        schedule.every().day.at(slot).do(run_bot_cycle)

    # Run once right away instead of waiting for the first scheduled slot.
    print("🚀 Lancement immédiat pour test...")
    run_bot_cycle()

    logging.info("Scheduler started. Waiting for scheduled tasks...")

    running = True
    while running:
        try:
            schedule.run_pending()
            time.sleep(60)
        except KeyboardInterrupt:
            logging.info("Bot stopped by user")
            running = False
        except Exception as e:
            # Back off for five minutes after an unexpected scheduler failure.
            logging.error(f"Scheduler error: {e}")
            time.sleep(300)
|
|
|
|
|
|
# Start the bot only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()
|
|
|
|