使用redis实现简单消息队列
本文我们使用redis中的list命令实现简单的消息队列。
需求背景
一个允许用户上传照片的应用,然后使用不同大小展示照片(缩略、中等大小和较大方式)。第一次实现考虑在上传照片请求中实现图像转换,但处理图像任务比较重,使得请求非常慢,用户体验不好。
可能的解决方案通过Message Queue(MQ,消息队列)实现异步处理。有许多知名的MQ产品,如:ActiveMQ, RabbitMQ, IBM MQ等。当然直接使用java queue数据结构也可以应用内实现,没有绝对的选型,只有相对合理。本例中我们使用redis的list结构实现消息队列,仅为学习说明问题而已。
redis消息队列
大致的想法是:使用list作为消息队列,生产者负责生产待处理的消息,消费者监视list队列并负责处理消息。
基本流程为:
- 生产者在队列的尾部增加消息,使用rpush queue message 命令。
- 消费者在队列的头部读取消息,使用lpop queue 命令。
业务过程实现先进先出(FIFO)。客户端总是要不断监控新的消息,因此需要使用BLPOP命令——lpop的阻塞版本。同时我们在while循环中调用blpop命令,模拟一直在监控。
具体实现
定义ImageUploader类负责上传图片至服务器,并往队列中添加消息,表明有图片需要处理,消息可以是json字符串:{“imagePath”:”/path/to/image”, “user”:”userid”}
ImageUploader类代码片段如下:
public class ImageUploader {
public void uploadImage(HttpServletRequest request){
String imagePath = saveImage(request);
String jsonPayload = createJsonPayload(request, imagePath);
jedis.rpush("queue", jsonPayload);
//... keep with the processing
}
//.... other methods in the class
}
代码大致展示了生产者如何工作的,通过消息队列把生产者上传图片和处理图片进行解耦。这里仅在队列中产生了一个新的消息,由消费者负责处理。
下面是消费者处理MessageConsumer类代码片段:
import java.util.List;
import redis.clients.jedis.Jedis;
public class MessageConsumer
{
public static void main( String[] args )
{
Jedis jedis = new Jedis("localhost");
List<String> messages = null;
while(true){
System.out.println("Waiting for a message in the queue");
messages = jedis.blpop(0,"queue");
System.out.println("Got the message");
System.out.println("KEY:" + messages.get(0) + " VALUE:" + messages.get(1));
String payload = messages.get(1);
//Do some processing with the payload
System.out.println("Message received:" + payload);
}
}
}
消费者代码可以运行在不同业务过程中、甚至在不同机器上。
jedis.blpop方法返回List,包括两个字符串,key和value。该方法还可以增加integer参数,表示超时时间。这里使用0表示永不超时。
运行测试
当队列中没有值时运行程序,控制台输出消息:
"Waiting for a message in the queue".
然后通过客户端增加一些消息至队列中,消费者会获取值并处理。我们可以使用redis-cli增加消息,当然也可以通过代码增加消息:
import redis.clients.jedis.Jedis;
public class MessageProducer {
public static void main(String[] args) {
Jedis jedis = new Jedis("localhost");
jedis.rpush("queue", "Value 1");
jedis.rpush("queue", "Value 2");
jedis.rpush("queue", "Value 3");
}
}
如果我们在MessageConsumer类已经开始运行之后在运行MessageProducer类,则控制台输出如下:
Waiting for a message in the queue
Got the message
KEY:queue VALUE:Value 1
Message received:Value 1
Waiting for a message in the queue
Got the message
KEY:queue VALUE:Value 2
Message received:Value 2
Waiting for a message in the queue
Got the message
KEY:queue VALUE:Value 3
Message received:Value 3
Waiting for a message in the queue
总结
使用redis作为消息队列实现业务解耦不失为一种便捷方式。确实也有一些队列是基于redis实现的,如 RestMQ, Resque。
本文参考链接:https://blog.csdn.net/neweastsun/article/details/92634491