This commit is contained in:
Shikong 2023-04-25 14:59:34 +08:00
parent c2f6bac38d
commit e4ebff30b9

View File

@ -5,6 +5,7 @@ import cn.skcks.study.springcloud.utils.JsonUtils;
import lombok.SneakyThrows; import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.admin.AdminClient; import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.ListTopicsOptions;
import org.apache.kafka.clients.admin.ListTopicsResult; import org.apache.kafka.clients.admin.ListTopicsResult;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
@ -22,13 +23,13 @@ public class KafkaConfigTest {
@Autowired @Autowired
private KafkaProperties kafkaProperties; private KafkaProperties kafkaProperties;
private Map<String,Object> adminProperties; private Map<String, Object> adminProperties;
private Map<String,Object> producerProperties; private Map<String, Object> producerProperties;
private Map<String,Object> consumerProperties; private Map<String, Object> consumerProperties;
private Map<String,Object> streamsProperties; private Map<String, Object> streamsProperties;
@PostConstruct @PostConstruct
void init(){ void init() {
adminProperties = kafkaProperties.buildAdminProperties(); adminProperties = kafkaProperties.buildAdminProperties();
producerProperties = kafkaProperties.buildProducerProperties(); producerProperties = kafkaProperties.buildProducerProperties();
consumerProperties = kafkaProperties.buildConsumerProperties(); consumerProperties = kafkaProperties.buildConsumerProperties();
@ -36,20 +37,33 @@ public class KafkaConfigTest {
} }
@Test @Test
void properties () { void properties() {
log.info("AdminProperties {}", JsonUtils.toJson(adminProperties)); log.info("AdminProperties {}", JsonUtils.toJson(adminProperties));
log.info("ProducerProperties {}",JsonUtils.toJson(producerProperties)); log.info("ProducerProperties {}", JsonUtils.toJson(producerProperties));
log.info("ConsumerProperties {}", JsonUtils.toJson(consumerProperties)); log.info("ConsumerProperties {}", JsonUtils.toJson(consumerProperties));
log.info("StreamsProperties {}", JsonUtils.toJson(streamsProperties)); log.info("StreamsProperties {}", JsonUtils.toJson(streamsProperties));
} }
@Test @Test
@SneakyThrows @SneakyThrows
void topics(){ void topics() {
try (AdminClient adminClient = AdminClient.create(kafkaProperties.buildAdminProperties())) { try (AdminClient adminClient = AdminClient.create(kafkaProperties.buildAdminProperties())) {
ListTopicsResult topicsResult = adminClient.listTopics(); ListTopicsOptions options = new ListTopicsOptions();
topicsResult.listings().get().forEach(topic -> { // 包含内置主题
log.info("topic uuid={} name={}",topic.topicId(),topic.name()); options.listInternal(true);
ListTopicsResult topicsResult = adminClient.listTopics(options);
topicsResult.listings().get().stream().sorted((a,b) -> {
if(a.isInternal() && b.isInternal()){
return 0;
}
if (a.isInternal()){
return -1;
} else {
return 1;
}
}).forEach(topic -> {
log.info("topic isInternal=>{}, uuid=>{}, name=>{}", topic.isInternal(), topic.topicId(), topic.name());
}); });
} }
} }