Skip to main content
 首页 » 编程设计

apache-kafka之spring-kafka 请求回复 : Different Types for Request and Reply

2024年12月31日13jackei

提供 Request-Reply 支持的 ReplyingKafkaTemplate 文档(在 Spring-Kafka 2.1.3 中引入)表明请求和回复可以使用不同的类型:

ReplyingKafkaTemplate<K, V, R> 

其中参数化类型 K 指定消息键,V 指定值(即请求),R 指定回复。

到目前为止还不错。但是用于实现服务器端 Request-Reply 的相应支持类似乎不支持 V、R 的不同类型。文档建议使用带有添加的 @SendTo 注释的 KafkaListener,它在幕后使用 MessageListenerContainer 上配置的 replyTemplate .但是 AbstractKafkaListenerEndpoint 只支持单一类型的监听器以及 replyTemplate:

public abstract class AbstractKafkaListenerEndpoint<K, V> 
        implements KafkaListenerEndpoint, BeanFactoryAware, InitializingBean { 
 
    ... 
 
    /** 
     * Set the {@link KafkaTemplate} to use to send replies. 
     * @param replyTemplate the template. 
     * @since 2.0 
     */ 
    public void setReplyTemplate(KafkaTemplate<K, V> replyTemplate) { 
        this.replyTemplate = replyTemplate; 
    } 
 
    ... 
 
} 

因此 V 和 R 需要是同一类型。

文档中使用的示例确实对请求和回复都使用了字符串。

我是否遗漏了什么,或者这是应该报告和更正的 Spring-Kafka 请求-回复支持中的设计缺陷?

请您参考如下方法:

这是 fixed in the 2.2 release .

对于早期版本,只需注入(inject)一个原始的KafkaTemplate(没有泛型)。

编辑

@SpringBootApplication 
public class So53151961Application { 
 
    public static void main(String[] args) { 
        SpringApplication.run(So53151961Application.class, args); 
    } 
 
    @KafkaListener(id = "so53151961", topics = "so53151961") 
    @SendTo 
    public Bar handle(Foo foo) { 
        System.out.println(foo); 
        return new Bar(foo.getValue().toUpperCase()); 
    } 
 
    @Bean 
    public ReplyingKafkaTemplate<String, Foo, Bar> replyingTemplate(ProducerFactory<String, Foo> pf, 
            ConcurrentKafkaListenerContainerFactory<String, Bar> factory) { 
 
        ConcurrentMessageListenerContainer<String, Bar> replyContainer = 
                factory.createContainer("so53151961-replyTopic"); 
        replyContainer.getContainerProperties().setGroupId("so53151961.reply"); 
        ReplyingKafkaTemplate<String, Foo, Bar> replyingKafkaTemplate = new ReplyingKafkaTemplate<>(pf, replyContainer); 
        return replyingKafkaTemplate; 
    } 
 
    @Bean 
    public KafkaTemplate<String, Bar> replyTemplate(ProducerFactory<String, Bar> pf, 
            ConcurrentKafkaListenerContainerFactory<String, Bar> factory) { 
 
        KafkaTemplate<String, Bar> kafkaTemplate = new KafkaTemplate<>(pf); 
        factory.setReplyTemplate(kafkaTemplate); 
        return kafkaTemplate; 
    } 
 
    @Bean 
    public ApplicationRunner runner(ReplyingKafkaTemplate<String, Foo, Bar> template) { 
        return args -> { 
            ProducerRecord<String, Foo> record = new ProducerRecord<>("so53151961", null, "key", new Foo("foo")); 
            RequestReplyFuture<String, Foo, Bar> future = template.sendAndReceive(record); 
            System.out.println(future.get(10, TimeUnit.SECONDS).value()); 
        }; 
    } 
 
    @Bean 
    public NewTopic topic() { 
        return new NewTopic("so53151961", 1, (short) 1); 
    } 
 
    @Bean 
    public NewTopic reply() { 
        return new NewTopic("so53151961-replyTopic", 1, (short) 1); 
    } 
 
    public static class Foo { 
 
        public String value; 
 
        public Foo() { 
            super(); 
        } 
 
        public Foo(String value) { 
            this.value = value; 
        } 
 
        public String getValue() { 
            return this.value; 
        } 
 
        public void setValue(String value) { 
            this.value = value; 
        } 
 
        @Override 
        public String toString() { 
            return "Foo [value=" + this.value + "]"; 
        } 
 
    } 
 
    public static class Bar { 
 
        public String value; 
 
        public Bar() { 
            super(); 
        } 
 
        public Bar(String value) { 
            this.value = value; 
        } 
 
        public String getValue() { 
            return this.value; 
        } 
 
        public void setValue(String value) { 
            this.value = value; 
        } 
 
        @Override 
        public String toString() { 
            return "Bar [value=" + this.value + "]"; 
        } 
 
    } 
 
} 
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer 
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer 
spring.kafka.consumer.enable-auto-commit=false 
spring.kafka.consumer.auto-offset-reset=earliest 
spring.kafka.consumer.properties.spring.json.trusted.packages=com.example 

结果

Foo [value=foo] 
Bar [value=FOO]