Hung-yi Lee Machine Learning HW4 (detailed walkthrough): Classify the Speakers
This assignment should be done after finishing the Transformer lectures!
Task
Speaker identification: there are 600 speakers in total. We are given speech features for each speaker for training, and then identify the speaker of each test feature. (Essentially this is still a classification task.)
Simple (0.60824): run the sample code and know how to use a transformer
Medium (0.70375): know how to adjust the parameters of the transformer
Strong (0.77750): construct a conformer
Boss (0.86500): implement self-attention pooling and additive margin softmax
Use Kaggle to train the homework model.
Walkthrough of the TA's sample code
Dataset analysis
The mapping.json file
Maps each speaker id to an index in 0~599, since there are 600 different speakers whose speech has to be classified; its layout is sketched below.
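Its layout, reconstructed from the debug comments in the dataset code further down (so the exact ids shown are just the ones that happen to appear there), is roughly:

```json
{
  "speaker2id": {
    "id00464": 0,
    "id00559": 1,
    "id00578": 2,
    "id00905": 3
  }
}
```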
The metadata.json file
Stores the training data. This assignment does not provide a separate validation set, so one has to be split off from the training data.
n_mels: the number of mel dimensions used to represent the feature at each time step (40 here).
speakers: a key-value map from speaker id to all of that speaker's features (each speaker has multiple features).
feature_path: the file name of the feature.
mel_len: the length of each feature (each one may differ, which has to be dealt with later); an illustrative example of the file layout follows this list.
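For reference, metadata.json roughly looks like this (the file names and lengths are the illustrative values that appear in the debug comments of the dataset code below; the real file contains many more speakers and utterances):

```json
{
  "n_mels": 40,
  "speakers": {
    "id00464": [
      {"feature_path": "uttr-18e375195dc146fd8d14b8a322c29b90.pt", "mel_len": 435},
      {"feature_path": "uttr-da9917d5853049178487c065c9e8b718.pt", "mel_len": 490}
    ]
  }
}
```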
The testdata.json file
Similar in form to metadata.json; these are the utterances whose speakers we need to identify. (utterance: a spoken statement, a remark)
Dataset
The data for this assignment comes from the VoxCeleb2 speech dataset, which contains speech from real-world speakers; 600 speakers and their utterances were selected for the homework.
import os
import json
import torch
import random
from pathlib import Path
from torch.utils.data import Dataset
from torch.nn.utils.rnn import pad_sequence


class myDataset(Dataset):
    def __init__(self, data_dir, segment_len=128):
        self.data_dir = data_dir
        self.segment_len = segment_len

        # Load the mapping from speaker name to their corresponding id.
        mapping_path = Path(data_dir) / "mapping.json"
        # mapping_path: Dataset/mapping.json
        mapping = json.load(mapping_path.open())
        # mapping: {"speaker2id": {"id00464": 0, "id00559": 1, ...}}
        self.speaker2id = mapping["speaker2id"]
        # self.speaker2id: {"id00464": 0, "id00559": 1, "id00578": 2, "id00905": 3, ...}

        # Load metadata of training data.
        metadata_path = Path(data_dir) / "metadata.json"
        metadata = json.load(open(metadata_path))["speakers"]
        # metadata maps each speaker_id (key) to that speaker's features and their lengths (value).

        # Get the total number of speakers.
        self.speaker_num = len(metadata.keys())
        self.data = []
        for speaker in metadata.keys():           # iterate over every speaker_id
            for utterances in metadata[speaker]:  # all features (and lengths) of this speaker
                # utterances format:
                #   {feature_path: uttr-18e375195dc146fd8d14b8a322c29b90.pt, mel_len: 435}
                #   {feature_path: uttr-da9917d5853049178487c065c9e8b718.pt, mel_len: 490} ...
                self.data.append([utterances["feature_path"], self.speaker2id[speaker]])
                # self.data: [[uttr-18e375195dc146fd8d14b8a322c29b90.pt, 436],
                #             [uttr-da9917d5853049178487c065c9e8b718.pt, 436], ...]
                # 600 speakers in total; 436 means the utterance belongs to speaker No. 436.

    def __len__(self):
        return len(self.data)

    def __getitem__(self, index):
        feat_path, speaker = self.data[index]  # feature path and speaker index in [0, 599]
        # Load preprocessed mel-spectrogram.
        mel = torch.load(os.path.join(self.data_dir, feat_path))  # load the feature
        # mel.size(): torch.Size([490, 40])

        # Segment mel-spectrogram into "segment_len" frames.
        if len(mel) > self.segment_len:  # cut the feature into a fixed-length slice
            # Randomly get the starting point of the segment.
            start = random.randint(0, len(mel) - self.segment_len)
            # Get a segment with "segment_len" frames.
            mel = torch.FloatTensor(mel[start:start + self.segment_len])
            # mel.size(): torch.Size([128, 40])
        else:
            mel = torch.FloatTensor(mel)
            # Why not pad here when shorter than segment_len? Padding is done in the dataloader.

        # Turn the speaker id into long for computing loss later.
        speaker = torch.FloatTensor([speaker]).long()
        return mel, speaker

    def get_speaker_number(self):
        return self.speaker_num  # 600

Dataloader
Main tasks: 1. split off a validation set; 2. pad the mels that are shorter than segment_len; 3. build the dataloaders.
import torch
from torch.utils.data import DataLoader, random_split
from torch.nn.utils.rnn import pad_sequence


def collate_batch(batch):
    # Collate function; the argument is one batch from the dataloader.
    # Process features within a batch.
    """Collate a batch of data."""
    mel, speaker = zip(*batch)  # unzip: split the mels and speakers of one batch into two separate tuples
    # Because we train the model batch by batch, we need to pad the features
    # in the same batch to make their lengths the same.
    # When the mels have different lengths, every mel is padded to the length of
    # the longest one; the pad value is given by padding_value.
    mel = pad_sequence(mel, batch_first=True, padding_value=-20)  # pad log 10^(-20), which is a very small value
    # mel: (batch size, length, 40)
    return mel, torch.FloatTensor(speaker).long()


def get_dataloader(data_dir, batch_size, n_workers):
    """Generate dataloader"""
    dataset = myDataset(data_dir)
    speaker_num = dataset.get_speaker_number()
    # Split dataset into training dataset and validation dataset.
    trainlen = int(0.9 * len(dataset))
    lengths = [trainlen, len(dataset) - trainlen]
    trainset, validset = random_split(dataset, lengths)  # random, non-overlapping train/validation split
    train_loader = DataLoader(
        trainset,
        batch_size=batch_size,
        shuffle=True,
        drop_last=True,
        num_workers=n_workers,
        pin_memory=True,
        collate_fn=collate_batch,
    )
    valid_loader = DataLoader(
        validset,
        batch_size=batch_size,
        num_workers=n_workers,
        drop_last=True,
        pin_memory=True,
        collate_fn=collate_batch,
    )
    return train_loader, valid_loader, speaker_num

Model
The most important part: applying the Transformer.
The basic Transformer architecture comes from the paper Attention Is All You Need.
Paper walkthrough: Mu Li's paper-reading video, highly recommended.
Since this is a classification task, only the Encoder part is needed.
PyTorch official docs: torch.nn.TransformerEncoderLayer

import torch
import torch.nn as nn
import torch.nn.functional as F


class Classifier(nn.Module):
    def __init__(self, d_model=80, n_spks=600, dropout=0.1):
        super().__init__()
        # Project the dimension of features from that of input into d_model.
        self.prenet = nn.Linear(40, d_model)
        # TODO:
        #   Change Transformer to Conformer.
        #   https://arxiv.org/abs/2005.08100
        # For downstream tasks such as classification, only the Encoder part is needed.
        #   nhead: number of heads in multi-head attention
        #   d_model: number of input features
        #   dim_feedforward: dimension of the feed-forward network
        #   dropout: defaults to 0.1
        self.encoder_layer = nn.TransformerEncoderLayer(
            d_model=d_model, dim_feedforward=256, nhead=2
        )
        # self.encoder = nn.TransformerEncoder(self.encoder_layer, num_layers=2)

        # Project the dimension of features from d_model into speaker nums.
        self.pred_layer = nn.Sequential(
            nn.Linear(d_model, d_model),
            nn.ReLU(),
            nn.Linear(d_model, n_spks),
        )

    def forward(self, mels):
        """
        args:
            mels: (batch size, length, 40)
        return:
            out: (batch size, n_spks)
        """
        # out: (batch size, length, d_model), length = segment_len
        out = self.prenet(mels)
        # out: (length, batch size, d_model)
        out = out.permute(1, 0, 2)  # swap dim 0 and dim 1
        # The encoder layer expects features in the shape of (length, batch size, d_model).
        out = self.encoder_layer(out)
        # out: (batch size, length, d_model)
        out = out.transpose(0, 1)  # swap dim 0 and dim 1 back
        # mean pooling
        stats = out.mean(dim=1)  # average over the length dimension and remove it; stats.size(): (batch_size, d_model)
        # out: (batch, n_spks)
        out = self.pred_layer(stats)
        return out

Learning rate schedule
When the batch size is large, a relatively large learning rate is usually needed (batch size and learning rate are roughly proportional). But at the very beginning of training the parameters are randomly initialized and the gradients are large, so a large learning rate makes training unstable.
The warm-up trick uses a relatively small learning rate for the first iterations, and only restores the initial learning rate once the gradients have come down to a certain level.
------ Neural Networks and Deep Learning (《神经网络与深度学习》)
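Written out explicitly, the schedule implemented by the code below multiplies the base learning rate by a factor that warms up linearly and then decays along a cosine:

$$
\mathrm{lr}(t) = \mathrm{lr}_{\max} \cdot
\begin{cases}
\dfrac{t}{t_{\mathrm{warmup}}}, & t < t_{\mathrm{warmup}} \\[6pt]
\max\!\Big(0,\ \tfrac{1}{2}\big(1 + \cos(2\pi\, c\, p)\big)\Big), \quad p = \dfrac{t - t_{\mathrm{warmup}}}{t_{\mathrm{total}} - t_{\mathrm{warmup}}}, & t \ge t_{\mathrm{warmup}}
\end{cases}
$$

With the default c = num_cycles = 0.5, the factor after warm-up is (1 + cos(πp))/2, i.e. half a cosine wave going from 1 down to 0 over the remaining training steps.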
import math
import torch
from torch.optim import Optimizer
from torch.optim.lr_scheduler import LambdaLR


def get_cosine_schedule_with_warmup(
    optimizer: Optimizer,
    num_warmup_steps: int,
    num_training_steps: int,
    num_cycles: float = 0.5,
    last_epoch: int = -1,
):
    """
    Create a schedule with a learning rate that decreases following the values of the cosine function between the
    initial lr set in the optimizer to 0, after a warmup period during which it increases linearly between 0 and the
    initial lr set in the optimizer.

    Args:
        optimizer (:class:`~torch.optim.Optimizer`):
            The optimizer for which to schedule the learning rate.
        num_warmup_steps (:obj:`int`):
            The number of steps for the warmup phase.
        num_training_steps (:obj:`int`):
            The total number of training steps.
        num_cycles (:obj:`float`, `optional`, defaults to 0.5):
            The number of waves in the cosine schedule (the default is to just decrease from the max value to 0
            following a half-cosine).
        last_epoch (:obj:`int`, `optional`, defaults to -1):
            The index of the last epoch when resuming training.

    Return:
        :obj:`torch.optim.lr_scheduler.LambdaLR` with the appropriate schedule.
    """

    def lr_lambda(current_step):
        # Warmup
        if current_step < num_warmup_steps:
            return float(current_step) / float(max(1, num_warmup_steps))
        # decadence
        progress = float(current_step - num_warmup_steps) / float(
            max(1, num_training_steps - num_warmup_steps)
        )
        return max(
            0.0, 0.5 * (1.0 + math.cos(math.pi * float(num_cycles) * 2.0 * progress))
        )

    return LambdaLR(optimizer, lr_lambda, last_epoch)

Model Function
Calls the forward part of our custom model; model_fn is called once for every batch.
import torch


def model_fn(batch, model, criterion, device):
    """Forward a batch through the model."""
    mels, labels = batch
    # print("model_fn_mels.size():", mels.size())
    # out: torch.Size([16, 128, 40])  [batch_size, segment_len, 40]
    mels = mels.to(device)
    labels = labels.to(device)

    outs = model(mels)
    loss = criterion(outs, labels)
    # Get the speaker id with highest probability.
    preds = outs.argmax(1)
    # Compute accuracy.
    accuracy = torch.mean((preds == labels).float())
    return loss, accuracy

Validate
Computes the accuracy on the validation set.
from tqdm import tqdm
import torch


def valid(dataloader, model, criterion, device):
    """Validate on validation set."""
    model.eval()
    running_loss = 0.0
    running_accuracy = 0.0
    # The validation set contains 5667 utterances.
    pbar = tqdm(total=len(dataloader.dataset), ncols=0, desc="Valid", unit=" uttr")

    for i, batch in enumerate(dataloader):
        with torch.no_grad():
            loss, accuracy = model_fn(batch, model, criterion, device)
            running_loss += loss.item()
            running_accuracy += accuracy.item()

        pbar.update(dataloader.batch_size)
        pbar.set_postfix(
            loss=f"{running_loss / (i+1):.2f}",
            accuracy=f"{running_accuracy / (i+1):.2f}",
        )

    pbar.close()
    model.train()
    return running_accuracy / len(dataloader)

Main function
Now we run the model. This differs from the previous assignments: earlier assignments trained for one full epoch, i.e. the whole training set, before running the validation set. Here we train for valid_steps batches and then run one pass over the validation set.
from tqdm import tqdm
import torch
import torch.nn as nn
from torch.optim import AdamW
from torch.utils.data import DataLoader, random_split


def parse_args():
    """arguments"""
    config = {
        "data_dir": "./Dataset",
        "save_path": "model.ckpt",
        "batch_size": 16,
        "n_workers": 0,
        "valid_steps": 2000,
        "warmup_steps": 1000,
        "save_steps": 10000,
        "total_steps": 70000,
    }
    return config


def main(
    data_dir,
    save_path,
    batch_size,
    n_workers,
    valid_steps,
    warmup_steps,
    total_steps,
    save_steps,
):
    """Main function."""
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    print(f"[Info]: Use {device} now!")

    train_loader, valid_loader, speaker_num = get_dataloader(data_dir, batch_size, n_workers)
    train_iterator = iter(train_loader)  # iter() builds an iterator that yields one batch at a time
    # print("train_iterator:", train_iterator)
    # <torch.utils.data.dataloader._SingleProcessDataLoaderIter object at 0x000001FD07C558D0>
    print(f"[Info]: Finish loading data!", flush=True)

    model = Classifier(n_spks=speaker_num).to(device)
    criterion = nn.CrossEntropyLoss()
    optimizer = AdamW(model.parameters(), lr=1e-3)
    scheduler = get_cosine_schedule_with_warmup(optimizer, warmup_steps, total_steps)  # the warm-up schedule defined above
    print(f"[Info]: Finish creating model!", flush=True)

    best_accuracy = -1.0
    best_state_dict = None

    pbar = tqdm(total=valid_steps, ncols=0, desc="Train", unit=" step")  # train valid_steps batches, then validate

    for step in range(total_steps):  # run total_steps steps in total; there is no notion of an epoch here
        # Get data
        try:
            batch = next(train_iterator)  # next() returns the next item of the iterator, i.e. the next batch
            # print("batch[0].size():", batch[0].size())
            # out: torch.Size([16, 128, 40])  [batch_size, segment_len, 40]
        except StopIteration:
            # StopIteration is raised when the iterator is exhausted (and no default is given); rebuild it.
            train_iterator = iter(train_loader)
            batch = next(train_iterator)

        loss, accuracy = model_fn(batch, model, criterion, device)  # loss and acc of the current batch
        # print("loss:", loss)  # tensor(6.3915, device=cuda:0, grad_fn=<NllLossBackward0>)
        batch_loss = loss.item()  # loss is a tensor; item() extracts its value
        # print("batch_loss:", batch_loss)  # batch_loss: 6.391468048095703
        batch_accuracy = accuracy.item()

        # Update model: backprop and update the parameters after every batch.
        loss.backward()
        optimizer.step()
        scheduler.step()
        optimizer.zero_grad()

        # Log the current loss and acc.
        pbar.update()
        pbar.set_postfix(
            loss=f"{batch_loss:.2f}",
            accuracy=f"{batch_accuracy:.2f}",
            step=step + 1,
        )

        # Do validation every valid_steps steps.
        if (step + 1) % valid_steps == 0:
            pbar.close()

            valid_accuracy = valid(valid_loader, model, criterion, device)  # compute valid_acc

            # keep the best model
            if valid_accuracy > best_accuracy:
                best_accuracy = valid_accuracy
                best_state_dict = model.state_dict()  # keep the model parameters

            pbar = tqdm(total=valid_steps, ncols=0, desc="Train", unit=" step")

        # Save the best model so far, once every save_steps steps.
        if (step + 1) % save_steps == 0 and best_state_dict is not None:
            torch.save(best_state_dict, save_path)
            pbar.write(f"Step {step + 1}, best model saved. (accuracy={best_accuracy:.4f})")

    pbar.close()


if __name__ == "__main__":
    main(**parse_args())

Inference
Inference means running the model on the testing data.
The inference dataset can be written by analogy with the training one.
Main function of inference
Similar to the main function above; a sketch is given below.
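The post leaves the inference code out ("just mirror the training code"), so here is a minimal sketch of what it could look like, reusing the Classifier defined above. The InferenceDataset class, the "utterances" field of testdata.json, and the output.csv column names are assumptions in the style of the HW4 sample code, not code shown in this post.

```python
# Hypothetical sketch; the post does not include the inference code itself.
import csv
import json
from pathlib import Path

import torch
from torch.utils.data import Dataset, DataLoader


class InferenceDataset(Dataset):
    def __init__(self, data_dir):
        testdata_path = Path(data_dir) / "testdata.json"
        testdata = json.load(testdata_path.open())
        self.data_dir = data_dir
        self.data = testdata["utterances"]  # assumed field name, mirroring metadata.json

    def __len__(self):
        return len(self.data)

    def __getitem__(self, index):
        utterance = self.data[index]
        feat_path = utterance["feature_path"]
        mel = torch.load(Path(self.data_dir) / feat_path)
        return feat_path, torch.FloatTensor(mel)


def inference(data_dir="./Dataset", model_path="model.ckpt", output_path="output.csv"):
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    # Invert speaker2id so predictions can be mapped back to speaker ids.
    mapping = json.load((Path(data_dir) / "mapping.json").open())
    id2speaker = {v: k for k, v in mapping["speaker2id"].items()}

    dataset = InferenceDataset(data_dir)
    dataloader = DataLoader(dataset, batch_size=1, shuffle=False)

    model = Classifier(n_spks=600).to(device)  # the same Classifier defined above
    model.load_state_dict(torch.load(model_path, map_location=device))
    model.eval()

    results = [["Id", "Category"]]  # assumed submission header
    with torch.no_grad():
        for feat_paths, mels in dataloader:
            mels = mels.to(device)
            outs = model(mels)
            preds = outs.argmax(1).cpu().tolist()
            for feat_path, pred in zip(feat_paths, preds):
                results.append([feat_path, id2speaker[pred]])

    with open(output_path, "w", newline="") as f:
        csv.writer(f).writerows(results)
```

With batch_size=1 no padding collate function is needed, since each utterance is forwarded at its full length.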
Score of the sample code
Medium
Passed the medium baseline by adjusting parameters (a sketch of the modified model follows the list below):
d_model=160
n_head=8
num_layers=2
linear layer: 1 layer
total_steps=100000
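The post does not show the modified code, so here is a minimal sketch of what these settings might look like when applied to the sample Classifier above; everything besides the listed values is carried over from the sample code rather than taken from the post.

```python
# Hypothetical sketch of the medium-baseline model changes.
import torch.nn as nn


class Classifier(nn.Module):
    def __init__(self, d_model=160, n_spks=600, dropout=0.1):
        super().__init__()
        self.prenet = nn.Linear(40, d_model)
        # d_model=160, nhead=8, and num_layers=2 stacked encoder layers, as listed above.
        encoder_layer = nn.TransformerEncoderLayer(
            d_model=d_model, dim_feedforward=256, nhead=8
        )
        self.encoder = nn.TransformerEncoder(encoder_layer, num_layers=2)
        # Prediction head reduced to a single linear layer.
        self.pred_layer = nn.Linear(d_model, n_spks)

    def forward(self, mels):
        out = self.prenet(mels)        # (batch, length, d_model)
        out = out.permute(1, 0, 2)     # (length, batch, d_model)
        out = self.encoder(out)
        out = out.transpose(0, 1)      # (batch, length, d_model)
        stats = out.mean(dim=1)        # mean pooling
        return self.pred_layer(stats)  # (batch, n_spks)
```

Wrapping the layer in nn.TransformerEncoder is what the num_layers=2 setting refers to.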
In this run the training accuracy reached 100% (even though only 13 steps are shown), but the loss already indicates some overfitting.
Strong
Transformer->Conformer
Results first: this did not pass the strong baseline.
Severe overfitting: it shows up on both the training and validation sets, and the validation accuracy is far higher than the test result.
Paper: Conformer
The idea behind the Conformer is simple: combine the Transformer with a CNN. The reasons:
1. Thanks to the attention mechanism, the Transformer captures global context well.
2. A CNN has good locality and can extract fine-grained information.
Combining the two works well on speech. The paper describes the concrete model architecture.

First, pip install the conformer package:

!pip install conformer

Import the conformer package:

from conformer import ConformerBlock

Modify the model:

import torch
import torch.nn as nn
import torch.nn.functional as F


class Classifier(nn.Module):
    def __init__(self, d_model=512, n_spks=600, dropout=0.1):
        super().__init__()
        # Project the dimension of features from that of input into d_model.
        self.prenet = nn.Linear(40, d_model)
        # TODO:
        #   Change Transformer to Conformer.
        #   https://arxiv.org/abs/2005.08100
        # The original TransformerEncoderLayer is commented out and replaced by a ConformerBlock.
        # self.encoder_layer = nn.TransformerEncoderLayer(
        #     d_model=d_model, dim_feedforward=256, nhead=8
        # )
        # self.encoder = nn.TransformerEncoder(self.encoder_layer, num_layers=2)
        self.conformer_block = ConformerBlock(
            dim=d_model,
            dim_head=64,
            heads=8,
            ff_mult=4,
            conv_expansion_factor=2,
            conv_kernel_size=31,
            attn_dropout=dropout,
            ff_dropout=dropout,
            conv_dropout=dropout,
        )
        # Project the dimension of features from d_model into speaker nums.
        self.pred_layer = nn.Sequential(
            # nn.Linear(d_model, d_model),
            # nn.ReLU(),
            nn.Linear(d_model, n_spks),
        )

    def forward(self, mels):
        """
        args:
            mels: (batch size, length, 40)
        return:
            out: (batch size, n_spks)
        """
        # out: (batch size, length, d_model), length = segment_len
        out = self.prenet(mels)
        # out: (length, batch size, d_model)
        out = out.permute(1, 0, 2)  # swap dim 0 and dim 1
        out = self.conformer_block(out)
        # out: (batch size, length, d_model)
        out = out.transpose(0, 1)  # swap dim 0 and dim 1 back
        # mean pooling
        stats = out.mean(dim=1)  # average over the length dimension; stats.size(): (batch_size, d_model)
        # out: (batch, n_spks)
        out = self.pred_layer(stats)
        return out

Self-attention pooling
Self-attention pooling paper
The key part is the self-attention pooling architecture in the paper. Compared with mean pooling, self-attention pooling uses learnable parameters to do the pooling, so it can extract information that a plain mean misses; the computation is written out below.
See this video explanation for reference.
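In formulas, with frame-level features x_t (t = 1, ..., T) of dimension d_model, the pooling layer implemented below computes:

$$
h_t = \tanh(W x_t + b), \qquad
w_t = \frac{\exp(h_t^{\top} u)}{\sum_{t'=1}^{T} \exp(h_{t'}^{\top} u)}, \qquad
e = \sum_{t=1}^{T} w_t\, x_t
$$

Here W and b are the weights of sap_linear, u is the learnable attention vector, and e is the pooled utterance embedding; mean pooling is the special case w_t = 1/T.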
Code:

# Self-attention pooling implementation
import torch
import torch.nn as nn
import torch.nn.functional as F


class Self_Attentive_Pooling(nn.Module):
    def __init__(self, dim):
        super(Self_Attentive_Pooling, self).__init__()
        self.sap_linear = nn.Linear(dim, dim)
        self.attention = nn.Parameter(torch.FloatTensor(dim, 1))

    def forward(self, x):
        # x: (batch, dim, length)
        x = x.permute(0, 2, 1)                                    # (batch, length, dim)
        h = torch.tanh(self.sap_linear(x))                        # (batch, length, dim)
        w = torch.matmul(h, self.attention).squeeze(dim=2)        # (batch, length)
        w = F.softmax(w, dim=1).view(x.size(0), x.size(1), 1)     # attention weights over time
        x = torch.sum(x * w, dim=1)                               # weighted sum: (batch, dim)
        return x

Modify the model:
import torch
import torch.nn as nn
import torch.nn.functional as F


class Classifier(nn.Module):
    def __init__(self, d_model=512, n_spks=600, dropout=0.1):
        super().__init__()
        # Project the dimension of features from that of input into d_model.
        self.prenet = nn.Linear(40, d_model)
        # The original TransformerEncoderLayer stays commented out; the ConformerBlock from above is used.
        # self.encoder_layer = nn.TransformerEncoderLayer(
        #     d_model=d_model, dim_feedforward=256, nhead=8
        # )
        # self.encoder = nn.TransformerEncoder(self.encoder_layer, num_layers=2)
        self.conformer_block = ConformerBlock(
            dim=d_model,
            dim_head=64,
            heads=8,
            ff_mult=4,
            conv_expansion_factor=2,
            conv_kernel_size=31,
            attn_dropout=dropout,
            ff_dropout=dropout,
            conv_dropout=dropout,
        )
        # Replace mean pooling with self-attentive pooling.
        self.pooling = Self_Attentive_Pooling(d_model)
        # Project the dimension of features from d_model into speaker nums.
        self.pred_layer = nn.Sequential(
            # nn.Linear(d_model, d_model),
            # nn.ReLU(),
            nn.Linear(d_model, n_spks),
        )

    def forward(self, mels):
        """
        args:
            mels: (batch size, length, 40)
        return:
            out: (batch size, n_spks)
        """
        # out: (batch size, length, d_model), length = segment_len
        out = self.prenet(mels)
        # out: (length, batch size, d_model)
        out = out.permute(1, 0, 2)  # swap dim 0 and dim 1
        out = self.conformer_block(out)
        # Mean pooling is replaced by self-attentive pooling:
        # out = out.transpose(0, 1)
        # stats = out.mean(dim=1)
        out = out.permute(1, 2, 0)  # (batch size, d_model, length), the shape expected by the pooling layer
        stats = self.pooling(out)   # (batch size, d_model)
        # out: (batch, n_spks)
        out = self.pred_layer(stats)
        return out

total_steps=70000
total_steps=100000