跳到主要内容

手写一个线程池

阅读需 7 分钟

1.准备一个测试函数

	public static void main(String[] args) {
MyThreadPool myThreadPool = new MyThreadPool();
for (int i = 0; i < 5; i++) {
myThreadPool.execute(() -> {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println(Thread.currentThread().getName());
});
}

System.out.println("主线程未被阻塞...");
}

MyThreadPool是我们的自定义线程池,其中包含一个**execute()**方法,用来执行我们的方法。

2.编写MyThreadPool

	void execute(Runnable command) {
}

什么时候创建线程? 线程是否可以复用? command该怎么保存

1. 使用ArrayList保存任务

当只有一个线程执行任务时,使用List保存runnable。 将接收到的runnable存入一个集合内 需要执行时从集合中取出

List<Runnable> commandList = new ArrayList<>();

Thread thread = new Thread(() -> {
while (true) {
if (!commandList.isEmpty()) {
Runnable command = commandList.remove(0);
command.run();
}
}
});
void execute(Runnable command) {
commandList.add(command);
}

使用list时 当list中没有元素时 while (true) 会浪费cpu资源, 需要继续优化

2. 替换为使用阻塞队列保存任务

BlockingQueue<Runnable> commandList = new ArrayBlockingQueue<>(1024);
Thread thread = new Thread(() -> {
while (true) {
try {
Runnable take = commandList.take();
take.run();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}, "唯一线程");

{
thread.start();
}

void execute(Runnable command) {
// 在往BlockingQueue中添加元素时不要使用add, 而是换成offer,offer有一个返回值
// 返回是否添加成功的结果
boolean offer = commandList.offer(command);
}

当队列中没有任务时,**take()**函数会阻塞,而不会浪费CPU资源.

执行main函数,得到如下结果

主线程未被阻塞...
唯一线程
唯一线程
唯一线程
唯一线程
唯一线程

3. 增加线程Thread的数量

我们的线程池应该有多少个线程?

首先将command提取出来,作为一个task

	public final Runnable task = () -> {
while (true) {
try {
Runnable take = commandList.take();
take.run();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
};

接下来定义一个变量 保存线程的数量,使用List保存线程

	private int corePoolSize = 10;
List<Thread> threadList = new ArrayList<>();

// 判断threadList中元素的数量,如果没有达到corePoolSize,则继续创建线程
void execute(Runnable command) {
if (threadList.size() < corePoolSize) {
Thread thread = new Thread(task);
threadList.add(thread);
thread.start();
}
boolean offer = commandList.offer(command);
}

4.增加辅助线程处理任务

如果offer返回的是false, 说明threadList的容量已经满了,需要创建一个新的List来执行其他的任务

private int corePoolSize = 10;
private int maxSize = 16;
List<Thread> coreList = new ArrayList<>();
List<Thread> supportList = new ArrayList<>();

void execute(Runnable command) {
if (coreList.size() < corePoolSize) {
Thread thread = new Thread(task);
coreList.add(thread);
thread.start();
}
boolean offer = commandList.offer(command);
if (!offer) {
Thread thread = new Thread(task);
supportList.add(thread);

thread.start();
}
}

修改一下代码,当核心线程数量加上辅助线程数量小于最大线程数量时,才允许添加到辅助线程List.

上面的代码有线程安全问题,辅助线程在创建后并不是执行之前未放到阻塞队列中的任务

并且即使创建了辅助线程 此时的阻塞队列依然可能是满的

所以在创建完辅助线程后,需要重新将command再放到辅助线程中

如果此时还是失败,说明阻塞队列满了

void execute(Runnable command) {
if (coreList.size() < corePoolSize) {
Thread thread = new Thread(task);

coreList.add(thread);
thread.start();
}
if (commandList.offer(command)) {
return;
}
// 当核心线程数量加上辅助线程数量小于最大线程数量时,可以往创建辅助线程
if (coreList.size() + supportList.size() < maxSize) {
Thread thread = new Thread(task);
supportList.add(thread);

thread.start();
}
if (!commandList.offer(command)) {
throw new RuntimeException("阻塞队列满了");
}
}

5.辅助线程的结束时机

在线程池不忙碌的时候,应该将辅助线程释放,节省空间.

使用BlockingQueuepoll 函数,传入一个时间和一个时间单位.

规定一个超时时间,如果在这个时间内仍然没有拿到任务,说明线程池不忙碌

BlockingQueue<Runnable> commandList = new ArrayBlockingQueue<>(1024);
public final Runnable coreTask = () -> {
while (true) {
try {
Runnable take = commandList.take();
take.run();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
};

public final Runnable supportTask = () -> {
while (true) {
try {
Runnable take = commandList.poll(1, TimeUnit.SECONDS);
if (take == null) {
break;
}
take.run();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
System.out.println("辅助线程" + Thread.currentThread().getName() + "结束了");
};

void execute(Runnable command) {
if (coreList.size() < corePoolSize) {
Thread thread = new Thread(coreTask);

coreList.add(thread);
thread.start();
}
if (commandList.offer(command)) {
return;
}
if (coreList.size() + supportList.size() < maxSize) {
Thread thread = new Thread(supportTask);
supportList.add(thread);

thread.start();
}

if (!commandList.offer(command)) {
throw new RuntimeException("阻塞队列满了");
}
}

此时coreList执行的是coreTask, supportList执行的是supportTask

6.优化参数

将MyTheadPool中需要的参数整理一下,生成构造方法,由外部传入, 同时优化任务.

BlockingQueue<Runnable> commandList = new ArrayBlockingQueue<>(1024);

// 定义一个变量 保存线程的数量
private final int corePoolSize;
private final int maxSize;
private final int timeout;
private final TimeUnit timeUnit;


List<Thread> coreList = new ArrayList<>();
List<Thread> supportList = new ArrayList<>();

public MyThreadPool(int corePoolSize, int maxSize, int timeout, TimeUnit timeUnit) {
this.corePoolSize = corePoolSize;
this.maxSize = maxSize;
this.timeout = timeout;
this.timeUnit = timeUnit;
}

void execute(Runnable command) {
if (coreList.size() < corePoolSize) {
Thread thread = new CoreThread();

coreList.add(thread);
thread.start();
}
if (commandList.offer(command)) {
return;
}
// 当核心线程数量加上辅助线程数量小于最大线程数量时,可以往创建辅助线程
if (coreList.size() + supportList.size() < maxSize) {
Thread thread = new SupportThread();
supportList.add(thread);

thread.start();
}

if (!commandList.offer(command)) {
throw new RuntimeException("阻塞队列满了");
}
}

class CoreThread extends Thread {
@Override
public void run() {
while (true) {
try {
Runnable take = commandList.take();
take.run();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}
}

class SupportThread extends Thread {
@Override
public void run() {
while (true) {
try {
Runnable take = commandList.poll(timeout, timeUnit);
if (take == null) {
break;
}
take.run();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
System.out.println("辅助线程" + Thread.currentThread().getName() + "结束了");
}
}

同时可以把阻塞队列也交给调用方,动态指定队列的大小

	private final int corePoolSize;
private final int maxSize;
private final int timeout;
private final TimeUnit timeUnit;
public final BlockingQueue<Runnable> commandList;


public MyThreadPool(int corePoolSize, int maxSize, int timeout, TimeUnit timeUnit, BlockingQueue<Runnable> commandList) {
this.corePoolSize = corePoolSize;
this.maxSize = maxSize;
this.timeout = timeout;
this.timeUnit = timeUnit;
this.commandList = commandList;
}

此时的main方法修改为下面的形式

public static void main(String[] args) {

MyThreadPool myThreadPool = new MyThreadPool(2, 4, 1, TimeUnit.SECONDS, new ArrayBlockingQueue<>(2));
for (int i = 0; i < 6; i++) {
myThreadPool.execute(() -> {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println(Thread.currentThread().getName());
});
}

System.out.println("主线程未被阻塞...");
}

此时执行main方法的结果如下

Exception in thread "main" java.lang.RuntimeException: 阻塞队列满了
at com.carlos.maintenance.insight.MyThreadPool.execute(MyThreadPool.java:86)
at com.carlos.maintenance.insight.Main.main(Main.java:12)
Thread-0
Thread-2
Thread-1
Thread-3
Thread-0
辅助线程Thread-2结束了
辅助线程Thread-3结束了

6个任务,只执行了5个,阻塞队列满了丢弃了一个任务,当阻塞队列满了的时候只是抛出了一个异常,

需要优化一下这一部分,作为一个拒绝策略

7.拒绝策略

新建一个接口,处理失败的任务

public interface RejectHandle {
// 失败的任务 和 线程池本身
void reject(Runnable rejectCommand, MyThreadPool threadPool);
}

将拒绝策略传入到线程池中

	private final int corePoolSize;
private final int maxSize;
private final int timeout;
private final TimeUnit timeUnit;
public final BlockingQueue<Runnable> commandList;
private final RejectHandle rejectHandle;

public MyThreadPool(int corePoolSize, int maxSize, int timeout, TimeUnit timeUnit, BlockingQueue<Runnable> commandList, RejectHandle rejectHandle) {
this.corePoolSize = corePoolSize;
this.maxSize = maxSize;
this.timeout = timeout;
this.timeUnit = timeUnit;
this.commandList = commandList;
this.rejectHandle = rejectHandle;
}

if (!commandList.offer(command)) {
rejectHandle.reject(command, this);
}

假如在触发拒绝策略时想抛出一个异常

public class ThrowRejectHandle implements RejectHandle{

@Override
public void reject(Runnable rejectCommand, MyThreadPool threadPool) {
System.out.println("阻塞队列满了");
}
}

此时的main方法为:

MyThreadPool myThreadPool = new MyThreadPool(2, 4, 1, TimeUnit.SECONDS, new ArrayBlockingQueue<>(2), new ThrowRejectHandle());

如果想在触发拒绝策略时拿出一个任务扔掉,放进这个失败的任务

public class DiscardRejectHandle implements RejectHandle{

@Override
public void reject(Runnable rejectCommand, MyThreadPool threadPool) {
threadPool.commandList.poll();
threadPool.execute(rejectCommand);
}
}

main方法为

MyThreadPool myThreadPool = new MyThreadPool(2, 4, 1, TimeUnit.SECONDS, new ArrayBlockingQueue<>(2), new DiscardRejectHandle());

此时的执行结果为:

主线程未被阻塞...
Thread-1
Thread-0
Thread-3
Thread-2
Thread-1
辅助线程Thread-2结束了
辅助线程Thread-3结束了

此时说明6个任务被扔掉了一个.

Loading Comments...