kafka: a quick first try

Shikong 2023-04-25 14:24:42 +08:00
parent 8ae54c0e1c
commit 5ff94d641e
6 changed files with 110 additions and 2 deletions


@@ -6,7 +6,6 @@ import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class Kafka01Consumer {
    public static void main(String[] args) {
        SpringApplication.run(Kafka01Consumer.class, args);
    }
}


@@ -22,6 +22,5 @@ spring:
    listener:
      # manual ack: the offset is committed immediately after ack() is called
      ack-mode: manual_immediate
      # number of threads the listener container runs
server:
  port: 8082
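
Since ack-mode: manual_immediate leaves offset commits to the application, each listener has to call Acknowledgment.acknowledge() itself. A minimal sketch of such a consumer, assuming the same Spring for Apache Kafka setup; the topic name test-topic and group id test-group are placeholders, not part of this commit:

package cn.skcks.study.springcloud.kafka;

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;

@Slf4j
@Component
public class DemoConsumer {
    // topic and group id are illustrative placeholders
    @KafkaListener(topics = "test-topic", groupId = "test-group")
    public void onMessage(ConsumerRecord<String, String> record, Acknowledgment ack) {
        log.info("received partition={} offset={} value={}",
                record.partition(), record.offset(), record.value());
        // with ack-mode: manual_immediate the offset is committed as soon as we ack
        ack.acknowledge();
    }
}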


@@ -0,0 +1,37 @@
spring:
  kafka:
    # bootstrap-servers: 10.10.10.200:9092
    bootstrap-servers: 10.10.10.200:9192,10.10.10.200:9292,10.10.10.200:9392
    producer:
      retries: 1
      batch-size: 1024
      buffer-memory: 10240
      # key serializer
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      # value serializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      # when transactions are configured: if acks is set explicitly, its value must be -1 / all
      acks: all
    consumer:
      # auto-commit interval; since Spring Boot 2.x the value is a Duration and must use formats like 1S, 1M, 2H, 5D
      auto-commit-interval: 1S
      # what the consumer does when reading a partition with no committed offset, or an invalid one
      auto-offset-reset: earliest
      # whether to auto-commit offsets (default true); set it to false and commit manually to avoid duplicates and data loss
      enable-auto-commit: false
      # key deserializer
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      # value deserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    listener:
      # manual ack: the offset is committed immediately after ack() is called
      ack-mode: manual_immediate
server:
  port: 8081
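
With acks: all the broker confirms a send only after every in-sync replica has the record. A minimal producer sketch against this config, assuming the Spring Boot 2.x auto-configured KafkaTemplate<String, String> (the topic name test-topic is a placeholder, not part of this commit):

package cn.skcks.study.springcloud.kafka;

import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

@Slf4j
@Component
public class DemoProducer {
    private final KafkaTemplate<String, String> kafkaTemplate;

    public DemoProducer(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    public void send(String message) {
        // acks=all: the future completes successfully only once all in-sync replicas have the record
        kafkaTemplate.send("test-topic", message).addCallback(
                result -> log.info("sent partition={} offset={}",
                        result.getRecordMetadata().partition(),
                        result.getRecordMetadata().offset()),
                ex -> log.error("send failed", ex));
    }
}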


@@ -0,0 +1,11 @@
package cn.skcks.study.springcloud.kafka;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class TestApplication {
    public static void main(String[] args) {
        SpringApplication.run(TestApplication.class, args);
    }
}


@@ -0,0 +1,57 @@
package cn.skcks.study.springcloud.kafka.tests;

import cn.skcks.study.springcloud.kafka.TestApplication;
import cn.skcks.study.springcloud.kafka.common.utils.JsonUtils;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.ListTopicsResult;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.ActiveProfiles;

import javax.annotation.PostConstruct;
import java.util.Map;

@ActiveProfiles("test")
@SpringBootTest(classes = TestApplication.class, webEnvironment = SpringBootTest.WebEnvironment.NONE)
@Slf4j
public class KafkaConfigTest {
    @Autowired
    private KafkaProperties kafkaProperties;

    private Map<String, Object> adminProperties;
    private Map<String, Object> producerProperties;
    private Map<String, Object> consumerProperties;
    private Map<String, Object> streamsProperties;

    @PostConstruct
    void init() {
        adminProperties = kafkaProperties.buildAdminProperties();
        producerProperties = kafkaProperties.buildProducerProperties();
        consumerProperties = kafkaProperties.buildConsumerProperties();
        streamsProperties = kafkaProperties.buildStreamsProperties();
    }

    @Test
    void properties() {
        log.info("AdminProperties {}", JsonUtils.toJson(adminProperties));
        log.info("ProducerProperties {}", JsonUtils.toJson(producerProperties));
        log.info("ConsumerProperties {}", JsonUtils.toJson(consumerProperties));
        log.info("StreamsProperties {}", JsonUtils.toJson(streamsProperties));
    }

    @Test
    @SneakyThrows
    void topics() {
        try (AdminClient adminClient = AdminClient.create(kafkaProperties.buildAdminProperties())) {
            ListTopicsResult topicsResult = adminClient.listTopics();
            topicsResult.listings().get().forEach(topic ->
                    log.info("topic uuid={} name={}", topic.topicId(), topic.name()));
        }
    }
}


@@ -41,5 +41,10 @@
            <artifactId>lombok</artifactId>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
        </dependency>
    </dependencies>
</project>