回放和下载 添加 trying

This commit is contained in:
shikong 2024-01-10 15:36:56 +08:00
parent 2124c20c7f
commit 2d84b17fe5

View File

@ -26,6 +26,7 @@ import cn.skcks.docking.gb28181.mocking.service.ffmpeg.FfmpegSupportService;
import cn.skcks.docking.gb28181.mocking.service.zlm.hook.ZlmStreamChangeHookService;
import cn.skcks.docking.gb28181.sdp.GB28181Description;
import cn.skcks.docking.gb28181.sdp.parser.GB28181DescriptionParser;
import cn.skcks.docking.gb28181.sip.method.invite.response.InviteResponseBuilder;
import com.github.rholder.retry.Retryer;
import com.github.rholder.retry.RetryerBuilder;
import com.github.rholder.retry.StopStrategies;
@ -92,11 +93,14 @@ public class DeviceProxyService {
subscribe.getByeSubscribe().addSubscribe(key, subscriber);
int num = taskNum.incrementAndGet();
log.info("当前任务数 {}", num);
ScheduledFuture<?> schedule = trying(request);
try {
GB28181Description gb28181Description = new GB28181DescriptionParser(new String(request.getRawContent())).parse();
MediaDescription mediaDescription = (MediaDescription)gb28181Description.getMediaDescriptions(true).get(0);
boolean tcp = StringUtils.containsIgnoreCase(mediaDescription.getMedia().getProtocol(), "TCP");
zlmStreamChangeHookService.getRegistHandler().put(callId,()->{
schedule.cancel(false);
Retryer<StartSendRtpResp> retryer = RetryerBuilder.<StartSendRtpResp>newBuilder()
.retryIfResult(resp -> resp.getLocalPort() == null || resp.getLocalPort() <= 0)
.retryIfException()
@ -121,6 +125,7 @@ public class DeviceProxyService {
return startSendRtpResp;
});
} catch (Exception e) {
schedule.cancel(true);
Optional.ofNullable(zlmStreamChangeHookService.getUnregistHandler().remove(callId))
.ifPresent(ZlmStreamChangeHookService.ZlmStreamChangeHookHandler::handler);
throw new RuntimeException(e);
@ -136,6 +141,7 @@ public class DeviceProxyService {
callbackTask.put(device.getDeviceCode(), executor);
executeResultHandler.waitFor();
} catch (Exception e) {
schedule.cancel(true);
throw new RuntimeException(e);
}
};
@ -150,11 +156,13 @@ public class DeviceProxyService {
subscribe.getByeSubscribe().addSubscribe(key, subscriber);
int num = taskNum.incrementAndGet();
log.info("当前任务数 {}", num);
ScheduledFuture<?> schedule = trying(request);
try {
GB28181Description gb28181Description = new GB28181DescriptionParser(new String(request.getRawContent())).parse();
MediaDescription mediaDescription = (MediaDescription)gb28181Description.getMediaDescriptions(true).get(0);
boolean tcp = StringUtils.containsIgnoreCase(mediaDescription.getMedia().getProtocol(), "TCP");
zlmStreamChangeHookService.getRegistHandler().put(callId,()->{
schedule.cancel(false);
Retryer<StartSendRtpResp> retryer = RetryerBuilder.<StartSendRtpResp>newBuilder()
.retryIfResult(resp -> resp.getLocalPort() == null || resp.getLocalPort() <= 0)
.retryIfException()
@ -179,6 +187,7 @@ public class DeviceProxyService {
return startSendRtpResp;
});
} catch (Exception e) {
schedule.cancel(true);
Optional.ofNullable(zlmStreamChangeHookService.getUnregistHandler().remove(callId))
.ifPresent(ZlmStreamChangeHookService.ZlmStreamChangeHookHandler::handler);
throw new RuntimeException(e);
@ -194,11 +203,22 @@ public class DeviceProxyService {
downloadTask.put(device.getDeviceCode(), executor);
executeResultHandler.waitFor();
} catch (Exception e) {
schedule.cancel(true);
throw new RuntimeException(e);
}
};
}
private ScheduledFuture<?> trying(SIPRequest request){
return scheduledExecutorService.schedule(() -> {
InviteResponseBuilder inviteRequestBuilder = InviteResponseBuilder.builder().build();
Response tryingInviteResponse = inviteRequestBuilder.createTryingInviteResponse(request);
String ip = request.getLocalAddress().getHostAddress();
String transPort = request.getTopmostViaHeader().getTransport();
sender.sendResponse(ip, transPort, ((provider, ip1, port) -> tryingInviteResponse));
}, 200, TimeUnit.MILLISECONDS);
}
public Flow.Subscriber<SIPRequest> byeSubscriber(String key, MockingDevice device, ConcurrentHashMap<String, Executor> task){
return new Flow.Subscriber<>() {
@Override