HW-4:Speaker Identification

张开发
2026/4/18 6:23:17 15 分钟阅读

分享文章

HW-4:Speaker Identification
Predict speaker class from given speech:Learn how to use TransformerSelf-Attention是Transformer的核心机制【2022版李宏毅机器学习作业讲解】-HW4_哔哩哔哩_bilibilidatasetML2022Spring-hw4 | Kaggle1. train.pyimport numpy as np import torch import random import os import json import math from pathlib import Path from tqdm import tqdm from torch.utils.data import Dataset, DataLoader, random_split from torch.nn.utils.rnn import pad_sequence import torch.nn as nn import torch.nn.functional as F from torch.optim import Optimizer, AdamW from torch.optim.lr_scheduler import LambdaLR 固定随机种子 (Fix Random Seed) 确保实验可重复性 def set_seed(seed): 设置所有随机种子以保证实验可重复性 np.random.seed(seed) # NumPy随机种子 random.seed(seed) # Python内置随机种子 torch.manual_seed(seed) # PyTorch CPU随机种子 if torch.cuda.is_available(): torch.cuda.manual_seed(seed) # 当前GPU随机种子 torch.cuda.manual_seed_all(seed) # 所有GPU随机种子 torch.backends.cudnn.benchmark False # 禁用cudnn自动优化 torch.backends.cudnn.deterministic True # 确保确定性计算 set_seed(87) # 设置种子为87 数据处理部分 (Data Processing) # Dataset类 - 加载和处理语音数据 class myDataset(Dataset): 自定义数据集类用于加载预处理的梅尔频谱图 Args: data_dir: 数据目录路径 segment_len: 语音片段的长度帧数默认128帧 def __init__(self, data_dir, segment_len128): self.data_dir data_dir self.segment_len segment_len # 加载说话人ID映射文件 # mapping.json包含speaker2id和id2speaker的映射关系 mapping_path Path(data_dir) / mapping.json mapping json.load(mapping_path.open()) self.speaker2id mapping[speaker2id] # 说话人名称到ID的映射 # 加载元数据包含每个说话人的语音文件路径 metadata_path Path(data_dir) / metadata.json metadata json.load(open(metadata_path))[speakers] # 获取说话人总数 self.speaker_num len(metadata.keys()) # 构建数据集列表每个元素为[特征文件路径, 说话人ID] self.data [] for speaker in metadata.keys(): for utterances in metadata[speaker]: self.data.append([utterances[feature_path], self.speaker2id[speaker]]) def __len__(self): 返回数据集大小 return len(self.data) def __getitem__(self, index): 获取一个样本 Returns: mel: 梅尔频谱图张量 (segment_len, 40) 或 (原始长度, 40) speaker: 说话人ID张量 (1,) feat_path, speaker self.data[index] # 加载预处理的梅尔频谱图.pt文件 mel torch.load(os.path.join(self.data_dir, feat_path)) # 将语音切分成固定长度的片段 if len(mel) self.segment_len: # 随机选择起始点进行切分 start random.randint(0, len(mel) - self.segment_len) # 获取固定长度的片段 mel torch.FloatTensor(mel[start:startself.segment_len]) else: # 如果语音长度不足保持原长度 mel torch.FloatTensor(mel) # 将说话人ID转换为long类型用于损失计算 speaker torch.FloatTensor([speaker]).long() return mel, speaker def get_speaker_number(self): 返回说话人数量 return self.speaker_num # Dataloader的批处理函数 def collate_batch(batch): 对批次数据进行整理和填充 Args: batch: 包含多个(mel, speaker)的列表 Returns: mel: 填充后的梅尔频谱图张量 (batch_size, max_length, 40) speaker: 说话人ID张量 (batch_size,) mel, speaker zip(*batch) # 使用pad_sequence将不同长度的序列填充到相同长度 # padding_value-20 表示填充很小的值(log10^(-20)) mel pad_sequence(mel, batch_firstTrue, padding_value-20) # mel: (batch size, length, 40) return mel, torch.FloatTensor(speaker).long() def get_dataloader(data_dir, batch_size, n_workers): 创建训练集和验证集的DataLoader Args: data_dir: 数据目录 batch_size: 批次大小 n_workers: 数据加载进程数 Returns: train_loader: 训练集DataLoader valid_loader: 验证集DataLoader speaker_num: 说话人数量 dataset myDataset(data_dir) speaker_num dataset.get_speaker_number() # 按9:1的比例划分训练集和验证集 trainlen int(0.9 * len(dataset)) lengths [trainlen, len(dataset) - trainlen] trainset, validset random_split(dataset, lengths) # 创建训练集DataLoader train_loader DataLoader( trainset, batch_sizebatch_size, shuffleTrue, # 训练集打乱顺序 drop_lastTrue, # 丢弃最后一个不完整的batch num_workersn_workers, # 多进程加载数据 pin_memoryTrue, # 加速GPU数据传输 collate_fncollate_batch, # 自定义批处理函数 ) # 创建验证集DataLoader valid_loader DataLoader( validset, batch_sizebatch_size, num_workersn_workers, drop_lastTrue, pin_memoryTrue, collate_fncollate_batch, ) return train_loader, valid_loader, speaker_num 模型定义 (Model Definition) # 分类器模型 - 使用Transformer架构 class Classifier(nn.Module): 基于Transformer的说话人识别模型 Args: d_model: 模型隐藏层维度默认80 n_spks: 说话人数量默认600 dropout: Dropout比率默认0.1 def __init__(self, d_model80, n_spks600, dropout0.1): super().__init__() # 输入投影层将40维梅尔频谱投影到d_model维度 self.prenet nn.Linear(40, d_model) # Transformer编码器层 # 使用标准的TransformerEncoderLayer避免Flash Attention兼容性问题 self.encoder_layer nn.TransformerEncoderLayer( d_modeld_model, # 特征维度 dim_feedforward256, # 前馈网络维度 nhead2, # 注意力头数 dropoutdropout, # Dropout比率 activationrelu, # 激活函数 batch_firstFalse # 输入格式为(seq_len, batch, features) ) # 使用2层Transformer编码器 self.encoder nn.TransformerEncoder( self.encoder_layer, num_layers2 # 编码器层数 ) # 分类头将编码后的特征映射到说话人数量 self.pred_layer nn.Sequential( nn.Linear(d_model, d_model), # 全连接层 nn.ReLU(), # 激活函数 nn.Dropout(dropout), # Dropout防止过拟合 nn.Linear(d_model, n_spks), # 输出层d_model - 说话人数 ) def forward(self, mels): 前向传播 Args: mels: 梅尔频谱图 (batch_size, length, 40) Returns: out: 说话人分类概率 (batch_size, n_spks) # 步骤1: 投影到d_model维度 # out: (batch_size, length, d_model) out self.prenet(mels) # 步骤2: 调整维度以适应Transformer输入 # Transformer期望输入格式为 (seq_len, batch_size, features) out out.permute(1, 0, 2) # (length, batch_size, d_model) # 步骤3: Transformer编码 out self.encoder(out) # (length, batch_size, d_model) # 步骤4: 恢复维度 out out.transpose(0, 1) # (batch_size, length, d_model) # 步骤5: 平均池化 - 将时间维度压缩 # 对每个样本的所有时间步取平均 stats out.mean(dim1) # (batch_size, d_model) # 步骤6: 分类头得到最终输出 out self.pred_layer(stats) # (batch_size, n_spks) return out 学习率调度器 (Learning Rate Scheduler) def get_cosine_schedule_with_warmup( optimizer: Optimizer, num_warmup_steps: int, num_training_steps: int, num_cycles: float 0.5, last_epoch: int -1, ): 创建带有预热阶段的余弦退火学习率调度器 学习率变化过程 1. 预热阶段线性增加到初始学习率 2. 衰减阶段按照余弦函数衰减到0 Args: optimizer: 优化器 num_warmup_steps: 预热步数 num_training_steps: 总训练步数 num_cycles: 余弦周期的数量默认0.5表示半周期 last_epoch: 上次训练的epoch索引 Returns: LambdaLR: 学习率调度器 def lr_lambda(current_step): # 预热阶段线性增加 if current_step num_warmup_steps: return float(current_step) / float(max(1, num_warmup_steps)) # 衰减阶段余弦退火 progress float(current_step - num_warmup_steps) / float( max(1, num_training_steps - num_warmup_steps) ) # 计算余弦值0.5 * (1 cos(π * cycles * 2 * progress)) return max( 0.0, 0.5 * (1.0 math.cos(math.pi * float(num_cycles) * 2.0 * progress)) ) return LambdaLR(optimizer, lr_lambda, last_epoch) 训练辅助函数 (Training Helper Functions) def model_fn(batch, model, criterion, device): 模型前向传播函数 Args: batch: (mels, labels) 批次数据 model: 模型 criterion: 损失函数 device: 计算设备 Returns: loss: 损失值 accuracy: 准确率 mels, labels batch mels mels.to(device) # 移动到GPU/CPU labels labels.to(device) # 前向传播 outs model(mels) # 计算损失 loss criterion(outs, labels) # 计算准确率 preds outs.argmax(1) # 取概率最大的类别 accuracy torch.mean((preds labels).float()) # 计算正确率 return loss, accuracy def valid(dataloader, model, criterion, device): 在验证集上评估模型 Args: dataloader: 验证集DataLoader model: 模型 criterion: 损失函数 device: 计算设备 Returns: 验证集准确率 model.eval() # 切换到评估模式关闭dropout等 running_loss 0.0 running_accuracy 0.0 pbar tqdm(totallen(dataloader.dataset), ncols0, descValid, unit uttr) for i, batch in enumerate(dataloader): with torch.no_grad(): # 不计算梯度节省内存 loss, accuracy model_fn(batch, model, criterion, device) running_loss loss.item() running_accuracy accuracy.item() pbar.update(dataloader.batch_size) pbar.set_postfix( lossf{running_loss / (i1):.2f}, accuracyf{running_accuracy / (i1):.2f}, ) pbar.close() model.train() # 恢复训练模式 return running_accuracy / len(dataloader) 主函数 (Main Function) def parse_args(): 解析训练参数 Returns: config: 训练配置字典 config { data_dir: ./Dataset, # 数据目录 save_path: model.ckpt, # 模型保存路径 batch_size: 32, # 批次大小 n_workers: 8, # 数据加载进程数 valid_steps: 2000, # 每多少步进行一次验证 warmup_steps: 1000, # 学习率预热步数 save_steps: 10000, # 每多少步保存一次模型 total_steps: 70000, # 总训练步数 } return config def main( data_dir, save_path, batch_size, n_workers, valid_steps, warmup_steps, total_steps, save_steps, ): 训练主函数 Args: data_dir: 数据目录 save_path: 模型保存路径 batch_size: 批次大小 n_workers: 数据加载进程数 valid_steps: 验证步数间隔 warmup_steps: 预热步数 total_steps: 总训练步数 save_steps: 模型保存步数间隔 # 设置计算设备 device torch.device(cuda if torch.cuda.is_available() else cpu) print(f[Info]: Use {device} now!) # 加载数据 train_loader, valid_loader, speaker_num get_dataloader( data_dir, batch_size, n_workers ) train_iterator iter(train_loader) print(f[Info]: Finish loading data!, flushTrue) # 创建模型 model Classifier(n_spksspeaker_num).to(device) criterion nn.CrossEntropyLoss() # 交叉熵损失 optimizer AdamW(model.parameters(), lr1e-3) # AdamW优化器 scheduler get_cosine_schedule_with_warmup( optimizer, warmup_steps, total_steps ) print(f[Info]: Finish creating model!, flushTrue) # 训练状态跟踪 best_accuracy -1.0 best_state_dict None # 进度条 pbar tqdm(totalvalid_steps, ncols0, descTrain, unit step) # 训练循环 for step in range(total_steps): # 获取一个batch的数据 try: batch next(train_iterator) except StopIteration: # 如果数据用完重新创建迭代器 train_iterator iter(train_loader) batch next(train_iterator) # 前向传播和反向传播 loss, accuracy model_fn(batch, model, criterion, device) batch_loss loss.item() batch_accuracy accuracy.item() # 更新模型参数 loss.backward() # 反向传播计算梯度 optimizer.step() # 更新参数 scheduler.step() # 更新学习率 optimizer.zero_grad() # 清空梯度 # 更新进度条显示 pbar.update() pbar.set_postfix( lossf{batch_loss:.2f}, accuracyf{batch_accuracy:.2f}, stepstep 1, ) # 定期进行验证 if (step 1) % valid_steps 0: pbar.close() # 关闭当前进度条 # 在验证集上评估模型 valid_accuracy valid(valid_loader, model, criterion, device) # 保存最佳模型 if valid_accuracy best_accuracy: best_accuracy valid_accuracy best_state_dict model.state_dict() # 重新创建进度条 pbar tqdm(totalvalid_steps, ncols0, descTrain, unit step) # 定期保存模型 if (step 1) % save_steps 0 and best_state_dict is not None: torch.save(best_state_dict, save_path) pbar.write( fStep {step 1}, best model saved. (accuracy{best_accuracy:.4f}) ) pbar.close() # 程序入口 if __name__ __main__: # 解析参数并开始训练 main(**parse_args())2. inference.py 推理模块 (Inference Module) 用于加载训练好的模型对新语音进行说话人识别 import os import json import torch import csv from pathlib import Path from torch.utils.data import Dataset from tqdm.notebook import tqdm # 用于notebook环境如果是脚本可改为 from tqdm import tqdm from torch.utils.data import DataLoader from train import Classifier # 从训练脚本导入模型类 # # 推理数据集类 # class InferenceDataset(Dataset): 推理数据集类用于加载待识别的语音数据 Args: data_dir: 数据目录路径 def __init__(self, data_dir): 初始化推理数据集 加载testdata.json文件其中包含所有待识别语音的元数据 # 构建测试数据json文件路径 testdata_path Path(data_dir) / testdata.json # 加载json文件包含所有待识别语音的信息 metadata json.load(testdata_path.open()) self.data_dir data_dir # 存储所有语音的特征路径 self.data metadata[utterances] def __len__(self): 返回数据集大小 return len(self.data) def __getitem__(self, index): 获取一个样本 Args: index: 样本索引 Returns: feat_path: 特征文件路径用于记录 mel: 梅尔频谱图张量 # 获取第index个语音的元数据 utterance self.data[index] # 获取特征文件路径 feat_path utterance[feature_path] # 加载预处理的梅尔频谱图.pt文件 mel torch.load(os.path.join(self.data_dir, feat_path)) # 返回特征路径和梅尔频谱图 # 注意推理时不需要标签因为我们需要预测说话人 return feat_path, mel # # 批处理函数 # def inference_collate_batch(batch): 推理时的批处理函数将多个样本整理成一个batch Args: batch: 包含多个(feat_path, mel)元组的列表 Returns: feat_paths: 特征路径列表 mels: 堆叠后的梅尔频谱图张量 (batch_size, length, 40) # 解压batch中的特征路径和梅尔频谱图 feat_paths, mels zip(*batch) # 将梅尔频谱图堆叠成一个batch # torch.stack会沿着新的维度堆叠所有张量 # 输出形状: (batch_size, length, 40) return feat_paths, torch.stack(mels) # # 参数配置函数 # def parse_args(): 解析推理参数 Returns: config: 推理配置字典 config { data_dir: ./Dataset, # 数据目录路径 model_path: ./model.ckpt, # 训练好的模型权重文件路径 output_path: ./output.csv, # 输出结果文件路径 } return config # # 主推理函数 # def main( data_dir, model_path, output_path, ): 推理主函数 流程 1. 加载模型和映射文件 2. 加载测试数据 3. 对每个测试样本进行预测 4. 将预测结果保存到CSV文件 Args: data_dir: 数据目录路径 model_path: 模型权重文件路径 output_path: 输出文件路径 # 1. 设置计算设备 # 优先使用GPU如果没有GPU则使用CPU device torch.device(cuda if torch.cuda.is_available() else cpu) print(f[Info]: Use {device} now!) # 2. 加载说话人ID映射文件 # mapping.json包含两个映射 # - speaker2id: 说话人名称 - ID # - id2speaker: ID - 说话人名称 mapping_path Path(data_dir) / mapping.json mapping json.load(mapping_path.open()) # 获取id2speaker映射用于将预测的ID转换为说话人名称 id2speaker mapping[id2speaker] print(f[Info]: Loaded mapping for {len(id2speaker)} speakers) # 3. 加载测试数据 # 创建推理数据集 dataset InferenceDataset(data_dir) # 创建数据加载器 # batch_size1: 每次处理一个样本避免内存不足 # shuffleFalse: 保持原始顺序便于追踪结果 # drop_lastFalse: 保留所有数据 # num_workers8: 使用8个进程并行加载数据 # collate_fn: 自定义批处理函数 dataloader DataLoader( dataset, batch_size1, # 推理时通常使用batch_size1 shuffleFalse, # 不打乱顺序 drop_lastFalse, # 保留所有样本 num_workers8, # 多进程加载 collate_fninference_collate_batch, ) print(f[Info]: Finish loading data! Total samples: {len(dataset)}, flushTrue) # 4. 加载训练好的模型 # 获取说话人数量 speaker_num len(id2speaker) # 创建模型实例 # n_spks: 输出类别数说话人数量 model Classifier(n_spksspeaker_num).to(device) # 加载训练好的权重 # map_location: 确保权重能正确加载到指定设备 model.load_state_dict(torch.load(model_path, map_locationdevice)) # 切换到评估模式 # 这会关闭Dropout等训练特有的层 model.eval() print(f[Info]: Finish creating model! Loaded from {model_path}, flushTrue) # 5. 进行推理预测 # 初始化结果列表第一行是表头 results [[Id, Category]] # 使用tqdm显示进度条 for feat_paths, mels in tqdm(dataloader, descInference): with torch.no_grad(): # 不计算梯度节省内存和加速 # 将数据移动到设备 mels mels.to(device) # 前向传播得到模型输出 # outs形状: (batch_size, speaker_num) outs model(mels) # 获取预测结果取概率最大的类别索引 # argmax(1) 沿着类别维度取最大值 preds outs.argmax(1).cpu().numpy() # 将预测结果转换为说话人名称 for feat_path, pred in zip(feat_paths, preds): # 通过id2speaker映射将ID转换为说话人名称 # 注意pred是整数需要转换为字符串作为字典键 speaker_name id2speaker[str(pred)] # 添加结果[特征文件路径, 说话人名称] results.append([feat_path, speaker_name]) # 6. 保存预测结果 # 将结果写入CSV文件 with open(output_path, w, newline, encodingutf-8) as csvfile: writer csv.writer(csvfile) writer.writerows(results) print(f[Info]: Results saved to {output_path}) print(f[Info]: Total predictions: {len(results) - 1}) # # 程序入口 # if __name__ __main__: # 解析参数并运行推理 main(**parse_args())

更多文章