redis 关闭服务(利用Redisson实现订单关闭)
导读:实体类 为了方便测试,直接在测试类中的写内部类:...
实体类
为了方便测试 ,直接在测试类中的写内部类:
@Data @AllArgsConstructor @NoArgsConstructor public class OrderInfo { /** * 订单id */ private Integer id; /** * 描述:用来记录关闭时间 ,可以在测试时用来验证 。关闭时间是否跟 expireTime相等 */ private String description; /** * 创建时间 */ private LocalDateTime createTime; /** * 过期时间:关闭时间 */ private LocalDateTime expireTime; }生成订单
模拟生成订单并设置过期时间 。
执行时会在redis创建2个key: redisson_delay_queue:{<closeKey> } :订单数据 redisson_delay_queue_timeout:{<closeKey> } :zset类型 ,按时间戳排序 /** * 创建订单 ,并设置过期时间 * * @throws IOException */ @Test void createOrder() { RBlockingDeque<OrderInfo> blockingDeque = redissonClient.getBlockingDeque(closeKey); RDelayedQueue<OrderInfo> delayedQueue = redissonClient.getDelayedQueue(blockingDeque); // 100条订单 int n = 100; Random random = new Random(); for (int i = 0; i < n; i++) { // 1~100之间的正整数 int i1 = random.nextInt(100) + 1; LocalDateTime now = LocalDateTime.now(); delayedQueue.offer(new OrderInfo(i + 1, "close: " + i1, now, now.plusSeconds(i1)), i1, TimeUnit.SECONDS); } }关闭订单
关闭订单 ,这里会产生订阅 。redis会出现redisson_delay_queue_channel 。
/** * 关闭订单 * * @throws IOException */ @Test void closeOrder() { ReentrantLock lock = new ReentrantLock(); // 5个线程 int poolSize = 5; List<CompletableFuture<Void>> futureList = new ArrayList<>(); for (int i = 0; i < poolSize; i++) { futureList.add(CompletableFuture.runAsync(() -> { RBlockingDeque<OrderInfo> blockingDeque = redissonClient.getBlockingDeque(closeKey); // 加入监听 redissonClient.getDelayedQueue(blockingDeque); while (true) { OrderInfo take; try { take = blockingDeque.take(); } catch (Exception e) { continue; } if (take == null) { continue; } // 验证多次是否会重复关闭 。正常里不会近 ,只是验证下 。正式环境 ,可以删除 try { lock.lock(); if(closed.contains(take.getId())){ log.info("测试是否会抢占:已存在其他线程处理关闭订单[{}]", take.getId()); } closed.add(take.getId()); }finally { lock.unlock(); } // 处理订单关闭逻辑 log.info("订单[{}]关闭中 。 。 。", take.getId()); log.info("订单[{}]已关闭!order={}", take.getId(), toJsonString(take)); } })); } // 模拟正式环境中进程一直在运行 ,因为test时 ,没有join则会只执行一次出现消费完数据后进程就关闭了 CompletableFuture.allOf(futureList.toArray(new CompletableFuture[0])).join(); }完整测试类:
package cn.skyjilygao.demo; import lombok.AllArgsConstructor; import lombok.Data; import lombok.NoArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.junit.jupiter.api.Test; import org.redisson.api.RBlockingDeque; import org.redisson.api.RDelayedQueue; import org.redisson.api.RedissonClient; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import java.io.IOException; import java.time.LocalDateTime; import java.util.ArrayList; import java.util.List; import java.util.Random; import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentSkipListSet; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.ReentrantLock; import static cn.skyjilygao.util.EntityUtil.toJsonString; @Slf4j @SpringBootTest public class CloseOrderTests { @Autowired private RedissonClient redissonClient; public static String closeKey = "order_close_test"; public volatile static Set<Integer> closed = new ConcurrentSkipListSet<>(); /** * 创建订单,并设置过期时间 * * @throws IOException */ @Test void createOrder() { RBlockingDeque<OrderInfo> blockingDeque = redissonClient.getBlockingDeque(closeKey); RDelayedQueue<OrderInfo> delayedQueue = redissonClient.getDelayedQueue(blockingDeque); int a = 100; Random random = new Random(100); for (int i = 0; i < a; i++) { int i1 = random.nextInt(1 + i) + 1; delayedQueue.offer(new OrderInfo(i + 1, "close: " + i1, LocalDateTime.now(), LocalDateTime.now().plusSeconds(i1)), i1, TimeUnit.SECONDS); } } /** * 关闭订单 * * @throws IOException */ @Test void closeOrder() { ReentrantLock lock = new ReentrantLock(); // 5个线程 int poolSize = 5; List<CompletableFuture<Void>> futureList = new ArrayList<>(); for (int i = 0; i < poolSize; i++) { futureList.add(CompletableFuture.runAsync(() -> { RBlockingDeque<OrderInfo> blockingDeque = redissonClient.getBlockingDeque(closeKey); // 加入监听 redissonClient.getDelayedQueue(blockingDeque); while (true) { OrderInfo take; try { take = blockingDeque.take(); } catch (Exception e) { continue; } if (take == null) { continue; } try { lock.lock(); if(closed.contains(take.getId())){ log.info("测试是否会抢占:已存在其他线程处理关闭订单[{}]", take.getId()); } closed.add(take.getId()); }finally { lock.unlock(); } log.info("订单[{}]关闭中。 。 。", take.getId()); log.info("订单[{}]已关闭!order={}", take.getId(), toJsonString(take)); } })); } // 模拟正式环境中进程一直在运行 ,因为test时 ,没有join则会只执行一次出现消费完数据后进程就关闭了 CompletableFuture.allOf(futureList.toArray(new CompletableFuture[0])).join(); } @Data @AllArgsConstructor @NoArgsConstructor public class OrderInfo { private Integer id; private String description; private LocalDateTime createTime; private LocalDateTime expireTime; } }创心域SEO版权声明:以上内容作者已申请原创保护,未经允许不得转载,侵权必究!授权事宜、对本内容有异议或投诉,敬请联系网站管理员,我们将尽快回复您,谢谢合作!