介绍 Java RabbitMQ Client
解耦组件是软件设计的重要部分,其中一种实现是使用消息系统。其提供异步方式实现组件或服务间通信。本文我们介绍其中一个消息系统实现:RabbitMQ。
RabbitMQ是实现Advanced Message Queuing Protocol (AMQP)协议的消息代理中间件,对主流语言都提供了客户端库。
除了对软件组件进行解耦,还可以用于下列场景:
- 执行后端操作
- 执行异步操作
- 在网络仅允许单向访问时,使用可以避免繁琐的轮询方式。
1. 消息模型
首先我们从高层看下消息是如何工作的。简言之,有两类应用于消息系统进行交互:生产者和消费者。生产者给代理发送(发布)消息,消费者从代理接收消息。通常这些软件组件运行在不同的主机上,RabbitMQ作为它们之间通信中间件。
本文我们讨论一个简单示例,两个组件使用RabbitMQ进行通信,其中一个服务负责发布消息至RabbitMQ,另一个负责消费。
2. 示例实现
2.1. 环境准备
开始之前需要先运行RabbitMQ,读者可以参考官方指南。
当然我们需要使用Java客户端与RabbitMQ服务端进行交互,Maven依赖如下:
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>4.0.0</version>
</dependency>
RabbitMQ服务器运行之后,我们需要使用java客户端进行连接:
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
ConnectionFactory 设置和消息服务器的连接,同时负责协议(AMQP)和认证。示例代码连接到localhost,我们可以使用setHost函数设置主机。
也可以使用setPort方法设置RabbitMQ服务器的端口号,默认为15672.
factory.setPort(15678);
当然也可以设置用户名和密码:
factory.setUsername("user1");
factory.setPassword("MyPassword");
下面使用该连接发布和消费消息。
2.2. 生产者
讨论简单场景,web应用允许用户给网站增加新的产品,当有新产品增加时,需要给客户发送邮件。首先我们定义队列:
channel.queueDeclare("products_queue", false, false, false, null);
每当用户增加新的产品时,我们给队列发布消息:
String message = "product details";
channel.basicPublish("", "products_queue", null, message.getBytes());
最后我们关闭通道和连接。发送的消息将被另一个服务消费,其负责发送邮件给客户。
2.3. 消费者
下面看如何实现消费端代码,首先声明相同的队列:
channel.queueDeclare("products_queue", false, false, false, null);
下面代码定义消费者采用异步方式处理消息:
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(
String consumerTag,
Envelope envelope,
AMQP.BasicProperties properties,
byte[] body) throws IOException {
String message = new String(body, "UTF-8");
// process the message
}
};
channel.basicConsume("products_queue", true, consumer);
3. 总结
本文简要讨论了RabbitMQ的基本概念,并通过简单示例说明如何实现生产者和消费者程序。
本文参考链接:https://blog.csdn.net/neweastsun/article/details/103546010