历史回放

This commit is contained in:
shikong 2023-08-31 16:14:59 +08:00
parent f23b64038d
commit d17dfb7f76
4 changed files with 196 additions and 3 deletions

View File

@ -4,6 +4,8 @@ import cn.skcks.docking.gb28181.annotation.web.JsonMapping;
import cn.skcks.docking.gb28181.annotation.web.methods.GetJson;
import cn.skcks.docking.gb28181.api.play.dto.RealTimePlayDTO;
import cn.skcks.docking.gb28181.api.play.dto.RealTimeStopDTO;
import cn.skcks.docking.gb28181.api.play.dto.RecordPlayDTO;
import cn.skcks.docking.gb28181.api.play.dto.RecordStopDTO;
import cn.skcks.docking.gb28181.common.json.JsonResponse;
import cn.skcks.docking.gb28181.config.SwaggerConfig;
import cn.skcks.docking.gb28181.service.play.PlayService;
@ -16,7 +18,7 @@ import org.springframework.validation.annotation.Validated;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.context.request.async.DeferredResult;
@Tag(name="播放")
@Tag(name = "播放")
@RestController
@JsonMapping("/api/device/play")
@RequiredArgsConstructor
@ -29,12 +31,22 @@ public class PlayController {
}
@GetJson("/realTimePlay")
public DeferredResult<JsonResponse<String>> realTimePlay(@ParameterObject @Validated RealTimePlayDTO dto){
public DeferredResult<JsonResponse<String>> realTimePlay(@ParameterObject @Validated RealTimePlayDTO dto) {
return playService.realTimePlay(dto.getDeviceId(), dto.getChannelId(), dto.getTimeout());
}
@GetJson("/realtimeStop")
public JsonResponse<Void> realTimeStop(@ParameterObject @Validated RealTimeStopDTO dto){
public JsonResponse<Void> realTimeStop(@ParameterObject @Validated RealTimeStopDTO dto) {
return playService.realTimeStop(dto.getDeviceId(), dto.getChannelId());
}
@GetJson("/recordPlay")
public DeferredResult<JsonResponse<String>> recordPlay(@ParameterObject @Validated RecordPlayDTO dto) {
return playService.recordPlay(dto.getDeviceId(), dto.getChannelId(), dto.getStartTime(), dto.getEndTime(), dto.getTimeout());
}
@GetJson("/recordStop")
public JsonResponse<Void> recordStop(@ParameterObject @Validated RecordStopDTO dto) {
return playService.recordStop(dto.getDeviceId(), dto.getChannelId(), dto.getStartTime(), dto.getEndTime());
}
}

View File

@ -0,0 +1,38 @@
package cn.skcks.docking.gb28181.api.play.dto;
import cn.hutool.core.date.DatePattern;
import cn.skcks.docking.gb28181.core.sip.gb28181.constant.GB28181Constant;
import com.fasterxml.jackson.annotation.JsonFormat;
import io.swagger.v3.oas.annotations.media.Schema;
import jakarta.validation.constraints.Min;
import jakarta.validation.constraints.NotBlank;
import lombok.Data;
import org.springframework.format.annotation.DateTimeFormat;
import java.util.Date;
@Schema(title = "历史回放")
@Data
public class RecordPlayDTO {
@NotBlank
@Schema(description = "设备id", example = "44050100001180000001")
private String deviceId;
@NotBlank
@Schema(description = "通道id", example = "44050100001180000001")
private String channelId;
@DateTimeFormat(pattern= DatePattern.NORM_DATETIME_PATTERN)
@JsonFormat(pattern = DatePattern.UTC_SIMPLE_PATTERN, timezone = GB28181Constant.TIME_ZONE)
@Schema(description = "开始时间", example = "2023-08-31 00:00:00")
private Date startTime;
@DateTimeFormat(pattern= DatePattern.NORM_DATETIME_PATTERN)
@JsonFormat(pattern = DatePattern.UTC_SIMPLE_PATTERN, timezone = GB28181Constant.TIME_ZONE)
@Schema(description = "结束时间", example = "2023-08-31 00:15:00")
private Date endTime;
@Min(30)
@Schema(description = "超时时间(秒)", example = "30")
private long timeout = 30;
}

View File

@ -0,0 +1,33 @@
package cn.skcks.docking.gb28181.api.play.dto;
import cn.hutool.core.date.DatePattern;
import cn.skcks.docking.gb28181.core.sip.gb28181.constant.GB28181Constant;
import com.fasterxml.jackson.annotation.JsonFormat;
import io.swagger.v3.oas.annotations.media.Schema;
import jakarta.validation.constraints.NotBlank;
import lombok.Data;
import org.springframework.format.annotation.DateTimeFormat;
import java.util.Date;
@Schema(title = "关闭历史回放")
@Data
public class RecordStopDTO {
@NotBlank
@Schema(description = "设备id", example = "44050100001180000001")
private String deviceId;
@NotBlank
@Schema(description = "通道id", example = "44050100001180000001")
private String channelId;
@DateTimeFormat(pattern= DatePattern.NORM_DATETIME_PATTERN)
@JsonFormat(pattern = DatePattern.UTC_SIMPLE_PATTERN, timezone = GB28181Constant.TIME_ZONE)
@Schema(description = "开始时间", example = "2023-08-31 00:00:00")
private Date startTime;
@DateTimeFormat(pattern= DatePattern.NORM_DATETIME_PATTERN)
@JsonFormat(pattern = DatePattern.UTC_SIMPLE_PATTERN, timezone = GB28181Constant.TIME_ZONE)
@Schema(description = "结束时间", example = "2023-08-31 00:15:00")
private Date endTime;
}

View File

@ -215,6 +215,116 @@ public class PlayService {
result.setResult(JsonResponse.success(videoUrl(streamId)));
return result;
}
GetRtpInfoResp rtpInfo = zlmMediaService.getRtpInfo(streamId);
if(rtpInfo.getExist()){
result.setResult(JsonResponse.error(MessageFormat.format("流 {0} 已存在", streamId)));
return result;
}
int streamMode = device.getStreamMode() == null || device.getStreamMode().equalsIgnoreCase(ListeningPoint.UDP) ? 0 : 1;
OpenRtpServer openRtpServer = new OpenRtpServer();
openRtpServer.setPort(0);
openRtpServer.setStreamId(streamId);
openRtpServer.setTcpMode(streamMode);
OpenRtpServerResp openRtpServerResp = zlmMediaService.openRtpServer(openRtpServer);
log.info("openRtpServerResp => {}", openRtpServerResp);
if(!openRtpServerResp.getCode().equals(ResponseStatus.Success)){
result.setResult(JsonResponse.error(openRtpServerResp.getCode().getMsg()));
return result;
}
String ip = zlmMediaConfig.getIp();
int port = openRtpServerResp.getPort();
String ssrc = ssrcService.getPlaySsrc();
GB28181Description description = MediaSdpHelper.playback(deviceId, channelId, Connection.IP4, ip, port, ssrc, StreamMode.of(device.getStreamMode()),startTime,endTime);
String transport = device.getTransport();
String senderIp = device.getLocalIp();
SipProvider provider = sipService.getProvider(transport, senderIp);
CallIdHeader callId = provider.getNewCallId();
Request request = SipRequestBuilder.createInviteRequest(device, channelId, description.toString(), SipUtil.generateViaTag(), SipUtil.generateFromTag(), null, ssrc, callId);
String subscribeKey = GenericSubscribe.Helper.getKey(MessageProcessor.Method.INVITE, callId.getCallId());
subscribe.getInviteSubscribe().addPublisher(subscribeKey);
Flow.Subscriber<SIPResponse> subscriber = new Flow.Subscriber<>() {
private Flow.Subscription subscription;
@Override
public void onSubscribe(Flow.Subscription subscription) {
this.subscription = subscription;
log.info("订阅 {} {}",MessageProcessor.Method.INVITE,subscribeKey);
subscription.request(1);
}
@Override
public void onNext(SIPResponse item) {
int statusCode = item.getStatusCode();
log.debug("{} 收到订阅消息 {}", subscribeKey, item);
if(statusCode == Response.TRYING){
log.info("订阅 {} {} 尝试连接流媒体服务", MessageProcessor.Method.INVITE,subscribeKey);
subscription.request(1);
} else if(statusCode>=Response.OK && statusCode < Response.MULTIPLE_CHOICES){
log.info("订阅 {} {} 流媒体服务连接成功, 开始推送视频流", MessageProcessor.Method.INVITE,subscribeKey);
RedisUtil.StringOps.set(key, JsonUtils.toCompressJson(new SipTransactionInfo(item)));
RedisUtil.StringOps.set(CacheUtil.getKey(key,"ssrc"), ssrc);
result.setResult(JsonResponse.success(videoUrl(streamId)));
onComplete();
} else {
log.info("订阅 {} {} 连接流媒体服务时出现异常, 终止订阅", MessageProcessor.Method.INVITE,subscribeKey);
RedisUtil.KeyOps.delete(key);
RedisUtil.KeyOps.delete(CacheUtil.getKey(key,"ssrc"));
result.setResult(JsonResponse.error("连接流媒体服务失败"));
ssrcService.releaseSsrc(zlmMediaConfig.getId(), ssrc);
onComplete();
}
}
@Override
public void onError(Throwable throwable) {
}
@Override
public void onComplete() {
subscribe.getRecordInfoSubscribe().delPublisher(subscribeKey);
}
};
subscribe.getInviteSubscribe().addSubscribe(subscribeKey, subscriber);
sender.send(senderIp, request);
result.onTimeout(()->{
subscribe.getInviteSubscribe().delPublisher(subscribeKey);
result.setResult(JsonResponse.error("点播超时"));
});
return result;
}
@SneakyThrows
public JsonResponse<Void> recordStop(String deviceId, String channelId, Date startTime, Date endTime){
DockingDevice device = deviceService.getDevice(deviceId);
if (device == null) {
log.info("未能找到 编码为 => {} 的设备", deviceId);
return JsonResponse.error(null, "未找到设备");
}
long start = startTime.toInstant().getEpochSecond();
long end = endTime.toInstant().getEpochSecond();
String streamId = MediaSdpHelper.getStreamId(deviceId,channelId,String.valueOf(start), String.valueOf(end));
String key = CacheUtil.getKey(MediaSdpHelper.Action.PLAY_BACK.getAction(), deviceId, channelId);
String ssrcKey = CacheUtil.getKey(key,"ssrc");
zlmMediaService.closeRtpServer(new CloseRtpServer(streamId));
SipTransactionInfo transactionInfo = JsonUtils.parse(RedisUtil.StringOps.get(key), SipTransactionInfo.class);
if(transactionInfo == null){
return JsonResponse.error("未找到连接信息");
}
Request request = SipRequestBuilder.createByeRequest(device, channelId, transactionInfo);
String senderIp = device.getLocalIp();
sender.send(senderIp, request);
String ssrc = RedisUtil.StringOps.get(ssrcKey);
ssrcService.releaseSsrc(zlmMediaConfig.getId(),ssrc);
RedisUtil.KeyOps.delete(ssrcKey);
RedisUtil.KeyOps.delete(key);
return JsonResponse.success(null);
}
}