参考文章
一、Volatile volatile的作用
保证变量的内存可见性 禁止指令重排序 1.1 保证变量的内存可见性 执行下面的程序,会发现控制台永远都不会输出 “主线程访问到 flag 变量” 这句话。我们可以看到,子线程执行1秒后已经将 flag 设置成 true,但主线程执行时没有读到 flag 的最新值,导致控制台没有输出上面的句子。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 public class VolatileExample { public static void main (String[] args) { MyThread myThread = new MyThread(); myThread.start(); for (; ; ) { if (myThread.isFlag()) { System.out.println("main线程读取到更新后的数据!" ); } } } } class MyThread extends Thread { private boolean flag = false ; @Override public void run () { try { Thread.sleep(1000 ); } catch (InterruptedException e) { e.printStackTrace(); } flag = true ; System.out.println("flag = " + flag); } public boolean isFlag () { return flag; } public void setFlag (boolean flag) { this .flag = flag; } }
但是,在将flag变量改成如下时
1 private volatile boolean flag = false ;
输出结果如下
flag是一个在main线程与myThread线程间共享的变量,在myThread线程间更新了flag的内容值,并不会在main中立即获取这个值。
此时为了确保每个线程都能读取到更新后的变量值 ,此时就需要使用volatile。
至于为啥会出现这种问题?这涉及到Java的内存模型。
在Java虚拟机中,变量的值,保存在主内存中。
当线程访问变量时,它会先获取一个副本,并保存在自己的工作内存中。
如果线程修改了变量的值,虚拟机会在某个时刻把修改后的值回写到主内存。但是这个时间是不固定的。尤其是在x86的架构下,JVM回写主内存的速度非常快,但是,换成ARM的架构,就会有显著的延迟。
这会导致如果一个线程更新了某个变量,另一个线程读取的值可能还是更新前的。
就比如myThread的flag=true,在main线程中,仍然读取到的flag是false。myThread在此刻只是将变量flag的副本变成了true,主内存中flag还是false。
此时,volatile的作用
每次修改访问变量时,总是获取到主内存的最新值 每次修改变量后,立即回写到主内存。 1.2 禁止指令重排序 有序性指的是程序执行的顺序按照代码的先后顺序执行。
1 2 3 4 int i=0 ;boolean flag=false ;i=1 ; flag=true ;
但是,在上面这个例子中,并不会顺序执行,jvm会在真正执行该段代码时进行优化,会发生指令的重排序。
并不是所有的重排序,都会与原先的顺序不一致。比如下面这个
因为jvm在进行重排序时,会考虑到数据之间的依赖性,如果语句2必须用到语句1的结果,那么处理器一定保证语句1在语句2前执行。
类似于这种的数据依赖性,在单线程环境下一点问题都没有,因为总能保证数据的正确,但是在多线程环境下,就会发生错误。
这个地方参考了很多文章,以及技术书《深入Java虚拟机》,但是看过了之后,还是不太懂,感觉给的多线程的例子都有点问题。所以这边暂时不进行补充了
二、Future 2.1 Future常用方法 一个Future接口表示一个未来可能会返回的结果,常用方法由
get():获取结果(任务耗时就会进行等待状态,类似于同步) get(long timeout, TimeUnit unit):获取结果,等待指定时间 cancel(boolean flag):取消当前任务。flag为true表示执行任务的线程应该被立马中断;false表示等待任务执行完毕再取消。 isDone():判断任务是否完成。 下面是通过Future实现一个异步任务,并且在异步任务完成之后,自动回调任务。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 public class ThreadDemo { private static final DateTimeFormatter dtf=DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss" ); static class CallableImpl implements Callable <String > { @Override public String call () throws Exception { try { Thread.sleep(10000 ); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("Callable运行结束:" +dtf.format(LocalDateTime.now())); return "Callable运行结束" ; } } public static void main (String[] args) throws ExecutionException, InterruptedException { ExecutorService executorService = Executors.newFixedThreadPool(1 ); System.out.println("线程执行开始:" +dtf.format(LocalDateTime.now())); Future<String> submit = executorService.submit(new CallableImpl()); new Thread(new Runnable() { @Override public void run () { while (!submit.isDone()){ } System.out.println("执行回调任务" +dtf.format(LocalDateTime.now())); } }).start(); System.out.println("校验异步耗时:" +dtf.format(LocalDateTime.now())); executorService.shutdown(); } }
2.2 CompletableFuture常用方法 案例 使用Future获得异步执行结果时,要么调用阻塞方法get(),要么轮询看isDone()是否为true,这两种方法都不是很好,因为主线程也会被迫等待。
从Java8开始引入了CompletableFuture,它可以传入回调对象,当异步任务完成或者发生异常时,自动调用回调对象的回调方法。
CompletableFuture常用方法
supplyAsync:表示创建带返回值的异步任务,相当于ExecutorService submit(Callable task)方法 runAsync:runAsync表示创建无返回值的异步任务,相当于ExecutorService submit(Runnable task)方法 thenApply(T,U):表示某个任务执行完成后执行的动作,即回调方法。接收任务完成后传过来的参数T,并返回类型U thenApplyAsync(T,U):与thenApply的不同之处在于,thenApplyAsync在执行任务A 和任务A结束之后的回调 ,这两个线程可能会不一样;但是thenApply执行任务和任务结束后的回调是同一个线程。 thenAccept:跟thenApply作用类似,有入参,但是没有返回值。 thenRun:参数是个Runnable接口 exceptionally:如果任务执行过程中出现异常,就会回调该方法 anyOf:任意个CompletableFuture只要有一个执行完成,该anyOf就完成。 allOf:所有的CompletableFuture全部完成,该allOf才完成。 whenComplete/handle:可自定义回调执行的用法 通过CompletableFuture完成上面使用Future完成的案例。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 public class ThreadDemo { private static final DateTimeFormatter dtf = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss" ); static class RunnableImpl implements Runnable { @Override public void run () { try { Thread.sleep(10000 ); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("Runnable运行结束!" ); } } static class CallableImpl implements Callable <String > { @Override public String call () throws Exception { try { Thread.sleep(10000 ); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("Callable运行结束:" + dtf.format(LocalDateTime.now())); return "Callable运行结束" ; } } public static void main (String[] args) throws ExecutionException, InterruptedException { CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(new Supplier<String>() { @Override public String get () { System.out.println("执行开始" + dtf.format(LocalDateTime.now())); try { Thread.sleep(10000 ); } catch (InterruptedException e) { e.printStackTrace(); } return "执行结束" + dtf.format(LocalDateTime.now()); } }); completableFuture.thenAccept(new Consumer<String>() { @Override public void accept (String s) { System.out.println(s); System.out.println("success" + dtf.format(LocalDateTime.now())); } }); completableFuture.exceptionally(new Function<Throwable, String>() { @Override public String apply (Throwable throwable) { System.out.println("failure" ); return null ; } }); System.out.println("校验异步阻塞!" ); Thread.sleep(30000 ); } }
通过运行结果的两张图,可以知道,这就是一个异步的操作,并且主线程不会进入阻塞状态,通过打印校验异步
就可以知道并没有影响到主线程继续向下执行。同时,在任务执行完毕之后,还会自动执行回调方法。
CompletableFuture优点
异步任务结束时,会自动回调执行成功时的逻辑 异步任务异常时,会自动回调执行异常时的逻辑 串行执行 如果只是实现了异步回调,其实跟Future区别也不是特别大,只是用起来方便一点。CompletableFuture更强大的功能在于,多个CompletableFuture可以串行执行。
串行执行:一个个的按顺序执行,执行完第一个,才会执行第二个。跟面向过程的含义一样。
并行执行:两个人吃两个馒头,效率高。多核cpu
并发执行:一个人吃俩馒头,这个吃一口,那个吃一口,效率低。单核cpu
举一个使用场景,就拿之前写今日校园签到脚本的例子来说
获取今日最新任务 通过任务提取任务id 通过任务id获取任务详细内容 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 public class ThreadDemo { private static final DateTimeFormatter dtf = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss" ); public static void main (String[] args) { CompletableFuture<Void> future = CompletableFuture.supplyAsync(new Supplier<String>() { @Override public String get () { try { Thread.sleep(2000 ); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("获取最新任务的编号" + dtf.format(LocalDateTime.now())); return "520" ; } }).thenApply(new Function<String, String>() { @Override public String apply (String s) { try { Thread.sleep(2000 ); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("通过" + s + "获取详细任务的编号" + dtf.format(LocalDateTime.now())); return "1314" ; } }).thenAccept(new Consumer<String>() { @Override public void accept (String s) { try { Thread.sleep(2000 ); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("通过" + s + "完成任务!" + dtf.format(LocalDateTime.now())); } }); System.out.println("是否执行完成" +future.isDone()+" 检验主线程是否被阻塞" +dtf.format(LocalDateTime.now())); try { Thread.sleep(7000 ); } catch (InterruptedException e) { e.printStackTrace(); } } }
通过运行结果可以发现,既没有影响异步操作,同时还保证了串行操作。
并行执行 场景A:菜鸟攒钱买金装和风云装,只要攒够一套装备,菜鸟就出去砍人!
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 public class ThreadDemo { private static DateTimeFormatter dtf = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss" ); public static void main (String[] args) { CompletableFuture<String> yaFei = CompletableFuture.supplyAsync(new Supplier<String>() { @Override public String get () { try { Thread.sleep(5000 ); } catch (InterruptedException e) { e.printStackTrace(); } return "菜鸟买到金装" +dtf.format(LocalDateTime.now()); } }); CompletableFuture<String> meiDuSha = CompletableFuture.supplyAsync(new Supplier<String>() { @Override public String get () { try { Thread.sleep(2000 ); } catch (InterruptedException e) { e.printStackTrace(); } return "菜鸟买到风云装" +dtf.format(LocalDateTime.now()); } }); CompletableFuture<Object> anyOf = CompletableFuture.anyOf(yaFei, meiDuSha); anyOf.thenAccept(new Consumer<Object>() { @Override public void accept (Object o) { System.out.println(o); System.out.println("有装备了,菜鸟出去砍人" +dtf.format(LocalDateTime.now())); } }); System.out.println("检验是否阻塞主线程" +dtf.format(LocalDateTime.now())); try { Thread.sleep(6000 ); } catch (InterruptedException e) { e.printStackTrace(); } } }
场景B:菜鸟攒钱买装备和符文,两套全齐了之后,菜鸟才出去砍人!
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 public class ThreadDemo { private static DateTimeFormatter dtf = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss" ); public static void main (String[] args) { CompletableFuture<Void> zhuangbei = CompletableFuture.runAsync(new Runnable() { @Override public void run () { try { Thread.sleep(5000 ); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("菜鸟买到装备" + dtf.format(LocalDateTime.now())); } }); CompletableFuture<Void> fuwen = CompletableFuture.runAsync(new Runnable() { @Override public void run () { try { Thread.sleep(2000 ); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("菜鸟买到符文" + dtf.format(LocalDateTime.now())); } }); CompletableFuture<Void> allOf = CompletableFuture.allOf(zhuangbei, fuwen); allOf.thenAccept(new Consumer<Void>() { @Override public void accept (Void aVoid) { System.out.println("菜鸟出去砍人" +dtf.format(LocalDateTime.now())); } }); System.out.println("检验是否阻塞主线程" +dtf.format(LocalDateTime.now())); try { Thread.sleep(6000 ); } catch (InterruptedException e) { e.printStackTrace(); } } }
三、线程池 3.1 ThreadPoolExecutor 原理 ThreadPoolExecutor是java.util.convurrent提供的,以内部线程池的形式对外提供管理任务执行、线程调度、线程池管理等服务。
以jdk8为例,Executors提供的线程服务,有4个是通过ThreadPoolExecutor参数设置。这个的说明在下一节
主要参数讲解
corePoolSize:核心线程个数,也就是线程池中,至少会运行corePoolSize个线程。但是在刚创建线程池的时候,线程并不会立即启动,而是在等到有任务提交时才会启动。 maxmumPoolSize:最大线程个数 keepAliveTime:非核心线程的最大空闲时间,空闲时间超过这个设定值,线程就关闭掉了。 allowCoreThreadTimeOut:默认值是false。如果改为true,则表示keepAliveTime也同样适用于核心线程,超时就会关闭,知道线程池中线程数为0 timeUnit:keepAliveTime的时间单位 workQueue:阻塞任务队列,存储等待执行线程的工作队列。BlockingQueue下面有实现类,比如说LinkedBlockingQueue,在创建任务队列时,可以给队列指定长度。 threadFactory:线程工厂,默认的工厂是DefaultThreadFactory,如果我们想实现线程池的重命名的话,可以通过自己实现一个工厂。 rejectedExecutionHandler:拒绝策略。当提交的任务数,超过了maxmumPoolSize+workQueue之和时,多余的任务会交给拒绝策略来处理。默认是AbortPolicy策略:丢弃任务,抛运行时异常。 整个的执行流程,如下图
poolSize、corePoolSize、maxmumPoolSize、queueCapacity之间的关系
poolSize表示线程池中非空闲的线程个数。
queueCapacity表示队列的长度
poolSize<corePoolSize时,无论是否有空闲线程,都会新建一个线程来处理新提交的任务 poolSize=corePoolSize时,又来了新的任务,且新的任务数量没有超过queueCapacity时,任务就暂时放到队列里,等待执行(前面的任务结束之后,线程空闲出来后再执行) poolSize=corePoolSize时,又来了新的任务,且新的任务数量超过了queueCapacity时poolSize<maxmumPoolSize:新创建线程来处理任务,任务执行完毕后线程就被销毁了。 poolSize=maxmumPoolSize:线程池的处理能力已经达到了极限,此时根据配置的拒绝策略来处理新增任务。 任务队列 jdk8中,ThreadPoolExecutor可选用队列
LinkedBlockingQueue:常用 ,一个由链表组成的有界阻塞队列。先进先出模式,默认队列长度是Integer的最大值。 SynchronousQueue:常用 ,一个不存储元素的阻塞队列。配对通信机制,默认是非公平模式,也就是采用栈操作,后进先出;公平模式就是采用队列操作,先进先出。A线程put任务A进入队列之后,除非有一个B线程来take,否则A线程一直处于阻塞状态。如果A线程put任务A进入队列之后,B线程put任务B进入队列,默认情况下,C线程来take,会获取任务B,之后,D线程再来take,会获取任务A。 ArrayBlockingQueue:一个由数组组成的有界阻塞队列。 PriorityBlockingQueue:一个支持优先级排序的无界阻塞队列。 DelayQueue:一个使用优先级队列实现的无界阻塞队列。 LinkedTransferQueue:一个由链表结构组成的无界阻塞队列。 LinkedBlockingDeque:一个由链表结构组成的双向阻塞队列。 拒绝策略 jdk8中,ThreadPoolExecutor提供了4中拒绝策略
AbortPolicy:默认拒绝策略。丢弃任务并抛出RejectedExecutionException异常 CallerRunsPolicy:任务被拒绝添加后,会由调用execute方法的线程来执行被拒绝的任务。除非executor被关闭,否则任务不会被丢弃。简单理解谁调用谁执行 DiscardPolicy:直接丢弃任务,不抛异常 DiscardOldestPolicy:如果队列满了,新任务来了,会丢弃队列中最前面条任务,将新任务追加到队列。比如现在有1 2 3任务,来了4,丢弃1后变成2 3 4 案例:设置线程池,1个核心、LinkedBlockingQueue队列设置容量为2、2个最大线程,按顺序一次性添加七个任务。通过选择不同的拒绝策略,查看任务执行。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 public class ThreadDemo { private static DateTimeFormatter dtf = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss:SSS" ); public ThreadPoolExecutor getExecutor () { return new ThreadPoolExecutor( 1 , 2 , 5 , TimeUnit.SECONDS, new LinkedBlockingDeque<>(2 ), new ThreadPoolExecutor.DiscardOldestPolicy() ); } public static void main (String[] args) { ThreadPoolExecutor executor = new ThreadDemo().getExecutor(); new Thread(new Runnable() { @Override public void run () { Thread.currentThread().setName("我添加的线程,用于打印活跃中的线程" ); while (true ) { System.out.println("=============================" ); ThreadGroup threadGroup = Thread.currentThread().getThreadGroup(); threadGroup.list(); System.out.println("=============================" ); try { Thread.sleep(5000 ); } catch (InterruptedException e) { e.printStackTrace(); } } } }).start(); for (int i = 1 ; i <= 7 ; i++) { int temp = i; executor.execute(() -> { try { Thread.sleep(5000 ); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName() + " 执行任务" + temp + " 结束时间:" + dtf.format(LocalDateTime.now())); }); } } }
拒绝策略为AbortPolicy
拒绝策略为CallerRunsPolicy
拒绝策略为DiscardPolicy
拒绝策略为DiscardOldestPolicy