Java并发编程的艺术 -- 线程池和Executor框架(第九、十章)

文章目录

本文参考于《Java并发编程的艺术》

1、线程池

1.1、为什么使用线程池?

  1. 降低资源消耗:通过重复利用已创建的线程降低线程创建和销毁造成的消耗。
  2. 提高响应速度:当任务到达时,任务可以不需要等到线程创建就能立即执行。
  3. 提高线程的可管理性:线程是稀缺资源,如果无限制地创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一分配、调优和监控。

1.2、线程池的实现原理

1. 线程池的主要处理流程

《Java并发编程的艺术 -- 线程池和Executor框架(第九、十章)》

  1. 线程池判断核心线程池里的线程是否都在执行任务。如果不是,则创建一个新的工作线程来执行任务。
  2. 如果核心线程池里的线程都在执行任务,则线程池判断工作队列是否已经满。如果工作队列没有满,则将新提交的任务存储在这个工作队列里。
  3. 如果工作队列满了,则线程池判断线程池的线程是否都处于工作状态。如果没有,则创建一个新的工作线程来执行任务。
  4. 如果已经满了,则交给饱和策略来处理这个任务

1.3、excute()方法源码分析

// 存放线程池的运行状态 (runState) 和线程池内有效线程的数量 (workerCount)
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

private static int workerCountOf(int c) { 
    return c & CAPACITY;
}

private final BlockingQueue<Runnable> workQueue;

public void execute(Runnable command) { 
    // 如果任务为null,则抛出异常。
    if (command == null)
        throw new NullPointerException();
    // ctl 中保存的线程池当前的一些状态信息
    int c = ctl.get();

    // 下面会涉及到 3 步 操作
    // 1.首先判断当前线程池中执行的任务数量是否小于 corePoolSize
    // 如果小于的话,通过addWorker(command, true)新建一个线程,并将任务(command)添加到该线程中;然后,启动该线程从而执行任务。
    if (workerCountOf(c) < corePoolSize) { 
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    // 2.如果当前执行的任务数量大于等于 corePoolSize 的时候就会走到这里
    // 通过 isRunning 方法判断线程池状态,线程池处于 RUNNING 状态才会被并且队列可以加入任务,该任务才会被加入进去
    if (isRunning(c) && workQueue.offer(command)) { 
        int recheck = ctl.get();
        // 再次获取线程池状态,如果线程池状态不是 RUNNING 状态就需要从任务队列中移除任务,并尝试判断线程是否全部执行完毕。同时执行拒绝策略。
        if (!isRunning(recheck) && remove(command))
            reject(command);
            // 如果当前线程池为空就新创建一个线程并执行。
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    //3. 通过addWorker(command, false)新建一个线程,并将任务(command)添加到该线程中;然后,启动该线程从而执行任务。
    //如果addWorker(command, false)执行失败,则通过reject()执行相应的拒绝策略的内容。
    else if (!addWorker(command, false))
        reject(command);
}

  1. 如果线程数小于核心线程数,则创建线程并执行当前任务
  2. 如线程数大于等于核心线程数或线程创建失败,则将当前任务放到工作队列中。
  3. 如果线程池不处于运行中或任务无法放入队列,并且当前线程数量小于最大允许的线程数量,则创建一个线程执行任务。
  4. 否则则通过reject()执行相应的拒绝策略的内容。

1.4、工作线程

工作线程:线程池创建线程时,会将线程封装成工作线程WorkerWorker在执行完任务后,还会循环获取工作队列里的任务来执行

图示说明

《Java并发编程的艺术 -- 线程池和Executor框架(第九、十章)》

  • execute()方法中创建一个线程时,会让这个线程执行当前任务。
  • 这个线程执行完上图中1的任务后,会反复从BlockingQueue获取任务来执行

1.5、线程池的创建

源代码

/** * 用给定的初始参数创建一个新的ThreadPoolExecutor。 */
public ThreadPoolExecutor(int corePoolSize,
                      int maximumPoolSize,
                      long keepAliveTime,
                      TimeUnit unit,
                      BlockingQueue<Runnable> workQueue,
                      ThreadFactory threadFactory,
                      RejectedExecutionHandler handler) { 
    if (corePoolSize < 0 ||
        maximumPoolSize <= 0 ||
        maximumPoolSize < corePoolSize ||
        keepAliveTime < 0)
            throw new IllegalArgumentException();
    if (workQueue == null || threadFactory == null || handler == null)
        throw new NullPointerException();
    this.corePoolSize = corePoolSize;
    this.maximumPoolSize = maximumPoolSize;
    this.workQueue = workQueue;
    this.keepAliveTime = unit.toNanos(keepAliveTime);
    this.threadFactory = threadFactory;
    this.handler = handler;
}

  1. corePoolSize核心线程池的基本大小):线程池允许创建的最小线程数量。当提交一个任务到线程池时,线程池会创建一个线程来执行任务,即使其他空闲的基本线程能够执行新任务也会创建线程,等到需要执行的任务数大于线程池基本大小时就不再创建。
  2. runnableTaskQueue任务队列):用于保存等待执行的任务的阻塞队列
  • runnableTaskQueue任务队列):用于保存等待执行的任务的阻塞队列。可以选择以下几个阻塞队列。
  • ArrayBlockingQueue:是一个基于数组结构的有界阻塞队列,此队列按FIFO(先进先出)原则对元素进行排序。
  • LinkedBlockingQueue:一个基于链表结构的阻塞队列,此队列按FIFO排序元素,吞吐量通常要高于ArrayBlockingQueue。静态工厂方法Executors.newFixedThreadPool()使用了这个队列。
  • SynchronousQueue:一个不存储元素的阻塞队列。每个插入操作必须等到另一个线程调用移除操作,否则插入操作一直处于阻塞状态,吞吐量通常要高于Linked-BlockingQueue,静态工厂方法Executors.newCachedThreadPool使用了这个队列。
  1. maximumPoolSize线程池最大数量):线程池允许创建的最大线程数。如果队列满了,并且已创建的线程数小于最大线程数,则线程池会再创建新的线程执行任务。值得注意的是,如果使用了无界的任务队列这个参数就没什么效果
  2. ThreadFactory:用于设置创建线程的工厂。
  3. RejectedExecutionHandler饱和策略):当队列和线程池都满了,说明线程池处于饱和状态,那么必须采取一种策略处理提交的新任务。这个策略默认情况下是AbortPolicy,表示无法处理新任务时抛出异常。
  • AbortPolicy:直接抛出异常。
  • CallerRunsPolicy:只用调用者所在线程来运行任务。
  • DiscardOldestPolicy:丢弃队列里最近的一个任务,并执行当前任务。
  • DiscardPolicy:不处理,丢弃掉。
  1. keepAliveTime线程活动保持时间):线程池的工作线程空闲后,保持存活的时间。所以,如果任务很多,并且每个任务执行的时间比较短,可以调大时间,提高线程的利用率。

1.6、execute()和submit()方法

  • execute()方法用于提交不需要返回值的任务,所以无法判断任务是否被线程池执行成功。
  • submit()方法用于提交需要返回值的任务。线程池会返回一个future类型的对象,通过这个future对象可以判断任务是否执行成功,并且可以通过future的 get()方法来获取返回值get()方法会阻塞当前线程直到任务完成,而使用get(long timeout,TimeUnit unit)方法则会阻塞当前线程一段时间后立即返回,这时候有可能任务没有执行完。

submit()源码

public Future<?> submit(Runnable task) { 
    if (task == null) throw new NullPointerException();
    RunnableFuture<Void> ftask = newTaskFor(task, null);
    execute(ftask);
    return ftask;
}


protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) { 
    return new FutureTask<T>(runnable, value);
}

1.7、关闭线程池

1. 关闭线程池方法说明

  • 可以通过调用线程池的shutdownshutdownNow方法来关闭线程池。
  • 它们的原理遍历线程池中的工作线程,然后逐个调用线程的interrupt方法来中断线程,所以无法响应中断的任务可能永远无法终止。
  • 但是它们存在一定的区别,shutdownNow首先将线程池的状态设置成STOP,然后尝试停止所有的正在执行或暂停任务的线程,并返回等待执行任务的列表,而shutdown只是将线程池的状态设置成SHUTDOWN状态,然后中断所有没有正在执行任务的线程
  • 通常调用shutdown方法来关闭线程池,如果任务不一定要执行完,则可以调用shutdownNow方法

2. 返回关闭状态的方法说明

  • 只要调用了这两个关闭方法中的任意一个,isShutdown方法就会返回true
  • 当所有的任务都已关闭后,才表示线程池关闭成功,这时调用isTerminaed方法会返回true

2、Executor框架

2.1、Executor框架简介

2.1.1、Executor框架的两级调度模型

《Java并发编程的艺术 -- 线程池和Executor框架(第九、十章)》

  • 在上层Java多线程程序通常把应用分解为若干个任务,然后使用用户级的调度器(Executor框架)将这些任务映射为固定数量的线程;
  • 在底层,操作系统内核将这些线程映射到硬件处理器上。

2.1.2、Executor框架的结构

1. 结构

  1. 任务:包括被执行任务需要实现的接口:Runnable接口或Callable接口。
  2. 任务的执行:包括任务执行机制的核心接口Executor,以及继承自Executor的ExecutorService接口。
  3. 异步计算的结果:包括接口Future和实现Future接口的FutureTask类。

2. 主要的类和接口

  1. Executor是一个接口,它是Executor框架的基础,它将任务的提交与任务的执行分离开来。
  2. ThreadPoolExecutor是线程池的核心实现类,用来执行被提交的任务。
  3. ScheduledThreadPoolExecutor是一个实现类,可以在给定的延迟后运行命令,或者定期执行命令。ScheduledThreadPoolExecutor比Timer更灵活,功能更强大。
  4. Future接口和实现Future接口的FutureTask类,代表异步计算的结果。
  5. Runnable接口和Callable接口的实现类,都可以被ThreadPoolExecutor或ScheduledThreadPoolExecutor执行。

2.1.3、 Executor框架的使用

《Java并发编程的艺术 -- 线程池和Executor框架(第九、十章)》

  1. 主线程首先要创建实现Runnable或者Callable接口的任务对象
  2. 然后可以把Runnable对象直接交给ExecutorService执行,或者也可以把Runnable对象或Callable对象提交给ExecutorService执行。
  3. ExecutorService将返回一个实现Future接口的对象。由于FutureTask实现了Runnable,程序员也可以创建FutureTask,然后直接交给ExecutorService执行。
  4. 最后,主线程可以执行FutureTask.get()方法来等待任务执行完成。主线程也可以执行FutureTask.cancel(boolean mayInterruptIfRunning)来取消此任务的执行

2.1.4、Executor框架的成员

  1. ThreadPoolExecutor
  • FixedThreadPool:创建使用固定线程数的FixedThreadPool。
  • SingleThreadExecutor:创建使用单个线程的SingleThreadExecutor。
  • CachedThreadPool:创建一个会根据需要创建新线程的CachedThreadPool。
  1. ScheduledThreadPoolExecutor
  • ScheduledThreadPoolExecutor:包含若干个线程的ScheduledThreadPoolExecutor。
  • SingleThreadScheduledExecutor。只包含一个线程的ScheduledThreadPoolExecutor。
  1. Future接口:用来表示异步计算的结果
  2. Runnable接口和Callable接口

2.2、ThreadPoolExecutor

2.2.1、参数详解

  • corePool:核心线程池的大小。
  • maximumPool:最大线程池的大小。
  • BlockingQueue:用来暂时保存任务的工作队列。
  • RejectedExecutionHandler:当ThreadPoolExecutor已经关闭或ThreadPoolExecutor已经饱和时(达到了最大线程池大小且工作队列已满),execute()方法将要调用的Handler。

2.2.2、FixedThreadPool

1. 简介

FixedThreadPool被称为可重用固定线程数的线程池

2. execute()方法

《Java并发编程的艺术 -- 线程池和Executor框架(第九、十章)》

  1. 如果当前运行的线程数少于corePoolSize,则创建新线程来执行任务。
  2. 在线程池完成预热之后(当前运行的线程数等于corePoolSize),将任务加入LinkedBlockingQueue
  3. 线程执行完1中的任务后,会在循环中反复从LinkedBlockingQueue获取任务来执行

3. 其使用无界队列LinkedBlockingQueue带来的影响

  1. 当线程池中的线程数达到corePoolSize后,新任务将在无界队列中等待,因此线程池中的线程数不会超过corePoolSize
  2. 使用无界队列时maximumPoolSize将是一个无效参数
  3. 使用无界队列时keepAliveTime将是一个无效参数
  4. 由于使用无界队列,运行中的FixedThreadPool(未执行方法shutdown()或shutdownNow())不会拒绝任务(不会调用RejectedExecutionHandler.rejectedExecution方法)。

2.2.3、SingleThreadExecutor

1. 简介

  • SingleThreadExecutor是使用单个worker线程的Executor
  • SingleThreadExecutor的corePoolSize和maximumPoolSize被设置为1
  • 其他参数与FixedThreadPool相同。
  • SingleThreadExecutor使用无界队列LinkedBlockingQueue作为线程池的工作队列(队列的容量为Integer.MAX_VALUE)。SingleThreadExecutor使用无界队列作为工作队列对线程池带来的影响与FixedThreadPool相同

2. execute()方法

《Java并发编程的艺术 -- 线程池和Executor框架(第九、十章)》

  1. 如果当前运行的线程数少于corePoolSize(即线程池中无运行的线程),则创建一个新线程来执行任务。
  2. 在线程池完成预热之后(当前线程池中有一个运行的线程),将任务加LinkedBlockingQueue。
  3. 线程执行完1中的任务后,会在一个无限循环中反复从LinkedBlockingQueue获取任务来执行

2.2.4、CachedThreadPool

1. 简介

  • CachedThreadPool一个会根据需要创建新线程的线程池
  • CachedThreadPool的 corePoolSize被设置为0,即corePool为空。
  • maximumPoolSize被设置为Integer.MAX_VALUE,即maximumPool是无界的。
  • 这里把 keepAliveTime设置为60L,意味着CachedThreadPool中的空闲线程等待新任务的最长时间为60秒,空闲线程超过60秒后将会被终止。
  • CachedThreadPool使用没有容量的SynchronousQueue作为线程池的工作队列,但CachedThreadPool的maximumPool是无界的。

2. execute()方法

《Java并发编程的艺术 -- 线程池和Executor框架(第九、十章)》

  1. 首先执行SynchronousQueue.offer(Runnable task)。如果当前maximumPool中有空闲线程正在执行SynchronousQueue.poll(keepAliveTime,TimeUnit.NANOSECONDS),那么主线程执行offer操作与空闲线程执行的poll操作配对成功,主线程把任务交给空闲线程执行,execute()方法执行完成。否则就执行步骤2。
  2. 当初始maximumPool为空,或者maximumPool中当前没有空闲线程时,将没有线程执行SynchronousQueue.poll(keepAliveTime,TimeUnit.NANOSECONDS)。这种情况下,步骤(1)将失败。此时CachedThreadPool会创建一个新线程执行任务,execute()方法执行完成
  3. 在步骤(2)中新创建的线程将任务执行完后,会执行SynchronousQueue.poll(keepAliveTime,TimeUnit.NANOSECONDS)。这个poll操作会让空闲线程最多在SynchronousQueue中等待60秒钟。如果60秒钟内主线程提交了一个新任务(主线程执行步骤1)),那么这个空闲线程将执行主线程提交的新任务;否则,这个空闲线程将终止。由于空闲60秒的空闲线程会被终止,因此长时间保持空闲的CachedThreadPool不会使用任何资源

2.3、ScheduledThreadPoolExecutor

2.3.1、简介

  • 它主要用来在给定的延迟之后运行任务,或者定期执行任务
  • ScheduledThreadPoolExecutor可以在构造函数中指定多个对应的后台线程数

2.3.2、运行机制

  • 使用DelayQueue作为任务队列。DelayQueue是一个无界队列,所以ThreadPoolExecutor的 maximumPoolSize在ScheduledThreadPoolExecutor中没有什么意义。DelayQueue封装了一个PriorityQueue,这个PriorityQueue会对队列中的ScheduledFutureTask进行排序。排序时,time小的排在前面(时间早的任务将被先执行)。如果两个ScheduledFutureTask的time相同,就比较sequenceNumber,sequenceNumber小的排在前面(也就是说,如果两个任务的执行时间相同,那么先提交的任务将被先执行)。

1. 包含的成员变量

  • long型成员变量time,表示这个任务将要被执行的具体时间
  • long型成员变量sequenceNumber,表示这个任务被添加到ScheduledThreadPoolExecutor中的序号
  • long型成员变量period,表示任务执行的间隔周期。

2. 任务执行步骤

《Java并发编程的艺术 -- 线程池和Executor框架(第九、十章)》

  1. 线程1从DelayQueue中获取已到期的ScheduledFutureTask(DelayQueue.take())到期任务是指ScheduledFutureTask的time大于等于当前时间
  2. 线程1执行这个ScheduledFutureTask。
  3. 线程1修改ScheduledFutureTask的time变量为下次将要被执行的时间。
  4. 线程1把这个修改time之后的ScheduledFutureTask放回DelayQueue中(DelayQueue.add())

2.4、FutureTask

2.4.1、简介

Future接口和实现Future接口的FutureTask类,代表异步计算的结果

1. FutureTask的状态

《Java并发编程的艺术 -- 线程池和Executor框架(第九、十章)》

  1. 未启动FutureTask.run()方法还没有被执行之前,FutureTask处于未启动状态。当创建一个FutureTask,且没有执行FutureTask.run()方法之前,这个FutureTask处于未启动状态。
  2. 已启动FutureTask.run()方法被执行的过程中,FutureTask处于已启动状态。
  3. 已完成FutureTask.run()方法执行完后正常结束,或被取消(FutureTask.cancel(..)),或执行FutureTask.run()方法时抛出异常而异常结束,FutureTask处于已完成状态。

2. get方法和cancel方法的执行

《Java并发编程的艺术 -- 线程池和Executor框架(第九、十章)》

  • 当FutureTask处于未启动或已启动状态时,执行FutureTask.get()方法将导致调用线程阻塞;
  • 当FutureTask处于已完成状态时,执行FutureTask.get()方法将导致调用线程立即返回结果或抛出异常。
  • 当FutureTask处于未启动状态时,执行FutureTask.cancel()方法将导致此任务永远不会被执行;
  • 当FutureTask处于已启动状态时,执行FutureTask.cancel(true)方法将以中断执行此任务线程的方式来试图停止任务;
  • 当FutureTask处于已启动状态时,执行FutureTask.cancel(false)方法将不会对正在执行此任务的线程产生影响(让正在执行的任务运行完成);
  • 当FutureTask处于已完成状态时,执行FutureTask.cancel(…)方法将返回false。
    原文作者:崇尚学技术的科班人
    原文地址: https://blog.csdn.net/weixin_56727438/article/details/121591802
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞