首页IT科技线程池 join(JUC学习-线程池部分)

线程池 join(JUC学习-线程池部分)

时间2025-06-17 16:29:05分类IT科技浏览3726
导读:自定义线程池 package com.appletree24; import java.util.ArrayDeque;...

自定义线程池

package com.appletree24; import java.util.ArrayDeque; import java.util.Deque; import java.util.HashSet; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.ReentrantLock; class Main { public static void main(String[] args) throws ExecutionException, InterruptedException { ThreadPool threadPool = new ThreadPool(2, 1000, TimeUnit.MICROSECONDS, 5, (queue, task) -> { //带超时等待 // queue.offer(task,500,TimeUnit.MILLISECONDS); }); for (int i = 0; i < 10; i++) { int j = i; threadPool.execute(() -> { try { Thread.sleep(1000L); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(j); }); } } } //策略模式接口 此处使用策略模式是因为在实现拒绝策略时             ,有许多种拒绝的方式                   ,这些方式如果不使用恰当的模式       ,就需要大量的if..else来编写 //且方式数量大于4个             ,会造成类膨胀的问题                   ,推荐使用混合模式 //https://www.runoob.com/design-pattern/strategy-pattern.html @FunctionalInterface interface RejectPolicy<T> { void reject(BlockingQueue<T> queue, T task); } class ThreadPool { //任务队列 private BlockingQueue<Runnable> taskQueue; //线程集合 private HashSet<Worker> workers = new HashSet<>(); //线程数 private int coreSize; //超时时间 private long timeout; private TimeUnit timeUnit; private RejectPolicy<Runnable> rejectPolicy; //执行任务 public void execute(Runnable task) { //当任务数未超过核心线程数时       ,直接交给Worker对象执行 //如果超过      ,则加入阻塞任务队列                   ,暂存起来 synchronized (workers) { if (workers.size() < coreSize) { Worker worker = new Worker(task); workers.add(worker); worker.start(); } else { //第一种选择死等 // taskQueue.put(task); //第二种为超时等待 //第三种为消费者放弃任务执行 //第四种为主线程抛出异常 //第五种让调用者自行执行任务 taskQueue.tryPut(rejectPolicy, task); } } } public ThreadPool(int coreSize, long timeout, TimeUnit timeUnit, int queueCapcity, RejectPolicy<Runnable> rejectPolicy) { this.coreSize = coreSize; this.timeout = timeout; this.timeUnit = timeUnit; this.taskQueue = new BlockingQueue<>(queueCapcity); this.rejectPolicy = rejectPolicy; } class Worker extends Thread { private Runnable task; public Worker(Runnable task) { this.task = task; } @Override public void run() { //执行任务 //1.当传递过来的task不为空             ,执行任务 //2.当task执行完毕      ,再接着取下一个任务并执行 while (task != null || (task = taskQueue.poll(1000, TimeUnit.MILLISECONDS)) != null) { try { task.run(); } catch (Exception e) { e.printStackTrace(); } finally { task = null; } } synchronized (workers) { workers.remove(this); } } } } class BlockingQueue<T> { //1. 任务队列 private final Deque<T> queue = new ArrayDeque<>(); //2. 锁 private final ReentrantLock lock = new ReentrantLock(); //3. 生产者条件变量 private final Condition fullWaitSet = lock.newCondition(); //4. 消费者条件变量 private final Condition emptyWaitSet = lock.newCondition(); //5. 容量上限 private int capcity; public BlockingQueue(int capcity) { this.capcity = capcity; } //带超时的等待获取 public T poll(long timeout, TimeUnit unit) { lock.lock(); long nanos = unit.toNanos(timeout); try { while (queue.isEmpty()) { try { if (nanos <= 0) { return null; } nanos = emptyWaitSet.awaitNanos(nanos); } catch (InterruptedException e) { e.printStackTrace(); } } T t = queue.removeFirst(); fullWaitSet.signal(); return t; } finally { lock.unlock(); } } //消费者拿取任务的方法 public T take() { lock.lock(); try { while (queue.isEmpty()) { try { emptyWaitSet.await(); } catch (InterruptedException e) { e.printStackTrace(); } } T t = queue.removeFirst(); fullWaitSet.signal(); return t; } finally { lock.unlock(); } } //阻塞添加 public void put(T task) { lock.lock(); try { while (queue.size() == capcity) { try { fullWaitSet.await(); } catch (InterruptedException e) { e.printStackTrace(); } } queue.offerLast(task); //添加完后唤醒消费者等待 emptyWaitSet.signal(); } finally { lock.unlock(); } } //带超时时间的阻塞添加 public boolean offer(T task, long timeout, TimeUnit unit) { lock.lock(); try { long nanos = unit.toNanos(timeout); while (queue.size() == capcity) { try { if (nanos <= 0) return false; nanos = fullWaitSet.awaitNanos(nanos); } catch (InterruptedException e) { e.printStackTrace(); } } queue.offerLast(task); //添加完后唤醒消费者等待 emptyWaitSet.signal(); return true; } finally { lock.unlock(); } } //获取当前阻塞队列大小 public int size() { lock.lock(); try { return queue.size(); } finally { lock.unlock(); } } public void tryPut(RejectPolicy<T> rejectPolicy, T task) { lock.lock(); try { //判断队列是否已满 if (queue.size() == capcity) { rejectPolicy.reject(this, task); } else { queue.addLast(task); emptyWaitSet.signal(); } } finally { lock.unlock(); } } }

上述的自定义线程池虽然能够执行完毕主线程给予的任务                   ,但任务全部执行结束后             ,开辟的线程池内核心线程仍然在运行,并没有结束                   ,这是因为目前线程池中的take方法仍然为不会有超时等待的take方法                   ,造成了死等,需要为其加入超时停止的功能             。也就是替代take()的poll()

JDK自带线程池

介绍

ThreadPoolExecutor使用int的高三位表示线程池状态             ,低29位表示线程数量

在ThreadPoolExecutor中                   ,同样也存在拒绝策略                   。其图结构如下:

其中接口就对应着在自定义线程池中实现的策略模式接口       ,下面的四个实现类就对应着四种不同的拒绝方式:

利用工具类创建固定大小线程池

利用工具类创建带缓冲的线程池

从源码可以看出             ,带缓冲的线程池中缓冲队列的使用的是一个名为SynchronousQueue的队列                   ,这个队列的特点如下:队列不具有容量       ,当没有线程来取时      ,是无法对其内部放入数据的                   ,例如队列内部已有一个数字1             ,但此时没有线程取走      ,则线此队列目前并不能继续存入数据                   ,直到1被取走

利用工具类创建单线程线程池

从源码可以看出             ,单线程线程池中核心线程数与最大线程数相等,即不存在应急线程       。只能解决一个任务

那么这个线程池和我自己创建一个线程的线程池有什么区别呢?区别如下:

ThreadPoolExecutor-submit method

public static void main(String[] args) throws ExecutionException, InterruptedException { ExecutorService pool = Executors.newFixedThreadPool(2); Future<String> result = pool.submit(() -> { System.out.println("running"); Thread.sleep(1000); return "ok"; }); System.out.println(result.get()); }

submit方法可以传入Runnable和Callable类型的参数                   ,并且将线程内部所执行任务的结果返回                   ,用Future包装

ThreadPoolExecutor-invokeAll

public static void main(String[] args) throws ExecutionException, InterruptedException { ExecutorService pool = Executors.newFixedThreadPool(2); List<Future<String>> results = pool.invokeAll(Arrays.asList(() -> { System.out.println("begin"); Thread.sleep(1000); return "1"; }, () -> { System.out.println("begin"); Thread.sleep(500); return "2"; })); results.forEach(f -> { try { System.out.printf(f.get()); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } }); }

invokeAll方法可以传入任务的集合,同样的任务的返回值也会以列表形式返回

ThreadPoolExecutor-invokeAny

public static void main(String[] args) throws ExecutionException, InterruptedException { ExecutorService pool = Executors.newFixedThreadPool(2); String result = pool.invokeAny(Arrays.asList(() -> { System.out.println("begin"); Thread.sleep(1000); return "1"; }, () -> { System.out.println("begin"); Thread.sleep(500); return "2"; })); pool.awaitTermination(1000, TimeUnit.MILLISECONDS); System.out.println(result); }

invokeAny方法同样可以传入任务的集合             ,只不过最后返回的结果并不是任务的结果集合                   ,而是最早完成的那个任务的结果             。

ThreadPoolExecutor-shutdown

public static void main(String[] args) throws ExecutionException, InterruptedException { ExecutorService pool = Executors.newFixedThreadPool(2); List<Future<String>> results = pool.invokeAll(Arrays.asList(() -> { System.out.println("begin"); Thread.sleep(1000); return "1"; }, () -> { System.out.println("begin"); Thread.sleep(500); return "2"; })); pool.shutdown(); results.forEach(f -> { try { System.out.println(f.get()); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } }); }

shutdown方法会将线程池的状态变为SHUTDOWN

不会接受新任务 但已提交的任务会执行完 此方法不会阻塞调用线程的执行

ThreadPoolExecutor-shutdownNow

public static void main(String[] args) throws ExecutionException, InterruptedException { ExecutorService pool = Executors.newFixedThreadPool(2); List<Future<String>> results = pool.invokeAll(Arrays.asList(() -> { System.out.println("begin"); Thread.sleep(1000); return "1"; }, () -> { System.out.println("begin"); Thread.sleep(500); return "2"; })); List<Runnable> runnables = pool.shutdownNow(); results.forEach(f -> { try { System.out.println(f.get()); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } }); }

shutdownNow方法会将线程池状态变为STOP

不会接受新任务 会将队列中现有的任务返回 并且用interrupt方法中断正在执行的任务

创心域SEO版权声明:以上内容作者已申请原创保护,未经允许不得转载,侵权必究!授权事宜、对本内容有异议或投诉,敬请联系网站管理员,我们将尽快回复您,谢谢合作!

展开全文READ MORE
phpcms怎么用(PHPCMS 如何制作手机版?) 网站如何快速提升排名(提升网站排名的终极秘籍:从优化到营销,一网打尽!)