Skip to main content
 首页 » 编程设计

Spring访问Apache kafka快速入门

2022年07月19日138yjmyzz

Apache kafka是分布式、容错流程处理系统。本文介绍Spring集成Kafka以及其提供对Kafka Java Client Api的抽象封装。
Spring Kafka利用KafkaTemplate发送消息,@KafkaListener注解消费消息,从而实现模板化编程模式,简化Java对Kafka交互访问。

1. 环境及依赖

读者可以参照官网进行安装,也可以通过docker快速搭建。spring-kafka需要pom.xml中增加依赖:

<dependency> 
    <groupId>org.springframework.kafka</groupId> 
    <artifactId>spring-kafka</artifactId> 
    <version>2.3.7.RELEASE</version> 
</dependency> 

我们的示例应用使用Spring boot实现。本文假设应用使用默认的配置,服务端口都没有改变。

1.1 配置主题

这里使用kafka命令创建主题:

$ bin/kafka-topics.sh --create \ 
  --zookeeper localhost:2181 \ 
  --replication-factor 1 --partitions 1 \ 
  --topic top001 

当然也可以通过Kafka提供的AdminClient编程式创建主题。首先增加KafkaAdmin Spring bean, 其实现了SmartInitializingSingleton接口,在afterSingletonsInstantiated方法中会自动为NewTopic类型的bean增加主题:

@Configuration 
public class KafkaTopicConfig { 
     
    @Value(value = "${kafka.bootstrapAddress}") 
    private String bootstrapAddress; 
  
    @Bean 
    public KafkaAdmin kafkaAdmin() { 
        Map<String, Object> configs = new HashMap<>(); 
        configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress); 
        return new KafkaAdmin(configs); 
    } 
     
    @Bean 
    public NewTopic topic1() { 
         return new NewTopic("top001", 1, (short) 1); 
    } 
} 

2. 生产消息

为了创建消息,首先需要配置 ProducerFactory,它负责设置创建Kafka Producer实例的策略。然后创建 KafkaTemplate,其包装Producer实例提供便捷的方法发送消息给Kafka主题。生产者实例是线程安全的,这样整个应用上下文使用一个实例可提供程序性能。故KakfaTemplate 实例也是现场安全的,建议使用单个实例。

2.1 生产者配置

@Configuration 
public class KafkaProducerConfig { 
    @Value(value = "${kafka.bootstrapAddress}") 
    private String bootstrapAddress; 
 
    @Bean 
    public ProducerFactory<String, String> producerFactory() { 
        Map<String, Object> configProps = new HashMap<>(); 
        configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress); 
        configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); 
        configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); 
 
        return new DefaultKafkaProducerFactory<>(configProps); 
    } 
  
    @Bean 
    public KafkaTemplate<String, String> kafkaTemplate() { 
        return new KafkaTemplate<>(producerFactory()); 
    } 
} 

2.2 发送消息

通过KafkaTemplate类发送消息:

 
@Autowired 
private KafkaTemplate<String, String> kafkaTemplate; 
  
public void sendMessage(String msg) { 
    kafkaTemplate.send(topicName, msg); 
} 

发送API返回ListenableFuture 对象。如果向阻塞发送线程、获取发送结构,可以调用ListenableFuture 对象的get方法。则线程等等结果,这样会使得生产者性能下降。Kafka是最快的流处理平台,最好的做法是采用异步方式处理结果。这样后续消息无需等等前面消息的结果,可以采用回调方式实现:

public void sendMessage(String message) { 
 
  ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(topicName, message); 
 
  future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() { 
 
      @Override 
      public void onSuccess(SendResult<String, String> result) { 
          System.out.println("Sent message=[" + message +  
            "] with offset=[" + result.getRecordMetadata().offset() + "]"); 
      } 
      @Override 
      public void onFailure(@NonNull Throwable ex) { 
          System.out.println("Unable to send message=["  
            + message + "] due to : " + ex.getMessage()); 
      } 
  }); 
} 

3. 消费消息

3.1 消费者配置

为了消费消息,需要配置ConsumerFactory 和 KafkaListenerContainerFactory。只要这些bean在Spring bean工厂中有效,基于pojo的消费者可以配置@KafkaListener 注解。为了启用检测有@KafkaListener注解的Spring bean,需要在配置类上增加@EnableKafka注解。

@EnableKafka 
@Configuration 
public class KafkaConsumerConfig { 
    @Value(value = "${kafka.bootstrapAddress}") 
    private String bootstrapAddress; 
 
    @Bean 
    public ConsumerFactory<String, String> consumerFactory() { 
        Map<String, Object> props = new HashMap<>(); 
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress); 
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId); 
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); 
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); 
 
        return new DefaultKafkaConsumerFactory<>(props); 
    } 
  
    @Bean 
    public ConcurrentKafkaListenerContainerFactory<String, String>  
      kafkaListenerContainerFactory() { 
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>(); 
        factory.setConsumerFactory(consumerFactory()); 
        return factory; 
    } 
} 

3.2 消费消息

@KafkaListener(topics = "top001", groupId = "foo") 
public void listenGroupFoo(String message) { 
    System.out.println("Received Message in group foo: " + message); 
} 

对一个主题可以应用多个监听器,每个使用不同的groupId,而且一个消费者可以消费不同的主题消息:

@KafkaListener(topics = "topic1, topic2", groupId = "foo") 

Spring 也支持在监听器里使用@Header注解返回一个或多个消息头:

@KafkaListener(topics = "top001") 
public void listenWithHeaders(@Payload String message, @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition) { 
      System.out.println("Received Message: " + message" + "from partition: " + partition); 
} 

3.3 消费特定分区的消息

你可能已经注意到,我们已经创建了主题top001 ,它仅有一个分区。如果主题带多个分区,@KafkaListener可以通过初始化偏移量显示订阅特定分区:

@KafkaListener( 
  topicPartitions = @TopicPartition(topic = "top001", 
  partitionOffsets = { 
    @PartitionOffset(partition = "0", initialOffset = "0"),  
    @PartitionOffset(partition = "3", initialOffset = "0")}), 
  containerFactory = "partitionsKafkaListenerContainerFactory") 
public void listenToPartition(@Payload String message,  
                              @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition) { 
      System.out.println("Received Message: " + message" + "from partition: " + partition); 
} 

既然在这个监听器中initialOffset 被设置为0,所有从分区中之前消费的消息,从0到3每次都重新消费。如果偏移量不设置,可以使用@TopicPartition 注解的partitions 属性,无需设置偏移量:

@KafkaListener(topicPartitions  
  = @TopicPartition(topic = "topicName", partitions = { "0", "1" })) 
 

3.4 监听器增加消息过滤

监听器可以配置过滤器消费特定类型的消息。给 KafkaListenerContainerFactory设置RecordFilterStrategy:

@Bean 
public ConcurrentKafkaListenerContainerFactory<String, String> 
  filterKafkaListenerContainerFactory() { 
    ConcurrentKafkaListenerContainerFactory<String, String> factory = 
      new ConcurrentKafkaListenerContainerFactory<>(); 
    factory.setConsumerFactory(consumerFactory()); 
    factory.setRecordFilterStrategy( 
      record -> record.value().contains("00")); 
    return factory; 
} 

上面配置符合条件:包括“00”的消息则丢弃。然后配置监听器使用容工厂:

@KafkaListener( 
  topics = "top001",  
  containerFactory = "filterKafkaListenerContainerFactory") 
public void listenWithFilter(String message) { 
    System.out.println("Received Message in filtered listener: " + message); 
} 

在这个监听器中,所有与过滤器匹配的消息都将被丢弃。

4. 自定义消息转换器

前面我们仅涉及到字符串类型消息,当然也可以发送或接收java对象。我们需要配置相应的序列化类ProducerFactory 和反序列化类 ConsumerFactory。

假设有个简单pojo类:

public class Greeting { 
  
    private String msg; 
    private String name; 
  
    // standard getters, setters and constructor 
} 

4.1 生成自定义消息

下面示例使用 JsonSerializer,使用ProducerFactory 和 KafkaTemplate代码实现:

@Bean 
public ProducerFactory<String, Greeting> greetingProducerFactory() { 
    // ... 
    configProps.put( 
      ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,  
      JsonSerializer.class); 
    return new DefaultKafkaProducerFactory<>(configProps); 
} 
  
@Bean 
public KafkaTemplate<String, Greeting> greetingKafkaTemplate() { 
    return new KafkaTemplate<>(greetingProducerFactory()); 
} 

新的KafkaTemplate 能够发送Greeting 类型消息:

kafkaTemplate.send(topicName, new Greeting("Hello", "World")); 
 

4.2 消费自定义消息

类似的,我们可以修改ConsumerFactory 和 KafkaListenerContainerFactory 去反序列化Greeting 消息:

@Bean 
public ConsumerFactory<String, Greeting> greetingConsumerFactory() { 
    // ... 
    return new DefaultKafkaConsumerFactory<>( 
      props, 
      new StringDeserializer(),  
      new JsonDeserializer<>(Greeting.class)); 
} 
  
@Bean 
public ConcurrentKafkaListenerContainerFactory<String, Greeting>  
  greetingKafkaListenerContainerFactory() { 
  
    ConcurrentKafkaListenerContainerFactory<String, Greeting> factory = 
      new ConcurrentKafkaListenerContainerFactory<>(); 
    factory.setConsumerFactory(greetingConsumerFactory()); 
    return factory; 
} 

Spring-kafka 的json序列化和反序列化使用jackson库,我们可以增加它至pom.xml:

<dependency> 
    <groupId>com.fasterxml.jackson.core</groupId> 
    <artifactId>jackson-databind</artifactId> 
    <version>2.9.7</version> 
</dependency> 

最后我们写监听器消费Greeting消息:

@KafkaListener( 
  topics = "top001",  
  containerFactory = "greetingKafkaListenerContainerFactory") 
public void greetingListener(Greeting greeting) { 
    // process greeting message 
} 

5. 总结

本文我们介绍了Spring对kafka的支持,简要介绍了发送和接受消息的类和方法。


本文参考链接:https://blog.csdn.net/neweastsun/article/details/110195827