消息发送, 设备注册 未完

This commit is contained in:
shikong 2023-08-10 16:44:08 +08:00
parent 6ca8cd7265
commit a1b1a17d23
6 changed files with 122 additions and 13 deletions

View File

@ -7,8 +7,10 @@ import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;
import javax.sip.*;
import javax.sip.header.CSeqHeader;
import javax.sip.header.CallIdHeader;
import javax.sip.message.Request;
import javax.sip.message.Response;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@ -34,7 +36,48 @@ public class SipListenerImpl implements SipListener {
@Override
public void processResponse(ResponseEvent responseEvent) {
Response response = responseEvent.getResponse();
int status = response.getStatusCode();
// Success
if (((status >= Response.OK) && (status < Response.MULTIPLE_CHOICES)) || status == Response.UNAUTHORIZED) {
CSeqHeader cseqHeader = (CSeqHeader) responseEvent.getResponse().getHeader(CSeqHeader.NAME);
String method = cseqHeader.getMethod();
// ISIPResponseProcessor sipRequestProcessor = responseProcessorMap.get(method);
// if (sipRequestProcessor != null) {
// sipRequestProcessor.process(responseEvent);
// }
// if (status != Response.UNAUTHORIZED && responseEvent.getResponse() != null && sipSubscribe.getOkSubscribesSize() > 0 ) {
// CallIdHeader callIdHeader = (CallIdHeader)responseEvent.getResponse().getHeader(CallIdHeader.NAME);
// if (callIdHeader != null) {
// SipSubscribe.Event subscribe = sipSubscribe.getOkSubscribe(callIdHeader.getCallId());
// if (subscribe != null) {
// SipSubscribe.EventResult eventResult = new SipSubscribe.EventResult(responseEvent);
// sipSubscribe.removeOkSubscribe(callIdHeader.getCallId());
// subscribe.response(eventResult);
// }
// }
// }
} else if ((status >= Response.TRYING) && (status < Response.OK)) {
// 增加其它无需回复的响应如101180等
} else {
log.warn("接收到失败的response响应status" + status + ",message:" + response.getReasonPhrase());
// if (responseEvent.getResponse() != null && sipSubscribe.getErrorSubscribesSize() > 0 ) {
// CallIdHeader callIdHeader = (CallIdHeader)responseEvent.getResponse().getHeader(CallIdHeader.NAME);
// if (callIdHeader != null) {
// SipSubscribe.Event subscribe = sipSubscribe.getErrorSubscribe(callIdHeader.getCallId());
// if (subscribe != null) {
// SipSubscribe.EventResult<?> eventResult = new SipSubscribe.EventResult(responseEvent);
// subscribe.response(eventResult);
// sipSubscribe.removeErrorSubscribe(callIdHeader.getCallId());
// }
// }
// }
if (responseEvent.getDialog() != null) {
responseEvent.getDialog().delete();
}
}
}
@Override

View File

@ -5,6 +5,7 @@ import cn.skcks.docking.gb28181.core.sip.dto.RemoteInfo;
import cn.skcks.docking.gb28181.core.sip.listener.SipListener;
import cn.skcks.docking.gb28181.core.sip.message.auth.DigestServerAuthenticationHelper;
import cn.skcks.docking.gb28181.core.sip.message.processor.MessageProcessor;
import cn.skcks.docking.gb28181.core.sip.message.sender.SipMessageSender;
import cn.skcks.docking.gb28181.core.sip.utils.SipUtil;
import gov.nist.javax.sip.address.SipUri;
import gov.nist.javax.sip.header.Authorization;
@ -27,6 +28,7 @@ public class RegisterRequestProcessor implements MessageProcessor {
private final static String METHOD = "REGISTER";
private final SipListener sipListener;
private final SipMessageSender sender;
private final SipConfig sipConfig;
@ -56,5 +58,6 @@ public class RegisterRequestProcessor implements MessageProcessor {
Response response = getMessageFactory().createResponse(Response.UNAUTHORIZED, request);
DigestServerAuthenticationHelper.generateChallenge(getHeaderFactory(),response,sipConfig.getDomain());
sender.send(request.getLocalAddress().getHostAddress(),response);
}
}

View File

@ -1,13 +0,0 @@
package cn.skcks.docking.gb28181.core.sip.message.send;
import cn.skcks.docking.gb28181.core.sip.service.SipService;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
@Slf4j
@RequiredArgsConstructor
@Component
public class SipMessageSender {
private final SipService sipService;
}

View File

@ -0,0 +1,64 @@
package cn.skcks.docking.gb28181.core.sip.message.sender;
import cn.skcks.docking.gb28181.core.sip.service.SipService;
import cn.skcks.docking.gb28181.core.sip.utils.SipUtil;
import gov.nist.javax.sip.message.SIPMessage;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.ObjectUtils;
import org.springframework.stereotype.Component;
import javax.sip.ListeningPoint;
import javax.sip.PeerUnavailableException;
import javax.sip.SipException;
import javax.sip.SipProvider;
import javax.sip.header.CallIdHeader;
import javax.sip.header.UserAgentHeader;
import javax.sip.header.ViaHeader;
import javax.sip.message.Message;
import javax.sip.message.Request;
import javax.sip.message.Response;
import java.text.ParseException;
import java.util.Optional;
@Slf4j
@RequiredArgsConstructor
@Component
public class SipMessageSender {
private final SipService sipService;
public void send(String ip, Message message) {
SIPMessage sipMessage = (SIPMessage)message;
ViaHeader viaHeader = sipMessage.getTopmostViaHeader();
String transport;
if(ObjectUtils.anyNull(viaHeader)){
transport = ListeningPoint.UDP;
log.warn("缺少信息头 ViaHeader, 默认使用 {} 连接发送信息",transport);
} else {
transport = viaHeader.getTransport();
}
log.debug("将使用 {} 连接发送消息", transport);
if (message.getHeader(UserAgentHeader.NAME) == null) {
try {
message.addHeader(SipUtil.createUserAgentHeader());
} catch (PeerUnavailableException | ParseException e) {
log.error("UserAgent 添加失败");
}
}
CallIdHeader callIdHeader = sipMessage.getCallIdHeader();
SipProvider sipProvider = sipService.getProvider(transport, ip);
Optional.ofNullable(sipProvider).ifPresentOrElse(provider->{
try {
if (message instanceof Request) {
provider.sendRequest((Request) message);
} else if (message instanceof Response) {
provider.sendResponse((Response) message);
}
} catch (SipException e) {
log.error("消息发送失败");
}
},()-> log.error("未能找到 {}://{} 监听信息", transport,ip));
}
}

View File

@ -1,6 +1,10 @@
package cn.skcks.docking.gb28181.core.sip.service;
import javax.sip.SipProvider;
public interface SipService {
void run();
void stop();
SipProvider getProvider(String transport, String ip);
}

View File

@ -59,6 +59,14 @@ public class SipServiceImpl implements SipService {
pool.clear();
}
@Override
public SipProvider getProvider(String transport, String ip) {
return pool.parallelStream().filter(sipProvider -> {
ListeningPoint listeningPoint = sipProvider.getListeningPoint();
return listeningPoint != null && listeningPoint.getIPAddress().equals(ip) && listeningPoint.getTransport().equalsIgnoreCase(transport);
}).findFirst().orElse(null);
}
public void listen(String ip, int port){
try{
sipStack = (SipStackImpl)sipFactory.createSipStack(DefaultProperties.getProperties("GB28181_SIP_LOG",true));