Spaces:
Runtime error
Runtime error
| import os | |
| import requests | |
| # Set the host and port of the service | |
| talker_service_host = os.environ.get("TALKER_SERVICE_HOST", "localhost") | |
| talker_service_port = os.environ.get("TALKER_SERVICE_PORT", 8003) | |
| # API endpoint URLs | |
| CHANGE_MODEL_URL = f"http://{talker_service_host}:{talker_service_port}/talker_change_model/" | |
| TALKER_RESPONSE_URL = f"http://{talker_service_host}:{talker_service_port}/talker_response/" | |
| def change_talker_model(model_name): | |
| """Request to change the Talker model.""" | |
| params = {"model_name": model_name} | |
| try: | |
| response = requests.post(CHANGE_MODEL_URL, params=params) | |
| response.raise_for_status() # Raise an exception for HTTP errors | |
| print(f"Model change successful: {response.json()}") | |
| except requests.RequestException as e: | |
| print(f"Model change failed: {e}") | |
| def request_talker_response(source_image_path, driven_audio_path, payload, output_video_path): | |
| """Request Talker to generate video.""" | |
| # Prepare payload as form data (ensure all values are strings) | |
| data = { | |
| 'preprocess_type': payload.get('preprocess_type', 'crop'), | |
| 'is_still_mode': str(payload.get('is_still_mode', False)), | |
| 'enhancer': str(payload.get('enhancer', False)), | |
| 'batch_size': str(payload.get('batch_size', 4)), | |
| 'size_of_image': str(payload.get('size_of_image', 256)), | |
| 'pose_style': str(payload.get('pose_style', 0)), | |
| 'facerender': payload.get('facerender', 'facevid2vid'), | |
| 'exp_weight': str(payload.get('exp_weight', 1.0)), | |
| 'blink_every': str(payload.get('blink_every', True)), | |
| 'talker_method': payload.get('talker_method', 'SadTalker'), | |
| 'fps': str(payload.get('fps', 30)), | |
| } | |
| # Prepare files for upload | |
| with open(source_image_path, 'rb') as image_file, open(driven_audio_path, 'rb') as audio_file: | |
| files = { | |
| 'source_image': (os.path.basename(source_image_path), image_file, 'image/jpeg'), # Adjust MIME type if necessary | |
| 'driven_audio': (os.path.basename(driven_audio_path), audio_file, 'audio/wav'), # Adjust MIME type if necessary | |
| } | |
| try: | |
| # Sending POST request to the server with form data and files | |
| response = requests.post(TALKER_RESPONSE_URL, data=data, files=files) | |
| response.raise_for_status() # Raise an exception for HTTP errors | |
| with open(output_video_path, 'wb') as video_file: | |
| video_file.write(response.content) | |
| print(f"Talker response successful, video saved as: {output_video_path}") | |
| except requests.RequestException as e: | |
| print(f"Talker response failed: {e}") | |
| if __name__ == "__main__": | |
| # Models to test | |
| models = [ | |
| "SadTalker", | |
| "Wav2Lip", | |
| "Wav2Lipv2", | |
| "NeRFTalk", | |
| ] | |
| result_dir = "outputs" | |
| os.makedirs(result_dir, exist_ok=True) | |
| # Loop to change model and generate Talker response | |
| for model_name in models: | |
| print(f"Switching to model: {model_name}") | |
| change_talker_model(model_name) | |
| # Request Talker to generate video | |
| payload = { | |
| "preprocess_type": "crop", | |
| "is_still_mode": False, | |
| "enhancer": False, | |
| "batch_size": 4, | |
| "size_of_image": 256, | |
| "pose_style": 0, | |
| "facerender": "facevid2vid", | |
| "exp_weight": 1.0, | |
| "blink_every": True, | |
| "talker_method": model_name, | |
| "fps": 30, | |
| } | |
| request_talker_response( | |
| "inputs/example.png", | |
| "answer.wav", # 确保音频文件路径正确 | |
| payload, os.path.join(result_dir, f"output_video_{model_name}.mp4") | |
| ) | |
| print("\n" + "-" * 50 + "\n") | |