diff --git a/gb28181-service/src/main/java/cn/skcks/docking/gb28181/core/sip/listener/SipListenerImpl.java b/gb28181-service/src/main/java/cn/skcks/docking/gb28181/core/sip/listener/SipListenerImpl.java index 3b58ac4..8a075d7 100644 --- a/gb28181-service/src/main/java/cn/skcks/docking/gb28181/core/sip/listener/SipListenerImpl.java +++ b/gb28181-service/src/main/java/cn/skcks/docking/gb28181/core/sip/listener/SipListenerImpl.java @@ -7,8 +7,10 @@ import org.springframework.scheduling.annotation.Async; import org.springframework.stereotype.Component; import javax.sip.*; +import javax.sip.header.CSeqHeader; import javax.sip.header.CallIdHeader; import javax.sip.message.Request; +import javax.sip.message.Response; import java.util.Optional; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; @@ -34,7 +36,48 @@ public class SipListenerImpl implements SipListener { @Override public void processResponse(ResponseEvent responseEvent) { + Response response = responseEvent.getResponse(); + int status = response.getStatusCode(); + // Success + if (((status >= Response.OK) && (status < Response.MULTIPLE_CHOICES)) || status == Response.UNAUTHORIZED) { + CSeqHeader cseqHeader = (CSeqHeader) responseEvent.getResponse().getHeader(CSeqHeader.NAME); + String method = cseqHeader.getMethod(); + // ISIPResponseProcessor sipRequestProcessor = responseProcessorMap.get(method); + // if (sipRequestProcessor != null) { + // sipRequestProcessor.process(responseEvent); + // } + + // if (status != Response.UNAUTHORIZED && responseEvent.getResponse() != null && sipSubscribe.getOkSubscribesSize() > 0 ) { + // CallIdHeader callIdHeader = (CallIdHeader)responseEvent.getResponse().getHeader(CallIdHeader.NAME); + // if (callIdHeader != null) { + // SipSubscribe.Event subscribe = sipSubscribe.getOkSubscribe(callIdHeader.getCallId()); + // if (subscribe != null) { + // SipSubscribe.EventResult eventResult = new SipSubscribe.EventResult(responseEvent); + // sipSubscribe.removeOkSubscribe(callIdHeader.getCallId()); + // subscribe.response(eventResult); + // } + // } + // } + } else if ((status >= Response.TRYING) && (status < Response.OK)) { + // 增加其它无需回复的响应,如101、180等 + } else { + log.warn("接收到失败的response响应!status:" + status + ",message:" + response.getReasonPhrase()); + // if (responseEvent.getResponse() != null && sipSubscribe.getErrorSubscribesSize() > 0 ) { + // CallIdHeader callIdHeader = (CallIdHeader)responseEvent.getResponse().getHeader(CallIdHeader.NAME); + // if (callIdHeader != null) { + // SipSubscribe.Event subscribe = sipSubscribe.getErrorSubscribe(callIdHeader.getCallId()); + // if (subscribe != null) { + // SipSubscribe.EventResult eventResult = new SipSubscribe.EventResult(responseEvent); + // subscribe.response(eventResult); + // sipSubscribe.removeErrorSubscribe(callIdHeader.getCallId()); + // } + // } + // } + if (responseEvent.getDialog() != null) { + responseEvent.getDialog().delete(); + } + } } @Override diff --git a/gb28181-service/src/main/java/cn/skcks/docking/gb28181/core/sip/message/processor/request/RegisterRequestProcessor.java b/gb28181-service/src/main/java/cn/skcks/docking/gb28181/core/sip/message/processor/request/RegisterRequestProcessor.java index f99f617..bbc96fa 100644 --- a/gb28181-service/src/main/java/cn/skcks/docking/gb28181/core/sip/message/processor/request/RegisterRequestProcessor.java +++ b/gb28181-service/src/main/java/cn/skcks/docking/gb28181/core/sip/message/processor/request/RegisterRequestProcessor.java @@ -5,6 +5,7 @@ import cn.skcks.docking.gb28181.core.sip.dto.RemoteInfo; import cn.skcks.docking.gb28181.core.sip.listener.SipListener; import cn.skcks.docking.gb28181.core.sip.message.auth.DigestServerAuthenticationHelper; import cn.skcks.docking.gb28181.core.sip.message.processor.MessageProcessor; +import cn.skcks.docking.gb28181.core.sip.message.sender.SipMessageSender; import cn.skcks.docking.gb28181.core.sip.utils.SipUtil; import gov.nist.javax.sip.address.SipUri; import gov.nist.javax.sip.header.Authorization; @@ -27,6 +28,7 @@ public class RegisterRequestProcessor implements MessageProcessor { private final static String METHOD = "REGISTER"; private final SipListener sipListener; + private final SipMessageSender sender; private final SipConfig sipConfig; @@ -56,5 +58,6 @@ public class RegisterRequestProcessor implements MessageProcessor { Response response = getMessageFactory().createResponse(Response.UNAUTHORIZED, request); DigestServerAuthenticationHelper.generateChallenge(getHeaderFactory(),response,sipConfig.getDomain()); + sender.send(request.getLocalAddress().getHostAddress(),response); } } diff --git a/gb28181-service/src/main/java/cn/skcks/docking/gb28181/core/sip/message/send/SipMessageSender.java b/gb28181-service/src/main/java/cn/skcks/docking/gb28181/core/sip/message/send/SipMessageSender.java deleted file mode 100644 index e03b9da..0000000 --- a/gb28181-service/src/main/java/cn/skcks/docking/gb28181/core/sip/message/send/SipMessageSender.java +++ /dev/null @@ -1,13 +0,0 @@ -package cn.skcks.docking.gb28181.core.sip.message.send; - -import cn.skcks.docking.gb28181.core.sip.service.SipService; -import lombok.RequiredArgsConstructor; -import lombok.extern.slf4j.Slf4j; -import org.springframework.stereotype.Component; - -@Slf4j -@RequiredArgsConstructor -@Component -public class SipMessageSender { - private final SipService sipService; -} diff --git a/gb28181-service/src/main/java/cn/skcks/docking/gb28181/core/sip/message/sender/SipMessageSender.java b/gb28181-service/src/main/java/cn/skcks/docking/gb28181/core/sip/message/sender/SipMessageSender.java new file mode 100644 index 0000000..7f22e06 --- /dev/null +++ b/gb28181-service/src/main/java/cn/skcks/docking/gb28181/core/sip/message/sender/SipMessageSender.java @@ -0,0 +1,64 @@ +package cn.skcks.docking.gb28181.core.sip.message.sender; + +import cn.skcks.docking.gb28181.core.sip.service.SipService; +import cn.skcks.docking.gb28181.core.sip.utils.SipUtil; +import gov.nist.javax.sip.message.SIPMessage; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.ObjectUtils; +import org.springframework.stereotype.Component; + +import javax.sip.ListeningPoint; +import javax.sip.PeerUnavailableException; +import javax.sip.SipException; +import javax.sip.SipProvider; +import javax.sip.header.CallIdHeader; +import javax.sip.header.UserAgentHeader; +import javax.sip.header.ViaHeader; +import javax.sip.message.Message; +import javax.sip.message.Request; +import javax.sip.message.Response; +import java.text.ParseException; +import java.util.Optional; + +@Slf4j +@RequiredArgsConstructor +@Component +public class SipMessageSender { + private final SipService sipService; + + public void send(String ip, Message message) { + SIPMessage sipMessage = (SIPMessage)message; + ViaHeader viaHeader = sipMessage.getTopmostViaHeader(); + String transport; + if(ObjectUtils.anyNull(viaHeader)){ + transport = ListeningPoint.UDP; + log.warn("缺少信息头 ViaHeader, 默认使用 {} 连接发送信息",transport); + } else { + transport = viaHeader.getTransport(); + } + log.debug("将使用 {} 连接发送消息", transport); + + if (message.getHeader(UserAgentHeader.NAME) == null) { + try { + message.addHeader(SipUtil.createUserAgentHeader()); + } catch (PeerUnavailableException | ParseException e) { + log.error("UserAgent 添加失败"); + } + } + + CallIdHeader callIdHeader = sipMessage.getCallIdHeader(); + SipProvider sipProvider = sipService.getProvider(transport, ip); + Optional.ofNullable(sipProvider).ifPresentOrElse(provider->{ + try { + if (message instanceof Request) { + provider.sendRequest((Request) message); + } else if (message instanceof Response) { + provider.sendResponse((Response) message); + } + } catch (SipException e) { + log.error("消息发送失败"); + } + },()-> log.error("未能找到 {}://{} 监听信息", transport,ip)); + } +} diff --git a/gb28181-service/src/main/java/cn/skcks/docking/gb28181/core/sip/service/SipService.java b/gb28181-service/src/main/java/cn/skcks/docking/gb28181/core/sip/service/SipService.java index b595307..8bc2c16 100644 --- a/gb28181-service/src/main/java/cn/skcks/docking/gb28181/core/sip/service/SipService.java +++ b/gb28181-service/src/main/java/cn/skcks/docking/gb28181/core/sip/service/SipService.java @@ -1,6 +1,10 @@ package cn.skcks.docking.gb28181.core.sip.service; +import javax.sip.SipProvider; + public interface SipService { void run(); void stop(); + + SipProvider getProvider(String transport, String ip); } diff --git a/gb28181-service/src/main/java/cn/skcks/docking/gb28181/core/sip/service/SipServiceImpl.java b/gb28181-service/src/main/java/cn/skcks/docking/gb28181/core/sip/service/SipServiceImpl.java index 9ac9782..10540e5 100644 --- a/gb28181-service/src/main/java/cn/skcks/docking/gb28181/core/sip/service/SipServiceImpl.java +++ b/gb28181-service/src/main/java/cn/skcks/docking/gb28181/core/sip/service/SipServiceImpl.java @@ -59,6 +59,14 @@ public class SipServiceImpl implements SipService { pool.clear(); } + @Override + public SipProvider getProvider(String transport, String ip) { + return pool.parallelStream().filter(sipProvider -> { + ListeningPoint listeningPoint = sipProvider.getListeningPoint(); + return listeningPoint != null && listeningPoint.getIPAddress().equals(ip) && listeningPoint.getTransport().equalsIgnoreCase(transport); + }).findFirst().orElse(null); + } + public void listen(String ip, int port){ try{ sipStack = (SipStackImpl)sipFactory.createSipStack(DefaultProperties.getProperties("GB28181_SIP_LOG",true));