diff --git a/kafka-01/kafka-01-producer/src/test/java/cn/skcks/study/springcloud/kafka/tests/KafkaConfigTest.java b/kafka-01/kafka-01-producer/src/test/java/cn/skcks/study/springcloud/kafka/tests/KafkaConfigTest.java index dba0395..53d0cec 100644 --- a/kafka-01/kafka-01-producer/src/test/java/cn/skcks/study/springcloud/kafka/tests/KafkaConfigTest.java +++ b/kafka-01/kafka-01-producer/src/test/java/cn/skcks/study/springcloud/kafka/tests/KafkaConfigTest.java @@ -77,32 +77,39 @@ public class KafkaConfigTest { void testTopicInfo() { try (AdminClient client = adminClient()) { ConfigResource resource = new ConfigResource(ConfigResource.Type.TOPIC, TOPIC); - DescribeConfigsResult describeConfigsResult = client.describeConfigs(Collections.singletonList(resource)); - describeConfigsResult.all().get().forEach((source, config) -> { - log.info("{} =>", source.name()); - config.entries().forEach(entry -> { - log.info("\t{} => {}", entry.name(), entry.value()); - }); - }); + log.info("原配置"); + getTopicConfig(client, resource); - Collection configs = Arrays.asList( - new AlterConfigOp(new ConfigEntry("min.cleanable.dirty.ratio", "0.5"), AlterConfigOp.OpType.SET), - new AlterConfigOp(new ConfigEntry("unclean.leader.election.enable", "false"), AlterConfigOp.OpType.SET), - // 设置30秒过期 - new AlterConfigOp(new ConfigEntry("retention.ms", "30000"), AlterConfigOp.OpType.SET), - new AlterConfigOp(new ConfigEntry("segment.ms", "30000"), AlterConfigOp.OpType.SET) - ); + log.info("修改配置"); + alterTopicConfig(client, resource); - AlterConfigsResult result = client.incrementalAlterConfigs(Collections.singletonMap(resource, configs)); - result.all().get(); - - describeConfigsResult = client.describeConfigs(Collections.singletonList(resource)); - describeConfigsResult.all().get().forEach((source, config) -> { - log.info("{} =>", source.name()); - config.entries().forEach(entry -> { - log.info("\t{} => {}", entry.name(), entry.value()); - }); - }); + log.info("修改后配置"); + getTopicConfig(client, resource); } } + + @SneakyThrows + void getTopicConfig(AdminClient client, ConfigResource resource) { + DescribeConfigsResult describeConfigsResult = client.describeConfigs(Collections.singletonList(resource)); + describeConfigsResult.all().get().forEach((source, config) -> { + log.info("{} =>", source.name()); + config.entries().forEach(entry -> { + log.info("\t{} => {}", entry.name(), entry.value()); + }); + }); + } + + @SneakyThrows + void alterTopicConfig(AdminClient client, ConfigResource resource) { + Collection configs = Arrays.asList( + new AlterConfigOp(new ConfigEntry("min.cleanable.dirty.ratio", "0.5"), AlterConfigOp.OpType.SET), + new AlterConfigOp(new ConfigEntry("unclean.leader.election.enable", "false"), AlterConfigOp.OpType.SET), + // 设置30秒过期 + new AlterConfigOp(new ConfigEntry("retention.ms", "30000"), AlterConfigOp.OpType.SET), + new AlterConfigOp(new ConfigEntry("segment.ms", "30000"), AlterConfigOp.OpType.SET) + ); + + AlterConfigsResult result = client.incrementalAlterConfigs(Collections.singletonMap(resource, configs)); + result.all().get(); + } }