| | """ |
| | Whale Tracking Collectors |
| | Fetches whale transaction data from WhaleAlert, Arkham Intelligence, and other sources |
| | """ |
| |
|
| | import asyncio |
| | from datetime import datetime, timezone |
| | from typing import Dict, List, Optional, Any |
| | from utils.api_client import get_client |
| | from utils.logger import setup_logger, log_api_request, log_error |
| |
|
# Module-level logger used by every collector function and class in this file.
logger = setup_logger("whale_tracking_collector")
| |
|
| |
|
async def get_whalealert_transactions(api_key: Optional[str] = None) -> Dict[str, Any]:
    """
    Fetch recent large crypto transactions from WhaleAlert.

    Queries the WhaleAlert ``/v1/transactions`` endpoint for the last hour
    (``limit=100``) and condenses the response into a summary: transaction
    count, total USD value, distinct symbols, the largest transaction and the
    first ten transactions.

    Args:
        api_key: WhaleAlert API key. When it is missing no request is made and
            a ``missing_api_key`` failure result is returned.

    Returns:
        Dict with provider, category, data, timestamp, success, error.
        Failure results additionally carry ``error_type``; successful results
        carry ``response_time_ms``. ``data`` is None when the response body
        has no ``transactions`` key.
    """
    provider = "WhaleAlert"
    category = "whale_tracking"
    endpoint = "/transactions"

    def failure(error_msg: str, error_type: Optional[str]) -> Dict[str, Any]:
        # Common envelope for every unsuccessful outcome of this collector.
        return {
            "provider": provider,
            "category": category,
            "data": None,
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "success": False,
            "error": error_msg,
            "error_type": error_type
        }

    def usd_value(tx: Dict[str, Any]) -> Any:
        # A missing or null amount counts as 0 so that one incomplete
        # transaction cannot fail the whole summary.
        return tx.get("amount_usd") or 0

    logger.info(f"Fetching whale transactions from {provider}")

    try:
        if not api_key:
            error_msg = f"API key required for {provider}"
            log_error(logger, provider, "missing_api_key", error_msg, endpoint)
            return failure(error_msg, "missing_api_key")

        client = get_client()

        url = "https://api.whale-alert.io/v1/transactions"

        # Look back one hour from now (Unix seconds).
        now = int(datetime.now(timezone.utc).timestamp())
        start_time = now - 3600

        params = {
            "api_key": api_key,
            "start": start_time,
            "limit": 100
        }

        response = await client.get(url, params=params, timeout=15)

        log_api_request(
            logger,
            provider,
            endpoint,
            response.get("response_time_ms", 0),
            "success" if response["success"] else "error",
            response.get("status_code")
        )

        if not response["success"]:
            error_msg = response.get("error_message", "Unknown error")
            log_error(logger, provider, response.get("error_type", "unknown"), error_msg, endpoint)
            return failure(error_msg, response.get("error_type"))

        data = response["data"]

        whale_data = None
        if isinstance(data, dict) and "transactions" in data:
            # A null "transactions" value is treated as an empty list.
            transactions = data["transactions"] or []

            total_value_usd = sum(usd_value(tx) for tx in transactions)
            # dict.fromkeys de-duplicates while keeping first-seen order, so
            # the output is deterministic (a set would not be).
            symbols = list(dict.fromkeys(tx.get("symbol", "unknown") for tx in transactions))

            whale_data = {
                "transaction_count": len(transactions),
                "total_value_usd": round(total_value_usd, 2),
                "unique_symbols": symbols,
                "time_range_hours": 1,
                "largest_tx": max(transactions, key=usd_value) if transactions else None,
                "transactions": transactions[:10]
            }

        if whale_data is not None:
            logger.info(
                f"{provider} - {endpoint} - Retrieved {whale_data['transaction_count']} transactions, "
                f"Total value: ${whale_data['total_value_usd']:,.0f}"
            )
        else:
            logger.info(f"{provider} - {endpoint} - No data")

        return {
            "provider": provider,
            "category": category,
            "data": whale_data,
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "success": True,
            "error": None,
            "response_time_ms": response.get("response_time_ms", 0)
        }

    except Exception as e:
        # Boundary handler: collectors report failures in-band instead of raising.
        error_msg = f"Unexpected error: {str(e)}"
        log_error(logger, provider, "exception", error_msg, endpoint, exc_info=True)
        return failure(error_msg, "exception")
| |
|
| |
|
async def get_arkham_intel() -> Dict[str, Any]:
    """
    Return a placeholder result for Arkham Intelligence.

    No network request is made: the function only reports that the Arkham
    integration is not implemented yet and lists the features planned for it.

    Returns:
        Dict with provider, category, data, timestamp, success, error and
        ``is_placeholder`` set to True. If anything unexpectedly raises, a
        failure dict with ``error_type`` "exception" is returned instead.
    """
    provider, category, endpoint = "Arkham", "whale_tracking", "/intelligence"

    logger.info(f"Fetching intelligence data from {provider} (placeholder)")

    try:
        planned = [
            "Wallet address labeling",
            "Entity tracking and attribution",
            "Transaction flow analysis",
            "Dark web marketplace monitoring",
            "Exchange flow tracking",
        ]
        stub = {
            "status": "placeholder",
            "message": "Arkham Intelligence API not yet implemented",
            "planned_features": planned,
            "note": "Requires Arkham API access or partnership",
        }

        logger.info(f"{provider} - {endpoint} - Placeholder data returned")

        outcome = {"provider": provider, "category": category, "data": stub}
        outcome["timestamp"] = datetime.now(timezone.utc).isoformat()
        outcome["success"] = True
        outcome["error"] = None
        outcome["is_placeholder"] = True
        return outcome

    except Exception as exc:
        error_msg = f"Unexpected error: {exc}"
        log_error(logger, provider, "exception", error_msg, endpoint, exc_info=True)
        outcome = {"provider": provider, "category": category, "data": None}
        outcome["timestamp"] = datetime.now(timezone.utc).isoformat()
        outcome["success"] = False
        outcome["error"] = error_msg
        outcome["error_type"] = "exception"
        return outcome
| |
|
| |
|
async def get_clankapp_whales() -> Dict[str, Any]:
    """
    Fetch whale tracking data from ClankApp.

    Issues a GET to the ClankApp whales endpoint. A failed request or an
    unexpected exception is not reported as a failure: both yield a
    placeholder result with ``success`` True and ``is_placeholder`` True.

    Returns:
        Dict with provider, category, data, timestamp, success, error, plus
        ``response_time_ms`` for a real response or ``is_placeholder`` for a
        placeholder.
    """
    provider = "ClankApp"
    category = "whale_tracking"
    endpoint = "/whales"

    def as_placeholder(payload: Dict[str, Any]) -> Dict[str, Any]:
        # Envelope shared by the "API unavailable" and "exception" outcomes.
        return {
            "provider": provider,
            "category": category,
            "data": payload,
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "success": True,
            "error": None,
            "is_placeholder": True,
        }

    logger.info(f"Fetching whale data from {provider}")

    try:
        http = get_client()
        reply = await http.get("https://clankapp.com/api/v1/whales", timeout=10)

        status_label = "success" if reply["success"] else "error"
        log_api_request(
            logger,
            provider,
            endpoint,
            reply.get("response_time_ms", 0),
            status_label,
            reply.get("status_code"),
        )

        if reply["success"]:
            payload = reply["data"]
            logger.info(f"{provider} - {endpoint} - Data retrieved successfully")
            return {
                "provider": provider,
                "category": category,
                "data": payload,
                "timestamp": datetime.now(timezone.utc).isoformat(),
                "success": True,
                "error": None,
                "response_time_ms": reply.get("response_time_ms", 0),
            }

        # Request failed: degrade to a placeholder instead of an error result.
        logger.warning(f"{provider} - API not available, returning placeholder")
        return as_placeholder({
            "status": "placeholder",
            "message": "ClankApp API not accessible or requires authentication",
            "planned_features": [
                "Whale wallet tracking",
                "Large transaction alerts",
                "Portfolio tracking",
            ],
        })

    except Exception as exc:
        error_msg = f"Unexpected error: {exc}"
        log_error(logger, provider, "exception", error_msg, endpoint, exc_info=True)
        return as_placeholder({
            "status": "placeholder",
            "message": f"ClankApp integration error: {exc}",
        })
| |
|
| |
|
async def get_bitquery_whale_transactions() -> Dict[str, Any]:
    """
    Fetch large transactions using the BitQuery GraphQL API.

    Posts a GraphQL query for the ten largest Ethereum transfers with an
    amount above 100000 and summarises them (count, summed amount, top five).

    Outcomes that are not a usable response are reported as placeholders
    (``success`` True, ``is_placeholder`` True) rather than failures:
    a failed HTTP request, a GraphQL reply that carries ``errors`` and no
    transfers, or an unexpected exception.

    Returns:
        Dict with provider, category, data, timestamp, success, error, plus
        ``response_time_ms`` for a real response or ``is_placeholder`` for a
        placeholder. When the reply is valid but contains no transfers,
        ``data`` is a ``{"status": "no_data", ...}`` dict.
    """
    provider = "BitQuery"
    category = "whale_tracking"
    endpoint = "/graphql"

    def placeholder(payload: Dict[str, Any]) -> Dict[str, Any]:
        # Envelope shared by every degraded (non-data) outcome.
        return {
            "provider": provider,
            "category": category,
            "data": payload,
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "success": True,
            "error": None,
            "is_placeholder": True
        }

    logger.info(f"Fetching whale transactions from {provider}")

    try:
        client = get_client()

        url = "https://graphql.bitquery.io"

        query = """
        {
          ethereum(network: ethereum) {
            transfers(
              amount: {gt: 100000}
              options: {limit: 10, desc: "amount"}
            ) {
              transaction {
                hash
              }
              amount
              currency {
                symbol
                name
              }
              sender {
                address
              }
              receiver {
                address
              }
              block {
                timestamp {
                  iso8601
                }
              }
            }
          }
        }
        """

        payload = {"query": query}
        headers = {"Content-Type": "application/json"}

        response = await client.post(url, json=payload, headers=headers, timeout=15)

        log_api_request(
            logger,
            provider,
            endpoint,
            response.get("response_time_ms", 0),
            "success" if response["success"] else "error",
            response.get("status_code")
        )

        if not response["success"]:
            logger.warning(f"{provider} - API request failed, returning placeholder")
            return placeholder({
                "status": "placeholder",
                "message": "BitQuery API requires authentication",
                "planned_features": [
                    "Large transaction tracking via GraphQL",
                    "Multi-chain whale monitoring",
                    "Token transfer analytics"
                ]
            })

        data = response["data"]
        body = data if isinstance(data, dict) else {}

        # GraphQL servers may answer with "data": null (or a null field)
        # alongside an "errors" array, so every level is null-checked instead
        # of relying on .get() defaults, which do not apply to explicit nulls.
        transfers = ((body.get("data") or {}).get("ethereum") or {}).get("transfers") or []

        graphql_errors = body.get("errors")
        if graphql_errors and not transfers:
            first = graphql_errors[0] if isinstance(graphql_errors, list) else graphql_errors
            detail = first.get("message", first) if isinstance(first, dict) else first
            logger.warning(f"{provider} - {endpoint} - GraphQL error: {detail}")
            return placeholder({
                "status": "placeholder",
                "message": f"BitQuery GraphQL error: {detail}"
            })

        whale_data = None
        if transfers:
            # Null amounts count as 0 so one incomplete row cannot break the sum.
            total_value = sum(t.get("amount") or 0 for t in transfers)

            whale_data = {
                "transaction_count": len(transfers),
                "total_value": round(total_value, 2),
                "largest_transfers": transfers[:5]
            }

        if whale_data is not None:
            logger.info(
                f"{provider} - {endpoint} - Retrieved {whale_data['transaction_count']} large transactions"
            )
        else:
            logger.info(f"{provider} - {endpoint} - No data")

        return {
            "provider": provider,
            "category": category,
            "data": whale_data or {"status": "no_data", "message": "No large transactions found"},
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "success": True,
            "error": None,
            "response_time_ms": response.get("response_time_ms", 0)
        }

    except Exception as e:
        # Boundary handler: log the failure and degrade to a placeholder.
        error_msg = f"Unexpected error: {str(e)}"
        log_error(logger, provider, "exception", error_msg, endpoint, exc_info=True)
        return placeholder({
            "status": "placeholder",
            "message": f"BitQuery integration error: {str(e)}"
        })
| |
|
| |
|
async def collect_whale_tracking_data(whalealert_key: Optional[str] = None) -> List[Dict[str, Any]]:
    """
    Main function to collect whale tracking data from all sources.

    Runs the WhaleAlert, Arkham, ClankApp and BitQuery collectors with
    ``asyncio.gather`` and returns their results in that order. A collector
    that raises (or is cancelled) is replaced by a failure dict naming that
    collector, so the returned list always contains one dict per collector.

    Args:
        whalealert_key: WhaleAlert API key, forwarded to the WhaleAlert collector.

    Returns:
        List of result dicts from all whale tracking collectors.
    """
    logger.info("Starting whale tracking data collection from all sources")

    # Provider names are kept next to the coroutines so a failed slot can be
    # attributed to its collector; gather() preserves argument order.
    collectors = [
        ("WhaleAlert", get_whalealert_transactions(whalealert_key)),
        ("Arkham", get_arkham_intel()),
        ("ClankApp", get_clankapp_whales()),
        ("BitQuery", get_bitquery_whale_transactions()),
    ]

    results = await asyncio.gather(
        *(coro for _, coro in collectors),
        return_exceptions=True
    )

    processed_results = []
    for (provider, _), result in zip(collectors, results):
        # BaseException, not Exception: with return_exceptions=True a
        # cancelled child is returned as CancelledError, which does not
        # derive from Exception and would otherwise be treated as a dict.
        if isinstance(result, BaseException):
            logger.error(f"Collector failed with exception: {str(result)}")
            processed_results.append({
                "provider": provider,
                "category": "whale_tracking",
                "data": None,
                "timestamp": datetime.now(timezone.utc).isoformat(),
                "success": False,
                "error": str(result),
                "error_type": "exception"
            })
        else:
            processed_results.append(result)

    successful = sum(1 for r in processed_results if r.get("success", False))
    placeholder_count = sum(1 for r in processed_results if r.get("is_placeholder", False))

    logger.info(
        f"Whale tracking collection complete: {successful}/{len(processed_results)} successful "
        f"({placeholder_count} placeholders)"
    )

    return processed_results
| |
|
| |
|
class WhaleTrackingCollector:
    """
    Whale Tracking Collector class for WebSocket streaming interface.

    Wraps the standalone whale tracking collection functions and folds their
    per-provider results into a single aggregate dict.
    """

    def __init__(self, config: Any = None):
        """
        Initialize the whale tracking collector.

        Args:
            config: Configuration object (optional, for compatibility); it is
                stored but not read by this class.
        """
        self.config = config
        self.logger = logger

    async def collect(self) -> Dict[str, Any]:
        """
        Collect whale tracking data from all sources.

        Reads the WhaleAlert key from the ``WHALEALERT_API_KEY`` environment
        variable, runs every collector and aggregates the results.

        Returns:
            Dict with large_transactions, whale_wallets, total_volume,
            alert_threshold, alerts and timestamp.
        """
        import os
        whalealert_key = os.getenv("WHALEALERT_API_KEY")
        results = await collect_whale_tracking_data(whalealert_key)
        return self._aggregate(results)

    @staticmethod
    def _aggregate(results: List[Dict[str, Any]]) -> Dict[str, Any]:
        """
        Fold per-provider result dicts into one aggregate.

        Unsuccessful, empty and placeholder results are skipped. WhaleAlert
        transactions are flattened into ``large_transactions``; every
        provider's reported total is added to ``total_volume``.
        """

        def numeric(value: Any) -> Any:
            # Raw provider payloads are not schema-checked; anything that is
            # not a number contributes 0 instead of breaking the sum.
            return value if isinstance(value, (int, float)) else 0

        def owner_of(party: Any) -> Any:
            # "from"/"to" may be absent, null or lack an owner label.
            return party.get("owner", "unknown") if isinstance(party, dict) else "unknown"

        aggregated = {
            "large_transactions": [],
            "whale_wallets": [],
            "total_volume": 0,
            "alert_threshold": 1000000,
            "alerts": [],
            "timestamp": datetime.now(timezone.utc).isoformat()
        }

        for result in results:
            if not (result.get("success") and result.get("data")):
                continue

            provider = result.get("provider", "unknown")
            data = result["data"]

            if not isinstance(data, dict):
                continue

            # Placeholder payloads carry no real measurements.
            if data.get("status") == "placeholder":
                continue

            if provider == "WhaleAlert":
                for tx in data.get("transactions") or []:
                    aggregated["large_transactions"].append({
                        "amount": tx.get("amount", 0),
                        "amount_usd": tx.get("amount_usd", 0),
                        "symbol": tx.get("symbol", "unknown"),
                        "from": owner_of(tx.get("from")),
                        "to": owner_of(tx.get("to")),
                        "timestamp": tx.get("timestamp"),
                        "source": provider
                    })
                aggregated["total_volume"] += numeric(data.get("total_value_usd", 0))
            else:
                # NOTE(review): "total_value" is added as-is next to USD
                # totals; confirm the units match for each provider.
                total_value = data.get("total_value_usd", data.get("total_value", 0))
                aggregated["total_volume"] += numeric(total_value)

        return aggregated
| |
|
| |
|
| | |
if __name__ == "__main__":
    async def main():
        """Run every collector once and print a short per-provider report."""
        import os

        results = await collect_whale_tracking_data(os.getenv("WHALEALERT_API_KEY"))

        print("\n=== Whale Tracking Data Collection Results ===")
        for entry in results:
            print(f"\nProvider: {entry['provider']}")
            print(f"Success: {entry['success']}")
            print(f"Is Placeholder: {entry.get('is_placeholder', False)}")

            # Failed collectors only have an error to show.
            if not entry['success']:
                print(f"Error: {entry.get('error', 'Unknown')}")
                continue

            payload = entry.get('data', {})
            if not isinstance(payload, dict):
                continue

            if payload.get('status') == 'placeholder':
                print(f"Status: {payload.get('message', 'N/A')}")
                continue

            print(f"Transaction Count: {payload.get('transaction_count', 'N/A')}")
            print(f"Total Value: ${payload.get('total_value_usd', payload.get('total_value', 0)):,.0f}")

    asyncio.run(main())
| |
|