Skip to main content
 首页 » 数据库

使用redis实现简单消息队列

2022年07月19日157kenshinobiy

使用redis实现简单消息队列

本文我们使用redis中的list命令实现简单的消息队列。

需求背景

一个允许用户上传照片的应用,然后使用不同大小展示照片(缩略、中等大小和较大方式)。第一次实现考虑在上传照片请求中实现图像转换,但处理图像任务比较重,使得请求非常慢,用户体验不好。

可能的解决方案通过Message Queue(MQ,消息队列)实现异步处理。有许多知名的MQ产品,如:ActiveMQ, RabbitMQ, IBM MQ等。当然直接使用java queue数据结构也可以应用内实现,没有绝对的选型,只有相对合理。本例中我们使用redis的list结构实现消息队列,仅为学习说明问题而已。

redis消息队列

大致的想法是:使用list作为消息队列,生产者负责生产待处理的消息,消费者监视list队列并负责处理消息。
基本流程为:

  1. 生产者在队列的尾部增加消息,使用rpush queue message 命令。
  2. 消费者在队列的头部读取消息,使用lpop queue 命令。

业务过程实现先进先出(FIFO)。客户端总是要不断监控新的消息,因此需要使用BLPOP命令——lpop的阻塞版本。同时我们在while循环中调用blpop命令,模拟一直在监控。

具体实现

定义ImageUploader类负责上传图片至服务器,并往队列中添加消息,表明有图片需要处理,消息可以是json字符串:{“imagePath”:”/path/to/image”, “user”:”userid”}

ImageUploader类代码片段如下:

public class ImageUploader { 
  
  public void uploadImage(HttpServletRequest request){ 
      
    String imagePath = saveImage(request); 
    String jsonPayload = createJsonPayload(request, imagePath); 
    jedis.rpush("queue", jsonPayload); 
    //... keep with the processing 
  } 
    
  //.... other methods in the class 
} 

代码大致展示了生产者如何工作的,通过消息队列把生产者上传图片和处理图片进行解耦。这里仅在队列中产生了一个新的消息,由消费者负责处理。

下面是消费者处理MessageConsumer类代码片段:

import java.util.List; 
  
import redis.clients.jedis.Jedis; 
  
public class MessageConsumer  
{ 
    public static void main( String[] args ) 
    { 
        Jedis jedis = new Jedis("localhost");    
        List<String> messages = null; 
        while(true){ 
          System.out.println("Waiting for a message in the queue"); 
          messages = jedis.blpop(0,"queue"); 
          System.out.println("Got the message"); 
          System.out.println("KEY:" + messages.get(0) + " VALUE:" + messages.get(1)); 
          String payload = messages.get(1); 
          //Do some processing with the payload 
          System.out.println("Message received:" + payload); 
        } 
 
    } 
} 

消费者代码可以运行在不同业务过程中、甚至在不同机器上。

jedis.blpop方法返回List,包括两个字符串,key和value。该方法还可以增加integer参数,表示超时时间。这里使用0表示永不超时。

运行测试

当队列中没有值时运行程序,控制台输出消息:

"Waiting for a message in the queue".  

然后通过客户端增加一些消息至队列中,消费者会获取值并处理。我们可以使用redis-cli增加消息,当然也可以通过代码增加消息:

import redis.clients.jedis.Jedis; 
  
public class MessageProducer { 
  
  public static void main(String[] args) { 
    Jedis jedis = new Jedis("localhost"); 
      
    jedis.rpush("queue", "Value 1"); 
    jedis.rpush("queue", "Value 2"); 
    jedis.rpush("queue", "Value 3"); 
  } 
} 

如果我们在MessageConsumer类已经开始运行之后在运行MessageProducer类,则控制台输出如下:

Waiting for a message in the queue 
Got the message 
KEY:queue VALUE:Value 1 
Message received:Value 1 
Waiting for a message in the queue 
Got the message 
KEY:queue VALUE:Value 2 
Message received:Value 2 
Waiting for a message in the queue 
Got the message 
KEY:queue VALUE:Value 3 
Message received:Value 3 
Waiting for a message in the queue 

总结

使用redis作为消息队列实现业务解耦不失为一种便捷方式。确实也有一些队列是基于redis实现的,如 RestMQ, Resque。


本文参考链接:https://blog.csdn.net/neweastsun/article/details/92634491