亿华智慧云亿华智慧云

如何手写一个线程池?

手写一个异步工具类

我是何手小识,新来了一个公司。写个线程这个公司呢,何手有个特点,写个线程就是何手很鼓励大家封装各种实用的工具类,提高开发效率。写个线程

于是何手我就到处看项目的源码,看看有没有什么能改进的写个线程?果然让我发现了。项目中到处充斥着 new Thread 类来异步执行代码的何手逻辑。

new Thread(r).start();

我们可以封装一个异步工具类啊!写个线程

第一版

说干就干,何手把上面的写个线程代码简单封装一下,一个简单的何手异步工具类就封装好了。

public interface Executor {

void execute(Runnable r);

}public class AsyncExecutorV1 implements Executor {

@Override

public void execute(Runnable r) {

new Thread(r).start();

}

}

于是写个线程开开心心的提交了 merge request。

第二版

正当我满怀期待工具类代码能被合并的何手时候,没想代码被组长杰哥打回来了。

「杰哥」:有心封装工具类值得鼓励,不过还可以改进一下。

「小识」:还能再改进?没感觉我这个工具类还有改进的余地啊!

「杰哥」:假如说有10000个异步任务,你这创建10000个线程,b2b供应网资源耗费太严重了!

「小识」:这样啊,那我加个队列,任务都放到队列中,用一个线程从队列中取任务执行。

public class AsyncExecutorV2 implements Executor {

private BlockingQueueworkQueue;

public AsyncExecutorV2(BlockingQueueworkQueue) {

this.workQueue = workQueue;

WorkThread workThread = new WorkThread();

workThread.start();

}

@SneakyThrows

@Override

public void execute(Runnable r) {

workQueue.add(r);

}

class WorkThread extends Thread {

@Override

public void run() {

while (true) {

Runnable task = null;

try {

task = workQueue.take();

} catch (InterruptedException e) {

e.printStackTrace();

}

task.run();

}

}

}

}第三版

「小识」:杰哥,快帮我看看,还有啥改进的没?

「杰哥」:小伙子不错啊,居然能想到用队列来缓冲任务,不愧是我招进来的人!但是用一个异步线程执行任务,你确定这个工具类比同步执行的效率快?

「小识」:哈哈,又一个工具类翻车的案例,应该多开几个异步线程来执行任务,但是应该开多少呢?

「杰哥」:谁最清楚异步工具类应该用多少个线程来执行呢?

「小识」:使用工具类的人。

「杰哥」:这不对了,你可以定义一个线程数量参数,让用户来决定开多少线程。「另外你这个工具类还个问题,队列满了会直接抛出异常!」

「小识」:那我增加一个拒绝策略类(RejectedExecutionHandler),当线程池满了让用户决定执行策略,比如直接抛异常,用当前线程同步执行任务。

public class AsyncExecutorV3 implements Executor {

private BlockingQueueworkQueue;

private ListworkThreadList = new ArrayList<>();

private RejectedExecutionHandler handler;

public AsyncExecutorV3(int corePoolSize,

BlockingQueueworkQueue,

RejectedExecutionHandler handler) {

this.workQueue = workQueue;

this.handler = handler;

for (int i = 0; i < corePoolSize; i++) {

WorkThread workThread = new WorkThread();

workThread.start();

workThreadList.add(workThread);

}

}

@SneakyThrows

@Override

public void execute(Runnable r) {

if (!workQueue.offer(r)) {

// 队列满了,执行拒绝策略

handler.rejectedExecution(r);

}

}

class WorkThread extends Thread {

@Override

public void run() {

while (true) {

Runnable task = null;

try {

task = workQueue.take();

} catch (InterruptedException e) {

e.printStackTrace();

}

task.run();

}

}

}

}// 拒绝策略类

public interface RejectedExecutionHandler {

void rejectedExecution(Runnable r);

}// 当线程池满了之后直接抛出异常

public class AbortPolicy implements RejectedExecutionHandler {

@Override

public void rejectedExecution(Runnable r) {

throw new RuntimeException("queue is full");

}

}// 当线程池满了之后直接抛出异常

public class AbortPolicy implements RejectedExecutionHandler {

@Override

public void rejectedExecution(Runnable r) {

throw new RuntimeException("queue is full");

}

}

// 当线程池满了之后,用提交任务的服务器租用线程同步执行任务

public class CallerRunsPolicy implements RejectedExecutionHandler {

@Override

public void rejectedExecution(Runnable r) {

r.run();

}

}

再次提交 merge request,终于被合并了,别的团队都开始使用我的工具类了!

过了几天小亮急匆匆找到我。

「小亮」:小识,你的工具类挺好用的。但是我最近遇到了一个问题,我用了CountDownLatch批量执行任务,但是我这个任务好像卡住了,我用jstack想看看线程的执行情况,快告诉我你异步线程的名字设置的是啥?

「小识」:哎呀,我们没设置线程的名字,应该用的是默认的线程名字 Thread-n。

「小亮」:你可得给工具类加个线程名字的参数啊,不然一个一个看线程的状态太累了,而且效率也不高。

「小识」:我这就加。

第四版

赶紧加了一个线程名字的香港云服务器参数,然后再次提交代码。

「杰哥」:哎呀,没想到我也疏忽了,没发现这个问题,确实应该加个线程名字的参数,代码的可扩展性太重要了,改来改去可不行。

「小识」:是啊!

「杰哥」:你觉得你只加一个线程名字参数,可扩展性高吗?如果有的团队想修改异步线程的优先级,你再加个优先级参数?

「小识」:感觉不太行,那让用户把线程传给我吧!

「杰哥」:哈哈,可以,你还可以用工厂模式优化一下,用户传入线程工厂类,工具类用工厂类创建线程。

「小识」:不愧是杰哥,这样一来代码更清爽了!

public class AsyncExecutorV4 implements Executor {

private BlockingQueueworkQueue;

private ListworkThreadList = new ArrayList<>();

private RejectedExecutionHandler handler;

public AsyncExecutorV4(int corePoolSize,

BlockingQueueworkQueue,

RejectedExecutionHandler handler,

ThreadFactory threadFactory) {

this.workQueue = workQueue;

this.handler = handler;

for (int i = 0; i < corePoolSize; i++) {

// 用工厂类创建线程

WorkThread workThread = threadFactory.newThread();

workThread.start();

workThreadList.add(workThread);

}

}

@SneakyThrows

@Override

public void execute(Runnable r) {

if (!workQueue.offer(r)) {

handler.rejectedExecution(r);

}

}

// 异步线程

public class WorkThread extends Thread {

@Override

public void run() {

while (true) {

Runnable task = null;

try {

task = workQueue.take();

} catch (InterruptedException e) {

e.printStackTrace();

}

task.run();

}

}

}

// 异步线程工厂类

public interface ThreadFactory {

WorkThread newThread();

}

}

代码提交之后,小亮给线程起了一个名字,async-thread,现在他通过名字很快就能知道线程池中的线程在干嘛了!

大家不断的进行改进

随着这个异步工具类在公司内部使用的越来越多,大家也提交了很多改进的代码。

按需创建线程,不要一开始就创建「corePoolSize」个线程,而是在调用者提交任务的过程中逐渐创建出来,最后创建了「corePoolSize」个就不再创建了。提高工具的弹性,当任务突增时,队列会被放满,然后多余的任务有可能会被直接扔掉。当然我们可以把「corePoolSize」设的很大,但是这样并不优雅,因为大部分情况下是用不到这么多线程的。当任务突增时,我们可以适当增加线程,提高执行速度,当然创建的总线程数还是要限制一下的,我们把能创建的总数定为「maximumPoolSize」。及时关闭不需要的线程,当任务突增时,线程数可能增加「maximumPoolSize」,但是大多数时间「corePoolSize」个线程就足够用了,因此可以定义一个超时时间,当一个线程在「keepAliveTime」时间内没有执行任务,就把它给关掉。

异步工具类执行流程图

经过大家的不断改进之后,构造函数中的参数也越来越多了,杰哥让我写个文档吧,把这个异步工具类的构造函数和执行流程总结一下,不然新来的小伙伴看到这个工具类一脸懵可不行!

这个工具类的构造函数目前有如下7个参数

public ThreadPoolExecutor(int corePoolSize,

int maximumPoolSize,

long keepAliveTime,

TimeUnit unit,

BlockingQueueworkQueue,

ThreadFactory threadFactory,

RejectedExecutionHandler handler)

参数

含义

corePoolSize

核心线程数

maximumPoolSize

最大线程数

keepAliveTime

非核心线程的空闲时间

TimeUnit

空闲时间的单位

BlockingQueue<Runnable>

任务队列

ThreadFactory

线程工厂

RejectedExecutionHandler

拒绝策略

「执行流程图如下」:

对了,最后大家给这个异步工具类起了一个牛的名字,「线程池」。

赞(59)
未经允许不得转载:>亿华智慧云 » 如何手写一个线程池?