From 65445b06c261b7e3d422ef8fc4e49e4895686fff Mon Sep 17 00:00:00 2001 From: chenparty <870300816@qq.com> Date: Tue, 23 Nov 2021 17:09:15 +0800 Subject: [PATCH] =?UTF-8?q?=E5=A2=9E=E5=8A=A0=E5=9B=9E=E6=94=BE=E6=8E=A7?= =?UTF-8?q?=E5=88=B6=E5=8A=9F=E8=83=BD=EF=BC=88=E5=9B=9E=E6=94=BE=E6=9A=82?= =?UTF-8?q?=E5=81=9C=E3=80=81=E5=9B=9E=E6=94=BE=E6=81=A2=E5=A4=8D=E3=80=81?= =?UTF-8?q?=E5=9B=9E=E6=94=BE=E6=8B=96=E5=8A=A8=E6=92=AD=E6=94=BE=E3=80=81?= =?UTF-8?q?=E5=9B=9E=E6=94=BE=E5=80=8D=E9=80=9F=E6=92=AD=E6=94=BE=EF=BC=89?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../gb28181/transmit/cmd/ISIPCommander.java | 21 ++++ .../cmd/SIPRequestHeaderProvider.java | 53 +++++++++ .../transmit/cmd/impl/SIPCommander.java | 108 ++++++++++++++++++ .../gb28181/playback/PlaybackController.java | 95 +++++++++++++++ .../gb28181/session/InfoCseqCache.java | 14 +++ 5 files changed, 291 insertions(+) create mode 100644 src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/session/InfoCseqCache.java diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/ISIPCommander.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/ISIPCommander.java index 9f413779..6238b70c 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/ISIPCommander.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/ISIPCommander.java @@ -1,5 +1,6 @@ package com.genersoft.iot.vmp.gb28181.transmit.cmd; +import com.genersoft.iot.vmp.common.StreamInfo; import com.genersoft.iot.vmp.gb28181.bean.Device; import com.genersoft.iot.vmp.gb28181.event.SipSubscribe; import com.genersoft.iot.vmp.media.zlm.ZLMHttpHookSubscribe; @@ -121,6 +122,26 @@ public interface ISIPCommander { void streamByeCmd(String deviceId, String channelId, SipSubscribe.Event okEvent); void streamByeCmd(String deviceId, String channelId); + /** + * 回放暂停 + */ + void playPauseCmd(Device device, StreamInfo streamInfo); + + /** + * 回放恢复 + */ + void playResumeCmd(Device device, StreamInfo streamInfo); + + /** + * 回放拖动播放 + */ + void playSeekCmd(Device device, StreamInfo streamInfo, long seekTime); + + /** + * 回放倍速播放 + */ + void playSpeedCmd(Device device, StreamInfo streamInfo, String speed); + /** * 语音广播 * diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/SIPRequestHeaderProvider.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/SIPRequestHeaderProvider.java index bb62902a..98ea7c9c 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/SIPRequestHeaderProvider.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/SIPRequestHeaderProvider.java @@ -3,6 +3,7 @@ package com.genersoft.iot.vmp.gb28181.transmit.cmd; import java.text.ParseException; import java.util.ArrayList; +import javax.sip.Dialog; import javax.sip.InvalidArgumentException; import javax.sip.PeerUnavailableException; import javax.sip.SipFactory; @@ -11,6 +12,9 @@ import javax.sip.address.SipURI; import javax.sip.header.*; import javax.sip.message.Request; +import com.genersoft.iot.vmp.common.StreamInfo; +import com.genersoft.iot.vmp.gb28181.session.VideoStreamSessionManager; +import com.genersoft.iot.vmp.vmanager.gb28181.session.InfoCseqCache; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; @@ -30,6 +34,9 @@ public class SIPRequestHeaderProvider { @Autowired private SipFactory sipFactory; + + @Autowired + private VideoStreamSessionManager streamSession; public Request createMessageRequest(Device device, String content, String viaTag, String fromTag, String toTag, CallIdHeader callIdHeader) throws ParseException, InvalidArgumentException, PeerUnavailableException { Request request = null; @@ -210,4 +217,50 @@ public class SIPRequestHeaderProvider { request.setContent(content, contentTypeHeader); return request; } + + public Request createInfoRequest(Device device, StreamInfo streamInfo, String content) + throws PeerUnavailableException, ParseException, InvalidArgumentException { + Request request = null; + Dialog dialog = streamSession.getDialog(streamInfo.getDeviceID(), streamInfo.getChannelId()); + + SipURI requestLine = sipFactory.createAddressFactory().createSipURI(device.getDeviceId(), + device.getHostAddress()); + // via + ArrayList viaHeaders = new ArrayList(); + ViaHeader viaHeader = sipFactory.createHeaderFactory().createViaHeader(device.getIp(), device.getPort(), + device.getTransport(), null); + viaHeader.setRPort(); + viaHeaders.add(viaHeader); + // from + SipURI fromSipURI = sipFactory.createAddressFactory().createSipURI(sipConfig.getId(), + sipConfig.getDomain()); + Address fromAddress = sipFactory.createAddressFactory().createAddress(fromSipURI); + FromHeader fromHeader = sipFactory.createHeaderFactory().createFromHeader(fromAddress, dialog.getLocalTag()); + // to + SipURI toSipURI = sipFactory.createAddressFactory().createSipURI(streamInfo.getChannelId(), + sipConfig.getDomain()); + Address toAddress = sipFactory.createAddressFactory().createAddress(toSipURI); + ToHeader toHeader = sipFactory.createHeaderFactory().createToHeader(toAddress, dialog.getRemoteTag()); + + // callid + CallIdHeader callIdHeader = dialog.getCallId(); + + // Forwards + MaxForwardsHeader maxForwards = sipFactory.createHeaderFactory().createMaxForwardsHeader(70); + + // ceq + CSeqHeader cSeqHeader = sipFactory.createHeaderFactory() + .createCSeqHeader(InfoCseqCache.CSEQCACHE.get(streamInfo.getStreamId()), Request.INFO); + + request = sipFactory.createMessageFactory().createRequest(requestLine, Request.INFO, callIdHeader, cSeqHeader, + fromHeader, toHeader, viaHeaders, maxForwards); + Address concatAddress = sipFactory.createAddressFactory().createAddress(sipFactory.createAddressFactory() + .createSipURI(sipConfig.getId(), sipConfig.getIp() + ":" + sipConfig.getPort())); + request.addHeader(sipFactory.createHeaderFactory().createContactHeader(concatAddress)); + + ContentTypeHeader contentTypeHeader = sipFactory.createHeaderFactory().createContentTypeHeader("Application", + "MANSRTSP"); + request.setContent(content, contentTypeHeader); + return request; + } } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java index 61647aa3..d90705c3 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java @@ -1,6 +1,7 @@ package com.genersoft.iot.vmp.gb28181.transmit.cmd.impl; import com.alibaba.fastjson.JSONObject; +import com.genersoft.iot.vmp.common.StreamInfo; import com.genersoft.iot.vmp.conf.SipConfig; import com.genersoft.iot.vmp.conf.UserSetup; import com.genersoft.iot.vmp.gb28181.bean.Device; @@ -17,6 +18,7 @@ import com.genersoft.iot.vmp.service.IMediaServerService; import com.genersoft.iot.vmp.service.bean.SSRCInfo; import com.genersoft.iot.vmp.storager.IRedisCatchStorage; import com.genersoft.iot.vmp.storager.IVideoManagerStorager; +import com.genersoft.iot.vmp.vmanager.gb28181.session.InfoCseqCache; import gov.nist.javax.sip.SipProviderImpl; import gov.nist.javax.sip.SipStackImpl; import gov.nist.javax.sip.message.SIPRequest; @@ -1543,4 +1545,110 @@ public class SIPCommander implements ISIPCommander { clientTransaction.sendRequest(); return clientTransaction; } + + /** + * 回放暂停 + */ + @Override + public void playPauseCmd(Device device, StreamInfo streamInfo) { + try { + + StringBuffer content = new StringBuffer(200); + content.append("PAUSE RTSP/1.0\r\n"); + content.append("CSeq: " + InfoCseqCache.CSEQCACHE.get(streamInfo.getStreamId()) + "\r\n"); + content.append("PauseTime: now\r\n"); + Request request = headerProvider.createInfoRequest(device, streamInfo, content.toString()); + logger.info(request.toString()); + ClientTransaction clientTransaction = null; + if ("TCP".equals(device.getTransport())) { + clientTransaction = tcpSipProvider.getNewClientTransaction(request); + } else if ("UDP".equals(device.getTransport())) { + clientTransaction = udpSipProvider.getNewClientTransaction(request); + } + if (clientTransaction != null) { + clientTransaction.sendRequest(); + } + + } catch (SipException | ParseException | InvalidArgumentException e) { + e.printStackTrace(); + } + } + + /** + * 回放恢复 + */ + @Override + public void playResumeCmd(Device device, StreamInfo streamInfo) { + try { + StringBuffer content = new StringBuffer(200); + content.append("PLAY RTSP/1.0\r\n"); + content.append("CSeq: " + InfoCseqCache.CSEQCACHE.get(streamInfo.getStreamId()) + "\r\n"); + content.append("Range: npt=now-\r\n"); + Request request = headerProvider.createInfoRequest(device, streamInfo, content.toString()); + logger.info(request.toString()); + ClientTransaction clientTransaction = null; + if ("TCP".equals(device.getTransport())) { + clientTransaction = tcpSipProvider.getNewClientTransaction(request); + } else if ("UDP".equals(device.getTransport())) { + clientTransaction = udpSipProvider.getNewClientTransaction(request); + } + + clientTransaction.sendRequest(); + + } catch (SipException | ParseException | InvalidArgumentException e) { + e.printStackTrace(); + } + } + + /** + * 回放拖动播放 + */ + @Override + public void playSeekCmd(Device device, StreamInfo streamInfo, long seekTime) { + try { + StringBuffer content = new StringBuffer(200); + content.append("PLAY RTSP/1.0\r\n"); + content.append("CSeq: " + InfoCseqCache.CSEQCACHE.get(streamInfo.getStreamId()) + "\r\n"); + content.append("Range: npt=" + seekTime + "-\r\n"); + Request request = headerProvider.createInfoRequest(device, streamInfo, content.toString()); + logger.info(request.toString()); + ClientTransaction clientTransaction = null; + if ("TCP".equals(device.getTransport())) { + clientTransaction = tcpSipProvider.getNewClientTransaction(request); + } else if ("UDP".equals(device.getTransport())) { + clientTransaction = udpSipProvider.getNewClientTransaction(request); + } + + clientTransaction.sendRequest(); + + } catch (SipException | ParseException | InvalidArgumentException e) { + e.printStackTrace(); + } + } + + /** + * 回放倍速播放 + */ + @Override + public void playSpeedCmd(Device device, StreamInfo streamInfo, String speed) { + try { + StringBuffer content = new StringBuffer(200); + content.append("PLAY RTSP/1.0\r\n"); + content.append("CSeq: " + InfoCseqCache.CSEQCACHE.get(streamInfo.getStreamId()) + "\r\n"); + content.append("Scale: " + speed + ".000000\r\n"); + Request request = headerProvider.createInfoRequest(device, streamInfo, content.toString()); + logger.info(request.toString()); + ClientTransaction clientTransaction = null; + if ("TCP".equals(device.getTransport())) { + clientTransaction = tcpSipProvider.getNewClientTransaction(request); + } else if ("UDP".equals(device.getTransport())) { + clientTransaction = udpSipProvider.getNewClientTransaction(request); + } + + clientTransaction.sendRequest(); + + } catch (SipException | ParseException | InvalidArgumentException e) { + e.printStackTrace(); + } + } } diff --git a/src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/playback/PlaybackController.java b/src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/playback/PlaybackController.java index 98df8ddf..90ecfd42 100644 --- a/src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/playback/PlaybackController.java +++ b/src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/playback/PlaybackController.java @@ -9,6 +9,7 @@ import com.genersoft.iot.vmp.service.IMediaServerService; import com.genersoft.iot.vmp.service.bean.SSRCInfo; import com.genersoft.iot.vmp.storager.IRedisCatchStorage; import com.genersoft.iot.vmp.service.IPlayService; +import com.genersoft.iot.vmp.vmanager.gb28181.session.InfoCseqCache; import io.swagger.annotations.Api; import io.swagger.annotations.ApiImplicitParam; import io.swagger.annotations.ApiImplicitParams; @@ -152,4 +153,98 @@ public class PlaybackController { return new ResponseEntity(HttpStatus.INTERNAL_SERVER_ERROR); } } + + @ApiOperation("回放暂停") + @ApiImplicitParams({ + @ApiImplicitParam(name = "streamId", value = "回放流ID", dataTypeClass = String.class), + }) + @GetMapping("/pause/{streamId}") + public ResponseEntity playPause(@PathVariable String streamId) { + logger.info("playPause: "+streamId); + JSONObject json = new JSONObject(); + StreamInfo streamInfo = redisCatchStorage.queryPlaybackByStreamId(streamId); + if (null == streamInfo) { + json.put("msg", "streamId不存在"); + logger.warn("streamId不存在!"); + return new ResponseEntity(json.toString(), HttpStatus.BAD_REQUEST); + } + setCseq(streamId); + Device device = storager.queryVideoDevice(streamInfo.getDeviceID()); + cmder.playPauseCmd(device, streamInfo); + json.put("msg", "ok"); + return new ResponseEntity(json.toString(), HttpStatus.OK); + } + + @ApiOperation("回放恢复") + @ApiImplicitParams({ + @ApiImplicitParam(name = "streamId", value = "回放流ID", dataTypeClass = String.class), + }) + @GetMapping("/resume/{streamId}") + public ResponseEntity playResume(@PathVariable String streamId) { + logger.info("playResume: "+streamId); + JSONObject json = new JSONObject(); + StreamInfo streamInfo = redisCatchStorage.queryPlaybackByStreamId(streamId); + if (null == streamInfo) { + json.put("msg", "streamId不存在"); + logger.warn("streamId不存在!"); + return new ResponseEntity(json.toString(), HttpStatus.BAD_REQUEST); + } + setCseq(streamId); + Device device = storager.queryVideoDevice(streamInfo.getDeviceID()); + cmder.playResumeCmd(device, streamInfo); + json.put("msg", "ok"); + return new ResponseEntity(json.toString(), HttpStatus.OK); + } + + @ApiOperation("回放拖动播放") + @ApiImplicitParams({ + @ApiImplicitParam(name = "streamId", value = "回放流ID", dataTypeClass = String.class), + @ApiImplicitParam(name = "seekTime", value = "拖动偏移量,单位s", dataTypeClass = Long.class), + }) + @GetMapping("/seek/{streamId}/{seekTime}") + public ResponseEntity playSeek(@PathVariable String streamId, @PathVariable long seekTime) { + logger.info("playSeek: "+streamId+", "+seekTime); + JSONObject json = new JSONObject(); + StreamInfo streamInfo = redisCatchStorage.queryPlaybackByStreamId(streamId); + if (null == streamInfo) { + json.put("msg", "streamId不存在"); + logger.warn("streamId不存在!"); + return new ResponseEntity(json.toString(), HttpStatus.BAD_REQUEST); + } + setCseq(streamId); + Device device = storager.queryVideoDevice(streamInfo.getDeviceID()); + cmder.playSeekCmd(device, streamInfo, seekTime); + json.put("msg", "ok"); + return new ResponseEntity(json.toString(), HttpStatus.OK); + } + + @ApiOperation("回放倍速播放") + @ApiImplicitParams({ + @ApiImplicitParam(name = "streamId", value = "回放流ID", dataTypeClass = String.class), + @ApiImplicitParam(name = "speed", value = "倍速 1、2、4", dataTypeClass = String.class), + }) + @GetMapping("/speed/{streamId}/{speed}") + public ResponseEntity playSpeed(@PathVariable String streamId, @PathVariable String speed) { + logger.info("playSpeed: "+streamId+", "+speed); + JSONObject json = new JSONObject(); + StreamInfo streamInfo = redisCatchStorage.queryPlaybackByStreamId(streamId); + if (null == streamInfo) { + json.put("msg", "streamId不存在"); + logger.warn("streamId不存在!"); + return new ResponseEntity(json.toString(), HttpStatus.BAD_REQUEST); + } + setCseq(streamId); + Device device = storager.queryVideoDevice(streamInfo.getDeviceID()); + cmder.playSpeedCmd(device, streamInfo, speed); + json.put("msg", "ok"); + return new ResponseEntity(json.toString(), HttpStatus.OK); + } + + public void setCseq(String streamId) { + if (InfoCseqCache.CSEQCACHE.containsKey(streamId)) { + InfoCseqCache.CSEQCACHE.put(streamId, InfoCseqCache.CSEQCACHE.get(streamId) + 1); + } else { + InfoCseqCache.CSEQCACHE.put(streamId, 2L); + } + } } diff --git a/src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/session/InfoCseqCache.java b/src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/session/InfoCseqCache.java new file mode 100644 index 00000000..051f9817 --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/session/InfoCseqCache.java @@ -0,0 +1,14 @@ +package com.genersoft.iot.vmp.vmanager.gb28181.session; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +/** + * @ClassName: InfoCseqCache + * @Description: INFO类型的Sip中cseq的缓存 + */ +public class InfoCseqCache { + + public static Map CSEQCACHE = new ConcurrentHashMap<>(); + +} \ No newline at end of file