Skip to main content
 首页 » 编程设计

rabbitmq之具有 2 个队列的 Spring AMQP 项目

2024年07月26日14mq0036

我正在开发一个涉及 2 个队列和多个与它们交互的监听器的项目。 流程:

  • 新的 HTTP 请求到达服务器,然后将其转换为将成为消息的对象
  • 此消息必须在两个队列中发布
  • 我有两种类型的监听器,它们从每个队列获取消息,然后我做任何我想做的事情

我一直在阅读,最好的方法是使用扇出交换。这是我的代码:

监听器配置.xml

<!-- CREATE CONNECTION FACTORY --> 
<rabbit:connection-factory id="connectionFactory" 
    host="localhost" username="guest" password="guest" /> 
 
<rabbit:admin connection-factory="connectionFactory" /> 
 
<!-- <!-- RABBIT QUEUE'S --> 
<rabbit:queue id="trashroute.rabbit.queue" name="trashroute.rabbit.queue" auto-delete="false" auto-startup=false 
    durable="true" /> 
<!-- Webapp Queue --> 
<rabbit:queue id="trashroute2.rabbit.queue" name="trashroute2.rabbit.queue" auto-delete="false" auto-startup=false 
    durable="true" />  
 
<!-- CREATE AN EXCHANGE AND BIND THE QUEUE WITH MY.ROUTINGKEY.* TO THE EXCHANGE --> 
<rabbit:fanout-exchange id="myExchange" name="trashroute-exchange"> 
    <rabbit:bindings> 
        <rabbit:binding queue="trashroute.rabbit.queue"></rabbit:binding> 
        <rabbit:binding queue="trashroute2.rabbit.queue"></rabbit:binding> 
    </rabbit:bindings> 
</rabbit:fanout-exchange> 
 
<!-- CREATE THE RABBIT TEMPLATES --> 
<rabbit:template connection-factory="connectionFactory" exchange="myExchange" queue="trashroute.rabbit.queue"/> 
<rabbit:template connection-factory="connectionFactory" exchange="myExchange" queue="trashroute2.rabbit.queue"/> 
 
<!-- INSTANTIATE THE LISTENERS --> 
<bean id="persistenceListener" class="trashroute.rabbitmq.listener.PersistenceListener" /> 
<bean id="webappListener" class="trashroute.rabbitmq.listener.WebappListener" /> 
 
<!-- CREATE THE JsonMessageConverter BEAN --> 
<bean id="jsonMessageConverter" class="org.springframework.amqp.support.converter.JsonMessageConverter" /> 
 
<!-- GLUE THE LISTENER AND QUEUE TO THE LISTENER CONTAINER --> 
<rabbit:listener-container id="listenerContainer" 
    connection-factory="connectionFactory" message-converter="jsonMessageConverter"> 
    <rabbit:listener ref="persistenceListener" queues="trashroute.rabbit.queue" /> 
    <rabbit:listener ref="webappListener" queues="trashroute2.rabbit.queue" /> 
</rabbit:listener-container> 

发送者配置.xml

<!--  First following line creates a rabbit connection factory with specified parameters --> 
<rabbit:connection-factory id="connectionFactory" host="localhost" username="guest" password="guest" /> 
 
<!-- Obtain admin rights to create an exchange --> 
<rabbit:admin connection-factory="connectionFactory" /> 
 
<!-- Create a bean which can send message to trashroute-exchange for the Java program to call --> 
<rabbit:template id="template" connection-factory="connectionFactory"  exchange="myExchange" 
message-converter="jsonMessageConverter" /> 
 
 
<bean id="rabbitTemplate" class="org.springframework.amqp.rabbit.core.RabbitTemplate"> 
<property name="connectionFactory" ref="rabbitConnectionFactory"/> 
<property name="messageConverter"> 
    <bean class="org.springframework.amqp.support.converter.JsonMessageConverter"/> 
</property> 

监听器MainConfiguration.java

@Configuration 
public class MainConfiguration { 
 
protected final String persistenceQueue = "trashroute.rabbit.queue"; 
protected final String webappQueue = "trashroute2.rabbit.queue"; 
 
@Bean 
public ConnectionFactory connectionFactory() { 
    CachingConnectionFactory connectionFactory = new CachingConnectionFactory("localhost"); 
    connectionFactory.setUsername("guest"); 
    connectionFactory.setPassword("guest"); 
    return connectionFactory; 
} 
 
@Bean 
public AmqpAdmin amqpAdmin() { 
    return new RabbitAdmin(connectionFactory()); 
} 
 
@Bean 
public DataController DataController(){ 
    return new DataController(); 
} 
 
@Bean 
// Every queue is bound to the default direct exchange 
public Queue persistenceQueue() {  
    //Create a new queue with an specific name and the durability value in true. 
    return new Queue(this.persistenceQueue, true); 
} 
 
@Bean 
public Queue webappQueue() { 
    //Create a new queue with an specific name and the durability value in true. 
    return new Queue(this.webappQueue, true); 
} 
} 

发送者MainConfiguration.java

@Configuration 
public class SenderConfiguration { 
 
protected final String persistenceQueue = "trashroute.rabbit.queue"; 
protected final String webappQueue = "trashroute2.rabbit.queue"; 
 
//Create the Template 
@Bean 
public RabbitTemplate rabbitTemplate() { 
    RabbitTemplate template = new RabbitTemplate(connectionFactory()); 
    template.setMessageConverter(new JsonMessageConverter()); 
    return template; 
} 
 
@Bean 
public ConnectionFactory connectionFactory() { 
    CachingConnectionFactory connectionFactory = new CachingConnectionFactory( 
            "localhost"); 
    connectionFactory.setUsername("guest"); 
    connectionFactory.setPassword("guest"); 
    return connectionFactory; 
} 
 
@Bean 
public IServiceManager scheduledProducer() { 
    return new ServiceManagerImpl(); 
} 
 
@Bean 
public BeanPostProcessor postProcessor() { 
    return new ScheduledAnnotationBeanPostProcessor(); 
} 
 
} 

谁能告诉我我做错了什么吗?两个监听器之一工作完美,第二个从不读取消息。

请您参考如下方法:

根据上面解释的场景,我尝试创建一个使用 Spring Java Config 的示例应用程序。

消息被发布到trashroutewebapp队列,并且各自的接收者(persistencewebapp)接收消息消息。

RabbitConfiguration.java(包含发送方和接收方的配置)

@Configuration 
@EnableRabbit 
public class RabbitConfiguration { 
 
    public static final String BROADCAST_TRASHROUTE_QUEUE = "trashroute.rabbit.queue"; 
    public static final String BROADCAST_WEBAPP_QUEUE = "webapp.rabbit.queue"; 
 
    @Bean 
    public ConnectionFactory connectionFactory() { 
        CachingConnectionFactory connectionFactory = 
                new CachingConnectionFactory("localhost"); 
        return connectionFactory; 
    } 
 
 
    @Bean 
    public AmqpAdmin amqpAdmin() { 
        return new RabbitAdmin(connectionFactory()); 
    } 
 
    @Bean 
    public Queue trashRouteQueue() { 
        return new Queue(BROADCAST_TRASHROUTE_QUEUE); 
    } 
 
    @Bean 
    public Queue webAppQueue() { 
        return new Queue(BROADCAST_WEBAPP_QUEUE); 
    } 
 
    @Bean 
    public RabbitTemplate rabbitTemplate() { 
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory()); 
        return rabbitTemplate; 
    } 
 
    @Bean 
    public FanoutExchange trashRouteExchange() { 
        FanoutExchange exchange = new FanoutExchange("trashroute"); 
        return exchange; 
    } 
 
    @Bean 
    public Binding trashRouteBinding() { 
        return BindingBuilder.bind(trashRouteQueue()).to(trashRouteExchange()); 
    } 
 
    @Bean 
    public Binding webAppBinding() { 
        return BindingBuilder.bind(webAppQueue()).to(trashRouteExchange()); 
    } 
 
    @Bean 
    SimpleMessageListenerContainer persistenceListenerContainer(ConnectionFactory connectionFactory, @Qualifier("persistenceListenerAdapter") MessageListenerAdapter listenerAdapter) { 
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(); 
        container.setConnectionFactory(connectionFactory); 
        container.setQueues(trashRouteQueue(), webAppQueue()); 
        container.setMessageListener(listenerAdapter); 
        return container; 
    } 
 
    @Bean 
    MessageListenerAdapter persistenceListenerAdapter(PersistenceListener receiver) { 
        return new MessageListenerAdapter(receiver, "receiveMessage"); 
    } 
 
    @Bean 
    SimpleMessageListenerContainer webAppListenerContainer(ConnectionFactory connectionFactory, @Qualifier("webAppListenerAdapter") MessageListenerAdapter listenerAdapter) { 
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(); 
        container.setConnectionFactory(connectionFactory); 
        container.setQueues(trashRouteQueue(), webAppQueue()); 
        container.setMessageListener(listenerAdapter); 
        return container; 
    } 
 
    @Bean 
    MessageListenerAdapter webAppListenerAdapter(WebAppListener webAppListener) { 
        return new MessageListenerAdapter(webAppListener, "receiveMessage"); 
    } 
 
    @Bean 
    PersistenceListener persistenceListener() { 
        return new PersistenceListener(); 
    } 
 
    @Bean 
    WebAppListener webAppListener() { 
        return new WebAppListener(); 
    } 
 
} 

持久监听器.java

public class PersistenceListener { 
 
    public void receiveMessage(String message) { 
        System.out.println("Persistence Listener: Messsage Received <" + message + ">"); 
    } 
} 

WebAppListener.java

public class WebAppListener { 
    public void receiveMessage(String message) { 
        System.out.println("WebAppListener: Message Received <" + message + ">"); 
    } 
} 

应用程序.java

@SpringBootApplication 
public class Application implements CommandLineRunner { 
 
    @Autowired 
    AnnotationConfigApplicationContext context; 
 
    public static void main(String[] args) throws InterruptedException { 
        SpringApplication.run(Application.class, args); 
    } 
 
    @Override 
    public void run(String... args) throws Exception { 
        System.out.println("Waiting five seconds..."); 
        Thread.sleep(5000); 
        System.out.println("Sending message..."); 
 
        RabbitTemplate rabbitTemplate = (RabbitTemplate) context.getBean("rabbitTemplate"); 
 
        rabbitTemplate.convertAndSend(RabbitConfiguration.BROADCAST_TRASHROUTE_QUEUE, "Hello from trashroute queue!"); 
        rabbitTemplate.convertAndSend(RabbitConfiguration.BROADCAST_WEBAPP_QUEUE, "Hello from webapp queue!"); 
 
        Thread.sleep(10000); 
        context.close(); 
    } 
} 

希望这会有所帮助。尽管如果您想在生产中使用它,您需要重构代码。