Java中使用redis作为消息队列 使用redis作为消息队列 在Java中使用Redis作为消息队列,可以通过Redis的List
数据结构或者Pub/Sub
模式来实现。以下是一个简单的示例,展示了如何使用Redis的List
作为消息队列。
1. 使用Redis的List作为消息队列 Redis的List
数据结构非常适合用来实现消息队列,因为它支持在列表的两端进行操作。
1.1 生产者(发送消息) 生产者将消息推入列表的头部:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 import redis.clients.jedis.Jedis;public class RedisProducer { private Jedis jedis; public RedisProducer (String host, int port) { this .jedis = new Jedis (host, port); } public void produce (String queueName, String message) { jedis.lpush(queueName, message); } public void close () { jedis.close(); } }
1.2 消费者(接收消息) 消费者从列表的尾部弹出消息:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 import redis.clients.jedis.Jedis; public class RedisConsumer { private Jedis jedis; public RedisConsumer(String host, int port) { this.jedis = new Jedis(host, port); } public String consume(String queueName) { return jedis.rpop(queueName); } public void close() { jedis.close(); } }
1.3 阻塞式消费(BLPOP) 为了提高效率,可以使用BLPOP
方法,它会在没有消息时阻塞,直到有新消息到达:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 import redis.clients.jedis.Jedis; public class RedisBlockingConsumer { private Jedis jedis; public RedisBlockingConsumer(String host, int port) { this.jedis = new Jedis(host, port); } public String blockingConsume(String queueName, int timeout) { String message = jedis.blpop(timeout, queueName).get(1); return message; } public void close() { jedis.close(); } }
2. 使用Redis的Pub/Sub模式 Redis的发布订阅模式适用于一对多的消息分发。
2.1 发布者(发布消息) 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 import redis.clients.jedis.Jedis; public class RedisPublisher { private Jedis jedis; public RedisPublisher(String host, int port) { this.jedis = new Jedis(host, port); } public void publish(String channel, String message) { jedis.publish(channel, message); } public void close() { jedis.close(); } }
2.2 订阅者(接收消息) 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 import redis.clients.jedis.Jedis; import redis.clients.jedis.JedisPubSub; public class RedisSubscriber { private Jedis jedis; public RedisSubscriber(String host, int port) { this.jedis = new Jedis(host, port); } public void subscribe(String channel, JedisPubSub jedisPubSub) { jedis.subscribe(jedisPubSub, channel); } public void close() { jedis.close(); } }
3. 使用Redisson库 Redisson是一个功能丰富的Redis Java客户端,提供了更高层次的抽象,包括消息队列、分布式锁等。
3.1 添加依赖 1 2 3 4 5 <dependency > <groupId > org.redisson</groupId > <artifactId > redisson</artifactId > <version > 3.17.5</version > </dependency >
运行 HTML
3.2 生产者和消费者 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 import org.redisson.Redisson; import org.redisson.api.RQueue; import org.redisson.api.RedissonClient; import org.redisson.config.Config; public class RedissonQueueExample { public static void main(String[] args) { Config config = new Config(); config.useSingleServer().setAddress("redis://127.0.0.1:6379"); RedissonClient redisson = Redisson.create(config); RQueue<String> queue = redisson.getQueue("myQueue"); // 生产者 new Thread(() -> { for (int i = 0; i < 10; i++) { queue.offer("Message " + i); System.out.println("Produced: Message " + i); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } } }).start(); // 消费者 new Thread(() -> { while (true) { String message = queue.poll(); if (message != null) { System.out.println("Consumed: " + message); } else { try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } } } }).start(); redisson.shutdown(); } }
4. 总结
List :适用于简单的队列场景,支持阻塞式操作。
Pub/Sub :适用于发布订阅场景,适合一对多的消息分发。
Redisson :提供了更高级的抽象,适合复杂的分布式应用场景。
选择哪种方式取决于你的具体需求。如果需要可靠的消息传递和队列功能,建议使用List
或Redisson的队列实现。
但是还是存在缺点,redis不存在消息确认机制,所以还得再优化一下。
Redis 作为消息队列+消息确认机制 为了在使用 Redis 作为消息队列的基础上加上消息确认机制,我们可以采用以下方案:
方案概述
生产者 :将消息放入 queue:待处理
列表。
消费者 :使用 Lua 脚本原子地将消息从 queue:待处理
移动到 queue:正在处理
,并记录处理开始时间。
消息处理 :消费者处理消息,处理完成后从 queue:正在处理
删除消息,并删除处理记录。
故障处理 :如果处理失败,可以选择不删除消息,或者将其重新放入 queue:待处理
。
定时检查 :使用定时任务检查 queue:正在处理
中的消息,如果超过阈值时间未处理完成,则重新放入 queue:待处理
。
实现步骤 1. 生产者代码 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 import redis.clients.jedis.Jedis;public class RedisProducer { private Jedis jedis; public RedisProducer (String host, int port) { this .jedis = new Jedis (host, port); } public void produce (String queueName, String message) { jedis.lpush(queueName, message); } public void close () { jedis.close(); } }
2. 消费者代码 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 import redis.clients.jedis.Jedis;import redis.clients.jedis.Scripting;public class RedisConsumer { private Jedis jedis; private String moveScript = "local message = redis.call('RPOP', KEYS[1]); if message then redis.call('LPUSH', KEYS[2], message); end; return message;" ; public RedisConsumer (String host, int port) { this .jedis = new Jedis (host, port); } public String consume (String待处理Queue, String正在处理Queue) { String message = (String) jedis.eval(moveScript, 2 ,待处理Queue,正在处理Queue); if (message != null ) { jedis.hset("hash:正在处理" , message, String.valueOf(System.currentTimeMillis())); } return message; } public void confirm (String正在处理Queue, String message) { jedis.lrem(正在处理Queue, 0 , message); jedis.hdel("hash:正在处理" , message); } public void close () { jedis.close(); } }
3. 定时检查任务 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 import redis.clients.jedis.Jedis;import java.util.Set;import java.util.Date;public class MessageMonitor { private Jedis jedis; private long threshold = 60000 ; public MessageMonitor (String host, int port) { this .jedis = new Jedis (host, port); } public void checkMessages () { Set<String> messages = jedis.hkeys("hash:正在处理" ); for (String message : messages) { long startTime = Long.parseLong(jedis.hget("hash:正在处理" , message)); if (new Date ().getTime() - startTime > threshold) { jedis.lpush("queue:待处理" , message); jedis.hdel("hash:正在处理" , message); } } } public void scheduleCheck () { new ScheduledThreadPoolExecutor (1 ).scheduleAtFixedRate(() -> checkMessages(), 0 , 30 , TimeUnit.SECONDS); } public void close () { jedis.close(); } }
代码说明
生产者 :将消息放入 queue:待处理
列表。
消费者 :
使用 Lua 脚本原子地将消息从 queue:待处理
移动到 queue:正在处理
。
记录消息处理开始时间到 hash:正在处理
。
处理消息后,调用 confirm
方法删除消息和处理记录。
定时检查任务 :
每 30 秒检查一次 hash:正在处理
中的消息。
如果消息处理时间超过阈值(例如 1 分钟),将其重新放入 queue:待处理
。
优点
原子操作 :使用 Lua 脚本确保消息移动的原子性。
故障 tolerance :通过定时检查任务,确保消息不会永久卡在 queue:正在处理
中。
消息不丢失 :处理失败的消息会重新进入待处理队列。
缺点
复杂性 :需要额外的逻辑来处理消息确认和故障恢复。
性能开销 :定时任务和哈希表操作可能增加系统负载。
结论 通过上述方案,我们实现了基于 Redis 的消息队列的消息确认机制,确保消息的可靠传递和处理。虽然实现较为复杂,但在 Redis 作为消息队列的场景下,这是一个可行的解决方案。