这段代码实现了一个音频播放和录制的功能,并且在播放和录制时可以选择特定的设备进行操作。主要包括以下几个类和函数:

  1. AudioPlay类:该类继承了PyAudio类,并实现了音频录制和播放的相关方法。包括初始化设定参数,设置回调函数等。其中,callback_output 函数是音频播放时的回调函数,可以在其中实现音频的读取和播放。

  2. get_in_out_devices函数:该函数可以获取系统中的输入输出设备,并将其存储在 input_dictoutput_dict 中,以供后续使用。

  3. get_device_index函数:该函数可以根据设备名称查找对应的设备索引。

  4. record_play_thread函数:该函数实现了一个线程,在其中进行音频的录制和播放,并且在录制和播放之间随机休眠一段时间。

  5. 主程序部分:在主程序中,首先实例化一个 AudioPlay 对象,并获取系统中的输入输出设备。然后开启一个线程来执行 record_play_thread 函数,在另一个线程中使用 adb 命令获取系统中某个进程的性能数据,并将其保存到指定的目录中。

此外,还有一些其他的细节实现,如文件的读写等。

代码示例:

import wave
import os
import time
import datetime
from threading import Thread
import subprocess
import random
from pyaudio import PyAudio, paInt16, paContinue, paComplete


class AudioPlay(PyAudio):

    def __init__(self, channels=1):
        super().__init__()
        self.chunk = 1024  # 每个缓冲区的帧数
        self.format_sample = paInt16  # 采样位数
        self.channels = channels  # 声道:1,单声道;2,双声道
        self.fps = 44100  # 采样频率
        self.input_dict = None
        self.output_dict = None
        self.stream = None
        self.filename = 'test.wav'
        self.duration = 0   # 音频时长
        self.flag = False
        self.kill = False

    def __call__(self, filename):
        '重载文件名'
        self.filename = filename

    def callback_output(self, in_data, frame_count, time_info, status):
        '播放回调函数'
        data = self.wf.readframes(frame_count)
        return data, paContinue

    def run(self, filename=None, name=None, sleep=0):
        time.sleep(1)
        '音频录制线程'
        # 播放
        if not filename:
            raise Exception('未输入音频文件名,不能播放,请输入后再试!')
        thread_2 = Thread(target=self.read_audio, args=(filename, name, sleep, ))
        thread_2.start()

    def read_audio(self, filename, name=None, sleep=0):
        '音频播放'
        output_device_index = self.get_device_index(name, False) if name else None
        time.sleep(sleep)
        print('play audio time, ', datetime.datetime.now())
        with wave.open(filename, 'rb') as self.wf:
            self.duration = self.get_duration(self.wf)
            self.stream = self.open(format=self.get_format_from_width(self.wf.getsampwidth()),
                                    channels=self.wf.getnchannels(),
                                    rate=self.wf.getframerate(),
                                    output=True,
                                    output_device_index=output_device_index,  # 输出设备索引
                                    stream_callback=self.callback_output
                                    )
            self.stream.start_stream()
            while self.stream.is_active():
                time.sleep(0.1)
        print(self.duration)
        # self.terminate_run()

    @staticmethod
    def get_duration(wf):
        '获取音频时长'
        return round(wf.getnframes() / wf.getframerate(), 2)

    def get_in_out_devices(self):
        '获取系统输入输出设备'
        self.input_dict = {}
        self.output_dict = {}
        for i in range(self.get_device_count()):
            dev_info = self.get_device_info_by_index(i)
            default_rate = int(dev_info['defaultSampleRate'])
            if not dev_info['hostApi'] and default_rate == self.fps and '映射器' not in dev_info['name']:
                if dev_info['maxInputChannels']:
                    self.input_dict[dev_info['name']] = i
                elif dev_info['maxOutputChannels']:
                    self.output_dict[dev_info['name']] = i

    def get_device_index(self, name, input_in=True):
        '获取选定设备索引'
        if input_in and self.input_dict:
            return self.input_dict.get(name, -1)
        elif not input_in and self.output_dict:
            return self.output_dict.get(name, -1)

    def terminate_run(self):
        '结束流录制或流播放'
        if self.stream:
            self.stream.stop_stream()
            self.stream.close()
        self.terminate()


def record_play_thread():
    time.sleep(5)
    while True:

        audio_play = AudioPlay()
        audio_play.get_in_out_devices()

        audio_play.run(base_dir + 'xiaogexiaoge.wav', name='test')
        time.sleep(6)
        audio_play.run(base_dir + '039_9.wav', name='test')
        # sleep_time = random.randint(10, 30)
        # print(sleep_time, datetime.datetime.now())
        time.sleep(15)

        audio_play.terminate_run()

        sleep_time = random.randint(10, 30)
        print(sleep_time, datetime.datetime.now())
        time.sleep(sleep_time)


if __name__ == '__main__':
    base_dir = r'D:\python-jiaoben\性能测试\'

    data_dir = r'D:\python-jiaoben\性能测试\内存监控\0412\'
    if not os.path.exists(data_dir):
        os.makedirs(data_dir)

    thread_play = Thread(target=record_play_thread)
    thread_play.start()

    for index in range(1, 20):

        try:

            print('adb time, ', datetime.datetime.now())
            # p1 = subprocess.Popen('adb shell top -b -d 1 -n 60 | grep SmartSpeaker', stdout=subprocess.PIPE, stderr=subprocess.PIPE)
            p1 = subprocess.Popen('adb shell top -b -d 10 -n 360 | grep YxwAiAudioDemo ', stdout=subprocess.PIPE, stderr=subprocess.PIPE)
            xingnneg_data = p1.stdout.read()

            if xingnneg_data:
                out_file = open(data_dir + str(index) + '.log', 'w', encoding='utf-8')
                out_file.write(xingnneg_data.decode('utf-8'))
                out_file.close()
        except:
            continue
Python 音频播放和录制:使用 PyAudio 和 adb 获取性能数据

原文地址: https://www.cveoy.top/t/topic/nwCN 著作权归作者所有。请勿转载和采集!

免费AI点我,无需注册和登录