OpenPose 视频姿势检测和标注
import copy import numpy as np import cv2 from glob import glob import os import argparse import json
video file processing setup
从:https://stackoverflow.com/a/61927951 中获取
import argparse import subprocess import sys from pathlib import Path from typing import NamedTuple
class FFProbeResult(NamedTuple): return_code: int json: str error: str
def ffprobe(file_path) -> FFProbeResult: command_array = ["ffprobe", "-v", "quiet", "-print_format", "json", "-show_format", "-show_streams", file_path] result = subprocess.run(command_array, stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True) return FFProbeResult(return_code=result.returncode, json=result.stdout, error=result.stderr)
openpose setup
from src import model from src import util from src.body import Body from src.hand import Hand
加载模型
body_estimation = Body('model/body_pose_model.pth') hand_estimation = Hand('model/hand_pose_model.pth')
处理帧
def process_frame(frame, body=True, hands=True): canvas = copy.deepcopy(frame) if body: candidate, subset = body_estimation(frame) canvas = util.draw_bodypose(canvas, candidate, subset) if hands: hands_list = util.handDetect(candidate, subset, frame) all_hand_peaks = [] for x, y, w, is_left in hands_list: peaks = hand_estimation(frame[y:y+w, x:x+w, :]) peaks[:, 0] = np.where(peaks[:, 0]==0, peaks[:, 0], peaks[:, 0]+x) peaks[:, 1] = np.where(peaks[:, 1]==0, peaks[:, 1], peaks[:, 1]+y) all_hand_peaks.append(peaks) canvas = util.draw_handpose(canvas, all_hand_peaks) return canvas
使用ffmpeg写视频,因为cv2的writer失败了
https://stackoverflow.com/questions/61036822/opencv-videowriter-produces-cant-find-starting-number-error
import ffmpeg
打开指定的视频
parser = argparse.ArgumentParser( description="Process a video annotating poses detected.") parser.add_argument('file', type=str, help='Video file location to process.') parser.add_argument('--no_hands', action='store_true', help='No hand pose') parser.add_argument('--no_body', action='store_true', help='No body pose') args = parser.parse_args() video_file = args.file cap = cv2.VideoCapture(video_file)
获取视频文件信息
ffprobe_result = ffprobe(args.file) info = json.loads(ffprobe_result.json) videoinfo = [i for i in info["streams"] if i["codec_type"] == "video"][0] input_fps = videoinfo["avg_frame_rate"]
input_fps = float(input_fps[0])/float(input_fps[1])
input_pix_fmt = videoinfo["pix_fmt"] input_vcodec = videoinfo["codec_name"]
定义一个writer对象,用于写入一个修改后的视频文件
postfix = info["format"]["format_name"].split(",")[0] output_file = ".".join(video_file.split(".")[:-1])+".processed." + postfix
class Writer(): def init(self, output_file, input_fps, input_framesize, input_pix_fmt, input_vcodec): if os.path.exists(output_file): os.remove(output_file) self.ff_proc = ( ffmpeg .input('pipe:', format='rawvideo', pix_fmt="bgr24", s='%sx%s'%(input_framesize[1],input_framesize[0]), r=input_fps) .output(output_file, pix_fmt=input_pix_fmt, vcodec=input_vcodec) .overwrite_output() .run_async(pipe_stdin=True) )
def __call__(self, frame):
self.ff_proc.stdin.write(frame.tobytes())
def close(self):
self.ff_proc.stdin.close()
self.ff_proc.wait()
writer = None while(cap.isOpened()): ret, frame = cap.read() if frame is None: break
posed_frame = process_frame(frame, body=not args.no_body,
hands=not args.no_hands)
if writer is None:
input_framesize = posed_frame.shape[:2]
writer = Writer(output_file, input_fps, input_framesize, input_pix_fmt,
input_vcodec)
cv2.imshow('frame', posed_frame)
# 写入帧
writer(posed_frame)
if cv2.waitKey(1) & 0xFF == ord('q'):
break
cap.release() writer.close() cv2.destroyAllWindows()
原文地址: http://www.cveoy.top/t/topic/gnG5 著作权归作者所有。请勿转载和采集!