Skip to main content
 首页 » 编程设计

rabbitmq之Spring AMQP RabbitMq 中的计划/延迟消息传递

2024年08月05日5虾米姐

我正在努力寻找 Spring AMQP/Rabbit MQ 中计划/延迟消息的方法。
经过大量搜索,我仍然无法在 Spring AMQP 中做到这一点。有人可以告诉我如何在 Spring AMQP 中执行x-delay
如果消费者端发生某些异常,我想延迟消息。 RabbitMQ 说添加 x-delay 并安装我已经完成的插件,但消息仍然立即发送,没有任何延迟



我在消息中收到此信息
已收到 <(Body:'[B@60a4ae5f(byte[26])'MessageProperties [headers={x-delay=15000}

 @Bean 
ConnectionFactory connectionFactory(){ 
 
    CachingConnectionFactory connectionFactory=new CachingConnectionFactory("127.0.0.1"); 
    connectionFactory.setUsername("guest"); 
    connectionFactory.setPassword("guest"); 
    connectionFactory.setPort(1500); 
    connectionFactory.setPublisherReturns(true); 
    return connectionFactory; 
 
} 
 
@Bean 
Binding binding(@Qualifier("queue")Queue queue, DirectExchange exchange) { 
    return new Binding(queue.getName(), Binding.DestinationType.QUEUE, exchange.getName(), queue.getName(), null); 
    //return BindingBuilder.bind(queue).to(exchange).with(queueName);    
} 
 
@Bean 
DirectExchange exchange() { 
    DirectExchange exchange=new DirectExchange("delay-exchange"); 
    return exchange; 
} 

消费者---
@覆盖

public void onMessage(Message message, Channel channel) throws Exception { 
 
    System.out.println("Received <" + message+ ">" +rabbitTemplate); 
 
    if(i==1){ 
        AMQP.BasicProperties.Builder props = new AMQP.BasicProperties.Builder(); 
        Map<String,Object> headers = message.getMessageProperties().getHeaders(); 
        headers.put("x-delay", 15000); 
        props.headers(headers); 
        i++; 
        channel.basicPublish(message.getMessageProperties().getReceivedExchange(), message.getMessageProperties().getReceivedRoutingKey(), 
                props.build(), message.getBody()); 
    } 
    } 

请您参考如下方法:

首先看起来您没有遵循Scheduling Messages with RabbitMQ文章:

To use the Delayed Message Exchange you just need to declare an exchange providing the "x-delayed-message" exchange type as follows:

Map<String, Object> args = new HashMap<String, Object>(); 
args.put("x-delayed-type", "direct"); 
channel.exchangeDeclare("my-exchange", "x-delayed-message", true, false, args); 

我想说使用 Spring AMQP 也可以实现同样的效果:

@Bean 
CustomExchange delayExchange() { 
    Map<String, Object> args = new HashMap<String, Object>(); 
    args.put("x-delayed-type", "direct"); 
    return new CustomExchange("my-exchange", "x-delayed-message", true, false, args); 
} 

另一个问题是您确实应该将消息发布到该delay-exchange,而不是任何其他。再说一遍:无论如何,该文档中提到了这一点。

更新

自 Spring AMQP 1.6 起,延迟消息作为开箱即用的功能得到支持: https://spring.io/blog/2016/02/16/spring-amqp-1-6-0-milestone-1-and-1-5-4-available .