Compare commits

..

No commits in common. "dev-0.1.0" and "dev-0.1.0_rebuild-sdp" have entirely different histories.

47 changed files with 146 additions and 1256 deletions

View File

@ -26,61 +26,8 @@ docker run --name gb28181 --rm \
skcks.cn/gb28181-docking-platform:0.0.1-SNAPSHOT
```
### 清理私仓
切换到私仓目录
例: cd H:/Repository/skcks.cn/gb28181-docking-platform-mvn-repo
```shell
rm -rf ./*/*/*/*/0.1.0-SNAPSHOT
rm -rf ./*/*/*/*/*/0.1.0-SNAPSHOT
```
### 打包到本地私仓
```shell
mvn deploy -s settings.xml -DaltDeploymentRepository=local-repo::default::file:H:/Repository/skcks.cn/gb28181-docking-platform-mvn-repo
mvn deploy -s settings.xml -DaltDeploymentRepository=amleixun-mvn-reop::default::file:H:/Repository/skcks.cn/gb28181-docking-platform-mvn-repo
```
git push 推送即可
### 公共仓库
https://central.sonatype.com
https://central.sonatype.org/publish/requirements/gpg/#signing-a-file
gpg签名
https://central.sonatype.org/publish/requirements/gpg/
https://www.gpg4win.org/thanks-for-download.html
gpg服务器
- keyserver.ubuntu.com
- keys.openpgp.org
- pgp.mit.edu
settings.xml
```xml
<settings>
<servers>
<server>
<id>ossrh</id>
<username><!-- your token username --></username>
<password><!-- your token password --></password>
</server>
</servers>
<profiles>
<profile>
<id>ossrh</id>
<activation>
<activeByDefault>true</activeByDefault>
</activation>
<properties>
<gpg.executable>gpg</gpg.executable>
<gpg.keyname>GPG KEY NAME</gpg.keyname>
<gpg.passphrase>GPG KEY PASSWORD</gpg.passphrase>
</properties>
</profile>
</profiles>
</settings>
```
#### 解决idea 控制台乱码
- 设置 > 构建/运行/部署 > 构建工具 > Maven > 运行程序 > VM options
- 添加 -Dfile.encoding=GBK

View File

@ -6,7 +6,7 @@
<parent>
<groupId>cn.skcks.docking</groupId>
<artifactId>gb28181</artifactId>
<version>0.1.0</version>
<version>0.1.0-SNAPSHOT</version>
</parent>
<groupId>cn.skcks.docking.gb28181</groupId>

View File

@ -6,7 +6,7 @@
<parent>
<groupId>cn.skcks.docking</groupId>
<artifactId>gb28181</artifactId>
<version>0.1.0</version>
<version>0.1.0-SNAPSHOT</version>
</parent>
<groupId>cn.skcks.docking.gb28181</groupId>

View File

@ -5,7 +5,6 @@ import cn.skcks.docking.gb28181.annotation.web.methods.GetJson;
import cn.skcks.docking.gb28181.common.json.JsonResponse;
import cn.skcks.docking.gb28181.service.catalog.CatalogService;
import cn.skcks.docking.gb28181.sip.manscdp.catalog.response.CatalogItemDTO;
import cn.skcks.docking.gb28181.utils.FutureDeferredResult;
import io.swagger.v3.oas.annotations.tags.Tag;
import lombok.RequiredArgsConstructor;
import lombok.SneakyThrows;
@ -24,8 +23,12 @@ public class CatalogController {
@SneakyThrows
@GetJson
public DeferredResult<JsonResponse<List<CatalogItemDTO>>> catalog(String gbDeviceId){
public DeferredResult<JsonResponse<List<?>>> catalog(String gbDeviceId){
DeferredResult<JsonResponse<List<?>>> result = new DeferredResult<>();
CompletableFuture<List<CatalogItemDTO>> catalog = catalogService.catalog(gbDeviceId);
return FutureDeferredResult.toDeferredResultWithJson(catalog);
catalog.whenComplete((data,throwable)->{
result.setResult(JsonResponse.success(data));
});
return result;
}
}

View File

@ -6,7 +6,7 @@
<parent>
<groupId>cn.skcks.docking</groupId>
<artifactId>gb28181</artifactId>
<version>0.1.0</version>
<version>0.1.0-SNAPSHOT</version>
</parent>
<groupId>cn.skcks.docking.gb28181</groupId>

View File

@ -1,7 +1,5 @@
package cn.skcks.docking.gb28181.common.json;
import lombok.Builder;
public class JsonException extends Exception{
public JsonException(String message){
super(message);

View File

@ -6,7 +6,7 @@
<parent>
<groupId>cn.skcks.docking</groupId>
<artifactId>gb28181</artifactId>
<version>0.1.0</version>
<version>0.1.0-SNAPSHOT</version>
</parent>
<groupId>cn.skcks.docking.gb28181</groupId>

View File

@ -2,7 +2,6 @@ package cn.skcks.docking.gb28181.core.sip.message.processor.message.request;
import cn.skcks.docking.gb28181.common.json.ResponseStatus;
import cn.skcks.docking.gb28181.common.xml.XmlUtils;
import cn.skcks.docking.gb28181.config.sip.SipConfig;
import cn.skcks.docking.gb28181.constant.CmdType;
import cn.skcks.docking.gb28181.core.sip.gb28181.constant.GB28181Constant;
import cn.skcks.docking.gb28181.core.sip.listener.SipListener;
@ -13,12 +12,9 @@ import cn.skcks.docking.gb28181.core.sip.message.subscribe.SipSubscribe;
import cn.skcks.docking.gb28181.core.sip.utils.SipUtil;
import cn.skcks.docking.gb28181.orm.mybatis.dynamic.model.DockingDevice;
import cn.skcks.docking.gb28181.service.docking.device.DockingDeviceService;
import cn.skcks.docking.gb28181.service.notify.MediaStatusService;
import cn.skcks.docking.gb28181.sip.manscdp.MessageDTO;
import cn.skcks.docking.gb28181.sip.manscdp.catalog.response.CatalogResponseDTO;
import cn.skcks.docking.gb28181.sip.manscdp.mediastatus.notify.MediaStatusRequestDTO;
import cn.skcks.docking.gb28181.sip.manscdp.recordinfo.response.RecordInfoResponseDTO;
import cn.skcks.docking.gb28181.sip.method.invite.request.InviteRequestBuilder;
import cn.skcks.docking.gb28181.sip.utils.MANSCDPUtils;
import gov.nist.javax.sip.message.SIPRequest;
import gov.nist.javax.sip.message.SIPResponse;
@ -26,11 +22,9 @@ import jakarta.annotation.PostConstruct;
import lombok.RequiredArgsConstructor;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.stereotype.Component;
import javax.sip.RequestEvent;
import javax.sip.address.SipURI;
import javax.sip.message.Response;
import java.util.EventObject;
import java.util.Optional;
@ -43,9 +37,6 @@ public class MessageRequestProcessor implements MessageProcessor {
private final DockingDeviceService deviceService;
private final SipMessageSender sender;
private final SipSubscribe subscribe;
private final SipConfig sipConfig;
private final MediaStatusService mediaStatusService;
@PostConstruct
@Override
@ -96,12 +87,6 @@ public class MessageRequestProcessor implements MessageProcessor {
response = ok;
} else if(messageDto.getCmdType().equalsIgnoreCase(CmdType.MEDIA_STATUS)){
response = ok;
sender.send(senderIp, response);
MediaStatusRequestDTO mediaStatusRequestDTO = MANSCDPUtils.parse(content, MediaStatusRequestDTO.class);
if(StringUtils.equalsIgnoreCase(mediaStatusRequestDTO.getNotifyType(),"121")){
mediaStatusService.process(request, mediaStatusRequestDTO);
return;
}
} else {
response = response(request, Response.NOT_IMPLEMENTED, ResponseStatus.NOT_IMPLEMENTED.getMessage());
}

View File

@ -133,7 +133,6 @@ public class RegisterRequestProcessor implements MessageProcessor {
private void registerDevice(String deviceId, DockingDevice device, SIPRequest request, String senderIp, RemoteInfo remoteInfo) {
Response response = generateRegisterResponse(request);
log.debug("response.getStatusCode {}", response.getStatusCode());
if(response.getStatusCode() != Response.OK){
sender.send(senderIp, response);
return;
@ -141,14 +140,14 @@ public class RegisterRequestProcessor implements MessageProcessor {
if (device == null) {
device = new DockingDevice();
device.setStreamMode(MediaStreamMode.UDP.getMode());
device.setStreamMode(MediaStreamMode.TCP_PASSIVE.getMode());
device.setCharset(GB28181Constant.CHARSET);
device.setGeoCoordSys(GB28181Constant.GEO_COORD_SYS);
device.setDeviceId(deviceId);
device.setOnLine(false);
} else {
if (ObjectUtils.isEmpty(device.getStreamMode())) {
device.setStreamMode(MediaStreamMode.UDP.getMode());
device.setStreamMode(MediaStreamMode.TCP_PASSIVE.getMode());
}
if (ObjectUtils.isEmpty(device.getCharset())) {
device.setCharset(GB28181Constant.CHARSET);

View File

@ -23,7 +23,7 @@ import java.util.concurrent.ScheduledExecutorService;
public class SipSubscribe {
@Qualifier(DefaultSipExecutor.EXECUTOR_BEAN_NAME)
private final Executor executor;
private final ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(Runtime.getRuntime().availableProcessors());
private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
private GenericTimeoutSubscribe<SIPResponse> sipResponseSubscribe;
private GenericTimeoutSubscribe<SIPRequest> sipRequestSubscribe;

View File

@ -1,7 +1,5 @@
package cn.skcks.docking.gb28181.service.catalog;
import cn.skcks.docking.gb28181.common.json.JsonException;
import cn.skcks.docking.gb28181.common.json.JsonResponse;
import cn.skcks.docking.gb28181.config.sip.SipConfig;
import cn.skcks.docking.gb28181.constant.CmdType;
import cn.skcks.docking.gb28181.core.sip.message.request.SipRequestBuilder;
@ -14,9 +12,11 @@ import cn.skcks.docking.gb28181.service.device.DeviceChannelService;
import cn.skcks.docking.gb28181.service.docking.device.cache.DockingDeviceCacheService;
import cn.skcks.docking.gb28181.sip.manscdp.catalog.query.CatalogQueryDTO;
import cn.skcks.docking.gb28181.sip.manscdp.catalog.response.CatalogItemDTO;
import cn.skcks.docking.gb28181.sip.manscdp.catalog.response.CatalogResponseDTO;
import cn.skcks.docking.gb28181.sip.method.message.request.MessageRequestBuilder;
import cn.skcks.docking.gb28181.sip.utils.MANSCDPUtils;
import cn.skcks.docking.gb28181.sip.utils.SipUtil;
import gov.nist.javax.sip.message.SIPRequest;
import lombok.RequiredArgsConstructor;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
@ -24,11 +24,13 @@ import org.springframework.stereotype.Service;
import javax.sip.SipProvider;
import javax.sip.message.Request;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Flow;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
@Slf4j
@Service
@ -45,11 +47,6 @@ public class CatalogService {
CompletableFuture<List<CatalogItemDTO>> result = new CompletableFuture<>();
result.completeOnTimeout(Collections.emptyList(), 60, TimeUnit.SECONDS);
DockingDevice device = deviceCacheService.getDevice(gbDeviceId);
if (device == null) {
log.info("未能找到 编码为 => {} 的设备", gbDeviceId);
result.completeExceptionally(new JsonException("未找到设备 " + gbDeviceId));
return result;
}
SipProvider provider = sipService.getProvider(device.getTransport(), device.getLocalIp());
MessageRequestBuilder requestBuilder = MessageRequestBuilder.builder()
.localIp(device.getLocalIp())
@ -70,7 +67,56 @@ public class CatalogService {
Request request = requestBuilder.createMessageRequest(callId, cSeq, MANSCDPUtils.toByteXml(catalogQueryDTO, device.getCharset()));
String key = GenericSubscribe.Helper.getKey(CmdType.CATALOG, gbDeviceId, sn);
subscribe.getSipRequestSubscribe().addPublisher(key, 60, TimeUnit.SECONDS);
subscribe.getSipRequestSubscribe().addSubscribe(key, new CatalogSubscriber(subscribe, key, result, device.getDeviceId(), deviceChannelService::add));
subscribe.getSipRequestSubscribe().addSubscribe(key, new Flow.Subscriber<>() {
private Flow.Subscription subscription;
private final AtomicLong num = new AtomicLong(0);
private long sumNum = 0;
private final List<CatalogItemDTO> data = new ArrayList<>();
@Override
public void onSubscribe(Flow.Subscription subscription) {
this.subscription = subscription;
subscription.request(1);
}
@Override
public void onNext(SIPRequest item) {
CatalogResponseDTO catalogResponseDTO = MANSCDPUtils.parse(item.getRawContent(), CatalogResponseDTO.class);
sumNum = Math.max(sumNum,catalogResponseDTO.getSumNum());
long curNum = num.addAndGet(catalogResponseDTO.getDeviceList().getNum());
log.debug("当前获取数量: {}/{}", curNum, sumNum);
data.addAll(catalogResponseDTO.getDeviceList().getDeviceList());
if(curNum >= sumNum){
log.info("获取完成 {}", key);
subscribe.getSipRequestSubscribe().complete(key);
} else {
subscription.request(1);
}
}
@Override
public void onError(Throwable throwable) {
throwable.printStackTrace();
onComplete();
}
@Override
public void onComplete() {
log.info("{} 返回结果 {}", key, result.complete(data));
data.stream().map(item->{
DockingDeviceChannel model = new DockingDeviceChannel();
model.setGbDeviceId(device.getDeviceId());
model.setGbDeviceChannelId(item.getDeviceId());
model.setName(item.getName());
model.setAddress(item.getAddress());
return model;
}).forEach(deviceChannelService::add);
subscribe.getSipRequestSubscribe().delPublisher(key);
}
});
provider.sendRequest(request);
return result;
}

View File

@ -1,76 +0,0 @@
package cn.skcks.docking.gb28181.service.catalog;
import cn.skcks.docking.gb28181.core.sip.message.subscribe.SipSubscribe;
import cn.skcks.docking.gb28181.orm.mybatis.dynamic.model.DockingDeviceChannel;
import cn.skcks.docking.gb28181.sip.manscdp.catalog.response.CatalogItemDTO;
import cn.skcks.docking.gb28181.sip.manscdp.catalog.response.CatalogResponseDTO;
import cn.skcks.docking.gb28181.sip.utils.MANSCDPUtils;
import gov.nist.javax.sip.message.SIPRequest;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
@Slf4j
@RequiredArgsConstructor
public class CatalogSubscriber implements Flow.Subscriber<SIPRequest>{
private final SipSubscribe subscribe;
private final String key;
private final CompletableFuture<List<CatalogItemDTO>> result;
private final String deviceId;
private final Consumer<? super DockingDeviceChannel> addDeviceChannelFunc;
private Flow.Subscription subscription;
private final AtomicLong num = new AtomicLong(0);
private long sumNum = 0;
private final List<CatalogItemDTO> data = new ArrayList<>();
@Override
public void onSubscribe(Flow.Subscription subscription) {
this.subscription = subscription;
subscription.request(1);
}
@Override
public void onNext(SIPRequest item) {
CatalogResponseDTO catalogResponseDTO = MANSCDPUtils.parse(item.getRawContent(), CatalogResponseDTO.class);
sumNum = Math.max(sumNum,catalogResponseDTO.getSumNum());
long curNum = num.addAndGet(catalogResponseDTO.getDeviceList().getNum());
log.debug("当前获取数量: {}/{}", curNum, sumNum);
data.addAll(catalogResponseDTO.getDeviceList().getDeviceList());
if(curNum >= sumNum){
log.info("获取完成 {}", key);
subscribe.getSipRequestSubscribe().complete(key);
} else {
subscription.request(1);
}
}
@Override
public void onError(Throwable throwable) {
throwable.printStackTrace();
onComplete();
}
@Override
public void onComplete() {
log.info("{} 返回结果 {}", key, result.complete(data));
data.stream().map(item->{
DockingDeviceChannel model = new DockingDeviceChannel();
model.setGbDeviceId(deviceId);
model.setGbDeviceChannelId(item.getDeviceId());
model.setName(item.getName());
model.setAddress(item.getAddress());
return model;
}).forEach(addDeviceChannelFunc);
subscribe.getSipRequestSubscribe().delPublisher(key);
}
}

View File

@ -1,66 +0,0 @@
package cn.skcks.docking.gb28181.service.notify;
import cn.hutool.core.collection.CollectionUtil;
import cn.skcks.docking.gb28181.common.json.JsonUtils;
import cn.skcks.docking.gb28181.common.redis.RedisUtil;
import cn.skcks.docking.gb28181.config.sip.SipConfig;
import cn.skcks.docking.gb28181.core.sip.dto.SipTransactionInfo;
import cn.skcks.docking.gb28181.core.sip.gb28181.cache.CacheUtil;
import cn.skcks.docking.gb28181.core.sip.message.sender.SipMessageSender;
import cn.skcks.docking.gb28181.core.sip.message.subscribe.GenericSubscribe;
import cn.skcks.docking.gb28181.core.sip.message.subscribe.SipSubscribe;
import cn.skcks.docking.gb28181.sdp.GB28181SDPBuilder;
import cn.skcks.docking.gb28181.sip.manscdp.mediastatus.notify.MediaStatusRequestDTO;
import cn.skcks.docking.gb28181.sip.method.invite.request.InviteRequestBuilder;
import gov.nist.javax.sip.message.SIPRequest;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.stereotype.Service;
import javax.sip.address.SipURI;
import javax.sip.message.Request;
import java.util.Set;
@RequiredArgsConstructor
@Slf4j
@Service
public class MediaStatusService {
private final SipConfig sipConfig;
private final SipMessageSender sender;
private final SipSubscribe subscribe;
public void process(SIPRequest request, MediaStatusRequestDTO dto){
String senderIp = request.getLocalAddress().getHostAddress();
String deviceId = ((SipURI)request.getFromHeader().getAddress().getURI()).getUser();
if(StringUtils.equalsIgnoreCase(dto.getNotifyType(),"121")){
InviteRequestBuilder inviteRequestBuilder = InviteRequestBuilder.builder()
.localIp(request.getLocalAddress().getHostAddress())
.localPort(sipConfig.getPort())
.localId(((SipURI)request.getToHeader().getAddress().getURI()).getUser())
.targetIp(request.getRemoteAddress().getHostAddress())
.targetPort(request.getRemotePort())
.targetId(((SipURI)request.getFromHeader().getAddress().getURI()).getUser())
.transport(request.getTopmostViaHeader().getTransport())
.build();
String keyPattern = CacheUtil.getKey(GB28181SDPBuilder.Action.PLAY_BACK.getAction(), deviceId,"*");
Set<String> keys = RedisUtil.KeyOps.keys(keyPattern);
if (CollectionUtil.isEmpty(keys)){
// 实在找不到就原样发回去 ()
sender.send(senderIp, inviteRequestBuilder.createByeRequest(request.getCallId().getCallId(), request.getCSeq().getSeqNumber() + 1));
} else {
keys.forEach(key -> {
String json = RedisUtil.StringOps.get(key);
if(StringUtils.isNotBlank(json)){
log.debug("{} {}",key,json);
SipTransactionInfo transactionInfo = JsonUtils.parse(json, SipTransactionInfo.class);
String callId = transactionInfo.getCallId();
String subscribeKey = GenericSubscribe.Helper.getKey(Request.BYE, callId);
log.debug("{} {}",callId,subscribeKey);
subscribe.getSipRequestSubscribe().getPublisher(subscribeKey).submit(request);
}
});
}
}
}
}

View File

@ -47,7 +47,6 @@ import javax.sip.message.Request;
import javax.sip.message.Response;
import java.text.MessageFormat;
import java.util.Date;
import java.util.Objects;
import java.util.concurrent.Flow;
import java.util.concurrent.TimeUnit;
@ -205,7 +204,6 @@ public class PlayService {
subscribe.getSipResponseSubscribe().delPublisher(subscribeKey);
}
};
// 1小时自动关闭
byeSubscribe(inviteRequestBuilder,provider,callId,3600,()->{
RedisUtil.KeyOps.delete(key);
});
@ -273,7 +271,6 @@ public class PlayService {
Request request = inviteRequestBuilder.createPlaybackInviteRequest(callId, SipRequestBuilder.getCSeq(),channelId,ip,port,ssrc,MediaStreamMode.of(device.getStreamMode()),startTime,endTime);
String subscribeKey = GenericSubscribe.Helper.getKey(Request.INVITE, callIdHeader.getCallId());
subscribe.getSipResponseSubscribe().addPublisher(subscribeKey);
Flow.Subscriber<SIPResponse> subscriber = new Flow.Subscriber<>() {
private Flow.Subscription subscription;
@ -294,7 +291,7 @@ public class PlayService {
} else if (statusCode >= Response.OK && statusCode < Response.MULTIPLE_CHOICES) {
log.info("订阅 {} {} 流媒体服务连接成功, 开始推送视频流", Request.INVITE, subscribeKey);
RedisUtil.StringOps.set(key, JsonUtils.toCompressJson(new SipTransactionInfo(item, ssrc)));
RedisUtil.KeyOps.expire(key, DateUtil.between(startTime, endTime, DateUnit.SECOND) + 30, TimeUnit.SECONDS);
RedisUtil.KeyOps.expire(key, DateUtil.between(startTime, endTime, DateUnit.SECOND), TimeUnit.SECONDS);
result.setResult(JsonResponse.success(videoUrl(streamId)));
onComplete();
} else {
@ -355,7 +352,7 @@ public class PlayService {
@SneakyThrows
@Override
public void onComplete() {
if(request != null && Objects.equals(request.getMethod(), Request.BYE)){
if(request != null){
Response byeResponse = InviteResponseBuilder.builder().build().createByeResponse(request, SipUtil.nanoId());
provider.sendResponse(byeResponse);
} else {

View File

@ -55,16 +55,17 @@ public class RecordService {
public DeferredResult<JsonResponse<List<RecordInfoItemVO>>> requestRecordInfo(String deviceId, String channelId, long timeout, Date date) {
log.info("查询 设备 => {} {} 的历史媒体记录, 超时时间 {} 秒", deviceId, DateUtil.formatDate(date), timeout);
DeferredResult<JsonResponse<List<RecordInfoItemVO>>> result = new DeferredResult<>(TimeUnit.SECONDS.toMillis(timeout));
DockingDevice device = deviceService.getDevice(deviceId);
if (device == null) {
log.info("未能找到 编码为 => {} 的设备", deviceId);
result.setResult(JsonResponse.error("未找到设备"));
result.setResult(JsonResponse.error(null, "未找到设备"));
return result;
}
Optional<DockingDeviceChannel> deviceChannel = deviceChannelService.getDeviceChannel(deviceId, channelId);
if(deviceChannel.isEmpty()){
log.info("未能找到 设备编码为 => {}, 通道 => {} 的信息", deviceId, channelId);
result.setResult(JsonResponse.error("未找到通道信息"));
result.setResult(JsonResponse.error(null, "未找到通道信息"));
return result;
}
@ -92,10 +93,58 @@ public class RecordService {
Request request = requestBuilder.createMessageRequest(callId,SipRequestBuilder.getCSeq(), MANSCDPUtils.toByteXml(dto, device.getCharset()));
String key = GenericSubscribe.Helper.getKey(CmdType.RECORD_INFO, channelId, sn);
subscribe.getSipRequestSubscribe().addPublisher(key);
subscribe.getSipRequestSubscribe().addSubscribe(key, new RecordSubscriber(subscribe, key, result, deviceId));
Flow.Subscriber<SIPRequest> subscriber = new Flow.Subscriber<>() {
final List<RecordInfoItemDTO> list = new ArrayList<>();
final AtomicLong atomicSum = new AtomicLong(0);
final AtomicLong atomicNum = new AtomicLong(0);
Flow.Subscription subscription;
@Override
public void onSubscribe(Flow.Subscription subscription) {
this.subscription = subscription;
log.debug("建立订阅 => {}", key);
subscription.request(1);
}
@Override
public void onNext(SIPRequest item) {
RecordInfoResponseDTO data = MANSCDPUtils.parse(item.getRawContent(), RecordInfoResponseDTO.class);
atomicSum.set(Math.max(data.getSumNum(), atomicNum.get()));
atomicNum.addAndGet(data.getRecordList().getNum());
list.addAll(data.getRecordList().getRecordList());
long num = atomicNum.get();
long sum = atomicSum.get();
if(num > sum){
log.warn("检测到 设备 => {}, 未按规范实现, 订阅 => {}, 期望总数为 => {}, 已接收数量 => {}", deviceId, key, atomicSum.get(), atomicNum.get());
} else {
log.info("获取订阅 => {}, {}/{}", key, atomicNum.get(), atomicSum.get());
}
if (num >= sum) {
// 针对某些不按规范的设备
// 如果已获取数量 >= 约定的总数
// 就执行定时任务, 500ms 内未收到新的数据视为已结束
subscribe.getSipRequestSubscribe().refreshPublisher(key,500, TimeUnit.MILLISECONDS);
}
subscription.request(1);
}
@Override
public void onError(Throwable throwable) {
}
@Override
public void onComplete() {
result.setResult(JsonResponse.success(RecordConvertor.INSTANCE.dto2Vo(sortedRecordList(list))));
log.debug("订阅结束 => {}", key);
subscribe.getSipRequestSubscribe().delPublisher(key);
}
};
subscribe.getSipRequestSubscribe().addSubscribe(key, subscriber);
result.onTimeout(() -> {
result.setResult(JsonResponse.build(ResponseStatus.PARTIAL_CONTENT,
RecordConvertor.INSTANCE.dto2Vo(Collections.emptyList()),
RecordConvertor.INSTANCE.dto2Vo(sortedRecordList(Collections.emptyList())),
"查询超时, 结果可能不完整"));
subscribe.getSipRequestSubscribe().delPublisher(key);
});

View File

@ -1,81 +0,0 @@
package cn.skcks.docking.gb28181.service.record;
import cn.hutool.core.date.DateUtil;
import cn.skcks.docking.gb28181.common.json.JsonResponse;
import cn.skcks.docking.gb28181.core.sip.message.subscribe.SipSubscribe;
import cn.skcks.docking.gb28181.service.record.convertor.RecordConvertor;
import cn.skcks.docking.gb28181.service.record.vo.RecordInfoItemVO;
import cn.skcks.docking.gb28181.sip.manscdp.recordinfo.response.RecordInfoItemDTO;
import cn.skcks.docking.gb28181.sip.manscdp.recordinfo.response.RecordInfoResponseDTO;
import cn.skcks.docking.gb28181.sip.utils.MANSCDPUtils;
import gov.nist.javax.sip.message.SIPRequest;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.web.context.request.async.DeferredResult;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
@Slf4j
@RequiredArgsConstructor
public class RecordSubscriber implements Flow.Subscriber<SIPRequest>{
private final SipSubscribe subscribe;
private final String key;
private final DeferredResult<JsonResponse<List<RecordInfoItemVO>>> result;
private final String deviceId;
private final List<RecordInfoItemDTO> list = new ArrayList<>();
private final AtomicLong atomicSum = new AtomicLong(0);
private final AtomicLong atomicNum = new AtomicLong(0);
private Flow.Subscription subscription;
@Override
public void onSubscribe(Flow.Subscription subscription) {
this.subscription = subscription;
log.debug("建立订阅 => {}", key);
subscription.request(1);
}
@Override
public void onNext(SIPRequest item) {
RecordInfoResponseDTO data = MANSCDPUtils.parse(item.getRawContent(), RecordInfoResponseDTO.class);
atomicSum.set(Math.max(data.getSumNum(), atomicNum.get()));
atomicNum.addAndGet(data.getRecordList().getNum());
list.addAll(data.getRecordList().getRecordList());
long num = atomicNum.get();
long sum = atomicSum.get();
if(num > sum){
log.warn("检测到 设备 => {}, 未按规范实现, 订阅 => {}, 期望总数为 => {}, 已接收数量 => {}", deviceId, key, atomicSum.get(), atomicNum.get());
} else {
log.info("获取订阅 => {}, {}/{}", key, atomicNum.get(), atomicSum.get());
}
if (num >= sum) {
// 针对某些不按规范的设备
// 如果已获取数量 >= 约定的总数
// 就执行定时任务, 500ms 内未收到新的数据视为已结束
subscribe.getSipRequestSubscribe().refreshPublisher(key,500, TimeUnit.MILLISECONDS);
}
subscription.request(1);
}
@Override
public void onError(Throwable throwable) {
}
@Override
public void onComplete() {
result.setResult(JsonResponse.success(RecordConvertor.INSTANCE.dto2Vo(sortedRecordList(list))));
log.debug("订阅结束 => {}", key);
subscribe.getSipRequestSubscribe().delPublisher(key);
}
private List<RecordInfoItemDTO> sortedRecordList(List<RecordInfoItemDTO> list){
return list.stream().sorted((a,b)-> DateUtil.compare(a.getStartTime(),b.getStartTime())).collect(Collectors.toList());
}
}

View File

@ -1,51 +0,0 @@
package cn.skcks.docking.gb28181.utils;
import cn.skcks.docking.gb28181.common.json.JsonResponse;
import org.springframework.web.context.request.async.DeferredResult;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
public class FutureDeferredResult {
public static <T> DeferredResult<JsonResponse<T>> toDeferredResultWithJson(CompletableFuture<T> future){
DeferredResult<JsonResponse<T>> result = new DeferredResult<>();
future.whenComplete((data,throwable)->{
if(throwable!= null){
result.setResult(JsonResponse.error(throwable.getMessage()));
return;
}
result.setResult(JsonResponse.success(data));
});
return result;
}
public static <T> DeferredResult<JsonResponse<T>> toDeferredResultWithJsonAndTimeout(CompletableFuture<T> future, long time, TimeUnit timeUnit){
DeferredResult<JsonResponse<T>> result = new DeferredResult<>(timeUnit.toMillis(time));
result.onTimeout(()-> result.setResult(JsonResponse.error("请求超时")));
future.whenComplete((data,throwable)->{
if(throwable!= null){
result.setResult(JsonResponse.error(throwable.getMessage()));
return;
}
result.setResult(JsonResponse.success(data));
});
return result;
}
public static <T> DeferredResult<T> toDeferredResult(CompletableFuture<T> future){
DeferredResult<T> result = new DeferredResult<>();
future.whenComplete((data,throwable)->{
result.setResult(data);
});
return result;
}
public static <T> DeferredResult<T> toDeferredResultWithTimeout(CompletableFuture<T> future, T timeoutResult,long time, TimeUnit timeUnit){
DeferredResult<T> result = new DeferredResult<>(timeUnit.toMillis(time), timeoutResult);
future.completeOnTimeout(timeoutResult,time,timeUnit);
future.whenComplete((data, throwable) -> {
result.setResult(data);
});
return result;
}
}

View File

@ -6,7 +6,7 @@
<parent>
<groupId>cn.skcks.docking</groupId>
<artifactId>gb28181</artifactId>
<version>0.1.0</version>
<version>0.1.0-SNAPSHOT</version>
</parent>
<groupId>cn.skcks.docking.gb28181</groupId>

View File

@ -11,7 +11,6 @@ import gov.nist.javax.sdp.fields.ConnectionField;
import gov.nist.javax.sdp.fields.TimeField;
import gov.nist.javax.sdp.fields.URIField;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Getter;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
@ -43,53 +42,6 @@ public class GB28181SDPBuilder {
put("125", "profile-level-id=42e01e");
}};
public static class StreamType {
public interface Attribute<T> {
AttributeField stream();
}
public static AttributeField getAttribute(Attribute<?> attribute) {
return attribute.stream();
}
@AllArgsConstructor
public static class TPLink implements Attribute<String>{
private String stream;
public static final TPLink MAIN = new TPLink("main");
public static final TPLink SUB = new TPLink("sub");
public AttributeField stream() {
return (AttributeField) SdpFactory.getInstance().createAttribute("streamMode", stream);
}
}
@AllArgsConstructor
public static class StreamProfile implements Attribute<Integer> {
private Integer stream;
public static final StreamProfile MAIN = new StreamProfile(0);
public static final StreamProfile SUB = new StreamProfile(1);
@Override
public AttributeField stream() {
return (AttributeField) SdpFactory.getInstance().createAttribute("streamprofile", String.valueOf(stream));
}
}
@AllArgsConstructor
public static class GB2022 implements Attribute<Integer> {
private Integer stream;
public static final GB2022 MAIN = new GB2022(0);
public static final GB2022 SUB = new GB2022(1);
public AttributeField stream(){
return (AttributeField) SdpFactory.getInstance().createAttribute("streamnumber", String.valueOf(stream));
}
}
public static final GB2022 DEFAULT_GB_2022 = GB2022.MAIN;
public static final StreamProfile DEFAULT = StreamProfile.MAIN;
}
@AllArgsConstructor
@Getter
public enum Protocol {
@ -218,26 +170,6 @@ public class GB28181SDPBuilder {
return build(Action.PLAY, deviceId, channelId, netType, rtpIp, rtpPort, ssrc, streamMode, timeDescription);
}
/**
*
* @param deviceId 设备id
* @param channelId 通道id
* @param netType 网络类型
* @param rtpIp rtp服务器ip
* @param rtpPort rtp端口
* @param ssrc ssrc
* @param streamMode 网络类型
* @param streamType 流类型 (/子码流)
* @return GB28181Description sdp
*/
@SneakyThrows
public static GB28181Description play(String deviceId, String channelId, String netType, String rtpIp, int rtpPort, String ssrc, MediaStreamMode streamMode, StreamType.Attribute<?> streamType) {
GB28181Description play = play(deviceId, channelId, netType, rtpIp, rtpPort, ssrc, streamMode);
MediaDescription m = (MediaDescription)play.getMediaDescriptions(false).get(0);
m.addAttribute(StreamType.getAttribute(streamType));
return play;
}
@SneakyThrows
public static GB28181Description playback(String deviceId, String channelId, String netType, String rtpIp, int rtpPort, String ssrc, MediaStreamMode streamMode, Date start, Date end) {
TimeField timeField = new TimeField();

View File

@ -1,77 +0,0 @@
package cn.skcks.docking.gb28181.sip.manscdp.control;
import cn.skcks.docking.gb28181.constant.CmdType;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.dataformat.xml.annotation.JacksonXmlProperty;
import com.fasterxml.jackson.dataformat.xml.annotation.JacksonXmlRootElement;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
@JacksonXmlRootElement(localName = "Control")
@JsonInclude(JsonInclude.Include.NON_EMPTY)
@Builder
@NoArgsConstructor
@AllArgsConstructor
@Data
public class DeviceControlRequestDTO {
@Builder.Default
private String cmdType = CmdType.DEVICE_CONTROL;
@JacksonXmlProperty(localName = "SN")
private String sn;
/**
* 目标设备的设备编码(必选)
*/
@JacksonXmlProperty(localName = "DeviceID")
private String deviceId;
/**
* 录像控制命令
*/
private String recordCmd;
/**
* 云台控制命令
*/
@JacksonXmlProperty(localName = "PTZCmd")
private String ptzCmd;
/**
* 远程启动
*/
private String teleBoot;
/**
* 布防撤防
*/
private String guardCmd;
/**
* 告警控制
*/
private String alarmCmd;
/**
* 强制关键帧
*/
@JacksonXmlProperty(localName = "IFameCmd")
private String iFameCmd;
/**
* 拉框放大
*/
private String dragZoomIn;
/**
* 拉框缩小
*/
private String dragZoomOut;
/**
* 看守位
*/
private String homePosition;
}

View File

@ -1,30 +0,0 @@
package cn.skcks.docking.gb28181.sip.manscdp.mediastatus.notify;
import cn.skcks.docking.gb28181.constant.CmdType;
import com.fasterxml.jackson.dataformat.xml.annotation.JacksonXmlProperty;
import com.fasterxml.jackson.dataformat.xml.annotation.JacksonXmlRootElement;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
@JacksonXmlRootElement(localName = "Notify")
@AllArgsConstructor
@NoArgsConstructor
@Builder
@Data
public class MediaStatusRequestDTO {
@Builder.Default
private String cmdType = CmdType.MEDIA_STATUS;
@JacksonXmlProperty(localName = "SN")
private String sn;
/**
* 目标设备的设备编码(必选)
*/
@JacksonXmlProperty(localName = "DeviceID")
private String deviceId;
@JacksonXmlProperty(localName = "NotifyType")
private String notifyType = "121";
}

View File

@ -19,7 +19,7 @@ import java.util.Date;
@Builder
public class RecordInfoRequestDTO {
/**
* 命令类型:设备录像查询(必选)
* 命令类型:设备信息查询(必选)
*/
@Builder.Default
private String cmdType = CmdType.RECORD_INFO;
@ -42,26 +42,9 @@ public class RecordInfoRequestDTO {
@JsonFormat(pattern = GB28181Constant.DATETIME_FORMAT, timezone = GB28181Constant.TIME_ZONE)
private Date endTime;
private String filePath;
private String address;
@Builder.Default
private Integer secrecy = 0;
private Integer Secrecy = 0;
@Builder.Default
private String type = "all";
@JacksonXmlProperty(localName = "RecorderID")
private String recorderId;
/**
* 录像模糊查询属性(可选) 缺省为 0; <p/>
* 0: 不进行模糊查询,此时根据SIP消息中To头域
* URI 中的ID值确定查询录像位置, ID 值为本域系统 ID 则进行中心历史记录检索
* 若为前端设备 ID 则进行前端设备历史记录检索; <p/>
*
* 1: 进行模糊查询,此时设备所在域应同时进行中心检索和前端检索并将结果统一返回
*/
private Integer indistinctQuery;
}

View File

@ -6,7 +6,6 @@ import cn.skcks.docking.gb28181.sip.generic.SipBuilder;
import lombok.*;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.ObjectUtils;
import org.apache.commons.lang3.StringUtils;
import javax.sip.SipFactory;
import javax.sip.address.SipURI;
@ -155,7 +154,7 @@ public class DigestAuthenticationHelper {
}
public static boolean doAuthenticatePlainTextPassword(String method,AuthorizationHeader authorizationHeader, String password) {
if(ObjectUtils.anyNull(authorizationHeader)){
if ( authorizationHeader == null || authorizationHeader.getRealm() == null) {
return false;
}
@ -184,6 +183,7 @@ public class DigestAuthenticationHelper {
String ncStr = String.format("%08x", nc).toUpperCase();
String A1 = String.join(":",username , realm , password);
String A2 = String.join(":", method.toUpperCase() , uri.toString());
byte[] mdbytes = messageDigest.digest(A1.getBytes());
String HA1 = toHexString(mdbytes);
log.debug("A1: " + A1);
@ -213,22 +213,15 @@ public class DigestAuthenticationHelper {
mdbytes = messageDigest.digest(KD.getBytes());
String mdString = toHexString(mdbytes);
log.debug("mdString: " + mdString);
String mdString2 = toHexString(messageDigest.digest(StringUtils.joinWith(":", HA1, nonce, nc, cnonce, qop, HA2).getBytes()));
log.debug("mdString2: " + mdString2);
String mdString3 = toHexString(messageDigest.digest(StringUtils.joinWith(":", HA1, nonce, nc, HA2).getBytes()));
log.debug("mdString3: " + mdString);
String response = authorizationHeader.getResponse();
log.debug("response: " + response);
return mdString.equals(response) || mdString2.equals(response) || mdString3.equals(response);
return mdString.equals(response);
}
@SneakyThrows
public static AuthorizationHeader createAuthorization(String method,SipURI sipURI, String deviceId,String password, int nonceCount, WWWAuthenticateHeader www){
public static AuthorizationHeader createAuthorization(String method,String serverIp, int serverPort, String serverId, String deviceId,String password, int nonceCount, WWWAuthenticateHeader www){
String hostAddress = SipBuilder.createHostAddress(serverIp, serverPort);
SipURI sipURI = SipBuilder.createSipURI(serverId, hostAddress);
if (www == null) {
AuthorizationHeader authorizationHeader = SipBuilder.getHeaderFactory().createAuthorizationHeader("Digest");
authorizationHeader.setUsername(deviceId);
@ -287,17 +280,4 @@ public class DigestAuthenticationHelper {
}
return authorizationHeader;
}
@SneakyThrows
public static AuthorizationHeader createAuthorization(String method,String domain, String serverId, String deviceId,String password, int nonceCount, WWWAuthenticateHeader www){
SipURI sipURI = SipBuilder.createSipURI(serverId, domain);
return createAuthorization(method, sipURI, deviceId, password, nonceCount, www);
}
@SneakyThrows
public static AuthorizationHeader createAuthorization(String method,String serverIp, int serverPort, String serverId, String deviceId,String password, int nonceCount, WWWAuthenticateHeader www){
String hostAddress = SipBuilder.createHostAddress(serverIp, serverPort);
SipURI sipURI = SipBuilder.createSipURI(serverId, hostAddress);
return createAuthorization(method, sipURI, deviceId, password, nonceCount, www);
}
}

View File

@ -62,9 +62,6 @@ public class SdpTest {
GB28181Description play = GB28181SDPBuilder.Receiver.play(deviceId, channelId, rtpNetType, rtpIp, rtpPort, ssrc, MediaStreamMode.TCP_ACTIVE);
log.info("sdp play 请求\n{}",play);
play = GB28181SDPBuilder.Receiver.play(deviceId, channelId, rtpNetType, rtpIp, rtpPort, ssrc, MediaStreamMode.TCP_ACTIVE, GB28181SDPBuilder.StreamType.GB2022.MAIN);
log.info("sdp play 请求\n{}",play);
final String psType = "96";
Map<String,String> respRtpMap = new HashMap<>(){{
put(psType, GB28181SDPBuilder.RTPMAP.get(psType));
@ -73,14 +70,4 @@ public class SdpTest {
GB28181Description resp = GB28181SDPBuilder.Sender.build(play, sendRtpIp, sendRtpPort,respRtpMap,null);
log.info("sdp 响应\n{}", resp);
}
@Test
public void streamType() {
log.info("{}", GB28181SDPBuilder.StreamType.getAttribute(GB28181SDPBuilder.StreamType.TPLink.MAIN));
log.info("{}", GB28181SDPBuilder.StreamType.getAttribute(GB28181SDPBuilder.StreamType.TPLink.SUB));
log.info("{}", GB28181SDPBuilder.StreamType.getAttribute(GB28181SDPBuilder.StreamType.GB2022.MAIN));
log.info("{}", GB28181SDPBuilder.StreamType.getAttribute(GB28181SDPBuilder.StreamType.GB2022.SUB));
log.info("{}", GB28181SDPBuilder.StreamType.getAttribute(new GB28181SDPBuilder.StreamType.GB2022(5)));
}
}

View File

@ -1,6 +1,5 @@
package cn.skcks.docking.gb28181.sip.utils;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.Test;
@ -10,39 +9,25 @@ import javax.sip.message.Request;
@Slf4j
public class AuthenticationTest {
public static final String serverId = "44050100002000000003";
public static final String serverIp = "10.10.10.20";
public static final String serverId = "44050100002000000001";
public static final String serverIp = "10.10.10.200";
public static final int serverPort = 5060;
public static final String domain = "4405010000";
public static final String deviceId = "44050100001110000035";
public static final String deviceId = "44050100001110000010";
@SneakyThrows
@Test
void test() {
AuthorizationHeader authorization = DigestAuthenticationHelper.createAuthorization(Request.REGISTER, serverIp, serverPort, serverId, deviceId, "123456", 1,null);
log.info("\n{}", authorization);
WWWAuthenticateHeader wwwAuthenticateHeader = DigestAuthenticationHelper.generateChallenge(domain);
wwwAuthenticateHeader.setAlgorithm("MD5");
wwwAuthenticateHeader.setQop("auth");
wwwAuthenticateHeader.setNonce("08a895ede05c7ac592ced4070c1ef4aa");
wwwAuthenticateHeader.setRealm(domain);
log.info("\n{}", wwwAuthenticateHeader);
authorization = DigestAuthenticationHelper.createAuthorization(Request.REGISTER, serverIp, serverPort, serverId, deviceId, "123456", 1, wwwAuthenticateHeader);
log.info("\n{}", authorization);
boolean passed = DigestAuthenticationHelper.doAuthenticatePlainTextPassword(Request.REGISTER, authorization, "123456");
log.info("authorization passed {}", passed);
authorization = DigestAuthenticationHelper.createAuthorization(Request.REGISTER, domain, serverId, deviceId, "123456", 1, wwwAuthenticateHeader);
log.info("\n{}", authorization);
passed = DigestAuthenticationHelper.doAuthenticatePlainTextPassword(Request.REGISTER, authorization, "123456");
log.info("authorization passed {}", passed);
}
}

View File

@ -6,7 +6,7 @@
<parent>
<groupId>cn.skcks.docking</groupId>
<artifactId>gb28181</artifactId>
<version>0.1.0</version>
<version>0.1.0-SNAPSHOT</version>
</parent>
<groupId>cn.skcks.docking.gb28181</groupId>

117
pom.xml
View File

@ -8,35 +8,12 @@
<version>3.1.2</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>cn.skcks.docking</groupId>
<artifactId>gb28181</artifactId>
<version>0.1.0</version>
<version>0.1.0-SNAPSHOT</version>
<packaging>pom</packaging>
<name>gb28181-docking-platform</name>
<description>GB28181 Docking Platform</description>
<developers>
<developer>
<name>shikong</name>
<email>919411476@qq.com</email>
</developer>
</developers>
<licenses>
<license>
<name>Apache License, Version 2.0</name>
<url>https://www.apache.org/licenses/LICENSE-2.0.txt</url>
</license>
</licenses>
<scm>
<connection>scm:git:git://github.com/shikong-sk/gb28181-docking-platform.git</connection>
<developerConnection>scm:git:ssh://github.com/shikong-sk/gb28181-docking-platform.git</developerConnection>
<url>https://github.com/shikong-sk/gb28181-docking-platform/tree/master</url>
</scm>
<url>https://github.com/shikong-sk/gb28181-docking-platform.git</url>
<modules>
<module>starter</module>
<module>annotation</module>
@ -48,7 +25,6 @@
<module>gb28181-sip</module>
</modules>
<properties>
<maven.compiler.source>17</maven.compiler.source>
<maven.compiler.target>17</maven.compiler.target>
@ -79,10 +55,10 @@
<springdoc.version>2.2.0</springdoc.version>
<!--Docker打包配置-->
<!-- <docker.repository.url>10.10.10.200:5000</docker.repository.url>-->
<!-- <docker.registry.name>skcks.cn</docker.registry.name>-->
<!-- <docker.registry.username>XXX</docker.registry.username>-->
<!-- <docker.registry.password>XXX</docker.registry.password>-->
<!-- <docker.repository.url>10.10.10.200:5000</docker.repository.url>-->
<!-- <docker.registry.name>skcks.cn</docker.registry.name>-->
<!-- <docker.registry.username>XXX</docker.registry.username>-->
<!-- <docker.registry.password>XXX</docker.registry.password>-->
<docker.maven.plugin.version>1.4.13</docker.maven.plugin.version>
<jain-sip.version>1.3.0-91</jain-sip.version>
@ -108,89 +84,6 @@
<activeByDefault>false</activeByDefault>
</activation>
</profile>
<profile>
<id>ossrh</id>
<properties>
<skip.docker>true</skip.docker>
</properties>
<activation>
<activeByDefault>false</activeByDefault>
</activation>
<build>
<plugins>
<plugin>
<groupId>org.sonatype.central</groupId>
<artifactId>central-publishing-maven-plugin</artifactId>
<version>0.4.0</version>
<extensions>true</extensions>
<configuration>
<publishingServerId>ossrh</publishingServerId>
<centralBaseUrl>https://central.sonatype.com</centralBaseUrl>
<tokenAuth>true</tokenAuth>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-source-plugin</artifactId>
<version>2.2.1</version>
<executions>
<execution>
<id>attach-sources</id>
<goals>
<goal>jar-no-fork</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-javadoc-plugin</artifactId>
<version>2.9.1</version>
<configuration>
<additionalparam>-Xdoclint:none</additionalparam>
</configuration>
<executions>
<execution>
<id>attach-javadocs</id>
<goals>
<goal>jar</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-gpg-plugin</artifactId>
<version>1.5</version>
<executions>
<execution>
<id>sign-artifacts</id>
<phase>verify</phase>
<goals>
<goal>sign</goal>
</goals>
<configuration>
<keyname>2FA8C646FBB668DAF3E6B08CBD85FF18B373C341</keyname>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
<distributionManagement>
<snapshotRepository>
<id>ossrh</id>
<url>https://s01.oss.sonatype.org/content/repositories/snapshots</url>
</snapshotRepository>
<repository>
<id>ossrh</id>
<url>https://s01.oss.sonatype.org/service/local/staging/deploy/maven2/</url>
</repository>
</distributionManagement>
</profile>
</profiles>
<dependencyManagement>

View File

@ -6,7 +6,7 @@
<parent>
<groupId>cn.skcks.docking</groupId>
<artifactId>gb28181</artifactId>
<version>0.1.0</version>
<version>0.1.0-SNAPSHOT</version>
</parent>
<groupId>cn.skcks.docking.gb28181</groupId>

View File

@ -6,7 +6,7 @@
<parent>
<groupId>cn.skcks.docking</groupId>
<artifactId>gb28181</artifactId>
<version>0.1.0</version>
<version>0.1.0-SNAPSHOT</version>
</parent>
<artifactId>zlmediakit-service</artifactId>

View File

@ -1,41 +0,0 @@
package cn.skcks.docking.gb28181.media.dto.proxy;
import com.fasterxml.jackson.databind.PropertyNamingStrategies;
import com.fasterxml.jackson.databind.annotation.JsonNaming;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
@JsonNaming(PropertyNamingStrategies.SnakeCaseStrategy.class)
public class AddFFmpegSource {
/**
* FFmpeg 拉流地址,支持任意协议或格式(只要 FFmpeg 支持即可)
*/
private String srcUrl;
/**
* FFmpeg rtmp 推流地址一般都是推给自己
* <p>例如 rtmp://127.0.0.1/live/stream_form_ffmpeg</p>
*/
private String dstUrl;
/**
* FFmpeg 推流成功超时时间
*/
private long timeoutMs;
/**
* 是否开启 hls 录制
*/
private Boolean enableHls;
/**
* 是否开启 mp4 录制
*/
private Boolean enableMp4;
/**
* 配置文件中 FFmpeg 命令参数模板 key(非内容)置空则采用默认模板:ffmpeg.cmd
*/
private String ffmpegCmdKey;
}

View File

@ -1,11 +0,0 @@
package cn.skcks.docking.gb28181.media.dto.proxy;
import lombok.Data;
@Data
public class AddFFmpegSourceResp {
/**
* 流的唯一标识
*/
private String key;
}

View File

@ -1,8 +0,0 @@
package cn.skcks.docking.gb28181.media.dto.proxy;
import lombok.Data;
@Data
public class DelFFmpegSourceResp {
private Boolean flag;
}

View File

@ -1,41 +0,0 @@
package cn.skcks.docking.gb28181.media.dto.record;
import com.fasterxml.jackson.databind.PropertyNamingStrategies;
import com.fasterxml.jackson.databind.annotation.JsonNaming;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
@Builder
@AllArgsConstructor
@NoArgsConstructor
@JsonNaming(PropertyNamingStrategies.SnakeCaseStrategy.class)
@Data
public class DeleteRecordDirectory {
/**
* 添加的流的虚拟主机例如__defaultVhost__
*/
@Builder.Default
private String vhost = "__defaultVhost__";
/**
* 添加的流的应用名例如live
*/
private String app;
/**
* 添加的流的id名
*/
private String stream;
/**
* 流的录像日期格式为 2020-02-01,如果不是完整的日期那么是搜索录像文件夹列表否则搜索对应日期下的 mp4 文件列表
*/
private String period;
/**
* 自定义搜索路径 startRecord 方法中的 customized_path 一样默认为配置文件的路径
*/
private String customizedPath;
}

View File

@ -1,19 +0,0 @@
package cn.skcks.docking.gb28181.media.dto.record;
import cn.skcks.docking.gb28181.media.dto.status.ResponseStatus;
import com.fasterxml.jackson.databind.PropertyNamingStrategies;
import com.fasterxml.jackson.databind.annotation.JsonNaming;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
@Builder
@AllArgsConstructor
@NoArgsConstructor
@JsonNaming(PropertyNamingStrategies.SnakeCaseStrategy.class)
@Data
public class DeleteRecordDirectoryResp {
private ResponseStatus code;
private String path;
}

View File

@ -1,41 +0,0 @@
package cn.skcks.docking.gb28181.media.dto.record;
import com.fasterxml.jackson.databind.PropertyNamingStrategies;
import com.fasterxml.jackson.databind.annotation.JsonNaming;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
@Builder
@AllArgsConstructor
@NoArgsConstructor
@JsonNaming(PropertyNamingStrategies.SnakeCaseStrategy.class)
@Data
public class GetMp4RecordFile {
/**
* 添加的流的虚拟主机例如__defaultVhost__
*/
@Builder.Default
private String vhost = "__defaultVhost__";
/**
* 添加的流的应用名例如live
*/
private String app;
/**
* 添加的流的id名
*/
private String stream;
/**
* 流的录像日期格式为 2020-02-01,如果不是完整的日期那么是搜索录像文件夹列表否则搜索对应日期下的 mp4 文件列表
*/
private String period;
/**
* 自定义搜索路径 startRecord 方法中的 customized_path 一样默认为配置文件的路径
*/
private String customizedPath;
}

View File

@ -1,9 +0,0 @@
package cn.skcks.docking.gb28181.media.dto.record;
import cn.skcks.docking.gb28181.media.dto.response.ZlmResponse;
import lombok.NoArgsConstructor;
@NoArgsConstructor
public class GetMp4RecordFileResp extends ZlmResponse<GetMp4RecordFileRespData> {
}

View File

@ -1,15 +0,0 @@
package cn.skcks.docking.gb28181.media.dto.record;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.util.List;
@NoArgsConstructor
@AllArgsConstructor
@Data
public class GetMp4RecordFileRespData {
private List<String> paths;
private String rootPath;
}

View File

@ -1,36 +0,0 @@
package cn.skcks.docking.gb28181.media.dto.record;
import com.fasterxml.jackson.databind.PropertyNamingStrategies;
import com.fasterxml.jackson.databind.annotation.JsonNaming;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
@Builder
@NoArgsConstructor
@AllArgsConstructor
@Data
@JsonNaming(PropertyNamingStrategies.SnakeCaseStrategy.class)
public class IsRecording {
/**
* 添加的流的虚拟主机例如__defaultVhost__
*/
@Builder.Default
private String vhost = "__defaultVhost__";
/**
* 0 hls1 mp4
*/
@Builder.Default
private Integer type = 1;
/**
* 添加的流的应用名例如live
*/
private String app;
/**
* 添加的流的id名
*/
private String stream;
}

View File

@ -1,19 +0,0 @@
package cn.skcks.docking.gb28181.media.dto.record;
import cn.skcks.docking.gb28181.media.dto.status.ResponseStatus;
import com.fasterxml.jackson.databind.PropertyNamingStrategies;
import com.fasterxml.jackson.databind.annotation.JsonNaming;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
@Builder
@NoArgsConstructor
@AllArgsConstructor
@Data
@JsonNaming(PropertyNamingStrategies.SnakeCaseStrategy.class)
public class IsRecordingResp {
private ResponseStatus code;
private Boolean status;
}

View File

@ -1,46 +0,0 @@
package cn.skcks.docking.gb28181.media.dto.record;
import com.fasterxml.jackson.databind.PropertyNamingStrategies;
import com.fasterxml.jackson.databind.annotation.JsonNaming;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
@NoArgsConstructor
@AllArgsConstructor
@Builder
@Data
@JsonNaming(PropertyNamingStrategies.SnakeCaseStrategy.class)
public class StartRecord {
/**
* 添加的流的虚拟主机例如__defaultVhost__
*/
@Builder.Default
private String vhost = "__defaultVhost__";
/**
* 0 hls1 mp4
*/
@Builder.Default
private Integer type = 1;
/**
* 添加的流的应用名例如live
*/
private String app;
/**
* 添加的流的id名
*/
private String stream;
/**
* 录像保存目录
*/
private String customizedPath;
/**
* mp4 录像切片时间大小,单位秒 0 则采用配置项
*/
private int maxSecond = 0;
}

View File

@ -1,19 +0,0 @@
package cn.skcks.docking.gb28181.media.dto.record;
import cn.skcks.docking.gb28181.media.dto.status.ResponseStatus;
import com.fasterxml.jackson.databind.PropertyNamingStrategies;
import com.fasterxml.jackson.databind.annotation.JsonNaming;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
@Builder
@NoArgsConstructor
@AllArgsConstructor
@Data
@JsonNaming(PropertyNamingStrategies.SnakeCaseStrategy.class)
public class StartRecordResp {
private ResponseStatus code;
private Boolean result;
}

View File

@ -1,36 +0,0 @@
package cn.skcks.docking.gb28181.media.dto.record;
import com.fasterxml.jackson.databind.PropertyNamingStrategies;
import com.fasterxml.jackson.databind.annotation.JsonNaming;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
@Builder
@NoArgsConstructor
@AllArgsConstructor
@Data
@JsonNaming(PropertyNamingStrategies.SnakeCaseStrategy.class)
public class StopRecord {
/**
* 添加的流的虚拟主机例如__defaultVhost__
*/
@Builder.Default
private String vhost = "__defaultVhost__";
/**
* 0 hls1 mp4
*/
@Builder.Default
private Integer type = 1;
/**
* 添加的流的应用名例如live
*/
private String app;
/**
* 添加的流的id名
*/
private String stream;
}

View File

@ -1,19 +0,0 @@
package cn.skcks.docking.gb28181.media.dto.record;
import cn.skcks.docking.gb28181.media.dto.status.ResponseStatus;
import com.fasterxml.jackson.databind.PropertyNamingStrategies;
import com.fasterxml.jackson.databind.annotation.JsonNaming;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
@Builder
@NoArgsConstructor
@AllArgsConstructor
@Data
@JsonNaming(PropertyNamingStrategies.SnakeCaseStrategy.class)
public class StopRecordResp {
private ResponseStatus code;
private Boolean result;
}

View File

@ -4,7 +4,6 @@ import cn.skcks.docking.gb28181.media.dto.config.ServerConfig;
import cn.skcks.docking.gb28181.media.dto.media.GetMediaList;
import cn.skcks.docking.gb28181.media.dto.media.MediaResp;
import cn.skcks.docking.gb28181.media.dto.proxy.*;
import cn.skcks.docking.gb28181.media.dto.record.*;
import cn.skcks.docking.gb28181.media.dto.response.ZlmResponse;
import cn.skcks.docking.gb28181.media.dto.rtp.*;
import cn.skcks.docking.gb28181.media.dto.snap.Snap;
@ -18,7 +17,6 @@ import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestParam;
import java.util.List;
import java.util.Map;
@FeignClient(name="zlmMediaServerProxy", url = "${media.url}", configuration = IgnoreSSLFeignClientConfig.class)
public interface ZlmMediaHttpClient {
@ -78,25 +76,4 @@ public interface ZlmMediaHttpClient {
@GetMapping("/index/api/getRtpInfo")
GetRtpInfoResp getRtpInfo(@RequestParam String secret,@RequestParam("stream_id") String streamId);
@PostMapping("/index/api/addFFmpegSource")
ZlmResponse<AddFFmpegSourceResp> addFFmpegSource(@RequestParam String secret,@RequestBody AddFFmpegSource params);
@GetMapping("/index/api/delFFmpegSource")
ZlmResponse<DelFFmpegSourceResp> delFFmpegSource(@RequestParam String secret, @RequestParam String key);
@PostMapping("/index/api/startRecord")
StartRecordResp startRecord(@RequestParam String secret, @RequestBody StartRecord params);
@PostMapping("/index/api/stopRecord")
StopRecordResp stopRecord(@RequestParam String secret, @RequestBody StopRecord params);
@PostMapping("/index/api/isRecording")
IsRecordingResp isRecording(@RequestParam String secret, @RequestBody IsRecording params);
@PostMapping("/index/api/getMp4RecordFile")
GetMp4RecordFileResp getMp4RecordFile(@RequestParam String secret, @RequestBody GetMp4RecordFile params);
@PostMapping("/index/api/deleteRecordDirectory")
DeleteRecordDirectoryResp deleteRecordDirectory(@RequestParam String secret, @RequestBody DeleteRecordDirectory params);
}

View File

@ -4,19 +4,14 @@ import cn.skcks.docking.gb28181.media.dto.config.ServerConfig;
import cn.skcks.docking.gb28181.media.dto.media.GetMediaList;
import cn.skcks.docking.gb28181.media.dto.media.MediaResp;
import cn.skcks.docking.gb28181.media.dto.proxy.*;
import cn.skcks.docking.gb28181.media.dto.record.*;
import cn.skcks.docking.gb28181.media.dto.response.ZlmResponse;
import cn.skcks.docking.gb28181.media.dto.rtp.*;
import cn.skcks.docking.gb28181.media.dto.snap.Snap;
import cn.skcks.docking.gb28181.media.dto.version.VersionResp;
import lombok.Builder;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestParam;
import java.util.List;
import java.util.Map;
@Builder
@SuppressWarnings("unused")
@ -164,55 +159,5 @@ public class ZlmMediaService {
public GetRtpInfoResp getRtpInfo(String streamId){
return exchange.getRtpInfo(secret, streamId);
}
/**
* 功能通过 fork FFmpeg 进程的方式拉流代理支持任意协议
*/
public ZlmResponse<AddFFmpegSourceResp> addFfmpegSource(AddFFmpegSource params){
return exchange.addFFmpegSource(secret, params);
}
/**
* 功能关闭 ffmpeg 拉流代理
*/
public ZlmResponse<DelFFmpegSourceResp> delFfmpegSource(String key){
return exchange.delFFmpegSource(secret, key);
}
/**
* 开始录制 hls MP4
*/
public StartRecordResp startRecord(@RequestBody StartRecord params){
return exchange.startRecord(secret, params);
}
/**
* 停止录制流
*/
public StopRecordResp stopRecord(@RequestBody StopRecord params){
return exchange.stopRecord(secret, params);
}
/**
* 获取流录制状态
*/
public IsRecordingResp isRecording(@RequestBody IsRecording params){
return exchange.isRecording(secret, params);
}
/**
* 搜索文件系统获取流对应的录像文件列表或日期文件夹列表
*/
public GetMp4RecordFileResp getMp4RecordFile(@RequestBody GetMp4RecordFile params){
return exchange.getMp4RecordFile(secret, params);
}
public DeleteRecordDirectoryResp deleteRecordDirectory(@RequestBody DeleteRecordDirectory params){
return exchange.deleteRecordDirectory(secret, params);
}
}

View File

@ -3,7 +3,7 @@ project:
media:
ip: 10.10.10.200
url: 'http://10.10.10.200:5081'
url: 'http://10.10.10.200:5080'
# url: 'http://10.10.10.200:12580/anything/'
id: amrWMKmbKqoBjRQ9
secret: 4155cca6-2f9f-11ee-85e6-8de4ce2e7333

View File

@ -1,7 +1,6 @@
package cn.skcks.docking.gb28181.test;
import cn.hutool.core.codec.Base64;
import cn.hutool.core.date.DateUtil;
import cn.hutool.core.date.LocalDateTimeUtil;
import cn.skcks.docking.gb28181.common.json.JsonResponse;
import cn.skcks.docking.gb28181.common.json.JsonUtils;
@ -11,7 +10,6 @@ import cn.skcks.docking.gb28181.media.dto.config.ServerConfig;
import cn.skcks.docking.gb28181.media.dto.media.GetMediaList;
import cn.skcks.docking.gb28181.media.dto.media.MediaResp;
import cn.skcks.docking.gb28181.media.dto.proxy.*;
import cn.skcks.docking.gb28181.media.dto.record.*;
import cn.skcks.docking.gb28181.media.dto.response.ZlmResponse;
import cn.skcks.docking.gb28181.media.dto.response.ZlmResponseConvertor;
import cn.skcks.docking.gb28181.media.dto.rtp.*;
@ -33,10 +31,6 @@ import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.util.*;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
@Slf4j
@SpringBootTest
@ -47,7 +41,6 @@ public class MediaServiceTest {
private ZlmMediaService zlmMediaService;
@Autowired
private ZlmMediaConfig config;
private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
@Test
void test(){
@ -219,72 +212,4 @@ public class MediaServiceTest {
GetRtpInfoResp rtpInfo = zlmMediaService.getRtpInfo("test");
log.info("{}", rtpInfo);
}
@SneakyThrows
@Test
void recordTest(){
CountDownLatch countDownLatch = new CountDownLatch(1);
String customizedPath = "/tmp/record";
DeleteRecordDirectoryResp deleteRecordDirectoryResp = zlmMediaService.deleteRecordDirectory(DeleteRecordDirectory.builder()
.app("live")
.stream("test")
.period(DateUtil.formatDate(DateUtil.date()))
.customizedPath(customizedPath).build());
log.info("{}", deleteRecordDirectoryResp);
// ZlmResponse<AddStreamProxyResp> addedStreamProxy = zlmMediaService.addStreamProxy(AddStreamProxy.builder()
// .app("live")
// .stream("test")
// .vhost("__defaultVhost__")
// .url("rtsp://10.10.10.200:554/camera/121")
// .build());
// log.info("addedStreamProxy {}", addedStreamProxy);
ZlmResponse<AddFFmpegSourceResp> addFFmpegSourceRespZlmResponse = zlmMediaService.addFfmpegSource(AddFFmpegSource.builder()
.dstUrl("rtmp://10.10.10.200:1936/live/test")
.srcUrl("http://10.10.10.200:18183/video")
.timeoutMs(30 * 1000L)
.enableHls(false)
.enableMp4(false)
.build());
log.info("addFfmpegSource {}", addFFmpegSourceRespZlmResponse);
ZlmResponse<List<MediaResp>> mediaList = zlmMediaService.getMediaList(GetMediaList.builder()
.schema("rtsp")
.app("live")
.stream("test")
.build());
log.info("mediaList {}", mediaList);
StartRecordResp startRecordResp = zlmMediaService.startRecord(StartRecord.builder()
.app("live")
.stream("test")
.vhost("__defaultVhost__")
.customizedPath(customizedPath)
.build());
log.info("startRecordResp {}", startRecordResp);
scheduledExecutorService.schedule(() -> {
ZlmResponse<DelFFmpegSourceResp> delFFmpegSourceRespZlmResponse = zlmMediaService.delFfmpegSource(addFFmpegSourceRespZlmResponse.getData().getKey());
log.info("delFFmpegSourceRespZlmResponse {}", delFFmpegSourceRespZlmResponse);
StopRecordResp stopRecordResp = zlmMediaService.stopRecord(StopRecord.builder()
.app("live")
.stream("test")
.vhost("__defaultVhost__")
.build());
log.info("stopRecordResp {}", stopRecordResp);
GetMp4RecordFileResp mp4RecordFile = zlmMediaService.getMp4RecordFile(GetMp4RecordFile.builder()
.app("live")
.stream("test")
.period(DateUtil.formatDate(DateUtil.date()))
.customizedPath(customizedPath).build());
log.info("mp4RecordFile {}", mp4RecordFile);
countDownLatch.countDown();
}, 15, TimeUnit.SECONDS);
countDownLatch.await();
}
}