我正在开发一个涉及 2 个队列和多个与它们交互的监听器的项目。 流程:
- 新的 HTTP 请求到达服务器,然后将其转换为将成为消息的对象
- 此消息必须在两个队列中发布
- 我有两种类型的监听器,它们从每个队列获取消息,然后我做任何我想做的事情
我一直在阅读,最好的方法是使用扇出交换。这是我的代码:
监听器配置.xml
<!-- CREATE CONNECTION FACTORY -->
<rabbit:connection-factory id="connectionFactory"
host="localhost" username="guest" password="guest" />
<rabbit:admin connection-factory="connectionFactory" />
<!-- <!-- RABBIT QUEUE'S -->
<rabbit:queue id="trashroute.rabbit.queue" name="trashroute.rabbit.queue" auto-delete="false" auto-startup=false
durable="true" />
<!-- Webapp Queue -->
<rabbit:queue id="trashroute2.rabbit.queue" name="trashroute2.rabbit.queue" auto-delete="false" auto-startup=false
durable="true" />
<!-- CREATE AN EXCHANGE AND BIND THE QUEUE WITH MY.ROUTINGKEY.* TO THE EXCHANGE -->
<rabbit:fanout-exchange id="myExchange" name="trashroute-exchange">
<rabbit:bindings>
<rabbit:binding queue="trashroute.rabbit.queue"></rabbit:binding>
<rabbit:binding queue="trashroute2.rabbit.queue"></rabbit:binding>
</rabbit:bindings>
</rabbit:fanout-exchange>
<!-- CREATE THE RABBIT TEMPLATES -->
<rabbit:template connection-factory="connectionFactory" exchange="myExchange" queue="trashroute.rabbit.queue"/>
<rabbit:template connection-factory="connectionFactory" exchange="myExchange" queue="trashroute2.rabbit.queue"/>
<!-- INSTANTIATE THE LISTENERS -->
<bean id="persistenceListener" class="trashroute.rabbitmq.listener.PersistenceListener" />
<bean id="webappListener" class="trashroute.rabbitmq.listener.WebappListener" />
<!-- CREATE THE JsonMessageConverter BEAN -->
<bean id="jsonMessageConverter" class="org.springframework.amqp.support.converter.JsonMessageConverter" />
<!-- GLUE THE LISTENER AND QUEUE TO THE LISTENER CONTAINER -->
<rabbit:listener-container id="listenerContainer"
connection-factory="connectionFactory" message-converter="jsonMessageConverter">
<rabbit:listener ref="persistenceListener" queues="trashroute.rabbit.queue" />
<rabbit:listener ref="webappListener" queues="trashroute2.rabbit.queue" />
</rabbit:listener-container>
发送者配置.xml
<!-- First following line creates a rabbit connection factory with specified parameters -->
<rabbit:connection-factory id="connectionFactory" host="localhost" username="guest" password="guest" />
<!-- Obtain admin rights to create an exchange -->
<rabbit:admin connection-factory="connectionFactory" />
<!-- Create a bean which can send message to trashroute-exchange for the Java program to call -->
<rabbit:template id="template" connection-factory="connectionFactory" exchange="myExchange"
message-converter="jsonMessageConverter" />
<bean id="rabbitTemplate" class="org.springframework.amqp.rabbit.core.RabbitTemplate">
<property name="connectionFactory" ref="rabbitConnectionFactory"/>
<property name="messageConverter">
<bean class="org.springframework.amqp.support.converter.JsonMessageConverter"/>
</property>
监听器MainConfiguration.java
@Configuration
public class MainConfiguration {
protected final String persistenceQueue = "trashroute.rabbit.queue";
protected final String webappQueue = "trashroute2.rabbit.queue";
@Bean
public ConnectionFactory connectionFactory() {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory("localhost");
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
return connectionFactory;
}
@Bean
public AmqpAdmin amqpAdmin() {
return new RabbitAdmin(connectionFactory());
}
@Bean
public DataController DataController(){
return new DataController();
}
@Bean
// Every queue is bound to the default direct exchange
public Queue persistenceQueue() {
//Create a new queue with an specific name and the durability value in true.
return new Queue(this.persistenceQueue, true);
}
@Bean
public Queue webappQueue() {
//Create a new queue with an specific name and the durability value in true.
return new Queue(this.webappQueue, true);
}
}
发送者MainConfiguration.java
@Configuration
public class SenderConfiguration {
protected final String persistenceQueue = "trashroute.rabbit.queue";
protected final String webappQueue = "trashroute2.rabbit.queue";
//Create the Template
@Bean
public RabbitTemplate rabbitTemplate() {
RabbitTemplate template = new RabbitTemplate(connectionFactory());
template.setMessageConverter(new JsonMessageConverter());
return template;
}
@Bean
public ConnectionFactory connectionFactory() {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory(
"localhost");
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
return connectionFactory;
}
@Bean
public IServiceManager scheduledProducer() {
return new ServiceManagerImpl();
}
@Bean
public BeanPostProcessor postProcessor() {
return new ScheduledAnnotationBeanPostProcessor();
}
}
谁能告诉我我做错了什么吗?两个监听器之一工作完美,第二个从不读取消息。
请您参考如下方法:
根据上面解释的场景,我尝试创建一个使用 Spring Java Config 的示例应用程序。
消息被发布到trashroute
和webapp
队列,并且各自的接收者(persistence
和webapp
)接收消息消息。
RabbitConfiguration.java(包含发送方和接收方的配置)
@Configuration
@EnableRabbit
public class RabbitConfiguration {
public static final String BROADCAST_TRASHROUTE_QUEUE = "trashroute.rabbit.queue";
public static final String BROADCAST_WEBAPP_QUEUE = "webapp.rabbit.queue";
@Bean
public ConnectionFactory connectionFactory() {
CachingConnectionFactory connectionFactory =
new CachingConnectionFactory("localhost");
return connectionFactory;
}
@Bean
public AmqpAdmin amqpAdmin() {
return new RabbitAdmin(connectionFactory());
}
@Bean
public Queue trashRouteQueue() {
return new Queue(BROADCAST_TRASHROUTE_QUEUE);
}
@Bean
public Queue webAppQueue() {
return new Queue(BROADCAST_WEBAPP_QUEUE);
}
@Bean
public RabbitTemplate rabbitTemplate() {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory());
return rabbitTemplate;
}
@Bean
public FanoutExchange trashRouteExchange() {
FanoutExchange exchange = new FanoutExchange("trashroute");
return exchange;
}
@Bean
public Binding trashRouteBinding() {
return BindingBuilder.bind(trashRouteQueue()).to(trashRouteExchange());
}
@Bean
public Binding webAppBinding() {
return BindingBuilder.bind(webAppQueue()).to(trashRouteExchange());
}
@Bean
SimpleMessageListenerContainer persistenceListenerContainer(ConnectionFactory connectionFactory, @Qualifier("persistenceListenerAdapter") MessageListenerAdapter listenerAdapter) {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
container.setQueues(trashRouteQueue(), webAppQueue());
container.setMessageListener(listenerAdapter);
return container;
}
@Bean
MessageListenerAdapter persistenceListenerAdapter(PersistenceListener receiver) {
return new MessageListenerAdapter(receiver, "receiveMessage");
}
@Bean
SimpleMessageListenerContainer webAppListenerContainer(ConnectionFactory connectionFactory, @Qualifier("webAppListenerAdapter") MessageListenerAdapter listenerAdapter) {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
container.setQueues(trashRouteQueue(), webAppQueue());
container.setMessageListener(listenerAdapter);
return container;
}
@Bean
MessageListenerAdapter webAppListenerAdapter(WebAppListener webAppListener) {
return new MessageListenerAdapter(webAppListener, "receiveMessage");
}
@Bean
PersistenceListener persistenceListener() {
return new PersistenceListener();
}
@Bean
WebAppListener webAppListener() {
return new WebAppListener();
}
}
持久监听器.java
public class PersistenceListener {
public void receiveMessage(String message) {
System.out.println("Persistence Listener: Messsage Received <" + message + ">");
}
}
WebAppListener.java
public class WebAppListener {
public void receiveMessage(String message) {
System.out.println("WebAppListener: Message Received <" + message + ">");
}
}
应用程序.java
@SpringBootApplication
public class Application implements CommandLineRunner {
@Autowired
AnnotationConfigApplicationContext context;
public static void main(String[] args) throws InterruptedException {
SpringApplication.run(Application.class, args);
}
@Override
public void run(String... args) throws Exception {
System.out.println("Waiting five seconds...");
Thread.sleep(5000);
System.out.println("Sending message...");
RabbitTemplate rabbitTemplate = (RabbitTemplate) context.getBean("rabbitTemplate");
rabbitTemplate.convertAndSend(RabbitConfiguration.BROADCAST_TRASHROUTE_QUEUE, "Hello from trashroute queue!");
rabbitTemplate.convertAndSend(RabbitConfiguration.BROADCAST_WEBAPP_QUEUE, "Hello from webapp queue!");
Thread.sleep(10000);
context.close();
}
}
希望这会有所帮助。尽管如果您想在生产中使用它,您需要重构代码。