Python 音频播放和录制:使用 PyAudio 和 adb 获取性能数据
这段代码实现了一个音频播放和录制的功能,并且在播放和录制时可以选择特定的设备进行操作。主要包括以下几个类和函数:
-
AudioPlay类:该类继承了PyAudio类,并实现了音频录制和播放的相关方法。包括初始化设定参数,设置回调函数等。其中,
callback_output函数是音频播放时的回调函数,可以在其中实现音频的读取和播放。 -
get_in_out_devices函数:该函数可以获取系统中的输入输出设备,并将其存储在
input_dict和output_dict中,以供后续使用。 -
get_device_index函数:该函数可以根据设备名称查找对应的设备索引。
-
record_play_thread函数:该函数实现了一个线程,在其中进行音频的录制和播放,并且在录制和播放之间随机休眠一段时间。
-
主程序部分:在主程序中,首先实例化一个 AudioPlay 对象,并获取系统中的输入输出设备。然后开启一个线程来执行
record_play_thread函数,在另一个线程中使用 adb 命令获取系统中某个进程的性能数据,并将其保存到指定的目录中。
此外,还有一些其他的细节实现,如文件的读写等。
代码示例:
import wave
import os
import time
import datetime
from threading import Thread
import subprocess
import random
from pyaudio import PyAudio, paInt16, paContinue, paComplete
class AudioPlay(PyAudio):
def __init__(self, channels=1):
super().__init__()
self.chunk = 1024 # 每个缓冲区的帧数
self.format_sample = paInt16 # 采样位数
self.channels = channels # 声道:1,单声道;2,双声道
self.fps = 44100 # 采样频率
self.input_dict = None
self.output_dict = None
self.stream = None
self.filename = 'test.wav'
self.duration = 0 # 音频时长
self.flag = False
self.kill = False
def __call__(self, filename):
'重载文件名'
self.filename = filename
def callback_output(self, in_data, frame_count, time_info, status):
'播放回调函数'
data = self.wf.readframes(frame_count)
return data, paContinue
def run(self, filename=None, name=None, sleep=0):
time.sleep(1)
'音频录制线程'
# 播放
if not filename:
raise Exception('未输入音频文件名,不能播放,请输入后再试!')
thread_2 = Thread(target=self.read_audio, args=(filename, name, sleep, ))
thread_2.start()
def read_audio(self, filename, name=None, sleep=0):
'音频播放'
output_device_index = self.get_device_index(name, False) if name else None
time.sleep(sleep)
print('play audio time, ', datetime.datetime.now())
with wave.open(filename, 'rb') as self.wf:
self.duration = self.get_duration(self.wf)
self.stream = self.open(format=self.get_format_from_width(self.wf.getsampwidth()),
channels=self.wf.getnchannels(),
rate=self.wf.getframerate(),
output=True,
output_device_index=output_device_index, # 输出设备索引
stream_callback=self.callback_output
)
self.stream.start_stream()
while self.stream.is_active():
time.sleep(0.1)
print(self.duration)
# self.terminate_run()
@staticmethod
def get_duration(wf):
'获取音频时长'
return round(wf.getnframes() / wf.getframerate(), 2)
def get_in_out_devices(self):
'获取系统输入输出设备'
self.input_dict = {}
self.output_dict = {}
for i in range(self.get_device_count()):
dev_info = self.get_device_info_by_index(i)
default_rate = int(dev_info['defaultSampleRate'])
if not dev_info['hostApi'] and default_rate == self.fps and '映射器' not in dev_info['name']:
if dev_info['maxInputChannels']:
self.input_dict[dev_info['name']] = i
elif dev_info['maxOutputChannels']:
self.output_dict[dev_info['name']] = i
def get_device_index(self, name, input_in=True):
'获取选定设备索引'
if input_in and self.input_dict:
return self.input_dict.get(name, -1)
elif not input_in and self.output_dict:
return self.output_dict.get(name, -1)
def terminate_run(self):
'结束流录制或流播放'
if self.stream:
self.stream.stop_stream()
self.stream.close()
self.terminate()
def record_play_thread():
time.sleep(5)
while True:
audio_play = AudioPlay()
audio_play.get_in_out_devices()
audio_play.run(base_dir + 'xiaogexiaoge.wav', name='test')
time.sleep(6)
audio_play.run(base_dir + '039_9.wav', name='test')
# sleep_time = random.randint(10, 30)
# print(sleep_time, datetime.datetime.now())
time.sleep(15)
audio_play.terminate_run()
sleep_time = random.randint(10, 30)
print(sleep_time, datetime.datetime.now())
time.sleep(sleep_time)
if __name__ == '__main__':
base_dir = r'D:\python-jiaoben\性能测试\'
data_dir = r'D:\python-jiaoben\性能测试\内存监控\0412\'
if not os.path.exists(data_dir):
os.makedirs(data_dir)
thread_play = Thread(target=record_play_thread)
thread_play.start()
for index in range(1, 20):
try:
print('adb time, ', datetime.datetime.now())
# p1 = subprocess.Popen('adb shell top -b -d 1 -n 60 | grep SmartSpeaker', stdout=subprocess.PIPE, stderr=subprocess.PIPE)
p1 = subprocess.Popen('adb shell top -b -d 10 -n 360 | grep YxwAiAudioDemo ', stdout=subprocess.PIPE, stderr=subprocess.PIPE)
xingnneg_data = p1.stdout.read()
if xingnneg_data:
out_file = open(data_dir + str(index) + '.log', 'w', encoding='utf-8')
out_file.write(xingnneg_data.decode('utf-8'))
out_file.close()
except:
continue
原文地址: https://www.cveoy.top/t/topic/nwCN 著作权归作者所有。请勿转载和采集!