通用订阅器添加 compile 方法定义

This commit is contained in:
shikong 2023-10-03 03:45:47 +08:00
parent 1c280ef64f
commit 7b3f2a57a2
6 changed files with 20 additions and 1 deletions

View File

@ -19,6 +19,8 @@ public interface GenericSubscribe<T> {
void addSubscribe(String key,Flow.Subscriber<T> subscribe); void addSubscribe(String key,Flow.Subscriber<T> subscribe);
void delPublisher(String key); void delPublisher(String key);
void compile(String key);
class Helper { class Helper {
public final static String SEPARATOR = ":"; public final static String SEPARATOR = ":";
public static String getKey(String prefix,String... ids){ public static String getKey(String prefix,String... ids){

View File

@ -30,6 +30,10 @@ public class InviteSubscribe implements GenericSubscribe<SIPResponse> {
Helper.addSubscribe(publishers, key, subscribe); Helper.addSubscribe(publishers, key, subscribe);
} }
public void compile(String key){
delPublisher(key);
}
@Override @Override
public void delPublisher(String key) { public void delPublisher(String key) {
Helper.delPublisher(publishers, key); Helper.delPublisher(publishers, key);

View File

@ -30,6 +30,10 @@ public class RecordInfoSubscribe implements GenericSubscribe<RecordInfoResponseD
Helper.addSubscribe(publishers, key, subscribe); Helper.addSubscribe(publishers, key, subscribe);
} }
public void compile(String key){
delPublisher(key);
}
@Override @Override
public void delPublisher(String key) { public void delPublisher(String key) {
Helper.delPublisher(publishers, key); Helper.delPublisher(publishers, key);

View File

@ -33,6 +33,10 @@ public class SipRequestSubscribe implements GenericTimeoutSubscribe<SIPRequest>,
Helper.addSubscribe(publishers, key, subscribe); Helper.addSubscribe(publishers, key, subscribe);
} }
public void compile(String key){
delPublisher(key);
}
@Override @Override
public void delPublisher(String key) { public void delPublisher(String key) {
ScheduledFuture<?> schedule = scheduledFutureManager.remove(key); ScheduledFuture<?> schedule = scheduledFutureManager.remove(key);

View File

@ -33,6 +33,10 @@ public class SipResponseSubscribe implements GenericTimeoutSubscribe<SIPResponse
Helper.addSubscribe(publishers, key, subscribe); Helper.addSubscribe(publishers, key, subscribe);
} }
public void compile(String key){
delPublisher(key);
}
@Override @Override
public void delPublisher(String key) { public void delPublisher(String key) {
ScheduledFuture<?> schedule = scheduledFutureManager.remove(key); ScheduledFuture<?> schedule = scheduledFutureManager.remove(key);

View File

@ -85,7 +85,7 @@ public class CatalogService {
data.addAll(catalogResponseDTO.getDeviceList().getDeviceList()); data.addAll(catalogResponseDTO.getDeviceList().getDeviceList());
if(curNum >= sumNum){ if(curNum >= sumNum){
log.info("获取完成 {}", key); log.info("获取完成 {}", key);
subscribe.getSipRequestSubscribe().delPublisher(key); subscribe.getSipRequestSubscribe().compile(key);
} else { } else {
subscription.request(1); subscription.request(1);
} }
@ -100,6 +100,7 @@ public class CatalogService {
@Override @Override
public void onComplete() { public void onComplete() {
log.info("{} 返回结果 {}", key, result.complete(data)); log.info("{} 返回结果 {}", key, result.complete(data));
subscribe.getSipRequestSubscribe().delPublisher(key);
} }
}); });
provider.sendRequest(request); provider.sendRequest(request);