添加测试脚本

This commit is contained in:
shikong 2024-03-15 10:13:31 +08:00
parent 87423c734f
commit c9af1a4dc3
3 changed files with 223 additions and 0 deletions

95
test/main.py Normal file
View File

@ -0,0 +1,95 @@
from concurrent.futures import ThreadPoolExecutor
import os
import os.path as path
import sys
import datetime
import urllib
import urllib.parse
import urllib.request
work_path = sys.path[0]
# 设备列表
devices_file = "devices.txt"
devices_file_path = path.join(work_path, devices_file)
# 下载目录
tmp_dir = "download"
tmp_path = path.join(work_path, tmp_dir)
server = "http://127.0.0.1:18183/video"
# server = "http://httpbin.org/anything/video"
def check_or_mk_tmp_dir():
if not path.exists(tmp_path):
os.mkdir(tmp_path)
def check_or_create_devices_file():
if not path.exists(devices_file_path):
with open(devices_file_path, mode="w", encoding="utf8") as f:
f.write("# 设备编码 一行一个\n")
def read_devices_file():
check_or_create_devices_file()
devices = []
with open(devices_file_path, mode="r", encoding="utf8") as f:
while True:
line = f.readline()
if not line:
break
line = line.strip()
if line.startswith("#") or len(line) == 0:
continue
else:
devices.append(line)
print(f"读取设备数量: {len(devices)}")
return devices
def tasks(device: str, start_time: str, end_time: str):
params = {
"start_time": start_time,
"end_time": end_time,
"device_id": device,
"useDownload": True,
}
url_params = urllib.parse.urlencode(params)
url = urllib.parse.urljoin(server, f"?{url_params}")
start = datetime.datetime.now()
start_str = start.strftime("%Y-%m-%d %H:%M:%S.%f")
print(f"{start_str} 开始下载: {url}")
urllib.request.urlretrieve(url,
f"{tmp_path}/{device}_{start_time}_{end_time}.mp4")
end = datetime.datetime.now()
print(
f"{device} {start_time}-{end_time}: 下载用时: {(end - start).total_seconds()}")
if __name__ == '__main__':
check_or_mk_tmp_dir()
check_or_create_devices_file()
print(work_path)
# workers = os.cpu_count()
workers = 32
print(f"最大并发数: {workers}")
with ThreadPoolExecutor(max_workers=workers) as worker:
devices = read_devices_file()
# day = datetime.datetime.today()
day = datetime.date(year=2024, month=3, day=11)
# 开始时间
start = datetime.time(hour=8, minute=11, second=15)
# 结束时间
end = datetime.time(hour=8, minute=11, second=30)
start_time = datetime.datetime.combine(day, start).strftime(
"%Y%m%d%H%M%S")
end_time = datetime.datetime.combine(day, end).strftime("%Y%m%d%H%M%S")
for device in devices:
worker.submit(tasks, device, start_time, end_time)

84
test/push_zlm.py Normal file
View File

@ -0,0 +1,84 @@
import os
import os.path as path
import select
import sys
import urllib.parse
from concurrent.futures import ThreadPoolExecutor
import subprocess
import platform
work_path = sys.path[0]
# 下载目录
tmp_dir = "download"
tmp_path = path.join(work_path, tmp_dir)
zlm_server = "192.168.3.12"
zlm_rtmp_port = 1936
zlm_auth_params = {
"sign": "41db35390ddad33f83944f44b8b75ded"
}
ffmpeg_path = "ffmpeg"
ffmpeg_read_rate = 1
ffplay_path = "ffplay"
enable_preview = True
workers = os.cpu_count()
def get_rtmp_url(app: str, stream_id: str):
params = urllib.parse.urlencode(zlm_auth_params)
return "rtmp://{}:{}/{}/{}?{}".format(zlm_server, zlm_rtmp_port, app,
stream_id, params)
def check_or_mk_tmp_dir():
if not path.exists(tmp_path):
os.mkdir(tmp_path)
def push_stream(file: str):
stream_id = path.basename(file)
target = get_rtmp_url("ffmpeg", stream_id)
print("开始 ffmpeg 推流 {} => {}", file, target)
cmd = "{} -loglevel error -stream_loop -1 -readrate {} -i {} -t 60 -c copy -f flv {}".format(
ffmpeg_path, ffmpeg_read_rate, file, target)
print(cmd)
proc = subprocess.Popen(
cmd,
stdin=None,
stdout=None,
shell=True
)
if enable_preview and len(
ffplay_path) > 0 and platform.system() == "Windows":
subprocess.Popen(
"ffplay {} -x 400 -y 300 -autoexit".format(target),
stdin=subprocess.DEVNULL,
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL,
shell=True
)
proc.communicate()
print("ffmpeg 结束 {} =/= {}", file, target)
if __name__ == '__main__':
if len(sys.argv) > 1:
try:
workers = int(sys.argv[1])
except:
print("参数解析错误, 将使用默认线程数 {} ".format(workers))
print("最大并发数: {}".format(workers))
with ThreadPoolExecutor(max_workers=workers) as worker:
for item in os.listdir(tmp_path):
p = path.join(tmp_path, item)
if not path.isfile(p) or not p.endswith(".mp4"):
continue
else:
worker.submit(push_stream, p)

44
test/video.py Normal file
View File

@ -0,0 +1,44 @@
import os.path as path
import sys
import shutil
work_path = sys.path[0]
# 设备列表
devices_file = "devices.txt"
devices_file_path = path.join(work_path, devices_file)
# 源文件
source_video_file = "20240311081115_20240311081130.mp4"
def check_or_create_devices_file():
if not path.exists(devices_file_path):
with open(devices_file_path, mode="w", encoding="utf8") as f:
f.write("# 设备编码 一行一个\n")
def read_devices_file():
check_or_create_devices_file()
devices = []
with open(devices_file_path, mode="r", encoding="utf8") as f:
while True:
line = f.readline()
if not line:
break
line = line.strip()
if line.startswith("#") or len(line) == 0:
continue
else:
devices.append(line)
print("读取设备数量: {}".format(len(devices)))
return devices
if __name__ == '__main__':
check_or_create_devices_file()
devices = read_devices_file()
src = path.join(work_path, source_video_file)
for device in devices:
dst = path.join(work_path, "{}_{}".format(device, source_video_file))
shutil.copyfile(src, dst)