言成言成啊 | Kit Chen's Blog

多线程进阶

发布于2021-09-06 22:33:36,更新于2022-07-03 08:41:34,标签:java  文章会持续修订,转载请注明来源地址:https://meethigher.top/blog

参考文章

一、Volatile

volatile的作用

  1. 保证变量的内存可见性
  2. 禁止指令重排序

1.1 保证变量的内存可见性

执行下面的程序,会发现控制台永远都不会输出 “主线程访问到 flag 变量” 这句话。我们可以看到,子线程执行1秒后已经将 flag 设置成 true,但主线程执行时没有读到 flag 的最新值,导致控制台没有输出上面的句子。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
/**
* 变量的内存可见性例子
*
* @author star
*/
public class VolatileExample {

/**
* main 方法作为一个主线程
*/
public static void main(String[] args) {
MyThread myThread = new MyThread();
// 开启线程
myThread.start();

// 主线程执行
for (; ; ) {
if (myThread.isFlag()) {
System.out.println("main线程读取到更新后的数据!");
}
}
}

}

/**
* 子线程类
*/
class MyThread extends Thread {

private boolean flag = false;

@Override
public void run() {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
// 修改变量值
flag = true;
System.out.println("flag = " + flag);
}

public boolean isFlag() {
return flag;
}

public void setFlag(boolean flag) {
this.flag = flag;
}
}

但是,在将flag变量改成如下时

1
private volatile boolean flag = false;

输出结果如下

flag是一个在main线程与myThread线程间共享的变量,在myThread线程间更新了flag的内容值,并不会在main中立即获取这个值。

此时为了确保每个线程都能读取到更新后的变量值,此时就需要使用volatile。

至于为啥会出现这种问题?这涉及到Java的内存模型。

在Java虚拟机中,变量的值,保存在主内存中。

当线程访问变量时,它会先获取一个副本,并保存在自己的工作内存中。

如果线程修改了变量的值,虚拟机会在某个时刻把修改后的值回写到主内存。但是这个时间是不固定的。尤其是在x86的架构下,JVM回写主内存的速度非常快,但是,换成ARM的架构,就会有显著的延迟。

这会导致如果一个线程更新了某个变量,另一个线程读取的值可能还是更新前的。

就比如myThread的flag=true,在main线程中,仍然读取到的flag是false。myThread在此刻只是将变量flag的副本变成了true,主内存中flag还是false。

此时,volatile的作用

  • 每次修改访问变量时,总是获取到主内存的最新值
  • 每次修改变量后,立即回写到主内存。

1.2 禁止指令重排序

有序性指的是程序执行的顺序按照代码的先后顺序执行。

1
2
3
4
int i=0;//语句1
boolean flag=false;//语句2
i=1;//语句3
flag=true;//语句4

但是,在上面这个例子中,并不会顺序执行,jvm会在真正执行该段代码时进行优化,会发生指令的重排序。

并不是所有的重排序,都会与原先的顺序不一致。比如下面这个

1
2
int a=10;//语句1
a=a+3;//语句2

因为jvm在进行重排序时,会考虑到数据之间的依赖性,如果语句2必须用到语句1的结果,那么处理器一定保证语句1在语句2前执行。

类似于这种的数据依赖性,在单线程环境下一点问题都没有,因为总能保证数据的正确,但是在多线程环境下,就会发生错误。

这个地方参考了很多文章,以及技术书《深入Java虚拟机》,但是看过了之后,还是不太懂,感觉给的多线程的例子都有点问题。所以这边暂时不进行补充了

二、Future

2.1 Future常用方法

一个Future接口表示一个未来可能会返回的结果,常用方法由

  • get():获取结果(任务耗时就会进行等待状态,类似于同步)
  • get(long timeout, TimeUnit unit):获取结果,等待指定时间
  • cancel(boolean flag):取消当前任务。flag为true表示执行任务的线程应该被立马中断;false表示等待任务执行完毕再取消。
  • isDone():判断任务是否完成。

下面是通过Future实现一个异步任务,并且在异步任务完成之后,自动回调任务。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
public class ThreadDemo {
private static final DateTimeFormatter dtf=DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

static class CallableImpl implements Callable<String> {

@Override
public String call() throws Exception {
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("Callable运行结束:"+dtf.format(LocalDateTime.now()));
return "Callable运行结束";
}
}

public static void main(String[] args) throws ExecutionException, InterruptedException {
ExecutorService executorService = Executors.newFixedThreadPool(1);
System.out.println("线程执行开始:"+dtf.format(LocalDateTime.now()));
Future<String> submit = executorService.submit(new CallableImpl());
//使用get,相当于是个同步操作。会等待线程任务执行完毕,才会进行下面任务的执行
//String s = submit.get();
//System.out.println(submit.isDone());
new Thread(new Runnable() {
@Override
public void run() {
while(!submit.isDone()){

}
System.out.println("执行回调任务"+dtf.format(LocalDateTime.now()));
}
}).start();
System.out.println("校验异步耗时:"+dtf.format(LocalDateTime.now()));


//立即关闭
//executorService.shutdownNow();

//等待线程任务执行完后,关闭
executorService.shutdown();

}
}

2.2 CompletableFuture常用方法

案例

使用Future获得异步执行结果时,要么调用阻塞方法get(),要么轮询看isDone()是否为true,这两种方法都不是很好,因为主线程也会被迫等待。

从Java8开始引入了CompletableFuture,它可以传入回调对象,当异步任务完成或者发生异常时,自动调用回调对象的回调方法。

CompletableFuture常用方法

  • supplyAsync:表示创建带返回值的异步任务,相当于ExecutorService submit(Callable task)方法
  • runAsync:runAsync表示创建无返回值的异步任务,相当于ExecutorService submit(Runnable task)方法
  • thenApply(T,U):表示某个任务执行完成后执行的动作,即回调方法。接收任务完成后传过来的参数T,并返回类型U
  • thenApplyAsync(T,U):与thenApply的不同之处在于,thenApplyAsync在执行任务A任务A结束之后的回调,这两个线程可能会不一样;但是thenApply执行任务和任务结束后的回调是同一个线程。
  • thenAccept:跟thenApply作用类似,有入参,但是没有返回值。
  • thenRun:参数是个Runnable接口
  • exceptionally:如果任务执行过程中出现异常,就会回调该方法
  • anyOf:任意个CompletableFuture只要有一个执行完成,该anyOf就完成。
  • allOf:所有的CompletableFuture全部完成,该allOf才完成。
  • whenComplete/handle:可自定义回调执行的用法

通过CompletableFuture完成上面使用Future完成的案例。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
public class ThreadDemo {
private static final DateTimeFormatter dtf = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

static class RunnableImpl implements Runnable {

@Override
public void run() {
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("Runnable运行结束!");
}
}

static class CallableImpl implements Callable<String> {

@Override
public String call() throws Exception {
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("Callable运行结束:" + dtf.format(LocalDateTime.now()));
return "Callable运行结束";
}
}

public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(new Supplier<String>() {
@Override
public String get() {
System.out.println("执行开始" + dtf.format(LocalDateTime.now()));
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "执行结束" + dtf.format(LocalDateTime.now());
}
});
completableFuture.thenAccept(new Consumer<String>() {
@Override
public void accept(String s) {
System.out.println(s);
System.out.println("success" + dtf.format(LocalDateTime.now()));
}
});
completableFuture.exceptionally(new Function<Throwable, String>() {
@Override
public String apply(Throwable throwable) {
System.out.println("failure");
return null;
}
});

System.out.println("校验异步阻塞!");
// 主线程不要立刻结束,否则CompletableFuture默认使用的线程池会立刻关闭:
Thread.sleep(30000);
}
}

通过运行结果的两张图,可以知道,这就是一个异步的操作,并且主线程不会进入阻塞状态,通过打印校验异步就可以知道并没有影响到主线程继续向下执行。同时,在任务执行完毕之后,还会自动执行回调方法。

CompletableFuture优点

  1. 异步任务结束时,会自动回调执行成功时的逻辑
  2. 异步任务异常时,会自动回调执行异常时的逻辑

串行执行

如果只是实现了异步回调,其实跟Future区别也不是特别大,只是用起来方便一点。CompletableFuture更强大的功能在于,多个CompletableFuture可以串行执行。

串行执行:一个个的按顺序执行,执行完第一个,才会执行第二个。跟面向过程的含义一样。

并行执行:两个人吃两个馒头,效率高。多核cpu

并发执行:一个人吃俩馒头,这个吃一口,那个吃一口,效率低。单核cpu

举一个使用场景,就拿之前写今日校园签到脚本的例子来说

  1. 获取今日最新任务
  2. 通过任务提取任务id
  3. 通过任务id获取任务详细内容
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
public class ThreadDemo {
private static final DateTimeFormatter dtf = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

public static void main(String[] args) {
CompletableFuture<Void> future = CompletableFuture.supplyAsync(new Supplier<String>() {
@Override
public String get() {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("获取最新任务的编号" + dtf.format(LocalDateTime.now()));
return "520";
}
}).thenApply(new Function<String, String>() {
@Override
public String apply(String s) {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("通过" + s + "获取详细任务的编号" + dtf.format(LocalDateTime.now()));
return "1314";
}
}).thenAccept(new Consumer<String>() {
@Override
public void accept(String s) {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("通过" + s + "完成任务!" + dtf.format(LocalDateTime.now()));
}
});
System.out.println("是否执行完成"+future.isDone()+" 检验主线程是否被阻塞"+dtf.format(LocalDateTime.now()));
try {
Thread.sleep(7000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}

通过运行结果可以发现,既没有影响异步操作,同时还保证了串行操作。

并行执行

场景A:菜鸟攒钱买金装和风云装,只要攒够一套装备,菜鸟就出去砍人!

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
public class ThreadDemo {
private static DateTimeFormatter dtf = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

public static void main(String[] args) {
CompletableFuture<String> yaFei = CompletableFuture.supplyAsync(new Supplier<String>() {
@Override
public String get() {
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "菜鸟买到金装"+dtf.format(LocalDateTime.now());
}
});
CompletableFuture<String> meiDuSha = CompletableFuture.supplyAsync(new Supplier<String>() {
@Override
public String get() {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "菜鸟买到风云装"+dtf.format(LocalDateTime.now());
}
});

CompletableFuture<Object> anyOf = CompletableFuture.anyOf(yaFei, meiDuSha);
anyOf.thenAccept(new Consumer<Object>() {
@Override
public void accept(Object o) {
System.out.println(o);
System.out.println("有装备了,菜鸟出去砍人"+dtf.format(LocalDateTime.now()));
}
});
System.out.println("检验是否阻塞主线程"+dtf.format(LocalDateTime.now()));
try {
Thread.sleep(6000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}

场景B:菜鸟攒钱买装备和符文,两套全齐了之后,菜鸟才出去砍人!

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
public class ThreadDemo {
private static DateTimeFormatter dtf = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

public static void main(String[] args) {
CompletableFuture<Void> zhuangbei = CompletableFuture.runAsync(new Runnable() {
@Override
public void run() {
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("菜鸟买到装备" + dtf.format(LocalDateTime.now()));
}
});
CompletableFuture<Void> fuwen = CompletableFuture.runAsync(new Runnable() {
@Override
public void run() {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("菜鸟买到符文" + dtf.format(LocalDateTime.now()));
}
});


CompletableFuture<Void> allOf = CompletableFuture.allOf(zhuangbei, fuwen);
allOf.thenAccept(new Consumer<Void>() {
@Override
public void accept(Void aVoid) {
System.out.println("菜鸟出去砍人"+dtf.format(LocalDateTime.now()));
}
});
System.out.println("检验是否阻塞主线程"+dtf.format(LocalDateTime.now()));
try {
Thread.sleep(6000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}

三、线程池

3.1 ThreadPoolExecutor

原理

ThreadPoolExecutor是java.util.convurrent提供的,以内部线程池的形式对外提供管理任务执行、线程调度、线程池管理等服务。

以jdk8为例,Executors提供的线程服务,有4个是通过ThreadPoolExecutor参数设置。这个的说明在下一节

主要参数讲解

  • corePoolSize:核心线程个数,也就是线程池中,至少会运行corePoolSize个线程。但是在刚创建线程池的时候,线程并不会立即启动,而是在等到有任务提交时才会启动。
  • maxmumPoolSize:最大线程个数
  • keepAliveTime:非核心线程的最大空闲时间,空闲时间超过这个设定值,线程就关闭掉了。
  • allowCoreThreadTimeOut:默认值是false。如果改为true,则表示keepAliveTime也同样适用于核心线程,超时就会关闭,知道线程池中线程数为0
  • timeUnit:keepAliveTime的时间单位
  • workQueue:阻塞任务队列,存储等待执行线程的工作队列。BlockingQueue下面有实现类,比如说LinkedBlockingQueue,在创建任务队列时,可以给队列指定长度。
  • threadFactory:线程工厂,默认的工厂是DefaultThreadFactory,如果我们想实现线程池的重命名的话,可以通过自己实现一个工厂。
  • rejectedExecutionHandler:拒绝策略。当提交的任务数,超过了maxmumPoolSize+workQueue之和时,多余的任务会交给拒绝策略来处理。默认是AbortPolicy策略:丢弃任务,抛运行时异常。

整个的执行流程,如下图

poolSize、corePoolSize、maxmumPoolSize、queueCapacity之间的关系

poolSize表示线程池中非空闲的线程个数。

queueCapacity表示队列的长度

  1. poolSize<corePoolSize时,无论是否有空闲线程,都会新建一个线程来处理新提交的任务
  2. poolSize=corePoolSize时,又来了新的任务,且新的任务数量没有超过queueCapacity时,任务就暂时放到队列里,等待执行(前面的任务结束之后,线程空闲出来后再执行)
  3. poolSize=corePoolSize时,又来了新的任务,且新的任务数量超过了queueCapacity时
    • poolSize<maxmumPoolSize:新创建线程来处理任务,任务执行完毕后线程就被销毁了。
    • poolSize=maxmumPoolSize:线程池的处理能力已经达到了极限,此时根据配置的拒绝策略来处理新增任务。

任务队列

jdk8中,ThreadPoolExecutor可选用队列

  • LinkedBlockingQueue:常用,一个由链表组成的有界阻塞队列。先进先出模式,默认队列长度是Integer的最大值。
  • SynchronousQueue:常用,一个不存储元素的阻塞队列。配对通信机制,默认是非公平模式,也就是采用栈操作,后进先出;公平模式就是采用队列操作,先进先出。A线程put任务A进入队列之后,除非有一个B线程来take,否则A线程一直处于阻塞状态。如果A线程put任务A进入队列之后,B线程put任务B进入队列,默认情况下,C线程来take,会获取任务B,之后,D线程再来take,会获取任务A。
  • ArrayBlockingQueue:一个由数组组成的有界阻塞队列。
  • PriorityBlockingQueue:一个支持优先级排序的无界阻塞队列。
  • DelayQueue:一个使用优先级队列实现的无界阻塞队列。
  • LinkedTransferQueue:一个由链表结构组成的无界阻塞队列。
  • LinkedBlockingDeque:一个由链表结构组成的双向阻塞队列。

拒绝策略

jdk8中,ThreadPoolExecutor提供了4中拒绝策略

  • AbortPolicy:默认拒绝策略。丢弃任务并抛出RejectedExecutionException异常
  • CallerRunsPolicy:任务被拒绝添加后,会由调用execute方法的线程来执行被拒绝的任务。除非executor被关闭,否则任务不会被丢弃。简单理解谁调用谁执行
  • DiscardPolicy:直接丢弃任务,不抛异常
  • DiscardOldestPolicy:如果队列满了,新任务来了,会丢弃队列中最前面条任务,将新任务追加到队列。比如现在有1 2 3任务,来了4,丢弃1后变成2 3 4

案例:设置线程池,1个核心、LinkedBlockingQueue队列设置容量为2、2个最大线程,按顺序一次性添加七个任务。通过选择不同的拒绝策略,查看任务执行。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
public class ThreadDemo {
private static DateTimeFormatter dtf = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss:SSS");

public ThreadPoolExecutor getExecutor() {
return new ThreadPoolExecutor(
1,//核心
2,//最大
5,//keepAlive
TimeUnit.SECONDS,//keepAlive的单位
new LinkedBlockingDeque<>(2),//容量为2的阻塞队列
new ThreadPoolExecutor.DiscardOldestPolicy()//拒绝策略
);
}

public static void main(String[] args) {
ThreadPoolExecutor executor = new ThreadDemo().getExecutor();
new Thread(new Runnable() {
@Override
public void run() {
Thread.currentThread().setName("我添加的线程,用于打印活跃中的线程");
while (true) {
System.out.println("=============================");
ThreadGroup threadGroup = Thread.currentThread().getThreadGroup();
threadGroup.list();
System.out.println("=============================");
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}).start();
for (int i = 1; i <= 7; i++) {
//String.valueOf通过源码可知,是new了一个新对象。跟直接String a=str不同的是,后者只引用了别人的地址,而前者指向了自己的数据空间
//String temp= String.valueOf(i);

//声明变量a的同时,系统给a分配了数据空间。
int temp = i;

/**
* 为啥要一个temp变量?
* 因为execute是异步的,如果直接用i的话,可能会出现,所有的异步线程获取的i都是最终的7
*/
executor.execute(() -> {
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + " 执行任务" + temp + " 结束时间:" + dtf.format(LocalDateTime.now()));
});
}
}
}

拒绝策略为AbortPolicy

拒绝策略为CallerRunsPolicy

拒绝策略为DiscardPolicy

拒绝策略为DiscardOldestPolicy

发布:2021-09-06 22:33:36
修改:2022-07-03 08:41:34
链接:https://meethigher.top/blog/2021/advanced-thread/
标签:java 
付款码 打赏 分享
Shift+Ctrl+1 可控制工具栏