"""Push every ``.mp4`` file found in the download directory to an RTMP server.

Each file is handed to its own ffmpeg process, which loops the input and
publishes it to ``rtmp://<zlm_server>:<zlm_rtmp_port>/ffmpeg/<file name>``.
On Windows an optional ffplay window previews each published stream.

Usage: ``python <script> [max_workers]``
"""
import os
import os.path as path
import platform
import select
import shlex
import subprocess
import sys
import urllib.parse
from concurrent.futures import ThreadPoolExecutor, as_completed

work_path = sys.path[0]
# Download directory: the .mp4 files to push are read from here.
tmp_dir = "download"
tmp_path = path.join(work_path, tmp_dir)

zlm_server = "192.168.3.12"
zlm_rtmp_port = 1936
# Query parameters appended to every RTMP URL.
# NOTE(review): this looks like a credential committed to source -- consider
# loading it from the environment instead.
zlm_auth_params = {
    "sign": "41db35390ddad33f83944f44b8b75ded"
}
ffmpeg_path = "ffmpeg"
ffmpeg_read_rate = 1
ffplay_path = "ffplay"
enable_preview = True
workers = os.cpu_count()


def get_rtmp_url(app: str, stream_id: str):
    """Return the RTMP publish URL for ``app``/``stream_id``.

    The entries of ``zlm_auth_params`` are URL-encoded and appended as the
    query string. ``app`` and ``stream_id`` are inserted verbatim.
    """
    params = urllib.parse.urlencode(zlm_auth_params)
    return "rtmp://{}:{}/{}/{}?{}".format(
        zlm_server, zlm_rtmp_port, app, stream_id, params)


def check_or_mk_tmp_dir():
    """Create the download directory if it does not exist yet."""
    # exist_ok avoids the check-then-create race of exists() + mkdir().
    os.makedirs(tmp_path, exist_ok=True)


def build_ffmpeg_cmd(file: str, target: str):
    """Return the ffmpeg argument list that publishes ``file`` to ``target``.

    The command is built as a list (not a shell string) so that file names
    containing spaces or quotes, and URLs containing ``?`` or ``&``, reach
    ffmpeg unchanged instead of being reinterpreted by a shell.
    """
    return [
        ffmpeg_path,
        "-loglevel", "error",
        "-stream_loop", "-1",
        "-readrate", str(ffmpeg_read_rate),
        "-i", file,
        "-t", "60",
        "-c", "copy",
        "-f", "flv",
        target,
    ]


def push_stream(file: str):
    """Publish ``file`` with ffmpeg and block until ffmpeg exits.

    The stream id is the file's base name and the application is ``ffmpeg``.
    When preview is enabled and the platform is Windows, an ffplay window is
    started for the same URL; failing to start it does not abort the push.

    Returns:
        The exit code of the ffmpeg process.

    Raises:
        OSError: if the ffmpeg executable cannot be started.
    """
    stream_id = path.basename(file)
    target = get_rtmp_url("ffmpeg", stream_id)
    print("开始 ffmpeg 推流 {} => {}".format(file, target))

    cmd = build_ffmpeg_cmd(file, target)
    # Quoted only for display; the process itself receives the raw list.
    print(" ".join(shlex.quote(arg) for arg in cmd))
    proc = subprocess.Popen(cmd, stdin=None, stdout=None)

    if enable_preview and ffplay_path and platform.system() == "Windows":
        try:
            subprocess.Popen(
                [ffplay_path, target, "-x", "400", "-y", "300", "-autoexit"],
                stdin=subprocess.DEVNULL,
                stdout=subprocess.DEVNULL,
                stderr=subprocess.DEVNULL,
            )
        except OSError as exc:
            # The preview is best-effort: keep pushing without it.
            print("ffplay 预览启动失败: {}".format(exc))

    proc.communicate()
    print("ffmpeg 结束 {} =/= {}".format(file, target))
    return proc.returncode


def parse_workers(argv, default):
    """Return the worker count requested in ``argv[1]``, else ``default``.

    A missing argument yields ``default`` silently. An argument that is not a
    positive integer prints a warning and also yields ``default``, because
    ``ThreadPoolExecutor`` rejects ``max_workers <= 0``.
    """
    if len(argv) <= 1:
        return default
    try:
        requested = int(argv[1])
    except ValueError:
        requested = 0
    if requested > 0:
        return requested
    print("参数解析错误, 将使用默认线程数 {} ".format(default))
    return default


def main():
    """Push all ``.mp4`` files of the download directory concurrently."""
    global workers
    workers = parse_workers(sys.argv, workers)
    print("最大并发数: {}".format(workers))

    # Without this, a missing directory makes os.listdir() raise.
    check_or_mk_tmp_dir()

    with ThreadPoolExecutor(max_workers=workers) as worker:
        futures = {}
        for item in os.listdir(tmp_path):
            p = path.join(tmp_path, item)
            if path.isfile(p) and p.endswith(".mp4"):
                futures[worker.submit(push_stream, p)] = p

        # Collect results so that failures inside workers are not lost.
        for future in as_completed(futures):
            source = futures[future]
            try:
                exit_code = future.result()
            except Exception as exc:  # top-level boundary: report and go on
                print("推流失败 {}: {}".format(source, exc))
            else:
                if exit_code != 0:
                    print("ffmpeg 异常退出 {} (exit code {})".format(
                        source, exit_code))


if __name__ == '__main__':
    main()