线程池 join(JUC学习-线程池部分)
自定义线程池
package com.appletree24; import java.util.ArrayDeque; import java.util.Deque; import java.util.HashSet; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.ReentrantLock; class Main { public static void main(String[] args) throws ExecutionException, InterruptedException { ThreadPool threadPool = new ThreadPool(2, 1000, TimeUnit.MICROSECONDS, 5, (queue, task) -> { //带超时等待 // queue.offer(task,500,TimeUnit.MILLISECONDS); }); for (int i = 0; i < 10; i++) { int j = i; threadPool.execute(() -> { try { Thread.sleep(1000L); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(j); }); } } } //策略模式接口 此处使用策略模式是因为在实现拒绝策略时 ,有许多种拒绝的方式 ,这些方式如果不使用恰当的模式,就需要大量的if..else来编写 //且方式数量大于4个 ,会造成类膨胀的问题 ,推荐使用混合模式 //https://www.runoob.com/design-pattern/strategy-pattern.html @FunctionalInterface interface RejectPolicy<T> { void reject(BlockingQueue<T> queue, T task); } class ThreadPool { //任务队列 private BlockingQueue<Runnable> taskQueue; //线程集合 private HashSet<Worker> workers = new HashSet<>(); //线程数 private int coreSize; //超时时间 private long timeout; private TimeUnit timeUnit; private RejectPolicy<Runnable> rejectPolicy; //执行任务 public void execute(Runnable task) { //当任务数未超过核心线程数时 ,直接交给Worker对象执行 //如果超过 ,则加入阻塞任务队列 ,暂存起来 synchronized (workers) { if (workers.size() < coreSize) { Worker worker = new Worker(task); workers.add(worker); worker.start(); } else { //第一种选择死等 // taskQueue.put(task); //第二种为超时等待 //第三种为消费者放弃任务执行 //第四种为主线程抛出异常 //第五种让调用者自行执行任务 taskQueue.tryPut(rejectPolicy, task); } } } public ThreadPool(int coreSize, long timeout, TimeUnit timeUnit, int queueCapcity, RejectPolicy<Runnable> rejectPolicy) { this.coreSize = coreSize; this.timeout = timeout; this.timeUnit = timeUnit; this.taskQueue = new BlockingQueue<>(queueCapcity); this.rejectPolicy = rejectPolicy; } class Worker extends Thread { private Runnable task; public Worker(Runnable task) { this.task = task; } @Override public void run() { //执行任务 //1.当传递过来的task不为空 ,执行任务 //2.当task执行完毕 ,再接着取下一个任务并执行 while (task != null || (task = taskQueue.poll(1000, TimeUnit.MILLISECONDS)) != null) { try { task.run(); } catch (Exception e) { e.printStackTrace(); } finally { task = null; } } synchronized (workers) { workers.remove(this); } } } } class BlockingQueue<T> { //1. 任务队列 private final Deque<T> queue = new ArrayDeque<>(); //2. 锁 private final ReentrantLock lock = new ReentrantLock(); //3. 生产者条件变量 private final Condition fullWaitSet = lock.newCondition(); //4. 消费者条件变量 private final Condition emptyWaitSet = lock.newCondition(); //5. 容量上限 private int capcity; public BlockingQueue(int capcity) { this.capcity = capcity; } //带超时的等待获取 public T poll(long timeout, TimeUnit unit) { lock.lock(); long nanos = unit.toNanos(timeout); try { while (queue.isEmpty()) { try { if (nanos <= 0) { return null; } nanos = emptyWaitSet.awaitNanos(nanos); } catch (InterruptedException e) { e.printStackTrace(); } } T t = queue.removeFirst(); fullWaitSet.signal(); return t; } finally { lock.unlock(); } } //消费者拿取任务的方法 public T take() { lock.lock(); try { while (queue.isEmpty()) { try { emptyWaitSet.await(); } catch (InterruptedException e) { e.printStackTrace(); } } T t = queue.removeFirst(); fullWaitSet.signal(); return t; } finally { lock.unlock(); } } //阻塞添加 public void put(T task) { lock.lock(); try { while (queue.size() == capcity) { try { fullWaitSet.await(); } catch (InterruptedException e) { e.printStackTrace(); } } queue.offerLast(task); //添加完后唤醒消费者等待 emptyWaitSet.signal(); } finally { lock.unlock(); } } //带超时时间的阻塞添加 public boolean offer(T task, long timeout, TimeUnit unit) { lock.lock(); try { long nanos = unit.toNanos(timeout); while (queue.size() == capcity) { try { if (nanos <= 0) return false; nanos = fullWaitSet.awaitNanos(nanos); } catch (InterruptedException e) { e.printStackTrace(); } } queue.offerLast(task); //添加完后唤醒消费者等待 emptyWaitSet.signal(); return true; } finally { lock.unlock(); } } //获取当前阻塞队列大小 public int size() { lock.lock(); try { return queue.size(); } finally { lock.unlock(); } } public void tryPut(RejectPolicy<T> rejectPolicy, T task) { lock.lock(); try { //判断队列是否已满 if (queue.size() == capcity) { rejectPolicy.reject(this, task); } else { queue.addLast(task); emptyWaitSet.signal(); } } finally { lock.unlock(); } } }上述的自定义线程池虽然能够执行完毕主线程给予的任务 ,但任务全部执行结束后 ,开辟的线程池内核心线程仍然在运行,并没有结束 ,这是因为目前线程池中的take方法仍然为不会有超时等待的take方法 ,造成了死等,需要为其加入超时停止的功能 。也就是替代take()的poll()
JDK自带线程池
介绍
ThreadPoolExecutor使用int的高三位表示线程池状态 ,低29位表示线程数量
在ThreadPoolExecutor中 ,同样也存在拒绝策略 。其图结构如下:
其中接口就对应着在自定义线程池中实现的策略模式接口,下面的四个实现类就对应着四种不同的拒绝方式:
利用工具类创建固定大小线程池
利用工具类创建带缓冲的线程池
从源码可以看出 ,带缓冲的线程池中缓冲队列的使用的是一个名为SynchronousQueue的队列 ,这个队列的特点如下:队列不具有容量 ,当没有线程来取时 ,是无法对其内部放入数据的 ,例如队列内部已有一个数字1 ,但此时没有线程取走 ,则线此队列目前并不能继续存入数据 ,直到1被取走
利用工具类创建单线程线程池
从源码可以看出 ,单线程线程池中核心线程数与最大线程数相等,即不存在应急线程。只能解决一个任务
那么这个线程池和我自己创建一个线程的线程池有什么区别呢?区别如下:
ThreadPoolExecutor-submit method
public static void main(String[] args) throws ExecutionException, InterruptedException { ExecutorService pool = Executors.newFixedThreadPool(2); Future<String> result = pool.submit(() -> { System.out.println("running"); Thread.sleep(1000); return "ok"; }); System.out.println(result.get()); }submit方法可以传入Runnable和Callable类型的参数 ,并且将线程内部所执行任务的结果返回 ,用Future包装
ThreadPoolExecutor-invokeAll
public static void main(String[] args) throws ExecutionException, InterruptedException { ExecutorService pool = Executors.newFixedThreadPool(2); List<Future<String>> results = pool.invokeAll(Arrays.asList(() -> { System.out.println("begin"); Thread.sleep(1000); return "1"; }, () -> { System.out.println("begin"); Thread.sleep(500); return "2"; })); results.forEach(f -> { try { System.out.printf(f.get()); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } }); }invokeAll方法可以传入任务的集合,同样的任务的返回值也会以列表形式返回
ThreadPoolExecutor-invokeAny
public static void main(String[] args) throws ExecutionException, InterruptedException { ExecutorService pool = Executors.newFixedThreadPool(2); String result = pool.invokeAny(Arrays.asList(() -> { System.out.println("begin"); Thread.sleep(1000); return "1"; }, () -> { System.out.println("begin"); Thread.sleep(500); return "2"; })); pool.awaitTermination(1000, TimeUnit.MILLISECONDS); System.out.println(result); }invokeAny方法同样可以传入任务的集合 ,只不过最后返回的结果并不是任务的结果集合 ,而是最早完成的那个任务的结果 。
ThreadPoolExecutor-shutdown
public static void main(String[] args) throws ExecutionException, InterruptedException { ExecutorService pool = Executors.newFixedThreadPool(2); List<Future<String>> results = pool.invokeAll(Arrays.asList(() -> { System.out.println("begin"); Thread.sleep(1000); return "1"; }, () -> { System.out.println("begin"); Thread.sleep(500); return "2"; })); pool.shutdown(); results.forEach(f -> { try { System.out.println(f.get()); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } }); }shutdown方法会将线程池的状态变为SHUTDOWN
不会接受新任务 但已提交的任务会执行完 此方法不会阻塞调用线程的执行ThreadPoolExecutor-shutdownNow
public static void main(String[] args) throws ExecutionException, InterruptedException { ExecutorService pool = Executors.newFixedThreadPool(2); List<Future<String>> results = pool.invokeAll(Arrays.asList(() -> { System.out.println("begin"); Thread.sleep(1000); return "1"; }, () -> { System.out.println("begin"); Thread.sleep(500); return "2"; })); List<Runnable> runnables = pool.shutdownNow(); results.forEach(f -> { try { System.out.println(f.get()); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } }); }shutdownNow方法会将线程池状态变为STOP
不会接受新任务 会将队列中现有的任务返回 并且用interrupt方法中断正在执行的任务创心域SEO版权声明:以上内容作者已申请原创保护,未经允许不得转载,侵权必究!授权事宜、对本内容有异议或投诉,敬请联系网站管理员,我们将尽快回复您,谢谢合作!