Skip to main content
 首页 » 编程设计

使用Spring AMQP进行消息传递

2022年07月19日148langtianya

使用Spring AMQP进行消息传递

本文讨论使用Spring AMQP框架实现AMQP消息通信。先介绍一些消息通信核心概念,然后通过一个实际示例进行实战。

1. 核心概念

1.1. 消息传输

消息传输是应用间进行消息通信的技术,基于异步消息传输代替基于请求-响应的同步架构。消息的生产者和消费者被中介消息层(也称为消息代理)解耦。消息代理具有消息持久化、消息过滤以及消息转换等特性。

消息传递键间的应用都是Java语言实现,则通常使用 JMS (Java Message Service) API。对于不同平台和提供商之间相互通信不能使用JMS客户端和代理服务器,这时需要使用AMQP。

1.2. AMQP – Advanced Message Queuing Protocol

AMQP是实现异步消息通信的开放标准连接规范,提供了应该如何构造消息的描述。

与JMS的差异

AMQP是平台独立二进制协议标准,库可以使用不同语言实现并运行与不同环境中。不受限与特定厂商,因此可以实现JMS代理之间迁移,主流的AMQP消息代理是 RabbitMQ, OpenAMQ, StormMQ。

AMQP实体

简要理解,AMQP有Exchange, Queue, Binding组成:
在这里插入图片描述
Exchange类似于邮电局或邮箱,客户端发布消息至AMQP exchange,有四种内置交换类型:

直接交换(Direct Exchange):通过完全匹配路由key路由消息至特定的队列。
扇出交换(Fanout Exchange): 路由消息至所有绑定的队列。
主题交换(Topic Exchange):路由消息至匹配特定模式的路由key的多个队列。
头信息交换(Headers Exchange): 基于消息头路由消息。

Queue 使用路由key绑定exchange。

Message 消息使用路由key发送至exchange,然后exchange分发消息副本至各个队列。

Spring AMQP

Spring AMQP有两个模块组成:spring-amqp 和 spring-rabbit,这些模块提供下列内容的抽象:

AMQP 实体 – 使用Message, Queue, Binding, Exchange 类创建AMQP 实体。
连接管理 – 使用CachingConnectionFactory连接至 RabbitMQ 代理。
消息发布 – 使用RabbitTemplate发送消息。
消息消费 – 使用 @RabbitListener 注解从队列读消息。

2. 创建Spring Amqp 示例应用

2.1. 安装 AMQP 消息代理

首先需要连接到RabbitMQ消息代理服务,最简单的方法是使用Docker获取并运行RabbitMQ:

docker run -d -p 5672:5672 -p 15672:15672 --name my-rabbit rabbitmq:3-management 
 

我们暴露5672端口为了让我们应用可以连接RabbitMQ。同时暴露15672为了通过管理界面看RabbitMQ正在做什么,url为:http://localhost:15672 ,HTTP API: http://localhost:15672/api/index.html。

现在我们通过Spring AMQP创建应用发送和接收"hello world"消息.

2.2. MAVEN依赖

为了增加spring-amqp 和 spring-rabbit 模块,需增加boot-starter-amqp 依赖:

<dependencies> 
    <dependency> 
        <groupId>org.springframework.boot</groupId> 
        <artifactId>spring-boot-starter-amqp</artifactId> 
        <version>2.1.6.RELEASE</version> 
    </dependency> 
</dependencies> 

读者可以适合的版本。

2.3. 连接至RabbitMQ代理服务器

使用Spring Boot 自动配置创建了 ConnectionFactory, RabbitTemplate, RabbitAdmin bean。因此我们有至RabbitMQ的连接在5672,并使用缺省的用户和密码:guest。我们仅需要灾厄应用使用 @SpringBootApplication注解:

@SpringBootApplication 
public class HelloWorldMessageApp { 
   // ... 
} 

2.4. 创建队列

通过简单定义Queue类型bean即可创建队列。RabbitAdmin会发现并使用路由key(myQueue)绑定至缺省exchange:

@Bean 
public Queue myQueue() { 
    return new Queue("myQueue", false); 
} 

如果需要定义多个队列,使用不同的方法名,则默认Bean名称与方法名一致。

@Bean 
Queue queue1() { 
    return new Queue(“queue1”, false); 
} 
 
@Bean 
Queue queue2() { 
    return new Queue(“queue2”, false);  
} 

我们将队列设置为非持久性,即当RabbitMQ停止时队列中的任何消息将会删除。但重启我们应用程序对队列没有影响。
如果需要重启消息代理服务器队列仍然存在,需要通过参数true设置队列为持久状态,但不能保证其中的消息仍然存在,如果必要需设置消息投递模式为持久化模式。

2.5. 发送消息

使用RabbitTemplate 发送 “Hello, world!”:

rabbitTemplate.convertAndSend("myQueue", "Hello, world!"); 

2.6. 消费消息

通过@RabbitListener注解实现消费消息:

@RabbitListener(queues = "myQueue") 
public void listen(String in) { 
    System.out.println("Message read from myQueue : " + in); 
} 

2.7. 运行示例

首先启动RabbitMQ代理:

docker run -d -p 5672:5672 -p 15672:15672 --name my-rabbit rabbitmq:3-management 

然后运行spring boot应用,HelloWorldMessage.java中的main方法:

mvn spring-boot:run -Dstart-class=com.baeldung.springamqp.simple.HelloWorldMessageApp 

当应用正运行时我们会看到:

  • 应用发送消息至缺省exchange,使用myQueue作为路由key
  • 然后队列myQueue接收到消息
  • 最后监听方法从队列myQueue中消费消息,然后打印至控制台

我们也可以使用RabbitMQ管理url http://localhost:15672 查看发送的消息和消费的消息。

3. 总结

本文我们介绍使用Spring AMQP实现AMQP规范的消息通信应用。


本文参考链接:https://blog.csdn.net/neweastsun/article/details/103553300