diff --git a/gb28181-service/src/main/java/cn/skcks/docking/gb28181/core/sip/gb28181/constant/DeviceConstant.java b/gb28181-service/src/main/java/cn/skcks/docking/gb28181/core/sip/gb28181/constant/DeviceConstant.java index 4e61cd2..2187c79 100644 --- a/gb28181-service/src/main/java/cn/skcks/docking/gb28181/core/sip/gb28181/constant/DeviceConstant.java +++ b/gb28181-service/src/main/java/cn/skcks/docking/gb28181/core/sip/gb28181/constant/DeviceConstant.java @@ -13,6 +13,7 @@ public class DeviceConstant { public class Cache { public final static String DEVICE = "DEVICE"; public final static String ONLINE = "ONLINE"; + public final static String TRANSACTION = "TRANSACTION"; } } diff --git a/gb28181-service/src/main/java/cn/skcks/docking/gb28181/core/sip/message/processor/register/request/RegisterRequestProcessor.java b/gb28181-service/src/main/java/cn/skcks/docking/gb28181/core/sip/message/processor/register/request/RegisterRequestProcessor.java index da6603a..8a473f7 100644 --- a/gb28181-service/src/main/java/cn/skcks/docking/gb28181/core/sip/message/processor/register/request/RegisterRequestProcessor.java +++ b/gb28181-service/src/main/java/cn/skcks/docking/gb28181/core/sip/message/processor/register/request/RegisterRequestProcessor.java @@ -16,7 +16,6 @@ import gov.nist.javax.sip.address.SipUri; import gov.nist.javax.sip.header.Authorization; import gov.nist.javax.sip.header.SIPDateHeader; import gov.nist.javax.sip.message.SIPRequest; -import gov.nist.javax.sip.message.SIPResponse; import jakarta.annotation.PostConstruct; import lombok.RequiredArgsConstructor; import lombok.SneakyThrows; @@ -64,16 +63,24 @@ public class RegisterRequestProcessor implements MessageProcessor { String deviceId = uri.getUser(); log.debug("请求注册 设备id => {}", deviceId); DockingDevice device = dockingDeviceService.getDeviceInfo(deviceId); - if(device == null){ - log.info("新注册的设备 deviceId => {}", deviceId); - } - + String senderIp = request.getLocalAddress().getHostAddress(); RemoteInfo remoteInfo = SipUtil.getRemoteInfoFromRequest(request, false); log.debug("远程连接信息 => {}", remoteInfo); + if(device == null){ + log.info("新注册的设备 deviceId => {}", deviceId); + } else { + if(dockingDeviceService.isOnline(deviceId) && dockingDeviceService.hasTransaction(deviceId)){ + SipTransactionInfo transactionInfo = dockingDeviceService.getTransaction(deviceId); + if(request.getCallIdHeader().getCallId().equals(transactionInfo.getCallId())){ + log.info("设备注册续订 deviceId => {}",deviceId); + registerDevice(deviceId, device, request, senderIp,remoteInfo); + } + } + } + String password = sipConfig.getPassword(); Authorization authorization = request.getAuthorization(); - String senderIp = request.getLocalAddress().getHostAddress(); if(authorization == null && StringUtils.isNotBlank(password)){ Response response = getMessageFactory().createResponse(Response.UNAUTHORIZED, request); DigestServerAuthenticationHelper.generateChallenge(getHeaderFactory(),response,sipConfig.getDomain()); @@ -94,13 +101,38 @@ public class RegisterRequestProcessor implements MessageProcessor { log.debug("设备 deviceId => {}, 认证通过", deviceId); + registerDevice(deviceId, device, request, senderIp,remoteInfo); + } + + @SneakyThrows + private Response generateRegisterResponse(Request request){ + SIPRequest sipRequest = (SIPRequest) request; + ExpiresHeader expires = sipRequest.getExpires(); + if(expires == null){ + return getMessageFactory().createResponse(Response.BAD_REQUEST, request); + } + + Response response = getMessageFactory().createResponse(Response.OK, request); + // 添加date头 + SIPDateHeader dateHeader = new SIPDateHeader(); + // GB28181 日期 + GbSipDate gbSipDate = new GbSipDate(Calendar.getInstance(Locale.ENGLISH).getTimeInMillis()); + dateHeader.setDate(gbSipDate); + + response.addHeader(dateHeader); + response.addHeader(sipRequest.getContactHeader()); + response.addHeader(expires); + + return response; + } + + private void registerDevice(String deviceId, DockingDevice device, SIPRequest request, String senderIp, RemoteInfo remoteInfo) { Response response = generateRegisterResponse(request); if(response.getStatusCode() != Response.OK){ sender.send(senderIp, response); return; } - if (device == null) { device = new DockingDevice(); device.setStreamMode(ListeningPoint.UDP); @@ -137,31 +169,9 @@ public class RegisterRequestProcessor implements MessageProcessor { dockingDeviceService.offline(device); } else { device.setRegisterTime(DateUtil.now()); - dockingDeviceService.online(device); + dockingDeviceService.online(device, response); } sender.send(senderIp, response); } - - @SneakyThrows - private Response generateRegisterResponse(Request request){ - SIPRequest sipRequest = (SIPRequest) request; - ExpiresHeader expires = sipRequest.getExpires(); - if(expires == null){ - return getMessageFactory().createResponse(Response.BAD_REQUEST, request); - } - - Response response = getMessageFactory().createResponse(Response.OK, request); - // 添加date头 - SIPDateHeader dateHeader = new SIPDateHeader(); - // GB28181 日期 - GbSipDate gbSipDate = new GbSipDate(Calendar.getInstance(Locale.ENGLISH).getTimeInMillis()); - dateHeader.setDate(gbSipDate); - - response.addHeader(dateHeader); - response.addHeader(sipRequest.getContactHeader()); - response.addHeader(expires); - - return response; - } } diff --git a/gb28181-service/src/main/java/cn/skcks/docking/gb28181/service/docking/device/DockingDeviceService.java b/gb28181-service/src/main/java/cn/skcks/docking/gb28181/service/docking/device/DockingDeviceService.java index f17f129..79480f9 100644 --- a/gb28181-service/src/main/java/cn/skcks/docking/gb28181/service/docking/device/DockingDeviceService.java +++ b/gb28181-service/src/main/java/cn/skcks/docking/gb28181/service/docking/device/DockingDeviceService.java @@ -1,18 +1,21 @@ package cn.skcks.docking.gb28181.service.docking.device; import cn.hutool.core.date.DateUtil; +import cn.skcks.docking.gb28181.core.sip.dto.SipTransactionInfo; import cn.skcks.docking.gb28181.core.sip.gb28181.constant.DeviceConstant; import cn.skcks.docking.gb28181.orm.mybatis.dynamic.mapper.DockingDeviceDynamicSqlSupport; import cn.skcks.docking.gb28181.orm.mybatis.dynamic.mapper.DockingDeviceMapper; import cn.skcks.docking.gb28181.orm.mybatis.dynamic.model.DockingDevice; import cn.skcks.docking.gb28181.service.docking.device.cache.DeviceOnlineCacheService; +import cn.skcks.docking.gb28181.service.docking.device.cache.DeviceOnlineTransactionCacheService; import cn.skcks.docking.gb28181.service.docking.device.cache.DockingDeviceCacheService; +import gov.nist.javax.sip.message.SIPResponse; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; -import org.apache.commons.lang3.math.NumberUtils; import org.mybatis.dynamic.sql.SqlBuilder; import org.springframework.stereotype.Service; +import javax.sip.message.Response; import java.util.concurrent.TimeUnit; @Slf4j @@ -22,6 +25,7 @@ public class DockingDeviceService { private final DockingDeviceMapper dockingDeviceMapper; private final DockingDeviceCacheService deviceCacheService; private final DeviceOnlineCacheService onlineCacheService; + private final DeviceOnlineTransactionCacheService transactionCacheService; /** * 根据设备Id 获取设备信息 并缓存 @@ -41,7 +45,11 @@ public class DockingDeviceService { return device; } - public void online(DockingDevice device) { + public boolean isOnline(String deviceId){ + return onlineCacheService.isOnline(deviceId); + } + + public void online(DockingDevice device, Response response) { String deviceId = device.getDeviceId(); log.info("[设备上线] deviceId => {}, {}://{}:{}", deviceId, device.getTransport(), device.getIp(), device.getPort()); device.setUpdateTime(DateUtil.now()); @@ -58,7 +66,8 @@ public class DockingDeviceService { }); getDeviceInfo(deviceId); - onlineCacheService.setOnline(deviceId, 180, TimeUnit.SECONDS); + onlineCacheService.setOnline(deviceId, DeviceConstant.KEEP_ALIVE_INTERVAL * 3, DeviceConstant.UNIT); + setTransaction(deviceId, response); } public void offline(DockingDevice device){ @@ -67,6 +76,24 @@ public class DockingDeviceService { log.info("[设备离线] deviceId => {}", deviceId); deviceCacheService.removeDevice(deviceId); + delTransaction(deviceId); onlineCacheService.setOffline(deviceId); } + + public boolean hasTransaction(String deviceId){ + return transactionCacheService.hasTransaction(deviceId); + } + + public void setTransaction(String deviceId, Response response){ + SipTransactionInfo sipTransactionInfo = new SipTransactionInfo((SIPResponse)response); + transactionCacheService.setTransaction(deviceId, sipTransactionInfo, DeviceConstant.KEEP_ALIVE_INTERVAL * 3, DeviceConstant.UNIT); + } + + public SipTransactionInfo getTransaction(String deviceId){ + return transactionCacheService.getTransaction(deviceId); + } + + public void delTransaction(String deviceId){ + transactionCacheService.delTransaction(deviceId); + } } diff --git a/gb28181-service/src/main/java/cn/skcks/docking/gb28181/service/docking/device/cache/DeviceOnlineCacheService.java b/gb28181-service/src/main/java/cn/skcks/docking/gb28181/service/docking/device/cache/DeviceOnlineCacheService.java index b96b4ad..bcb12d1 100644 --- a/gb28181-service/src/main/java/cn/skcks/docking/gb28181/service/docking/device/cache/DeviceOnlineCacheService.java +++ b/gb28181-service/src/main/java/cn/skcks/docking/gb28181/service/docking/device/cache/DeviceOnlineCacheService.java @@ -26,7 +26,7 @@ public class DeviceOnlineCacheService { public void setOnline(String deviceId, long time, TimeUnit unit){ String key = getKey(deviceId); RedisUtil.StringOps.set(key, DateUtil.now()); - RedisUtil.KeyOps.expire(key, DeviceConstant.KEEP_ALIVE_INTERVAL * 3, DeviceConstant.UNIT); + RedisUtil.KeyOps.expire(key, time, unit); } public void setOffline(String deviceId){ diff --git a/gb28181-service/src/main/java/cn/skcks/docking/gb28181/service/docking/device/cache/DeviceOnlineTransactionCacheService.java b/gb28181-service/src/main/java/cn/skcks/docking/gb28181/service/docking/device/cache/DeviceOnlineTransactionCacheService.java new file mode 100644 index 0000000..71e36e7 --- /dev/null +++ b/gb28181-service/src/main/java/cn/skcks/docking/gb28181/service/docking/device/cache/DeviceOnlineTransactionCacheService.java @@ -0,0 +1,46 @@ +package cn.skcks.docking.gb28181.service.docking.device.cache; + +import cn.skcks.docking.gb28181.common.json.JsonUtils; +import cn.skcks.docking.gb28181.common.redis.RedisUtil; +import cn.skcks.docking.gb28181.core.sip.dto.SipTransactionInfo; +import cn.skcks.docking.gb28181.core.sip.gb28181.cache.CacheUtil; +import cn.skcks.docking.gb28181.core.sip.gb28181.constant.DeviceConstant; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Service; + +import java.util.concurrent.TimeUnit; + +@Slf4j +@Service +@RequiredArgsConstructor +public class DeviceOnlineTransactionCacheService { + private String getKey(String deviceId) { + return CacheUtil.getKey(DeviceConstant.Cache.TRANSACTION,deviceId); + } + + public boolean hasTransaction(String deviceId){ + return RedisUtil.KeyOps.hasKey(getKey(deviceId)); + } + + public void setTransaction(String deviceId, SipTransactionInfo transaction, long time, TimeUnit unit){ + String key = getKey(deviceId); + RedisUtil.StringOps.set(key, JsonUtils.toCompressJson(transaction)); + RedisUtil.KeyOps.expire(key, time, unit); + } + + public SipTransactionInfo getTransaction(String deviceId){ + String key = getKey(deviceId); + String json = RedisUtil.StringOps.get(key); + if(json == null){ + return null; + } else { + return JsonUtils.parse(json,SipTransactionInfo.class); + } + } + + public void delTransaction(String deviceId){ + String key = getKey(deviceId); + RedisUtil.KeyOps.delete(key); + } +}