import logging
import os
import tempfile
from pathlib import Path
from typing import Dict, Any, Optional

import whisper

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


class VoiceProcessor:
    """
    Service for processing voice queries with speech-to-text and translation.

    Features:
    - Speech-to-text using OpenAI Whisper
    - Automatic language detection
    - Arabic-to-English translation
    - Supports 99+ languages
    - Works offline

    Whisper Model Sizes:
    - tiny: 39M params, ~1GB RAM, fast but less accurate
    - base: 74M params, ~1GB RAM, balanced (RECOMMENDED for quick start)
    - small: 244M params, ~2GB RAM, good accuracy
    - medium: 769M params, ~5GB RAM, better accuracy
    - large: 1550M params, ~10GB RAM, best accuracy
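
    Example (minimal usage sketch; the model size and audio path are illustrative):

        processor = VoiceProcessor(model_size="base")
        result = processor.process_voice_query("recordings/query.wav")
        print(result["query"], result["language_name"])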
    """

    def __init__(self, model_size: str = "base"):
        """
        Initialize the voice processing service.

        Args:
            model_size: Whisper model to use. Options:
                - "tiny" (39M) - Fast, less accurate
                - "base" (74M) - Balanced, recommended for development
                - "small" (244M) - Good accuracy
                - "medium" (769M) - Better accuracy
                - "large" (1550M) - Best accuracy, slowest
        """
        logger.info(f"Loading Whisper model: {model_size}")
        logger.info("This may take a few minutes on first run (downloading model)...")

        self.model = whisper.load_model(model_size)
        self.model_size = model_size

        logger.info(f"✓ Whisper model '{model_size}' loaded successfully")
        logger.info("Supported languages: 99+ (auto-detected)")

    def transcribe_audio(
        self,
        audio_path: str,
        language: Optional[str] = None
    ) -> Dict[str, Any]:
        """
        Transcribe an audio file in its original language.

        Args:
            audio_path: Path to audio file (mp3, wav, m4a, etc.)
            language: Optional language code (e.g., "en", "ar"). If None, auto-detect.

        Returns:
            Dictionary with transcription results:
            {
                "text": "transcribed text",
                "language": "en",
                "language_name": "English",
                "confidence": 0.95
            }
        """
        logger.info(f"Transcribing audio: {audio_path}")

        # fp16=False forces float32 inference and avoids FP16 warnings on CPU-only machines.
        result = self.model.transcribe(
            audio_path,
            language=language,
            fp16=False
        )

        transcription = {
            "text": result["text"].strip(),
            "language": result["language"],
            "language_name": self._get_language_name(result["language"]),
            "confidence": self._calculate_confidence(result)
        }

        logger.info(f"✓ Transcribed: '{transcription['text'][:100]}...'")
        logger.info(f" Language: {transcription['language_name']} ({transcription['language']})")
        logger.info(f" Confidence: {transcription['confidence']:.2f}")

        return transcription

    def translate_to_english(self, audio_path: str) -> Dict[str, Any]:
        """
        Transcribe audio and translate it to English (if not already English).

        This is optimized for the use case where you always want English output,
        regardless of the input language.

        Args:
            audio_path: Path to audio file

        Returns:
            Dictionary with translation results:
            {
                "original_text": "النص الأصلي",
                "english_text": "translated text",
                "original_language": "ar",
                "original_language_name": "Arabic",
                "was_translated": true
            }
        """
        logger.info(f"Processing audio for English output: {audio_path}")

        # First pass: transcribe in the original language.
        original = self.model.transcribe(audio_path, fp16=False)

        # Second pass: Whisper's "translate" task produces English text directly.
        translated = self.model.transcribe(
            audio_path,
            task="translate",
            fp16=False
        )

        result = {
            "original_text": original["text"].strip(),
            "english_text": translated["text"].strip(),
            "original_language": original["language"],
            "original_language_name": self._get_language_name(original["language"]),
            "was_translated": original["language"] != "en"
        }

        if result["was_translated"]:
            logger.info(f"✓ Detected {result['original_language_name']}, translated to English")
            logger.info(f" Original: '{result['original_text'][:100]}...'")
            logger.info(f" English: '{result['english_text'][:100]}...'")
        else:
            logger.info("✓ Already in English, no translation needed")

        return result

    def process_voice_query(self, audio_path: str) -> Dict[str, Any]:
        """
        Complete pipeline: transcribe, translate if needed, return query text.

        This is the main method for the voice assistant use case.

        Args:
            audio_path: Path to audio file

        Returns:
            Dictionary ready for division extraction:
            {
                "query": "english text for processing",
                "original_text": "original text if different",
                "language": "ar",
                "language_name": "Arabic",
                "was_translated": true,
                "audio_duration": 5.2
            }
        """
        logger.info(f"Processing voice query: {audio_path}")

        # Estimate duration from the decoded waveform (Whisper resamples to 16 kHz).
        audio_info = whisper.load_audio(audio_path)
        duration = len(audio_info) / whisper.audio.SAMPLE_RATE

        result = self.translate_to_english(audio_path)

        return {
            "query": result["english_text"],
            "original_text": result["original_text"],
            "language": result["original_language"],
            "language_name": result["original_language_name"],
            "was_translated": result["was_translated"],
            "audio_duration": round(duration, 2)
        }

    def _get_language_name(self, lang_code: str) -> str:
        """Get the full language name from a language code."""
        language_names = {
            "en": "English",
            "ar": "Arabic",
            "es": "Spanish",
            "fr": "French",
            "de": "German",
            "zh": "Chinese",
            "ja": "Japanese",
            "ko": "Korean",
            "ru": "Russian",
            "pt": "Portuguese",
            "it": "Italian",
            "nl": "Dutch",
            "tr": "Turkish",
            "pl": "Polish",
            "uk": "Ukrainian",
            "vi": "Vietnamese",
            "th": "Thai",
            "hi": "Hindi",
            "ur": "Urdu",
        }
        # Fall back to the upper-cased code for languages not in the map.
        return language_names.get(lang_code, lang_code.upper())

    def _calculate_confidence(self, whisper_result: Dict) -> float:
        """
        Estimate a confidence score from the Whisper result.

        Whisper doesn't directly provide confidence, so we estimate it from the
        average log-probability of the decoded segments.
        """
        if "segments" in whisper_result and whisper_result["segments"]:
            avg_logprob = sum(s.get("avg_logprob", -1.0) for s in whisper_result["segments"])
            avg_logprob /= len(whisper_result["segments"])

            # Map the average log-probability (typically between -2.0 and 0.0)
            # onto a 0.0-1.0 scale; e.g. -0.3 maps to 0.85.
            confidence = max(0.0, min(1.0, (avg_logprob + 2.0) / 2.0))
            return round(confidence, 2)

        # No segment data available; return a reasonable default.
        return 0.85

    def save_uploaded_audio(self, audio_bytes: bytes, filename: str) -> str:
        """
        Save an uploaded audio file to a temporary location.

        Args:
            audio_bytes: Audio file bytes
            filename: Original filename

        Returns:
            Path to the saved file
        """
        temp_dir = Path(tempfile.gettempdir()) / "voice_assistant_uploads"
        temp_dir.mkdir(exist_ok=True)

        # Keep the original extension so downstream tools can identify the format.
        file_extension = Path(filename).suffix
        temp_file = temp_dir / f"upload_{os.urandom(8).hex()}{file_extension}"

        temp_file.write_bytes(audio_bytes)
        logger.info(f"Saved uploaded audio to: {temp_file}")

        return str(temp_file)

    def save_audio_array(self, audio_data, sample_rate: int) -> str:
        """
        Save an audio numpy array to a temporary WAV file (for Gradio integration).

        Args:
            audio_data: Audio data as a numpy array
            sample_rate: Sample rate of the audio

        Returns:
            Path to the saved WAV file
        """
        import numpy as np
        import scipy.io.wavfile as wavfile

        temp_dir = Path(tempfile.gettempdir()) / "voice_assistant_uploads"
        temp_dir.mkdir(exist_ok=True)

        temp_file = temp_dir / f"gradio_{os.urandom(8).hex()}.wav"

        if isinstance(audio_data, np.ndarray):
            # Convert float audio (expected in [-1.0, 1.0]) to 16-bit PCM.
            if audio_data.dtype == np.float32 or audio_data.dtype == np.float64:
                audio_data = (audio_data * 32767).astype(np.int16)

        wavfile.write(str(temp_file), sample_rate, audio_data)
        logger.info(f"Saved Gradio audio to: {temp_file}")

        return str(temp_file)

    def cleanup_temp_file(self, file_path: str):
        """Delete a temporary audio file."""
        try:
            if os.path.exists(file_path):
                os.remove(file_path)
                logger.info(f"Cleaned up temp file: {file_path}")
        except Exception as e:
            logger.warning(f"Failed to cleanup temp file {file_path}: {e}")
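

# Minimal demo sketch (not part of the service API): run this module directly to try
# the pipeline on a local recording. The audio path below is illustrative; point it
# at a real file. Whisper's audio loading assumes ffmpeg is available on the system.
if __name__ == "__main__":
    processor = VoiceProcessor(model_size="base")

    sample_audio = "sample_query.wav"  # illustrative path, not shipped with this module
    if os.path.exists(sample_audio):
        result = processor.process_voice_query(sample_audio)
        print(f"Query ({result['language_name']}): {result['query']}")
        print(f"Translated: {result['was_translated']}, duration: {result['audio_duration']}s")
    else:
        logger.warning("Demo audio file %s not found; skipping demo run.", sample_audio)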