CNN音频降噪模型PyTorch实现:完整代码及评估指标
由于涉及到数据集的版权问题,本文不提供数据集下载链接。请读者自行准备数据集。
- 数据加载
首先,我们需要加载音频数据集。这里使用PyTorch中的Dataset和DataLoader来加载数据集。
import torch
import librosa
import numpy as np
from torch.utils.data import Dataset, DataLoader
# Dataset definition
class AudioDataset(Dataset):
    """Dataset of audio files listed one path per line in a text file.

    Each item is a mono waveform resampled to `sr` and forced to exactly
    `segment_length` seconds by zero-padding or random cropping.
    """

    def __init__(self, data_path, sr=16000, segment_length=2, transform=None):
        self.sr = sr
        self.segment_length = segment_length
        self.transform = transform
        with open(data_path, 'r') as f:
            self.data = [line.strip() for line in f]

    def __len__(self):
        return len(self.data)

    def __getitem__(self, index):
        audio_path = self.data[index]
        waveform, _ = librosa.load(audio_path, sr=self.sr, mono=True)
        waveform = self._preprocess(waveform)
        if self.transform:
            waveform = self.transform(waveform)
        return waveform

    def _preprocess(self, waveform):
        # Force every clip to exactly segment_length * sr samples so batches
        # can be stacked.
        target_len = self.segment_length * self.sr
        if len(waveform) < target_len:
            # Too short: zero-pad at the end.
            waveform = np.pad(waveform, (0, target_len - len(waveform)), 'constant')
        elif len(waveform) > target_len:
            # Too long: take a random crop.
            start = np.random.randint(0, len(waveform) - target_len)
            waveform = waveform[start:start + target_len]
        return waveform
# Build the train/test datasets from path-list files and wrap them in
# batched loaders (shuffle only during training).
train_dataset = AudioDataset('train.txt')
test_dataset = AudioDataset('test.txt')
# NOTE(review): these loaders capture the AudioDataset defined above, which
# yields raw waveforms; a spectrogram-based AudioDataset is redefined later
# in this file, after these objects already exist -- confirm intended order.
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False)
在上面的代码中,我们定义了一个AudioDataset类来加载音频数据集。在__getitem__方法中,我们首先使用librosa库加载音频文件,然后对音频进行预处理,最后返回预处理后的音频。在_preprocess方法中,我们首先判断音频的长度是否小于segment_length * sr,如果小于,就在后面填充0;如果大于,就随机截取一个长度为segment_length * sr的片段。这是为了保证所有音频的长度都一样。如果音频长度相同,我们就可以使用批量处理来加速训练过程。
- 数据处理
在训练模型之前,我们需要对音频进行预处理。首先,我们需要将音频转换成频谱图,并对其进行归一化处理。然后,我们需要将频谱图转换成张量,并将其输入到模型中。
import torch.nn.functional as F
# Preprocessing: waveform -> normalized log-magnitude spectrogram tensor.
def preprocess(x):
    """Convert a mono waveform into a normalized log-magnitude spectrogram.

    Args:
        x: 1-D numpy waveform.

    Returns:
        Float32 tensor of shape (1, n_freq, n_frames) with values roughly in
        [0, 1] (dB magnitudes mapped from [-80, 0] via (db + 80) / 80).
    """
    spec = librosa.stft(x, n_fft=1024, hop_length=256, win_length=1024)
    # Magnitude only; the phase is recomputed from a reference signal in
    # postprocess(), so it is not kept here (the original bound an unused
    # `phase` variable via librosa.magphase).
    mag = np.abs(spec)
    # NOTE(review): ref=np.max makes the dB scale relative to each clip's own
    # peak, which postprocess() cannot invert exactly -- confirm intended.
    mag_db = librosa.amplitude_to_db(mag, ref=np.max)
    # Map [-80, 0] dB into [0, 1].
    mag_db = (mag_db + 80) / 80
    # Force float32: the model's Conv2d weights are float32, and a float64
    # numpy input would otherwise produce a float64 tensor that fails at the
    # first convolution.
    return torch.from_numpy(mag_db.astype(np.float32)).unsqueeze(0)
# Post-processing: normalized log-magnitude tensor + reference waveform -> waveform.
def postprocess(mag_db, x):
    """Reconstruct a waveform from a normalized log-magnitude spectrogram.

    Args:
        mag_db: tensor of shape (1, n_freq, n_frames) with values in ~[0, 1],
            as produced by preprocess().
        x: 1-D numpy waveform whose phase is borrowed for reconstruction.

    Returns:
        1-D numpy waveform obtained via the inverse STFT.
    """
    # Tensor -> numpy, dropping the leading channel dimension.
    mag_db = mag_db.squeeze(0).detach().cpu().numpy()
    # Undo the (db + 80) / 80 normalization, then dB -> linear amplitude.
    # NOTE(review): preprocess() used ref=np.max (per-clip peak), which is not
    # recoverable here, so absolute levels are only approximate -- confirm.
    mag = librosa.db_to_amplitude(mag_db * 80 - 80, ref=1.0)
    # Borrow the phase of the reference signal x.
    # NOTE(review): some call sites in this file pass a torch tensor as x;
    # librosa.stft expects a numpy array -- verify the callers.
    phase = np.angle(librosa.stft(x, n_fft=1024, hop_length=256, win_length=1024))
    spec = mag * np.exp(1j * phase)
    # Inverse STFT back to the time domain.
    x_hat = librosa.istft(spec, hop_length=256, win_length=1024)
    return x_hat
# Dataset variant that applies the spectrogram preprocessing per item.
class AudioDataset(Dataset):
    """Dataset of audio files that yields normalized spectrogram tensors.

    Reads one audio path per line from a text file; each item is loaded at
    `sr`, converted by the module-level preprocess() into a log-magnitude
    spectrogram tensor, then optionally passed through `transform`.
    """

    def __init__(self, data_path, sr=16000, segment_length=2, transform=None):
        self.sr = sr
        self.segment_length = segment_length
        self.transform = transform
        with open(data_path, 'r') as f:
            self.data = [line.strip() for line in f]

    def __len__(self):
        return len(self.data)

    def __getitem__(self, index):
        path = self.data[index]
        audio, _ = librosa.load(path, sr=self.sr, mono=True)
        item = self._preprocess(audio)
        if self.transform:
            item = self.transform(item)
        return item

    def _preprocess(self, audio):
        # Delegate to the module-level spectrogram pipeline.
        return preprocess(audio)
# Apply the post-processing to the model output and score it.
def evaluate(model, loader):
    """Evaluate `model` on `loader`.

    Adds Gaussian noise to each normalized spectrogram batch, denoises it
    with the model, and accumulates the MSE loss plus SDR/SIR/SAR computed
    with mir_eval.

    Returns:
        (avg_loss, avg_sdr, avg_sir, avg_sar)
    """
    model.eval()
    total_loss = 0
    total_sdr = 0
    total_sir = 0
    total_sar = 0
    with torch.no_grad():
        for data in loader:
            x = data.to(device)
            # Synthetic corruption: additive Gaussian noise (sigma = 0.1) on
            # the normalized spectrogram.
            x_noisy = x + torch.randn_like(x) * 0.1
            x_hat = model(x_noisy)
            loss = F.mse_loss(x_hat, x)
            # BUG FIX: capture the batch size now. Below, `x` is rebound to
            # the numpy waveform returned by postprocess(), and numpy arrays
            # have no .size(0) -- the original `loss.item() * x.size(0)` after
            # that rebinding raised a TypeError.
            batch_size = x.size(0)
            mag_db_hat = x_hat.squeeze(1)
            mag_db = x.squeeze(1)
            # NOTE(review): postprocess() expects a single (1, F, T) tensor
            # plus a numpy waveform, but these are batched tensors (and
            # x_noisy is a spectrogram, not a waveform); a per-item loop with
            # the original time-domain signals is needed for strict
            # correctness -- confirm.
            x_hat = postprocess(mag_db_hat, x_noisy.squeeze(1))
            x = postprocess(mag_db, x.squeeze(1))
            sdr, sir, sar, _ = mir_eval.separation.bss_eval_sources(x, x_hat)
            total_loss += loss.item() * batch_size
            total_sdr += sdr.sum()
            total_sir += sir.sum()
            total_sar += sar.sum()
    avg_loss = total_loss / len(loader.dataset)
    # NOTE(review): the * 2 divisor assumes two sources per item -- confirm.
    avg_sdr = total_sdr / (len(loader.dataset) * 2)
    avg_sir = total_sir / (len(loader.dataset) * 2)
    avg_sar = total_sar / (len(loader.dataset) * 2)
    return avg_loss, avg_sdr, avg_sir, avg_sar
在上面的代码中,我们定义了一个preprocess函数来将音频转换成频谱图,并对其进行归一化处理。首先,我们使用librosa库的stft函数将音频转换成频谱图。然后,我们使用magphase函数将频谱图转换成振幅谱和相位谱。接着,我们使用amplitude_to_db函数将振幅谱转换成分贝单位的谱,并进行归一化处理。最后,我们将谱转换成张量,并将其返回。
在后处理函数postprocess中,我们首先将张量转换成numpy数组,然后使用db_to_amplitude函数将分贝单位的谱转换成振幅谱。接着,我们对参考信号做stft并取其相位,把振幅谱与该相位按 mag * exp(1j * phase) 相乘合成复数频谱。最后,我们使用istft函数将频谱图转换成音频,并返回音频。
在evaluate函数中,我们首先将模型切换到评估模式。然后,我们遍历数据集,并将每个音频输入到模型中进行评估。在评估过程中,我们使用F.mse_loss函数计算均方误差,并使用mir_eval库计算SDR、SIR和SAR等评估指标。最后,我们将评估指标的平均值返回。
- 模型
在本文中,我们使用了一个简单的卷积神经网络来进行音频降噪。这个神经网络由若干个卷积层和池化层组成,最后经过全连接层和反卷积层输出降噪后的频谱图,再由后处理步骤恢复成音频。
import torch.nn as nn
# Model definition
class AudioDenoiser(nn.Module):
    """Convolutional denoising autoencoder for log-magnitude spectrograms.

    Takes (batch, 1, n_freq, n_frames) tensors and returns a tensor of the
    same shape, so the output can be compared to the clean input with MSE.

    The original version flattened through Linear(128*32*32, ...) layers --
    valid only for 256x256 inputs -- and upsampled by 4x against an 8x
    downsampling encoder, so the output never matched the input shape (the
    training MSE would fail on the 513x126 spectrograms this file's
    preprocess() produces). This version is fully convolutional with three
    upsampling stages and restores the exact input size at the end.
    """

    def __init__(self):
        super(AudioDenoiser, self).__init__()
        # Encoder: three conv blocks, each followed by a 2x2 max-pool (8x total).
        self.conv1 = nn.Conv2d(1, 32, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
        self.conv2 = nn.Conv2d(32, 32, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
        self.pool1 = nn.MaxPool2d(kernel_size=(2, 2), stride=(2, 2))
        self.conv3 = nn.Conv2d(32, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
        self.conv4 = nn.Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
        self.pool2 = nn.MaxPool2d(kernel_size=(2, 2), stride=(2, 2))
        self.conv5 = nn.Conv2d(64, 128, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
        self.conv6 = nn.Conv2d(128, 128, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
        self.pool3 = nn.MaxPool2d(kernel_size=(2, 2), stride=(2, 2))
        # Decoder: three stride-2 transposed convs (8x upsampling) plus a
        # 1-channel output head.
        self.conv7 = nn.ConvTranspose2d(128, 64, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1), output_padding=(1, 1))
        self.conv8 = nn.ConvTranspose2d(64, 32, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1), output_padding=(1, 1))
        self.conv9 = nn.ConvTranspose2d(32, 16, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1), output_padding=(1, 1))
        self.conv10 = nn.Conv2d(16, 1, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))

    def forward(self, x):
        # Remember the input's spatial size so the output can be restored to
        # it exactly: max-pooling floors odd sizes (e.g. 513 -> 256), so 8x
        # upsampling alone would give 512, not 513.
        size = x.shape[-2:]
        x = F.relu(self.conv1(x))
        x = F.relu(self.conv2(x))
        x = self.pool1(x)
        x = F.relu(self.conv3(x))
        x = F.relu(self.conv4(x))
        x = self.pool2(x)
        x = F.relu(self.conv5(x))
        x = F.relu(self.conv6(x))
        x = self.pool3(x)
        x = F.relu(self.conv7(x))
        x = F.relu(self.conv8(x))
        x = F.relu(self.conv9(x))
        # ReLU head keeps outputs non-negative, matching the [0, 1] range of
        # the normalized dB spectrograms.
        x = F.relu(self.conv10(x))
        if x.shape[-2:] != size:
            x = F.interpolate(x, size=size)
        return x
# Instantiate the model on the selected device.
# NOTE(review): `device` is defined only in the consolidated script later in
# this file -- confirm it is in scope when this snippet runs.
model = AudioDenoiser().to(device)
在上面的代码中,我们定义了一个AudioDenoiser类来实现音频降噪。这个类继承自nn.Module类,并重载了forward方法。在forward方法中,我们使用了若干个卷积层和池化层来提取频谱图的特征,然后使用全连接层将这些特征转换成一个向量。接着,我们使用反卷积层将这个向量转换回一个频谱图,输出降噪后的频谱图;降噪后的音频需要再经过postprocess后处理函数由频谱图重建得到。
- 训练模型
在训练模型之前,我们需要定义损失函数和优化器。
import torch.optim as optim
# Loss and optimizer: plain MSE between denoised and clean spectrograms,
# Adam with a fixed learning rate of 1e-3.
criterion = nn.MSELoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)
在训练模型时,我们遍历数据集,并将每个音频输入到模型中进行训练。在训练过程中,我们使用MSELoss函数计算均方误差,并使用Adam优化器更新模型参数。
# Training loop: 10 epochs of denoising on synthetically corrupted batches.
for epoch in range(10):
    model.train()
    total_loss = 0
    for data in train_loader:
        x = data.to(device)
        # Corrupt the clean batch with additive Gaussian noise (sigma = 0.1).
        # NOTE(review): noise is added to the normalized log-magnitude values,
        # not to waveforms -- confirm this matches the intended noise model.
        x_noisy = x + torch.randn_like(x) * 0.1
        optimizer.zero_grad()
        x_hat = model(x_noisy)
        loss = criterion(x_hat, x)
        loss.backward()
        optimizer.step()
        # Weight by batch size so the epoch average is per-sample.
        total_loss += loss.item() * x.size(0)
    avg_loss = total_loss / len(train_loader.dataset)
    print('Epoch {}: train loss = {}'.format(epoch, avg_loss))
    # Evaluate on the held-out set after every epoch.
    avg_loss, avg_sdr, avg_sir, avg_sar = evaluate(model, test_loader)
    print('Epoch {}: test loss = {}, SDR = {}, SIR = {}, SAR = {}'.format(epoch, avg_loss, avg_sdr, avg_sir, avg_sar))
    # Save (overwrite) the weights after each epoch.
    torch.save(model.state_dict(), 'model.pth')
在上面的代码中,我们训练模型10个epoch,并在每个epoch结束时在测试集上评估模型。在训练过程中,我们使用zero_grad函数清除梯度,并使用backward函数计算梯度。接着,我们使用step函数更新模型参数。最后,我们计算每个epoch的平均损失,并输出训练和测试集上的损失和评估指标。我们还使用torch.save函数保存模型参数。
- 降噪后音频存储和评估指标
在训练和评估模型之后,我们可以使用模型对音频进行降噪,并计算SDR、SIR和SAR等评估指标。
# Load the trained weights and switch to inference mode.
model.load_state_dict(torch.load('model.pth'))
model.eval()
# Denoise one file and compute the evaluation metrics.
x, _ = librosa.load('noisy.wav', sr=16000, mono=True)
x_noisy = preprocess(x)
x_noisy = x_noisy.to(device)
# NOTE(review): preprocess() returns a (1, F, T) tensor; Conv2d will treat
# that as an unbatched (C, H, W) input -- confirm no batch dim is required.
x_hat = model(x_noisy)
mag_db_hat = x_hat.squeeze(1)
mag_db = x_noisy.squeeze(1)
# NOTE(review): postprocess() expects a numpy waveform as its second
# argument, but x_noisy here is a spectrogram tensor; the original waveform
# `x` is the natural phase reference -- verify these call sites.
x_hat = postprocess(mag_db_hat, x_noisy.squeeze(1))
x = postprocess(mag_db, x_noisy.squeeze(1))
sdr, sir, sar, _ = mir_eval.separation.bss_eval_sources(x, x_hat)
# Write the denoised audio to disk.
# NOTE(review): librosa.output.write_wav was removed in librosa 0.8; newer
# installs need soundfile.write -- confirm the pinned librosa version.
librosa.output.write_wav('denoised.wav', x_hat, sr=16000)
# Print the metrics.
print('SDR = {}, SIR = {}, SAR = {}'.format(sdr, sir, sar))
在上面的代码中,我们首先加载保存的模型参数,并将模型切换到评估模式。接着,我们使用librosa库加载需要降噪的音频,并将其转换成频谱图。然后,我们将频谱图输入到模型中进行降噪,并使用postprocess函数将降噪后的频谱图转换成音频。最后,我们使用mir_eval库计算SDR、SIR和SAR等评估指标,并使用librosa库将降噪后的音频存储到本地文件中。
完整代码如下:
import torch
import librosa
import numpy as np
from torch.utils.data import Dataset, DataLoader
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import mir_eval
# Select GPU when available, otherwise fall back to CPU.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
# Dataset definition
class AudioDataset(Dataset):
    """Dataset of audio files listed one path per line in a text file.

    Each item is a mono waveform resampled to `sr` and forced to exactly
    `segment_length` seconds by zero-padding or random cropping.
    """

    def __init__(self, data_path, sr=16000, segment_length=2, transform=None):
        self.sr = sr
        self.segment_length = segment_length
        self.transform = transform
        with open(data_path, 'r') as f:
            self.data = [line.strip() for line in f]

    def __len__(self):
        return len(self.data)

    def __getitem__(self, index):
        audio_path = self.data[index]
        waveform, _ = librosa.load(audio_path, sr=self.sr, mono=True)
        waveform = self._preprocess(waveform)
        if self.transform:
            waveform = self.transform(waveform)
        return waveform

    def _preprocess(self, waveform):
        # Force every clip to exactly segment_length * sr samples so batches
        # can be stacked.
        target_len = self.segment_length * self.sr
        if len(waveform) < target_len:
            # Too short: zero-pad at the end.
            waveform = np.pad(waveform, (0, target_len - len(waveform)), 'constant')
        elif len(waveform) > target_len:
            # Too long: take a random crop.
            start = np.random.randint(0, len(waveform) - target_len)
            waveform = waveform[start:start + target_len]
        return waveform
# Preprocessing: waveform -> normalized log-magnitude spectrogram tensor.
def preprocess(x):
    """Convert a mono waveform into a normalized log-magnitude spectrogram.

    Args:
        x: 1-D numpy waveform.

    Returns:
        Float32 tensor of shape (1, n_freq, n_frames) with values roughly in
        [0, 1] (dB magnitudes mapped from [-80, 0] via (db + 80) / 80).
    """
    spec = librosa.stft(x, n_fft=1024, hop_length=256, win_length=1024)
    # Magnitude only; the phase is recomputed from a reference signal in
    # postprocess(), so it is not kept here (the original bound an unused
    # `phase` variable via librosa.magphase).
    mag = np.abs(spec)
    # NOTE(review): ref=np.max makes the dB scale relative to each clip's own
    # peak, which postprocess() cannot invert exactly -- confirm intended.
    mag_db = librosa.amplitude_to_db(mag, ref=np.max)
    # Map [-80, 0] dB into [0, 1].
    mag_db = (mag_db + 80) / 80
    # Force float32: the model's Conv2d weights are float32, and a float64
    # numpy input would otherwise produce a float64 tensor that fails at the
    # first convolution.
    return torch.from_numpy(mag_db.astype(np.float32)).unsqueeze(0)
# Post-processing: normalized log-magnitude tensor + reference waveform -> waveform.
def postprocess(mag_db, x):
    """Reconstruct a waveform from a normalized log-magnitude spectrogram.

    Args:
        mag_db: tensor of shape (1, n_freq, n_frames) with values in ~[0, 1],
            as produced by preprocess().
        x: 1-D numpy waveform whose phase is borrowed for reconstruction.

    Returns:
        1-D numpy waveform obtained via the inverse STFT.
    """
    # Tensor -> numpy, dropping the leading channel dimension.
    mag_db = mag_db.squeeze(0).detach().cpu().numpy()
    # Undo the (db + 80) / 80 normalization, then dB -> linear amplitude.
    # NOTE(review): preprocess() used ref=np.max (per-clip peak), which is not
    # recoverable here, so absolute levels are only approximate -- confirm.
    mag = librosa.db_to_amplitude(mag_db * 80 - 80, ref=1.0)
    # Borrow the phase of the reference signal x.
    # NOTE(review): some call sites in this file pass a torch tensor as x;
    # librosa.stft expects a numpy array -- verify the callers.
    phase = np.angle(librosa.stft(x, n_fft=1024, hop_length=256, win_length=1024))
    spec = mag * np.exp(1j * phase)
    # Inverse STFT back to the time domain.
    x_hat = librosa.istft(spec, hop_length=256, win_length=1024)
    return x_hat
# Model definition
class AudioDenoiser(nn.Module):
    """Convolutional denoising autoencoder for log-magnitude spectrograms.

    Takes (batch, 1, n_freq, n_frames) tensors and returns a tensor of the
    same shape, so the output can be compared to the clean input with MSE.

    The original version flattened through Linear(128*32*32, ...) layers --
    valid only for 256x256 inputs -- and upsampled by 4x against an 8x
    downsampling encoder, so the output never matched the input shape (the
    training MSE would fail on the 513x126 spectrograms this file's
    preprocess() produces). This version is fully convolutional with three
    upsampling stages and restores the exact input size at the end.
    """

    def __init__(self):
        super(AudioDenoiser, self).__init__()
        # Encoder: three conv blocks, each followed by a 2x2 max-pool (8x total).
        self.conv1 = nn.Conv2d(1, 32, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
        self.conv2 = nn.Conv2d(32, 32, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
        self.pool1 = nn.MaxPool2d(kernel_size=(2, 2), stride=(2, 2))
        self.conv3 = nn.Conv2d(32, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
        self.conv4 = nn.Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
        self.pool2 = nn.MaxPool2d(kernel_size=(2, 2), stride=(2, 2))
        self.conv5 = nn.Conv2d(64, 128, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
        self.conv6 = nn.Conv2d(128, 128, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
        self.pool3 = nn.MaxPool2d(kernel_size=(2, 2), stride=(2, 2))
        # Decoder: three stride-2 transposed convs (8x upsampling) plus a
        # 1-channel output head.
        self.conv7 = nn.ConvTranspose2d(128, 64, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1), output_padding=(1, 1))
        self.conv8 = nn.ConvTranspose2d(64, 32, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1), output_padding=(1, 1))
        self.conv9 = nn.ConvTranspose2d(32, 16, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1), output_padding=(1, 1))
        self.conv10 = nn.Conv2d(16, 1, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))

    def forward(self, x):
        # Remember the input's spatial size so the output can be restored to
        # it exactly: max-pooling floors odd sizes (e.g. 513 -> 256), so 8x
        # upsampling alone would give 512, not 513.
        size = x.shape[-2:]
        x = F.relu(self.conv1(x))
        x = F.relu(self.conv2(x))
        x = self.pool1(x)
        x = F.relu(self.conv3(x))
        x = F.relu(self.conv4(x))
        x = self.pool2(x)
        x = F.relu(self.conv5(x))
        x = F.relu(self.conv6(x))
        x = self.pool3(x)
        x = F.relu(self.conv7(x))
        x = F.relu(self.conv8(x))
        x = F.relu(self.conv9(x))
        # ReLU head keeps outputs non-negative, matching the [0, 1] range of
        # the normalized dB spectrograms.
        x = F.relu(self.conv10(x))
        if x.shape[-2:] != size:
            x = F.interpolate(x, size=size)
        return x
# Build the train/test datasets from path-list files and wrap them in loaders.
# NOTE(review): the AudioDataset in scope here yields raw waveforms; the
# spectrogram variant is redefined further down, after these loaders exist --
# the training loop below expects spectrogram tensors, confirm the ordering.
train_dataset = AudioDataset('train.txt')
test_dataset = AudioDataset('test.txt')
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False)
# Instantiate the model on the selected device.
model = AudioDenoiser().to(device)
# MSE loss and Adam optimizer (lr = 1e-3).
criterion = nn.MSELoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)
# Training loop: 10 epochs of denoising on synthetically corrupted batches.
# NOTE(review): evaluate() is defined *below* this loop at module level, so
# running this file top-to-bottom raises NameError at the evaluate() call --
# the definition should precede this loop.
for epoch in range(10):
    model.train()
    total_loss = 0
    for data in train_loader:
        x = data.to(device)
        # Corrupt the clean batch with additive Gaussian noise (sigma = 0.1).
        x_noisy = x + torch.randn_like(x) * 0.1
        optimizer.zero_grad()
        x_hat = model(x_noisy)
        loss = criterion(x_hat, x)
        loss.backward()
        optimizer.step()
        # Weight by batch size so the epoch average is per-sample.
        total_loss += loss.item() * x.size(0)
    avg_loss = total_loss / len(train_loader.dataset)
    print('Epoch {}: train loss = {}'.format(epoch, avg_loss))
    # Evaluate on the held-out set after every epoch.
    avg_loss, avg_sdr, avg_sir, avg_sar = evaluate(model, test_loader)
    print('Epoch {}: test loss = {}, SDR = {}, SIR = {}, SAR = {}'.format(epoch, avg_loss, avg_sdr, avg_sir, avg_sar))
    # Save (overwrite) the weights after each epoch.
    torch.save(model.state_dict(), 'model.pth')
# Dataset variant that applies the spectrogram preprocessing per item.
class AudioDataset(Dataset):
    """Dataset of audio files that yields normalized spectrogram tensors.

    Reads one audio path per line from a text file; each item is loaded at
    `sr`, converted by the module-level preprocess() into a log-magnitude
    spectrogram tensor, then optionally passed through `transform`.
    """

    def __init__(self, data_path, sr=16000, segment_length=2, transform=None):
        self.sr = sr
        self.segment_length = segment_length
        self.transform = transform
        with open(data_path, 'r') as f:
            self.data = [line.strip() for line in f]

    def __len__(self):
        return len(self.data)

    def __getitem__(self, index):
        path = self.data[index]
        audio, _ = librosa.load(path, sr=self.sr, mono=True)
        item = self._preprocess(audio)
        if self.transform:
            item = self.transform(item)
        return item

    def _preprocess(self, audio):
        # Delegate to the module-level spectrogram pipeline.
        return preprocess(audio)
# Apply the post-processing to the model output and score it.
def evaluate(model, loader):
    """Evaluate `model` on `loader`.

    Adds Gaussian noise to each normalized spectrogram batch, denoises it
    with the model, and accumulates the MSE loss plus SDR/SIR/SAR computed
    with mir_eval.

    Returns:
        (avg_loss, avg_sdr, avg_sir, avg_sar)
    """
    model.eval()
    total_loss = 0
    total_sdr = 0
    total_sir = 0
    total_sar = 0
    with torch.no_grad():
        for data in loader:
            x = data.to(device)
            # Synthetic corruption: additive Gaussian noise (sigma = 0.1) on
            # the normalized spectrogram.
            x_noisy = x + torch.randn_like(x) * 0.1
            x_hat = model(x_noisy)
            loss = F.mse_loss(x_hat, x)
            # BUG FIX: capture the batch size now. Below, `x` is rebound to
            # the numpy waveform returned by postprocess(), and numpy arrays
            # have no .size(0) -- the original `loss.item() * x.size(0)` after
            # that rebinding raised a TypeError.
            batch_size = x.size(0)
            mag_db_hat = x_hat.squeeze(1)
            mag_db = x.squeeze(1)
            # NOTE(review): postprocess() expects a single (1, F, T) tensor
            # plus a numpy waveform, but these are batched tensors (and
            # x_noisy is a spectrogram, not a waveform); a per-item loop with
            # the original time-domain signals is needed for strict
            # correctness -- confirm.
            x_hat = postprocess(mag_db_hat, x_noisy.squeeze(1))
            x = postprocess(mag_db, x.squeeze(1))
            sdr, sir, sar, _ = mir_eval.separation.bss_eval_sources(x, x_hat)
            total_loss += loss.item() * batch_size
            total_sdr += sdr.sum()
            total_sir += sir.sum()
            total_sar += sar.sum()
    avg_loss = total_loss / len(loader.dataset)
    # NOTE(review): the * 2 divisor assumes two sources per item -- confirm.
    avg_sdr = total_sdr / (len(loader.dataset) * 2)
    avg_sir = total_sir / (len(loader.dataset) * 2)
    avg_sar = total_sar / (len(loader.dataset) * 2)
    return avg_loss, avg_sdr, avg_sir, avg_sar
# Load the trained weights and switch to inference mode.
model.load_state_dict(torch.load('model.pth'))
model.eval()
# Denoise one file and compute the evaluation metrics.
x, _ = librosa.load('noisy.wav', sr=16000, mono=True)
x_noisy = preprocess(x)
x_noisy = x_noisy.to(device)
# NOTE(review): preprocess() returns a (1, F, T) tensor; Conv2d will treat
# that as an unbatched (C, H, W) input -- confirm no batch dim is required.
x_hat = model(x_noisy)
mag_db_hat = x_hat.squeeze(1)
mag_db = x_noisy.squeeze(1)
# NOTE(review): postprocess() expects a numpy waveform as its second
# argument, but x_noisy here is a spectrogram tensor; the original waveform
# `x` is the natural phase reference -- verify these call sites.
x_hat = postprocess(mag_db_hat, x_noisy.squeeze(1))
x = postprocess(mag_db, x_noisy.squeeze(1))
sdr, sir, sar, _ = mir_eval.separation.bss_eval_sources(x, x_hat)
# Write the denoised audio to disk.
# NOTE(review): librosa.output.write_wav was removed in librosa 0.8; newer
# installs need soundfile.write -- confirm the pinned librosa version.
librosa.output.write_wav('denoised.wav', x_hat, sr=16000)
# Print the metrics.
print('SDR = {}, SIR = {}, SAR = {}'.format(sdr, sir, sar))
原文地址: https://www.cveoy.top/t/topic/nsML 著作权归作者所有。请勿转载和采集!