bye 事件处理
This commit is contained in:
parent
0e3d7195c7
commit
4b84f9e3b2
@ -0,0 +1,51 @@
|
|||||||
|
package cn.skcks.docking.gb28181.mocking.core.sip.message.processor.bye.request;
|
||||||
|
|
||||||
|
import cn.skcks.docking.gb28181.core.sip.listener.SipListener;
|
||||||
|
import cn.skcks.docking.gb28181.core.sip.message.processor.MessageProcessor;
|
||||||
|
import cn.skcks.docking.gb28181.core.sip.message.subscribe.GenericSubscribe;
|
||||||
|
import cn.skcks.docking.gb28181.mocking.core.sip.message.subscribe.SipSubscribe;
|
||||||
|
import cn.skcks.docking.gb28181.mocking.core.sip.response.SipResponseBuilder;
|
||||||
|
import cn.skcks.docking.gb28181.mocking.core.sip.sender.SipSender;
|
||||||
|
import gov.nist.javax.sip.message.SIPRequest;
|
||||||
|
import jakarta.annotation.PostConstruct;
|
||||||
|
import lombok.RequiredArgsConstructor;
|
||||||
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
import org.springframework.stereotype.Component;
|
||||||
|
|
||||||
|
import javax.sip.RequestEvent;
|
||||||
|
import javax.sip.message.Request;
|
||||||
|
import javax.sip.message.Response;
|
||||||
|
import java.util.EventObject;
|
||||||
|
import java.util.Optional;
|
||||||
|
|
||||||
|
@Slf4j
|
||||||
|
@RequiredArgsConstructor
|
||||||
|
@Component
|
||||||
|
public class ByeRequestProcessor implements MessageProcessor {
|
||||||
|
private final SipListener sipListener;
|
||||||
|
|
||||||
|
private final SipSubscribe subscribe;
|
||||||
|
|
||||||
|
private final SipSender sender;
|
||||||
|
|
||||||
|
@PostConstruct
|
||||||
|
@Override
|
||||||
|
public void init() {
|
||||||
|
sipListener.addRequestProcessor(Request.BYE, this);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void process(EventObject eventObject) {
|
||||||
|
RequestEvent requestEvent = (RequestEvent) eventObject;
|
||||||
|
SIPRequest request = (SIPRequest) requestEvent.getRequest();
|
||||||
|
String callId = request.getCallId().getCallId();
|
||||||
|
String key = GenericSubscribe.Helper.getKey(Request.BYE, callId);
|
||||||
|
String ip = request.getLocalAddress().getHostAddress();
|
||||||
|
String transport = request.getTopmostViaHeader().getTransport();
|
||||||
|
Optional.ofNullable(subscribe.getByeSubscribe().getPublisher(key))
|
||||||
|
.ifPresentOrElse(
|
||||||
|
publisher -> publisher.submit(request),
|
||||||
|
() -> sender.sendResponse(ip, transport, ((provider, ip1, port) ->
|
||||||
|
SipResponseBuilder.response(request, Response.OK, "OK"))));
|
||||||
|
}
|
||||||
|
}
|
@ -206,7 +206,7 @@ public class InviteRequestProcessor implements MessageProcessor {
|
|||||||
public void onNext(SIPRequest item) {
|
public void onNext(SIPRequest item) {
|
||||||
log.info("收到 ack 确认请求: {} 开始推流",key);
|
log.info("收到 ack 确认请求: {} 开始推流",key);
|
||||||
// RTP 推流
|
// RTP 推流
|
||||||
deviceProxyService.proxyVideo2Rtp(device, start, stop, address, port);
|
deviceProxyService.proxyVideo2Rtp(callId, device, start, stop, address, port);
|
||||||
onComplete();
|
onComplete();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -0,0 +1,39 @@
|
|||||||
|
package cn.skcks.docking.gb28181.mocking.core.sip.message.subscribe;
|
||||||
|
|
||||||
|
import cn.skcks.docking.gb28181.core.sip.message.subscribe.GenericSubscribe;
|
||||||
|
import gov.nist.javax.sip.message.SIPRequest;
|
||||||
|
import lombok.RequiredArgsConstructor;
|
||||||
|
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
|
import java.util.concurrent.Executor;
|
||||||
|
import java.util.concurrent.Flow;
|
||||||
|
import java.util.concurrent.SubmissionPublisher;
|
||||||
|
|
||||||
|
@RequiredArgsConstructor
|
||||||
|
public class ByeSubscribe implements GenericSubscribe<SIPRequest> {
|
||||||
|
private final Executor executor;
|
||||||
|
|
||||||
|
private static final Map<String, SubmissionPublisher<SIPRequest>> publishers = new ConcurrentHashMap<>();
|
||||||
|
|
||||||
|
public void close() {
|
||||||
|
Helper.close(publishers);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void addPublisher(String key) {
|
||||||
|
Helper.addPublisher(executor, publishers, key);
|
||||||
|
}
|
||||||
|
|
||||||
|
public SubmissionPublisher<SIPRequest> getPublisher(String key) {
|
||||||
|
return Helper.getPublisher(publishers, key);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void addSubscribe(String key, Flow.Subscriber<SIPRequest> subscribe) {
|
||||||
|
Helper.addSubscribe(publishers, key, subscribe);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void delPublisher(String key) {
|
||||||
|
Helper.delPublisher(publishers, key);
|
||||||
|
}
|
||||||
|
}
|
@ -23,16 +23,19 @@ public class SipSubscribe {
|
|||||||
private final Executor executor;
|
private final Executor executor;
|
||||||
private GenericSubscribe<SIPResponse> registerSubscribe;
|
private GenericSubscribe<SIPResponse> registerSubscribe;
|
||||||
private GenericSubscribe<SIPRequest> ackSubscribe;
|
private GenericSubscribe<SIPRequest> ackSubscribe;
|
||||||
|
private GenericSubscribe<SIPRequest> byeSubscribe;
|
||||||
|
|
||||||
@PostConstruct
|
@PostConstruct
|
||||||
private void init() {
|
private void init() {
|
||||||
registerSubscribe = new RegisterSubscribe(executor);
|
registerSubscribe = new RegisterSubscribe(executor);
|
||||||
ackSubscribe = new AckSubscribe(executor);
|
ackSubscribe = new AckSubscribe(executor);
|
||||||
|
byeSubscribe = new AckSubscribe(executor);
|
||||||
}
|
}
|
||||||
|
|
||||||
@PreDestroy
|
@PreDestroy
|
||||||
private void destroy() {
|
private void destroy() {
|
||||||
registerSubscribe.close();
|
registerSubscribe.close();
|
||||||
ackSubscribe.close();
|
ackSubscribe.close();
|
||||||
|
byeSubscribe.close();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -4,8 +4,14 @@ import cn.hutool.core.date.DatePattern;
|
|||||||
import cn.hutool.core.date.DateUnit;
|
import cn.hutool.core.date.DateUnit;
|
||||||
import cn.hutool.core.date.DateUtil;
|
import cn.hutool.core.date.DateUtil;
|
||||||
import cn.hutool.core.util.URLUtil;
|
import cn.hutool.core.util.URLUtil;
|
||||||
|
import cn.skcks.docking.gb28181.core.sip.message.subscribe.GenericSubscribe;
|
||||||
import cn.skcks.docking.gb28181.mocking.config.sip.DeviceProxyConfig;
|
import cn.skcks.docking.gb28181.mocking.config.sip.DeviceProxyConfig;
|
||||||
|
import cn.skcks.docking.gb28181.mocking.core.sip.executor.MockingExecutor;
|
||||||
|
import cn.skcks.docking.gb28181.mocking.core.sip.message.subscribe.SipSubscribe;
|
||||||
|
import cn.skcks.docking.gb28181.mocking.core.sip.response.SipResponseBuilder;
|
||||||
|
import cn.skcks.docking.gb28181.mocking.core.sip.sender.SipSender;
|
||||||
import cn.skcks.docking.gb28181.mocking.orm.mybatis.dynamic.model.MockingDevice;
|
import cn.skcks.docking.gb28181.mocking.orm.mybatis.dynamic.model.MockingDevice;
|
||||||
|
import gov.nist.javax.sip.message.SIPRequest;
|
||||||
import lombok.RequiredArgsConstructor;
|
import lombok.RequiredArgsConstructor;
|
||||||
import lombok.SneakyThrows;
|
import lombok.SneakyThrows;
|
||||||
import lombok.extern.slf4j.Slf4j;
|
import lombok.extern.slf4j.Slf4j;
|
||||||
@ -17,24 +23,31 @@ import org.bytedeco.javacv.FFmpegFrameGrabber;
|
|||||||
import org.bytedeco.javacv.FFmpegFrameRecorder;
|
import org.bytedeco.javacv.FFmpegFrameRecorder;
|
||||||
import org.springframework.stereotype.Service;
|
import org.springframework.stereotype.Service;
|
||||||
|
|
||||||
|
import javax.sip.message.Request;
|
||||||
|
import javax.sip.message.Response;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.nio.charset.StandardCharsets;
|
import java.nio.charset.StandardCharsets;
|
||||||
import java.util.Date;
|
import java.util.Date;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.concurrent.Executors;
|
import java.util.Optional;
|
||||||
import java.util.concurrent.ScheduledExecutorService;
|
import java.util.concurrent.*;
|
||||||
import java.util.concurrent.TimeUnit;
|
|
||||||
import java.util.concurrent.atomic.AtomicBoolean;
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
|
|
||||||
@Slf4j
|
@Slf4j
|
||||||
@Service
|
@Service
|
||||||
@RequiredArgsConstructor
|
@RequiredArgsConstructor
|
||||||
public class DeviceProxyService {
|
public class DeviceProxyService {
|
||||||
private final DeviceService deviceService;
|
private final MockingExecutor mockingExecutor;
|
||||||
|
|
||||||
private final DeviceProxyConfig proxyConfig;
|
private final DeviceProxyConfig proxyConfig;
|
||||||
|
|
||||||
public void proxyVideo2Rtp(MockingDevice device, Date startTime, Date endTime, String rtpAddr, int rtpPort){
|
private final SipSubscribe subscribe;
|
||||||
|
|
||||||
|
private final ConcurrentHashMap<String, CompletableFuture<Void>> task = new ConcurrentHashMap<>();
|
||||||
|
|
||||||
|
private final SipSender sender;
|
||||||
|
|
||||||
|
public synchronized void proxyVideo2Rtp(String callId, MockingDevice device, Date startTime, Date endTime, String rtpAddr, int rtpPort){
|
||||||
String fromUrl = URLUtil.completeUrl(proxyConfig.getUrl(), "/video");
|
String fromUrl = URLUtil.completeUrl(proxyConfig.getUrl(), "/video");
|
||||||
HashMap<String, String> map = new HashMap<>(3);
|
HashMap<String, String> map = new HashMap<>(3);
|
||||||
String deviceCode = device.getDeviceCode();
|
String deviceCode = device.getDeviceCode();
|
||||||
@ -46,7 +59,50 @@ public class DeviceProxyService {
|
|||||||
log.info("设备: {} 视频 url: {}", deviceCode, fromUrl);
|
log.info("设备: {} 视频 url: {}", deviceCode, fromUrl);
|
||||||
String toUrl = StringUtils.joinWith("", "rtp://", rtpAddr, ":", rtpPort);
|
String toUrl = StringUtils.joinWith("", "rtp://", rtpAddr, ":", rtpPort);
|
||||||
long time = DateUtil.between(startTime, endTime, DateUnit.SECOND);
|
long time = DateUtil.between(startTime, endTime, DateUnit.SECOND);
|
||||||
pushRtp(fromUrl, toUrl, time);
|
|
||||||
|
String key = GenericSubscribe.Helper.getKey(Request.BYE, callId);
|
||||||
|
subscribe.getByeSubscribe().addPublisher(key);
|
||||||
|
Flow.Subscriber<SIPRequest> subscriber = new Flow.Subscriber<>() {
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onSubscribe(Flow.Subscription subscription) {
|
||||||
|
log.info("订阅 bye {}", key);
|
||||||
|
subscription.request(1);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onNext(SIPRequest item) {
|
||||||
|
String ip = item.getLocalAddress().getHostAddress();
|
||||||
|
String transPort = item.getTopmostViaHeader().getTransport();
|
||||||
|
sender.sendResponse(ip, transPort,((provider, ip1, port) ->
|
||||||
|
SipResponseBuilder.response(item, Response.OK, "OK")));
|
||||||
|
onComplete();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onError(Throwable throwable) {
|
||||||
|
onComplete();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onComplete() {
|
||||||
|
log.info("bye 订阅结束 {}", key);
|
||||||
|
subscribe.getByeSubscribe().delPublisher(key);
|
||||||
|
Optional.ofNullable(task.get(device.getDeviceCode())).ifPresent(task->{
|
||||||
|
task.cancel(true);
|
||||||
|
});
|
||||||
|
task.remove(device.getDeviceCode());
|
||||||
|
}
|
||||||
|
};
|
||||||
|
final String finalFromUrl = fromUrl;
|
||||||
|
ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
|
||||||
|
CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
|
||||||
|
pushRtp(finalFromUrl, toUrl, time);
|
||||||
|
// 推送结束后 60 秒内未收到 bye 则结束订阅 释放内存
|
||||||
|
scheduledExecutorService.schedule(subscriber::onComplete, time + 60 , TimeUnit.SECONDS);
|
||||||
|
}, mockingExecutor.sipTaskExecutor());
|
||||||
|
task.put(device.getDeviceCode(), future);
|
||||||
|
subscribe.getByeSubscribe().addSubscribe(key, subscriber);
|
||||||
}
|
}
|
||||||
|
|
||||||
@SneakyThrows
|
@SneakyThrows
|
||||||
@ -55,14 +111,16 @@ public class DeviceProxyService {
|
|||||||
// FFmpeg 调试日志
|
// FFmpeg 调试日志
|
||||||
// FFmpegLogCallback.set();
|
// FFmpegLogCallback.set();
|
||||||
FFmpegFrameGrabber grabber = new FFmpegFrameGrabber(fromUrl);
|
FFmpegFrameGrabber grabber = new FFmpegFrameGrabber(fromUrl);
|
||||||
grabber.setOption("re","");
|
|
||||||
grabber.start();
|
grabber.start();
|
||||||
|
grabber.flush();
|
||||||
|
|
||||||
FFmpegFrameRecorder recorder = new FFmpegFrameRecorder(toUrl, grabber.getImageWidth(), grabber.getImageHeight(), grabber.getAudioChannels());
|
FFmpegFrameRecorder recorder = new FFmpegFrameRecorder(toUrl, grabber.getImageWidth(), grabber.getImageHeight(), grabber.getAudioChannels());
|
||||||
recorder.setInterleaved(true);
|
recorder.setInterleaved(true);
|
||||||
recorder.setVideoOption("preset", "ultrafast");
|
recorder.setVideoOption("preset", "ultrafast");
|
||||||
recorder.setVideoOption("tune", "zerolatency");
|
recorder.setVideoOption("tune", "zerolatency");
|
||||||
recorder.setVideoOption("crf", "25");
|
recorder.setVideoOption("crf", "25");
|
||||||
|
// recorder.setMaxDelay(500);
|
||||||
|
recorder.setGopSize(10);
|
||||||
recorder.setFrameRate(grabber.getFrameRate());
|
recorder.setFrameRate(grabber.getFrameRate());
|
||||||
recorder.setSampleRate(grabber.getSampleRate());
|
recorder.setSampleRate(grabber.getSampleRate());
|
||||||
recorder.setOption("flvflags", "no_duration_filesize");
|
recorder.setOption("flvflags", "no_duration_filesize");
|
||||||
@ -90,7 +148,14 @@ public class DeviceProxyService {
|
|||||||
}, time, TimeUnit.SECONDS);
|
}, time, TimeUnit.SECONDS);
|
||||||
try {
|
try {
|
||||||
AVPacket k;
|
AVPacket k;
|
||||||
while (record.get() && (k = grabber.grabPacket()) != null) {
|
int no_frame_index = 0;
|
||||||
|
while (record.get() && no_frame_index < 10 ) {
|
||||||
|
k = grabber.grabPacket();
|
||||||
|
if(k == null || k.size() <= 0 || k.data() == null) {
|
||||||
|
//空包记录次数跳过
|
||||||
|
no_frame_index++;
|
||||||
|
continue;
|
||||||
|
}
|
||||||
recorder.recordPacket(k);
|
recorder.recordPacket(k);
|
||||||
avcodec.av_packet_unref(k);
|
avcodec.av_packet_unref(k);
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user