目录
场景
准备
代码
场景
获取摄像头资源。实时播放以及录制视频的功能需求。此段代码就是对接萤石云视频,并录制视频记录。
准备
- 注册萤石云账户,配置好摄像头协议
- 安装ffmpeg 工具
- 安装依赖包cv2
- 接口文档
代码
# -*- coding: UTF-8 -*-
# Desc :萤石云视频流对接,并录制视频
import subprocess
import os
import time
import cv2
import requests# 视频配置
APP_KEY = "app_key"
APP_SECRET = "app_secret"
HEADERS = {'Host': 'open.ys7.com', 'Content-Type': 'application/x-www-form-urlencoded'}def get_camera_token():# 获取camera tokentoken_url = f'https://open.ys7.com/api/lapp/token/get?appKey={APP_KEY}&appSecret={APP_SECRET}'response = requests.post(token_url, headers=HEADERS)# result = response.json()# token = response.json()['data']['accessToken']token = 'at.7jrcjmna8qnqg8d3dgnzs87m4v2dme3l-32enpqgusd-1jvdfe4-uxo15ik0s'return tokendef get_video_stream_url(accessToken, deviceSerial, protocol):""":param accessToken: 访问token:param deviceSerial: 设备序列号:param protocol: 协议编号:return:"""# 获取视频流URLtry:url = f"https://open.ys7.com/api/lapp/v2/live/address/get?accessToken={accessToken}&deviceSerial={deviceSerial}&protocol={protocol}&supportH265=0"response = requests.post(url, headers=HEADERS, verify=False)msg = response.json()['msg']if "Operation succeeded" not in msg:return False, msgvideo_url = response.json()['data']['url']return True, video_urlexcept Exception as e:print(e)return False, ""def record_video(deviceSerial, save_path, fps=24, seconds=10):""":param deviceSerial: 设备序列号:param save_path: 录制视频保存路径:param fps: 视频帧数:param seconds: 视频录制时长:return:"""file_dir = os.path.join(save_path, deviceSerial)tmp_dir = os.path.join(save_path, 'tmp')if not os.path.exists(file_dir):os.makedirs(file_dir)if not os.path.exists(tmp_dir):os.makedirs(tmp_dir)accessToken = get_camera_token()status, stream_url = get_video_stream_url(accessToken, deviceSerial, "2")if not status:raise Exception("获取stream_url失败")# 初始化视频cap = cv2.VideoCapture(stream_url)# 经尝试,MP4其他的协议格式均不成功!fourcc = cv2.VideoWriter_fourcc('M', 'P', '4', 'v')# 设置要保存视频的像素大小 原值是1920*1080,按照原值保存,视频文件很大,# 可设置视频帧数,以及时间长度,保存像素大小来改变保存的视频文件大小size = (int(cap.get(cv2.CAP_PROP_FRAME_WIDTH) / 3),int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT) / 3))start_time = time.time()title = f'{int(start_time)}'video_tmp = os.path.join(tmp_dir, f'{title}.mp4')video_result = os.path.join(file_dir, f'{title}.mp4')video = cv2.VideoWriter(video_tmp, fourcc, fps, size)while cap.isOpened():# 采集一帧一帧的图像数据is_success, frame = cap.read()if is_success:img = cv2.resize(frame, size, interpolation=cv2.INTER_LINEAR)video.write(img)# 实现按下“q”键退出程序# if cv2.waitKey(1) & 0xFF == ord('q'):# breakif time.time() - start_time >= seconds:break# 释放视频资源video.release()cap.release()cv2.destroyAllWindows()return video_tmp, video_result, titledef mp4_video_record(deviceSerial, save_path):video_tmp, video_path, title = record_video(deviceSerial=deviceSerial,save_path=save_path)# 还需要对录制下来的视频做处理,之后才能在前端显示,播放cmd = f'ffmpeg -i {video_tmp} -strict -2 {video_path}'res = subprocess.Popen(cmd, shell=True,stdout=subprocess.PIPE,stderr=subprocess.PIPE)result = str(res.stderr.readlines()[-1], encoding='utf-8')if 'No such file or directory' in result:print("录制视频失败")elif not os.path.exists(video_path):print("录制视频失败")else:print("保存视频路径到数据库")def back2():cap = cv2.VideoCapture(r['data']['url'])fps = cap.get(cv2.CAP_PROP_FPS)print(fps)# 获取cap视频流的每帧像素大小size = (int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)),int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)))# 定义编码格式mpge-4fourcc = cv2.VideoWriter_fourcc('M', 'P', '4', '2')# 定义视频文件输入对象outVideo = cv2.VideoWriter('saveDir.avi', fourcc, fps, size) #size必须和视频流的帧大 小一致,否则无法执行# 获取视频流打开状态if cap.isOpened():rval, frame = cap.read()print('ture')else:rval = Falseprint('False')tot = 1c = 1# 循环使用cv2的read()方法读取视频帧while rval:rval, frame = cap.read()frame = cv2.resize(frame, (400, 300)) #调整画面大小cv2.imshow('test', frame)# 使用VideoWriter类中的write(frame)方法,将图像帧写入视频文件outVideo.write(frame)cv2.waitKey(25)cap.release()outVideo.release()cv2.destroyAllWindows()if __name__ == '__main__':# 对接萤石云摄像头mp4_video_record("123", "/")