Audio data preprocessing: slicing, serialization, and down-sampling
import os

import librosa
import numpy as np
from tqdm import tqdm

# clean_train_folder = 'data/test/clean_trainset_56spk'
# noisy_train_folder = 'data/test/noisy_trainset_56spk'
# clean_test_folder = 'data/test/clean_testset'
# noisy_test_folder = 'data/test/noisy_testset'
# serialized_train_folder = 'data/test/serialized_train_data'
# serialized_test_folder = 'data/test/serialized_test_data'
clean_train_folder = 'D:/test_module/SEGAN/data/clean_trainset'
noisy_train_folder = 'D:/test_module/SEGAN/data/noisy_trainset'
clean_test_folder = 'data/test2/clean_testset'
noisy_test_folder = 'data/test2/noisy_testset'
serialized_train_folder = 'data/test2/serialized_train_data'
serialized_test_folder = 'data/test2/serialized_test_data'

window_size = 2 ** 14  # about 1 second of samples
sample_rate = 16000
def slice_signal(file, window_size, stride, sample_rate):
    '''
    Helper function for slicing an audio file into windows of window_size samples,
    with (1 - stride) fractional overlap between consecutive windows (default 50%).
    '''
    # librosa.load resamples the signal to `sample_rate` (16 kHz here) while reading it
    wav, sr = librosa.load(file, sr=sample_rate)
    hop = int(window_size * stride)
    slices = []
    for end_idx in range(window_size, len(wav), hop):
        start_idx = end_idx - window_size
        slice_sig = wav[start_idx:end_idx]
        slices.append(slice_sig)
    return slices
def process_and_serialize(data_type):
    '''
    Serialize and down-sample the sliced signals and save them in a separate folder.
    '''
    stride = 0.5
    if data_type == 'train':
        clean_folder = clean_train_folder
        noisy_folder = noisy_train_folder
        serialized_folder = serialized_train_folder
    else:
        clean_folder = clean_test_folder
        noisy_folder = noisy_test_folder
        serialized_folder = serialized_test_folder
    if not os.path.exists(serialized_folder):
        os.makedirs(serialized_folder)
    # walk through the path, slice the audio files, and save the serialized results
    for root, dirs, files in os.walk(clean_folder):
        if len(files) == 0:
            continue
        for filename in tqdm(files, desc='Serialize and down-sample {} audios'.format(data_type)):
            clean_file = os.path.join(clean_folder, filename)
            noisy_file = os.path.join(noisy_folder, filename)
            # slice both the clean signal and the noisy signal
            clean_sliced = slice_signal(clean_file, window_size, stride, sample_rate)
            noisy_sliced = slice_signal(noisy_file, window_size, stride, sample_rate)
            # serialize - file format goes [original_file]_[slice_number].npy
            # ex) p293_154.wav_5.npy denotes the 5th slice of p293_154.wav
            for idx, slice_tuple in enumerate(zip(clean_sliced, noisy_sliced)):
                pair = np.array([slice_tuple[0], slice_tuple[1]])
                np.save(os.path.join(serialized_folder, '{}_{}'.format(filename, idx)), arr=pair)
def data_verify(data_type):
    '''
    Verifies the length of each serialized data pair after pre-processing.
    '''
    if data_type == 'train':
        serialized_folder = serialized_train_folder
    else:
        serialized_folder = serialized_test_folder
    for root, dirs, files in os.walk(serialized_folder):
        for filename in tqdm(files, desc='Verify serialized {} audios'.format(data_type)):
            data_pair = np.load(os.path.join(root, filename))
            if data_pair.shape[1] != window_size:
                print('Snippet length not {} : {} instead'.format(window_size, data_pair.shape[1]))
                break
if __name__ == '__main__':
    process_and_serialize('train')
    data_verify('train')
    process_and_serialize('test')
    data_verify('test')
The code first defines the parameters used to process the audio, including the window size and the sample rate. It then defines a helper function, slice_signal(), which slices an audio file: the function reads the file into memory (librosa.load resamples it to 16 kHz at this point), cuts it into windows according to the given window size and overlap ratio, and returns a list containing all of the slices.
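As a quick sanity check of the slicing arithmetic, the sketch below calls slice_signal() on a hypothetical training file (the path and filename are only illustrative): a roughly 3-second clip at 16 kHz has about 48,000 samples, and with window_size = 16384 and stride = 0.5 the hop is 8192 samples, so the loop yields 4 half-overlapping slices of 16384 samples each.

# Hypothetical usage sketch of slice_signal(); the file path is illustrative only.
example_file = 'D:/test_module/SEGAN/data/clean_trainset/p226_001.wav'  # assumed to exist
slices = slice_signal(example_file, window_size, stride=0.5, sample_rate=sample_rate)
print(len(slices))      # e.g. 4 for a ~3 s recording (range(16384, 48000, 8192))
print(slices[0].shape)  # (16384,) -- every slice has exactly window_size samples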
The next function, process_and_serialize(), serializes the down-sampled, sliced audio data and stores it in a new folder. It first selects the input and output folders according to the data type (training or test set) and creates the output folder if needed, then walks through the clean folder: each clean file is sliced together with the like-named file from the noisy folder, and each clean/noisy slice pair is saved to the output folder. The saved files are named [original_file]_[slice_number].npy.
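To see what ends up on disk, the sketch below loads one serialized pair back with np.load and splits it into its clean and noisy halves; the filename is hypothetical (it follows the naming scheme above), and a training data loader would typically do something similar per item.

# Hypothetical read-back of one serialized pair; the filename is illustrative only.
pair = np.load(os.path.join(serialized_train_folder, 'p293_154.wav_5.npy'))
print(pair.shape)                        # (2, 16384): row 0 is clean, row 1 is noisy
clean_slice, noisy_slice = pair[0], pair[1]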
The last function, data_verify(), checks that the pre-processed data looks as expected. It selects the serialized folder according to the data type, iterates over all files in it, loads each saved pair, and checks that the snippet length equals the window size. If a pair has the wrong length, it prints a message with the offending length and stops checking the remaining files (the break only exits the loop; it does not abort the program).
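If a stricter check is wanted, the variant below (not part of the original script, only a sketch built on the same module-level names) compares the full array shape against (2, window_size), so a file with a missing clean or noisy half would also be reported.

def data_verify_strict(data_type):
    # Stricter variant of data_verify(): checks the full (2, window_size) shape.
    serialized_folder = serialized_train_folder if data_type == 'train' else serialized_test_folder
    for root, dirs, files in os.walk(serialized_folder):
        for filename in files:
            data_pair = np.load(os.path.join(root, filename))
            if data_pair.shape != (2, window_size):
                print('Unexpected shape {} in {}'.format(data_pair.shape, filename))
                return False
    return True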