引入依赖（Maven）
<!-- Spring for Apache Kafka. No <version> is declared here, so it presumably comes from
     dependency management (e.g. a parent POM or BOM) - verify in the enclosing pom. -->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
配置（YAML）
# Kafka client settings, bound under the spring.kafka.* prefix.
spring:
  kafka:
    # Broker list; falls back to localhost:9092 when KAFKA_SERVERS is not set.
    bootstrap-servers: ${KAFKA_SERVERS:localhost:9092}
    producer:
      # No client-side resend of failed sends.
      retries: 0
      # Wait for the partition leader's acknowledgment only.
      acks: 1
      # Upper bound for a single producer batch (bytes).
      batch-size: 100000
      # Total memory for buffering unsent records (32 MiB).
      buffer-memory: 33554432
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    consumer:
      group-id: service-group
      # Offsets are committed through the listener's Acknowledgment, not by the client.
      enable-auto-commit: false
      # Only relevant when enable-auto-commit is true.
      auto-commit-interval: 1000
      # Start from the end of the log when the group has no committed offset.
      auto-offset-reset: latest
      # Maximum number of records returned by one poll.
      max-poll-records: 1000
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    listener:
      # Commit as soon as the listener calls Acknowledgment.acknowledge().
      ack-mode: manual_immediate
      # Do not fail startup when a configured topic does not exist on the broker.
      missing-topics-fatal: false
      # Deliver each poll result to the listener as one List of records.
      type: batch
# Application property read by the listener via ${log.topics}; comma-separated topic names.
log:
  topics: topic1,topic2,topic3
消费者代码（Java）
/**
 * Batch listener for every topic named in the comma-separated {@code log.topics} property.
 *
 * <p>Prints the batch size and, when the batch is not empty, the value of its first record,
 * then acknowledges the batch.
 *
 * @param records the records delivered for this invocation
 * @param ack handle used to acknowledge the batch manually
 */
@KafkaListener(topics = "#{'${log.topics}'.split(',')}")
public void processMessage(List<ConsumerRecord<?, ?>> records, Acknowledgment ack) {
    try {
        System.out.println(records.size());
        // Guard the sample print: get(0) on an empty batch would throw and the
        // acknowledgment below would be skipped.
        if (!records.isEmpty()) {
            System.out.println(records.get(0).value());
        }
        ack.acknowledge();
    } catch (Exception e) {
        // The failure is only printed and the batch stays unacknowledged.
        // NOTE(review): consider rethrowing so the listener container's error
        // handling can deal with the failed batch - confirm the intended policy.
        e.printStackTrace();
    }
}