This commit is contained in:
Shikong 2023-04-25 17:27:17 +08:00
parent 2bb7c6dc65
commit 4e70a57964

View File

@ -4,10 +4,7 @@ import cn.skcks.study.springcloud.kafka.TestApplication;
import cn.skcks.study.springcloud.utils.JsonUtils; import cn.skcks.study.springcloud.utils.JsonUtils;
import lombok.SneakyThrows; import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.admin.AdminClient; import org.apache.kafka.clients.admin.*;
import org.apache.kafka.clients.admin.DescribeConfigsResult;
import org.apache.kafka.clients.admin.ListTopicsOptions;
import org.apache.kafka.clients.admin.ListTopicsResult;
import org.apache.kafka.common.config.ConfigResource; import org.apache.kafka.common.config.ConfigResource;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
@ -16,6 +13,8 @@ import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.ActiveProfiles; import org.springframework.test.context.ActiveProfiles;
import javax.annotation.PostConstruct; import javax.annotation.PostConstruct;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections; import java.util.Collections;
import java.util.Map; import java.util.Map;
@ -49,7 +48,7 @@ public class KafkaConfigTest {
log.info("StreamsProperties {}", JsonUtils.toJson(streamsProperties)); log.info("StreamsProperties {}", JsonUtils.toJson(streamsProperties));
} }
private AdminClient adminClient(){ private AdminClient adminClient() {
return AdminClient.create(kafkaProperties.buildAdminProperties()); return AdminClient.create(kafkaProperties.buildAdminProperties());
} }
@ -62,11 +61,11 @@ public class KafkaConfigTest {
options.listInternal(true); options.listInternal(true);
ListTopicsResult topicsResult = client.listTopics(options); ListTopicsResult topicsResult = client.listTopics(options);
topicsResult.listings().get().stream().sorted((a,b) -> { topicsResult.listings().get().stream().sorted((a, b) -> {
if(a.isInternal() && b.isInternal()){ if (a.isInternal() && b.isInternal()) {
return 0; return 0;
} }
return a.isInternal()?-1:1; return a.isInternal() ? -1 : 1;
}).forEach(topic -> { }).forEach(topic -> {
log.info("topic isInternal=>{}, uuid=>{}, name=>{}", topic.isInternal(), topic.topicId(), topic.name()); log.info("topic isInternal=>{}, uuid=>{}, name=>{}", topic.isInternal(), topic.topicId(), topic.name());
}); });
@ -75,16 +74,25 @@ public class KafkaConfigTest {
@Test @Test
@SneakyThrows @SneakyThrows
void testTopicInfo(){ void testTopicInfo() {
try(AdminClient client = adminClient()){ try (AdminClient client = adminClient()) {
ConfigResource resource = new ConfigResource(ConfigResource.Type.TOPIC,TOPIC); ConfigResource resource = new ConfigResource(ConfigResource.Type.TOPIC, TOPIC);
DescribeConfigsResult describeConfigsResult = client.describeConfigs(Collections.singletonList(resource)); DescribeConfigsResult describeConfigsResult = client.describeConfigs(Collections.singletonList(resource));
describeConfigsResult.all().get().forEach((key,config) -> { describeConfigsResult.all().get().forEach((source, config) -> {
log.info("{} =>",key); log.info("{} =>", source.name());
config.entries().forEach(entry->{ config.entries().forEach(entry -> {
log.info("\t{} => {}",entry.name(),entry.value()); log.info("\t{} => {}", entry.name(), entry.value());
}); });
}); });
Collection<AlterConfigOp> configs = Arrays.asList(
new AlterConfigOp(new ConfigEntry("min.cleanable.dirty.ratio", "0.5"), AlterConfigOp.OpType.SET),
new AlterConfigOp(new ConfigEntry("unclean.leader.election.enable", "false"), AlterConfigOp.OpType.SET),
new AlterConfigOp(new ConfigEntry("retention.ms","86400000"), AlterConfigOp.OpType.SET)
);
AlterConfigsResult result = client.incrementalAlterConfigs(Collections.singletonMap(resource,configs));
result.all().get();
} }
} }
} }