diff --git a/gb28181-mocking-service/pom.xml b/gb28181-mocking-service/pom.xml index 9e57ae5..d7be108 100644 --- a/gb28181-mocking-service/pom.xml +++ b/gb28181-mocking-service/pom.xml @@ -40,105 +40,6 @@ zlmediakit-service - - - org.bytedeco - javacv - 1.5.8 - - - org.openjfx - javafx-graphics - - - org.bytedeco - javacpp - - - org.bytedeco - openblas - - - org.bytedeco - opencv - - - org.bytedeco - tesseract - - - org.bytedeco - flycapture - - - org.bytedeco - libdc1394 - - - org.bytedeco - libfreenect - - - org.bytedeco - libfreenect2 - - - org.bytedeco - librealsense - - - org.bytedeco - librealsense2 - - - org.bytedeco - videoinput - - - org.bytedeco - artoolkitplus - - - org.bytedeco - flandmark - - - org.bytedeco - leptonica - - - - - org.bytedeco - javacpp - 1.5.8 - - - - org.bytedeco - ffmpeg - 5.1.2-1.5.8 - windows-x86_64 - - - org.bytedeco - javacpp - - - - - org.bytedeco - ffmpeg - 5.1.2-1.5.8 - linux-x86_64 - - - org.bytedeco - javacpp - - - - com.google.guava @@ -210,6 +111,13 @@ org.springframework spring-web + + + + org.apache.commons + commons-exec + 1.3 + diff --git a/gb28181-mocking-service/src/main/java/cn/skcks/docking/gb28181/mocking/config/sip/FfmpegConfig.java b/gb28181-mocking-service/src/main/java/cn/skcks/docking/gb28181/mocking/config/sip/FfmpegConfig.java new file mode 100644 index 0000000..306976a --- /dev/null +++ b/gb28181-mocking-service/src/main/java/cn/skcks/docking/gb28181/mocking/config/sip/FfmpegConfig.java @@ -0,0 +1,22 @@ +package cn.skcks.docking.gb28181.mocking.config.sip; + +import lombok.Data; +import org.springframework.boot.context.properties.ConfigurationProperties; +import org.springframework.context.annotation.Configuration; + +@ConfigurationProperties(prefix = "ffmpeg-support") +@Configuration +@Data +public class FfmpegConfig { + private String ffmpeg; + private String ffprobe; + + private Rtp rtp; + + @Data + public static class Rtp { + private String input; + private String output; + private String logLevel = "fatal"; + } +} diff --git a/gb28181-mocking-service/src/main/java/cn/skcks/docking/gb28181/mocking/core/sip/message/processor/invite/request/InviteRequestProcessor.java b/gb28181-mocking-service/src/main/java/cn/skcks/docking/gb28181/mocking/core/sip/message/processor/invite/request/InviteRequestProcessor.java index 6499b91..550e073 100644 --- a/gb28181-mocking-service/src/main/java/cn/skcks/docking/gb28181/mocking/core/sip/message/processor/invite/request/InviteRequestProcessor.java +++ b/gb28181-mocking-service/src/main/java/cn/skcks/docking/gb28181/mocking/core/sip/message/processor/invite/request/InviteRequestProcessor.java @@ -167,25 +167,29 @@ public class InviteRequestProcessor implements MessageProcessor { String senderIp = request.getLocalAddress().getHostAddress(); SdpFactory sdpFactory = SdpFactory.getInstance(); SessionDescriptionImpl sessionDescription = new SessionDescriptionImpl(); - sessionDescription.setVersion(sdpFactory.createVersion(0)); + GB28181Description description = GB28181Description.Convertor.convert(sessionDescription); + description.setVersion(sdpFactory.createVersion(0)); // 目前只配置 ipv4 - sessionDescription.setOrigin(sdpFactory.createOrigin(channelId, 0, 0, ConnectionField.IN, Connection.IP4, senderIp)); - sessionDescription.setSessionName(gb28181Description.getSessionName()); - sessionDescription.setConnection(sdpFactory.createConnection(ConnectionField.IN, Connection.IP4, senderIp)); + description.setOrigin(sdpFactory.createOrigin(channelId, 0, 0, ConnectionField.IN, Connection.IP4, senderIp)); + description.setSessionName(gb28181Description.getSessionName()); + description.setConnection(sdpFactory.createConnection(ConnectionField.IN, Connection.IP4, senderIp)); TimeField respTime = new TimeField(); respTime.setZero(); TimeDescription timeDescription = SdpFactory.getInstance().createTimeDescription(respTime); - sessionDescription.setTimeDescriptions(new Vector<>() {{ + description.setTimeDescriptions(new Vector<>() {{ add(timeDescription); }}); String[] mediaTypeCodes = new String[]{"98","96"}; MediaDescription respMediaDescription = SdpFactory.getInstance().createMediaDescription("video", port, 0, SdpConstants.RTP_AVP, mediaTypeCodes); Arrays.stream(mediaTypeCodes).forEach((k)->{ String v = MediaSdpHelper.RTPMAP.get(k); - mediaDescription.addAttribute((AttributeField) SdpFactory.getInstance().createAttribute(SdpConstants.RTPMAP, StringUtils.joinWith(Separators.SP,k,v))); + respMediaDescription.addAttribute((AttributeField) SdpFactory.getInstance().createAttribute(SdpConstants.RTPMAP, StringUtils.joinWith(Separators.SP,k,v))); }); respMediaDescription.addAttribute((AttributeField) SdpFactory.getInstance().createAttribute("sendonly", null)); - GB28181Description description = GB28181Description.Convertor.convert(sessionDescription); + + description.setMediaDescriptions(new Vector<>(){{ + add(respMediaDescription); + }}); description.setSsrcField(gb28181Description.getSsrcField()); String transport = request.getTopmostViaHeader().getTransport(); @@ -225,7 +229,10 @@ public class InviteRequestProcessor implements MessageProcessor { schedule[0] = scheduledExecutorService.schedule(subscriber::onComplete, 60 , TimeUnit.SECONDS); // 推流 ack 事件订阅 subscribe.getAckSubscribe().addSubscribe(key, subscriber); - // 发送 sdp 响应 - sender.sendResponse(senderIp, transport, (ignore, ignore2, ignore3) -> SipResponseBuilder.responseSdp(request, description)); + + scheduledExecutorService.schedule(()->{ + // 发送 sdp 响应 + sender.sendResponse(senderIp, transport, (ignore, ignore2, ignore3) -> SipResponseBuilder.responseSdp(request, description)); + }, 1,TimeUnit.SECONDS); } } diff --git a/gb28181-mocking-service/src/main/java/cn/skcks/docking/gb28181/mocking/core/sip/response/SipResponseBuilder.java b/gb28181-mocking-service/src/main/java/cn/skcks/docking/gb28181/mocking/core/sip/response/SipResponseBuilder.java index a7d27aa..fc51d4d 100644 --- a/gb28181-mocking-service/src/main/java/cn/skcks/docking/gb28181/mocking/core/sip/response/SipResponseBuilder.java +++ b/gb28181-mocking-service/src/main/java/cn/skcks/docking/gb28181/mocking/core/sip/response/SipResponseBuilder.java @@ -15,6 +15,7 @@ import javax.sip.SipFactory; import javax.sip.address.Address; import javax.sip.address.SipURI; import javax.sip.header.ContentTypeHeader; +import javax.sip.header.MaxForwardsHeader; import javax.sip.message.Response; @Slf4j @@ -42,11 +43,13 @@ public class SipResponseBuilder { messageFactory.setDefaultContentEncodingCharset(GB28181Constant.CHARSET); SIPResponse response = (SIPResponse)messageFactory.createResponse(Response.OK, request); SipFactory sipFactory = SipFactory.getInstance(); - ContentTypeHeader contentTypeHeader = sipFactory.createHeaderFactory().createContentTypeHeader("APPLICATION", "SDP"); + ContentTypeHeader contentTypeHeader = sipFactory.createHeaderFactory().createContentTypeHeader("application", "sdp"); response.setContent(sdp.toString(), contentTypeHeader); SipURI sipURI = (SipURI) request.getRequestURI(); SipURI uri = MessageHelper.createSipURI(sipURI.getUser(), StringUtils.joinWith(":", sipURI.getHost() + ":" + sipURI.getPort())); Address concatAddress = sipFactory.createAddressFactory().createAddress(uri); + MaxForwardsHeader maxForwardsHeader = MessageHelper.createMaxForwardsHeader(70); + response.setMaxForwards(maxForwardsHeader); response.addHeader(sipFactory.createHeaderFactory().createContactHeader(concatAddress)); return response; } diff --git a/gb28181-mocking-service/src/main/java/cn/skcks/docking/gb28181/mocking/service/device/DeviceProxyService.java b/gb28181-mocking-service/src/main/java/cn/skcks/docking/gb28181/mocking/service/device/DeviceProxyService.java index e9f9dc9..13d6700 100644 --- a/gb28181-mocking-service/src/main/java/cn/skcks/docking/gb28181/mocking/service/device/DeviceProxyService.java +++ b/gb28181-mocking-service/src/main/java/cn/skcks/docking/gb28181/mocking/service/device/DeviceProxyService.java @@ -6,48 +6,45 @@ import cn.hutool.core.date.DateUtil; import cn.hutool.core.util.URLUtil; import cn.skcks.docking.gb28181.core.sip.message.subscribe.GenericSubscribe; import cn.skcks.docking.gb28181.mocking.config.sip.DeviceProxyConfig; -import cn.skcks.docking.gb28181.mocking.core.sip.executor.MockingExecutor; import cn.skcks.docking.gb28181.mocking.core.sip.message.subscribe.SipSubscribe; import cn.skcks.docking.gb28181.mocking.core.sip.response.SipResponseBuilder; import cn.skcks.docking.gb28181.mocking.core.sip.sender.SipSender; import cn.skcks.docking.gb28181.mocking.orm.mybatis.dynamic.model.MockingDevice; +import cn.skcks.docking.gb28181.mocking.service.ffmpeg.FfmpegSupportService; import gov.nist.javax.sip.message.SIPRequest; import lombok.RequiredArgsConstructor; import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; +import org.apache.commons.exec.Executor; import org.apache.commons.lang3.StringUtils; -import org.bytedeco.ffmpeg.avcodec.AVPacket; -import org.bytedeco.ffmpeg.global.avcodec; -import org.bytedeco.ffmpeg.global.avutil; -import org.bytedeco.javacv.FFmpegFrameGrabber; -import org.bytedeco.javacv.FFmpegFrameRecorder; import org.springframework.stereotype.Service; import javax.sip.message.Request; import javax.sip.message.Response; -import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.Date; import java.util.HashMap; import java.util.Optional; import java.util.concurrent.*; -import java.util.concurrent.atomic.AtomicBoolean; @Slf4j @Service @RequiredArgsConstructor public class DeviceProxyService { - private final MockingExecutor mockingExecutor; - private final DeviceProxyConfig proxyConfig; private final SipSubscribe subscribe; - private final ConcurrentHashMap> task = new ConcurrentHashMap<>(); + private final ConcurrentHashMap task = new ConcurrentHashMap<>(); private final SipSender sender; - public synchronized void proxyVideo2Rtp(String callId, MockingDevice device, Date startTime, Date endTime, String rtpAddr, int rtpPort){ + private final FfmpegSupportService ffmpegSupportService; + + public synchronized void proxyVideo2Rtp(String callId, MockingDevice device, Date startTime, Date endTime, String rtpAddr, int rtpPort) { + Optional.ofNullable(task.get(device.getDeviceCode())).ifPresent(task->{ + task.getWatchdog().destroyProcess(); + }); String fromUrl = URLUtil.completeUrl(proxyConfig.getUrl(), "/video"); HashMap map = new HashMap<>(3); String deviceCode = device.getDeviceCode(); @@ -74,7 +71,7 @@ public class DeviceProxyService { public void onNext(SIPRequest item) { String ip = item.getLocalAddress().getHostAddress(); String transPort = item.getTopmostViaHeader().getTransport(); - sender.sendResponse(ip, transPort,((provider, ip1, port) -> + sender.sendResponse(ip, transPort, ((provider, ip1, port) -> SipResponseBuilder.response(item, Response.OK, "OK"))); onComplete(); } @@ -88,102 +85,20 @@ public class DeviceProxyService { public void onComplete() { log.info("bye 订阅结束 {}", key); subscribe.getByeSubscribe().delPublisher(key); - Optional.ofNullable(task.get(device.getDeviceCode())).ifPresent(task->{ - task.cancel(true); + Optional.ofNullable(task.get(device.getDeviceCode())).ifPresent(task -> { + task.getWatchdog().destroyProcess(); }); task.remove(device.getDeviceCode()); } }; - final String finalFromUrl = fromUrl; - ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(); - CompletableFuture future = CompletableFuture.runAsync(() -> { - pushRtp(finalFromUrl, toUrl, time); - // 推送结束后 60 秒内未收到 bye 则结束订阅 释放内存 - scheduledExecutorService.schedule(subscriber::onComplete, time + 60 , TimeUnit.SECONDS); - }, mockingExecutor.sipTaskExecutor()); - task.put(device.getDeviceCode(), future); subscribe.getByeSubscribe().addSubscribe(key, subscriber); + ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(); + task.put(device.getDeviceCode(), pushRtpTask( fromUrl, toUrl, time + 60)); + scheduledExecutorService.schedule(subscriber::onComplete, time + 60, TimeUnit.SECONDS); } @SneakyThrows - public void pushRtp(String fromUrl, String toUrl, long time) { - log.info("创建推流任务 fromUrl {}, toUrl {}, time: {}", fromUrl, toUrl, time); - // FFmpeg 调试日志 -// FFmpegLogCallback.set(); - FFmpegFrameGrabber grabber = new FFmpegFrameGrabber(fromUrl); - // 30秒超时 - grabber.setOption("stimeout", "30000000"); - grabber.start(); - - FFmpegFrameRecorder recorder = new FFmpegFrameRecorder(toUrl, grabber.getImageWidth(), grabber.getImageHeight(), grabber.getAudioChannels()); - recorder.setInterleaved(true); - recorder.setVideoOption("preset", "ultrafast"); - recorder.setVideoOption("tune", "zerolatency"); - recorder.setVideoOption("crf", "25"); -// recorder.setMaxDelay(500); - recorder.setGopSize((int) (grabber.getFrameRate() * 2)); - recorder.setFrameRate(grabber.getFrameRate()); - recorder.setSampleRate(grabber.getSampleRate()); - recorder.setOption("flvflags", "no_duration_filesize"); - recorder.setOption("movflags","frag_keyframe+empty_moov"); - if (grabber.getAudioChannels() > 0) { - recorder.setAudioChannels(grabber.getAudioChannels()); - recorder.setAudioBitrate(grabber.getAudioBitrate()); - recorder.setAudioCodec(grabber.getAudioCodec()); - } - recorder.setFrameRate(grabber.getVideoFrameRate()); - recorder.setVideoBitrate(grabber.getVideoBitrate()); - // recorder.setVideoCodec(grabber.getVideoCodec()); - recorder.setVideoCodec(avcodec.AV_CODEC_ID_H264); - recorder.setPixelFormat(avutil.AV_PIX_FMT_YUV420P); // 视频源数据yuv - recorder.setAudioCodec(avcodec.AV_CODEC_ID_AAC); // 设置音频压缩方式 - recorder.setFormat("rtp_mpegts"); - recorder.setVideoOption("threads", String.valueOf(Runtime.getRuntime().availableProcessors())); // 解码线程数 - recorder.start(grabber.getFormatContext()); - grabber.flush(); - - ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(); - AtomicBoolean record = new AtomicBoolean(true); - scheduledExecutorService.schedule(() -> { - log.info("到达结束时间, 结束推送 fromUrl: {}, toUrl: {}", fromUrl, toUrl); - record.set(false); - }, time, TimeUnit.SECONDS); - try { - long begin = System.currentTimeMillis(); - AVPacket k; - long dts = 0; - long pts = 0; - int no_frame_index = 0; - while (record.get() && no_frame_index < 10 ) { - k = grabber.grabPacket(); - if(k == null || k.size() <= 0 || k.data() == null) { - //空包记录次数跳过 - no_frame_index++; - continue; - } - // 获取到的pkt的dts,pts异常,将此包丢弃掉。 - if (k.dts() == avutil.AV_NOPTS_VALUE && k.pts() == avutil.AV_NOPTS_VALUE || k.pts() < dts) { - avcodec.av_packet_unref(k); - continue; - } - // 记录上一pkt的dts,pts - dts = k.dts(); - pts = k.pts(); - recorder.recordPacket(k); - avcodec.av_packet_unref(k); - long end = System.currentTimeMillis(); - long sleep_real = (long) ((1000 / grabber.getFrameRate()) - (end - begin)); - begin = end; - if (sleep_real > 0) { - Thread.sleep(sleep_real); - } - } - grabber.close(); - recorder.close(); - } catch (IOException e) { - throw new RuntimeException(e); - } - - log.info("结束推送 fromUrl: {}, toUrl: {}", fromUrl, toUrl); + public Executor pushRtpTask(String fromUrl, String toUrl, long time){ + return ffmpegSupportService.pushToRtp("http://10.10.10.200:5080/live/test.live.flv", toUrl, time, TimeUnit.SECONDS); } } diff --git a/gb28181-mocking-service/src/main/java/cn/skcks/docking/gb28181/mocking/service/ffmpeg/FfmpegSupportService.java b/gb28181-mocking-service/src/main/java/cn/skcks/docking/gb28181/mocking/service/ffmpeg/FfmpegSupportService.java new file mode 100644 index 0000000..e3386ef --- /dev/null +++ b/gb28181-mocking-service/src/main/java/cn/skcks/docking/gb28181/mocking/service/ffmpeg/FfmpegSupportService.java @@ -0,0 +1,39 @@ +package cn.skcks.docking.gb28181.mocking.service.ffmpeg; + +import cn.skcks.docking.gb28181.mocking.config.sip.FfmpegConfig; +import lombok.RequiredArgsConstructor; +import lombok.SneakyThrows; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.exec.*; +import org.apache.commons.lang3.StringUtils; +import org.springframework.stereotype.Service; + +import java.util.concurrent.TimeUnit; + +@Slf4j +@Service +@RequiredArgsConstructor +public class FfmpegSupportService { + private final FfmpegConfig ffmpegConfig; + + @SneakyThrows + public Executor pushToRtp(String input, String output, long time, TimeUnit unit){ + FfmpegConfig.Rtp rtp = ffmpegConfig.getRtp(); + String inputParam = StringUtils.joinWith(" ", rtp.getInput(), input); + log.info("视频输入参数 {}", inputParam); + + String outputParam = StringUtils.joinWith(" ", rtp.getOutput(), output); + log.info("视频输出参数 {}", outputParam); + + String logLevelParam = StringUtils.joinWith(" ","-loglevel", rtp.getLogLevel()); + String command = StringUtils.joinWith(" ", ffmpegConfig.getFfmpeg(), inputParam, outputParam, logLevelParam); + CommandLine commandLine = CommandLine.parse(command); + DefaultExecuteResultHandler resultHandler = new DefaultExecuteResultHandler(); + Executor executor = new DefaultExecutor(); + ExecuteWatchdog watchdog = new ExecuteWatchdog(unit.toMillis(time)); + executor.setExitValue(0); + executor.setWatchdog(watchdog); + executor.execute(commandLine, resultHandler); + return executor; + } +} diff --git a/gb28181-mocking-starter/src/main/resources/application-local.yml b/gb28181-mocking-starter/src/main/resources/application-local.yml index bd01d47..d57d5b6 100644 --- a/gb28181-mocking-starter/src/main/resources/application-local.yml +++ b/gb28181-mocking-starter/src/main/resources/application-local.yml @@ -57,7 +57,8 @@ gb28181: ip: 10.10.10.20 # ip: 192.168.10.32 # ip: 192.168.3.12 - port: 5060 +# port: 5060 + port: 5061 password: 123456 domain: 4405010000 id: 44050100002000000001 @@ -71,4 +72,10 @@ media: secret: 4155cca6-2f9f-11ee-85e6-8de4ce2e7333 proxy: device: - url: http://192.168.2.3:18183 + url: http://10.10.10.20:18186 +ffmpeg-support: + ffmpeg: D:\Soft\Captura\ffmpeg\ffmpeg.exe + ffprobe: D:\Soft\Captura\ffmpeg\ffprobe.exe + rtp: + input: -re -i + output: -vcodec copy -acodec aac -f rtp_mpegts diff --git a/gb28181-mocking-starter/src/main/resources/application.yml b/gb28181-mocking-starter/src/main/resources/application.yml index 7ccb140..dab5bc1 100644 --- a/gb28181-mocking-starter/src/main/resources/application.yml +++ b/gb28181-mocking-starter/src/main/resources/application.yml @@ -24,8 +24,8 @@ spring: username: root password: 123456a url: jdbc:mysql://192.168.1.241:3306/gb28181_docking_platform?createDatabaseIfNotExist=true&characterEncoding=UTF-8&serverTimezone=Asia/Shanghai -# profiles: -# active: local + profiles: + active: local gb28181: # 作为28181服务器的配置 @@ -69,4 +69,10 @@ media: secret: 4155cca6-2f9f-11ee-85e6-8de4ce2e7333 proxy: device: - url: http://192.168.2.3:18183 \ No newline at end of file + url: http://192.168.2.3:18183 +ffmpeg-support: + ffmpeg: /usr/bin/ffmpeg + ffprobe: /usr/bin/ffprobe + rtp: + input: -re -i + output: -vcodec h264 -acodec aac -f rtp_mpegts