| import os
|
| import torch
|
| import torch.nn as nn
|
| from torch.utils.data import Dataset, DataLoader
|
| from torchvision import transforms
|
| from transformers import ViTModel, BertTokenizerFast, BertConfig, BertLMHeadModel, AdamW
|
| from PIL import Image, ImageFile
|
| import pandas as pd
|
| from tqdm import tqdm
|
|
|
|
|
# Pillow settings: keep decoding files that are cut short, and remove the
# pixel-count ceiling so very large photos are not rejected.
ImageFile.LOAD_TRUNCATED_IMAGES = True
Image.MAX_IMAGE_PIXELS = None

# Train on the GPU when PyTorch can see one, otherwise on the CPU.
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
print(f"Using device: {device}")

# Pretrained checkpoints for the image encoder and the Turkish text decoder.
VIT_MODEL_NAME = "google/vit-base-patch16-224"
BERT_MODEL_NAME = "dbmdz/bert-base-turkish-cased"

# File name used by train_vlm_model() to build the dataset and checkpoint paths.
model = "TeLVE_v1.0.pth"

# Tokenizer sequence length and optimisation hyperparameters.
MAX_LENGTH = 128
BATCH_SIZE = 8
EPOCHS = 5
LEARNING_RATE = 2e-5
|
|
|
class ImageCaptioningDataset(Dataset):
    """Pair each image on disk with its tokenized caption.

    Every row of ``dataframe`` supplies a ``photo_id`` (the image is looked up
    as ``<img_dir>/<photo_id>.jpg``) and an ``ai_description`` caption.
    Samples that cannot be built -- the caption is not a string, or the image
    is missing or unreadable -- are returned as ``None`` so that the collate
    function can drop them from the batch.
    """

    # Label value that PyTorch's cross-entropy loss skips by default; used to
    # keep padding positions out of the language-modelling loss.
    IGNORE_INDEX = -100

    def __init__(self, dataframe, img_dir, tokenizer):
        """Store the caption table, image directory and tokenizer.

        Args:
            dataframe: table with ``photo_id`` and ``ai_description`` columns.
            img_dir: directory that holds the ``<photo_id>.jpg`` files.
            tokenizer: callable tokenizer invoked on each caption string.
        """
        self.dataframe = dataframe
        self.img_dir = img_dir
        self.tokenizer = tokenizer
        # NOTE(review): these look like the usual ImageNet mean/std; the ViT
        # checkpoint's own image processor may use different statistics --
        # confirm before changing, since inference must match training.
        self.transform = transforms.Compose([
            transforms.Resize((224, 224)),
            transforms.ToTensor(),
            transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
        ])

    def __len__(self):
        """Return the number of rows in the caption table."""
        return len(self.dataframe)

    @staticmethod
    def _mask_padding(input_ids, attention_mask):
        """Return a copy of ``input_ids`` with padded positions set to -100."""
        return input_ids.masked_fill(
            attention_mask == 0, ImageCaptioningDataset.IGNORE_INDEX
        )

    def __getitem__(self, idx):
        """Build the training sample for row ``idx``.

        Returns:
            A dict with ``pixel_values``, ``input_ids``, ``attention_mask`` and
            ``labels`` tensors, or ``None`` when the row has no usable caption
            or its image cannot be opened.
        """
        row = self.dataframe.iloc[idx]

        # Check the caption first: it is far cheaper than decoding the image,
        # and missing captions (e.g. NaN cells) make the row unusable anyway.
        caption = row['ai_description']
        if not isinstance(caption, str):
            return None

        # The f-string tolerates non-string ids; such a path simply will not
        # exist and the sample is skipped instead of raising TypeError.
        img_path = os.path.join(self.img_dir, f"{row['photo_id']}.jpg")
        try:
            # The context manager closes the file handle as soon as the pixels
            # have been converted, instead of leaving it to the garbage
            # collector.
            with Image.open(img_path) as img:
                image = self.transform(img.convert('RGB'))
        except OSError:
            # Covers missing files as well as unreadable/corrupt images.
            return None

        encoding = self.tokenizer(
            caption,
            add_special_tokens=True,
            max_length=MAX_LENGTH,
            padding='max_length',
            truncation=True,
            return_attention_mask=True,
            return_tensors='pt',
        )

        # Drop only the leading batch axis added by return_tensors='pt'.
        input_ids = encoding['input_ids'].squeeze(0)
        attention_mask = encoding['attention_mask'].squeeze(0)

        return {
            'pixel_values': image,
            'input_ids': input_ids,
            'attention_mask': attention_mask,
            # Padding must not contribute to the loss, otherwise the model is
            # mostly rewarded for predicting [PAD].
            'labels': self._mask_padding(input_ids, attention_mask),
        }
|
|
|
|
|
class ImageCaptioningModel(nn.Module):
    """Vision encoder and text decoder joined by a linear projection.

    The encoder's last hidden state is projected to the decoder's hidden size
    and handed to the decoder as ``encoder_hidden_states``.
    """

    def __init__(self, vit_model, bert_model):
        """Keep both sub-models and create the projection between them.

        Args:
            vit_model: image encoder exposing ``config.hidden_size``.
            bert_model: text decoder exposing ``config.hidden_size``.
        """
        super().__init__()
        self.vit = vit_model
        self.bert = bert_model
        # Bridge from the encoder's feature width to the decoder's.
        encoder_width = vit_model.config.hidden_size
        decoder_width = bert_model.config.hidden_size
        self.linear = nn.Linear(encoder_width, decoder_width)

    def forward(self, pixel_values, input_ids, attention_mask, labels=None):
        """Encode the images and run the decoder conditioned on them.

        Returns:
            A ``(loss, logits)`` tuple taken from the decoder output; ``loss``
            is whatever the decoder reports for the given ``labels``.
        """
        encoder_output = self.vit(pixel_values)
        projected = self.linear(encoder_output.last_hidden_state)

        decoder_output = self.bert(
            input_ids=input_ids,
            attention_mask=attention_mask,
            encoder_hidden_states=projected,
            labels=labels,
            return_dict=True,
        )
        return decoder_output.loss, decoder_output.logits
|
|
|
def collate_fn(batch):
    """Stack the usable samples of a batch, key by key.

    Entries that are ``None`` are discarded. Returns ``None`` when nothing is
    left; otherwise a dict that maps each key of the first sample to the
    stacked tensors of all remaining samples.
    """
    samples = [sample for sample in batch if sample is not None]
    if not samples:
        return None

    collated = {}
    for key in samples[0]:
        collated[key] = torch.stack([sample[key] for sample in samples])
    return collated
|
|
|
def train_vlm_model():
    """Train the ViT + BERT captioning network and save it to disk.

    Steps: read the caption table, build the dataset and loader, load the
    pretrained encoder/decoder, train for ``EPOCHS`` epochs, then write the
    weights to ``./models/<model>`` and the tokenizer to ``./tokenizer``.

    Raises:
        ValueError: if the caption TSV cannot be decoded with any of the
            candidate encodings.
    """
    # `model` is the module-level file-name string. The network below lives in
    # `captioning_model`: binding it to `model` would make that name local to
    # the whole function and turn this line into an UnboundLocalError.
    # NOTE(review): the dataset file name is derived from the checkpoint name;
    # confirm that is the intended TSV.
    dataset_path = './datasets/' + model + '.tsv000'

    encodings = ['utf-8', 'iso-8859-9', 'windows-1254']
    for encoding in encodings:
        try:
            df = pd.read_csv(dataset_path, sep='\t', encoding=encoding)
        except UnicodeDecodeError:
            print(f"Failed to read with {encoding} encoding. Trying next...")
        else:
            print(f"Successfully read the file with {encoding} encoding.")
            break
    else:
        raise ValueError("Could not read the file with any of the specified encodings.")

    tokenizer = BertTokenizerFast.from_pretrained(BERT_MODEL_NAME)

    dataset = ImageCaptioningDataset(df, '../download/images', tokenizer)
    dataloader = DataLoader(dataset, batch_size=BATCH_SIZE, shuffle=True, collate_fn=collate_fn)

    vit_model = ViTModel.from_pretrained(VIT_MODEL_NAME)
    # Configure BERT as a decoder with cross-attention so it can attend to the
    # projected image features passed as encoder_hidden_states.
    bert_config = BertConfig.from_pretrained(BERT_MODEL_NAME)
    bert_config.is_decoder = True
    bert_config.add_cross_attention = True
    bert_model = BertLMHeadModel.from_pretrained(BERT_MODEL_NAME, config=bert_config)

    captioning_model = ImageCaptioningModel(vit_model, bert_model)
    captioning_model.to(device)

    # NOTE(review): transformers' AdamW is deprecated upstream in favour of
    # torch.optim.AdamW (whose defaults differ) -- consider migrating.
    optimizer = AdamW(captioning_model.parameters(), lr=LEARNING_RATE)

    captioning_model.train()
    for epoch in range(EPOCHS):
        total_loss = 0.0
        batches_trained = 0
        progress_bar = tqdm(dataloader, desc=f"Epoch {epoch+1}/{EPOCHS}")
        for batch in progress_bar:
            # collate_fn yields None when every sample of the batch was unusable.
            if batch is None:
                continue

            pixel_values = batch['pixel_values'].to(device)
            input_ids = batch['input_ids'].to(device)
            attention_mask = batch['attention_mask'].to(device)
            labels = batch['labels'].to(device)

            optimizer.zero_grad()
            loss, _ = captioning_model(pixel_values, input_ids, attention_mask, labels)
            loss.backward()
            optimizer.step()

            batch_loss = loss.item()
            total_loss += batch_loss
            batches_trained += 1
            progress_bar.set_postfix({'loss': batch_loss})

        # Average over the batches actually trained on; skipped batches must
        # not dilute the mean, and an epoch with none must not divide by zero.
        average_loss = total_loss / batches_trained if batches_trained else float('nan')
        print(f"Epoch {epoch+1}/{EPOCHS}, Average Loss: {average_loss}")

    # Create the output directory up front so a missing folder cannot discard
    # the finished training run.
    os.makedirs("./models", exist_ok=True)
    torch.save(captioning_model.state_dict(), "./models/" + model)
    tokenizer.save_pretrained("./tokenizer")
|
|
|
if __name__ == "__main__":
    # Start training only when this file is executed as a script.
    train_vlm_model()