首页IT科技java阻塞队列和非阻塞队列的区别(Java阻塞队列中的异类,SynchronousQueue底层实现原理剖析)

java阻塞队列和非阻塞队列的区别(Java阻塞队列中的异类,SynchronousQueue底层实现原理剖析)

时间2025-05-04 16:51:38分类IT科技浏览6267
导读:上篇文章谈到BlockingQueue的使用场景,并重点分析了ArrayBlockingQueue的实现原理,了解到ArrayBlockingQueue底层是基于数组实现的阻塞队列。...

上篇文章谈到BlockingQueue的使用场景             ,并重点分析了ArrayBlockingQueue的实现原理                   ,了解到ArrayBlockingQueue底层是基于数组实现的阻塞队列             。

但是BlockingQueue的实现类中      ,有一种阻塞队列比较特殊             ,就是SynchronousQueue(同步移交队列)                    ,队列长度为0                   。

作用就是一个线程往队列放数据的时候      ,必须等待另一个线程从队列中取走数据      。同样      ,从队列中取数据的时候                    ,必须等待另一个线程往队列中放数据             。

这样特殊的队列             ,有什么应用场景呢?

1. SynchronousQueue用法

先看一个SynchronousQueue的简单用例:

/** * @author 一灯架构 * @apiNote SynchronousQueue示例 **/ public class SynchronousQueueDemo { public static void main(String[] args) throws InterruptedException { // 1. 创建SynchronousQueue队列 BlockingQueue<Integer> synchronousQueue = new SynchronousQueue<>(); // 2. 启动一个线程      ,往队列中放3个元素 new Thread(() -> { try { System.out.println(Thread.currentThread().getName() + " 入队列 1"); synchronousQueue.put(1); Thread.sleep(1); System.out.println(Thread.currentThread().getName() + " 入队列 2"); synchronousQueue.put(2); Thread.sleep(1); System.out.println(Thread.currentThread().getName() + " 入队列 3"); synchronousQueue.put(3); } catch (InterruptedException e) { e.printStackTrace(); } }).start(); // 3. 等待1000毫秒 Thread.sleep(1000L); // 4. 再启动一个线程                   ,从队列中取出3个元素 new Thread(() -> { try { System.out.println(Thread.currentThread().getName() + " 出队列 " + synchronousQueue.take()); Thread.sleep(1); System.out.println(Thread.currentThread().getName() + " 出队列 " + synchronousQueue.take()); Thread.sleep(1); System.out.println(Thread.currentThread().getName() + " 出队列 " + synchronousQueue.take()); } catch (InterruptedException e) { e.printStackTrace(); } }).start(); } }

输出结果:

Thread-0 入队列 1 Thread-1 出队列 1 Thread-0 入队列 2 Thread-1 出队列 2 Thread-0 入队列 3 Thread-1 出队列 3

从输出结果中可以看到             ,第一个线程Thread-0往队列放入一个元素1后,就被阻塞了                    。直到第二个线程Thread-1从队列中取走元素1后                   ,Thread-0才能继续放入第二个元素2      。

由于SynchronousQueue是BlockingQueue的实现类                   ,所以也实现类BlockingQueue中几组抽象方法:

为了满足不同的使用场景,BlockingQueue设计了很多的放数据和取数据的方法      。

操作 抛出异常 返回特定值 阻塞 阻塞一段时间 放数据 add offer put offer(e, time, unit) 取数据 remove poll take poll(time, unit) 查看数据(不删除) element() peek() 不支持 不支持

这几组方法的不同之处就是:

当队列满了             ,再往队列中放数据                   ,add方法抛异常      ,offer方法返回false             ,put方法会一直阻塞(直到有其他线程从队列中取走数据)                    ,offer(e, time, unit)方法阻塞指定时间然后返回false                    。 当队列是空      ,再从队列中取数据      ,remove方法抛异常                    ,poll方法返回null             ,take方法会一直阻塞(直到有其他线程往队列中放数据)      ,poll(time, unit)方法阻塞指定时间然后返回null             。 当队列是空                   ,再去队列中查看数据(并不删除数据)             ,element方法抛异常,peek方法返回null      。

工作中使用最多的就是offer             、poll阻塞指定时间的方法                   。

2. SynchronousQueue应用场景

SynchronousQueue的特点:

队列长度是0                   ,一个线程往队列放数据                   ,必须等待另一个线程取走数据             。同样,一个线程从队列中取数据             ,必须等待另一个线程往队列中放数据。

这种特殊的实现逻辑有什么应用场景呢?

我的理解就是                   ,如果你希望你的任务需要被快速处理      ,就可以使用这种队列                   。

Java线程池中的newCachedThreadPool(带缓存的线程池)底层就是使用SynchronousQueue实现的                   。

public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); }

newCachedThreadPool线程池的核心线程数是0             ,最大线程数是Integer的最大值                    ,线程存活时间是60秒。

如果你使用newCachedThreadPool线程池      ,你提交的任务会被更快速的处理      ,因为你每次提交任务                    ,都会有一个空闲的线程等着处理任务             。如果没有空闲的线程             ,也会立即创建一个线程处理你的任务                   。

你想想      ,这处理效率                   ,杠杠滴!

当然也有弊端             ,如果你提交了太多的任务,导致创建了大量的线程                   ,这些线程都在竞争CPU时间片                   ,等待CPU调度,处理任务速度也会变慢             ,所以在使用过程中也要综合考虑      。

3. SynchronousQueue源码解析

3.1 SynchronousQueue类属性

public class SynchronousQueue<E> extends AbstractQueue<E> implements BlockingQueue<E> { // 转换器                   ,取数据和放数据的核心逻辑都在这个类里面 private transient volatile Transferer<E> transferer; // 默认的构造方法(使用非公平队列) public SynchronousQueue() { this(false); } // 有参构造方法      ,可以指定是否使用公平队列 public SynchronousQueue(boolean fair) { transferer = fair ? new TransferQueue<E>() : new TransferStack<E>(); } // 转换器实现类 abstract static class Transferer<E> { abstract E transfer(E e, boolean timed, long nanos); } // 基于栈实现的非公平队列 static final class TransferStack<E> extends Transferer<E> { } // 基于队列实现的公平队列 static final class TransferQueue<E> extends Transferer<E> { } }

可以看到SynchronousQueue默认的无参构造方法             ,内部使用的是基于栈实现的非公平队列                    ,当然也可以调用有参构造方法      ,传参是true      ,使用基于队列实现的公平队列             。

// 使用非公平队列(基于栈实现) BlockingQueue<Integer> synchronousQueue = new SynchronousQueue<>(); // 使用公平队列(基于队列实现) BlockingQueue<Integer> synchronousQueue = new SynchronousQueue<>(true);

本次就常用的栈实现来剖析SynchronousQueue的底层实现原理                    。

3.2 栈底层结构

栈结构                    ,是非公平的             ,遵循先进后出      。

使用个case测试一下:

/** * @author 一灯架构 * @apiNote SynchronousQueue示例 **/ public class SynchronousQueueDemo { public static void main(String[] args) throws InterruptedException { // 1. 创建SynchronousQueue队列 SynchronousQueue<Integer> synchronousQueue = new SynchronousQueue<>(); // 2. 启动一个线程      ,往队列中放1个元素 new Thread(() -> { try { System.out.println(Thread.currentThread().getName() + " 入队列 0"); synchronousQueue.put(0); } catch (InterruptedException e) { e.printStackTrace(); } }).start(); // 3. 等待1000毫秒 Thread.sleep(1000L); // 4. 启动一个线程                   ,往队列中放1个元素 new Thread(() -> { try { System.out.println(Thread.currentThread().getName() + " 入队列 1"); synchronousQueue.put(1); } catch (InterruptedException e) { e.printStackTrace(); } }).start(); // 5. 等待1000毫秒 Thread.sleep(1000L); // 6. 再启动一个线程             ,从队列中取出1个元素 new Thread(() -> { try { System.out.println(Thread.currentThread().getName() + " 出队列 " + synchronousQueue.take()); } catch (InterruptedException e) { e.printStackTrace(); } }).start(); // 7. 等待1000毫秒 Thread.sleep(1000L); // 8. 再启动一个线程,从队列中取出1个元素 new Thread(() -> { try { System.out.println(Thread.currentThread().getName() + " 出队列 " + synchronousQueue.take()); } catch (InterruptedException e) { e.printStackTrace(); } }).start(); } }

输出结果:

Thread-0 入队列 0 Thread-1 入队列 1 Thread-2 出队列 1 Thread-3 出队列 0

从输出结果中可以看出                   ,符合栈结构先进后出的顺序      。

3.3 栈节点源码

栈中的数据都是由一个个的节点组成的                   ,先看一下节点类的源码:

// 节点 static final class SNode { // 节点值(取数据的时候,该字段为null) Object item; // 存取数据的线程 volatile Thread waiter; // 节点模式 int mode; // 匹配到的节点 volatile SNode match; // 后继节点 volatile SNode next; }

item

节点值             ,只在存数据的时候用                    。取数据的时候                   ,这个值是null             。

waiter

存取数据的线程      ,如果没有对应的接收线程             ,这个线程会被阻塞      。

mode

节点模式                    ,共有3种类型:

类型值 类型描述 类型的作用 0 REQUEST 表示取数据 1 DATA 表示存数据 2 FULFILLING 表示正在等待执行(比如取数据的线程      ,等待其他线程放数据)

3.4 put/take流程

放数据和取数据的逻辑      ,在底层复用的是同一个方法                    ,以put/take方法为例             ,另外两个放数据的方法      ,add和offer方法底层实现是一样的                   。

先看一下数据流转的过程                   ,方便理解源码             。

还是以上面的case为例:

Thread0先往SynchronousQueue队列中放入元素0 Thread1再往SynchronousQueue队列放入元素1 Thread2从SynchronousQueue队列中取出一个元素

第一步:Thread0先往SynchronousQueue队列中放入元素0

把本次操作组装成SNode压入栈顶             ,item是元素0,waiter是当前线程Thread0                   ,mode是1表示放入数据。

第二步:Thread1再往SynchronousQueue队列放入元素1

把本次操作组装成SNode压入栈顶                   ,item是元素1,waiter是当前线程Thread1             ,mode是1表示放入数据                   ,next是SNode0                   。

第三步:Thread2从SynchronousQueue队列中取出一个元素

这次的操作比较复杂      ,也是先把本次的操作包装成SNode压入栈顶                   。

item是null(取数据的时候             ,这个字段没有值)                    ,waiter是null(当前线程Thread2正在操作      ,所以不用赋值了)      ,mode是2表示正在操作(即将跟后继节点进行匹配)                    ,next是SNode1。

然后             ,Thread2开始把栈顶的两个节点进行匹配      ,匹配成功后                   ,就把SNode2赋值给SNode1的match属性             ,唤醒SNode1中的Thread1线程,然后弹出SNode2节点和SNode1节点             。

3.5 put/take源码实现

看完 了put/take流程                   ,再来看源码就简单多了                   。

先看一下put方法源码:

// 放数据 public void put(E e) throws InterruptedException { // 不允许放null元素 if (e == null) throw new NullPointerException(); // 调用转换器实现类                   ,放元素 if (transferer.transfer(e, false, 0) == null) { // 如果放数据失败,就中断当前线程             ,并抛出异常 Thread.interrupted(); throw new InterruptedException(); } }

核心逻辑都在transfer方法中                   ,代码很长      ,理清逻辑后             ,也很容易理解      。

// 取数据和放数据操作                    ,共用一个方法 E transfer(E e, boolean timed, long nanos) { SNode s = null; // e为空      ,说明是取数据      ,否则是放数据 int mode = (e == null) ? REQUEST : DATA; for (; ; ) { SNode h = head; // 1. 如果栈顶节点为空                    ,或者栈顶节点类型跟本次操作相同(都是取数据             ,或者都是放数据) if (h == null || h.mode == mode) { // 2. 判断节点是否已经超时 if (timed && nanos <= 0) { // 3. 如果栈顶节点已经被取消      ,就删除栈顶节点 if (h != null && h.isCancelled()) casHead(h, h.next); else return null; // 4. 把本次操作包装成SNode                   ,压入栈顶 } else if (casHead(h, s = snode(s, e, h, mode))) { // 5. 挂起当前线程             ,等待被唤醒 SNode m = awaitFulfill(s, timed, nanos); // 6. 如果这个节点已经被取消,就删除这个节点 if (m == s) { clean(s); return null; } // 7. 把s.next设置成head if ((h = head) != null && h.next == s) casHead(h, s.next); return (E) ((mode == REQUEST) ? m.item : s.item); } // 8. 如果栈顶节点类型跟本次操作不同                   ,并且不是FULFILLING类型 } else if (!isFulfilling(h.mode)) { // 9. 再次判断如果栈顶节点已经被取消                   ,就删除栈顶节点 if (h.isCancelled()) casHead(h, h.next); // 10. 把本次操作包装成SNode(类型是FULFILLING),压入栈顶 else if (casHead(h, s = snode(s, e, h, FULFILLING | mode))) { // 11. 使用死循环             ,直到匹配到对应的节点 for (; ; ) { // 12. 遍历下个节点 SNode m = s.next; // 13. 如果节点是null                   ,表示遍历到末尾      ,设置栈顶节点是null             ,结束             。 if (m == null) { casHead(s, null); s = null; break; } SNode mn = m.next; // 14. 如果栈顶的后继节点跟栈顶节点匹配成功                    ,就删除这两个节点      ,结束                    。 if (m.tryMatch(s)) { casHead(s, mn); return (E) ((mode == REQUEST) ? m.item : s.item); } else // 15. 如果没有匹配成功      ,就删除栈顶的后继节点                    ,继续匹配 s.casNext(m, mn); } } } else { // 16. 如果栈顶节点类型跟本次操作不同             ,并且是FULFILLING类型      , // 就再执行一遍上面第11步for循环中的逻辑(很少概率出现) SNode m = h.next; if (m == null) casHead(h, null); else { SNode mn = m.next; if (m.tryMatch(h)) casHead(h, mn); else h.casNext(m, mn); } } } }

transfer方法逻辑也很简单                   ,就是判断本次操作类型是否跟栈顶节点相同             ,如果相同,就把本次操作压入栈顶      。否则就跟栈顶节点匹配                   ,唤醒栈顶节点线程                   ,弹出栈顶节点      。

transfer方法中调用了awaitFulfill方法,作用是挂起当前线程                    。

// 等待被唤醒 SNode awaitFulfill(SNode s, boolean timed, long nanos) { // 1. 计算超时时间 final long deadline = timed ? System.nanoTime() + nanos : 0L; Thread w = Thread.currentThread(); // 2. 计算自旋次数 int spins = (shouldSpin(s) ? (timed ? maxTimedSpins : maxUntimedSpins) : 0); for (;;) { if (w.isInterrupted()) s.tryCancel(); // 3. 如果已经匹配到其他节点             ,直接返回 SNode m = s.match; if (m != null) return m; if (timed) { // 4. 超时时间递减 nanos = deadline - System.nanoTime(); if (nanos <= 0L) { s.tryCancel(); continue; } } // 5. 自旋次数减一 if (spins > 0) spins = shouldSpin(s) ? (spins-1) : 0; else if (s.waiter == null) s.waiter = w; // 6. 开始挂起当前线程 else if (!timed) LockSupport.park(this); else if (nanos > spinForTimeoutThreshold) LockSupport.parkNanos(this, nanos); } }

awaitFulfill方法的逻辑也很简单                   ,就是挂起当前线程             。

take方法底层使用的也是transfer方法:

// 取数据 public E take() throws InterruptedException { // // 调用转换器实现类      ,取数据 E e = transferer.transfer(null, false, 0); if (e != null) return e; // 没取到             ,就中断当前线程 Thread.interrupted(); throw new InterruptedException(); }

4. 总结

SynchronousQueue是一种特殊的阻塞队列                    ,队列长度是0      ,一个线程往队列放数据      ,必须等待另一个线程取走数据      。同样                    ,一个线程从队列中取数据             ,必须等待另一个线程往队列中放数据                   。 SynchronousQueue底层是基于栈和队列两种数据结构实现的             。 Java线程池中的newCachedThreadPool(带缓存的线程池)底层就是使用SynchronousQueue实现的。 如果希望你的任务需要被快速处理      ,可以使用SynchronousQueue队列                   。

我是「一灯架构」                   ,如果本文对你有帮助             ,欢迎各位小伙伴点赞                   、评论和关注,感谢各位老铁                   ,我们下期见

创心域SEO版权声明:以上内容作者已申请原创保护,未经允许不得转载,侵权必究!授权事宜、对本内容有异议或投诉,敬请联系网站管理员,我们将尽快回复您,谢谢合作!

展开全文READ MORE
网上有哪些赚钱渠道可以赚钱(有什么网上赚钱的渠道吗-通过增加流量赚钱的几个方法,你知道吗?)