Commit fac26b01 authored by lucien.noel

Transformer refactoring: training of a simplified version implemented

parent 58a49768
.idea/
.venv/
__pycache__/
__pycache__/
*_old.py
Inspired by the [following](https://github.com/pytorch/examples/tree/main/word_language_model "PyTorch example of using a Transformer") project
data.py 0 → 100644
import os
from io import open

import torch


class Dictionary(object):
    def __init__(self) -> None:
        self.word2idx = {}
        self.idx2word = []

    def add_word(self, word: str) -> int:
        if word not in self.word2idx:
            self.idx2word.append(word)
            self.word2idx[word] = len(self.idx2word) - 1
        return self.word2idx[word]

    def __len__(self) -> int:
        return len(self.idx2word)


class Corpus(object):
    def __init__(self, path: str) -> None:
        self.dictionary = Dictionary()
        self.train = self.tokenize(os.path.join(path, './training_data/classical_music/test/input.txt'))
        # self.valid = self.tokenize(os.path.join(path, './training_data/classical_music/test/input.txt'))
        # self.test = self.tokenize(os.path.join(path, './training_data/classical_music/test/input.txt'))
        self.valid = self.train
        self.test = self.train

    def tokenize(self, path: str) -> torch.Tensor:
        """Tokenizes a text file."""
        assert os.path.exists(path), f"{path} doesn't exist"
        # Add words to the dictionary
        with open(path, 'r', encoding="utf8") as f:
            for line in f:
                words = line.split() + ['<eos>']
                for word in words:
                    self.dictionary.add_word(word)
        # Tokenize file content
        with open(path, 'r', encoding="utf8") as f:
            idss = []
            for line in f:
                words = line.split() + ['<eos>']
                ids = []
                for word in words:
                    ids.append(self.dictionary.word2idx[word])
                idss.append(torch.tensor(ids).type(torch.int64))
            ids = torch.cat(idss)
        return ids


def batchify(data: torch.Tensor, bsz: int, device: str = 'cpu') -> torch.Tensor:
    # Work out how cleanly we can divide the dataset into bsz parts.
    nbatch = data.size(0) // bsz
    # Trim off any extra elements that wouldn't cleanly fit (remainders).
    data = data.narrow(0, 0, nbatch * bsz)
    # Evenly divide the data across the bsz batches.
    data = data.view(bsz, -1).t().contiguous()
    return data.to(device)
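As an editorial aside (not part of the commit), here is a minimal sketch of what `batchify` does to a flat token stream, using a toy tensor of 10 tokens and `bsz=2`: the stream is trimmed to a multiple of the batch size, then reshaped so that each column is one independent batch stream.

```python
import torch

tokens = torch.arange(10)          # tensor([0, 1, ..., 9])
bsz = 2
nbatch = tokens.size(0) // bsz     # 5 full rows
trimmed = tokens.narrow(0, 0, nbatch * bsz)
batched = trimmed.view(bsz, -1).t().contiguous()
print(batched)
# tensor([[0, 5],
#         [1, 6],
#         [2, 7],
#         [3, 8],
#         [4, 9]])
# Shape (nbatch, bsz): column 0 holds tokens 0..4, column 1 holds tokens 5..9.
```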
import sys
import os.path
from datetime import datetime

import numpy as np
import torch
import torch.onnx
import torch.nn as nn

import musicreader as mr
import data
import musicgenerator as mg

# constants
TRAINING_INPUT_FILENAME = "input.txt"
TRAINING_INFOS_FILENAME = "training_infos.json"
MODEL_PARAMETER_FILENAME = "model_parameters.pt"

device = "cuda" if torch.cuda.is_available() else "cpu"

# default parameters
trainingFolder = os.path.abspath("./training_data/classical_music/test/")
training = True
outputPath = os.path.abspath("./output.mid")

if __name__ == '__main__':
    learning_rate = 0.01

    ###############################################################################
    # Prepare the data
    ###############################################################################
    corpus = data.Corpus('./')
    train_data = data.batchify(corpus.train, 16, device)
    val_data = data.batchify(corpus.valid, 16, device)

    ###############################################################################
    # Build the model
    ###############################################################################
    ntokens = len(corpus.dictionary)
    model = mg.MusicGenerator(vocab_size=ntokens, dim_model=8, num_head=4).to(device)
    criterion = nn.NLLLoss()

    ###############################################################################
    # Train the model
    ###############################################################################
    mg.train(model, criterion, ntokens, train_data, val_data, 4, learning_rate, 5, 500)


def isTrainingNeeded() -> bool:
    # TODO
    return training


if __name__ == '__main__':
    if training or isTrainingNeeded():
        minBpm, maxBpm = mr.readTrainingFolder(trainingFolder, TRAINING_INPUT_FILENAME, TRAINING_INFOS_FILENAME)
        mg.train(
            trainingInputFilePath=os.path.join(trainingFolder, TRAINING_INPUT_FILENAME),
            modelParametersFilePath=os.path.join(trainingFolder, MODEL_PARAMETER_FILENAME),
            dim_model=1
        )
    else:
        minBpm, maxBpm = mr.getMinMaxBpm(trainingFolder, TRAINING_INFOS_FILENAME)
    bpm = np.random.randint(minBpm, maxBpm + 1)
    mg.generate(outputPath, bpm, os.path.join(trainingFolder, MODEL_PARAMETER_FILENAME))
import math
import time

import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F

device = "cuda" if torch.cuda.is_available() else "cpu"
# source: https://medium.com/p/c80afbc9ffb1/
class PositionalEncoding(nn.Module):
    def __init__(self, dim_model, dropout_p, max_len):
        super().__init__()
        self.dropout = nn.Dropout(dropout_p)
        # Encoding - from the formula
        pos_encoding = torch.zeros(max_len, dim_model)
        positions_list = torch.arange(0, max_len, dtype=torch.float).view(-1, 1)  # 0, 1, 2, 3, 4, ...
        division_term = torch.exp(torch.arange(0, dim_model, 2).float() * (-math.log(10000.0)) / dim_model)  # 10000^(-2i/dim_model)
        # PE(pos, 2i) = sin(pos / 10000^(2i/dim_model))
        pos_encoding[:, 0::2] = torch.sin(positions_list * division_term)
        # PE(pos, 2i + 1) = cos(pos / 10000^(2i/dim_model))
        pos_encoding[:, 1::2] = torch.cos(positions_list * division_term)
        # Saved as a buffer (same as a parameter, but without gradients)
        pos_encoding = pos_encoding.unsqueeze(0).transpose(0, 1)
        self.register_buffer("pos_encoding", pos_encoding)

    def forward(self, token_embedding: torch.Tensor) -> torch.Tensor:
        # Residual connection + positional encoding
        return self.dropout(token_embedding + self.pos_encoding[:token_embedding.size(0), :])
class MusicGenerator(nn.Module):
    def __init__(self, dim_model: int, vocab_size: int, num_heads: int):
        super().__init__()
    def __init__(self, d_model, dropout=0.1, max_len=5000):
        super(PositionalEncoding, self).__init__()
        self.dropout = nn.Dropout(p=dropout)
        pe = torch.zeros(max_len, d_model)
        position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-math.log(10000.0) / d_model))
        pe[:, 0::2] = torch.sin(position * div_term)
        pe[:, 1::2] = torch.cos(position * div_term)
        pe = pe.unsqueeze(0).transpose(0, 1)
        self.register_buffer('pe', pe)

    def forward(self, x):
        x = x + self.pe[:x.size(0), :]
        return self.dropout(x)
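A quick sketch (not from the commit) of how this sinusoidal table behaves: after `unsqueeze(0).transpose(0, 1)` the buffer has shape `(max_len, 1, d_model)`, so it broadcasts over the batch dimension when added to a `(seq_len, batch, d_model)` embedding.

```python
import math
import torch

d_model, max_len = 8, 16
pe = torch.zeros(max_len, d_model)
position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-math.log(10000.0) / d_model))
pe[:, 0::2] = torch.sin(position * div_term)
pe[:, 1::2] = torch.cos(position * div_term)
pe = pe.unsqueeze(0).transpose(0, 1)    # (max_len, 1, d_model)

x = torch.zeros(5, 3, d_model)          # dummy (seq_len, batch, d_model) embedding
out = x + pe[:x.size(0), :]             # broadcasts over the batch dimension
print(out.shape)                        # torch.Size([5, 3, 8])
```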
class MusicGenerator(nn.Transformer):
    def __init__(self, vocab_size: int, dim_model: int, num_head: int, dropout=0.5):
        super(MusicGenerator, self).__init__(d_model=dim_model, nhead=num_head)
        self.model_type = 'Transformer'
        self.src_mask = None
        self.pos_encoder = PositionalEncoding(dim_model, dropout)
        self.input_emb = nn.Embedding(vocab_size, dim_model)
        self.dim_model = dim_model
        self.decoder = nn.Linear(dim_model, vocab_size)
        self.init_weights()

    def _generate_square_subsequent_mask(self, sz):
        return torch.log(torch.tril(torch.ones(sz, sz)))

    def init_weights(self):
        initrange = 0.1
        nn.init.uniform_(self.input_emb.weight, -initrange, initrange)
        nn.init.zeros_(self.decoder.bias)
        nn.init.uniform_(self.decoder.weight, -initrange, initrange)

    def forward(self, src, has_mask=True):
        if has_mask:
            device = src.device
            if self.src_mask is None or self.src_mask.size(0) != len(src):
                mask = self._generate_square_subsequent_mask(len(src)).to(device)
                self.src_mask = mask
        else:
            self.src_mask = None

        src = self.input_emb(src) * math.sqrt(self.dim_model)
        src = self.pos_encoder(src)
        output = self.encoder(src, mask=self.src_mask)
        output = self.decoder(output)
        return F.log_softmax(output, dim=-1)
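A small check (an editorial aside, not in the commit) of what `_generate_square_subsequent_mask` produces: `torch.tril` puts ones on and below the diagonal, and `torch.log` maps 1 to 0 and 0 to `-inf`, which is the additive attention-mask format that `nn.Transformer` expects.

```python
import torch

sz = 4
mask = torch.log(torch.tril(torch.ones(sz, sz)))
print(mask)
# tensor([[0., -inf, -inf, -inf],
#         [0., 0., -inf, -inf],
#         [0., 0., 0., -inf],
#         [0., 0., 0., 0.]])
# Row i may only attend to positions <= i, enforcing causality.
```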
def get_batch(source: torch.Tensor, i: int, sequence_length: int) -> (torch.Tensor, torch.Tensor):
    seq_len = min(sequence_length, len(source) - 1 - i)
    data = source[i:i + seq_len]
    target = source[i + 1:i + 1 + seq_len].view(-1)
    return data, target
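A toy illustration (not part of the commit) of the one-step shift `get_batch` applies, assuming the batchified layout above: the target for each position is the next row of the batchified data, flattened for `NLLLoss`.

```python
import torch

source = torch.arange(12).view(6, 2)      # batchified data: 6 time steps, batch size 2
seq_len = 3
data = source[0:0 + seq_len]              # rows 0..2
target = source[1:1 + seq_len].view(-1)   # rows 1..3, flattened
print(data)
# tensor([[0, 1],
#         [2, 3],
#         [4, 5]])
print(target)
# tensor([2, 3, 4, 5, 6, 7])
```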
def evaluate(model: MusicGenerator,
             criterion: nn.NLLLoss,
             data_source: torch.Tensor,
             ntokens: int,
             sequence_length: int,
             ) -> float:
    # Turn on evaluation mode which disables dropout.
    model.eval()
    total_loss = 0.
    with torch.no_grad():
        for i in range(0, data_source.size(0) - 1, sequence_length):
            data, targets = get_batch(data_source, i, sequence_length)
            output = model(data)
            output = output.view(-1, ntokens)
            total_loss += len(data) * criterion(output, targets).item()
    return total_loss / (len(data_source) - 1)
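An editorial note on the weighting in `evaluate` (not from the commit): each chunk's mean loss is scaled by the chunk length before summing, so the final division yields a per-position average even when the last chunk is shorter than `sequence_length`. A tiny worked example:

```python
# Two chunks: lengths 4 and 2, with mean losses 1.0 and 4.0.
chunk_lens = [4, 2]
mean_losses = [1.0, 4.0]
total = sum(n * m for n, m in zip(chunk_lens, mean_losses))  # 4*1.0 + 2*4.0 = 12.0
print(total / sum(chunk_lens))  # 2.0, the true per-position average (not (1.0 + 4.0) / 2)
```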
def train(model: MusicGenerator,
          criterion: nn.NLLLoss,
          ntokens: int,
          train_data: torch.Tensor,
          val_data: torch.Tensor,
          sequence_length: int,
          lr: float,
          epochs: int,
          log_interval: int
          ) -> None:
    # Loop over epochs.
    best_val_loss = None
    # At any point you can hit Ctrl + C to break out of training early.
    try:
        for epoch in range(1, epochs + 1):
            epoch_start_time = time.time()
            __train(model, criterion, ntokens, train_data, sequence_length, lr, epoch, log_interval)
            val_loss = evaluate(model, criterion, val_data, ntokens, sequence_length)
            print('-' * 89)
            print('| end of epoch {:3d} | time: {:5.2f}s | valid loss {:5.2f} | '
                  'valid ppl {:8.2f}'.format(epoch, (time.time() - epoch_start_time),
                                             val_loss, math.exp(val_loss)))
            print('-' * 89)
            # Save the model if the validation loss is the best we've seen so far.
            if not best_val_loss or val_loss < best_val_loss:
                # with open(args.save, 'wb') as f:
                #     torch.save(model, f)
                best_val_loss = val_loss
            else:
                # Anneal the learning rate if no improvement has been seen on the validation dataset.
                lr /= 4.0
    except KeyboardInterrupt:
        print('-' * 89)
        print('Exiting from training early')
def __train(model: MusicGenerator,
            criterion: nn.NLLLoss,
            ntokens: int,
            train_data: torch.Tensor,
            sequence_length: int,
            lr: float,
            epoch: int,
            log_interval: int
            ) -> None:
    model.train()
    total_loss = 0
    start_time = time.time()
    for batch, i in enumerate(range(0, train_data.size(0) - 1, sequence_length)):
        data, targets = get_batch(train_data, i, sequence_length)
        # Starting each batch, we detach the hidden state from how it was previously produced.
        # If we didn't, the model would try backpropagating all the way to the start of the dataset.
        model.zero_grad()
        output = model(data)
        output = output.view(-1, ntokens)
        loss = criterion(output, targets)
        loss.backward()
        # LAYERS
        self.positional_encoder_layer = PositionalEncoding(
            dim_model=dim_model, dropout_p=0.1, max_len=5000
        )
        self.embedding_layer = nn.Embedding(vocab_size, dim_model)
        self.transformer_layer = nn.Transformer(d_model=dim_model, nhead=num_heads)
        self.out = nn.Linear(dim_model, vocab_size)
        self.dim_model = dim_model  # needed by forward() below

    def forward(self, src, tgt):
        # Src size must be (batch_size, src sequence length)
        # Tgt size must be (batch_size, tgt sequence length)
        # Embedding + positional encoding - out size = (batch_size, sequence length, dim_model)
        src = self.embedding_layer(src) * math.sqrt(self.dim_model)
        tgt = self.embedding_layer(tgt) * math.sqrt(self.dim_model)
        src = self.positional_encoder_layer(src)
        tgt = self.positional_encoder_layer(tgt)
        # We permute to obtain size (sequence length, batch_size, dim_model)
        src = src.permute(1, 0, 2)
        tgt = tgt.permute(1, 0, 2)
        # Transformer blocks - out size = (sequence length, batch_size, num_tokens)
        transformer_out = self.transformer_layer(src, tgt)
        out = self.out(transformer_out)
        return out
        # `clip_grad_norm` helps prevent the exploding gradient problem in RNNs / LSTMs.
        clip = 0.25
        torch.nn.utils.clip_grad_norm_(model.parameters(), clip)
        for p in model.parameters():
            p.data.add_(p.grad, alpha=-lr)

        total_loss += loss.item()
def get_random_batch(data, block_size=256, batch_size=64):
    # Generate a small batch of data of inputs x and targets y.
    ix = torch.randint(len(data) - block_size, (batch_size,))
    x = torch.stack([data[i:i + block_size] for i in ix])
    y = torch.stack([data[i + 1:i + block_size + 1] for i in ix])
    x, y = x.to(device), y.to(device)
    return x, y
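A usage sketch (not in the commit) for `get_random_batch`, assuming a toy 1-D token tensor: each of the `batch_size` rows is a random contiguous window of the stream, and the targets are the same window shifted one token to the right.

```python
import torch

data = torch.arange(1000)                      # toy token stream
block_size, batch_size = 8, 4
ix = torch.randint(len(data) - block_size, (batch_size,))
x = torch.stack([data[i:i + block_size] for i in ix])
y = torch.stack([data[i + 1:i + block_size + 1] for i in ix])
print(x.shape, y.shape)    # torch.Size([4, 8]) torch.Size([4, 8])
print((y == x + 1).all())  # tensor(True) on this stream: each target is the next token
```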
        if batch % log_interval == 0 and batch > 0:
            cur_loss = total_loss / log_interval
            elapsed = time.time() - start_time
            print('| epoch {:3d} | {:5d}/{:5d} batches | lr {:02.2f} | ms/batch {:5.2f} | '
                  'loss {:5.2f} | ppl {:8.2f}'.format(epoch, batch, len(train_data) // sequence_length,
                                                      lr, elapsed * 1000 / log_interval, cur_loss,
                                                      math.exp(cur_loss)))
            total_loss = 0
            start_time = time.time()
        # if args.dry_run:
        #     break
def train(trainingInputFilePath: str, modelParametersFilePath: str, dim_model: int):
def train_old(trainingInputFilePath: str, modelParametersFilePath: str, dim_model: int):
    print(f"training from {trainingInputFilePath}")
    # temporary code
@@ -92,7 +181,7 @@ def train(trainingInputFilePath: str, modelParametersFilePath: str, dim_model: int):
                break
            vocab.add(char)
    n_vocab = len(vocab)
    ntokens = len(vocab)
    stoi = {ch: i for i, ch in enumerate(vocab)}
    itos = {i: ch for i, ch in enumerate(vocab)}
    encode = lambda s: [stoi[c] for c in s]  # encoder: take a string, output a list of integers
@@ -104,43 +193,17 @@ def train(trainingInputFilePath: str, modelParametersFilePath: str, dim_model: int):
    train_data = data[:n]
    val_data = data[n:]

    print(f"n_vocab = {n_vocab}, dim_model = {dim_model}")
    print(f"n_vocab = {ntokens}, dim_model = {dim_model}")
    # IMPORTANT: dim_model must be divisible by num_heads
    mg = MusicGenerator(dim_model=dim_model, vocab_size=n_vocab, num_heads=1).to(device)
    opt = torch.optim.SGD(mg.parameters(), lr=0.01)
    loss_fn = nn.CrossEntropyLoss()

    mg.train()
    total_loss = 0
    for i in range(10):
        print(i)
        X, y = get_random_batch(train_data)
        X, y = torch.tensor(X).to(device), torch.tensor(y).to(device)
        # Now we shift the tgt by one so that with the <SOS> we predict the token at pos 1
        y_input = y[:, :-1]
        y_expected = y[:, 1:]
        # Standard training, except we pass in y_input and tgt_mask
        pred = mg(X, y_input)
        # Permute pred to have batch size first again
        pred = pred.permute(1, 2, 0)
        loss = loss_fn(pred, y_expected)
        opt.zero_grad()
        loss.backward()
        opt.step()
        total_loss += loss.detach().item()

    with torch.no_grad():
        print(mg())
        # context = torch.zeros((1, 1), dtype=torch.long, device=device)
        # print(decode(mg.generate(context, max_new_tokens=500)[0].tolist()))

    # model = MusicGenerator(vocab_size=ntokens, dim_model=dim_model, num_head=1).to(device)
    #
    # model.train()
    total_loss = 0.
    start_time = time.time()
    sequence_length = 2
    # for batch, i in enumerate(range(0, n - 1, sequence_length)):
    #     data, targets = get_batch(train_data, i)
def generate(outputFilePath: str, bpm: int, modelParametersFilePath: str):
......
import json
import os

import torch
import music21


def readTrainingFolder(folderPath: str, trainingTextFilename: str, trainingInfoJsonFilename: str) -> (int, int):
    """
    Read all midi files and save them into a text file that the training process can read.
    Also save some info about the training data in trainingInfoJsonFilename.
    :param folderPath: path to the folder containing all training midi files to read
    :param trainingInfoJsonFilename:
    :param trainingTextFilename:
    :return: min and max bpm of the training data
    """
    print(f"read all midi files in {folderPath}")
    print(f"save content in {os.path.join(folderPath, trainingTextFilename)}")
    print(f"save infos in {os.path.join(folderPath, trainingInfoJsonFilename)}")
    return 0, 0


def getMinMaxBpm(folderPath: str, trainingInfoJsonFilename: str) -> (int, int):
    """
    ASSUMING THAT THE TRAINING DATA HAS ALREADY BEEN READ ONCE.
    Read the trainingInfoJsonFilename file and return the min/max bpm.
    :param folderPath: path to the folder containing all training midi files
    :param trainingInfoJsonFilename:
    :return: min and max bpm of the training data
    """
    print(f"read infos in {os.path.join(folderPath, trainingInfoJsonFilename)}")
    return 0, 0
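Since both functions above are still stubs that return `0, 0`, here is a hypothetical sketch of how `getMinMaxBpm` might read the info file. The function name and the JSON keys `minBpm`/`maxBpm` are assumptions for illustration; the commit does not define the file's layout.

```python
import json
import os

def getMinMaxBpm_sketch(folderPath: str, trainingInfoJsonFilename: str) -> (int, int):
    # Hypothetical layout: {"minBpm": 60, "maxBpm": 180} -- the keys are an assumption.
    with open(os.path.join(folderPath, trainingInfoJsonFilename), 'r') as f:
        infos = json.load(f)
    return infos["minBpm"], infos["maxBpm"]
```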
# Actually this isn't needed, because pytorch also optimizes the data in nn.Embedding,
# but this function could be useful if we wanted a more specific embedding.
# def createEmbedding(embeddingFilePath: str, trainingTextFilePath: str):
#     print(f"read {trainingTextFilePath}")
#     print(f"create embedding {embeddingFilePath}")
#
#     # example code
#     vocab = set()
#     with open(trainingTextFilePath, 'r') as file:
#         while True:
#             char = file.read(1)
#             if not char:
#                 break
#             vocab.add(char)
#     embedding = {c: [i] for i, c in enumerate(sorted(list(vocab)))}
#     print(embedding)
#     torch.save(embedding, embeddingFilePath)