一文带你彻底弄懂线程池设计机制

2024-11-06 23:11

背景介绍

虽然 Java 对线程的创建、中断、等待、通知、销毁、同步等功能提供了很多的支持,但是从操作系统角度来说,频繁的创建线程和销毁线程,其实是需要大量的时间和资源的。

例如,当有多个任务同时需要处理的时候,一个任务对应一个线程来执行,以此来提升任务的执行效率,模型图如下:

图片

如果任务数非常少,这种模式倒问题不大,但是如果任务数非常的多,可能就会存在很大的问题:

  • 1.线程数不可控:随着任务数的增多,线程数也会增多,这些线程都没办法进行统一管理

  • 2.系统的开销很大:创建线程对系统来说开销很高,随着线程数也会增多,可能会出现系统资源紧张的问题,严重的情况系统可能直接死机

假如把很多任务让一组线程来执行,而不是一个任务对应一个新线程,这种通过接受任务并进行分发处理的就是线程池

图片

线程池内部维护了若干个线程,当没有任务的时候,这些线程都处于等待状态;当有新的任务进来时,就分配一个空闲线程执行;当所有线程都处于忙碌状态时,新任务要么放入队列中等待,要么增加一个新线程进行处理,要么直接拒绝。

很显然,这种通过线程池来执行多任务的思路,优势明显:

  • 1.资源更加可控:能有效的控制线程数,防止线程数过多,导致系统资源紧张

  • 2.资源消耗更低:因为线程可以复用,可以有效的降低创建和销毁线程的时间和资源

  • 3.执行效率更高:当新的任务进来时,可以不需要等待线程的创建立即执行

关于这一点,我们可以看一个简单的对比示例。

/**
 * 使用一个任务对应一个线程来执行
 * @param args
 */
public static void main(String[] args) {
    long startTime = System.currentTimeMillis();
    final Random random = new Random();
    List<Integer> list = new CopyOnWriteArrayList<>();

    // 一个任务对应一个线程,使用20000个线程执行任务
    for (int i = 0; i < 20000; i++) {
        new Thread(new Runnable() {
            @Override
            public void run() {
                list.add(random.nextInt(100));
            }
        }).start();
    }
    // 等待任务执行完毕
    while (true){
        if(list.size() >= 20000){
            break;
        }
    }
    System.out.println("一个任务对应一个线程,执行耗时:" + (System.currentTimeMillis() - startTime) + "ms");
}
/**
 * 使用线程池进行执行任务
 * @param args
 */
public static void main(String[] args) {
    long startTime = System.currentTimeMillis();
    final Random random = new Random();
    List<Integer> list = new CopyOnWriteArrayList<>();

    // 使用线程池进行执行任务,默认4个线程
    ThreadPoolExecutor executor = new ThreadPoolExecutor(4, 4, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(20000));
    for (int i = 0; i < 20000; i++) {
     // 提交任务
        executor.submit(new Runnable() {
            @Override
            public void run() {
                list.add(random.nextInt(100));
            }
        });
    }

    // 等待任务执行完毕
    while (true){
        if(list.size() >= 20000){
            break;
        }
    }
    System.out.println("使用线程池,执行耗时:" + (System.currentTimeMillis() - startTime) + "ms");
    // 关闭线程池
    executor.shutdown();
}

两者执行耗时情况对比,如下:

一个任务对应一个线程,执行耗时:3073ms
---------------------------
使用线程池,执行耗时:578ms

从结果上可以看出,同样的任务数,采用线程池和不采用线程池,执行耗时差距非常明显,一个任务对应一个新的线程来执行,反而效率不如采用 4 个线程的线程池执行的快。

为什么会产生这种现象,下面我们就一起来聊聊线程池。

线程池概述

站在专业的角度讲,线程池其实是一种利用池化思想来实现线程管理的技术,它将线程的创建和任务的执行进行解耦,同时复用已经创建的线程来降低频繁创建和销毁线程所带来的资源消耗。通过合理的参数设置,可以实现更低的系统资源使用率、更高的任务并发执行效率。

在 Java 中,线程池最顶级的接口是Executor,名下的实现类关系图如下:

图片

关键接口和实现类,相关的描述如下:

  • 1.Executor是最顶级的接口,它的作用是将任务的执行和线程的创建进行抽象解藕

  • 2.ExecutorService接口继承了Executor接口,在Executor的基础上,增加了一些关于管理线程池的一些方法,比如查看任务的状态、获取线程池的状态、终止线程池等标准方法

  • 3.ThreadPoolExecutor是一个线程池的核心实现类,完整的封装了线程池相关的操作方法,通过它可以创建线程池

  • 4.ScheduledThreadPoolExecutor是一个使用线程池的定时调度实现类,完整的封装了定时调度相关的操作方法,通过它可以创建周期性线程池

    整个关系图中,其中ThreadPoolExecutor是线程池最核心的实现类,开发者可以使用它来创建线程池。

ThreadPoolExecutor 构造方法

ThreadPoolExecutor类的完整构造方法一共有七个参数,理解这些参数的配置对使用好线程池至关重要,完整的构造方法核心源码如下:

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler)

各个参数的解读如下:

  • corePoolSize:核心线程数量,用于执行任务的核心线程数。

  • maximumPoolSize:最大线程数量,线程池中允许创建线程的最大数量

  • keepAliveTime:空闲线程存活的时间。只有当线程池中的线程数大于 corePoolSize 时,这个参数才会起作用

  • unit:空闲线程存活的时间单位

  • workQueue:任务队列,用于存储还没来得及执行的任务

  • threadFactory:线程工厂。用于执行任务时创建新线程的工厂

  • handler:拒绝策略,当线程池和和队列容量处于饱满,使用某种策略来拒绝任务提交

ThreadPoolExecutor 执行流程

创建完线程池之后就可以提交任务了,当有新的任务进来时,线程池就会工作并分配线程去执行任务。

ThreadPoolExecutor的典型用法如下:

// 创建固定大小的线程池
ThreadPoolExecutor executor = new ThreadPoolExecutor(4, 4, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(100));
// 提交任务
executor.execute(task1);
executor.execute(task2);
executor.execute(task3);
...

针对任务的提交方式,ThreadPoolExecutor还提供了两种方法。

  • execute()方法:一种无返回值的方法,也是最核心的任务提交方法

  • submit()方法:支持有返回值,通过FutureTask对象来获取任务执行完后的返回值,底层依然调用的是execute()方法

ThreadPoolExecutor执行提交的任务流程虽然比较复杂,但是通过对源码的分析,大致的任务执行流程,可以用如下图来概括。

图片

整个执行流程,大体步骤如下:

  • 1.初始化完线程池之后,默认情况下,线程数为0,当有任务到来后才会创建新线程去执行任务

  • 2.每次收到提交的任务之后,会先检查核心线程数是否已满,如果没有,就会继续创建新线程来执行任务,直到核心线程数达到设定值

  • 3.当核心线程数已满,会检查任务队列是否已满,如果没有,就会将任务存储到阻塞任务队列中

  • 4.当任务队列已满,会再次检查线程池中的线程数是否达到最大值,如果没有,就会创建新的线程来执行任务

  • 5.如果任务队列已满、线程数已达到最大值,此时线程池已经无法再接受新的任务,当收到任务之后,会执行拒绝策略

我们再回头来看上文提到的ThreadPoolExecutor构造方法中的七个参数,这些参数会直接影响线程的执行情况,各个参数的变化情况,可以用如下几点来概括:

  • 1.当线程池中的线程数小于 corePoolSize 时,新任务都不排队而是直接创新新线程来执行

  • 2.当线程池中的线程数大于等于 corePoolSize,workQueue 未满时,将新任务添加到 workQueue 中而不是创建新线程来执行

  • 3.当线程池中的线程数大于等于 corePoolSize,workQueue 已满,但是线程数小于 maximumPoolSize 时,此时会创建新的线程来处理被添加的任务

  • 4.当线程池中的线程数大于等于 maximumPoolSize,并且 workQueue 已满,新任务会被拒绝,使用 handler 执行被拒绝的任务

ThreadPoolExecutor执行任务的部分核心源码如下!

execute 提交任务
public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    int c = ctl.get();
 // 工作线程数量 < corePoolSize,直接创建线程执行任务
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
 // 工作线程数量 >= corePoolSize,将任务添加至阻塞队列中
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
  // 往阻塞队列中添加任务的时候,如果线程池非运行状态,将任务remove,并执行拒绝策略
        if (! isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    // 阻塞队列已满,尝试添加新的线程去执行,如果工作线程数量 >= maximumPoolSize,执行拒绝策略
    else if (!addWorker(command, false))
        reject(command);
}
addWorker 创建线程加入线程池
private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

  // 线程池状态处于非 RUNNING 状态,添加worker失败
        if (rs >= SHUTDOWN &&
            ! (rs == SHUTDOWN &&
               firstTask == null &&
               ! workQueue.isEmpty()))
            return false;
  // 判断线程池中线程数量大于等于该线程池允许的最大线程数量,如果大于则worker失败,反之cas更新线程池中的线程数
        for (;;) {
            int wc = workerCountOf(c);
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            if (compareAndIncrementWorkerCount(c))
                break retry;
            c = ctl.get();  // Re-read ctl
            if (runStateOf(c) != rs)
                continue retry;
            // else CAS failed due to workerCount change; retry inner loop
        }
    }

    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
  // 创建工作线程
        w = new Worker(firstTask);
        final Thread t = w.thread;
        if (t != null) {
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                int rs = runStateOf(ctl.get());
                if (rs < SHUTDOWN ||
                    (rs  SHUTDOWN && firstTask  null)) {
                 // 如果线程池处于 RUNNING 状态并且线程已经启动,则抛出线程异常启动
                    if (t.isAlive()) 
                        throw new IllegalThreadStateException();
     // 将线程加入已创建的工作线程集合,更新用于追踪线程池中线程数量 largestPoolSize 字段
                    workers.add(w);
                    int s = workers.size();
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            if (workerAdded) {
    // 启动线程执行任务
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        if (! workerStarted)
            addWorkerFailed(w);
    }
    return workerStarted;
}
runWorker 执行任务
final void runWorker(Worker w) {
 // 获取执行任务线程
    Thread wt = Thread.currentThread();
    // 获取执行任务
    Runnable task = w.firstTask;
 // 将worker中的任务置空
    w.firstTask = null;
    w.unlock(); // allow interrupts
    boolean completedAbruptly = true;
    try {
     // 从当前工作线程种获取任务,或者循环从阻塞任务队列中获取任务
        while (task != null || (task = getTask()) != null) {
            w.lock();
   // 双重检查线程池是否正在停止,如果线程池停止,并且当前线程能够中断,则中断线程
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
    // 前置执行任务钩子函数
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
     // 执行当前任务
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
     // 后置执行任务钩子函数
                    afterExecute(task, thrown);
                }
            } finally {
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
  // 回收线程
        processWorkerExit(w, completedAbruptly);
    }
}
reject 执行拒绝策略
final void reject(Runnable command) {
 // 执行拒绝策略
    handler.rejectedExecution(command, this);
}

当线程池中的线程数大于等于 maximumPoolSize,并且 workQueue 已满,新任务会被拒绝,使用RejectedExecutionHandler接口的rejectedExecution()方法来处理被拒绝的任务。

线程池提供了四种拒绝策略实现类来拒绝任务,具体如下:

描述

AbortPolicy

直接抛出一个RejectedExecutionException,这也是JDK默认的拒绝策略

DiscardPolicy

什么也不做,直接丢弃任务

DiscardOldestPolicy

将阻塞队列中的任务移除出来,然后执行当前任务

CallerRunsPolicy

尝试直接运行被拒绝的任务,如果线程池已经被关闭了,任务就被丢弃了

ThreadPoolExecutor 线程池状态

我们知道 Java 种的线程一共 6 种状态,其实线程池也有状态。

因为线程池也是异步执行的,有的任务正在执行,有的任务存储在任务队列中,有的线程处于工作状态,有的线程处于空闲状态等待回收,为了更加精细化的管理线程池,线程池也设计了 5 中状态,部分核心源码如下:

public class ThreadPoolExecutor extends AbstractExecutorService {

 // 线程池线程数的bit数
 private static final int COUNT_BITS = Integer.SIZE - 3;
 
 // 线程池状态
 private static final int RUNNING    = -1 << COUNT_BITS;
 private static final int SHUTDOWN   =  0 << COUNT_BITS;
 private static final int STOP       =  1 << COUNT_BITS;
 private static final int TIDYING    =  2 << COUNT_BITS;
 private static final int TERMINATED =  3 << COUNT_BITS;
}

其中的状态流程,可以用如下图来描述!

图片

这几个状态的转化关系,可以用如下几个步骤来概括:

  • 1.线程池创建完之后,默认就进入了可执行状态RUNNING,此时线程数为 0,当有任务进来时,再创建新线程来执行,可以看成是一个慢启动的过程

  • 2.当线程池处于运行状态时,可以通过shutdown()或者shutdownNow()方法来改变运行状态。shutdown()是一个平稳的关闭操作,线程池停止接受新的任务,同时等待已经提交的任务执行完毕,包括那些进入队列还没有开始的任务,这时候线程池处于 SHUTDOWN 状态;shutdownNow()是一个立即关闭的操作,线程池立刻停止接受新的任务,同时线程池取消所有执行的任务和已经进入队列但是还没有执行的任务,这时候线程池处于 STOP 状态

  • 3.当任务队列和线程池均为空的时候,SHUTDOWN 或者 STOP 状态,就会进入 TIDYING 状态,等待被终止

  • 4.当terminated()方法被调用完成之后,线程池会从 TIDYING 状态进入 TERMINATED 状态,此时线程池就结束了

线程池应用

正如文章的开头所介绍的,使用线程池的方式,通常可以用如下几个步骤来概括:

// 1.创建固定大小为4的线程数、空闲线程的存活时间为15秒、阻塞任务队列的上限为1000的线程池完整示例
ThreadPoolExecutor executor = new ThreadPoolExecutor(4, 4, 15, TimeUnit.SECONDS, new LinkedBlockingQueue<>(1000), Executors.defaultThreadFactory(), new ThreadPoolExecutor.AbortPolicy());

// 2.提交任务
executor.submit(task1);
executor.submit(task2);
executor.submit(task3);
...

// 3.使用完毕之后,可以手动关闭线程池
executor.shutdown();

正如上文所说,其中execute()submit()方法都可以用来提交任务,稍有不同的是:submit()方法同时还支持获取任务执行完毕的返回结果。

针对线程池的使用,Java 还提供了Executors工具类,开发者可以通过此工具,快速创建不同类型的线程池。

下面我们一起来看下Executors为用户提供的几种创建线程池的方法。

newSingleThreadExecutor

newSingleThreadExecutor()方法表示创建一个单线程的线程池,核心源码如下:

public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService
        (new ThreadPoolExecutor(1, 1,
                                0L, TimeUnit.MILLISECONDS,
                                new LinkedBlockingQueue<Runnable>()));
}

从构造参数上可以很清晰的看到,线程池中的线程数为 1,不会被线程池自动回收,workQueue 选择的是无界的LinkedBlockingQueue阻塞队列,不管来多少任务存入阻塞队列中,前面一个任务执行完毕,再执行队列中的剩余任务。

简单应用示例如下:

public static void main(String[] args) {
    long startTime = System.currentTimeMillis();
    final Random random = new Random();
    List<Integer> list = new CopyOnWriteArrayList<>();

    // 创建一个单线程线程池
    ExecutorService executor  = Executors.newSingleThreadExecutor();
    for (int i = 0; i < 10; i++) {
        executor.submit(new Runnable() {
            @Override
            public void run() {
                list.add(random.nextInt(100));
                System.out.println("thread name:" + Thread.currentThread().getName());
            }
        });
    }

    while (true){
        if(list.size() >= 10){
            break;
        }
    }
    System.out.println("执行耗时:" + (System.currentTimeMillis() - startTime) + "ms");
    // 关闭线程池
    executor.shutdown();
}

运行结果如下:

thread name:pool-1-thread-1
thread name:pool-1-thread-1
thread name:pool-1-thread-1
thread name:pool-1-thread-1
thread name:pool-1-thread-1
thread name:pool-1-thread-1
thread name:pool-1-thread-1
thread name:pool-1-thread-1
thread name:pool-1-thread-1
thread name:pool-1-thread-1
执行耗时:13ms

newFixedThreadPool

newFixedThreadPool()方法表示创建一个固定大小线程数的线程池,核心源码如下:

public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>());
}

固定大小的线程池和单线程的线程池有异曲同工之处,无非是让线程池中能运行的线程数量支持手动指定。

简单应用示例如下:

public static void main(String[] args) {
    long startTime = System.currentTimeMillis();
    final Random random = new Random();
    List<Integer> list = new CopyOnWriteArrayList<>();

    // 创建固定大小线程数为3的线程池
    ExecutorService executor  = Executors.newFixedThreadPool(3);
    for (int i = 0; i < 10; i++) {
        executor.submit(new Runnable() {
            @Override
            public void run() {
                list.add(random.nextInt(100));
                System.out.println("thread name:" + Thread.currentThread().getName());
            }
        });
    }

    while (true){
        if(list.size() >= 10){
            break;
        }
    }
    System.out.println("执行耗时:" + (System.currentTimeMillis() - startTime) + "ms");
    // 关闭线程池
    executor.shutdown();
}

运行结果如下:

thread name:pool-1-thread-2
thread name:pool-1-thread-1
thread name:pool-1-thread-3
thread name:pool-1-thread-3
thread name:pool-1-thread-3
thread name:pool-1-thread-1
thread name:pool-1-thread-3
thread name:pool-1-thread-2
thread name:pool-1-thread-2
thread name:pool-1-thread-1
执行耗时:10ms

newCachedThreadPool

newCachedThreadPool()方法表示创建一个可缓存的无界线程池,核心源码如下:

public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>());
}

从构造参数上可以看出,线程池中的最大线程数为Integer.MAX_VALUE,也就是Integer的最大值,workQueue 选择的是SynchronousQueue阻塞队列,这个阻塞队列不像LinkedBlockingQueue,它没有容量,只负责做临时任务缓存,如果有任务进来立刻会被执行。

也就是说,只要添加进去了任务,线程就会立刻去执行,当任务超过线程池的线程数则创建新的线程去执行,线程数量的最大上线为Integer.MAX_VALUE,当线程池中的线程空闲时间超过 60s,则会自动回收该线程。

简单应用示例如下:

public static void main(String[] args) {
    long startTime = System.currentTimeMillis();
    final Random random = new Random();
    List<Integer> list = new CopyOnWriteArrayList<>();

    // 创建可缓存的无界线程池
    ExecutorService executor  = Executors.newCachedThreadPool();
    for (int i = 0; i < 10; i++) {
        executor.submit(new Runnable() {
            @Override
            public void run() {
                list.add(random.nextInt(100));
                System.out.println("thread name:" + Thread.currentThread().getName());
            }
        });
    }

    while (true){
        if(list.size() >= 10){
            break;
        }
    }
    System.out.println("执行耗时:" + (System.currentTimeMillis() - startTime) + "ms");
    // 关闭线程池
    executor.shutdown();
}

运行结果如下:

thread name:pool-1-thread-1
thread name:pool-1-thread-2
thread name:pool-1-thread-3
thread name:pool-1-thread-4
thread name:pool-1-thread-3
thread name:pool-1-thread-2
thread name:pool-1-thread-1
thread name:pool-1-thread-4
thread name:pool-1-thread-4
thread name:pool-1-thread-4
执行耗时:13ms

newScheduledThreadPool

newScheduledThreadPool()方法表示创建周期性的线程池,可以指定线程池中的核心线程数,支持定时及周期性任务的执行,核心源码如下:

public ScheduledThreadPoolExecutor(int corePoolSize) {
    super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
          new DelayedWorkQueue());
}

从构造参数上可以看出,线程池支持指定核心线程数,最大线程数为Integer.MAX_VALUE,workQueue 选择的是DelayedWorkQueue延迟阻塞队列,这个阻塞队列支持任务延迟消费,新加入的任务不会立刻被执行,只有时间到期之后才会被取出;当非核心线程处于空闲状态时,会立刻进行收回。

ScheduledExecutorService支持三种类型的定时调度方法,分别如下:

  • schedule:支持指定多久执行一次任务

  • scheduleAtFixedRate:支持周期性间隔多久的执行任务

  • scheduleWithFixedDelay:同样也是指周期性的执行任务,不过它指的是上一个任务执行完之后,延迟多久执行下一个任务

下面我们一起来看看它们的应用方式。

schedule 方法使用示例
SimpleDateFormat sdf =new SimpleDateFormat("yyyy-MM-dd hh:mm:ss");

// 创建线程数量为2的定时调度线程池
ScheduledExecutorService executor  = Executors.newScheduledThreadPool(2);
System.out.println(sdf.format(new Date()) +  " 准备启动");
// 定时执行一次的任务,延迟1s后执行
executor.schedule(new Runnable() {

    @Override
    public void run() {
        System.out.println(sdf.format(new Date()) +  " thread name:" + Thread.currentThread().getName() +  ", schedule");

    }
}, 1, TimeUnit.SECONDS);

输出结果:

2023-11-17 01:41:12 准备启动
2023-11-17 01:41:13 thread name:pool-1-thread-1, schedule
scheduleAtFixedRate 方法使用示例
SimpleDateFormat sdf =new SimpleDateFormat("yyyy-MM-dd hh:mm:ss");

// 创建线程数量为2的定时调度线程池
ScheduledExecutorService executor  = Executors.newScheduledThreadPool(2);
System.out.println(sdf.format(new Date()) +  " 准备启动");

// 周期性地执行任务,第一个任务延迟1s后执行,之后每隔2s周期性执行任务,需要等待上一次的任务执行完毕才执行下一个
executor.scheduleAtFixedRate(new Runnable() {

    @Override
    public void run() {
        System.out.println(sdf.format(new Date()) +  " thread name:" + Thread.currentThread().getName() +  " begin");
        try {
            Thread.sleep(3000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println(sdf.format(new Date()) +  " thread name:" + Thread.currentThread().getName() +  " end");
    }
}, 1, 2, TimeUnit.SECONDS);

输出结果:

2023-11-17 02:00:44 准备启动
2023-11-17 02:00:45 thread name:pool-1-thread-1 begin
2023-11-17 02:00:48 thread name:pool-1-thread-1 end
2023-11-17 02:00:48 thread name:pool-1-thread-1 begin
2023-11-17 02:00:51 thread name:pool-1-thread-1 end
2023-11-17 02:00:51 thread name:pool-1-thread-1 begin
2023-11-17 02:00:54 thread name:pool-1-thread-1 end
scheduleWithFixedDelay 方法使用示例
SimpleDateFormat sdf =new SimpleDateFormat("yyyy-MM-dd hh:mm:ss");

// 创建线程数量为2的定时调度线程池
ScheduledExecutorService executor  = Executors.newScheduledThreadPool(2);
System.out.println(sdf.format(new Date()) +  " 准备启动");
// 周期性地执行任务,第一个任务延迟1s后执行,之后上一个任务执行完毕之后,延迟2秒再执行下一个任务
executor.scheduleWithFixedDelay(new Runnable() {

    @Override
    public void run() {
        System.out.println(sdf.format(new Date()) +  " thread name:" + Thread.currentThread().getName() +  " begin");
        try {
            Thread.sleep(3000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println(sdf.format(new Date()) +  " thread name:" + Thread.currentThread().getName() +  " end");

    }
}, 1, 2, TimeUnit.SECONDS);

输出结果:

2023-11-17 01:53:26 准备启动
2023-11-17 01:53:27 thread name:pool-1-thread-1 begin
2023-11-17 01:53:30 thread name:pool-1-thread-1 end
2023-11-17 01:53:32 thread name:pool-1-thread-1 begin
2023-11-17 01:53:35 thread name:pool-1-thread-1 end
2023-11-17 01:53:37 thread name:pool-1-thread-1 begin
2023-11-17 01:53:40 thread name:pool-1-thread-1 end

工厂方法小结

从以上的介绍中,我们可以对这四种线程池的参数做一个汇总,内容如下表:

工厂方法

corePoolSize

maximumPoolSize

keepAliveTime

workQueue

newSingleThreadExecutor

1

1

0

LinkedBlockingQueue

newFixedThreadPool

nThreads

nThreads

0

LinkedBlockingQueue

newCachedThreadPool

0

Integer.MAX_VALUE

60s

SynchronousQueue

newScheduledThreadPool

corePoolSize

Integer.MAX_VALUE

0

DelayedWorkQueue

这四个线程池,主要的区别在于:corePoolSize、maximumPoolSize、keepAliveTime、workQueue 这四个参数,其中线程工厂为默认类DefaultThreadFactory,线程饱和的拒绝策略为默认类AbortPolicy

小结

结合以上的分析,最后我们再来总结一下。

对于线程池的使用,不太建议采用Executors工具去创建,尽量通过ThreadPoolExecutor的构造方法来创建,原因在于:有利于规避资源耗尽的风险;同时建议开发者手动设定任务队列的上限,防止服务出现 OOM。

虽然Executors工具提供了四种创建线程池的方法,能帮助开发者省去繁琐的参数配置,但是newSingleThreadExecutornewFixedThreadPool方法创建的线程池,任务队列上限为Integer.MAX_VALUE,这意味着可以无限提交任务,这在高并发的环境下,系统可能会出现 OOM,导致整个线程池不可用;其次newCachedThreadPool方法也存在同样的问题,无限的创建线程可能会给系统带来更多的资源消耗。

其次,创建线程池的时候应该尽量给线程定义一个具体的业务名字前缀,方便定位问题,不同类型的业务尽量使用不同的线程池来实现。

例如可以使用guava包,创建自定义的线程工厂。

ThreadFactory threadFactory = new ThreadFactoryBuilder()
                        .setNameFormat(threadNamePrefix + "-%d")
                        .setDaemon(true).build();

当然,你也可以自行实现一个线程工厂,需要继承ThreadFactory接口,案例如下:

import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * 线程工厂,它设置线程名称,有利于我们定位问题。
 */
public final class NamingThreadFactory implements ThreadFactory {

    private final AtomicInteger threadNum = new AtomicInteger();
    private final ThreadFactory delegate;
    private final String name;

    /**
     * 创建一个带名字的线程池生产工厂
     */
    public NamingThreadFactory(ThreadFactory delegate, String name) {
        this.delegate = delegate;
        this.name = name;
    }

    @Override
    public Thread newThread(Runnable r) {
        Thread t = delegate.newThread(r);
        t.setName(name + "-" + threadNum.incrementAndGet());
        return t;
    }
}

创建一个线程名称以order开头的线程工厂。

NamingThreadFactory threadFactory = new NamingThreadFactory(Executors.defaultThreadFactory(), "order");

最后,再来说说关于线程池中线程数,如何合理设定的问题?

  • 对于需要消耗 CPU 资源的密集型任务,可以将线程数设置为 N(CPU 核心数)+1,比 CPU 核心数多出来的一个线程是为了防止线程偶发的缺页中断,或者其它原因导致的任务暂停而带来的影响

  • 对于需要消耗 I/O 资源的密集型任务,可以将线程数设置为 2N,原因在于:线程在处理 I/O 的时间段内不会占用 CPU 资源,这时就可以将 CPU 交出给其它线程使用,因此可以多配置一些线程数

那如何判断当前是 CPU 密集型任务还是 I/O 密集型任务呢?

最简单的方法就是:如果当前任务涉及到网络读取,文件读取等,这类都是 IO 密集型任务,除此之外,可以看成是 CPU 密集型任务。

相关文章
热点文章
精彩视频
Tags

站点地图 在线访客: 今日访问量: 昨日访问量: 总访问量: