Because of dataset licensing, this article does not provide a download link; please prepare a dataset yourself.

  1. Data Loading

First, we need to load the audio dataset, using PyTorch's Dataset and DataLoader.

import torch
import librosa
import numpy as np
from torch.utils.data import Dataset, DataLoader

# Dataset: loads audio files listed one path per line in a text file
class AudioDataset(Dataset):
    def __init__(self, data_path, sr=16000, segment_length=2, transform=None):
        self.sr = sr
        self.segment_length = segment_length
        self.transform = transform
        
        self.data = []
        with open(data_path, 'r') as f:
            for line in f:
                line = line.strip()
                self.data.append(line)
    
    def __len__(self):
        return len(self.data)
    
    def __getitem__(self, index):
        audio_path = self.data[index]
        x, _ = librosa.load(audio_path, sr=self.sr, mono=True)
        x = self._preprocess(x)
        if self.transform:
            x = self.transform(x)
        return x
    
    def _preprocess(self, x):
        # shorter than segment_length seconds: zero-pad at the end
        if len(x) < self.segment_length * self.sr:
            x = np.pad(x, (0, self.segment_length * self.sr - len(x)), 'constant')
        # longer: take a random window of segment_length seconds
        elif len(x) > self.segment_length * self.sr:
            start = np.random.randint(0, len(x) - self.segment_length * self.sr)
            x = x[start:start + self.segment_length * self.sr]
        return x

# build the datasets and loaders
train_dataset = AudioDataset('train.txt')
test_dataset = AudioDataset('test.txt')
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False)

In the code above, we define an AudioDataset class for loading the audio dataset. In the __getitem__ method we load the audio file with librosa, preprocess it, and return the result. In _preprocess we zero-pad audio shorter than segment_length * sr samples and take a random crop of that length from longer audio, so that every example has the same length. Equal-length examples can be batched, which speeds up training.
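The pad-or-crop step can be checked in isolation. A minimal sketch, with a target length of 100 samples standing in for segment_length * sr (the function name pad_or_crop is hypothetical):

```python
import numpy as np

rng = np.random.default_rng(0)

def pad_or_crop(x, target_len):
    # shorter than target: zero-pad at the end
    if len(x) < target_len:
        return np.pad(x, (0, target_len - len(x)), 'constant')
    # longer than target: take a random window of target_len samples
    if len(x) > target_len:
        start = rng.integers(0, len(x) - target_len)
        return x[start:start + target_len]
    return x

short = pad_or_crop(np.ones(60), 100)
long_ = pad_or_crop(np.ones(250), 100)
print(short.shape, long_.shape)  # both (100,)
print(short[60:].sum())          # 0.0 -- the tail is zero padding
```

Either branch yields a fixed-length array, which is what lets the DataLoader stack samples into a batch.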

  2. Data Preprocessing

Before training the model, the audio must be preprocessed: convert each waveform to a spectrogram, normalize it, turn the result into a tensor, and feed it to the model.

import torch.nn.functional as F

# waveform -> normalized log-magnitude spectrogram
def preprocess(x):
    # STFT magnitude
    spec = librosa.stft(x, n_fft=1024, hop_length=256, win_length=1024)
    mag, phase = librosa.magphase(spec)
    # ref=np.max with the default top_db=80 yields values in [-80, 0] dB
    mag_db = librosa.amplitude_to_db(mag, ref=np.max)
    # map [-80, 0] dB to [0, 1]
    mag_db = (mag_db + 80) / 80
    # drop the Nyquist bin and trim frames so both axes divide by 8
    # (the model pools by 2 three times: 512 x 120 -> 64 x 15)
    mag_db = mag_db[:512, :120]
    # to tensor with a channel dimension
    mag_db = torch.tensor(mag_db, dtype=torch.float32).unsqueeze(0)
    return mag_db

# normalized spectrogram (tensor) -> waveform, reusing the phase of x
def postprocess(mag_db, x):
    # tensor -> numpy, undo the [0, 1] normalization back to dB
    mag_db = mag_db.squeeze(0).detach().cpu().numpy()
    mag = librosa.db_to_amplitude(mag_db * 80 - 80, ref=1.0)
    # restore the Nyquist bin dropped in preprocess
    mag = np.pad(mag, ((0, 1), (0, 0)))
    # phase of the reference waveform, trimmed to the same frame count
    phase = np.angle(librosa.stft(x, n_fft=1024, hop_length=256, win_length=1024))
    phase = phase[:, :mag.shape[1]]
    spec = mag * np.exp(1j * phase)
    # inverse STFT back to audio
    x_hat = librosa.istft(spec, hop_length=256, win_length=1024)
    return x_hat

# version of the Dataset that returns spectrograms instead of waveforms
class AudioDataset(Dataset):
    def __init__(self, data_path, sr=16000, segment_length=2, transform=None):
        self.sr = sr
        self.segment_length = segment_length
        self.transform = transform
        
        self.data = []
        with open(data_path, 'r') as f:
            for line in f:
                line = line.strip()
                self.data.append(line)
    
    def __len__(self):
        return len(self.data)
    
    def __getitem__(self, index):
        audio_path = self.data[index]
        x, _ = librosa.load(audio_path, sr=self.sr, mono=True)
        x = self._preprocess(x)
        if self.transform:
            x = self.transform(x)
        return x
    
    def _preprocess(self, x):
        # pad/crop to a fixed length first, so every spectrogram
        # has the same number of frames, then convert
        target_len = self.segment_length * self.sr
        if len(x) < target_len:
            x = np.pad(x, (0, target_len - len(x)), 'constant')
        elif len(x) > target_len:
            start = np.random.randint(0, len(x) - target_len)
            x = x[start:start + target_len]
        return preprocess(x)

import mir_eval

# evaluation: MSE loss plus source-separation metrics (SDR/SIR/SAR)
def evaluate(model, loader):
    model.eval()
    total_loss = 0
    total_sdr = 0
    total_sir = 0
    total_sar = 0
    n_samples = 0
    with torch.no_grad():
        for data in loader:
            x = data.to(device)
            # corrupt the clean spectrogram with Gaussian noise
            x_noisy = x + torch.randn_like(x) * 0.1
            x_hat = model(x_noisy)
            loss = F.mse_loss(x_hat, x)
            total_loss += loss.item() * x.size(0)
            # reconstruct waveforms sample by sample; the phase was not kept,
            # so estimate it with Griffin-Lim
            for i in range(x.size(0)):
                mag_hat = librosa.db_to_amplitude(x_hat[i, 0].cpu().numpy() * 80 - 80)
                mag_ref = librosa.db_to_amplitude(x[i, 0].cpu().numpy() * 80 - 80)
                # restore the Nyquist bin dropped in preprocess
                mag_hat = np.pad(mag_hat, ((0, 1), (0, 0)))
                mag_ref = np.pad(mag_ref, ((0, 1), (0, 0)))
                wav_hat = librosa.griffinlim(mag_hat, hop_length=256, win_length=1024)
                wav_ref = librosa.griffinlim(mag_ref, hop_length=256, win_length=1024)
                sdr, sir, sar, _ = mir_eval.separation.bss_eval_sources(
                    wav_ref[np.newaxis, :], wav_hat[np.newaxis, :])
                total_sdr += sdr.sum()
                total_sir += sir.sum()
                total_sar += sar.sum()
                n_samples += 1
    avg_loss = total_loss / len(loader.dataset)
    avg_sdr = total_sdr / n_samples
    avg_sir = total_sir / n_samples
    avg_sar = total_sar / n_samples
    return avg_loss, avg_sdr, avg_sir, avg_sar

In the code above, the preprocess function converts audio into a normalized spectrogram. It first computes the STFT with librosa's stft function, splits the result into magnitude and phase with magphase, and converts the magnitude to decibels with amplitude_to_db. The decibel values are then normalized, and the result is returned as a tensor.
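The normalization relies on amplitude_to_db(mag, ref=np.max) producing values in [-80, 0] dB (the default top_db is 80), so (mag_db + 80) / 80 maps them to [0, 1]. A numpy-only sketch of the mapping and its inverse (the inverse is what postprocess applies):

```python
import numpy as np

def normalize_db(mag_db):
    # [-80, 0] dB -> [0, 1]
    return (mag_db + 80) / 80

def denormalize_db(mag_norm):
    # [0, 1] -> [-80, 0] dB
    return mag_norm * 80 - 80

db = np.array([-80.0, -40.0, 0.0])
norm = normalize_db(db)
print(norm)                                   # [0.  0.5 1. ]
print(np.allclose(denormalize_db(norm), db))  # True
```

Keeping inputs in [0, 1] plays well with MSE training and makes the noise level sigma = 0.1 used later directly interpretable.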

In the postprocess function, we first convert the tensor back to a NumPy array and undo the normalization with db_to_amplitude to recover the magnitude spectrogram. The phase is taken from the STFT of the reference waveform, combined with the magnitude into a complex spectrogram, and istft converts the result back to audio.

In the evaluate function, we first switch the model to evaluation mode, then iterate over the test set and feed each batch through the model. We compute the mean squared error with F.mse_loss, reconstruct time-domain audio from the predicted and reference spectrograms, and compute the SDR, SIR, and SAR metrics with the mir_eval library. Finally, the averages of the loss and metrics are returned.
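mir_eval's bss_eval_sources computes SDR via a projection of the estimate onto the reference, but the core idea can be illustrated with a simplified energy-ratio version (a sketch for intuition only, not mir_eval's exact algorithm):

```python
import numpy as np

def simple_sdr(ref, est):
    # ratio of reference energy to error energy, in dB
    err = ref - est
    return 10 * np.log10(np.sum(ref ** 2) / np.sum(err ** 2))

t = np.linspace(0, 1, 16000, endpoint=False)
clean = np.sin(2 * np.pi * 5 * t)
noisy = clean + 0.1 * np.sin(2 * np.pi * 50 * t)
# the interference is 10x smaller in amplitude, so 100x smaller in energy
print(round(simple_sdr(clean, noisy), 1))  # 20.0
```

Higher is better for all three metrics: SDR measures overall distortion, SIR the suppression of interference, and SAR the artifacts introduced by processing.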

  3. Model

In this article we use a simple convolutional neural network for audio denoising. The network consists of several convolution and pooling layers, a fully connected bottleneck, and transposed-convolution layers that output the denoised spectrogram.

import torch.nn as nn

# encoder (conv + pool) -> fully connected bottleneck -> decoder (transposed conv)
class AudioDenoiser(nn.Module):
    def __init__(self):
        super(AudioDenoiser, self).__init__()
        # encoder: input (1, 512, 120) -> (128, 64, 15)
        self.conv1 = nn.Conv2d(1, 32, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
        self.conv2 = nn.Conv2d(32, 32, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
        self.pool1 = nn.MaxPool2d(kernel_size=(2, 2), stride=(2, 2))
        self.conv3 = nn.Conv2d(32, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
        self.conv4 = nn.Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
        self.pool2 = nn.MaxPool2d(kernel_size=(2, 2), stride=(2, 2))
        self.conv5 = nn.Conv2d(64, 128, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
        self.conv6 = nn.Conv2d(128, 128, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
        self.pool3 = nn.MaxPool2d(kernel_size=(2, 2), stride=(2, 2))
        # bottleneck
        self.fc1 = nn.Linear(128 * 64 * 15, 1024)
        self.fc2 = nn.Linear(1024, 128 * 64 * 15)
        # decoder: three stride-2 transposed convs mirror the three pools
        self.conv7 = nn.ConvTranspose2d(128, 64, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1), output_padding=(1, 1))
        self.conv8 = nn.ConvTranspose2d(64, 32, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1), output_padding=(1, 1))
        self.conv9 = nn.ConvTranspose2d(32, 16, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1), output_padding=(1, 1))
        self.conv10 = nn.Conv2d(16, 1, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    
    def forward(self, x):
        x = F.relu(self.conv1(x))
        x = F.relu(self.conv2(x))
        x = self.pool1(x)
        x = F.relu(self.conv3(x))
        x = F.relu(self.conv4(x))
        x = self.pool2(x)
        x = F.relu(self.conv5(x))
        x = F.relu(self.conv6(x))
        x = self.pool3(x)
        x = x.view(-1, 128 * 64 * 15)
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        x = x.view(-1, 128, 64, 15)
        x = F.relu(self.conv7(x))
        x = F.relu(self.conv8(x))
        x = F.relu(self.conv9(x))
        x = F.relu(self.conv10(x))
        return x

# instantiate the model
model = AudioDenoiser().to(device)

In the code above, the AudioDenoiser class implements the denoising network. It subclasses nn.Module and overrides the forward method. The encoder uses stacked convolution and pooling layers to extract features from the spectrogram, a pair of fully connected layers forms a bottleneck, and transposed convolutions upsample the features back to a spectrogram of the input size; postprocess then converts the denoised spectrogram back to audio.
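The bottleneck dimensions follow from the input resolution. Assuming a 512 x 120 input spectrogram (the assumption made in this sketch: 512 frequency bins, 120 frames), each 3x3 stride-1 pad-1 convolution preserves height and width, while each 2x2 max-pool halves both, so three pools shrink the feature map by a factor of 8 per axis:

```python
# each 3x3 / stride-1 / pad-1 conv preserves H x W; each 2x2 max-pool halves both
def encoder_shape(h, w, n_pools=3):
    for _ in range(n_pools):
        h, w = h // 2, w // 2
    return h, w

h, w = encoder_shape(512, 120)
print(h, w)          # 64 15
print(128 * h * w)   # 122880 features entering the fully connected layer
```

This is also why the frame count is trimmed to a multiple of 8 during preprocessing: the three stride-2 transposed convolutions in the decoder then restore exactly the input resolution.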

  4. Training the Model

Before training, we define the loss function and the optimizer.

import torch.optim as optim

# loss function and optimizer
criterion = nn.MSELoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

During training we iterate over the dataset and feed each batch to the model. We compute the mean squared error with MSELoss and update the model parameters with the Adam optimizer.

# train the model
for epoch in range(10):
    model.train()
    total_loss = 0
    for data in train_loader:
        x = data.to(device)
        # corrupt the clean spectrogram with Gaussian noise (sigma = 0.1)
        x_noisy = x + torch.randn_like(x) * 0.1
        optimizer.zero_grad()
        x_hat = model(x_noisy)
        loss = criterion(x_hat, x)
        loss.backward()
        optimizer.step()
        total_loss += loss.item() * x.size(0)
    avg_loss = total_loss / len(train_loader.dataset)
    print('Epoch {}: train loss = {}'.format(epoch, avg_loss))
    
    # evaluate on the test set
    avg_loss, avg_sdr, avg_sir, avg_sar = evaluate(model, test_loader)
    print('Epoch {}: test loss = {}, SDR = {}, SIR = {}, SAR = {}'.format(epoch, avg_loss, avg_sdr, avg_sir, avg_sar))
    
    # save the model weights
    torch.save(model.state_dict(), 'model.pth')

In the code above we train the model for 10 epochs and evaluate it on the test set at the end of each epoch. In each step we clear the gradients with zero_grad, compute them with backward, and update the parameters with step. We then compute the average loss per epoch and print the training and test losses along with the evaluation metrics, saving the model weights with torch.save.
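Because the loop corrupts each clean input on the fly (x_noisy = x + 0.1 * noise) and regresses back to the clean target, the network is effectively trained as a denoising autoencoder; sigma = 0.1 sets the corruption level relative to spectrograms normalized to [0, 1]. A numpy sketch of the noise model:

```python
import numpy as np

rng = np.random.default_rng(0)
x = rng.uniform(0, 1, size=100000)               # stand-in for a normalized spectrogram
x_noisy = x + rng.standard_normal(100000) * 0.1  # same corruption as the training loop

# the added perturbation has standard deviation close to sigma = 0.1
print(float(np.std(x_noisy - x)))
```

Raising sigma trains the model to remove heavier noise at the cost of more aggressive smoothing; it is a hyperparameter worth tuning against the noise level expected at inference time.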

  5. Saving the Denoised Audio and Evaluation Metrics

After training and evaluation, we can use the model to denoise audio and compute the SDR, SIR, and SAR metrics.

import mir_eval
import soundfile as sf

# load the trained weights and switch to evaluation mode
model.load_state_dict(torch.load('model.pth'))
model.eval()

# denoise an audio file (assumed to be at least segment-length long) and score it
x, _ = librosa.load('noisy.wav', sr=16000, mono=True)
x_noisy = preprocess(x).unsqueeze(0).to(device)  # add the batch dimension
with torch.no_grad():
    mag_db_hat = model(x_noisy)
# rebuild the waveform, reusing the phase of the noisy input
x_hat = postprocess(mag_db_hat.squeeze(0), x)
# score against the (trimmed) noisy input; if a clean reference is
# available, it should be used here instead
x_ref = x[:len(x_hat)]
sdr, sir, sar, _ = mir_eval.separation.bss_eval_sources(
    x_ref[np.newaxis, :], x_hat[np.newaxis, :])

# save the denoised audio (librosa.output.write_wav was removed in
# librosa 0.8; use soundfile instead)
sf.write('denoised.wav', x_hat, 16000)

# print the metrics
print('SDR = {}, SIR = {}, SAR = {}'.format(sdr, sir, sar))

In the code above, we first load the saved model weights and switch the model to evaluation mode. We then load the audio to be denoised with librosa, convert it to a spectrogram, run it through the model, and convert the denoised spectrogram back to audio with the postprocess function. Finally, we compute the SDR, SIR, and SAR metrics with mir_eval and write the denoised audio to a local file.

The complete code:

import torch
import librosa
import numpy as np
from torch.utils.data import Dataset, DataLoader
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import mir_eval
import soundfile as sf

# select the device
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# Dataset: loads an audio file, pads/crops it to a fixed length,
# and returns a normalized spectrogram
class AudioDataset(Dataset):
    def __init__(self, data_path, sr=16000, segment_length=2, transform=None):
        self.sr = sr
        self.segment_length = segment_length
        self.transform = transform
        
        self.data = []
        with open(data_path, 'r') as f:
            for line in f:
                line = line.strip()
                self.data.append(line)
    
    def __len__(self):
        return len(self.data)
    
    def __getitem__(self, index):
        audio_path = self.data[index]
        x, _ = librosa.load(audio_path, sr=self.sr, mono=True)
        x = self._preprocess(x)
        if self.transform:
            x = self.transform(x)
        return x
    
    def _preprocess(self, x):
        # pad short audio with zeros; randomly crop long audio
        target_len = self.segment_length * self.sr
        if len(x) < target_len:
            x = np.pad(x, (0, target_len - len(x)), 'constant')
        elif len(x) > target_len:
            start = np.random.randint(0, len(x) - target_len)
            x = x[start:start + target_len]
        # waveform -> normalized spectrogram
        return preprocess(x)

# waveform -> normalized log-magnitude spectrogram
def preprocess(x):
    # STFT magnitude
    spec = librosa.stft(x, n_fft=1024, hop_length=256, win_length=1024)
    mag, phase = librosa.magphase(spec)
    # ref=np.max with the default top_db=80 yields values in [-80, 0] dB
    mag_db = librosa.amplitude_to_db(mag, ref=np.max)
    # map [-80, 0] dB to [0, 1]
    mag_db = (mag_db + 80) / 80
    # drop the Nyquist bin and trim frames so both axes divide by 8
    # (the model pools by 2 three times: 512 x 120 -> 64 x 15)
    mag_db = mag_db[:512, :120]
    # to tensor with a channel dimension
    mag_db = torch.tensor(mag_db, dtype=torch.float32).unsqueeze(0)
    return mag_db

# normalized spectrogram (tensor) -> waveform, reusing the phase of x
def postprocess(mag_db, x):
    # tensor -> numpy, undo the [0, 1] normalization back to dB
    mag_db = mag_db.squeeze(0).detach().cpu().numpy()
    mag = librosa.db_to_amplitude(mag_db * 80 - 80, ref=1.0)
    # restore the Nyquist bin dropped in preprocess
    mag = np.pad(mag, ((0, 1), (0, 0)))
    # phase of the reference waveform, trimmed to the same frame count
    phase = np.angle(librosa.stft(x, n_fft=1024, hop_length=256, win_length=1024))
    phase = phase[:, :mag.shape[1]]
    spec = mag * np.exp(1j * phase)
    # inverse STFT back to audio
    x_hat = librosa.istft(spec, hop_length=256, win_length=1024)
    return x_hat

# encoder (conv + pool) -> fully connected bottleneck -> decoder (transposed conv)
class AudioDenoiser(nn.Module):
    def __init__(self):
        super(AudioDenoiser, self).__init__()
        # encoder: input (1, 512, 120) -> (128, 64, 15)
        self.conv1 = nn.Conv2d(1, 32, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
        self.conv2 = nn.Conv2d(32, 32, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
        self.pool1 = nn.MaxPool2d(kernel_size=(2, 2), stride=(2, 2))
        self.conv3 = nn.Conv2d(32, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
        self.conv4 = nn.Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
        self.pool2 = nn.MaxPool2d(kernel_size=(2, 2), stride=(2, 2))
        self.conv5 = nn.Conv2d(64, 128, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
        self.conv6 = nn.Conv2d(128, 128, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
        self.pool3 = nn.MaxPool2d(kernel_size=(2, 2), stride=(2, 2))
        # bottleneck
        self.fc1 = nn.Linear(128 * 64 * 15, 1024)
        self.fc2 = nn.Linear(1024, 128 * 64 * 15)
        # decoder: three stride-2 transposed convs mirror the three pools
        self.conv7 = nn.ConvTranspose2d(128, 64, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1), output_padding=(1, 1))
        self.conv8 = nn.ConvTranspose2d(64, 32, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1), output_padding=(1, 1))
        self.conv9 = nn.ConvTranspose2d(32, 16, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1), output_padding=(1, 1))
        self.conv10 = nn.Conv2d(16, 1, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    
    def forward(self, x):
        x = F.relu(self.conv1(x))
        x = F.relu(self.conv2(x))
        x = self.pool1(x)
        x = F.relu(self.conv3(x))
        x = F.relu(self.conv4(x))
        x = self.pool2(x)
        x = F.relu(self.conv5(x))
        x = F.relu(self.conv6(x))
        x = self.pool3(x)
        x = x.view(-1, 128 * 64 * 15)
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        x = x.view(-1, 128, 64, 15)
        x = F.relu(self.conv7(x))
        x = F.relu(self.conv8(x))
        x = F.relu(self.conv9(x))
        x = F.relu(self.conv10(x))
        return x

# build the datasets and loaders
train_dataset = AudioDataset('train.txt')
test_dataset = AudioDataset('test.txt')
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False)

# instantiate the model
model = AudioDenoiser().to(device)

# loss function and optimizer
criterion = nn.MSELoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

# evaluation: MSE loss plus source-separation metrics (SDR/SIR/SAR)
def evaluate(model, loader):
    model.eval()
    total_loss = 0
    total_sdr = 0
    total_sir = 0
    total_sar = 0
    n_samples = 0
    with torch.no_grad():
        for data in loader:
            x = data.to(device)
            # corrupt the clean spectrogram with Gaussian noise
            x_noisy = x + torch.randn_like(x) * 0.1
            x_hat = model(x_noisy)
            loss = F.mse_loss(x_hat, x)
            total_loss += loss.item() * x.size(0)
            # reconstruct waveforms sample by sample; the phase was not kept,
            # so estimate it with Griffin-Lim
            for i in range(x.size(0)):
                mag_hat = librosa.db_to_amplitude(x_hat[i, 0].cpu().numpy() * 80 - 80)
                mag_ref = librosa.db_to_amplitude(x[i, 0].cpu().numpy() * 80 - 80)
                # restore the Nyquist bin dropped in preprocess
                mag_hat = np.pad(mag_hat, ((0, 1), (0, 0)))
                mag_ref = np.pad(mag_ref, ((0, 1), (0, 0)))
                wav_hat = librosa.griffinlim(mag_hat, hop_length=256, win_length=1024)
                wav_ref = librosa.griffinlim(mag_ref, hop_length=256, win_length=1024)
                sdr, sir, sar, _ = mir_eval.separation.bss_eval_sources(
                    wav_ref[np.newaxis, :], wav_hat[np.newaxis, :])
                total_sdr += sdr.sum()
                total_sir += sir.sum()
                total_sar += sar.sum()
                n_samples += 1
    avg_loss = total_loss / len(loader.dataset)
    avg_sdr = total_sdr / n_samples
    avg_sir = total_sir / n_samples
    avg_sar = total_sar / n_samples
    return avg_loss, avg_sdr, avg_sir, avg_sar

# train the model
for epoch in range(10):
    model.train()
    total_loss = 0
    for data in train_loader:
        x = data.to(device)
        # corrupt the clean spectrogram with Gaussian noise (sigma = 0.1)
        x_noisy = x + torch.randn_like(x) * 0.1
        optimizer.zero_grad()
        x_hat = model(x_noisy)
        loss = criterion(x_hat, x)
        loss.backward()
        optimizer.step()
        total_loss += loss.item() * x.size(0)
    avg_loss = total_loss / len(train_loader.dataset)
    print('Epoch {}: train loss = {}'.format(epoch, avg_loss))
    
    # evaluate on the test set
    avg_loss, avg_sdr, avg_sir, avg_sar = evaluate(model, test_loader)
    print('Epoch {}: test loss = {}, SDR = {}, SIR = {}, SAR = {}'.format(epoch, avg_loss, avg_sdr, avg_sir, avg_sar))
    
    # save the model weights
    torch.save(model.state_dict(), 'model.pth')

# load the trained weights and switch to evaluation mode
model.load_state_dict(torch.load('model.pth'))
model.eval()

# denoise an audio file (assumed to be at least segment-length long) and score it
x, _ = librosa.load('noisy.wav', sr=16000, mono=True)
x_noisy = preprocess(x).unsqueeze(0).to(device)  # add the batch dimension
with torch.no_grad():
    mag_db_hat = model(x_noisy)
# rebuild the waveform, reusing the phase of the noisy input
x_hat = postprocess(mag_db_hat.squeeze(0), x)
# score against the (trimmed) noisy input; use a clean reference here if available
x_ref = x[:len(x_hat)]
sdr, sir, sar, _ = mir_eval.separation.bss_eval_sources(
    x_ref[np.newaxis, :], x_hat[np.newaxis, :])

# save the denoised audio (soundfile replaces the removed librosa.output.write_wav)
sf.write('denoised.wav', x_hat, 16000)

# print the metrics
print('SDR = {}, SIR = {}, SAR = {}'.format(sdr, sir, sar))
CNN Audio Denoising with PyTorch: Complete Code and Evaluation Metrics

Original article: https://www.cveoy.top/t/topic/nsML. All rights reserved by the author. Do not reproduce or scrape.
