import os
import librosa
import h5py
import pandas as pd
import numpy as np
from scipy import signal
from glob import glob
from itertools import chain

pd.options.mode.chained_assignment = None


def create_dataset(df_pos, pcen, glob_cls_name, file_name, hf, seg_len, hop_seg, fps):
    '''
    将时频表示按片段长度进行分块,并存储在 h5py 数据集中

    Args:
        -df_pos : 数据框
        -log_mel_spec : 对数梅尔谱图
        -glob_cls_name: 音频文件中使用的类名,其中只存在一个类
        -file_name : csv 文件的名称
        -hf: h5py 对象
        -seg_len : 固定片段长度
        -fps: 每秒帧数
    Out:
        - label_list: 提取的梅尔补丁的标签列表
    '''

    label_list = []
    if len(hf['features'][:]) == 0:
        file_index = 0
    else:
        file_index = len(hf['features'][:])


    start_time, end_time = time_2_frame(df_pos, fps)


    '对于具有“Call”列名的 csv 文件,选取全局类名'

    if 'CALL' in df_pos.columns:
        cls_list = [glob_cls_name] * len(start_time)
    else:
        cls_list = [df_pos.columns[(df_pos == 'POS').loc[index]].values for index, row in df_pos.iterrows()]
        cls_list = list(chain.from_iterable(cls_list))

    assert len(start_time) == len(end_time)
    assert len(cls_list) == len(start_time)

    for index in range(len(start_time)):

        str_ind = start_time[index]
        end_ind = end_time[index]
        label = cls_list[index]

        '提取片段,并按 hop_seg 步长前进'

        if end_ind - str_ind > seg_len:
            shift = 0
            while end_ind - (str_ind + shift) > seg_len:

                pcen_patch = pcen[int(str_ind + shift):int(str_ind + shift + seg_len)]

                hf['features'].resize((file_index + 1, pcen_patch.shape[0], pcen_patch.shape[1]))
                hf['features'][file_index] = pcen_patch
                label_list.append(label)
                file_index += 1
                shift = shift + hop_seg

            pcen_patch_last = pcen[end_ind - seg_len:end_ind]



            hf['features'].resize((file_index + 1, pcen_patch.shape[0], pcen_patch.shape[1]))
            hf['features'][file_index] = pcen_patch_last
            label_list.append(label)
            file_index += 1
        else:

            '如果补丁长度小于片段长度,则将补丁重复多次,直到达到片段长度'

            pcen_patch = pcen[str_ind:end_ind]
            if pcen_patch.shape[0] == 0:
                print(pcen_patch.shape[0])
                print('The patch is of 0 length')
                continue

            repeat_num = int(seg_len / (pcen_patch.shape[0])) + 1
            pcen_patch_new = np.tile(pcen_patch, (repeat_num, 1))
            pcen_patch_new = pcen_patch_new[0:int(seg_len)]
            hf['features'].resize((file_index + 1, pcen_patch_new.shape[0], pcen_patch_new.shape[1]))
            hf['features'][file_index] = pcen_patch_new
            label_list.append(label)
            file_index += 1

    
    print('Total files created : {}'.format(file_index))
    return label_list

class Feature_Extractor():

       def __init__(self, conf):
           self.sr =conf.features.sr
           self.n_fft = conf.features.n_fft
           self.hop = conf.features.hop_mel
           self.n_mels = conf.features.n_mels
           self.fmax = conf.features.fmax
           #self.win_length = conf.features.win_length
       def extract_feature(self,audio):

           mel_spec = librosa.feature.melspectrogram(audio,sr=self.sr, n_fft=self.n_fft,
                                                     hop_length=self.hop,n_mels=self.n_mels,fmax=self.fmax)
           pcen = librosa.core.pcen(mel_spec,sr=22050)
           pcen = pcen.astype(np.float32)

           return pcen

def extract_feature(audio_path,feat_extractor,conf):

    y,fs = librosa.load(audio_path,sr=conf.features.sr)

    '根据 librosa 文档中的建议对音频进行缩放'

    y = y * (2**32)
    pcen = feat_extractor.extract_feature(y)
    return pcen.T



def time_2_frame(df,fps):


    '在 onset 和 offset 周围留出 25 毫秒的余量'

    df.loc[:,'Starttime'] = df['Starttime'] - 0.025
    df.loc[:,'Endtime'] = df['Endtime'] + 0.025

    '将时间转换为帧'

    start_time = [int(np.floor(start * fps)) for start in df['Starttime']]

    end_time = [int(np.floor(end * fps)) for end in df['Endtime']]

    return start_time,end_time

def feature_transform(conf=None,mode=None):
    '''
       训练:
          提取梅尔谱图/PCEN,并将每个数据样本切分成长度为 conf.seg_len 的片段。
          每个片段继承剪辑级别的标签。片段长度在训练和验证集中保持一致。
       评估:
           目前使用验证集进行评估。
           
           对于每个音频文件,提取时频表示并创建 3 个子集:
           a) 正样本集 - 根据提供的 onset-offset 注释提取片段。
           b) 负样本集 - 由于没有提供负样本注释,我们将整个
                         音频文件视为负样本类,并提取长度为 conf.seg_len 的片段
           c) 查询集 - 从第 5 个注释的结束时间到音频文件的结束时间。
                          对这个子集进行 onset-offset 预测。

       Args:
       - config: 配置对象
       - mode: 训练/验证

       Out:
       - Num_extract_train/Num_extract_valid - 训练/验证集中的样本数量
                                                                                               '''


    label_tr = []
    pcen_extractor = Feature_Extractor(conf)

    fps =  conf.features.sr / conf.features.hop_mel
    '将固定片段长度转换为帧'

    seg_len = int(round(conf.features.seg_len * fps))
    hop_seg = int(round(conf.features.hop_seg * fps))
    extension = '*.csv'


    if mode == 'train':

        print('=== Processing training set ===')
        meta_path = conf.path.train_dir
        all_csv_files = [file
                         for path_dir, subdir, files in os.walk(meta_path)
                         for file in glob(os.path.join(path_dir, extension))]
        all_csv_files = all_csv_files[:100]
        hdf_tr = os.path.join(conf.path.feat_train,'Mel_train.h5')
        hf = h5py.File(hdf_tr,'w')
        hf.create_dataset('features', shape=(0, seg_len, conf.features.n_mels),
                          maxshape=(None, seg_len, conf.features.n_mels))
        num_extract = 0
        for file in all_csv_files:

            split_list = file.split('/')
            glob_cls_name = split_list[split_list.index('Training_Set') + 1]
            file_name = split_list[split_list.index('Training_Set') + 2]
            df = pd.read_csv(file, header=0, index_col=False)
            audio_path = file.replace('csv', 'wav')
            print('Processing file name {}'.format(audio_path))
            pcen = extract_feature(audio_path, pcen_extractor,conf)
            df_pos = df[(df == 'POS').any(axis=1)]
            label_list = create_dataset(df_pos,pcen,glob_cls_name,file_name,hf,seg_len,hop_seg,fps)
            label_tr.append(label_list)
        print(' Feature extraction for training set complete')
        num_extract = len(hf['features'])
        flat_list = [item for sublist in label_tr for item in sublist]
        hf.create_dataset('labels', data=[s.encode() for s in flat_list], dtype='S20')
        data_shape = hf['features'].shape
        hf.close()
        return num_extract,data_shape

    else:

        print('=== Processing Validation set ===')

        meta_path = conf.path.eval_dir

        all_csv_files = [file
                         for path_dir, subdir, files in os.walk(meta_path)
                         for file in glob(os.path.join(path_dir, extension))]

        num_extract_eval = 0

        for file in all_csv_files:

            idx_pos = 0
            idx_neg = 0
            start_neg = 0
            hop_neg = 0
            idx_query = 0
            hop_query = 0
            strt_index = 0

            split_list = file.split('/')
            name = str(split_list[-1].split('.')[0])
            feat_name = name + '.h5'
            audio_path = file.replace('csv', 'wav')
            feat_info = []
            hdf_eval = os.path.join(conf.path.feat_eval,feat_name)
            hf = h5py.File(hdf_eval,'w')
            

            df_eval = pd.read_csv(file, header=0, index_col=False)
            Q_list = df_eval['Q'].to_numpy()

            start_time,end_time = time_2_frame(df_eval,fps)

            index_sup = np.where(Q_list == 'POS')[0][:conf.train.n_shot]

            difference = []
            for index in index_sup:
                difference.append(end_time[index] - start_time[index])
            
            # 根据音频文件自适应片段长度。
            max_len = max(difference)
            
            # 根据 5-shot 中的最大值选择片段长度。
            # 逻辑基于 12GB GPU 的适应性,因为有些片段非常长。
            if max_len < 100:

                seg_len = max_len
            elif max_len > 100 and max_len < 500 :
                seg_len = max_len//4
            else:
                seg_len = max_len//8
                

            
            print(f'Segment length for file is {seg_len}')
            hop_seg = seg_len//2

            hf.create_dataset('feat_pos', shape=(0, seg_len, conf.features.n_mels),
                              maxshape= (None, seg_len, conf.features.n_mels))
            hf.create_dataset('feat_query',shape=(0,seg_len,conf.features.n_mels),maxshape=(None,seg_len,conf.features.n_mels))
            hf.create_dataset('feat_neg',shape=(0,seg_len,conf.features.n_mels),maxshape=(None,seg_len,conf.features.n_mels))
            hf.create_dataset('start_index_query',shape=(1,),maxshape=(None))

            

            
            hf.create_dataset('seg_len',shape=(1,), maxshape=(None))
            hf.create_dataset('hop_seg',shape=(1,), maxshape=(None))
            pcen = extract_feature(audio_path, pcen_extractor,conf)
            mean = np.mean(pcen)
            std = np.mean(pcen)
            hf['seg_len'][:] = seg_len
            hf['hop_seg'][:] = hop_seg

            strt_indx_query = end_time[index_sup[-1]]
            end_idx_neg = pcen.shape[0] - 1
            hf['start_index_query'][:] = strt_indx_query

            print('Creating negative dataset')

            while end_idx_neg - (strt_index + hop_neg) > seg_len:

                patch_neg = pcen[int(strt_index + hop_neg):int(strt_index + hop_neg + seg_len)]

                hf['feat_neg'].resize((idx_neg + 1, patch_neg.shape[0], patch_neg.shape[1]))
                hf['feat_neg'][idx_neg] = patch_neg
                idx_neg += 1
                hop_neg += hop_seg

            last_patch = pcen[end_idx_neg - seg_len:end_idx_neg]
            hf['feat_neg'].resize((idx_neg + 1, last_patch.shape[0], last_patch.shape[1]))
            hf['feat_neg'][idx_neg] = last_patch

            print('Creating Positive dataset')
            for index in index_sup:

                str_ind = int(start_time[index])
                end_ind = int(end_time[index])

                if end_ind - str_ind > seg_len:

                    shift = 0
                    while end_ind - (str_ind + shift) > seg_len:

                        patch_pos = pcen[int(str_ind + shift):int(str_ind + shift + seg_len)]

                        hf['feat_pos'].resize((idx_pos + 1, patch_pos.shape[0], patch_pos.shape[1]))
                        hf['feat_pos'][idx_pos] = patch_pos
                        idx_pos += 1
                        shift += hop_seg
                    last_patch_pos = pcen[end_ind - seg_len:end_ind]
                    hf['feat_pos'].resize((idx_pos + 1, patch_pos.shape[0], patch_pos.shape[1]))
                    hf['feat_pos'][idx_pos] = last_patch_pos
                    idx_pos += 1

                else:
                    patch_pos = pcen[str_ind:end_ind]

                    if patch_pos.shape[0] == 0:
                        print(patch_pos.shape[0])
                        print('The patch is of 0 length')
                        continue
                    repeat_num = int(seg_len / (patch_pos.shape[0])) + 1

                    patch_new = np.tile(patch_pos, (repeat_num, 1))
                    patch_new = patch_new[0:int(seg_len)]
                    hf['feat_pos'].resize((idx_pos + 1, patch_new.shape[0], patch_new.shape[1]))
                    hf['feat_pos'][idx_pos] = patch_new
                    idx_pos += 1



            print('Creating query dataset')

            while end_idx_neg - (strt_indx_query + hop_query) > seg_len:

                patch_query = pcen[int(strt_indx_query + hop_query):int(strt_indx_query + hop_query + seg_len)]
                hf['feat_query'].resize((idx_query + 1, patch_query.shape[0], patch_query.shape[1]))
                hf['feat_query'][idx_query] = patch_query
                idx_query += 1
                hop_query += hop_seg


            last_patch_query = pcen[end_idx_neg - seg_len:end_idx_neg]

            hf['feat_query'].resize((idx_query + 1, last_patch_query.shape[0], last_patch_query.shape[1]))
            hf['feat_query'][idx_query] = last_patch_query
            num_extract_eval += len(hf['feat_query'])

            hf.close()

        return num_extract_eval

这段代码的功能是将音频文件提取特征后,将特征按照固定长度分割成多个片段,并将这些片段保存在 h5 文件中。同时,也将每个片段所属的类别标签记录在 h5 文件中。其中,片段的长度、片段之间的间隔以及 h5 文件的路径等参数都由输入参数决定。

代码中构建 h5 文件的部分

feature_transform 函数中,代码创建了三个数据集:

  • feat_pos:存储正样本片段的特征
  • feat_neg:存储负样本片段的特征
  • feat_query:存储查询片段的特征

这些数据集的形状都为 (样本数量, 片段长度, 特征维度)

此外,还创建了几个辅助数据集:

  • seg_len:存储片段长度
  • hop_seg:存储片段之间的间隔
  • start_index_query:存储查询片段的起始索引

总结

这段代码通过使用 librosah5py 库,将音频文件提取特征后,进行分段,并将片段和标签保存到 h5 文件中,方便后续的训练和评估。

关于代码的一些细节

  • 代码中使用了 create_dataset 函数将提取的特征片段保存到 h5 文件中。
  • Feature_Extractor 类用于提取音频文件的特征。
  • extract_feature 函数用于读取音频文件并提取特征。
  • time_2_frame 函数用于将时间戳转换为帧索引。
  • 代码中使用了 conf 对象来存储配置参数,例如片段长度、片段之间的间隔、h5 文件的路径等等。
  • 代码使用了 pandas 库来读取 CSV 文件中的标签信息。
  • 代码使用了 numpy 库来进行数组操作。
  • 代码使用了 glob 模块来查找音频文件。
  • 代码使用了 itertools 模块来对迭代器进行操作。
  • 代码中使用了 assert 语句来进行断言,以确保代码的正确性。
  • 代码中使用了 print 语句来打印调试信息。

改进建议

  • 可以添加对不同音频格式的支持,例如 MP3 和 OGG。
  • 可以添加对不同特征提取方法的支持,例如 MFCCs 和 Chroma。
  • 可以添加对不同数据增强方法的支持,例如噪声添加和时间拉伸。
  • 可以添加对不同机器学习模型的支持,例如 SVM 和 Random Forest。
  • 可以添加对不同评估指标的支持,例如准确率和召回率。

其他

  • 代码中的一些变量名可以更具描述性。
  • 代码可以添加更多的注释,以提高可读性。
  • 代码可以进行测试,以确保其正确性。
  • 代码可以进行优化,以提高其效率。

希望这些信息能够帮助你理解这段代码。如果你有任何其他问题,请随时提问。

音频特征提取与数据集构建:使用 librosa 和 h5py 生成音频片段

原文地址: https://www.cveoy.top/t/topic/onBI 著作权归作者所有。请勿转载和采集!

免费AI点我,无需注册和登录