Apache kafka是分布式、容错流程处理系统。本文介绍Spring集成Kafka以及其提供对Kafka Java Client Api的抽象封装。
Spring Kafka利用KafkaTemplate发送消息,@KafkaListener注解消费消息,从而实现模板化编程模式,简化Java对Kafka交互访问。
1. 环境及依赖
读者可以参照官网进行安装,也可以通过docker快速搭建。spring-kafka需要pom.xml中增加依赖:
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>2.3.7.RELEASE</version>
</dependency>
我们的示例应用使用Spring boot实现。本文假设应用使用默认的配置,服务端口都没有改变。
1.1 配置主题
这里使用kafka命令创建主题:
$ bin/kafka-topics.sh --create \
--zookeeper localhost:2181 \
--replication-factor 1 --partitions 1 \
--topic top001
当然也可以通过Kafka提供的AdminClient编程式创建主题。首先增加KafkaAdmin Spring bean, 其实现了SmartInitializingSingleton接口,在afterSingletonsInstantiated方法中会自动为NewTopic类型的bean增加主题:
@Configuration
public class KafkaTopicConfig {
@Value(value = "${kafka.bootstrapAddress}")
private String bootstrapAddress;
@Bean
public KafkaAdmin kafkaAdmin() {
Map<String, Object> configs = new HashMap<>();
configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
return new KafkaAdmin(configs);
}
@Bean
public NewTopic topic1() {
return new NewTopic("top001", 1, (short) 1);
}
}
2. 生产消息
为了创建消息,首先需要配置 ProducerFactory,它负责设置创建Kafka Producer实例的策略。然后创建 KafkaTemplate,其包装Producer实例提供便捷的方法发送消息给Kafka主题。生产者实例是线程安全的,这样整个应用上下文使用一个实例可提供程序性能。故KakfaTemplate 实例也是现场安全的,建议使用单个实例。
2.1 生产者配置
@Configuration
public class KafkaProducerConfig {
@Value(value = "${kafka.bootstrapAddress}")
private String bootstrapAddress;
@Bean
public ProducerFactory<String, String> producerFactory() {
Map<String, Object> configProps = new HashMap<>();
configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
return new DefaultKafkaProducerFactory<>(configProps);
}
@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
}
2.2 发送消息
通过KafkaTemplate类发送消息:
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
public void sendMessage(String msg) {
kafkaTemplate.send(topicName, msg);
}
发送API返回ListenableFuture 对象。如果向阻塞发送线程、获取发送结构,可以调用ListenableFuture 对象的get方法。则线程等等结果,这样会使得生产者性能下降。Kafka是最快的流处理平台,最好的做法是采用异步方式处理结果。这样后续消息无需等等前面消息的结果,可以采用回调方式实现:
public void sendMessage(String message) {
ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(topicName, message);
future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
@Override
public void onSuccess(SendResult<String, String> result) {
System.out.println("Sent message=[" + message +
"] with offset=[" + result.getRecordMetadata().offset() + "]");
}
@Override
public void onFailure(@NonNull Throwable ex) {
System.out.println("Unable to send message=["
+ message + "] due to : " + ex.getMessage());
}
});
}
3. 消费消息
3.1 消费者配置
为了消费消息,需要配置ConsumerFactory 和 KafkaListenerContainerFactory。只要这些bean在Spring bean工厂中有效,基于pojo的消费者可以配置@KafkaListener 注解。为了启用检测有@KafkaListener注解的Spring bean,需要在配置类上增加@EnableKafka注解。
@EnableKafka
@Configuration
public class KafkaConsumerConfig {
@Value(value = "${kafka.bootstrapAddress}")
private String bootstrapAddress;
@Bean
public ConsumerFactory<String, String> consumerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
return new DefaultKafkaConsumerFactory<>(props);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String>
kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
return factory;
}
}
3.2 消费消息
@KafkaListener(topics = "top001", groupId = "foo")
public void listenGroupFoo(String message) {
System.out.println("Received Message in group foo: " + message);
}
对一个主题可以应用多个监听器,每个使用不同的groupId,而且一个消费者可以消费不同的主题消息:
@KafkaListener(topics = "topic1, topic2", groupId = "foo")
Spring 也支持在监听器里使用@Header注解返回一个或多个消息头:
@KafkaListener(topics = "top001")
public void listenWithHeaders(@Payload String message, @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition) {
System.out.println("Received Message: " + message" + "from partition: " + partition);
}
3.3 消费特定分区的消息
你可能已经注意到,我们已经创建了主题top001 ,它仅有一个分区。如果主题带多个分区,@KafkaListener可以通过初始化偏移量显示订阅特定分区:
@KafkaListener(
topicPartitions = @TopicPartition(topic = "top001",
partitionOffsets = {
@PartitionOffset(partition = "0", initialOffset = "0"),
@PartitionOffset(partition = "3", initialOffset = "0")}),
containerFactory = "partitionsKafkaListenerContainerFactory")
public void listenToPartition(@Payload String message,
@Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition) {
System.out.println("Received Message: " + message" + "from partition: " + partition);
}
既然在这个监听器中initialOffset 被设置为0,所有从分区中之前消费的消息,从0到3每次都重新消费。如果偏移量不设置,可以使用@TopicPartition 注解的partitions 属性,无需设置偏移量:
@KafkaListener(topicPartitions
= @TopicPartition(topic = "topicName", partitions = { "0", "1" }))
3.4 监听器增加消息过滤
监听器可以配置过滤器消费特定类型的消息。给 KafkaListenerContainerFactory设置RecordFilterStrategy:
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String>
filterKafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setRecordFilterStrategy(
record -> record.value().contains("00"));
return factory;
}
上面配置符合条件:包括“00”的消息则丢弃。然后配置监听器使用容工厂:
@KafkaListener(
topics = "top001",
containerFactory = "filterKafkaListenerContainerFactory")
public void listenWithFilter(String message) {
System.out.println("Received Message in filtered listener: " + message);
}
在这个监听器中,所有与过滤器匹配的消息都将被丢弃。
4. 自定义消息转换器
前面我们仅涉及到字符串类型消息,当然也可以发送或接收java对象。我们需要配置相应的序列化类ProducerFactory 和反序列化类 ConsumerFactory。
假设有个简单pojo类:
public class Greeting {
private String msg;
private String name;
// standard getters, setters and constructor
}
4.1 生成自定义消息
下面示例使用 JsonSerializer,使用ProducerFactory 和 KafkaTemplate代码实现:
@Bean
public ProducerFactory<String, Greeting> greetingProducerFactory() {
// ...
configProps.put(
ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
JsonSerializer.class);
return new DefaultKafkaProducerFactory<>(configProps);
}
@Bean
public KafkaTemplate<String, Greeting> greetingKafkaTemplate() {
return new KafkaTemplate<>(greetingProducerFactory());
}
新的KafkaTemplate 能够发送Greeting 类型消息:
kafkaTemplate.send(topicName, new Greeting("Hello", "World"));
4.2 消费自定义消息
类似的,我们可以修改ConsumerFactory 和 KafkaListenerContainerFactory 去反序列化Greeting 消息:
@Bean
public ConsumerFactory<String, Greeting> greetingConsumerFactory() {
// ...
return new DefaultKafkaConsumerFactory<>(
props,
new StringDeserializer(),
new JsonDeserializer<>(Greeting.class));
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, Greeting>
greetingKafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, Greeting> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(greetingConsumerFactory());
return factory;
}
Spring-kafka 的json序列化和反序列化使用jackson库,我们可以增加它至pom.xml:
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.9.7</version>
</dependency>
最后我们写监听器消费Greeting消息:
@KafkaListener(
topics = "top001",
containerFactory = "greetingKafkaListenerContainerFactory")
public void greetingListener(Greeting greeting) {
// process greeting message
}
5. 总结
本文我们介绍了Spring对kafka的支持,简要介绍了发送和接受消息的类和方法。
本文参考链接:https://blog.csdn.net/neweastsun/article/details/110195827