摘要
在学习pulsar的时候,发现自己对多线程的知识了解的还是太少了。
正文
参考文章
一、Volatile volatile的作用
保证变量的内存可见性 禁止指令重排序 1.1 保证变量的内存可见性 执行下面的程序,会发现控制台永远都不会输出 “主线程访问到 flag 变量” 这句话。我们可以看到,子线程执行1秒后已经将 flag 设置成 true,但主线程执行时没有读到 flag 的最新值,导致控制台没有输出上面的句子。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
/**
* 变量的内存可见性例子
*
* @author star
*/
public class VolatileExample {
/**
* main 方法作为一个主线程
*/
public static void main ( String [] args ) {
MyThread myThread = new MyThread ();
// 开启线程
myThread . start ();
// 主线程执行
for (; ; ) {
if ( myThread . isFlag ()) {
System . out . println ( "main线程读取到更新后的数据!" );
}
}
}
}
/**
* 子线程类
*/
class MyThread extends Thread {
private boolean flag = false ;
@Override
public void run () {
try {
Thread . sleep ( 1000 );
} catch ( InterruptedException e ) {
e . printStackTrace ();
}
// 修改变量值
flag = true ;
System . out . println ( "flag = " + flag );
}
public boolean isFlag () {
return flag ;
}
public void setFlag ( boolean flag ) {
this . flag = flag ;
}
}
展开
但是,在将flag变量改成如下时
1
private volatile boolean flag = false ;
输出结果如下
展开
flag是一个在main线程与myThread线程间共享的变量,在myThread线程间更新了flag的内容值,并不会在main中立即获取这个值。
此时为了确保每个线程都能读取到更新后的变量值 ,此时就需要使用volatile。
至于为啥会出现这种问题?这涉及到Java的内存模型。
展开
在Java虚拟机中,变量的值,保存在主内存中。
当线程访问变量时,它会先获取一个副本,并保存在自己的工作内存中。
如果线程修改了变量的值,虚拟机会在某个时刻把修改后的值回写到主内存。但是这个时间是不固定的。尤其是在x86的架构下,JVM回写主内存的速度非常快,但是,换成ARM的架构,就会有显著的延迟。
这会导致如果一个线程更新了某个变量,另一个线程读取的值可能还是更新前的。
就比如myThread的flag=true,在main线程中,仍然读取到的flag是false。myThread在此刻只是将变量flag的副本变成了true,主内存中flag还是false。
此时,volatile的作用
每次修改访问变量时,总是获取到主内存的最新值 每次修改变量后,立即回写到主内存。 1.2 禁止指令重排序 有序性指的是程序执行的顺序按照代码的先后顺序执行。
1
2
3
4
int i = 0 ; //语句1
boolean flag = false ; //语句2
i = 1 ; //语句3
flag = true ; // 语句4
但是,在上面这个例子中,并不会顺序执行,jvm会在真正执行该段代码时进行优化,会发生指令的重排序。
展开
并不是所有的重排序,都会与原先的顺序不一致。比如下面这个
1
2
int a = 10 ; //语句1
a = a + 3 ; // 语句2
因为jvm在进行重排序时,会考虑到数据之间的依赖性,如果语句2必须用到语句1的结果,那么处理器一定保证语句1在语句2前执行。
类似于这种的数据依赖性,在单线程环境下一点问题都没有,因为总能保证数据的正确,但是在多线程环境下,就会发生错误。
这个地方参考了很多文章,以及技术书《深入Java虚拟机》,但是看过了之后,还是不太懂,感觉给的多线程的例子都有点问题。所以这边暂时不进行补充了
二、Future 2.1 Future常用方法 一个Future接口表示一个未来可能会返回的结果,常用方法由
get():获取结果(任务耗时就会进行等待状态,类似于同步) get(long timeout, TimeUnit unit):获取结果,等待指定时间 cancel(boolean flag):取消当前任务。flag为true表示执行任务的线程应该被立马中断;false表示等待任务执行完毕再取消。 isDone():判断任务是否完成。 下面是通过Future实现一个异步任务,并且在异步任务完成之后,自动回调任务。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
public class ThreadDemo {
private static final DateTimeFormatter dtf = DateTimeFormatter . ofPattern ( "yyyy-MM-dd HH:mm:ss" );
static class CallableImpl implements Callable < String > {
@Override
public String call () throws Exception {
try {
Thread . sleep ( 10000 );
} catch ( InterruptedException e ) {
e . printStackTrace ();
}
System . out . println ( "Callable运行结束:" + dtf . format ( LocalDateTime . now ()));
return "Callable运行结束" ;
}
}
public static void main ( String [] args ) throws ExecutionException , InterruptedException {
ExecutorService executorService = Executors . newFixedThreadPool ( 1 );
System . out . println ( "线程执行开始:" + dtf . format ( LocalDateTime . now ()));
Future < String > submit = executorService . submit ( new CallableImpl ());
//使用get,相当于是个同步操作。会等待线程任务执行完毕,才会进行下面任务的执行
//String s = submit.get();
//System.out.println(submit.isDone());
new Thread ( new Runnable () {
@Override
public void run () {
while ( ! submit . isDone ()){
}
System . out . println ( "执行回调任务" + dtf . format ( LocalDateTime . now ()));
}
}). start ();
System . out . println ( "校验异步耗时:" + dtf . format ( LocalDateTime . now ()));
//立即关闭
//executorService.shutdownNow();
//等待线程任务执行完后,关闭
executorService . shutdown ();
}
}
2.2 CompletableFuture常用方法 案例 使用Future获得异步执行结果时,要么调用阻塞方法get(),要么轮询看isDone()是否为true,这两种方法都不是很好,因为主线程也会被迫等待。
从Java8开始引入了CompletableFuture,它可以传入回调对象,当异步任务完成或者发生异常时,自动调用回调对象的回调方法。
CompletableFuture常用方法
supplyAsync:表示创建带返回值的异步任务,相当于ExecutorService submit(Callable task)方法 runAsync:runAsync表示创建无返回值的异步任务,相当于ExecutorService submit(Runnable task)方法 thenApply(T,U):表示某个任务执行完成后执行的动作,即回调方法。接收任务完成后传过来的参数T,并返回类型U thenApplyAsync(T,U):与thenApply的不同之处在于,thenApplyAsync在执行任务A 和任务A结束之后的回调 ,这两个线程可能会不一样;但是thenApply执行任务和任务结束后的回调是同一个线程。 thenAccept:跟thenApply作用类似,有入参,但是没有返回值。 thenRun:参数是个Runnable接口 exceptionally:如果任务执行过程中出现异常,就会回调该方法 anyOf:任意个CompletableFuture只要有一个执行完成,该anyOf就完成。 allOf:所有的CompletableFuture全部完成,该allOf才完成。 whenComplete/handle:可自定义回调执行的用法 通过CompletableFuture完成上面使用Future完成的案例。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
public class ThreadDemo {
private static final DateTimeFormatter dtf = DateTimeFormatter . ofPattern ( "yyyy-MM-dd HH:mm:ss" );
static class RunnableImpl implements Runnable {
@Override
public void run () {
try {
Thread . sleep ( 10000 );
} catch ( InterruptedException e ) {
e . printStackTrace ();
}
System . out . println ( "Runnable运行结束!" );
}
}
static class CallableImpl implements Callable < String > {
@Override
public String call () throws Exception {
try {
Thread . sleep ( 10000 );
} catch ( InterruptedException e ) {
e . printStackTrace ();
}
System . out . println ( "Callable运行结束:" + dtf . format ( LocalDateTime . now ()));
return "Callable运行结束" ;
}
}
public static void main ( String [] args ) throws ExecutionException , InterruptedException {
CompletableFuture < String > completableFuture = CompletableFuture . supplyAsync ( new Supplier < String > () {
@Override
public String get () {
System . out . println ( "执行开始" + dtf . format ( LocalDateTime . now ()));
try {
Thread . sleep ( 10000 );
} catch ( InterruptedException e ) {
e . printStackTrace ();
}
return "执行结束" + dtf . format ( LocalDateTime . now ());
}
});
completableFuture . thenAccept ( new Consumer < String > () {
@Override
public void accept ( String s ) {
System . out . println ( s );
System . out . println ( "success" + dtf . format ( LocalDateTime . now ()));
}
});
completableFuture . exceptionally ( new Function < Throwable , String > () {
@Override
public String apply ( Throwable throwable ) {
System . out . println ( "failure" );
return null ;
}
});
System . out . println ( "校验异步阻塞!" );
// 主线程不要立刻结束,否则CompletableFuture默认使用的线程池会立刻关闭:
Thread . sleep ( 30000 );
}
}
展开
展开
通过运行结果的两张图,可以知道,这就是一个异步的操作,并且主线程不会进入阻塞状态,通过打印校验异步就可以知道并没有影响到主线程继续向下执行。同时,在任务执行完毕之后,还会自动执行回调方法。
CompletableFuture优点
异步任务结束时,会自动回调执行成功时的逻辑 异步任务异常时,会自动回调执行异常时的逻辑 串行执行 如果只是实现了异步回调,其实跟Future区别也不是特别大,只是用起来方便一点。CompletableFuture更强大的功能在于,多个CompletableFuture可以串行执行。
串行执行:一个个的按顺序执行,执行完第一个,才会执行第二个。跟面向过程的含义一样。
并行执行:两个人吃两个馒头,效率高。多核cpu
并发执行:一个人吃俩馒头,这个吃一口,那个吃一口,效率低。单核cpu
举一个使用场景,就拿之前写今日校园签到脚本的例子来说
获取今日最新任务 通过任务提取任务id 通过任务id获取任务详细内容 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
public class ThreadDemo {
private static final DateTimeFormatter dtf = DateTimeFormatter . ofPattern ( "yyyy-MM-dd HH:mm:ss" );
public static void main ( String [] args ) {
CompletableFuture < Void > future = CompletableFuture . supplyAsync ( new Supplier < String > () {
@Override
public String get () {
try {
Thread . sleep ( 2000 );
} catch ( InterruptedException e ) {
e . printStackTrace ();
}
System . out . println ( "获取最新任务的编号" + dtf . format ( LocalDateTime . now ()));
return "520" ;
}
}). thenApply ( new Function < String , String > () {
@Override
public String apply ( String s ) {
try {
Thread . sleep ( 2000 );
} catch ( InterruptedException e ) {
e . printStackTrace ();
}
System . out . println ( "通过" + s + "获取详细任务的编号" + dtf . format ( LocalDateTime . now ()));
return "1314" ;
}
}). thenAccept ( new Consumer < String > () {
@Override
public void accept ( String s ) {
try {
Thread . sleep ( 2000 );
} catch ( InterruptedException e ) {
e . printStackTrace ();
}
System . out . println ( "通过" + s + "完成任务!" + dtf . format ( LocalDateTime . now ()));
}
});
System . out . println ( "是否执行完成" + future . isDone () + " 检验主线程是否被阻塞" + dtf . format ( LocalDateTime . now ()));
try {
Thread . sleep ( 7000 );
} catch ( InterruptedException e ) {
e . printStackTrace ();
}
}
}
展开
通过运行结果可以发现,既没有影响异步操作,同时还保证了串行操作。
并行执行 场景A:菜鸟攒钱买金装和风云装,只要攒够一套装备,菜鸟就出去砍人!
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
public class ThreadDemo {
private static DateTimeFormatter dtf = DateTimeFormatter . ofPattern ( "yyyy-MM-dd HH:mm:ss" );
public static void main ( String [] args ) {
CompletableFuture < String > yaFei = CompletableFuture . supplyAsync ( new Supplier < String > () {
@Override
public String get () {
try {
Thread . sleep ( 5000 );
} catch ( InterruptedException e ) {
e . printStackTrace ();
}
return "菜鸟买到金装" + dtf . format ( LocalDateTime . now ());
}
});
CompletableFuture < String > meiDuSha = CompletableFuture . supplyAsync ( new Supplier < String > () {
@Override
public String get () {
try {
Thread . sleep ( 2000 );
} catch ( InterruptedException e ) {
e . printStackTrace ();
}
return "菜鸟买到风云装" + dtf . format ( LocalDateTime . now ());
}
});
CompletableFuture < Object > anyOf = CompletableFuture . anyOf ( yaFei , meiDuSha );
anyOf . thenAccept ( new Consumer < Object > () {
@Override
public void accept ( Object o ) {
System . out . println ( o );
System . out . println ( "有装备了,菜鸟出去砍人" + dtf . format ( LocalDateTime . now ()));
}
});
System . out . println ( "检验是否阻塞主线程" + dtf . format ( LocalDateTime . now ()));
try {
Thread . sleep ( 6000 );
} catch ( InterruptedException e ) {
e . printStackTrace ();
}
}
}
展开
场景B:菜鸟攒钱买装备和符文,两套全齐了之后,菜鸟才出去砍人!
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
public class ThreadDemo {
private static DateTimeFormatter dtf = DateTimeFormatter . ofPattern ( "yyyy-MM-dd HH:mm:ss" );
public static void main ( String [] args ) {
CompletableFuture < Void > zhuangbei = CompletableFuture . runAsync ( new Runnable () {
@Override
public void run () {
try {
Thread . sleep ( 5000 );
} catch ( InterruptedException e ) {
e . printStackTrace ();
}
System . out . println ( "菜鸟买到装备" + dtf . format ( LocalDateTime . now ()));
}
});
CompletableFuture < Void > fuwen = CompletableFuture . runAsync ( new Runnable () {
@Override
public void run () {
try {
Thread . sleep ( 2000 );
} catch ( InterruptedException e ) {
e . printStackTrace ();
}
System . out . println ( "菜鸟买到符文" + dtf . format ( LocalDateTime . now ()));
}
});
CompletableFuture < Void > allOf = CompletableFuture . allOf ( zhuangbei , fuwen );
allOf . thenAccept ( new Consumer < Void > () {
@Override
public void accept ( Void aVoid ) {
System . out . println ( "菜鸟出去砍人" + dtf . format ( LocalDateTime . now ()));
}
});
System . out . println ( "检验是否阻塞主线程" + dtf . format ( LocalDateTime . now ()));
try {
Thread . sleep ( 6000 );
} catch ( InterruptedException e ) {
e . printStackTrace ();
}
}
}
展开
三、线程池 3.1 ThreadPoolExecutor 原理 ThreadPoolExecutor是java.util.convurrent提供的,以内部线程池的形式对外提供管理任务执行、线程调度、线程池管理等服务。
以jdk8为例,Executors提供的线程服务,有4个是通过ThreadPoolExecutor参数设置。这个的说明在下一节
展开
主要参数讲解
corePoolSize:核心线程个数,也就是线程池中,至少会运行corePoolSize个线程。但是在刚创建线程池的时候,线程并不会立即启动,而是在等到有任务提交时才会启动。 maxmumPoolSize:最大线程个数 keepAliveTime:非核心线程的最大空闲时间,空闲时间超过这个设定值,线程就关闭掉了。 allowCoreThreadTimeOut:默认值是false。如果改为true,则表示keepAliveTime也同样适用于核心线程,超时就会关闭,知道线程池中线程数为0 timeUnit:keepAliveTime的时间单位 workQueue:阻塞任务队列,存储等待执行线程的工作队列。BlockingQueue下面有实现类,比如说LinkedBlockingQueue,在创建任务队列时,可以给队列指定长度。 threadFactory:线程工厂,默认的工厂是DefaultThreadFactory,如果我们想实现线程池的重命名的话,可以通过自己实现一个工厂。 rejectedExecutionHandler:拒绝策略。当提交的任务数,超过了maxmumPoolSize+workQueue之和时,多余的任务会交给拒绝策略来处理。默认是AbortPolicy策略:丢弃任务,抛运行时异常。
展开
整个的执行流程,如下图
展开
poolSize、corePoolSize、maxmumPoolSize、queueCapacity之间的关系
poolSize表示线程池中非空闲的线程个数。
queueCapacity表示队列的长度
poolSize<corePoolSize时,无论是否有空闲线程,都会新建一个线程来处理新提交的任务 poolSize=corePoolSize时,又来了新的任务,且新的任务数量没有超过queueCapacity时,任务就暂时放到队列里,等待执行(前面的任务结束之后,线程空闲出来后再执行) poolSize=corePoolSize时,又来了新的任务,且新的任务数量超过了queueCapacity时poolSize<maxmumPoolSize:新创建线程来处理任务,任务执行完毕后线程就被销毁了。 poolSize=maxmumPoolSize:线程池的处理能力已经达到了极限,此时根据配置的拒绝策略来处理新增任务。
展开
任务队列 jdk8中,ThreadPoolExecutor可选用队列
LinkedBlockingQueue:常用 ,一个由链表组成的有界阻塞队列。先进先出模式,默认队列长度是Integer的最大值。 SynchronousQueue:常用 ,一个不存储元素的阻塞队列。配对通信机制,默认是非公平模式,也就是采用栈操作,后进先出;公平模式就是采用队列操作,先进先出。A线程put任务A进入队列之后,除非有一个B线程来take,否则A线程一直处于阻塞状态。如果A线程put任务A进入队列之后,B线程put任务B进入队列,默认情况下,C线程来take,会获取任务B,之后,D线程再来take,会获取任务A。 ArrayBlockingQueue:一个由数组组成的有界阻塞队列。 PriorityBlockingQueue:一个支持优先级排序的无界阻塞队列。 DelayQueue:一个使用优先级队列实现的无界阻塞队列。 LinkedTransferQueue:一个由链表结构组成的无界阻塞队列。 LinkedBlockingDeque:一个由链表结构组成的双向阻塞队列。 拒绝策略 jdk8中,ThreadPoolExecutor提供了4中拒绝策略
AbortPolicy:默认拒绝策略。丢弃任务并抛出RejectedExecutionException异常 CallerRunsPolicy:任务被拒绝添加后,会由调用execute方法的线程来执行被拒绝的任务。除非executor被关闭,否则任务不会被丢弃。简单理解谁调用谁执行 DiscardPolicy:直接丢弃任务,不抛异常 DiscardOldestPolicy:如果队列满了,新任务来了,会丢弃队列中最前面条任务,将新任务追加到队列。比如现在有1 2 3任务,来了4,丢弃1后变成2 3 4 案例:设置线程池,1个核心、LinkedBlockingQueue队列设置容量为2、2个最大线程,按顺序一次性添加七个任务。通过选择不同的拒绝策略,查看任务执行。
展开
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
public class ThreadDemo {
private static DateTimeFormatter dtf = DateTimeFormatter . ofPattern ( "yyyy-MM-dd HH:mm:ss:SSS" );
public ThreadPoolExecutor getExecutor () {
return new ThreadPoolExecutor (
1 , //核心
2 , //最大
5 , //keepAlive
TimeUnit . SECONDS , //keepAlive的单位
new LinkedBlockingDeque <> ( 2 ), //容量为2的阻塞队列
new ThreadPoolExecutor . DiscardOldestPolicy () //拒绝策略
);
}
public static void main ( String [] args ) {
ThreadPoolExecutor executor = new ThreadDemo (). getExecutor ();
new Thread ( new Runnable () {
@Override
public void run () {
Thread . currentThread (). setName ( "我添加的线程,用于打印活跃中的线程" );
while ( true ) {
System . out . println ( "=============================" );
ThreadGroup threadGroup = Thread . currentThread (). getThreadGroup ();
threadGroup . list ();
System . out . println ( "=============================" );
try {
Thread . sleep ( 5000 );
} catch ( InterruptedException e ) {
e . printStackTrace ();
}
}
}
}). start ();
for ( int i = 1 ; i <= 7 ; i ++ ) {
//String.valueOf通过源码可知,是new了一个新对象。跟直接String a=str不同的是,后者只引用了别人的地址,而前者指向了自己的数据空间
//String temp= String.valueOf(i);
//声明变量a的同时,系统给a分配了数据空间。
int temp = i ;
/**
* 为啥要一个temp变量?
* 因为execute是异步的,如果直接用i的话,可能会出现,所有的异步线程获取的i都是最终的7
*/
executor . execute (() -> {
try {
Thread . sleep ( 5000 );
} catch ( InterruptedException e ) {
e . printStackTrace ();
}
System . out . println ( Thread . currentThread (). getName () + " 执行任务" + temp + " 结束时间:" + dtf . format ( LocalDateTime . now ()));
});
}
}
}
拒绝策略为AbortPolicy
展开
拒绝策略为CallerRunsPolicy
展开
拒绝策略为DiscardPolicy
展开
拒绝策略为DiscardOldestPolicy
展开