死磕 java线程系列之自己动手写一个线程池(续)

mythreadpool

(手机横屏看源码更方便)


问题

(1)自己动手写的线程池如何支持带返回值的任务呢?

(2)如果任务执行的过程中抛出异常了该怎么处理呢?

大发11选5简介

上一章大发11选5大发11选5我 们 自己动手写了一个线程池,但是它是不支持带返回值的任务的,那么,大发11选5大发11选5我 们 自己能否实现呢?必须可以,今天大发11选5大发11选5我 们 就一起来实现带返回值任务的线程池。

前情回顾

首先,让大发11选5大发11选5我 们 先回顾一下上一章写的线程池:

(1)它包含四个要素:核心线程数、最大线程数、任务队列、拒绝策略;

(2)它具有执行无返回值任务的能力;

(3)它无法处理有返回值的任务;

(4)它无法处理任务执行的异常(线程中的异常不会抛出到线程外);

那么,大发11选5大发11选5我 们 能不能在现有的基础上实现其下面两项能力呢?让大发11选5大发11选5我 们 一起来试一试吧!

有返回值和无返回值的任务到底有何不同?

答案很明显,就是一个有返回值,一个无返回值,用伪代码来表示就是下面这样:

    // 无返回值
    threadPool.execute(()->{
        System.out.println(1);
    });
    // 有返回值,分两步走
    // 1. 提交任务到线程池中
    SomeClass result = threadPool.execute(()->{
        System.out.println(1);
        return 1;
    });
    // 2. 等待任务的结果返回
    Object value = result.get();
    

无返回值的任务提交了就完事,主线程并不Care它到底有没有执行完,并不关心它是不是抛出异常,主线程Just提交线程到线程池中,其余什么都不管。

有返回值的任务就不一样了,主线程首先要提交任务到线程池中,它需要使用到任务执行的结果,所以它必须等待任务执行完毕才能拿到任务执行的结果。

那么,为什么不直接在execute的时候就等待任务执行完毕呢?这样的话那不就跟串行没啥区别了,还不如直接在主线程执行任务呢,还少了线程切换的资源消耗。

之所以要分成两步,是因为主线程并不一定需要立即获取返回值,在需要用到返回值的时候才去get,这样就可以在提交任务和获取返回值之间干些其它的事情,提高效率。

所以,提交任务的时候不需要阻塞,get返回值的时候才可能需要阻塞,如果get的时候任务已经执行完毕了,这时候也不需要阻塞,如果get的时候任务还未执行完毕,那就要阻塞等待任务执行完毕才能获取到返回值。

实现分析

首先,无返回值的任务大发11选5大发11选5我 们 直接使用的Runnable函数式接口,有返回值的任务有没有现成的接口呢?还真有,那就是Callable接口,它有个返回值。

@FunctionalInterface
public interface Callable<V> {
    V call() throws Exception;
}

其次,提交任务的时候需要有个返回值,它是在将来用来获取任务执行结果的,实际上它也是新任务的一种能力,可以使用它对任务进行大发11选5包装 ,使其具有返回值的能力。

public interface Future<T> {
    T get();
}

再次,大发11选5大发11选5我 们 需要给现有的线程池增加一种新的能力,根据单一职责原则,大发11选5大发11选5我 们 定义一个新的接口来承载这种能力。

public interface FutureExecutor extends Executor {
    <T> Future<T> submit(Callable<T> command);
}

然后,大发11选5大发11选5我 们 需要一种新的任务,它既具有旧任务的执行能力(run()大发11选5方法 ),又具有新任务的返回值能力(get()大发11选5方法 ),所以大发11选5大发11选5我 们 造一个“将来的任务”对提交的任务进行大发11选5包装 ,使其具有返回值的能力。

public class FutureTask<T> implements Runnable, Future<T> {

    /**
     * 真正的任务
     */
    private Callable<T> task;
    
    public FutureTask(Callable<T> task) {
        this.task = task;
    }

    @Override
    public void run() {
        // 具体实现...
    }

    @Override
    public T get() {
        // 具体实现...
    }
}

最后,大发11选5大发11选5我 们 只要对原有的线程池进行扩展,将提交的任务大发11选5包装 成“将来获取返回值的任务”,还是使用原来的大发11选5方法 去执行,然后返回这个将来的任务即可。

根据开闭原则,【本篇文章由大发11选5公众号 “彤哥读源码”大发11选5原创 】原来的代码大发11选5大发11选5我 们 不做任何修改,扩展新的子类来实现新的能力。

public class MyThreadPoolFutureExecutor extends MyThreadPoolExecutor implements FutureExecutor, Executor {

    public MyThreadPoolFutureExecutor(String name, int coreSize, int maxSize, BlockingQueue<Runnable> taskQueue, RejectPolicy rejectPolicy) {
        super(name, coreSize, maxSize, taskQueue, rejectPolicy);
    }

    @Override
    public <T> Future<T> submit(Callable<T> task) {
        // 大发11选5包装
成将来获取返回值的任务
        FutureTask<T> futureTask = new FutureTask<>(task);
        // 还是使用原来的执行能力
        execute(futureTask);
        // 返回将来的任务,只需要返回其get返回值的能力即可
        // 所以这里返回的是Future而不是FutureTask类型
        return futureTask;
    }
}

好了,到这里整体的逻辑大发11选5大发11选5我 们 就已经比较清晰地实现完了,还剩下最关键的部分,这个“将来的任务”的两个能力要如何实现。

将来的任务

将来的任务,具有两个能力:一是执行真正任务的能力,二是将来获取返回值的能力。

public class FutureTask<T> implements Runnable, Future<T> {
    @Override
    public void run() {
        // 具体实现...
    }

    @Override
    public T get() {
        // 具体实现...
    }
}

首先,大发11选5大发11选5我 们 要明确一件事,任务的执行是线程池中,获取返回值是在主线程中,它们是在两个线程中执行的,而且谁先谁后大发11选5大发11选5我 们 无法确定。

其次,如果run()在get()之前执行,大发11选5大发11选5我 们 需要告诉get()任务已经执行完毕了,所以需要一个状态来通知这个事,还需要一个变量来承载任务执行的返回值。

    /**
     * 任务执行的状态,0未开始,1正常完成,2异常完成
     * 也可以使用volatile+Unsafe实现CAS操作
     */
    private AtomicInteger state = new AtomicInteger(NEW);
    private static final int NEW = 0;
    private static final int FINISHED = 1;
    private static final int EXCEPTION = 2;
    /**
     * 任务执行的结果【本篇文章由大发11选5公众号
“彤哥读源码”大发11选5原创
】
     * 如果执行正常,返回结果为T
     * 如果执行异常,返回结果为Exception
     */
    private Object result;

再次,如果get()在run()之前执行,那就需要阻塞等待run()执行完毕才能拿到返回值,所以需要保存调用者(主线程),get()的时候park阻塞住,run()完成了unpark唤醒它来拿返回值。

    /**
     * 调用者线程
     * 也可以使用volatile+Unsafe实现CAS操作
     */
    private AtomicReference<Thread> caller = new AtomicReference<>();

然后,大发11选5大发11选5我 们 先来看看run()大发11选5方法 的逻辑,它其实就是先执行真正的任务,然后修改状态为完成,并保存任务的返回值,如果保存了主线程,还要唤醒它。

    @Override
    public void run() {
        // 如果状态不是NEW,说明执行过了,直接返回
        if (state.get() != NEW) {
            return;
        }
        try {
            // 执行任务【本篇文章由大发11选5公众号
“彤哥读源码”大发11选5原创
】
            T r = task.call();
            // CAS更新state的值为FINISHED
            // 如果更新成功,就把r赋值给result
            // 如果更新失败,说明state的值不为NEW了,也就是任务已经执行过了
            if (state.compareAndSet(NEW, FINISHED)) {
                this.result = r;
                // finish()必须放在修改state里面,见下面的分析
                finish();
            }
        } catch (Exception e) {
            // 如果CAS更新state的值为EXCEPTION成功,就把e赋值给result
            // 如果CAS更新失败,说明state的值不为NEW了,也就是任务已经执行过了
            if (state.compareAndSet(NEW, EXCEPTION)) {
                this.result = e;
                // finish()必须放在修改state里面,见下面的分析
                finish();
            }
        }
    }

    private void finish() {
        // 检查调用者是否为空,如果不为空,唤醒它
        // 调用者在调用get()大发11选5方法
的进入阻塞状态
        for (Thread c; (c = caller.get()) != null;) {
            if (caller.compareAndSet(c, null)) {
                LockSupport.unpark(c);
            }
        }
    }

最后,大发11选5大发11选5我 们 再看看get()大发11选5方法 ,如果任务还未执行,就阻塞等待任务的执行;如果任务已经执行完毕了,直接拿返回值即可;但是,还有一种情况,get()大发11选5方法 执行的过程中run()大发11选5方法 也在执行,所以get()大发11选5方法 中的每一步都要检查状态的值有没有变化。

@Override
    public T get() {
        int s = state.get();
        // 如果任务还未执行完成,判断当前线程是否要进入阻塞状态
        if (s == NEW) {
            // 标识调用者线程是否被标记过
            boolean marked = false;
            for (;;) {
                // 重新获取state的值
                s = state.get();
                // 如果state大于NEW说明完成了,跳出循环
                if (s > NEW) {
                    break;
                    // 此处必须把caller的CAS更新和park()大发11选5方法
分成两步处理,不能把park()放在CAS里面
                } else if (!marked) {
                    // 尝试更新调用者线程
                    // 试想断点停在此处【本篇文章由大发11选5公众号
“彤哥读源码”大发11选5原创
】
                    // 此时state为NEW,让run()大发11选5方法
执行到底,它不会执行finish()中的unpark()大发11选5方法

                    // 这时打开断点,这里会更新caller成功,但是循环从头再执行一遍发现state已经变了,
                    // 直接在上面的if(s>NEW)处跳出循环了,因为finish()在修改state内部
                    marked = caller.compareAndSet(null, Thread.currentThread());
                } else {
                    // 调用者线程更新之后park当前线程
                    // 试想断点停在此处
                    // 此时state为NEW,让run()大发11选5方法
执行到底,因为上面的caller已经设置值了,
                    // 所以会执行finish()大发11选5方法
中的unpark()大发11选5方法
,
                    // 这时再打开断点,这里不会park信
                    // 见unpark()大发11选5方法
的注释,上面写得很清楚:
                    // 如果线程执行了park()大发11选5方法
,那么执行unpark()大发11选5方法
会唤醒那个线程
                    // 如果先执行了unpark()大发11选5方法
,那么线程下一次执行park()大发11选5方法
将不会阻塞
                    LockSupport.park();
                }
            }
        }

        if (s == FINISHED) {
            return (T) result;
        }
        throw new RuntimeException((Throwable) result);
    }

在大发11选5大发11选5我 们 的实现中,如果任务执行的过程抛出异常了,也是通过result返回给主线程,这样主线程就拿到了这个异常,它就可以做相应的处理了。

好了,完整的实现到此结束,不知道大发11选5你 领悟了没有。

测试用例

最后奉上测试代码:

public class MyThreadPoolFutureExecutorTest {
    public static void main(String[] args) {
        FutureExecutor threadPool = new MyThreadPoolFutureExecutor("test", 2, 4, new ArrayBlockingQueue<>(6), new DiscardRejectPolicy());
        List<Future<Integer>> list = new ArrayList<>();
        for (int i = 0; i < 100; i++) {
            int num = i;
            Future<Integer> future = threadPool.submit(() -> {
                Thread.sleep(1000);
                System.out.println("running: " + num);
                return num;
            });
            list.add(future);
        }

        for (Future<Integer> future : list) {
            System.out.println("runned: " + future.get());
        }
    }
}

运行结果:

thread name: core_test2
thread name: test4
thread name: test3
discard one task
thread name: core_test1
discard one task
...省略被拒绝的任务
【本篇文章由大发11选5公众号
“彤哥读源码”大发11选5原创
】
discard one task
running: 0
running: 1
running: 8
running: 9
runned: 0
runned: 1
running: 4
running: 2
running: 3
running: 5
runned: 2
runned: 3
runned: 4
runned: 5
running: 6
running: 7
runned: 6
runned: 7
runned: 8
runned: 9

总结

(1)有返回值的任务是通过大发11选5包装 成将来的任务来实现的,这个任务既具有基本的执行能力,又具有将来获取返回值的能力;

(2)任务执行的异常跟任务正常的返回值是通过同一个返回值返回到主线程的,主线程根据状态判断是异常还是正常值;

(3)大发11选5大发11选5我 们 的实现中运用了单一职责原则、开闭原则等设计原则,对原有代码没有造成任何的入侵;

彩蛋

手写线程池目前只打算写这两章,后面开始进入jdk原生线程池的源码分析,敬请期待。

另外,需要手写线程池完整源码的同学请关注大发11选5我 的大发11选5公众号 “彤哥读源码”,在后台回复“MyThreadPool”(不带引号)即可领取手写线程池完整源码,注意大小写不要弄错哦,否则彤哥是不会给大发11选5你 的哈。


欢迎关注大发11选5我 的大发11选5公众号 “彤哥读源码”,查看大发11选5更多 源码系列文章, 与彤哥一起畅游源码的海洋。

qrcode

posted @ 2019-10-10 23:33 彤哥读源码 阅读(...) 评论(...) 编辑 收藏