摘要
在学习pulsar的时候,发现自己对多线程的知识了解的还是太少了。
正文
参考文章
一、Volatile
volatile的作用
- 保证变量的内存可见性
- 禁止指令重排序
1.1 保证变量的内存可见性
执行下面的程序,会发现控制台永远都不会输出 “主线程访问到 flag 变量” 这句话。我们可以看到,子线程执行1秒后已经将 flag 设置成 true,但主线程执行时没有读到 flag 的最新值,导致控制台没有输出上面的句子。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
| /**
* 变量的内存可见性例子
*
* @author star
*/
public class VolatileExample {
/**
* main 方法作为一个主线程
*/
public static void main(String[] args) {
MyThread myThread = new MyThread();
// 开启线程
myThread.start();
// 主线程执行
for (; ; ) {
if (myThread.isFlag()) {
System.out.println("main线程读取到更新后的数据!");
}
}
}
}
/**
* 子线程类
*/
class MyThread extends Thread {
private boolean flag = false;
@Override
public void run() {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
// 修改变量值
flag = true;
System.out.println("flag = " + flag);
}
public boolean isFlag() {
return flag;
}
public void setFlag(boolean flag) {
this.flag = flag;
}
}
|
但是,在将flag变量改成如下时
1
| private volatile boolean flag = false;
|
输出结果如下
flag是一个在main线程与myThread线程间共享的变量,在myThread线程间更新了flag的内容值,并不会在main中立即获取这个值。
此时为了确保每个线程都能读取到更新后的变量值,此时就需要使用volatile。
至于为啥会出现这种问题?这涉及到Java的内存模型。
在Java虚拟机中,变量的值,保存在主内存中。
当线程访问变量时,它会先获取一个副本,并保存在自己的工作内存中。
如果线程修改了变量的值,虚拟机会在某个时刻把修改后的值回写到主内存。但是这个时间是不固定的。尤其是在x86的架构下,JVM回写主内存的速度非常快,但是,换成ARM的架构,就会有显著的延迟。
这会导致如果一个线程更新了某个变量,另一个线程读取的值可能还是更新前的。
就比如myThread的flag=true,在main线程中,仍然读取到的flag是false。myThread在此刻只是将变量flag的副本变成了true,主内存中flag还是false。
此时,volatile的作用
- 每次修改访问变量时,总是获取到主内存的最新值
- 每次修改变量后,立即回写到主内存。
1.2 禁止指令重排序
有序性指的是程序执行的顺序按照代码的先后顺序执行。
1
2
3
4
| int i=0;//语句1
boolean flag=false;//语句2
i=1;//语句3
flag=true;//语句4
|
但是,在上面这个例子中,并不会顺序执行,jvm会在真正执行该段代码时进行优化,会发生指令的重排序。
并不是所有的重排序,都会与原先的顺序不一致。比如下面这个
1
2
| int a=10;//语句1
a=a+3;//语句2
|
因为jvm在进行重排序时,会考虑到数据之间的依赖性,如果语句2必须用到语句1的结果,那么处理器一定保证语句1在语句2前执行。
类似于这种的数据依赖性,在单线程环境下一点问题都没有,因为总能保证数据的正确,但是在多线程环境下,就会发生错误。
这个地方参考了很多文章,以及技术书《深入Java虚拟机》,但是看过了之后,还是不太懂,感觉给的多线程的例子都有点问题。所以这边暂时不进行补充了
二、Future
2.1 Future常用方法
一个Future接口表示一个未来可能会返回的结果,常用方法由
- get():获取结果(任务耗时就会进行等待状态,类似于同步)
- get(long timeout, TimeUnit unit):获取结果,等待指定时间
- cancel(boolean flag):取消当前任务。flag为true表示执行任务的线程应该被立马中断;false表示等待任务执行完毕再取消。
- isDone():判断任务是否完成。
下面是通过Future实现一个异步任务,并且在异步任务完成之后,自动回调任务。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
| public class ThreadDemo {
private static final DateTimeFormatter dtf=DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
static class CallableImpl implements Callable<String> {
@Override
public String call() throws Exception {
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("Callable运行结束:"+dtf.format(LocalDateTime.now()));
return "Callable运行结束";
}
}
public static void main(String[] args) throws ExecutionException, InterruptedException {
ExecutorService executorService = Executors.newFixedThreadPool(1);
System.out.println("线程执行开始:"+dtf.format(LocalDateTime.now()));
Future<String> submit = executorService.submit(new CallableImpl());
//使用get,相当于是个同步操作。会等待线程任务执行完毕,才会进行下面任务的执行
//String s = submit.get();
//System.out.println(submit.isDone());
new Thread(new Runnable() {
@Override
public void run() {
while(!submit.isDone()){
}
System.out.println("执行回调任务"+dtf.format(LocalDateTime.now()));
}
}).start();
System.out.println("校验异步耗时:"+dtf.format(LocalDateTime.now()));
//立即关闭
//executorService.shutdownNow();
//等待线程任务执行完后,关闭
executorService.shutdown();
}
}
|
2.2 CompletableFuture常用方法
案例
使用Future获得异步执行结果时,要么调用阻塞方法get(),要么轮询看isDone()是否为true,这两种方法都不是很好,因为主线程也会被迫等待。
从Java8开始引入了CompletableFuture,它可以传入回调对象,当异步任务完成或者发生异常时,自动调用回调对象的回调方法。
CompletableFuture常用方法
- supplyAsync:表示创建带返回值的异步任务,相当于ExecutorService submit(Callable task)方法
- runAsync:runAsync表示创建无返回值的异步任务,相当于ExecutorService submit(Runnable task)方法
- thenApply(T,U):表示某个任务执行完成后执行的动作,即回调方法。接收任务完成后传过来的参数T,并返回类型U
- thenApplyAsync(T,U):与thenApply的不同之处在于,thenApplyAsync在执行任务A和任务A结束之后的回调,这两个线程可能会不一样;但是thenApply执行任务和任务结束后的回调是同一个线程。
- thenAccept:跟thenApply作用类似,有入参,但是没有返回值。
- thenRun:参数是个Runnable接口
- exceptionally:如果任务执行过程中出现异常,就会回调该方法
- anyOf:任意个CompletableFuture只要有一个执行完成,该anyOf就完成。
- allOf:所有的CompletableFuture全部完成,该allOf才完成。
- whenComplete/handle:可自定义回调执行的用法
通过CompletableFuture完成上面使用Future完成的案例。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
| public class ThreadDemo {
private static final DateTimeFormatter dtf = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
static class RunnableImpl implements Runnable {
@Override
public void run() {
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("Runnable运行结束!");
}
}
static class CallableImpl implements Callable<String> {
@Override
public String call() throws Exception {
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("Callable运行结束:" + dtf.format(LocalDateTime.now()));
return "Callable运行结束";
}
}
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(new Supplier<String>() {
@Override
public String get() {
System.out.println("执行开始" + dtf.format(LocalDateTime.now()));
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "执行结束" + dtf.format(LocalDateTime.now());
}
});
completableFuture.thenAccept(new Consumer<String>() {
@Override
public void accept(String s) {
System.out.println(s);
System.out.println("success" + dtf.format(LocalDateTime.now()));
}
});
completableFuture.exceptionally(new Function<Throwable, String>() {
@Override
public String apply(Throwable throwable) {
System.out.println("failure");
return null;
}
});
System.out.println("校验异步阻塞!");
// 主线程不要立刻结束,否则CompletableFuture默认使用的线程池会立刻关闭:
Thread.sleep(30000);
}
}
|
![5.png 5.png]()
![6.png 6.png]()
通过运行结果的两张图,可以知道,这就是一个异步的操作,并且主线程不会进入阻塞状态,通过打印校验异步就可以知道并没有影响到主线程继续向下执行。同时,在任务执行完毕之后,还会自动执行回调方法。
CompletableFuture优点
- 异步任务结束时,会自动回调执行成功时的逻辑
- 异步任务异常时,会自动回调执行异常时的逻辑
串行执行
如果只是实现了异步回调,其实跟Future区别也不是特别大,只是用起来方便一点。CompletableFuture更强大的功能在于,多个CompletableFuture可以串行执行。
串行执行:一个个的按顺序执行,执行完第一个,才会执行第二个。跟面向过程的含义一样。
并行执行:两个人吃两个馒头,效率高。多核cpu
并发执行:一个人吃俩馒头,这个吃一口,那个吃一口,效率低。单核cpu
举一个使用场景,就拿之前写今日校园签到脚本的例子来说
- 获取今日最新任务
- 通过任务提取任务id
- 通过任务id获取任务详细内容
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
| public class ThreadDemo {
private static final DateTimeFormatter dtf = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
public static void main(String[] args) {
CompletableFuture<Void> future = CompletableFuture.supplyAsync(new Supplier<String>() {
@Override
public String get() {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("获取最新任务的编号" + dtf.format(LocalDateTime.now()));
return "520";
}
}).thenApply(new Function<String, String>() {
@Override
public String apply(String s) {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("通过" + s + "获取详细任务的编号" + dtf.format(LocalDateTime.now()));
return "1314";
}
}).thenAccept(new Consumer<String>() {
@Override
public void accept(String s) {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("通过" + s + "完成任务!" + dtf.format(LocalDateTime.now()));
}
});
System.out.println("是否执行完成"+future.isDone()+" 检验主线程是否被阻塞"+dtf.format(LocalDateTime.now()));
try {
Thread.sleep(7000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
|
![7.png 7.png]()
通过运行结果可以发现,既没有影响异步操作,同时还保证了串行操作。
并行执行
场景A:菜鸟攒钱买金装和风云装,只要攒够一套装备,菜鸟就出去砍人!
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
| public class ThreadDemo {
private static DateTimeFormatter dtf = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
public static void main(String[] args) {
CompletableFuture<String> yaFei = CompletableFuture.supplyAsync(new Supplier<String>() {
@Override
public String get() {
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "菜鸟买到金装"+dtf.format(LocalDateTime.now());
}
});
CompletableFuture<String> meiDuSha = CompletableFuture.supplyAsync(new Supplier<String>() {
@Override
public String get() {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "菜鸟买到风云装"+dtf.format(LocalDateTime.now());
}
});
CompletableFuture<Object> anyOf = CompletableFuture.anyOf(yaFei, meiDuSha);
anyOf.thenAccept(new Consumer<Object>() {
@Override
public void accept(Object o) {
System.out.println(o);
System.out.println("有装备了,菜鸟出去砍人"+dtf.format(LocalDateTime.now()));
}
});
System.out.println("检验是否阻塞主线程"+dtf.format(LocalDateTime.now()));
try {
Thread.sleep(6000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
|
![8.png 8.png]()
场景B:菜鸟攒钱买装备和符文,两套全齐了之后,菜鸟才出去砍人!
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
| public class ThreadDemo {
private static DateTimeFormatter dtf = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
public static void main(String[] args) {
CompletableFuture<Void> zhuangbei = CompletableFuture.runAsync(new Runnable() {
@Override
public void run() {
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("菜鸟买到装备" + dtf.format(LocalDateTime.now()));
}
});
CompletableFuture<Void> fuwen = CompletableFuture.runAsync(new Runnable() {
@Override
public void run() {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("菜鸟买到符文" + dtf.format(LocalDateTime.now()));
}
});
CompletableFuture<Void> allOf = CompletableFuture.allOf(zhuangbei, fuwen);
allOf.thenAccept(new Consumer<Void>() {
@Override
public void accept(Void aVoid) {
System.out.println("菜鸟出去砍人"+dtf.format(LocalDateTime.now()));
}
});
System.out.println("检验是否阻塞主线程"+dtf.format(LocalDateTime.now()));
try {
Thread.sleep(6000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
|
![9.png 9.png]()
三、线程池
3.1 ThreadPoolExecutor
原理
ThreadPoolExecutor是java.util.convurrent提供的,以内部线程池的形式对外提供管理任务执行、线程调度、线程池管理等服务。
以jdk8为例,Executors提供的线程服务,有4个是通过ThreadPoolExecutor参数设置。这个的说明在下一节
主要参数讲解
- corePoolSize:核心线程个数,也就是线程池中,至少会运行corePoolSize个线程。但是在刚创建线程池的时候,线程并不会立即启动,而是在等到有任务提交时才会启动。
- maxmumPoolSize:最大线程个数
- keepAliveTime:非核心线程的最大空闲时间,空闲时间超过这个设定值,线程就关闭掉了。
- allowCoreThreadTimeOut:默认值是false。如果改为true,则表示keepAliveTime也同样适用于核心线程,超时就会关闭,知道线程池中线程数为0
- timeUnit:keepAliveTime的时间单位
- workQueue:阻塞任务队列,存储等待执行线程的工作队列。BlockingQueue下面有实现类,比如说LinkedBlockingQueue,在创建任务队列时,可以给队列指定长度。
- threadFactory:线程工厂,默认的工厂是DefaultThreadFactory,如果我们想实现线程池的重命名的话,可以通过自己实现一个工厂。
- rejectedExecutionHandler:拒绝策略。当提交的任务数,超过了maxmumPoolSize+workQueue之和时,多余的任务会交给拒绝策略来处理。默认是AbortPolicy策略:丢弃任务,抛运行时异常。
整个的执行流程,如下图
poolSize、corePoolSize、maxmumPoolSize、queueCapacity之间的关系
poolSize表示线程池中非空闲的线程个数。
queueCapacity表示队列的长度
- poolSize<corePoolSize时,无论是否有空闲线程,都会新建一个线程来处理新提交的任务
- poolSize=corePoolSize时,又来了新的任务,且新的任务数量没有超过queueCapacity时,任务就暂时放到队列里,等待执行(前面的任务结束之后,线程空闲出来后再执行)
- poolSize=corePoolSize时,又来了新的任务,且新的任务数量超过了queueCapacity时
- poolSize<maxmumPoolSize:新创建线程来处理任务,任务执行完毕后线程就被销毁了。
- poolSize=maxmumPoolSize:线程池的处理能力已经达到了极限,此时根据配置的拒绝策略来处理新增任务。
任务队列
jdk8中,ThreadPoolExecutor可选用队列
- LinkedBlockingQueue:常用,一个由链表组成的有界阻塞队列。先进先出模式,默认队列长度是Integer的最大值。
- SynchronousQueue:常用,一个不存储元素的阻塞队列。配对通信机制,默认是非公平模式,也就是采用栈操作,后进先出;公平模式就是采用队列操作,先进先出。A线程put任务A进入队列之后,除非有一个B线程来take,否则A线程一直处于阻塞状态。如果A线程put任务A进入队列之后,B线程put任务B进入队列,默认情况下,C线程来take,会获取任务B,之后,D线程再来take,会获取任务A。
- ArrayBlockingQueue:一个由数组组成的有界阻塞队列。
- PriorityBlockingQueue:一个支持优先级排序的无界阻塞队列。
- DelayQueue:一个使用优先级队列实现的无界阻塞队列。
- LinkedTransferQueue:一个由链表结构组成的无界阻塞队列。
- LinkedBlockingDeque:一个由链表结构组成的双向阻塞队列。
拒绝策略
jdk8中,ThreadPoolExecutor提供了4中拒绝策略
- AbortPolicy:默认拒绝策略。丢弃任务并抛出RejectedExecutionException异常
- CallerRunsPolicy:任务被拒绝添加后,会由调用execute方法的线程来执行被拒绝的任务。除非executor被关闭,否则任务不会被丢弃。简单理解谁调用谁执行
- DiscardPolicy:直接丢弃任务,不抛异常
- DiscardOldestPolicy:如果队列满了,新任务来了,会丢弃队列中最前面条任务,将新任务追加到队列。比如现在有1 2 3任务,来了4,丢弃1后变成2 3 4
案例:设置线程池,1个核心、LinkedBlockingQueue队列设置容量为2、2个最大线程,按顺序一次性添加七个任务。通过选择不同的拒绝策略,查看任务执行。
![14.png 14.png]()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
| public class ThreadDemo {
private static DateTimeFormatter dtf = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss:SSS");
public ThreadPoolExecutor getExecutor() {
return new ThreadPoolExecutor(
1,//核心
2,//最大
5,//keepAlive
TimeUnit.SECONDS,//keepAlive的单位
new LinkedBlockingDeque<>(2),//容量为2的阻塞队列
new ThreadPoolExecutor.DiscardOldestPolicy()//拒绝策略
);
}
public static void main(String[] args) {
ThreadPoolExecutor executor = new ThreadDemo().getExecutor();
new Thread(new Runnable() {
@Override
public void run() {
Thread.currentThread().setName("我添加的线程,用于打印活跃中的线程");
while (true) {
System.out.println("=============================");
ThreadGroup threadGroup = Thread.currentThread().getThreadGroup();
threadGroup.list();
System.out.println("=============================");
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}).start();
for (int i = 1; i <= 7; i++) {
//String.valueOf通过源码可知,是new了一个新对象。跟直接String a=str不同的是,后者只引用了别人的地址,而前者指向了自己的数据空间
//String temp= String.valueOf(i);
//声明变量a的同时,系统给a分配了数据空间。
int temp = i;
/**
* 为啥要一个temp变量?
* 因为execute是异步的,如果直接用i的话,可能会出现,所有的异步线程获取的i都是最终的7
*/
executor.execute(() -> {
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + " 执行任务" + temp + " 结束时间:" + dtf.format(LocalDateTime.now()));
});
}
}
}
|
拒绝策略为AbortPolicy
拒绝策略为CallerRunsPolicy
拒绝策略为DiscardPolicy
拒绝策略为DiscardOldestPolicy