测试调整

This commit is contained in:
Shikong 2023-04-26 15:33:38 +08:00
parent 7d5859fa4b
commit afdf7c8187

View File

@ -77,32 +77,39 @@ public class KafkaConfigTest {
void testTopicInfo() { void testTopicInfo() {
try (AdminClient client = adminClient()) { try (AdminClient client = adminClient()) {
ConfigResource resource = new ConfigResource(ConfigResource.Type.TOPIC, TOPIC); ConfigResource resource = new ConfigResource(ConfigResource.Type.TOPIC, TOPIC);
DescribeConfigsResult describeConfigsResult = client.describeConfigs(Collections.singletonList(resource)); log.info("原配置");
describeConfigsResult.all().get().forEach((source, config) -> { getTopicConfig(client, resource);
log.info("{} =>", source.name());
config.entries().forEach(entry -> {
log.info("\t{} => {}", entry.name(), entry.value());
});
});
Collection<AlterConfigOp> configs = Arrays.asList( log.info("修改配置");
new AlterConfigOp(new ConfigEntry("min.cleanable.dirty.ratio", "0.5"), AlterConfigOp.OpType.SET), alterTopicConfig(client, resource);
new AlterConfigOp(new ConfigEntry("unclean.leader.election.enable", "false"), AlterConfigOp.OpType.SET),
// 设置30秒过期
new AlterConfigOp(new ConfigEntry("retention.ms", "30000"), AlterConfigOp.OpType.SET),
new AlterConfigOp(new ConfigEntry("segment.ms", "30000"), AlterConfigOp.OpType.SET)
);
AlterConfigsResult result = client.incrementalAlterConfigs(Collections.singletonMap(resource, configs)); log.info("修改后配置");
result.all().get(); getTopicConfig(client, resource);
describeConfigsResult = client.describeConfigs(Collections.singletonList(resource));
describeConfigsResult.all().get().forEach((source, config) -> {
log.info("{} =>", source.name());
config.entries().forEach(entry -> {
log.info("\t{} => {}", entry.name(), entry.value());
});
});
} }
} }
@SneakyThrows
void getTopicConfig(AdminClient client, ConfigResource resource) {
DescribeConfigsResult describeConfigsResult = client.describeConfigs(Collections.singletonList(resource));
describeConfigsResult.all().get().forEach((source, config) -> {
log.info("{} =>", source.name());
config.entries().forEach(entry -> {
log.info("\t{} => {}", entry.name(), entry.value());
});
});
}
@SneakyThrows
void alterTopicConfig(AdminClient client, ConfigResource resource) {
Collection<AlterConfigOp> configs = Arrays.asList(
new AlterConfigOp(new ConfigEntry("min.cleanable.dirty.ratio", "0.5"), AlterConfigOp.OpType.SET),
new AlterConfigOp(new ConfigEntry("unclean.leader.election.enable", "false"), AlterConfigOp.OpType.SET),
// 设置30秒过期
new AlterConfigOp(new ConfigEntry("retention.ms", "30000"), AlterConfigOp.OpType.SET),
new AlterConfigOp(new ConfigEntry("segment.ms", "30000"), AlterConfigOp.OpType.SET)
);
AlterConfigsResult result = client.incrementalAlterConfigs(Collections.singletonMap(resource, configs));
result.all().get();
}
} }