程序退出时 在5秒内 关闭所有 ack 和 bye 订阅 并 关闭所有推流

This commit is contained in:
shikong 2024-01-15 11:43:07 +08:00
parent 85ec74771a
commit 2da0523310
3 changed files with 66 additions and 18 deletions

View File

@ -25,12 +25,14 @@ import lombok.RequiredArgsConstructor;
import lombok.SneakyThrows; import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
import org.springframework.context.SmartLifecycle;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import javax.sdp.*; import javax.sdp.*;
import javax.sip.RequestEvent; import javax.sip.RequestEvent;
import javax.sip.message.Request; import javax.sip.message.Request;
import javax.sip.message.Response; import javax.sip.message.Response;
import java.time.Duration;
import java.util.Date; import java.util.Date;
import java.util.EventObject; import java.util.EventObject;
import java.util.Vector; import java.util.Vector;
@ -40,7 +42,7 @@ import java.util.concurrent.*;
@RequiredArgsConstructor @RequiredArgsConstructor
@Component @Component
@SuppressWarnings("Duplicates") @SuppressWarnings("Duplicates")
public class InviteRequestProcessor implements MessageProcessor { public class InviteRequestProcessor implements MessageProcessor, SmartLifecycle {
private final SipListener sipListener; private final SipListener sipListener;
private final SipSender sender; private final SipSender sender;
@ -55,6 +57,8 @@ public class InviteRequestProcessor implements MessageProcessor {
private final DeviceProxyConfig deviceProxyConfig; private final DeviceProxyConfig deviceProxyConfig;
private boolean running;
@PostConstruct @PostConstruct
@Override @Override
public void init() { public void init() {
@ -99,8 +103,6 @@ public class InviteRequestProcessor implements MessageProcessor {
if (StringUtils.equalsAnyIgnoreCase(type, "Play", "PlayBack")) { if (StringUtils.equalsAnyIgnoreCase(type, "Play", "PlayBack")) {
log.info("点播/回放请求"); log.info("点播/回放请求");
if (StringUtils.equalsIgnoreCase(type, "Play")) { if (StringUtils.equalsIgnoreCase(type, "Play")) {
// 暂不支持实时
// sender.sendResponse(senderIp, transport, unsupported(request));
play(request, device, gb28181Description, (MediaDescription) item); play(request, device, gb28181Description, (MediaDescription) item);
} else { } else {
playback(request, device, gb28181Description, (MediaDescription) item); playback(request, device, gb28181Description, (MediaDescription) item);
@ -368,4 +370,29 @@ public class InviteRequestProcessor implements MessageProcessor {
} }
}; };
} }
@Override
public void start() {
running = true;
}
@SneakyThrows
@Override
public void stop() {
subscribe.getAckSubscribe().close();
subscribe.getByeSubscribe().close();
log.info("关闭所有 ack 和 bye 订阅 并 关闭所有推流");
Thread.sleep(Duration.ofSeconds(5).toMillis());
running = false;
}
@Override
public boolean isRunning() {
return running;
}
@Override
public int getPhase() {
return Integer.MAX_VALUE;
}
} }

View File

@ -132,11 +132,11 @@ public class DeviceProxyService {
return "rtmp://" + zlmMediaConfig.getIp() + ":" + zlmRtmpConfig.getPort() + "/live/" + callId; return "rtmp://" + zlmMediaConfig.getIp() + ":" + zlmRtmpConfig.getPort() + "/live/" + callId;
} }
private Flow.Subscriber<SIPRequest> ffmpegTask(ConcurrentHashMap<String, Executor> tasks, String callId, String key, MockingDevice device){ private Flow.Subscriber<SIPRequest> ffmpegTask(SIPRequest request,ConcurrentHashMap<String, Executor> tasks, String callId, String key, MockingDevice device){
Optional.ofNullable(tasks.get(callId)).ifPresent(task->{ Optional.ofNullable(tasks.get(callId)).ifPresent(task->{
task.getWatchdog().destroyProcess(); task.getWatchdog().destroyProcess();
}); });
Flow.Subscriber<SIPRequest> subscriber = ffmpegByeSubscriber(key, device, tasks); Flow.Subscriber<SIPRequest> subscriber = ffmpegByeSubscriber(request, key, device, tasks);
subscribe.getByeSubscribe().addSubscribe(key, subscriber); subscribe.getByeSubscribe().addSubscribe(key, subscriber);
int num = taskNum.incrementAndGet(); int num = taskNum.incrementAndGet();
log.info("当前任务数 {}", num); log.info("当前任务数 {}", num);
@ -145,7 +145,7 @@ public class DeviceProxyService {
public TaskProcessor playbackTask(){ public TaskProcessor playbackTask(){
return (SIPRequest request,String callId,String fromUrl, String toAddr,int toPort, MockingDevice device, String key, long time,String ssrc) -> { return (SIPRequest request,String callId,String fromUrl, String toAddr,int toPort, MockingDevice device, String key, long time,String ssrc) -> {
Flow.Subscriber<SIPRequest> task = ffmpegTask(callbackTask, callId, key, device); Flow.Subscriber<SIPRequest> task = ffmpegTask(request, callbackTask, callId, key, device);
ScheduledFuture<?> schedule = trying(request); ScheduledFuture<?> schedule = trying(request);
try { try {
String zlmRtpUrl = requestZlmPushStream(schedule, request, callId, fromUrl, toAddr, toPort, device, key, time, ssrc); String zlmRtpUrl = requestZlmPushStream(schedule, request, callId, fromUrl, toAddr, toPort, device, key, time, ssrc);
@ -156,6 +156,7 @@ public class DeviceProxyService {
executeResultHandler.waitFor(); executeResultHandler.waitFor();
} catch (Exception e) { } catch (Exception e) {
schedule.cancel(true); schedule.cancel(true);
sendBye(request,device,"");
throw new RuntimeException(e); throw new RuntimeException(e);
} }
}; };
@ -163,7 +164,7 @@ public class DeviceProxyService {
public TaskProcessor downloadTask(){ public TaskProcessor downloadTask(){
return (SIPRequest request,String callId,String fromUrl, String toAddr,int toPort, MockingDevice device, String key, long time,String ssrc)->{ return (SIPRequest request,String callId,String fromUrl, String toAddr,int toPort, MockingDevice device, String key, long time,String ssrc)->{
Flow.Subscriber<SIPRequest> task = ffmpegTask(downloadTask, callId, key, device); Flow.Subscriber<SIPRequest> task = ffmpegTask(request, downloadTask, callId, key, device);
ScheduledFuture<?> schedule = trying(request); ScheduledFuture<?> schedule = trying(request);
try { try {
String zlmRtpUrl = requestZlmPushStream(schedule, request, callId, fromUrl, toAddr, toPort, device, key, time, ssrc); String zlmRtpUrl = requestZlmPushStream(schedule, request, callId, fromUrl, toAddr, toPort, device, key, time, ssrc);
@ -174,6 +175,7 @@ public class DeviceProxyService {
executeResultHandler.waitFor(); executeResultHandler.waitFor();
} catch (Exception e) { } catch (Exception e) {
schedule.cancel(true); schedule.cancel(true);
sendBye(request,device,"");
throw new RuntimeException(e); throw new RuntimeException(e);
} }
}; };
@ -189,8 +191,9 @@ public class DeviceProxyService {
}, 200, TimeUnit.MILLISECONDS); }, 200, TimeUnit.MILLISECONDS);
} }
public Flow.Subscriber<SIPRequest> ffmpegByeSubscriber(String key, MockingDevice device, ConcurrentHashMap<String, Executor> task){ public Flow.Subscriber<SIPRequest> ffmpegByeSubscriber(SIPRequest inviteRequest,String key, MockingDevice device, ConcurrentHashMap<String, Executor> task){
return new Flow.Subscriber<>() { return new Flow.Subscriber<>() {
SIPRequest request;
@Override @Override
public void onSubscribe(Flow.Subscription subscription) { public void onSubscribe(Flow.Subscription subscription) {
log.info("订阅 bye {}", key); log.info("订阅 bye {}", key);
@ -199,10 +202,7 @@ public class DeviceProxyService {
@Override @Override
public void onNext(SIPRequest item) { public void onNext(SIPRequest item) {
String ip = item.getLocalAddress().getHostAddress(); request = item;
String transPort = item.getTopmostViaHeader().getTransport();
sender.sendResponse(ip, transPort, ((provider, ip1, port) ->
SipResponseBuilder.response(item, Response.OK, "OK")));
onComplete(); onComplete();
} }
@ -214,6 +214,15 @@ public class DeviceProxyService {
@Override @Override
public void onComplete() { public void onComplete() {
log.info("bye 订阅结束 {}", key); log.info("bye 订阅结束 {}", key);
if(request == null){
sendBye(inviteRequest,device,"");
} else {
String ip = request.getLocalAddress().getHostAddress();
String transPort = request.getTopmostViaHeader().getTransport();
sender.sendResponse(ip, transPort, ((provider, ip1, port) ->
SipResponseBuilder.response(request, Response.OK, "OK")));
}
subscribe.getByeSubscribe().delPublisher(key); subscribe.getByeSubscribe().delPublisher(key);
Optional.ofNullable(task.get(device.getDeviceCode())).ifPresent(task -> { Optional.ofNullable(task.get(device.getDeviceCode())).ifPresent(task -> {
task.getWatchdog().destroyProcess(); task.getWatchdog().destroyProcess();
@ -223,8 +232,9 @@ public class DeviceProxyService {
}; };
} }
public Flow.Subscriber<SIPRequest> zlmByeSubscriber(String key, MockingDevice device){ public Flow.Subscriber<SIPRequest> zlmByeSubscriber(String key, SIPRequest inviteRequest,MockingDevice device){
return new Flow.Subscriber<>() { return new Flow.Subscriber<>() {
private SIPRequest request;
@Override @Override
public void onSubscribe(Flow.Subscription subscription) { public void onSubscribe(Flow.Subscription subscription) {
log.info("订阅 bye {}", key); log.info("订阅 bye {}", key);
@ -233,10 +243,7 @@ public class DeviceProxyService {
@Override @Override
public void onNext(SIPRequest item) { public void onNext(SIPRequest item) {
String ip = item.getLocalAddress().getHostAddress(); request = item;
String transPort = item.getTopmostViaHeader().getTransport();
sender.sendResponse(ip, transPort, ((provider, ip1, port) ->
SipResponseBuilder.response(item, Response.OK, "OK")));
subscribe.getByeSubscribe().delPublisher(key); subscribe.getByeSubscribe().delPublisher(key);
} }
@ -246,6 +253,15 @@ public class DeviceProxyService {
@Override @Override
public void onComplete() { public void onComplete() {
log.info("bye 订阅结束 {}", key); log.info("bye 订阅结束 {}", key);
if(request == null){
sendBye(inviteRequest,device,"");
} else {
String ip = request.getLocalAddress().getHostAddress();
String transPort = request.getTopmostViaHeader().getTransport();
sender.sendResponse(ip, transPort, ((provider, ip1, port) ->
SipResponseBuilder.response(request, Response.OK, "OK")));
}
String cacheKey = CacheUtil.getKey("INVITE", "PROXY", key); String cacheKey = CacheUtil.getKey("INVITE", "PROXY", key);
String proxyKey = RedisUtil.StringOps.get(cacheKey); String proxyKey = RedisUtil.StringOps.get(cacheKey);
log.info("关闭拉流代理 {}", zlmMediaService.delStreamProxy(proxyKey)); log.info("关闭拉流代理 {}", zlmMediaService.delStreamProxy(proxyKey));
@ -323,7 +339,7 @@ public class DeviceProxyService {
}); });
// zlmStreamChangeHookService.getUnregistHandler().put(callId,()-> sendBye(request,device,key)); // zlmStreamChangeHookService.getUnregistHandler().put(callId,()-> sendBye(request,device,key));
Flow.Subscriber<SIPRequest> subscriber = zlmByeSubscriber(key,device); Flow.Subscriber<SIPRequest> subscriber = zlmByeSubscriber(key,request,device);
subscribe.getByeSubscribe().addPublisher(key); subscribe.getByeSubscribe().addPublisher(key);
subscribe.getByeSubscribe().addSubscribe(key, subscriber); subscribe.getByeSubscribe().addSubscribe(key, subscriber);
} catch (Exception e) { } catch (Exception e) {

View File

@ -38,6 +38,11 @@ public class SipStarter implements SmartLifecycle {
isRunning = false; isRunning = false;
} }
@Override
public int getPhase() {
return 0;
}
@Override @Override
public boolean isRunning() { public boolean isRunning() {
return isRunning; return isRunning;