首页IT科技异步代码是什么意思(异步批处理教程)

异步代码是什么意思(异步批处理教程)

时间2025-08-25 10:21:55分类IT科技浏览5802
导读:书接上回 大数据量、高并发业务怎么优化?(一) 文章中介绍了异步批处理的三种方式,本文继续深入针对前两种进行讲解,并给出代码示例:...

书接上回 大数据量                、高并发业务怎么优化?(一) 文章中介绍了异步批处理的三种方式                ,本文继续深入针对前两种进行讲解                        ,并给出代码示例:

一 普通版本        ,采用阻塞队列ArrayBlockingQueue

使用普通方式能够直接基于JDK中现成的并发包ArrayBlockingQueue 提供的 offer(E e, long timeout, TimeUnit unit)(添加元素到队列尾部                ,如果队列已满则等待参数指定时间后返回false)方法 和 poll(long timeout, TimeUnit unit)(从队列头部获取元素                        ,如果队列为空则等待参数指定时间后返回null)方法        ,来达到异步批处理效果

生产者代码:由于采用内存队列        ,最好在创建 ArrayBlockingQueue 时指定队列大小                        ,防止队列无界                ,导致内存溢出

/** * 生产者 */ @Component @Slf4j public class MonitorQueue { private BlockingQueue<List<NodeCollectDTO>> queue = new ArrayBlockingQueue<>(10000000); public void put(List<NodeCollectDTO> list) { try { queue.put(list); } catch (InterruptedException e) { log.error(String.format("队列put异常:%s", e.getMessage()), e); } } public void offer(List<NodeCollectDTO> list, long timeout, TimeUnit unit) throws InterruptedException { queue.offer(list, timeout, unit); } public List<NodeCollectDTO> poll(long timeout, TimeUnit unit) throws InterruptedException { return queue.poll(timeout, unit); } }

消费者代码:在创建生产者时开启一个子线程在死循环中一直读取队列元素        ,直到队列元素超过我们的 maxNum 时                        ,将临时列表元素插入数据库中

/** * 消费者 */ @Slf4j @Component public class MonitorConsumer implements Runnable { @Autowired private MonitorQueue queue; @Autowired private MonitorService monitorService; @PostConstruct public void init() { new Thread(this, "monitor-collect").start(); } // 临时列表大小限制 private int maxNum = 2000; @SuppressWarnings("InfiniteLoopStatement") @Override public void run() { while (true) { handler(); } } private void handler() { try { List<NodeCollectDTO> temp = new ArrayList<>(maxNum); while (temp.size() <= maxNum) { List<NodeCollectDTO> list = queue.poll(20, TimeUnit.SECONDS); if (CollectionUtil.isNotEmpty(list)) { temp.addAll(list); } else { break; } } if (CollectionUtil.isEmpty(temp)) { return; } int i = monitorService.batchSave(temp); log.debug("----------------------------batchSave num:{}, collect.size:{}", i, collect.size()); } catch (Exception e) { log.error(String.format("消费者异常: %s", e.getMessage()), e); } } }

可以看到采用该种方式实现的异步批量入库代码比较简单                ,便于理解,在性能上                        ,基本都能够满足日常普通业务存在的批量入库场景

二 进阶版                        ,采用 Disruptor 队列,本文基于 Disruptor 最新4.0版本

先给出 Disruptor 官网简介

Disruptor 是一个提供并发环形缓冲区数据结构的库                。它旨在在异步事件处理架构中提供低延迟                        、高吞吐量的工作队列                        。

为了理解 Disruptor 的好处                ,我们可以将它与一些很好理解且目的非常相似的东西进行比较        。在 Disruptor 的情况下                        ,这将是 Java 的 BlockingQueue                。与队列一样        ,Disruptor 的目的是在同一进程内的线程之间移动数据(例如消息或事件)                        。然而                ,Disruptor 提供的一些关键特性使其有别于队列        。他们是:

向消费者多播事件                        ,带有消费者依赖图        。

为事件预分配内存                        。

可选无锁

Disruptor 给我们在项目中实现异步批处理提供了另一种方式        ,一种无锁        、延迟更低        、吞吐量更高                        、提供消费者多播等等的内存队列

下面介绍如何使用

2.1 依赖安装

<dependency> <groupId>com.lmax</groupId> <artifactId>disruptor</artifactId> <version>4.0.0.RC1</version> </dependency>

2.2 Disruptor 使用代码如下:

public class LongEvent{ private long value; public void set(long value){ this.value = value; } @Override public String toString(){ return "LongEvent{" + "value=" + value + }; } } @Slf4j public class LongEventMain { public static void handleEvent(LongEvent event, long sequence, boolean endOfBatch) { log.info("event: " + event + ", sequence:" + sequence + ", endOfBatch:" + endOfBatch); } public static void translate(LongEvent event, long sequence, ByteBuffer buffer) { event.set(buffer.getLong(0)); } public static void main(String[] args) throws Exception { int bufferSize = 128; // 1. 创建Disruptor对象 Disruptor<LongEvent> disruptor = new Disruptor<>(LongEvent::new, bufferSize, DaemonThreadFactory.INSTANCE, ProducerType.SINGLE, new BlockingWaitStrategy()); // 2. 添加事件处理类(消费者) disruptor.handleEventsWith(LongEventMain::handleEvent); // 3. 开启事件处理线程 disruptor.start(); // 4. 获取ringBuffer RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer(); ByteBuffer bb = ByteBuffer.allocate(8); for (long l = 0; true; l++) { bb.putLong(0, l); // 5. 发布事件(生产者) ringBuffer.publishEvent(LongEventMain::translate, bb); Thread.sleep(1); } } }

2.3 上面代码完成了一个事件发布后        ,事件处理类就能够收到对应事件信息的功能                        ,但是我们想要的是能在消费者线程中批量处理生产者数据的逻辑                ,还得再修改一下事件处理类代码        ,如下:

@Slf4j public class LongEventBatch implements EventHandler<LongEvent> { private static final int MAX_BATCH_SIZE = 20; private final List<LongEvent> batch = new ArrayList<>(); public LongEventBatch() { // 虚拟机关闭处理 Runtime.getRuntime().addShutdownHook(new Thread(() -> { log.info("------------------ShutdownHook-DataEventHandler,上报tempList"); if (batch.size() > 0) { // 批量入库伪代码 int i = xxxService.batchSave(temp); } })); } @Override public void onEvent(final LongEvent event, final long sequence, final boolean endOfBatch) { log.info("event: " + event + ", sequence:" + sequence + ", endOfBatch:" + endOfBatch); batch.add(event); if (batch.size() >= MAX_BATCH_SIZE) { processBatch(batch); } } private void processBatch(final List<LongEvent> batch) { // 批量入库伪代码 int i = xxxService.batchSave(temp); // 记得清空batch列表 batch.clear(); } }

由此                        ,我们就实现了基于 Disruptor 的异步批处理逻辑                ,该方式会比普通版本性能高出一个数量级,大家在工作中可以尝试使用一番

最后

附博主 github 地址 https://github.com/wayn111

创心域SEO版权声明:以上内容作者已申请原创保护,未经允许不得转载,侵权必究!授权事宜、对本内容有异议或投诉,敬请联系网站管理员,我们将尽快回复您,谢谢合作!

展开全文READ MORE
建瓯到福州有几个隧道(建瓯道路规划)