Spaces:
Runtime error
Runtime error
| from smolagents import CodeAgent, HfApiModel, tool | |
| import datetime | |
| import requests | |
| import yaml | |
| import os | |
| import json | |
| import gradio as gr | |
| from tools.final_answer import FinalAnswerTool | |
| import logging | |
| import tempfile | |
| import soundfile as sf | |
| import numpy as np | |
| # Configurar logging | |
| logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') | |
| logger = logging.getLogger('music_recognition_app') | |
| # Verificar versión de smolagents | |
| import smolagents | |
| logger.info(f"smolagents version: {smolagents.__version__}") | |
| # Verificación de FastRTC con VAD | |
| try: | |
| from fastrtc import Stream, ReplyOnPause | |
| ReplyOnPause(lambda x: x) | |
| logger.info("FastRTC with VAD support is installed correctly") | |
| except ImportError: | |
| logger.error("FastRTC not installed") | |
| raise | |
| except RuntimeError as e: | |
| logger.error(f"FastRTC VAD support missing: {str(e)}") | |
| raise | |
| HISTORY_FILE = "song_history.json" | |
| LANGUAGES = {"English": "en", "Español": "es", "Français": "fr"} | |
| UI_MESSAGES = { | |
| "en": { | |
| "title": "Music Recognition & Fun Facts", | |
| "subtitle": "Identify songs in real-time and learn more", | |
| "rec_button": " START LISTENING", | |
| "please_record": "Click 'START LISTENING' to recognize songs in real-time", | |
| "recording": "Listening... Play music to recognize", | |
| "song_recognized": "Song recognized!", | |
| "about_artist": "About", | |
| "ask_more": "Ask me more about this artist or music", | |
| "chat_placeholder": "Ask about the song or artist...", | |
| "send_button": "Send" | |
| }, | |
| "es": { | |
| "title": "Reconocimiento de Música y Datos Curiosos", | |
| "subtitle": "Identifica canciones en tiempo real y aprende más", | |
| "rec_button": " EMPEZAR A ESCUCHAR", | |
| "please_record": "Haz clic en 'EMPEZAR A ESCUCHAR' para reconocer canciones", | |
| "recording": "Escuchando... Reproduce música para reconocer", | |
| "song_recognized": "¡Canción reconocida!", | |
| "about_artist": "Sobre", | |
| "ask_more": "Pregúntame más sobre este artista o música", | |
| "chat_placeholder": "Pregunta sobre la canción o artista...", | |
| "send_button": "Enviar" | |
| }, | |
| "fr": { | |
| "title": "Reconnaissance de Musique et Anecdotes", | |
| "subtitle": "Identifiez des chansons en temps réel et apprenez davantage", | |
| "rec_button": " COMMENCER À ÉCOUTER", | |
| "please_record": "Cliquez sur 'COMMENCER À ÉCOUTER' pour reconnaître", | |
| "recording": "Écoute... Jouez de la musique pour reconnaître", | |
| "song_recognized": "Chanson reconnue!", | |
| "about_artist": "À propos de", | |
| "ask_more": "Demandez-moi plus sur cet artiste ou la musique", | |
| "chat_placeholder": "Posez une question sur la chanson ou l'artiste...", | |
| "send_button": "Envoyer" | |
| } | |
| } | |
| def recognize_song(audio_path: str) -> dict: | |
| """Recognize a song from an audio file using the ACRCloud API. | |
| This function sends an audio file to the ACRCloud API for music recognition and returns a dictionary | |
| with song details if successful, or an error message if it fails. | |
| Parameters | |
| ---------- | |
| audio_path : str | |
| The file path to the audio file that will be sent to ACRCloud for recognition. | |
| Returns | |
| ------- | |
| dict | |
| A dictionary containing song details (Song, Artist, Album, Recognition Date) or an error message if recognition fails. | |
| """ | |
| # Imprimir la docstring para depuración | |
| logger.info(f"recognize_song docstring: {recognize_song.__doc__}") | |
| ACR_ACCESS_KEY = os.getenv("ACR_ACCESS_KEY") | |
| ACR_SECRET_KEY = os.getenv("ACR_SECRET_KEY") | |
| if not os.path.exists(audio_path): | |
| return {"error": "The audio file does not exist"} | |
| try: | |
| url = "http://identify-eu-west-1.acrcloud.com/v1/identify" | |
| data = {"access_key": ACR_ACCESS_KEY, "data_type": "audio", "sample_rate": 44100, "audio_format": "mp3"} | |
| with open(audio_path, 'rb') as file: | |
| files = {"sample": file} | |
| response = requests.post(url, data=data, files=files) | |
| if response.status_code != 200: | |
| return {"error": f"API Error: {response.status_code}"} | |
| result = response.json() | |
| if result['status']['code'] != 0: | |
| return {"error": result['status']['msg']} | |
| if 'metadata' not in result or 'music' not in result['metadata']: | |
| return {"error": "Could not recognize the song"} | |
| song_info = result['metadata']['music'][0] | |
| song_data = { | |
| "Song": song_info.get('title', 'Unknown'), | |
| "Artist": song_info.get('artists', [{}])[0].get('name', 'Unknown'), | |
| "Album": song_info.get('album', {}).get('name', 'Unknown'), | |
| "Recognition Date": datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S") | |
| } | |
| save_to_history(song_data) | |
| return song_data | |
| except Exception as e: | |
| return {"error": f"Error processing audio: {str(e)}"} | |
| def save_to_history(song_data): | |
| """Saves a song to the history file. | |
| Args: | |
| song_data (dict): The song information to save. | |
| """ | |
| try: | |
| if os.path.exists(HISTORY_FILE): | |
| with open(HISTORY_FILE, 'r') as f: | |
| history = json.load(f) | |
| else: | |
| history = [] | |
| history.insert(0, song_data) | |
| if len(history) > 50: | |
| history = history[:50] | |
| with open(HISTORY_FILE, 'w') as f: | |
| json.dump(history, f, indent=2) | |
| except Exception as e: | |
| logger.error(f"Error saving to history: {str(e)}") | |
| def get_artist_info(artist_name: str, song_name: str = "", language: str = "en") -> str: | |
| """Gets detailed background information about a music artist and their song. | |
| Args: | |
| artist_name (str): Name of the artist to get information about. | |
| song_name (str, optional): Name of the song, if available. | |
| language (str): Language code (en, es, fr). | |
| Returns: | |
| str: A string with artist information or an error message. | |
| """ | |
| prompts = { | |
| "en": f"Provide details about '{artist_name}'. Include biography, fun facts, and about '{song_name}' if available.", | |
| "es": f"Proporciona detalles sobre '{artist_name}'. Incluye biografía, datos curiosos y sobre '{song_name}' si está disponible.", | |
| "fr": f"Fournissez des détails sur '{artist_name}'. Incluez biographie, anecdotes et sur '{song_name}' si disponible." | |
| } | |
| language = language if language in prompts else "en" | |
| messages = [{"role": "user", "content": prompts[language]}] | |
| try: | |
| response = model(messages) | |
| return response.content | |
| except Exception as e: | |
| return f"Could not retrieve info: {str(e)}" | |
| model = HfApiModel(max_tokens=2096, temperature=0.5, model_id='Qwen/Qwen2.5-Coder-32B-Instruct') | |
| with open("prompts.yaml", 'r') as stream: | |
| prompt_templates = yaml.safe_load(stream) | |
| agent = CodeAgent(model=model, tools=[recognize_song, get_artist_info], max_steps=8, verbosity_level=1, prompt_templates=prompt_templates) | |
| # Buffer para audio en tiempo real | |
| audio_buffer = [] | |
| buffer_duration = 10 # 10 segundos por fragmento | |
| sample_rate = 44100 | |
| def process_audio_stream(audio_chunk): | |
| """Processes audio chunks in real-time for song recognition. | |
| Args: | |
| audio_chunk: The audio data chunk received from FastRTC. | |
| Returns: | |
| str: The recognized song title and artist info, or a status message. | |
| """ | |
| global audio_buffer | |
| if isinstance(audio_chunk, bytes): | |
| audio_data = np.frombuffer(audio_chunk, dtype=np.int16) | |
| else: | |
| audio_data = audio_chunk | |
| audio_buffer.extend(audio_data) | |
| if len(audio_buffer) >= sample_rate * buffer_duration: | |
| with tempfile.NamedTemporaryFile(suffix=".mp3", delete=False) as tmp_file: | |
| sf.write(tmp_file.name, np.array(audio_buffer), sample_rate, format="mp3") | |
| result = recognize_song(tmp_file.name) | |
| os.unlink(tmp_file.name) | |
| audio_buffer.clear() | |
| if "error" not in result: | |
| song_name = result['Song'] | |
| artist_name = result['Artist'] | |
| artist_info = get_artist_info(artist_name, song_name, "en") # Usa lang_code si prefieres idioma dinámico | |
| return f"🎵 **{song_name}** by {artist_name}\n\n{artist_info}" | |
| return "No match yet, keep playing..." | |
| return "Processing..." | |
| stream = Stream(ReplyOnPause(process_audio_stream), modality="audio", mode="send-receive") | |
| with gr.Blocks() as demo: | |
| lang_code = gr.State("en") | |
| audio_status = gr.State("no_audio") | |
| song_info_state = gr.State(None) | |
| artist_info_state = gr.State("") | |
| def get_ui_message(key, lang="en"): | |
| return UI_MESSAGES.get(lang, UI_MESSAGES["en"]).get(key, "") | |
| title_component = gr.Markdown(f"# 🎵 {get_ui_message('title', 'en')}") | |
| subtitle_component = gr.Markdown(get_ui_message('subtitle', 'en')) | |
| with gr.Row(): | |
| language_dropdown = gr.Dropdown(choices=list(LANGUAGES.keys()), value="English", label=get_ui_message('choose_language', 'en')) | |
| with gr.Tab("Song Recognition"): | |
| audio_status_msg = gr.Markdown(f"*{get_ui_message('please_record', 'en')}*") | |
| with gr.Row(): | |
| record_btn = gr.Button(get_ui_message('rec_button', 'en'), variant="primary") | |
| stream_output = gr.Markdown(label="Real-time Recognition") | |
| song_title_display = gr.Markdown("") | |
| artist_facts = gr.Markdown("") | |
| def toggle_audio_widget(lang_code): | |
| return "loading", get_ui_message('recording', lang_code), "" | |
| def update_ui_language(language_name): | |
| lang_code = LANGUAGES.get(language_name, "en") | |
| return ( | |
| f"# 🎵 {get_ui_message('title', lang_code)}", | |
| get_ui_message('subtitle', lang_code), | |
| gr.update(label=get_ui_message('choose_language', lang_code)), | |
| gr.update(value=get_ui_message('rec_button', lang_code)), | |
| f"*{get_ui_message('please_record', lang_code)}*", | |
| lang_code | |
| ) | |
| record_btn.click( | |
| fn=toggle_audio_widget, | |
| inputs=[lang_code], | |
| outputs=[audio_status, audio_status_msg, artist_facts] | |
| ).then( | |
| fn=stream.run, | |
| inputs=[], | |
| outputs=[stream_output] | |
| ).then( | |
| fn=lambda output: (output.split('\n\n')[0], '\n\n'.join(output.split('\n\n')[1:]) if '\n\n' in output else ""), | |
| inputs=[stream_output], | |
| outputs=[song_title_display, artist_facts] | |
| ) | |
| language_dropdown.change( | |
| fn=update_ui_language, | |
| inputs=[language_dropdown], | |
| outputs=[title_component, subtitle_component, language_dropdown, record_btn, audio_status_msg, lang_code] | |
| ) | |
| demo.launch(show_error=True, share=True, debug=True, server_name="0.0.0.0") |