| | |
| | """ |
| | هماهنگکننده جمعآوری داده |
| | Data Collection Orchestrator - Manages all collectors |
| | """ |
| |
|
| | import asyncio |
| | import sys |
| | import os |
| | from pathlib import Path |
| | from typing import Dict, List, Any, Optional |
| | from datetime import datetime, timedelta |
| | import logging |
| |
|
| | |
# Make the package root importable when this file is run directly as a script
# (the collectors and database live one directory above this module).
sys.path.insert(0, str(Path(__file__).parent.parent))

from crypto_data_bank.database import get_db
from crypto_data_bank.collectors.free_price_collector import FreePriceCollector
from crypto_data_bank.collectors.rss_news_collector import RSSNewsCollector
from crypto_data_bank.collectors.sentiment_collector import SentimentCollector
from crypto_data_bank.ai.huggingface_models import get_analyzer

# Configure root logging once at import time so all collectors share the
# same timestamped format.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
| |
|
| |
|
class DataCollectionOrchestrator:
    """
    Main orchestrator for data collection from all FREE sources.

    Owns one instance of each collector (prices, RSS news, market
    sentiment) plus the AI analyzer, and persists every result through
    the shared database handle. Supports both a single on-demand
    collection cycle (collect_all_data_once) and continuous background
    collection (start/stop_background_collection) with one asyncio task
    per data source.
    """

    def __init__(self):
        self.db = get_db()
        self.price_collector = FreePriceCollector()
        self.news_collector = RSSNewsCollector()
        self.sentiment_collector = SentimentCollector()
        self.ai_analyzer = get_analyzer()

        # asyncio.Task handles created by start_background_collection().
        self.collection_tasks = []
        self.is_running = False

        # Seconds between collection runs, per data category.
        self.intervals = {
            'prices': 60,
            'news': 300,
            'sentiment': 180,
        }

        # datetime of the most recent successful run per category (None = never).
        self.last_collection = {
            'prices': None,
            'news': None,
            'sentiment': None,
        }

    async def collect_and_store_prices(self):
        """Collect prices from all free sources, aggregate, and persist them.

        Returns:
            dict: on success {"success": True, "prices_collected", "prices_saved",
            "timestamp"}; on failure {"success": False, "error", "timestamp"}.
        """
        try:
            logger.info("💰 Collecting prices from FREE sources...")

            all_prices = await self.price_collector.collect_all_free_sources()
            aggregated = self.price_collector.aggregate_prices(all_prices)

            saved_count = 0
            for price_data in aggregated:
                try:
                    self.db.save_price(
                        symbol=price_data['symbol'],
                        price_data=price_data,
                        source='free_aggregated'
                    )
                    saved_count += 1
                except Exception as e:
                    # Best-effort: one malformed record must not abort the batch.
                    logger.error(f"Error saving price for {price_data.get('symbol')}: {e}")

            self.last_collection['prices'] = datetime.now()
            logger.info(f"✅ Saved {saved_count}/{len(aggregated)} prices to database")

            return {
                "success": True,
                "prices_collected": len(aggregated),
                "prices_saved": saved_count,
                "timestamp": datetime.now().isoformat()
            }

        except Exception as e:
            logger.error(f"❌ Error collecting prices: {e}")
            return {
                "success": False,
                "error": str(e),
                "timestamp": datetime.now().isoformat()
            }

    async def collect_and_store_news(self):
        """Collect RSS news, optionally run AI analysis, and persist everything.

        Also stores per-coin aggregated sentiment for the top trending coins
        when AI analysis is available.

        Returns:
            dict: on success {"success": True, "news_collected", "news_saved",
            "ai_analyzed", "timestamp"}; on failure {"success": False, "error",
            "timestamp"}.
        """
        try:
            logger.info("📰 Collecting news from FREE RSS feeds...")

            all_news = await self.news_collector.collect_all_rss_feeds()
            unique_news = self.news_collector.deduplicate_news(all_news)

            # AI analysis is optional (analyzer may be a lightweight stub);
            # cap at 50 items to bound model cost per cycle.
            if hasattr(self.ai_analyzer, 'analyze_news_batch'):
                logger.info("🤖 Analyzing news with AI...")
                analyzed_news = await self.ai_analyzer.analyze_news_batch(unique_news[:50])
            else:
                analyzed_news = unique_news

            saved_count = 0
            for news_item in analyzed_news:
                try:
                    # BUG FIX: the previous code stored the raw AI *confidence*
                    # as the sentiment value, discarding polarity entirely.
                    # Store a signed score instead: +confidence for positive,
                    # -confidence for negative, 0.0 for neutral/unknown labels.
                    # (Assumes the DB expects a numeric sentiment — the old
                    # code already stored a float here.)
                    if 'ai_sentiment' in news_item:
                        label = str(news_item['ai_sentiment']).lower()
                        confidence = float(news_item.get('ai_confidence', 0.0))
                        if label == 'positive':
                            news_item['sentiment'] = confidence
                        elif label == 'negative':
                            news_item['sentiment'] = -confidence
                        else:
                            news_item['sentiment'] = 0.0

                    self.db.save_news(news_item)
                    saved_count += 1
                except Exception as e:
                    logger.error(f"Error saving news: {e}")

            self.last_collection['news'] = datetime.now()
            logger.info(f"✅ Saved {saved_count}/{len(analyzed_news)} news items to database")

            # Aggregate per-coin sentiment for trending coins (AI path only;
            # checking the first item mirrors how the batch analyzer tags all
            # items uniformly).
            if analyzed_news and 'ai_sentiment' in analyzed_news[0]:
                try:
                    trending = self.news_collector.get_trending_coins(analyzed_news)

                    for trend in trending[:10]:
                        symbol = trend['coin']
                        symbol_news = [n for n in analyzed_news if symbol in n.get('coins', [])]

                        if symbol_news:
                            agg_sentiment = await self.ai_analyzer.calculate_aggregated_sentiment(
                                symbol_news,
                                symbol
                            )

                            self.db.save_ai_analysis({
                                'symbol': symbol,
                                'analysis_type': 'news_sentiment',
                                'model_used': 'finbert',
                                'input_data': {
                                    'news_count': len(symbol_news),
                                    'mentions': trend['mentions']
                                },
                                'output_data': agg_sentiment,
                                'confidence': agg_sentiment.get('confidence', 0.0)
                            })

                    logger.info(f"✅ Saved AI analysis for {len(trending[:10])} trending coins")

                except Exception as e:
                    # AI aggregation is a bonus step; never fail the whole cycle.
                    logger.error(f"Error saving AI analysis: {e}")

            return {
                "success": True,
                "news_collected": len(unique_news),
                "news_saved": saved_count,
                "ai_analyzed": 'ai_sentiment' in analyzed_news[0] if analyzed_news else False,
                "timestamp": datetime.now().isoformat()
            }

        except Exception as e:
            logger.error(f"❌ Error collecting news: {e}")
            return {
                "success": False,
                "error": str(e),
                "timestamp": datetime.now().isoformat()
            }

    async def collect_and_store_sentiment(self):
        """Collect overall market sentiment and persist it if available.

        Returns:
            dict: on success {"success": True, "sentiment", "timestamp"}
            ("sentiment" is None when the collector produced nothing);
            on failure {"success": False, "error", "timestamp"}.
        """
        try:
            logger.info("😊 Collecting market sentiment from FREE sources...")

            sentiment_data = await self.sentiment_collector.collect_all_sentiment_data()

            # BUG FIX: the old code guarded the save but then logged and
            # returned sentiment_data['overall_sentiment'] unconditionally,
            # raising KeyError (reported as a failure) whenever the collector
            # returned no overall sentiment. Guard the whole path instead.
            overall = sentiment_data.get('overall_sentiment')
            if overall:
                self.db.save_sentiment(
                    overall,
                    source='free_aggregated'
                )
                self.last_collection['sentiment'] = datetime.now()
                logger.info(f"✅ Saved market sentiment: {overall['overall_sentiment']}")
            else:
                logger.warning("No overall sentiment available this cycle; nothing saved")

            return {
                "success": True,
                "sentiment": overall,
                "timestamp": datetime.now().isoformat()
            }

        except Exception as e:
            logger.error(f"❌ Error collecting sentiment: {e}")
            return {
                "success": False,
                "error": str(e),
                "timestamp": datetime.now().isoformat()
            }

    async def collect_all_data_once(self) -> Dict[str, Any]:
        """Run one full collection cycle (prices, news, sentiment) concurrently.

        Exceptions from individual collectors are captured per-source rather
        than aborting the whole cycle.
        """
        logger.info("🚀 Starting full data collection cycle...")

        results = await asyncio.gather(
            self.collect_and_store_prices(),
            self.collect_and_store_news(),
            self.collect_and_store_sentiment(),
            return_exceptions=True
        )

        return {
            "prices": results[0] if not isinstance(results[0], Exception) else {"error": str(results[0])},
            "news": results[1] if not isinstance(results[1], Exception) else {"error": str(results[1])},
            "sentiment": results[2] if not isinstance(results[2], Exception) else {"error": str(results[2])},
            "timestamp": datetime.now().isoformat()
        }

    async def _collection_loop(self, collect, loop_name: str, interval_key: str, error_sleep: int):
        """Run *collect* repeatedly while collection is active.

        Shared implementation for the three public loop methods: sleeps
        self.intervals[interval_key] between runs and error_sleep seconds
        after an unexpected failure so a broken source cannot hot-spin.
        """
        while self.is_running:
            try:
                await collect()
                await asyncio.sleep(self.intervals[interval_key])
            except asyncio.CancelledError:
                # Propagate so stop_background_collection() can cancel cleanly.
                raise
            except Exception as e:
                logger.error(f"Error in {loop_name} collection loop: {e}")
                await asyncio.sleep(error_sleep)

    async def price_collection_loop(self):
        """Continuous price-collection loop (runs until stopped)."""
        await self._collection_loop(self.collect_and_store_prices, 'price', 'prices', 60)

    async def news_collection_loop(self):
        """Continuous news-collection loop (runs until stopped)."""
        await self._collection_loop(self.collect_and_store_news, 'news', 'news', 300)

    async def sentiment_collection_loop(self):
        """Continuous sentiment-collection loop (runs until stopped)."""
        await self._collection_loop(self.collect_and_store_sentiment, 'sentiment', 'sentiment', 180)

    async def start_background_collection(self):
        """Start continuous background data collection.

        Idempotent: calling this while collection is already running is a
        no-op (previously it would spawn a duplicate set of tasks and leak
        the old handles).
        """
        if self.is_running:
            logger.warning("Background collection already running; ignoring start request")
            return

        logger.info("🚀 Starting background data collection...")

        self.is_running = True

        self.collection_tasks = [
            asyncio.create_task(self.price_collection_loop()),
            asyncio.create_task(self.news_collection_loop()),
            asyncio.create_task(self.sentiment_collection_loop()),
        ]

        logger.info("✅ Background collection started!")
        logger.info(f" Prices: every {self.intervals['prices']}s")
        logger.info(f" News: every {self.intervals['news']}s")
        logger.info(f" Sentiment: every {self.intervals['sentiment']}s")

    async def stop_background_collection(self):
        """Stop background collection and wait for all loop tasks to finish."""
        logger.info("🛑 Stopping background data collection...")

        self.is_running = False

        for task in self.collection_tasks:
            task.cancel()

        # Wait for cancellation to complete; swallow the CancelledErrors.
        await asyncio.gather(*self.collection_tasks, return_exceptions=True)
        # Drop stale task handles so a later start() begins from a clean slate.
        self.collection_tasks = []

        logger.info("✅ Background collection stopped!")

    def get_collection_status(self) -> Dict[str, Any]:
        """Return a snapshot of collection state, timings, and DB statistics."""
        return {
            "is_running": self.is_running,
            "last_collection": {
                k: v.isoformat() if v else None
                for k, v in self.last_collection.items()
            },
            "intervals": self.intervals,
            "database_stats": self.db.get_statistics(),
            "timestamp": datetime.now().isoformat()
        }
| |
|
| |
|
| | |
# Lazily-created process-wide orchestrator instance.
_orchestrator = None


def get_orchestrator() -> DataCollectionOrchestrator:
    """Return the shared DataCollectionOrchestrator, creating it on first use."""
    global _orchestrator
    if _orchestrator is not None:
        return _orchestrator
    _orchestrator = DataCollectionOrchestrator()
    return _orchestrator
| |
|
| |
|
async def main():
    """Smoke-test the orchestrator: one full collection cycle plus DB stats."""
    banner = "=" * 70
    print("\n" + banner)
    print("🧪 Testing Data Collection Orchestrator")
    print(banner)

    orchestrator = get_orchestrator()

    # Run a single end-to-end cycle across all three sources.
    print("\n1️⃣ Testing Single Collection Cycle...")
    results = await orchestrator.collect_all_data_once()

    print("\n📊 Results:")
    for label, section, key in (
        ("Prices", 'prices', 'prices_saved'),
        ("News", 'news', 'news_saved'),
    ):
        print(f" {label}: {results[section].get(key, 0)} saved")
    print(f" Sentiment: {results['sentiment'].get('success', False)}")

    # Report what actually landed in the database.
    print("\n2️⃣ Database Statistics:")
    stats = orchestrator.get_collection_status()
    db_stats = stats['database_stats']
    print(f" Database size: {db_stats.get('database_size', 0):,} bytes")
    print(f" Prices: {db_stats.get('prices_count', 0)}")
    print(f" News: {db_stats.get('news_count', 0)}")
    print(f" AI Analysis: {db_stats.get('ai_analysis_count', 0)}")

    print("\n✅ Orchestrator test complete!")
|
| |
|
# Script entry point: run one end-to-end test cycle of the orchestrator.
if __name__ == "__main__":
    asyncio.run(main())
| |
|