webflux 试水

This commit is contained in:
shikong 2023-09-07 02:42:28 +08:00
parent 111d44b187
commit 2912cff24f
12 changed files with 99 additions and 66 deletions

View File

@ -37,7 +37,7 @@
<dependency>
<groupId>org.springdoc</groupId>
<artifactId>springdoc-openapi-starter-webmvc-ui</artifactId>
<artifactId>springdoc-openapi-starter-webflux-ui</artifactId>
<version>${springdoc.version}</version>
</dependency>
@ -48,7 +48,7 @@
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
<artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
<dependency>
@ -62,4 +62,4 @@
<artifactId>jakarta.servlet-api</artifactId>
</dependency>
</dependencies>
</project>
</project>

View File

@ -1,7 +1,7 @@
package cn.skcks.docking.gb28181.wvp.advice;
import jakarta.servlet.http.HttpServletRequest;
import lombok.extern.slf4j.Slf4j;
import org.springframework.http.server.reactive.ServerHttpRequest;
import org.springframework.web.bind.annotation.ControllerAdvice;
import org.springframework.web.bind.annotation.ExceptionHandler;
@ -14,8 +14,8 @@ import org.springframework.web.bind.annotation.ExceptionHandler;
@ControllerAdvice
public class BasicExceptionAdvice {
@ExceptionHandler(Exception.class)
public void exception(HttpServletRequest request, Exception e) {
if(request.getRequestURI().equals("/video")){
public void exception(ServerHttpRequest request, Exception e) {
if(request.getURI().getPath().equals("/video")){
return;
}
e.printStackTrace();

View File

@ -1,10 +1,12 @@
package cn.skcks.docking.gb28181.wvp.api.video;
import cn.skcks.docking.gb28181.wvp.service.video.RecordService;
import jakarta.servlet.http.HttpServletResponse;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.http.server.reactive.ServerHttpResponse;
import org.springframework.web.bind.annotation.*;
import org.springframework.web.server.ServerWebExchange;
import reactor.core.publisher.Mono;
@Slf4j
@RequiredArgsConstructor
@ -14,12 +16,13 @@ public class RecordController {
private final RecordService recordService;
@RequestMapping(method = {RequestMethod.HEAD,RequestMethod.OPTIONS})
public void record(HttpServletResponse response){
recordService.header(response);
public void record(ServerWebExchange exchange){
recordService.header(exchange.getResponse());
}
@GetMapping
public void record(HttpServletResponse response, @RequestParam String url,@RequestParam long time){
recordService.record(response,url,time);
public Mono<Void> record(ServerWebExchange exchange, @RequestParam String url, @RequestParam long time){
log.info("{} {}", url,time);
return recordService.record(exchange.getResponse(),url,time);
}
}

View File

@ -1,4 +1,4 @@
package cn.skcks.docking.gb28181.wvp.api;
package cn.skcks.docking.gb28181.wvp.api.video;
import cn.hutool.core.io.IoUtil;
import cn.hutool.core.util.IdUtil;

View File

@ -1,22 +1,34 @@
package cn.skcks.docking.gb28181.wvp.config;
import cn.skcks.docking.gb28181.wvp.interceptor.RequestInterceptor;
import cn.hutool.core.util.ReUtil;
import jakarta.validation.constraints.NotNull;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.servlet.config.annotation.InterceptorRegistry;
import org.springframework.web.servlet.config.annotation.WebMvcConfigurer;
import org.springframework.web.server.ServerWebExchange;
import org.springframework.web.server.WebFilter;
import org.springframework.web.server.WebFilterChain;
import reactor.core.publisher.Mono;
import java.net.InetSocketAddress;
import java.util.Optional;
@Slf4j
@Configuration
@RequiredArgsConstructor
public class WebConfig implements WebMvcConfigurer {
private final RequestInterceptor requestInterceptor;
public class WebConfig implements WebFilter {
@Override
public void addInterceptors(@NotNull InterceptorRegistry registry) {
registry.addInterceptor(requestInterceptor)
.excludePathPatterns("/swagger-ui/**","/v3/api-docs/**")
.addPathPatterns("/**");
public Mono<Void> filter(ServerWebExchange exchange, WebFilterChain chain) {
String path = exchange.getRequest().getPath().toString();
if(ReUtil.isMatch(".*/swagger-ui/.*",path) || ReUtil.isMatch("/v3/api-docs.*",path) ){
return chain.filter(exchange);
} else {
String ip = Optional.ofNullable(exchange.getRequest().getRemoteAddress())
.orElse(new InetSocketAddress("0.0.0.0",0)).getHostString();
log.info("{} 访问 {}", ip, path);
}
return chain.filter(exchange);
}
}

View File

@ -1,20 +0,0 @@
package cn.skcks.docking.gb28181.wvp.interceptor;
import jakarta.servlet.http.HttpServletRequest;
import jakarta.servlet.http.HttpServletResponse;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import org.springframework.web.servlet.HandlerInterceptor;
@Slf4j
@Component
@SuppressWarnings({"unused"})
@RequiredArgsConstructor
public class RequestInterceptor implements HandlerInterceptor {
@Override
public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {
log.info("{} 访问 {}",request.getRemoteHost(), request.getRequestURI());
return true;
}
}

View File

@ -32,11 +32,17 @@
<dependency>
<groupId>cn.skcks.docking</groupId>
<artifactId>zlmediakit-service</artifactId>
<exclusions>
<exclusion>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
<artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
<dependency>

View File

@ -2,15 +2,24 @@ package cn.skcks.docking.gb28181.wvp.service.video;
import cn.hutool.core.io.IoUtil;
import cn.hutool.core.util.IdUtil;
import jakarta.servlet.ServletOutputStream;
import jakarta.servlet.http.HttpServletResponse;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.bytedeco.ffmpeg.global.avcodec;
import org.bytedeco.ffmpeg.global.avutil;
import org.bytedeco.javacv.*;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DefaultDataBuffer;
import org.springframework.core.io.buffer.DefaultDataBufferFactory;
import org.springframework.http.HttpHeaders;
import org.springframework.http.MediaType;
import org.springframework.http.server.reactive.ServerHttpResponse;
import org.springframework.stereotype.Service;
import reactor.core.publisher.Mono;
import java.io.*;
import java.net.http.HttpClient;
import java.nio.file.Path;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
@ -20,45 +29,48 @@ import java.util.concurrent.atomic.AtomicBoolean;
@Slf4j
@Service
public class RecordService {
public void header(HttpServletResponse response){
response.setContentType("video/mp4");
response.setHeader("Accept-Ranges","none");
response.setHeader("Connection","close");
public void header(ServerHttpResponse response){
HttpHeaders headers = response.getHeaders();
headers.setContentType(MediaType.APPLICATION_OCTET_STREAM);
headers.set("Accept-Ranges","none");
headers.setConnection("close");
}
// @Async(Default)
@SneakyThrows
public void record(HttpServletResponse response, String url, long timeout){
response.reset();
public Mono<Void> record(ServerHttpResponse response, String url, long timeout){
Mono<Void> mono = Mono.empty();
// response.setContentType(MediaType.APPLICATION_OCTET_STREAM_VALUE);
header(response);
Path tmp = Path.of(System.getProperty("java.io.tmpdir"), IdUtil.getSnowflakeNextIdStr()).toAbsolutePath();
File file = new File(tmp + ".mp4");
log.info("创建文件 {}, {}", file, file.createNewFile());
log.info("url {}", url);
DataBuffer dataBuffer = response.bufferFactory().allocateBuffer(DefaultDataBufferFactory.DEFAULT_INITIAL_CAPACITY);
OutputStream outputStream = dataBuffer.asOutputStream();
try (FFmpegFrameGrabber grabber = new FFmpegFrameGrabber(url)) {
grabber.start();
try(FFmpegFrameRecorder recorder = new FFmpegFrameRecorder(file, grabber.getImageWidth(), grabber.getImageHeight(),grabber.getAudioChannels())){
try (FFmpegFrameRecorder recorder = new FFmpegFrameRecorder(file, grabber.getImageWidth(), grabber.getImageHeight(), grabber.getAudioChannels())) {
recorder.start();
log.info("开始录像");
log.info("{}", file);
recorder.setVideoCodec(avcodec.AV_CODEC_ID_H264);
recorder.setPixelFormat(avutil.AV_PIX_FMT_YUV420P); //视频源数据yuv
recorder.setAudioCodec(avcodec.AV_CODEC_ID_AAC); //设置音频压缩方式
recorder.setPixelFormat(avutil.AV_PIX_FMT_YUV420P); // 视频源数据yuv
recorder.setAudioCodec(avcodec.AV_CODEC_ID_AAC); // 设置音频压缩方式
recorder.setFormat("mp4");
recorder.setVideoOption("threads", String.valueOf(Runtime.getRuntime().availableProcessors())); //解码线程数
recorder.setVideoOption("threads", String.valueOf(Runtime.getRuntime().availableProcessors())); // 解码线程数
ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
AtomicBoolean record = new AtomicBoolean(true);
scheduledExecutorService.schedule(()->{
scheduledExecutorService.schedule(() -> {
log.info("到达超时时间, 结束录制");
record.set(false);
}, timeout, TimeUnit.SECONDS);
try {
Frame frame;
while (record.get() && (frame = grabber.grab()) != null) {
while (!response.isCommitted() && record.get() && (frame = grabber.grab()) != null) {
recorder.record(frame);
}
grabber.stop();
@ -67,15 +79,22 @@ public class RecordService {
throw new RuntimeException(e);
}
}
} finally {
// response.getOutputStream()
} catch (Exception e){
log.info("{}",e.getMessage());
}finally {
log.info("结束录制");
InputStream inputStream = new BufferedInputStream(new FileInputStream(file));
OutputStream outputStream = new BufferedOutputStream(response.getOutputStream());
try{
IoUtil.copy(inputStream, outputStream);
} catch (Exception ignore){}
log.info("临时文件 {} 写入 响应 完成", file);
if(!response.isCommitted()){
InputStream inputStream = new BufferedInputStream(new FileInputStream(file));
try{
IoUtil.copy(inputStream, outputStream);
} catch (Exception ignore){}
log.info("临时文件 {} 写入 响应", file);
mono = response.writeWith(Mono.just(dataBuffer));
}
log.info("删除临时文件 {} {}", file, file.delete());
}
return mono;
}
}

View File

@ -53,7 +53,7 @@
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
<artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
<dependency>
@ -158,4 +158,4 @@
</testResource>
</testResources>
</build>
</project>
</project>

View File

@ -4,6 +4,8 @@ project:
version: @project.version@
spring:
main:
web-application-type: reactive
data:
redis:
# [必须修改] Redis服务器IP, REDIS安装在本机的,使用127.0.0.1
@ -45,3 +47,6 @@ proxy:
url: http://192.168.3.13:18978
user: admin
passwd: admin
springdoc:
api-docs:
enabled: true

View File

@ -6,6 +6,8 @@ project:
spring:
main:
web-application-type: reactive
data:
redis:
# [必须修改] Redis服务器IP, REDIS安装在本机的,使用127.0.0.1

View File

@ -118,6 +118,12 @@
<groupId>cn.skcks.docking</groupId>
<artifactId>zlmediakit-service</artifactId>
<version>${gb28181.docking.version}</version>
<exclusions>
<exclusion>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
@ -202,7 +208,7 @@
<dependency>
<groupId>org.springdoc</groupId>
<artifactId>springdoc-openapi-starter-webmvc-ui</artifactId>
<artifactId>springdoc-openapi-starter-webflux-ui</artifactId>
<version>${springdoc.version}</version>
</dependency>