| import torch |
|
|
| from typing import List, Union, Literal, Optional |
| from transformers import PreTrainedModel |
| from PIL import Image |
|
|
| from .configuration_moondream import PhiConfig |
| from .configuration_moondream import MoondreamConfig |
| from .vision_encoder import VisionEncoder |
| from .region_model import RegionModel |
| from .modeling_phi import PhiForCausalLM |
|
|
| class Moondream(PreTrainedModel): |
| config_class = MoondreamConfig |
| _supports_flash_attn_2 = True |
|
|
| def __init__(self, config): |
| super().__init__(config) |
| self.vision_encoder = VisionEncoder( |
| use_flash_attn=config._attn_implementation == "flash_attention_2" |
| ) |
| self.region_model = RegionModel() |
|
|
| if type(config.text_config) == dict: |
| phi_config = PhiConfig( |
| **config.text_config, attn_implementation=config._attn_implementation |
| ) |
| else: |
| phi_config = config.text_config |
| self.text_model = PhiForCausalLM(phi_config) |
|
|
| @property |
| def device(self): |
| return self.text_model.device |
|
|
| def encode_image(self, image): |
| with torch.no_grad(): |
| return self.vision_encoder(image) |
|
|
| def input_embeds(self, prompt, image_embeds, tokenizer): |
| def _tokenize(txt): |
| return tokenizer( |
| txt, return_tensors="pt", add_special_tokens=False |
| ).input_ids.to(self.device) |
|
|
| text_emb = self.text_model.get_input_embeddings() |
|
|
| |
| embeds = [] |
| embeds.append( |
| text_emb((torch.tensor([[tokenizer.bos_token_id]], device=self.device))) |
| ) |
|
|
| if "<image>" not in prompt: |
| embeds.append(text_emb(_tokenize(prompt))) |
| else: |
| assert prompt.count("<image>") == 1 |
| before, after = prompt.split("<image>") |
| if len(before) > 0: |
| embeds.append(text_emb(_tokenize(before))) |
| embeds.append(image_embeds.to(self.device)) |
| if len(after) > 0: |
| embeds.append(text_emb(_tokenize(after))) |
|
|
| return torch.cat(embeds, dim=1) |
|
|
| def get_input_embeddings(self): |
| return self.text_model.get_input_embeddings() |
|
|
| def generate( |
| self, |
| image_embeds, |
| prompt, |
| tokenizer, |
| max_new_tokens=128, |
| **kwargs, |
| ): |
| generate_config = { |
| "eos_token_id": tokenizer.eos_token_id, |
| "bos_token_id": tokenizer.bos_token_id, |
| "pad_token_id": tokenizer.bos_token_id, |
| "max_new_tokens": max_new_tokens, |
| **kwargs, |
| } |
|
|
| with torch.no_grad(): |
| inputs_embeds = self.input_embeds(prompt, image_embeds, tokenizer) |
| attention_mask = torch.ones((inputs_embeds.shape[0], inputs_embeds.shape[1]), device=self.device) |
| output_ids = self.text_model.generate( |
| inputs_embeds=inputs_embeds, |
| attention_mask=attention_mask, |
| **generate_config, |
| ) |
|
|
| return tokenizer.batch_decode(output_ids, skip_special_tokens=True) |
|
|
| |
| def caption( |
| self, |
| images: List[Image.Image], |
| tokenizer, |
| length: Optional[Literal["short"]] = None, |
| **kwargs, |
| ): |
| image_embeds = self.encode_image(images) |
|
|
| templated_prompts = [ |
| f"<image>\n\n{'Short caption' if length == 'short' else 'Caption'}:" for _ in images |
| ] |
| inputs_embeds = torch.stack([ |
| self.input_embeds(prompt, image_embed.unsqueeze(0), tokenizer)[0] |
| for prompt, image_embed in zip(templated_prompts, image_embeds) |
| ]) |
| attention_mask = torch.ones((inputs_embeds.shape[0], inputs_embeds.shape[1]), device=self.device) |
|
|
| generate_config = { |
| "eos_token_id": tokenizer.eos_token_id, |
| "bos_token_id": tokenizer.bos_token_id, |
| "pad_token_id": tokenizer.bos_token_id, |
| "repetition_penalty": 1.2, |
| "max_new_tokens": 512, |
| **kwargs, |
| } |
|
|
| with torch.no_grad(): |
| output_ids = self.text_model.generate( |
| inputs_embeds=inputs_embeds, |
| attention_mask=attention_mask, |
| **generate_config, |
| ) |
|
|
| return [ |
| x.strip() |
| for x in tokenizer.batch_decode(output_ids, skip_special_tokens=True) |
| ] |
|
|
| def answer_question( |
| self, |
| image_embeds, |
| question, |
| tokenizer, |
| chat_history="", |
| result_queue=None, |
| max_new_tokens=256, |
| **kwargs, |
| ): |
| prompt = f"<image>\n\n{chat_history}Question: {question}\n\nAnswer:" |
| answer = self.generate( |
| image_embeds, |
| prompt, |
| tokenizer=tokenizer, |
| max_new_tokens=max_new_tokens, |
| **kwargs, |
| )[0] |
| cleaned_answer = answer.strip() |
|
|
| |
| if result_queue: |
| result_queue.put(cleaned_answer) |
| else: |
| return cleaned_answer |
|
|
| def batch_answer( |
| self, |
| images, |
| prompts, |
| tokenizer, |
| **kwargs, |
| ): |
| image_embeds = self.encode_image(images) |
|
|
| templated_prompts = [ |
| f"<image>\n\nQuestion: {prompt}\n\nAnswer:" for prompt in prompts |
| ] |
| prompt_embs = [ |
| self.input_embeds(prompt, image_embed.unsqueeze(0), tokenizer)[0] |
| for prompt, image_embed in zip(templated_prompts, image_embeds) |
| ] |
|
|
| bos_emb = prompt_embs[0][0] |
| max_len = max([p.shape[0] for p in prompt_embs]) |
|
|
| inputs_embeds = torch.cat( |
| [ |
| torch.cat([bos_emb.repeat(max_len - p.shape[0], 1), p]).unsqueeze(0) |
| for p in prompt_embs |
| ], |
| dim=0, |
| ) |
| attention_mask = torch.cat( |
| [ |
| torch.cat( |
| [ |
| torch.zeros( |
| 1, |
| max_len - p.shape[0], |
| device=self.device, |
| dtype=torch.long, |
| ), |
| torch.ones(1, p.shape[0], device=self.device, dtype=torch.long), |
| ], |
| dim=1, |
| ) |
| for p in prompt_embs |
| ], |
| dim=0, |
| ) |
|
|
| generate_config = { |
| "eos_token_id": tokenizer.eos_token_id, |
| "bos_token_id": tokenizer.bos_token_id, |
| "pad_token_id": tokenizer.bos_token_id, |
| "max_new_tokens": 512, |
| **kwargs, |
| } |
|
|
| with torch.no_grad(): |
| output_ids = self.text_model.generate( |
| inputs_embeds=inputs_embeds, |
| attention_mask=attention_mask, |
| **generate_config, |
| ) |
|
|
| return [ |
| x.strip() |
| for x in tokenizer.batch_decode(output_ids, skip_special_tokens=True) |
| ] |
|
|
| def detect(self, image: Image.Image, query: str, tokenizer): |
| pass |
|
|