| | """ |
| | Extended Sentiment Collectors |
| | Fetches sentiment data from LunarCrush, Santiment, and other sentiment APIs |
| | """ |
| |
|
| | import asyncio |
| | from datetime import datetime, timezone |
| | from typing import Dict, List, Optional, Any |
| | from utils.api_client import get_client |
| | from utils.logger import setup_logger, log_api_request, log_error |
| |
|
| | logger = setup_logger("sentiment_extended_collector") |
| |
|
| |
|
| | async def get_lunarcrush_global() -> Dict[str, Any]: |
| | """ |
| | Fetch global market sentiment from LunarCrush |
| | |
| | Note: LunarCrush API v3 requires API key |
| | Free tier available with limited requests |
| | |
| | Returns: |
| | Dict with provider, category, data, timestamp, success, error |
| | """ |
| | provider = "LunarCrush" |
| | category = "sentiment" |
| | endpoint = "/public/metrics/global" |
| |
|
| | logger.info(f"Fetching global sentiment from {provider}") |
| |
|
| | try: |
| | client = get_client() |
| |
|
| | |
| | url = "https://lunarcrush.com/api3/public/metrics/global" |
| |
|
| | |
| | response = await client.get(url, timeout=10) |
| |
|
| | |
| | log_api_request( |
| | logger, |
| | provider, |
| | endpoint, |
| | response.get("response_time_ms", 0), |
| | "success" if response["success"] else "error", |
| | response.get("status_code") |
| | ) |
| |
|
| | if not response["success"]: |
| | |
| | logger.warning(f"{provider} - API requires authentication, returning placeholder") |
| | return { |
| | "provider": provider, |
| | "category": category, |
| | "data": { |
| | "status": "placeholder", |
| | "message": "LunarCrush API requires authentication", |
| | "planned_features": [ |
| | "Social media sentiment tracking", |
| | "Galaxy Score (social activity metric)", |
| | "AltRank (relative social dominance)", |
| | "Influencer tracking", |
| | "Social volume and engagement metrics" |
| | ] |
| | }, |
| | "timestamp": datetime.now(timezone.utc).isoformat(), |
| | "success": True, |
| | "error": None, |
| | "is_placeholder": True |
| | } |
| |
|
| | |
| | data = response["data"] |
| |
|
| | sentiment_data = None |
| | if isinstance(data, dict): |
| | sentiment_data = { |
| | "social_volume": data.get("social_volume"), |
| | "social_score": data.get("social_score"), |
| | "market_sentiment": data.get("sentiment"), |
| | "timestamp": data.get("timestamp") |
| | } |
| |
|
| | logger.info(f"{provider} - {endpoint} - Retrieved sentiment data") |
| |
|
| | return { |
| | "provider": provider, |
| | "category": category, |
| | "data": sentiment_data, |
| | "timestamp": datetime.now(timezone.utc).isoformat(), |
| | "success": True, |
| | "error": None, |
| | "response_time_ms": response.get("response_time_ms", 0) |
| | } |
| |
|
| | except Exception as e: |
| | error_msg = f"Unexpected error: {str(e)}" |
| | log_error(logger, provider, "exception", error_msg, endpoint, exc_info=True) |
| | return { |
| | "provider": provider, |
| | "category": category, |
| | "data": { |
| | "status": "placeholder", |
| | "message": f"LunarCrush integration error: {str(e)}" |
| | }, |
| | "timestamp": datetime.now(timezone.utc).isoformat(), |
| | "success": True, |
| | "error": None, |
| | "is_placeholder": True |
| | } |
| |
|
| |
|
| | async def get_santiment_metrics() -> Dict[str, Any]: |
| | """ |
| | Fetch sentiment metrics from Santiment |
| | |
| | Note: Santiment API requires authentication |
| | Provides on-chain, social, and development activity metrics |
| | |
| | Returns: |
| | Dict with provider, category, data, timestamp, success, error |
| | """ |
| | provider = "Santiment" |
| | category = "sentiment" |
| | endpoint = "/graphql" |
| |
|
| | logger.info(f"Fetching sentiment metrics from {provider} (placeholder)") |
| |
|
| | try: |
| | |
| | |
| |
|
| | placeholder_data = { |
| | "status": "placeholder", |
| | "message": "Santiment API requires authentication and GraphQL queries", |
| | "planned_metrics": [ |
| | "Social volume and trends", |
| | "Development activity", |
| | "Network growth", |
| | "Exchange flow", |
| | "MVRV ratio", |
| | "Daily active addresses", |
| | "Token age consumed", |
| | "Crowd sentiment" |
| | ], |
| | "note": "Requires Santiment API key and SAN tokens for full access" |
| | } |
| |
|
| | logger.info(f"{provider} - {endpoint} - Placeholder data returned") |
| |
|
| | return { |
| | "provider": provider, |
| | "category": category, |
| | "data": placeholder_data, |
| | "timestamp": datetime.now(timezone.utc).isoformat(), |
| | "success": True, |
| | "error": None, |
| | "is_placeholder": True |
| | } |
| |
|
| | except Exception as e: |
| | error_msg = f"Unexpected error: {str(e)}" |
| | log_error(logger, provider, "exception", error_msg, endpoint, exc_info=True) |
| | return { |
| | "provider": provider, |
| | "category": category, |
| | "data": None, |
| | "timestamp": datetime.now(timezone.utc).isoformat(), |
| | "success": False, |
| | "error": error_msg, |
| | "error_type": "exception" |
| | } |
| |
|
| |
|
| | async def get_cryptoquant_sentiment() -> Dict[str, Any]: |
| | """ |
| | Fetch on-chain sentiment from CryptoQuant |
| | |
| | Returns: |
| | Dict with provider, category, data, timestamp, success, error |
| | """ |
| | provider = "CryptoQuant" |
| | category = "sentiment" |
| | endpoint = "/sentiment" |
| |
|
| | logger.info(f"Fetching sentiment from {provider} (placeholder)") |
| |
|
| | try: |
| | |
| | |
| |
|
| | placeholder_data = { |
| | "status": "placeholder", |
| | "message": "CryptoQuant API requires authentication", |
| | "planned_metrics": [ |
| | "Exchange reserves", |
| | "Miner flows", |
| | "Whale transactions", |
| | "Stablecoin supply ratio", |
| | "Funding rates", |
| | "Open interest" |
| | ] |
| | } |
| |
|
| | return { |
| | "provider": provider, |
| | "category": category, |
| | "data": placeholder_data, |
| | "timestamp": datetime.now(timezone.utc).isoformat(), |
| | "success": True, |
| | "error": None, |
| | "is_placeholder": True |
| | } |
| |
|
| | except Exception as e: |
| | return { |
| | "provider": provider, |
| | "category": category, |
| | "data": None, |
| | "timestamp": datetime.now(timezone.utc).isoformat(), |
| | "success": False, |
| | "error": str(e), |
| | "error_type": "exception" |
| | } |
| |
|
| |
|
| | async def get_augmento_signals() -> Dict[str, Any]: |
| | """ |
| | Fetch market sentiment signals from Augmento.ai |
| | |
| | Returns: |
| | Dict with provider, category, data, timestamp, success, error |
| | """ |
| | provider = "Augmento" |
| | category = "sentiment" |
| | endpoint = "/signals" |
| |
|
| | logger.info(f"Fetching sentiment signals from {provider} (placeholder)") |
| |
|
| | try: |
| | |
| | |
| |
|
| | placeholder_data = { |
| | "status": "placeholder", |
| | "message": "Augmento API requires authentication", |
| | "planned_features": [ |
| | "AI-powered sentiment signals", |
| | "Topic extraction from social media", |
| | "Emerging trend detection", |
| | "Sentiment momentum indicators" |
| | ] |
| | } |
| |
|
| | return { |
| | "provider": provider, |
| | "category": category, |
| | "data": placeholder_data, |
| | "timestamp": datetime.now(timezone.utc).isoformat(), |
| | "success": True, |
| | "error": None, |
| | "is_placeholder": True |
| | } |
| |
|
| | except Exception as e: |
| | return { |
| | "provider": provider, |
| | "category": category, |
| | "data": None, |
| | "timestamp": datetime.now(timezone.utc).isoformat(), |
| | "success": False, |
| | "error": str(e), |
| | "error_type": "exception" |
| | } |
| |
|
| |
|
| | async def get_thetie_sentiment() -> Dict[str, Any]: |
| | """ |
| | Fetch sentiment data from TheTie.io |
| | |
| | Returns: |
| | Dict with provider, category, data, timestamp, success, error |
| | """ |
| | provider = "TheTie" |
| | category = "sentiment" |
| | endpoint = "/sentiment" |
| |
|
| | logger.info(f"Fetching sentiment from {provider} (placeholder)") |
| |
|
| | try: |
| | |
| | |
| |
|
| | placeholder_data = { |
| | "status": "placeholder", |
| | "message": "TheTie API requires authentication", |
| | "planned_metrics": [ |
| | "Twitter sentiment scores", |
| | "Social media momentum", |
| | "Influencer tracking", |
| | "Sentiment trends over time" |
| | ] |
| | } |
| |
|
| | return { |
| | "provider": provider, |
| | "category": category, |
| | "data": placeholder_data, |
| | "timestamp": datetime.now(timezone.utc).isoformat(), |
| | "success": True, |
| | "error": None, |
| | "is_placeholder": True |
| | } |
| |
|
| | except Exception as e: |
| | return { |
| | "provider": provider, |
| | "category": category, |
| | "data": None, |
| | "timestamp": datetime.now(timezone.utc).isoformat(), |
| | "success": False, |
| | "error": str(e), |
| | "error_type": "exception" |
| | } |
| |
|
| |
|
| | async def get_coinmarketcal_events() -> Dict[str, Any]: |
| | """ |
| | Fetch upcoming crypto events from CoinMarketCal (free API) |
| | |
| | Returns: |
| | Dict with provider, category, data, timestamp, success, error |
| | """ |
| | provider = "CoinMarketCal" |
| | category = "sentiment" |
| | endpoint = "/events" |
| |
|
| | logger.info(f"Fetching events from {provider}") |
| |
|
| | try: |
| | client = get_client() |
| |
|
| | |
| | url = "https://developers.coinmarketcal.com/v1/events" |
| |
|
| | params = { |
| | "page": 1, |
| | "max": 20, |
| | "showOnly": "hot_events" |
| | } |
| |
|
| | |
| | response = await client.get(url, params=params, timeout=10) |
| |
|
| | |
| | log_api_request( |
| | logger, |
| | provider, |
| | endpoint, |
| | response.get("response_time_ms", 0), |
| | "success" if response["success"] else "error", |
| | response.get("status_code") |
| | ) |
| |
|
| | if not response["success"]: |
| | |
| | logger.warning(f"{provider} - API may require authentication, returning placeholder") |
| | return { |
| | "provider": provider, |
| | "category": category, |
| | "data": { |
| | "status": "placeholder", |
| | "message": "CoinMarketCal API may require authentication", |
| | "planned_features": [ |
| | "Upcoming crypto events calendar", |
| | "Project updates and announcements", |
| | "Conferences and meetups", |
| | "Hard forks and mainnet launches" |
| | ] |
| | }, |
| | "timestamp": datetime.now(timezone.utc).isoformat(), |
| | "success": True, |
| | "error": None, |
| | "is_placeholder": True |
| | } |
| |
|
| | |
| | data = response["data"] |
| |
|
| | events_data = None |
| | if isinstance(data, dict) and "body" in data: |
| | events = data["body"] |
| |
|
| | events_data = { |
| | "total_events": len(events) if isinstance(events, list) else 0, |
| | "upcoming_events": [ |
| | { |
| | "title": event.get("title", {}).get("en"), |
| | "coins": [coin.get("symbol") for coin in event.get("coins", [])], |
| | "date": event.get("date_event"), |
| | "proof": event.get("proof"), |
| | "source": event.get("source") |
| | } |
| | for event in (events[:10] if isinstance(events, list) else []) |
| | ] |
| | } |
| |
|
| | logger.info(f"{provider} - {endpoint} - Retrieved {events_data.get('total_events', 0)} events") |
| |
|
| | return { |
| | "provider": provider, |
| | "category": category, |
| | "data": events_data, |
| | "timestamp": datetime.now(timezone.utc).isoformat(), |
| | "success": True, |
| | "error": None, |
| | "response_time_ms": response.get("response_time_ms", 0) |
| | } |
| |
|
| | except Exception as e: |
| | error_msg = f"Unexpected error: {str(e)}" |
| | log_error(logger, provider, "exception", error_msg, endpoint, exc_info=True) |
| | return { |
| | "provider": provider, |
| | "category": category, |
| | "data": { |
| | "status": "placeholder", |
| | "message": f"CoinMarketCal integration error: {str(e)}" |
| | }, |
| | "timestamp": datetime.now(timezone.utc).isoformat(), |
| | "success": True, |
| | "error": None, |
| | "is_placeholder": True |
| | } |
| |
|
| |
|
| | async def collect_extended_sentiment_data() -> List[Dict[str, Any]]: |
| | """ |
| | Main function to collect extended sentiment data from all sources |
| | |
| | Returns: |
| | List of results from all sentiment collectors |
| | """ |
| | logger.info("Starting extended sentiment data collection from all sources") |
| |
|
| | |
| | results = await asyncio.gather( |
| | get_lunarcrush_global(), |
| | get_santiment_metrics(), |
| | get_cryptoquant_sentiment(), |
| | get_augmento_signals(), |
| | get_thetie_sentiment(), |
| | get_coinmarketcal_events(), |
| | return_exceptions=True |
| | ) |
| |
|
| | |
| | processed_results = [] |
| | for result in results: |
| | if isinstance(result, Exception): |
| | logger.error(f"Collector failed with exception: {str(result)}") |
| | processed_results.append({ |
| | "provider": "Unknown", |
| | "category": "sentiment", |
| | "data": None, |
| | "timestamp": datetime.now(timezone.utc).isoformat(), |
| | "success": False, |
| | "error": str(result), |
| | "error_type": "exception" |
| | }) |
| | else: |
| | processed_results.append(result) |
| |
|
| | |
| | successful = sum(1 for r in processed_results if r.get("success", False)) |
| | placeholder_count = sum(1 for r in processed_results if r.get("is_placeholder", False)) |
| |
|
| | logger.info( |
| | f"Extended sentiment collection complete: {successful}/{len(processed_results)} successful " |
| | f"({placeholder_count} placeholders)" |
| | ) |
| |
|
| | return processed_results |
| |
|
| |
|
| | |
| | if __name__ == "__main__": |
| | async def main(): |
| | results = await collect_extended_sentiment_data() |
| |
|
| | print("\n=== Extended Sentiment Data Collection Results ===") |
| | for result in results: |
| | print(f"\nProvider: {result['provider']}") |
| | print(f"Success: {result['success']}") |
| | print(f"Is Placeholder: {result.get('is_placeholder', False)}") |
| |
|
| | if result['success']: |
| | data = result.get('data', {}) |
| | if isinstance(data, dict): |
| | if data.get('status') == 'placeholder': |
| | print(f"Status: {data.get('message', 'N/A')}") |
| | else: |
| | print(f"Data keys: {list(data.keys())}") |
| | else: |
| | print(f"Error: {result.get('error', 'Unknown')}") |
| |
|
| | asyncio.run(main()) |
| |
|