topic 设置读取/修改

This commit is contained in:
Shikong 2023-04-25 21:17:35 +08:00
parent 4e70a57964
commit 9b175c581e

View File

@ -86,13 +86,23 @@ public class KafkaConfigTest {
});
Collection<AlterConfigOp> configs = Arrays.asList(
new AlterConfigOp(new ConfigEntry("min.cleanable.dirty.ratio", "0.5"), AlterConfigOp.OpType.SET),
new AlterConfigOp(new ConfigEntry("unclean.leader.election.enable", "false"), AlterConfigOp.OpType.SET),
new AlterConfigOp(new ConfigEntry("retention.ms","86400000"), AlterConfigOp.OpType.SET)
new AlterConfigOp(new ConfigEntry("min.cleanable.dirty.ratio", "0.5"), AlterConfigOp.OpType.SET),
new AlterConfigOp(new ConfigEntry("unclean.leader.election.enable", "false"), AlterConfigOp.OpType.SET),
// 设置30秒过期
new AlterConfigOp(new ConfigEntry("retention.ms", "30000"), AlterConfigOp.OpType.SET),
new AlterConfigOp(new ConfigEntry("segment.ms", "30000"), AlterConfigOp.OpType.SET)
);
AlterConfigsResult result = client.incrementalAlterConfigs(Collections.singletonMap(resource,configs));
AlterConfigsResult result = client.incrementalAlterConfigs(Collections.singletonMap(resource, configs));
result.all().get();
describeConfigsResult = client.describeConfigs(Collections.singletonList(resource));
describeConfigsResult.all().get().forEach((source, config) -> {
log.info("{} =>", source.name());
config.entries().forEach(entry -> {
log.info("\t{} => {}", entry.name(), entry.value());
});
});
}
}
}