摘要

在学习pulsar的时候,发现自己对多线程的知识了解的还是太少了。

正文

参考文章

一、Volatile

volatile的作用

  1. 保证变量的内存可见性
  2. 禁止指令重排序

1.1 保证变量的内存可见性

执行下面的程序,会发现控制台永远都不会输出 “主线程访问到 flag 变量” 这句话。我们可以看到,子线程执行1秒后已经将 flag 设置成 true,但主线程执行时没有读到 flag 的最新值,导致控制台没有输出上面的句子。

java
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
/**
 * 变量的内存可见性例子
 *
 * @author star
 */
public class VolatileExample {

    /**
     * main 方法作为一个主线程
     */
    public static void main(String[] args) {
        MyThread myThread = new MyThread();
        // 开启线程
        myThread.start();

        // 主线程执行
        for (; ; ) {
            if (myThread.isFlag()) {
                System.out.println("main线程读取到更新后的数据!");
            }
        }
    }

}

/**
 * 子线程类
 */
class MyThread extends Thread {

    private boolean flag = false;

    @Override
    public void run() {
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        // 修改变量值
        flag = true;
        System.out.println("flag = " + flag);
    }

    public boolean isFlag() {
        return flag;
    }

    public void setFlag(boolean flag) {
        this.flag = flag;
    }
}

1.png

但是,在将flag变量改成如下时

java
1
private volatile boolean flag = false;

输出结果如下

2.png

flag是一个在main线程与myThread线程间共享的变量,在myThread线程间更新了flag的内容值,并不会在main中立即获取这个值。

此时为了确保每个线程都能读取到更新后的变量值,此时就需要使用volatile。

至于为啥会出现这种问题?这涉及到Java的内存模型。

3.png

在Java虚拟机中,变量的值,保存在主内存中。

当线程访问变量时,它会先获取一个副本,并保存在自己的工作内存中。

如果线程修改了变量的值,虚拟机会在某个时刻把修改后的值回写到主内存。但是这个时间是不固定的。尤其是在x86的架构下,JVM回写主内存的速度非常快,但是,换成ARM的架构,就会有显著的延迟。

这会导致如果一个线程更新了某个变量,另一个线程读取的值可能还是更新前的。

就比如myThread的flag=true,在main线程中,仍然读取到的flag是false。myThread在此刻只是将变量flag的副本变成了true,主内存中flag还是false。

此时,volatile的作用

  • 每次修改访问变量时,总是获取到主内存的最新值
  • 每次修改变量后,立即回写到主内存。

1.2 禁止指令重排序

有序性指的是程序执行的顺序按照代码的先后顺序执行。

java
1
2
3
4
int i=0;//语句1
boolean flag=false;//语句2
i=1;//语句3
flag=true;//语句4

但是,在上面这个例子中,并不会顺序执行,jvm会在真正执行该段代码时进行优化,会发生指令的重排序。

4.png

并不是所有的重排序,都会与原先的顺序不一致。比如下面这个

java
1
2
int a=10;//语句1
a=a+3;//语句2

因为jvm在进行重排序时,会考虑到数据之间的依赖性,如果语句2必须用到语句1的结果,那么处理器一定保证语句1在语句2前执行。

类似于这种的数据依赖性,在单线程环境下一点问题都没有,因为总能保证数据的正确,但是在多线程环境下,就会发生错误。

这个地方参考了很多文章,以及技术书《深入Java虚拟机》,但是看过了之后,还是不太懂,感觉给的多线程的例子都有点问题。所以这边暂时不进行补充了

二、Future

2.1 Future常用方法

一个Future接口表示一个未来可能会返回的结果,常用方法由

  • get():获取结果(任务耗时就会进行等待状态,类似于同步)
  • get(long timeout, TimeUnit unit):获取结果,等待指定时间
  • cancel(boolean flag):取消当前任务。flag为true表示执行任务的线程应该被立马中断;false表示等待任务执行完毕再取消。
  • isDone():判断任务是否完成。

下面是通过Future实现一个异步任务,并且在异步任务完成之后,自动回调任务。

java
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
public class ThreadDemo {
    private static final DateTimeFormatter dtf=DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    static class CallableImpl implements Callable<String> {

        @Override
        public String call() throws Exception {
            try {
                Thread.sleep(10000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("Callable运行结束:"+dtf.format(LocalDateTime.now()));
            return "Callable运行结束";
        }
    }

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        ExecutorService executorService = Executors.newFixedThreadPool(1);
        System.out.println("线程执行开始:"+dtf.format(LocalDateTime.now()));
        Future<String> submit = executorService.submit(new CallableImpl());
        //使用get,相当于是个同步操作。会等待线程任务执行完毕,才会进行下面任务的执行
        //String s = submit.get();
        //System.out.println(submit.isDone());
        new Thread(new Runnable() {
            @Override
            public void run() {
                while(!submit.isDone()){

                }
                System.out.println("执行回调任务"+dtf.format(LocalDateTime.now()));
            }
        }).start();
        System.out.println("校验异步耗时:"+dtf.format(LocalDateTime.now()));


        //立即关闭
        //executorService.shutdownNow();

        //等待线程任务执行完后,关闭
        executorService.shutdown();

    }
}

2.2 CompletableFuture常用方法

案例

使用Future获得异步执行结果时,要么调用阻塞方法get(),要么轮询看isDone()是否为true,这两种方法都不是很好,因为主线程也会被迫等待。

从Java8开始引入了CompletableFuture,它可以传入回调对象,当异步任务完成或者发生异常时,自动调用回调对象的回调方法。

CompletableFuture常用方法

  • supplyAsync:表示创建带返回值的异步任务,相当于ExecutorService submit(Callable task)方法
  • runAsync:runAsync表示创建无返回值的异步任务,相当于ExecutorService submit(Runnable task)方法
  • thenApply(T,U):表示某个任务执行完成后执行的动作,即回调方法。接收任务完成后传过来的参数T,并返回类型U
  • thenApplyAsync(T,U):与thenApply的不同之处在于,thenApplyAsync在执行任务A任务A结束之后的回调,这两个线程可能会不一样;但是thenApply执行任务和任务结束后的回调是同一个线程。
  • thenAccept:跟thenApply作用类似,有入参,但是没有返回值。
  • thenRun:参数是个Runnable接口
  • exceptionally:如果任务执行过程中出现异常,就会回调该方法
  • anyOf:任意个CompletableFuture只要有一个执行完成,该anyOf就完成。
  • allOf:所有的CompletableFuture全部完成,该allOf才完成。
  • whenComplete/handle:可自定义回调执行的用法

通过CompletableFuture完成上面使用Future完成的案例。

java
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
public class ThreadDemo {
    private static final DateTimeFormatter dtf = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    static class RunnableImpl implements Runnable {

        @Override
        public void run() {
            try {
                Thread.sleep(10000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("Runnable运行结束!");
        }
    }

    static class CallableImpl implements Callable<String> {

        @Override
        public String call() throws Exception {
            try {
                Thread.sleep(10000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("Callable运行结束:" + dtf.format(LocalDateTime.now()));
            return "Callable运行结束";
        }
    }

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(new Supplier<String>() {
            @Override
            public String get() {
                System.out.println("执行开始" + dtf.format(LocalDateTime.now()));
                try {
                    Thread.sleep(10000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                return "执行结束" + dtf.format(LocalDateTime.now());
            }
        });
        completableFuture.thenAccept(new Consumer<String>() {
            @Override
            public void accept(String s) {
                System.out.println(s);
                System.out.println("success" + dtf.format(LocalDateTime.now()));
            }
        });
        completableFuture.exceptionally(new Function<Throwable, String>() {
            @Override
            public String apply(Throwable throwable) {
                System.out.println("failure");
                return null;
            }
        });

        System.out.println("校验异步阻塞!");
        // 主线程不要立刻结束,否则CompletableFuture默认使用的线程池会立刻关闭:
        Thread.sleep(30000);
    }
}

5.png

6.png

通过运行结果的两张图,可以知道,这就是一个异步的操作,并且主线程不会进入阻塞状态,通过打印校验异步就可以知道并没有影响到主线程继续向下执行。同时,在任务执行完毕之后,还会自动执行回调方法。

CompletableFuture优点

  1. 异步任务结束时,会自动回调执行成功时的逻辑
  2. 异步任务异常时,会自动回调执行异常时的逻辑

串行执行

如果只是实现了异步回调,其实跟Future区别也不是特别大,只是用起来方便一点。CompletableFuture更强大的功能在于,多个CompletableFuture可以串行执行。

串行执行:一个个的按顺序执行,执行完第一个,才会执行第二个。跟面向过程的含义一样。

并行执行:两个人吃两个馒头,效率高。多核cpu

并发执行:一个人吃俩馒头,这个吃一口,那个吃一口,效率低。单核cpu

举一个使用场景,就拿之前写今日校园签到脚本的例子来说

  1. 获取今日最新任务
  2. 通过任务提取任务id
  3. 通过任务id获取任务详细内容
java
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
public class ThreadDemo {
    private static final DateTimeFormatter dtf = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    public static void main(String[] args) {
        CompletableFuture<Void> future = CompletableFuture.supplyAsync(new Supplier<String>() {
            @Override
            public String get() {
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("获取最新任务的编号" + dtf.format(LocalDateTime.now()));
                return "520";
            }
        }).thenApply(new Function<String, String>() {
            @Override
            public String apply(String s) {
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("通过" + s + "获取详细任务的编号" + dtf.format(LocalDateTime.now()));
                return "1314";
            }
        }).thenAccept(new Consumer<String>() {
            @Override
            public void accept(String s) {
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("通过" + s + "完成任务!" + dtf.format(LocalDateTime.now()));
            }
        });
        System.out.println("是否执行完成"+future.isDone()+" 检验主线程是否被阻塞"+dtf.format(LocalDateTime.now()));
        try {
            Thread.sleep(7000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

7.png

通过运行结果可以发现,既没有影响异步操作,同时还保证了串行操作。

并行执行

场景A:菜鸟攒钱买金装和风云装,只要攒够一套装备,菜鸟就出去砍人!

java
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
public class ThreadDemo {
    private static DateTimeFormatter dtf = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    public static void main(String[] args) {
        CompletableFuture<String> yaFei = CompletableFuture.supplyAsync(new Supplier<String>() {
            @Override
            public String get() {
                try {
                    Thread.sleep(5000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                return "菜鸟买到金装"+dtf.format(LocalDateTime.now());
            }
        });
        CompletableFuture<String> meiDuSha = CompletableFuture.supplyAsync(new Supplier<String>() {
            @Override
            public String get() {
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                return "菜鸟买到风云装"+dtf.format(LocalDateTime.now());
            }
        });

        CompletableFuture<Object> anyOf = CompletableFuture.anyOf(yaFei, meiDuSha);
        anyOf.thenAccept(new Consumer<Object>() {
            @Override
            public void accept(Object o) {
                System.out.println(o);
                System.out.println("有装备了,菜鸟出去砍人"+dtf.format(LocalDateTime.now()));
            }
        });
        System.out.println("检验是否阻塞主线程"+dtf.format(LocalDateTime.now()));
        try {
            Thread.sleep(6000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

8.png

场景B:菜鸟攒钱买装备和符文,两套全齐了之后,菜鸟才出去砍人!

java
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
public class ThreadDemo {
    private static DateTimeFormatter dtf = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    public static void main(String[] args) {
        CompletableFuture<Void> zhuangbei = CompletableFuture.runAsync(new Runnable() {
            @Override
            public void run() {
                try {
                    Thread.sleep(5000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("菜鸟买到装备" + dtf.format(LocalDateTime.now()));
            }
        });
        CompletableFuture<Void> fuwen = CompletableFuture.runAsync(new Runnable() {
            @Override
            public void run() {
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("菜鸟买到符文" + dtf.format(LocalDateTime.now()));
            }
        });


        CompletableFuture<Void> allOf = CompletableFuture.allOf(zhuangbei, fuwen);
        allOf.thenAccept(new Consumer<Void>() {
            @Override
            public void accept(Void aVoid) {
                System.out.println("菜鸟出去砍人"+dtf.format(LocalDateTime.now()));
            }
        });
        System.out.println("检验是否阻塞主线程"+dtf.format(LocalDateTime.now()));
        try {
            Thread.sleep(6000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

9.png

三、线程池

3.1 ThreadPoolExecutor

原理

ThreadPoolExecutor是java.util.convurrent提供的,以内部线程池的形式对外提供管理任务执行、线程调度、线程池管理等服务。

以jdk8为例,Executors提供的线程服务,有4个是通过ThreadPoolExecutor参数设置。这个的说明在下一节

10.png

主要参数讲解

  • corePoolSize:核心线程个数,也就是线程池中,至少会运行corePoolSize个线程。但是在刚创建线程池的时候,线程并不会立即启动,而是在等到有任务提交时才会启动。
  • maxmumPoolSize:最大线程个数
  • keepAliveTime:非核心线程的最大空闲时间,空闲时间超过这个设定值,线程就关闭掉了。
  • allowCoreThreadTimeOut:默认值是false。如果改为true,则表示keepAliveTime也同样适用于核心线程,超时就会关闭,知道线程池中线程数为0
  • timeUnit:keepAliveTime的时间单位
  • workQueue:阻塞任务队列,存储等待执行线程的工作队列。BlockingQueue下面有实现类,比如说LinkedBlockingQueue,在创建任务队列时,可以给队列指定长度。
  • threadFactory:线程工厂,默认的工厂是DefaultThreadFactory,如果我们想实现线程池的重命名的话,可以通过自己实现一个工厂。
  • rejectedExecutionHandler:拒绝策略。当提交的任务数,超过了maxmumPoolSize+workQueue之和时,多余的任务会交给拒绝策略来处理。默认是AbortPolicy策略:丢弃任务,抛运行时异常。

11.png

整个的执行流程,如下图

13.png

poolSize、corePoolSize、maxmumPoolSize、queueCapacity之间的关系

poolSize表示线程池中非空闲的线程个数。

queueCapacity表示队列的长度

  1. poolSize<corePoolSize时,无论是否有空闲线程,都会新建一个线程来处理新提交的任务
  2. poolSize=corePoolSize时,又来了新的任务,且新的任务数量没有超过queueCapacity时,任务就暂时放到队列里,等待执行(前面的任务结束之后,线程空闲出来后再执行)
  3. poolSize=corePoolSize时,又来了新的任务,且新的任务数量超过了queueCapacity时
    • poolSize<maxmumPoolSize:新创建线程来处理任务,任务执行完毕后线程就被销毁了。
    • poolSize=maxmumPoolSize:线程池的处理能力已经达到了极限,此时根据配置的拒绝策略来处理新增任务。

12.png

任务队列

jdk8中,ThreadPoolExecutor可选用队列

  • LinkedBlockingQueue:常用,一个由链表组成的有界阻塞队列。先进先出模式,默认队列长度是Integer的最大值。
  • SynchronousQueue:常用,一个不存储元素的阻塞队列。配对通信机制,默认是非公平模式,也就是采用栈操作,后进先出;公平模式就是采用队列操作,先进先出。A线程put任务A进入队列之后,除非有一个B线程来take,否则A线程一直处于阻塞状态。如果A线程put任务A进入队列之后,B线程put任务B进入队列,默认情况下,C线程来take,会获取任务B,之后,D线程再来take,会获取任务A。
  • ArrayBlockingQueue:一个由数组组成的有界阻塞队列。
  • PriorityBlockingQueue:一个支持优先级排序的无界阻塞队列。
  • DelayQueue:一个使用优先级队列实现的无界阻塞队列。
  • LinkedTransferQueue:一个由链表结构组成的无界阻塞队列。
  • LinkedBlockingDeque:一个由链表结构组成的双向阻塞队列。

拒绝策略

jdk8中,ThreadPoolExecutor提供了4中拒绝策略

  • AbortPolicy:默认拒绝策略。丢弃任务并抛出RejectedExecutionException异常
  • CallerRunsPolicy:任务被拒绝添加后,会由调用execute方法的线程来执行被拒绝的任务。除非executor被关闭,否则任务不会被丢弃。简单理解谁调用谁执行
  • DiscardPolicy:直接丢弃任务,不抛异常
  • DiscardOldestPolicy:如果队列满了,新任务来了,会丢弃队列中最前面条任务,将新任务追加到队列。比如现在有1 2 3任务,来了4,丢弃1后变成2 3 4

案例:设置线程池,1个核心、LinkedBlockingQueue队列设置容量为2、2个最大线程,按顺序一次性添加七个任务。通过选择不同的拒绝策略,查看任务执行。

14.png

java
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
public class ThreadDemo {
    private static DateTimeFormatter dtf = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss:SSS");

    public ThreadPoolExecutor getExecutor() {
        return new ThreadPoolExecutor(
                1,//核心
                2,//最大
                5,//keepAlive
                TimeUnit.SECONDS,//keepAlive的单位
                new LinkedBlockingDeque<>(2),//容量为2的阻塞队列
                new ThreadPoolExecutor.DiscardOldestPolicy()//拒绝策略
        );
    }

    public static void main(String[] args) {
        ThreadPoolExecutor executor = new ThreadDemo().getExecutor();
        new Thread(new Runnable() {
            @Override
            public void run() {
                Thread.currentThread().setName("我添加的线程,用于打印活跃中的线程");
                while (true) {
                    System.out.println("=============================");
                    ThreadGroup threadGroup = Thread.currentThread().getThreadGroup();
                    threadGroup.list();
                    System.out.println("=============================");
                    try {
                        Thread.sleep(5000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        }).start();
        for (int i = 1; i <= 7; i++) {
            //String.valueOf通过源码可知,是new了一个新对象。跟直接String a=str不同的是,后者只引用了别人的地址,而前者指向了自己的数据空间
            //String temp= String.valueOf(i);

            //声明变量a的同时,系统给a分配了数据空间。
            int temp = i;

            /**
             * 为啥要一个temp变量?
             * 因为execute是异步的,如果直接用i的话,可能会出现,所有的异步线程获取的i都是最终的7
             */
            executor.execute(() -> {
                try {
                    Thread.sleep(5000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println(Thread.currentThread().getName() + " 执行任务" + temp + " 结束时间:" + dtf.format(LocalDateTime.now()));
            });
        }
    }
}

拒绝策略为AbortPolicy

15.png

拒绝策略为CallerRunsPolicy

16.png

拒绝策略为DiscardPolicy

17.png

拒绝策略为DiscardOldestPolicy

18.png