使用Spring AMQP进行消息传递
本文讨论使用Spring AMQP框架实现AMQP消息通信。先介绍一些消息通信核心概念,然后通过一个实际示例进行实战。
1. 核心概念
1.1. 消息传输
消息传输是应用间进行消息通信的技术,基于异步消息传输代替基于请求-响应的同步架构。消息的生产者和消费者被中介消息层(也称为消息代理)解耦。消息代理具有消息持久化、消息过滤以及消息转换等特性。
消息传递键间的应用都是Java语言实现,则通常使用 JMS (Java Message Service) API。对于不同平台和提供商之间相互通信不能使用JMS客户端和代理服务器,这时需要使用AMQP。
1.2. AMQP – Advanced Message Queuing Protocol
AMQP是实现异步消息通信的开放标准连接规范,提供了应该如何构造消息的描述。
与JMS的差异
AMQP是平台独立二进制协议标准,库可以使用不同语言实现并运行与不同环境中。不受限与特定厂商,因此可以实现JMS代理之间迁移,主流的AMQP消息代理是 RabbitMQ, OpenAMQ, StormMQ。
AMQP实体
简要理解,AMQP有Exchange, Queue, Binding组成:
Exchange类似于邮电局或邮箱,客户端发布消息至AMQP exchange,有四种内置交换类型:
直接交换(Direct Exchange):通过完全匹配路由key路由消息至特定的队列。
扇出交换(Fanout Exchange): 路由消息至所有绑定的队列。
主题交换(Topic Exchange):路由消息至匹配特定模式的路由key的多个队列。
头信息交换(Headers Exchange): 基于消息头路由消息。
Queue 使用路由key绑定exchange。
Message 消息使用路由key发送至exchange,然后exchange分发消息副本至各个队列。
Spring AMQP
Spring AMQP有两个模块组成:spring-amqp 和 spring-rabbit,这些模块提供下列内容的抽象:
AMQP 实体 – 使用Message, Queue, Binding, Exchange 类创建AMQP 实体。
连接管理 – 使用CachingConnectionFactory连接至 RabbitMQ 代理。
消息发布 – 使用RabbitTemplate发送消息。
消息消费 – 使用 @RabbitListener 注解从队列读消息。
2. 创建Spring Amqp 示例应用
2.1. 安装 AMQP 消息代理
首先需要连接到RabbitMQ消息代理服务,最简单的方法是使用Docker获取并运行RabbitMQ:
docker run -d -p 5672:5672 -p 15672:15672 --name my-rabbit rabbitmq:3-management
我们暴露5672端口为了让我们应用可以连接RabbitMQ。同时暴露15672为了通过管理界面看RabbitMQ正在做什么,url为:http://localhost:15672 ,HTTP API: http://localhost:15672/api/index.html。
现在我们通过Spring AMQP创建应用发送和接收"hello world"消息.
2.2. MAVEN依赖
为了增加spring-amqp 和 spring-rabbit 模块,需增加boot-starter-amqp 依赖:
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
<version>2.1.6.RELEASE</version>
</dependency>
</dependencies>
读者可以适合的版本。
2.3. 连接至RabbitMQ代理服务器
使用Spring Boot 自动配置创建了 ConnectionFactory, RabbitTemplate, RabbitAdmin bean。因此我们有至RabbitMQ的连接在5672,并使用缺省的用户和密码:guest。我们仅需要灾厄应用使用 @SpringBootApplication注解:
@SpringBootApplication
public class HelloWorldMessageApp {
// ...
}
2.4. 创建队列
通过简单定义Queue类型bean即可创建队列。RabbitAdmin会发现并使用路由key(myQueue)绑定至缺省exchange:
@Bean
public Queue myQueue() {
return new Queue("myQueue", false);
}
如果需要定义多个队列,使用不同的方法名,则默认Bean名称与方法名一致。
@Bean
Queue queue1() {
return new Queue(“queue1”, false);
}
@Bean
Queue queue2() {
return new Queue(“queue2”, false);
}
我们将队列设置为非持久性,即当RabbitMQ停止时队列中的任何消息将会删除。但重启我们应用程序对队列没有影响。
如果需要重启消息代理服务器队列仍然存在,需要通过参数true设置队列为持久状态,但不能保证其中的消息仍然存在,如果必要需设置消息投递模式为持久化模式。
2.5. 发送消息
使用RabbitTemplate 发送 “Hello, world!”:
rabbitTemplate.convertAndSend("myQueue", "Hello, world!");
2.6. 消费消息
通过@RabbitListener注解实现消费消息:
@RabbitListener(queues = "myQueue")
public void listen(String in) {
System.out.println("Message read from myQueue : " + in);
}
2.7. 运行示例
首先启动RabbitMQ代理:
docker run -d -p 5672:5672 -p 15672:15672 --name my-rabbit rabbitmq:3-management
然后运行spring boot应用,HelloWorldMessage.java中的main方法:
mvn spring-boot:run -Dstart-class=com.baeldung.springamqp.simple.HelloWorldMessageApp
当应用正运行时我们会看到:
- 应用发送消息至缺省exchange,使用myQueue作为路由key
- 然后队列myQueue接收到消息
- 最后监听方法从队列myQueue中消费消息,然后打印至控制台
我们也可以使用RabbitMQ管理url http://localhost:15672 查看发送的消息和消费的消息。
3. 总结
本文我们介绍使用Spring AMQP实现AMQP规范的消息通信应用。
本文参考链接:https://blog.csdn.net/neweastsun/article/details/103553300