Spaces:
Running
Running
| import gradio as gr | |
| import os | |
| from PIL import Image | |
| import tempfile | |
| from gradio_client import Client, handle_file | |
| import torch | |
| from transformers import VitsModel, AutoTokenizer, pipeline | |
| import scipy.io.wavfile as wavfile | |
| import traceback | |
| import random | |
| import time | |
| import numpy as np | |
| from pydub import AudioSegment | |
| # ========================= | |
| # Параметры | |
| # ========================= | |
| TALKING_HEAD_SPACE = "Skywork/skyreels-a1-talking-head" | |
| device = "cuda" if torch.cuda.is_available() else "cpu" | |
| print(f"Using device: {device}") | |
| # ========================= | |
| # Загрузка моделей | |
| # ========================= | |
| try: | |
| # TTS модель (казахский) | |
| tts_model = VitsModel.from_pretrained("facebook/mms-tts-kaz").to(device) | |
| tts_tokenizer = AutoTokenizer.from_pretrained("facebook/mms-tts-kaz") | |
| # Настройка конфигурации для более приятного и выразительного голоса | |
| tts_model.config.noise_scale = 0.5 # Меньше шума для чище голоса | |
| tts_model.config.noise_scale_duration = 0.8 # Вариация в длительности | |
| tts_model.config.speaking_rate = 0.9 # Чуть медленнее для выразительности | |
| # Перевод ru -> kk | |
| translator = pipeline( | |
| "translation", | |
| model="facebook/nllb-200-distilled-600M", | |
| device=0 if device == "cuda" else -1 | |
| ) | |
| # Модель для генерации вопросов | |
| qa_model = pipeline( | |
| "text2text-generation", | |
| model="google/flan-t5-small", | |
| device=0 if device == "cuda" else -1 | |
| ) | |
| print("✅ Все модели успешно загружены!") | |
| except Exception as e: | |
| raise RuntimeError(f"❌ Ошибка загрузки моделей: {str(e)}") | |
| # ========================= | |
| # Вспомогательные функции | |
| # ========================= | |
| def generate_quiz(text: str): | |
| """ Генерирует один вопрос и два варианта ответа на основе текста. | |
| Алгоритмы: | |
| 1. Базовый: случайное предложение и первые слова. | |
| 2. Пропуск ключевого слова. | |
| 3. Вопрос о числе/дате. | |
| """ | |
| try: | |
| sentences = [s.strip() for s in text.replace("!", ".").replace("?", ".").split(".") if s.strip()] | |
| if len(sentences) < 1: | |
| raise ValueError("Текст слишком короткий") | |
| algo = random.choice([1, 2, 3]) | |
| # ------------------------ | |
| if algo == 1: # Базовый алгоритм | |
| question_sentence = random.choice(sentences) | |
| words = question_sentence.split() | |
| if len(words) <= 3: | |
| correct_answer = question_sentence | |
| question = "Что сказано в этом предложении?" | |
| else: | |
| question = "Что сказано в тексте?" | |
| correct_answer = " ".join(words[:6]) + ("..." if len(words) > 6 else "") | |
| wrong_sentence = random.choice([s for s in sentences if s != question_sentence] or ["Другая информация"]) | |
| wrong_words = wrong_sentence.split() | |
| wrong_answer = " ".join(wrong_words[:6]) + ("..." if len(wrong_words) > 6 else "") | |
| # ------------------------ | |
| elif algo == 2: # Пропуск ключевого слова | |
| question_sentence = random.choice(sentences) | |
| words = question_sentence.split() | |
| if len(words) > 2: | |
| key_word = random.choice(words) | |
| question = question_sentence.replace(key_word, "_____") | |
| correct_answer = key_word | |
| wrong_answer = random.choice([w for w in words if w != key_word] or ["другое"]) | |
| else: | |
| # fallback | |
| return generate_quiz(text) | |
| # ------------------------ | |
| elif algo == 3: # Вопрос о числе или дате | |
| import re | |
| question_sentence = random.choice(sentences) | |
| numbers = re.findall(r'\d+', question_sentence) | |
| if numbers: | |
| number = random.choice(numbers) | |
| question = question_sentence.replace(number, "_____") | |
| correct_answer = number | |
| wrong_answer = str(int(number)+random.randint(1,5)) | |
| else: | |
| # fallback к базовому | |
| return generate_quiz(text) | |
| options = [correct_answer, wrong_answer] | |
| random.shuffle(options) | |
| return question, options, correct_answer | |
| except Exception as e: | |
| raise ValueError(f"Ошибка генерации вопроса: {str(e)}") | |
| def synthesize_audio(text_ru: str): | |
| """Переводит русскую строку на казахский, синтезирует аудио и возвращает путь к файлу .wav""" | |
| translation = translator(text_ru, src_lang="rus_Cyrl", tgt_lang="kaz_Cyrl") | |
| text_kk = translation[0]["translation_text"] | |
| inputs = tts_tokenizer(text_kk, return_tensors="pt").to(device) | |
| with torch.no_grad(): | |
| output = tts_model(**inputs) | |
| waveform = output.waveform.squeeze().cpu().numpy() | |
| waveform /= np.max(np.abs(waveform)) + 1e-8 # Нормализация для лучшего качества | |
| audio = (waveform * 32767).astype('int16') | |
| sampling_rate = getattr(tts_model.config, 'sampling_rate', 22050) | |
| tmpf = tempfile.NamedTemporaryFile(suffix='.wav', delete=False) | |
| wavfile.write(tmpf.name, sampling_rate, audio) | |
| tmpf.close() | |
| return tmpf.name | |
| def concatenate_audio_files(audio_files): | |
| """Объединяет несколько аудио файлов в один с паузами между ними""" | |
| combined = AudioSegment.empty() | |
| pause = AudioSegment.silent(duration=1000) # 1 секунда паузы | |
| for i, audio_file in enumerate(audio_files): | |
| audio = AudioSegment.from_wav(audio_file) | |
| combined += audio | |
| if i < len(audio_files) - 1: # Не добавляем паузу после последнего файла | |
| combined += pause | |
| output_file = tempfile.NamedTemporaryFile(suffix='.wav', delete=False) | |
| combined.export(output_file.name, format='wav') | |
| output_file.close() | |
| return output_file.name | |
| def make_talking_head(image_path: str, audio_path: str, max_retries=3): | |
| """Вызывает SkyReels/Talking Head space и возвращает путь или URL видео.""" | |
| for attempt in range(max_retries): | |
| try: | |
| client = Client(TALKING_HEAD_SPACE) | |
| result = client.predict( | |
| image_path=handle_file(image_path), | |
| audio_path=handle_file(audio_path), | |
| guidance_scale=3.0, | |
| steps=10, | |
| api_name="/process_image_audio" | |
| ) | |
| print(f"Result type: {type(result)}") | |
| print(f"Result content: {result}") | |
| if isinstance(result, tuple): | |
| video_path = result[0] | |
| if isinstance(video_path, dict) and "video" in video_path: | |
| return video_path["video"] | |
| elif isinstance(video_path, str): | |
| return video_path | |
| else: | |
| for item in result: | |
| if isinstance(item, str) and (item.endswith('.mp4') or item.endswith('.webm') or os.path.exists(str(item))): | |
| return item | |
| raise ValueError(f"Не удалось найти видео в результате: {result}") | |
| elif isinstance(result, dict) and "video" in result: | |
| return result["video"] | |
| elif isinstance(result, str): | |
| return result | |
| else: | |
| raise ValueError(f"Unexpected talking head result: {type(result)}, value: {result}") | |
| except Exception as e: | |
| if attempt < max_retries - 1: | |
| print(f"Попытка {attempt + 1} не удалась: {e}. Повторяю через 2 секунды...") | |
| time.sleep(2) | |
| else: | |
| raise Exception(f"Ошибка после {max_retries} попыток: {str(e)}") | |
| # ========================= | |
| # Основные обработчики для Gradio | |
| # ========================= | |
| def start_lesson(image: Image.Image, text: str, state): | |
| """Создает одно видео: текст лекции + вопрос с вариантами ответа""" | |
| if image is None or not text.strip() or len(text) > 500: | |
| return None, "Пожалуйста, загрузите фото и введите текст лекции (до 500 символов)", gr.update(visible=False), gr.update(visible=False), state | |
| try: | |
| # Сохраняем изображение | |
| tmpimg = tempfile.NamedTemporaryFile(suffix='.png', delete=False) | |
| if image.mode != 'RGB': | |
| image = image.convert('RGB') | |
| image.save(tmpimg.name) | |
| tmpimg.close() | |
| image_path = tmpimg.name | |
| # Генерируем вопрос | |
| question, options, correct = generate_quiz(text) | |
| # Создаем три аудио файла | |
| audio_files = [] | |
| # 1. Текст лекции | |
| audio1 = synthesize_audio(text) | |
| audio_files.append(audio1) | |
| # 2. Вопрос | |
| question_text = f"А теперь вопрос: {question}" | |
| audio2 = synthesize_audio(question_text) | |
| audio_files.append(audio2) | |
| # 3. Варианты ответа | |
| options_text = f"Первый вариант: {options[0]}. Второй вариант: {options[1]}" | |
| audio3 = synthesize_audio(options_text) | |
| audio_files.append(audio3) | |
| # Объединяем все аудио в одно | |
| combined_audio = concatenate_audio_files(audio_files) | |
| # Создаем одно видео с полным содержанием | |
| video_path = make_talking_head(image_path, combined_audio) | |
| # Сохраняем состояние | |
| state_data = { | |
| 'image_path': image_path, | |
| 'correct': correct, | |
| 'options': options, | |
| 'question': question | |
| } | |
| # Удаляем временные аудио файлы | |
| for audio_file in audio_files: | |
| try: | |
| os.remove(audio_file) | |
| except: | |
| pass | |
| try: | |
| os.remove(combined_audio) | |
| except: | |
| pass | |
| question_display = f"**Вопрос:** {question}" | |
| return ( | |
| video_path, | |
| question_display, | |
| gr.update(value=options[0], visible=True), | |
| gr.update(value=options[1], visible=True), | |
| state_data | |
| ) | |
| except Exception as e: | |
| traceback.print_exc() | |
| return None, f"❌ Ошибка: {e}", gr.update(visible=False), gr.update(visible=False), state | |
| def answer_selected(selected_option: str, state): | |
| """Генерирует реакцию лектора и показывает в том же окне""" | |
| if not state: | |
| return None, "❌ Ошибка: отсутствует состояние урока" | |
| try: | |
| correct = state.get('correct') | |
| image_path = state.get('image_path') | |
| if selected_option == correct: | |
| reaction_ru = "Правильно! Отлично справились!" | |
| display_message = "✅ **Дұрыс! Жарайсың!**" | |
| else: | |
| reaction_ru = f"К сожалению неправильно. Правильный ответ был: {correct}" | |
| display_message = f"❌ **Қате!** Дұрыс жауап: **{correct}**" | |
| # Создаем аудио с реакцией | |
| audio_path = synthesize_audio(reaction_ru) | |
| # Создаем видео с реакцией | |
| reaction_video = make_talking_head(image_path, audio_path) | |
| try: | |
| os.remove(audio_path) | |
| except: | |
| pass | |
| return reaction_video, display_message | |
| except Exception as e: | |
| traceback.print_exc() | |
| return None, f"❌ Ошибка: {e}" | |
| # ========================= | |
| # Gradio UI | |
| # ========================= | |
| title = "🎓 Интерактивті Бейне Мұғалім TiлГен" | |
| description = ( | |
| "**Қалай жұмыс істейді:**\n" | |
| "1. Мұғалімнің суретін жүктеп, дәріс мәтінін енгізіңіз (орыс, 500 таңбаға дейін)\n" | |
| "2. 'Сабақты бастау' түймесін басыңыз-мұғалім мәтінді оқып, сұрақ қояды\n" | |
| "3. Дұрыс жауапты таңдаңыз-мұғалім сіздің жауабыңызға жауап береді" | |
| ) | |
| with gr.Blocks(theme=gr.themes.Soft()) as demo: | |
| gr.Markdown(f"# {title}\n{description}") | |
| with gr.Row(): | |
| with gr.Column(scale=1): | |
| inp_image = gr.Image(type='pil', label='📸 Мұғалімнің суреті') | |
| inp_text = gr.Textbox( | |
| lines=5, | |
| label='📝 Дәріс мәтіні (орыс.)', | |
| placeholder='Дәріс мәтінін енгізіңіз...', | |
| info="Ең көбі 500 таңба" | |
| ) | |
| btn_start = gr.Button("🚀 Сабақты бастау", variant="primary", size="lg") | |
| with gr.Column(scale=1): | |
| out_video = gr.Video(label='🎬 Мұғалімнің видеосы') | |
| out_question = gr.Markdown("") | |
| with gr.Row(): | |
| btn_opt1 = gr.Button("Вариант 1", visible=False, size="lg", variant="secondary") | |
| btn_opt2 = gr.Button("Вариант 2", visible=False, size="lg", variant="secondary") | |
| out_result = gr.Markdown("") | |
| lesson_state = gr.State({}) | |
| # Запуск урока | |
| btn_start.click( | |
| fn=start_lesson, | |
| inputs=[inp_image, inp_text, lesson_state], | |
| outputs=[out_video, out_question, btn_opt1, btn_opt2, lesson_state] | |
| ) | |
| # Обработка ответов | |
| def handle_answer_1(state): | |
| option = state.get('options', [''])[0] if state else '' | |
| return answer_selected(option, state) | |
| def handle_answer_2(state): | |
| option = state.get('options', [''])[1] if state and len(state.get('options', [])) > 1 else '' | |
| return answer_selected(option, state) | |
| btn_opt1.click( | |
| fn=handle_answer_1, | |
| inputs=[lesson_state], | |
| outputs=[out_video, out_result] | |
| ) | |
| btn_opt2.click( | |
| fn=handle_answer_2, | |
| inputs=[lesson_state], | |
| outputs=[out_video, out_result] | |
| ) | |
| if __name__ == '__main__': | |
| demo.launch(server_name="0.0.0.0", server_port=7860) |