DeviceProxyService 调整

This commit is contained in:
shikong 2024-03-14 11:07:23 +08:00
parent 39af43f7aa
commit d64bba2c9c
2 changed files with 8 additions and 9 deletions

View File

@ -28,7 +28,7 @@ public class MockingExecutor{
public ThreadPoolTaskExecutor sipTaskExecutor() { public ThreadPoolTaskExecutor sipTaskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(CPU_NUM * 2); executor.setCorePoolSize(CPU_NUM * 2);
executor.setMaxPoolSize(100); executor.setMaxPoolSize(1000);
executor.setQueueCapacity(10000); executor.setQueueCapacity(10000);
executor.setKeepAliveSeconds(30); executor.setKeepAliveSeconds(30);
executor.setThreadNamePrefix(THREAD_NAME_PREFIX); executor.setThreadNamePrefix(THREAD_NAME_PREFIX);

View File

@ -27,6 +27,7 @@ import cn.skcks.docking.gb28181.mocking.config.sip.DeviceProxyConfig;
import cn.skcks.docking.gb28181.mocking.config.sip.FfmpegConfig; import cn.skcks.docking.gb28181.mocking.config.sip.FfmpegConfig;
import cn.skcks.docking.gb28181.mocking.config.sip.ZlmHookConfig; import cn.skcks.docking.gb28181.mocking.config.sip.ZlmHookConfig;
import cn.skcks.docking.gb28181.mocking.config.sip.ZlmRtmpConfig; import cn.skcks.docking.gb28181.mocking.config.sip.ZlmRtmpConfig;
import cn.skcks.docking.gb28181.mocking.core.sip.executor.MockingExecutor;
import cn.skcks.docking.gb28181.mocking.core.sip.message.processor.message.request.notify.dto.MediaStatusRequestDTO; import cn.skcks.docking.gb28181.mocking.core.sip.message.processor.message.request.notify.dto.MediaStatusRequestDTO;
import cn.skcks.docking.gb28181.mocking.core.sip.message.subscribe.SipSubscribe; import cn.skcks.docking.gb28181.mocking.core.sip.message.subscribe.SipSubscribe;
import cn.skcks.docking.gb28181.mocking.core.sip.request.SipRequestBuilder; import cn.skcks.docking.gb28181.mocking.core.sip.request.SipRequestBuilder;
@ -54,6 +55,7 @@ import org.apache.commons.exec.ExecuteException;
import org.apache.commons.exec.ExecuteResultHandler; import org.apache.commons.exec.ExecuteResultHandler;
import org.apache.commons.exec.Executor; import org.apache.commons.exec.Executor;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import javax.sdp.MediaDescription; import javax.sdp.MediaDescription;
@ -108,7 +110,10 @@ public class DeviceProxyService {
private final FfmpegConfig ffmpegConfig; private final FfmpegConfig ffmpegConfig;
ScheduledExecutorService scheduledExecutorService = new ScheduledThreadPoolExecutor(128); private final ScheduledExecutorService scheduledExecutorService = new ScheduledThreadPoolExecutor(128);
@Qualifier(MockingExecutor.EXECUTOR_BEAN_NAME)
private final java.util.concurrent.Executor executor;
public interface TaskProcessor { public interface TaskProcessor {
void process(Runnable sendOkResponse,SIPRequest request,String callId,String fromUrl, String toAddr,int toPort, MockingDevice device, String key, long time,String ssrc); void process(Runnable sendOkResponse,SIPRequest request,String callId,String fromUrl, String toAddr,int toPort, MockingDevice device, String key, long time,String ssrc);
@ -245,12 +250,6 @@ public class DeviceProxyService {
public TaskProcessor downloadTask(){ public TaskProcessor downloadTask(){
return (Runnable sendOkResponse,SIPRequest request,String callId,String fromUrl, String toAddr,int toPort, MockingDevice device, String key, long time,String ssrc)->{ return (Runnable sendOkResponse,SIPRequest request,String callId,String fromUrl, String toAddr,int toPort, MockingDevice device, String key, long time,String ssrc)->{
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
log.error("{}", e.getMessage());
}
String ackKey = GenericSubscribe.Helper.getKey(Request.ACK, callId); String ackKey = GenericSubscribe.Helper.getKey(Request.ACK, callId);
subscribe.getAckSubscribe().addPublisher(ackKey, 1, TimeUnit.MINUTES); subscribe.getAckSubscribe().addPublisher(ackKey, 1, TimeUnit.MINUTES);
subscribe.getAckSubscribe().addSubscribe(ackKey, new Flow.Subscriber<>() { subscribe.getAckSubscribe().addSubscribe(ackKey, new Flow.Subscriber<>() {
@ -275,7 +274,7 @@ public class DeviceProxyService {
FfmpegExecuteResultHandler executeResultHandler = mediaStatus(request, device, key); FfmpegExecuteResultHandler executeResultHandler = mediaStatus(request, device, key);
if (!ffmpegConfig.getRtp().getUseRtpToDownload()) { if (!ffmpegConfig.getRtp().getUseRtpToDownload()) {
String zlmRtpUrl = getZlmRtmpUrl(DEFAULT_ZLM_APP, callId); String zlmRtpUrl = getZlmRtmpUrl(DEFAULT_ZLM_APP, callId);
scheduledExecutorService.submit(() -> { executor.execute(()->{
try { try {
requestZlmPushStream(request, callId, fromUrl, toAddr, toPort, device, key, time, ssrc); requestZlmPushStream(request, callId, fromUrl, toAddr, toPort, device, key, time, ssrc);
} catch (Exception e) { } catch (Exception e) {