首页IT科技redis 关闭服务(利用Redisson实现订单关闭)

redis 关闭服务(利用Redisson实现订单关闭)

时间2025-06-20 02:53:08分类IT科技浏览5445
导读:实体类 为了方便测试,直接在测试类中的写内部类:...

实体类

为了方便测试              ,直接在测试类中的写内部类:

@Data @AllArgsConstructor @NoArgsConstructor public class OrderInfo { /** * 订单id */ private Integer id; /** * 描述:用来记录关闭时间                      ,可以在测试时用来验证              。关闭时间是否跟 expireTime相等 */ private String description; /** * 创建时间 */ private LocalDateTime createTime; /** * 过期时间:关闭时间 */ private LocalDateTime expireTime; }

生成订单

模拟生成订单并设置过期时间                      。

执行时会在redis创建2个key: redisson_delay_queue:{<closeKey> } :订单数据 redisson_delay_queue_timeout:{<closeKey> } :zset类型       ,按时间戳排序 /** * 创建订单       ,并设置过期时间 * * @throws IOException */ @Test void createOrder() { RBlockingDeque<OrderInfo> blockingDeque = redissonClient.getBlockingDeque(closeKey); RDelayedQueue<OrderInfo> delayedQueue = redissonClient.getDelayedQueue(blockingDeque); // 100条订单 int n = 100; Random random = new Random(); for (int i = 0; i < n; i++) { // 1~100之间的正整数 int i1 = random.nextInt(100) + 1; LocalDateTime now = LocalDateTime.now(); delayedQueue.offer(new OrderInfo(i + 1, "close: " + i1, now, now.plusSeconds(i1)), i1, TimeUnit.SECONDS); } }

关闭订单

关闭订单                      ,这里会产生订阅       。redis会出现redisson_delay_queue_channel       。

/** * 关闭订单 * * @throws IOException */ @Test void closeOrder() { ReentrantLock lock = new ReentrantLock(); // 5个线程 int poolSize = 5; List<CompletableFuture<Void>> futureList = new ArrayList<>(); for (int i = 0; i < poolSize; i++) { futureList.add(CompletableFuture.runAsync(() -> { RBlockingDeque<OrderInfo> blockingDeque = redissonClient.getBlockingDeque(closeKey); // 加入监听 redissonClient.getDelayedQueue(blockingDeque); while (true) { OrderInfo take; try { take = blockingDeque.take(); } catch (Exception e) { continue; } if (take == null) { continue; } // 验证多次是否会重复关闭                      。正常里不会近              ,只是验证下              。正式环境       ,可以删除 try { lock.lock(); if(closed.contains(take.getId())){ log.info("测试是否会抢占:已存在其他线程处理关闭订单[{}]", take.getId()); } closed.add(take.getId()); }finally { lock.unlock(); } // 处理订单关闭逻辑 log.info("订单[{}]关闭中       。                      。              。", take.getId()); log.info("订单[{}]已关闭!order={}", take.getId(), toJsonString(take)); } })); } // 模拟正式环境中进程一直在运行                      ,因为test时              ,没有join则会只执行一次出现消费完数据后进程就关闭了 CompletableFuture.allOf(futureList.toArray(new CompletableFuture[0])).join(); }

完整测试类:

package cn.skyjilygao.demo; import lombok.AllArgsConstructor; import lombok.Data; import lombok.NoArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.junit.jupiter.api.Test; import org.redisson.api.RBlockingDeque; import org.redisson.api.RDelayedQueue; import org.redisson.api.RedissonClient; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import java.io.IOException; import java.time.LocalDateTime; import java.util.ArrayList; import java.util.List; import java.util.Random; import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentSkipListSet; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.ReentrantLock; import static cn.skyjilygao.util.EntityUtil.toJsonString; @Slf4j @SpringBootTest public class CloseOrderTests { @Autowired private RedissonClient redissonClient; public static String closeKey = "order_close_test"; public volatile static Set<Integer> closed = new ConcurrentSkipListSet<>(); /** * 创建订单,并设置过期时间 * * @throws IOException */ @Test void createOrder() { RBlockingDeque<OrderInfo> blockingDeque = redissonClient.getBlockingDeque(closeKey); RDelayedQueue<OrderInfo> delayedQueue = redissonClient.getDelayedQueue(blockingDeque); int a = 100; Random random = new Random(100); for (int i = 0; i < a; i++) { int i1 = random.nextInt(1 + i) + 1; delayedQueue.offer(new OrderInfo(i + 1, "close: " + i1, LocalDateTime.now(), LocalDateTime.now().plusSeconds(i1)), i1, TimeUnit.SECONDS); } } /** * 关闭订单 * * @throws IOException */ @Test void closeOrder() { ReentrantLock lock = new ReentrantLock(); // 5个线程 int poolSize = 5; List<CompletableFuture<Void>> futureList = new ArrayList<>(); for (int i = 0; i < poolSize; i++) { futureList.add(CompletableFuture.runAsync(() -> { RBlockingDeque<OrderInfo> blockingDeque = redissonClient.getBlockingDeque(closeKey); // 加入监听 redissonClient.getDelayedQueue(blockingDeque); while (true) { OrderInfo take; try { take = blockingDeque.take(); } catch (Exception e) { continue; } if (take == null) { continue; } try { lock.lock(); if(closed.contains(take.getId())){ log.info("测试是否会抢占:已存在其他线程处理关闭订单[{}]", take.getId()); } closed.add(take.getId()); }finally { lock.unlock(); } log.info("订单[{}]关闭中。                      。                      。", take.getId()); log.info("订单[{}]已关闭!order={}", take.getId(), toJsonString(take)); } })); } // 模拟正式环境中进程一直在运行                      ,因为test时                      ,没有join则会只执行一次出现消费完数据后进程就关闭了 CompletableFuture.allOf(futureList.toArray(new CompletableFuture[0])).join(); } @Data @AllArgsConstructor @NoArgsConstructor public class OrderInfo { private Integer id; private String description; private LocalDateTime createTime; private LocalDateTime expireTime; } }

创心域SEO版权声明:以上内容作者已申请原创保护,未经允许不得转载,侵权必究!授权事宜、对本内容有异议或投诉,敬请联系网站管理员,我们将尽快回复您,谢谢合作!

展开全文READ MORE
python+数据挖掘(Python收集参数是什么?) 前置技能是什么意思(【人工智能与深度学习】监督方法的成功故事: 前置训练)