模拟设备注册
This commit is contained in:
parent
801479c248
commit
2cc7894fd3
@ -0,0 +1,47 @@
|
|||||||
|
package cn.skcks.docking.gb28181.mocking.core.sip.message.processor.register.response;
|
||||||
|
|
||||||
|
import cn.skcks.docking.gb28181.core.sip.listener.SipListener;
|
||||||
|
import cn.skcks.docking.gb28181.core.sip.message.processor.MessageProcessor;
|
||||||
|
import cn.skcks.docking.gb28181.core.sip.message.subscribe.GenericSubscribe;
|
||||||
|
import cn.skcks.docking.gb28181.mocking.core.sip.message.subscribe.SipSubscribe;
|
||||||
|
import gov.nist.javax.sip.address.SipUri;
|
||||||
|
import gov.nist.javax.sip.message.SIPResponse;
|
||||||
|
import jakarta.annotation.PostConstruct;
|
||||||
|
import lombok.RequiredArgsConstructor;
|
||||||
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
import org.springframework.stereotype.Component;
|
||||||
|
|
||||||
|
import javax.sip.ResponseEvent;
|
||||||
|
import javax.sip.address.Address;
|
||||||
|
import javax.sip.header.CallIdHeader;
|
||||||
|
import javax.sip.header.ToHeader;
|
||||||
|
import javax.sip.message.Request;
|
||||||
|
import java.util.EventObject;
|
||||||
|
import java.util.Optional;
|
||||||
|
|
||||||
|
@Slf4j
|
||||||
|
@RequiredArgsConstructor
|
||||||
|
@Component
|
||||||
|
public class RegisterResponseProcessor implements MessageProcessor {
|
||||||
|
private final SipListener sipListener;
|
||||||
|
private final SipSubscribe subscribe;
|
||||||
|
|
||||||
|
@PostConstruct
|
||||||
|
@Override
|
||||||
|
public void init() {
|
||||||
|
sipListener.addResponseProcessor(Request.REGISTER, this);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void process(EventObject event) {
|
||||||
|
ResponseEvent requestEvent = (ResponseEvent) event;
|
||||||
|
SIPResponse response = (SIPResponse)requestEvent.getResponse();
|
||||||
|
ToHeader toHeader = response.getTo();
|
||||||
|
Address address = toHeader.getAddress();
|
||||||
|
CallIdHeader callIdHeader = response.getCallIdHeader();
|
||||||
|
SipUri uri = (SipUri)address.getURI();
|
||||||
|
String deviceId = uri.getUser();
|
||||||
|
String key = GenericSubscribe.Helper.getKey(Request.REGISTER, deviceId, callIdHeader.getCallId());
|
||||||
|
Optional.ofNullable(subscribe.getRegisterSubscribe().getPublisher(key)).ifPresent(publisher->publisher.submit(response));
|
||||||
|
}
|
||||||
|
}
|
@ -89,7 +89,7 @@ public class SipRequestBuilder implements ApplicationContextAware {
|
|||||||
}
|
}
|
||||||
|
|
||||||
@SneakyThrows
|
@SneakyThrows
|
||||||
public static Request createRegisterRequestWithAuthorization(MockingDevice device, String ip, int port, long cSeq, String fromTag, String viaTag, CallIdHeader callIdHeader, String callId, WWWAuthenticateHeader www) {
|
public static Request createRegisterRequestWithAuthorization(MockingDevice device, String ip, int port, long cSeq, String fromTag, String viaTag, CallIdHeader callIdHeader, WWWAuthenticateHeader www) {
|
||||||
Request request = createRegisterRequest(device, ip, port, cSeq, fromTag, viaTag, callIdHeader);
|
Request request = createRegisterRequest(device, ip, port, cSeq, fromTag, viaTag, callIdHeader);
|
||||||
String realm = www.getRealm();
|
String realm = www.getRealm();
|
||||||
String nonce = www.getNonce();
|
String nonce = www.getNonce();
|
||||||
|
@ -5,6 +5,7 @@ import cn.skcks.docking.gb28181.mocking.config.sip.SipConfig;
|
|||||||
import lombok.RequiredArgsConstructor;
|
import lombok.RequiredArgsConstructor;
|
||||||
import lombok.extern.slf4j.Slf4j;
|
import lombok.extern.slf4j.Slf4j;
|
||||||
import org.springframework.stereotype.Component;
|
import org.springframework.stereotype.Component;
|
||||||
|
import org.springframework.web.context.request.async.DeferredResult;
|
||||||
|
|
||||||
import javax.sip.ListeningPoint;
|
import javax.sip.ListeningPoint;
|
||||||
import javax.sip.SipException;
|
import javax.sip.SipException;
|
||||||
|
@ -2,18 +2,30 @@ package cn.skcks.docking.gb28181.mocking.service.gb28181.register;
|
|||||||
|
|
||||||
|
|
||||||
import cn.skcks.docking.gb28181.common.json.JsonResponse;
|
import cn.skcks.docking.gb28181.common.json.JsonResponse;
|
||||||
|
import cn.skcks.docking.gb28181.core.sip.gb28181.constant.CmdType;
|
||||||
|
import cn.skcks.docking.gb28181.core.sip.message.processor.message.types.recordinfo.reponse.dto.RecordInfoResponseDTO;
|
||||||
|
import cn.skcks.docking.gb28181.core.sip.message.subscribe.GenericSubscribe;
|
||||||
import cn.skcks.docking.gb28181.core.sip.utils.SipUtil;
|
import cn.skcks.docking.gb28181.core.sip.utils.SipUtil;
|
||||||
|
import cn.skcks.docking.gb28181.mocking.core.sip.message.subscribe.SipSubscribe;
|
||||||
import cn.skcks.docking.gb28181.mocking.core.sip.request.SipRequestBuilder;
|
import cn.skcks.docking.gb28181.mocking.core.sip.request.SipRequestBuilder;
|
||||||
import cn.skcks.docking.gb28181.mocking.core.sip.sender.SipSender;
|
import cn.skcks.docking.gb28181.mocking.core.sip.sender.SipSender;
|
||||||
import cn.skcks.docking.gb28181.mocking.orm.mybatis.dynamic.model.MockingDevice;
|
import cn.skcks.docking.gb28181.mocking.orm.mybatis.dynamic.model.MockingDevice;
|
||||||
import cn.skcks.docking.gb28181.mocking.service.device.DeviceService;
|
import cn.skcks.docking.gb28181.mocking.service.device.DeviceService;
|
||||||
|
import gov.nist.javax.sip.message.SIPResponse;
|
||||||
import lombok.RequiredArgsConstructor;
|
import lombok.RequiredArgsConstructor;
|
||||||
import lombok.SneakyThrows;
|
import lombok.SneakyThrows;
|
||||||
import lombok.extern.slf4j.Slf4j;
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
import org.bouncycastle.cert.ocsp.Req;
|
||||||
import org.springframework.stereotype.Service;
|
import org.springframework.stereotype.Service;
|
||||||
import org.springframework.web.context.request.async.DeferredResult;
|
import org.springframework.web.context.request.async.DeferredResult;
|
||||||
|
|
||||||
|
import javax.sip.SipProvider;
|
||||||
|
import javax.sip.header.CallIdHeader;
|
||||||
|
import javax.sip.header.WWWAuthenticateHeader;
|
||||||
|
import javax.sip.message.Request;
|
||||||
|
import javax.sip.message.Response;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
import java.util.concurrent.*;
|
||||||
|
|
||||||
@Slf4j
|
@Slf4j
|
||||||
@RequiredArgsConstructor
|
@RequiredArgsConstructor
|
||||||
@ -23,15 +35,87 @@ public class RegisterService {
|
|||||||
|
|
||||||
private final SipSender sender;
|
private final SipSender sender;
|
||||||
|
|
||||||
|
private final SipSubscribe subscribe;
|
||||||
|
|
||||||
@SneakyThrows
|
@SneakyThrows
|
||||||
public DeferredResult<JsonResponse<Boolean>> register(){
|
public DeferredResult<JsonResponse<Boolean>> register(){
|
||||||
DeferredResult<JsonResponse<Boolean>> result = new DeferredResult<>();
|
DeferredResult<JsonResponse<Boolean>> result = new DeferredResult<>();
|
||||||
List<MockingDevice> allDevice = deviceService.getAllDevice();
|
List<MockingDevice> allDevice = deviceService.getAllDevice();
|
||||||
allDevice.parallelStream().forEach(device -> {
|
allDevice.parallelStream().forEach(this::register);
|
||||||
sender.sendRequest((provider, ip, port) -> SipRequestBuilder.createRegisterRequest(device, ip, port, 1, SipUtil.generateFromTag(), null, provider.getNewCallId()));
|
|
||||||
|
|
||||||
});
|
|
||||||
result.setResult(JsonResponse.success(true));
|
result.setResult(JsonResponse.success(true));
|
||||||
return result;
|
return result;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@SneakyThrows
|
||||||
|
public void register(MockingDevice device){
|
||||||
|
ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
|
||||||
|
sender.sendRequest((provider,ip,port)->{
|
||||||
|
CallIdHeader callIdHeader = provider.getNewCallId();
|
||||||
|
String callId = callIdHeader.getCallId();
|
||||||
|
Request request = SipRequestBuilder.createRegisterRequest(device, ip, port, 1, SipUtil.generateFromTag(), null, callIdHeader);
|
||||||
|
String key = GenericSubscribe.Helper.getKey(Request.REGISTER, device.getGbDeviceId(), callId);
|
||||||
|
subscribe.getRegisterSubscribe().addPublisher(key);
|
||||||
|
final ScheduledFuture<?>[] schedule = new ScheduledFuture<?>[1];
|
||||||
|
Flow.Subscriber<SIPResponse> subscriber = new Flow.Subscriber<>() {
|
||||||
|
Flow.Subscription subscription;
|
||||||
|
private boolean usedAuthorization = false;
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onSubscribe(Flow.Subscription subscription) {
|
||||||
|
this.subscription = subscription;
|
||||||
|
log.debug("建立订阅 => {}", key);
|
||||||
|
subscription.request(1);
|
||||||
|
}
|
||||||
|
|
||||||
|
@SneakyThrows
|
||||||
|
@Override
|
||||||
|
public void onNext(SIPResponse response) {
|
||||||
|
int statusCode = response.getStatusCode();
|
||||||
|
if(statusCode == Response.UNAUTHORIZED && !usedAuthorization){
|
||||||
|
usedAuthorization = true;
|
||||||
|
WWWAuthenticateHeader authorizationHeader = (WWWAuthenticateHeader) response.getHeader(WWWAuthenticateHeader.NAME);
|
||||||
|
provider.sendRequest(SipRequestBuilder.createRegisterRequestWithAuthorization(
|
||||||
|
device,
|
||||||
|
ip,
|
||||||
|
port,
|
||||||
|
response.getCSeq().getSeqNumber()+1,
|
||||||
|
response.getFromTag(),
|
||||||
|
null,
|
||||||
|
callIdHeader,
|
||||||
|
authorizationHeader
|
||||||
|
));
|
||||||
|
subscription.request(1);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (statusCode == Response.UNAUTHORIZED) {
|
||||||
|
this.onComplete();
|
||||||
|
log.info("设备: {}({}), 注册失败, 认证失败", device.getDeviceCode(), device.getGbDeviceId());
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
if(statusCode == Response.OK){
|
||||||
|
log.info("设备: {}({}), 注册成功",device.getDeviceCode(), device.getGbDeviceId());
|
||||||
|
this.onComplete();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onError(Throwable throwable) {
|
||||||
|
log.error("设备: {}({}), 注册失败 处理响应时出现异常: {}", device.getDeviceCode(), device.getGbDeviceId(), throwable.getMessage());
|
||||||
|
this.onComplete();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onComplete() {
|
||||||
|
subscribe.getRegisterSubscribe().delPublisher(key);
|
||||||
|
schedule[0].cancel(false);
|
||||||
|
log.debug("结束订阅 => {}", key);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
subscribe.getRegisterSubscribe().addSubscribe(key, subscriber);
|
||||||
|
schedule[0] = scheduledExecutorService.schedule(subscriber::onComplete, 30, TimeUnit.SECONDS);
|
||||||
|
return request;
|
||||||
|
});
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user