# Hugging Face Space page header (scrape residue, kept as comments):
# Spaces: Paused
# Paused
import spaces  # Must be imported before torch on HF ZeroGPU Spaces.

import asyncio
import os
import random
import sys
import time

import gradio as gr
import torch
from PIL import Image

from diffusers.utils import export_to_video, load_image
from huggingface_hub.utils import (
    EntryNotFoundError,
    RepositoryNotFoundError,
    RevisionNotFoundError,
)
from skyreelsinfer import TaskType
from skyreelsinfer.offload import OffloadConfig
from skyreelsinfer.skyreels_video_infer import SkyReelsVideoInfer

# os.environ["CUDA_VISIBLE_DEVICES"] = ""  # Uncomment to force CPU-only execution.
os.environ["SAFETENSORS_FAST_GPU"] = "1"
# Use os.environ assignment, not os.putenv(): putenv bypasses the os.environ
# mapping, so the setting would be invisible to any Python code (including
# huggingface_hub) that reads os.environ after this point.
os.environ["HF_HUB_ENABLE_HF_TRANSFER"] = "1"

# Shared holder for the loaded predictor, threaded through Gradio events.
# NOTE(review): gr.State is normally created inside a gr.Blocks() context;
# creating it at module level works only in some Gradio versions — confirm.
predictor_state = gr.State(None)

# Target device string; actual model placement is handled by SkyReelsVideoInfer.
device = "cuda:0" if torch.cuda.is_available() else "cpu"
def init_predictor(task_type: str, model_id: str = "Skywork/skyreels-v1-Hunyuan-i2v"):
    """Build a SkyReelsVideoInfer for the given task.

    Args:
        task_type: "i2v" selects image-to-video; anything else selects T2V.
        model_id: Hugging Face model repo to load. Defaults to the i2v
            checkpoint (previously hard-coded; now a parameter so a T2V
            checkpoint can be supplied without editing this function).

    Returns:
        A configured SkyReelsVideoInfer instance, or None if loading fails.
    """
    try:
        predictor = SkyReelsVideoInfer(
            task_type=TaskType.I2V if task_type == "i2v" else TaskType.T2V,
            model_id=model_id,
            quant_model=True,
            is_offload=True,  # Offload weights to CPU; disable if GPU memory allows.
            offload_config=OffloadConfig(
                high_cpu_memory=True,
                parameters_level=True,
            ),
            use_multiprocessing=False,
        )
        return predictor
    except (RepositoryNotFoundError, RevisionNotFoundError, EntryNotFoundError) as e:
        # Model repo/revision/file missing on the Hub — report and signal failure.
        print(f"Error: Model not found. Details: {e}")
        return None
    except Exception as e:
        # Any other load failure (OOM, bad weights, ...): best-effort, return None.
        print(f"Error loading model: {e}")
        return None
| # Make generate_video async | |
| async def generate_video(prompt, image_file, predictor): | |
| if image_file is None: | |
| return gr.Error("Error: For i2v, provide an image.") | |
| if not isinstance(prompt, str) or not prompt.strip(): | |
| return gr.Error("Error: Please provide a prompt.") | |
| if predictor is None: | |
| return gr.Error("Error: Model not loaded.") | |
| random.seed(time.time()) | |
| seed = int(random.randrange(4294967294)) | |
| kwargs = { | |
| "prompt": prompt, | |
| "height": 256, | |
| "width": 256, | |
| "num_frames": 24, | |
| "num_inference_steps": 30, | |
| "seed": int(seed), | |
| "guidance_scale": 7.0, | |
| "embedded_guidance_scale": 1.0, | |
| "negative_prompt": "bad quality, blur", | |
| "cfg_for": False, | |
| } | |
| try: | |
| # Load the image and move it to the correct device *before* inference | |
| image = load_image(image=image_file.name) | |
| # No need to manually move to device. SkyReelsVideoInfer should handle it. | |
| kwargs["image"] = image | |
| except Exception as e: | |
| return gr.Error(f"Image loading error: {e}") | |
| try: | |
| output = predictor.inference(kwargs) | |
| frames = output | |
| except Exception as e: | |
| return gr.Error(f"Inference error: {e}"), None # Return None for predictor on error | |
| save_dir = "./result/i2v" # Consistent directory | |
| os.makedirs(save_dir, exist_ok=True) | |
| video_out_file = os.path.join(save_dir, f"{prompt[:100]}_{int(seed)}.mp4") | |
| print(f"Generating video: {video_out_file}") | |
| try: | |
| export_to_video(frames, video_out_file, fps=24) | |
| except Exception as e: | |
| return gr.Error(f"Video export error: {e}"), None # Return None for predictor | |
| return video_out_file, predictor # Return updated predictor | |
def display_image(file):
    """Return a PIL image for the uploaded file, or None when nothing is selected."""
    if file is None:
        return None
    return Image.open(file.name)
async def load_model():
    """Asynchronously initialize the i2v predictor (None if loading failed)."""
    return init_predictor("i2v")
async def main():
    """Build the Gradio UI, preload the model into shared state, and launch."""
    with gr.Blocks() as demo:
        # Input widgets.
        image_file = gr.File(label="Image Prompt (Required)", file_types=["image"])
        image_file_preview = gr.Image(label="Image Prompt Preview", interactive=False)
        prompt_textbox = gr.Text(label="Prompt")
        generate_button = gr.Button("Generate")
        output_video = gr.Video(label="Output Video")

        # Refresh the preview whenever a new image is uploaded.
        image_file.change(
            display_image,
            inputs=[image_file],
            outputs=[image_file_preview],
        )
        # Generation event: reads the shared predictor state and writes it back.
        generate_button.click(
            fn=generate_video,
            inputs=[prompt_textbox, image_file, predictor_state],
            outputs=[output_video, predictor_state],
        )

    # Load the model once up front and seed the shared state with it.
    predictor_state.value = await load_model()
    await demo.launch()


if __name__ == "__main__":
    asyncio.run(main())