本文共 19582 字,大约阅读时间需要 65 分钟。
在业务开发中,有很多异步场景,为了节约时间或或者提高系统的吞吐量,要做一些异步任务,在Java中要实现异步通常都是Thread,开启一个线程Thread,开启线程有四种方式。
1)、继承Thread
2)、实现Runnable接口 3)、实现Callable接口+FutureTask(可以拿到返回结果,可以处理异常) 4)、线程池方式1和方式2:主进程无法获取线程的运算结果。不适合我们当前的场景。
方式3:主进程可以获取线程的运算结果,但是不利于控制服务器中的线程资源。 方式4:通过如下两种方式初始化线程池:Excutors.newFiexedThreadPool(3);
// 或者 new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime,TimeUnit unit, workQuene,threadFactory,handler); 通过线程池性能稳定,也可以获取执行结果,并捕获异常。但是,在业务复杂情况下,一个异步调用可能会依赖于另一个异步调用的执行结果
。
package com.atguigu.gulimall.search.thread;/** * @author: kaiyi * @create: 2020-09-04 11:19 */public class ThreadTest {public static void main(String[] args) { System.out.println("main...start..."); /** * 1)、继承Thread * Thread01 thread = new Thread01(); * thread.start(); // 启动线程 * * 2)、实现Runnable接口 * Runable01 runable01 = new Runable01() * new Thread(runable01).start(); * * 3)、实现Callable接口+FutureTask(可以拿到返回结果,可以处理异常) * FutureTaskFutureTask = new FutureTask<>(new Callable01()); * new Thread(futureTask).start(); * // 阻塞等待整个线程执行完成,获取返回结果 * Integer integer = futureTask.get(); * * 4)、线程池[ExecutorService] * 给线程池直接提交任务。 * service.execute(new Runable01()); * 创建: * ①、Excutors * ②、new ThreadPoolExecutor * * Future:可以获取到异步结果 */ Thread01 thread01 = new Thread01(); thread01.start(); // 启动线程 System.out.println("main...end..."); } public static class Thread01 extends Thread{ @Override public void run(){ System.out.println("当前线程:" + Thread.currentThread().getId()); int i = 10 / 2; System.out.println("运行结果:" + i); } }}
执行结果打印:
main...start...main...end...当前线程:10运行结果:5Process finished with exit code 0
可以看到开启线程,进入了异步,主程序已经结束,线程才打印。
降低资源的消耗
通过重复利用已经创建好的线程降低线程的创建和销毁带来的损耗提高响应速度
因为线程池中的线程数没有超过线程池的最大上限时,有的线程处于等待分配任务的状态,当任务来时无需创建新的线程就能执行。提高线程的可管理性
线程池会根据当前系统特点对池内的线程进行优化处理,减少线程创建和销毁带来的系统开销。创建线程池:
new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime,TimeUnit unit, workQuene,threadFactory,handler);
源码:
/** * Creates a new {@code ThreadPoolExecutor} with the given initial * parameters. * * @param corePoolSize the number of threads to keep in the pool, even * if they are idle, unless {@code allowCoreThreadTimeOut} is set * @param maximumPoolSize the maximum number of threads to allow in the * pool * @param keepAliveTime when the number of threads is greater than * the core, this is the maximum time that excess idle threads * will wait for new tasks before terminating. * @param unit the time unit for the {@code keepAliveTime} argument * @param workQueue the queue to use for holding tasks before they are * executed. This queue will hold only the {@code Runnable} * tasks submitted by the {@code execute} method. * @param threadFactory the factory to use when the executor * creates a new thread * @param handler the handler to use when execution is blocked * because the thread bounds and queue capacities are reached * @throws IllegalArgumentException if one of the following holds: * {@code corePoolSize < 0} * {@code keepAliveTime < 0} * {@code maximumPoolSize <= 0} * {@code maximumPoolSize < corePoolSize} * @throws NullPointerException if {@code workQueue} * or {@code threadFactory} or {@code handler} is null */ public ThreadPoolExecutor( int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueueworkQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0) throw new IllegalArgumentException(); if (workQueue == null || threadFactory == null || handler == null) throw new NullPointerException(); this.acc = System.getSecurityManager() == null ? null : AccessController.getContext(); this.corePoolSize = corePoolSize; this.maximumPoolSize = maximumPoolSize; this.workQueue = workQueue; this.keepAliveTime = unit.toNanos(keepAliveTime); this.threadFactory = threadFactory; this.handler = handler; }
创建线程池示例:
private static void threadPool(){ ThreadPoolExecutor threadPool = new ThreadPoolExecutor( 5, 200, 10L, TimeUnit.SECONDS, new LinkedBlockingDeque(10000), Executors.defaultThreadFactory(), new ThreadPoolExecutor.AbortPolicy() ); // 定时任务的线程池 ExecutorService service = Executors.newScheduledThreadPool(2); }
1、线程池创建,准备好core数量的核心线程,准备接受任务。
1.1、core满了,就将再进来的任务放入阻塞队列中。空闲的core就会自己去阻塞队列获取任务。 2、阻塞队列满了,就直接开新线程执行,最大只能开到max指定的数量。 3、max满了就用 RejectedExecutionHandler handler 拒绝任务 max都执行完成,有很多空闲,在指定的时间 keepAliveTime 以后,释放 max - core 这些线程。 new LinkedBlockingDeque<>():默认是 Integer 的最大值。会导致内存占满,需要根据自己的业务定。一个线程池 core 7, max 20, queue:50, 100并发进来怎么分配?
答:先有 7 个能直接得到执行,接下来 50 个进入阻塞队列,再多开10个线程继续执行。现在 70 个被安排上了。剩下 30 个默认拒绝策略。注意:当 core满了 不会立即创建新的线程,而是将进来的任务放入到阻塞队列中,当阻塞队列满了之后,才会直接新开线程执行,最大只能开到 Max指定的数量。
业务场景:
查询商品详情页的逻辑比较复杂,有些数据还需要远程调用,必然需要花费更多的时间。 假如商品详情页的每个查询,用户需要6.5s 后才能看到商品详情页的内容,这是显然不能接受的。 如果多个线程同时完成这个 6 步操作,也许只需要 1.5s 即可响应完成。CompletableFuture 提供了四个静态方法来创建一个异步操作。
public static CompletableFuturerunAsync(Runnable runnable);public static CompletableFuture runAsync(Runnable runnable, Executor executor);public static CompletableFuture supplyAsync(Supplier supplier) ;public static CompletableFuture supplyAsync(Supplier supplier, Executor executor);
说明:没有指定Executor的方法会使用 ForkJoinPool.commonPool() 作为它的线程池执行异步代码。如果指定线程池,则使用指定的线程池运行。
示例:
/** * CompletableFuture 提供了四个静态方法来创建一个异步操作。 * */public class CompletableFutureDemo1 { public static void main(String[] args) { /** 第一种 runAsync 不支持返回值的 */ CompletableFuturerunAsync = CompletableFuture.runAsync(() -> { System.out.println("runAsync 不支持返回值"); }).whenComplete((t,u)->{ System.out.println("runAsync 完成"); }); /** 第二种 supplyAsync 支持返回值 */ CompletableFuture supplyAsync = CompletableFuture.supplyAsync(() -> { int i = 10; return i; }).whenComplete((t, u) -> { System.out.println("supplyAsync 完成,返回值是" + t); }); }}
运行结果:
runAsync 不支持返回值 runAsync 完成 supplyAsync 完成,返回值是10当CompletableFuture的计算结果完成,或者抛出异常的时候,可以执行特定的Action。主要是下面的方法:
public CompletableFuturewhenComplete(BiConsumer action);返回相同的结果或例外,这一阶段的新completionstage,这个阶段完成时,执行特定动作的结果(或 null如果没有)和异常(或 null如果没有)这个阶段。 public CompletableFuture whenCompleteAsync(BiConsumer action);返回相同的结果或例外,这一阶段的新completionstage,这个阶段完成时,执行特定动作执行给定的操作这一阶段的默认的异步执行设施,其结果(或 null如果没有)和异常(或 null如果没有)这个阶段作为参数。public CompletableFuture whenCompleteAsync(BiConsumer action, Executor executor);返回相同的结果或例外,这一阶段的新completionstage,这个阶段完成时,执行使用所提供的遗嘱执行人,给出的行动与结果(或 null如果没有)和异常(或 null如果没有)这个阶段作为参数。 public CompletableFuture exceptionally(Function fn);返回一个新的completablefuture已经完成与给定值。
thenApply 方法:当一个线程依赖另一个线程时,获取上一个任务返回的结果,并返回当前任务的返回值。
thenAccept方法:消费处理结果。接收任务的处理结果,并消费处理,无返回结果。 thenRun方法:只要上面的任务执行完成,就开始执行thenRun,只是处理完任务后,执行 thenRun的后续操作public CompletableFuture thenApply(Function fn)public CompletableFuture thenApplyAsync(Function fn)public CompletableFuture thenApplyAsync(Function fn, Executor executor)当一个线程依赖另一个线程时,获取上一个任务返回的结果,**并返回当前任务的返回值**。(有返回值)public CompletionStagethenAccept(Consumer action);public CompletionStage thenAcceptAsync(Consumer action);public CompletionStage thenAcceptAsync(Consumer action,Executor executor);消费处理结果。接收任务的处理结果,并消费处理,**无返回结果。public CompletionStage thenRun(Runnable action);public CompletionStage thenRunAsync(Runnable action);public CompletionStage thenRunAsync(Runnable action,Executor executor);只要上面的任务执行完成,就开始执行thenRun,**只是处理完任务后,执行 thenRun的后续操作
每一个方法都对应了三种操作。带有Async默认是异步执行的。这里所谓的异步指的是不在当前线程内执行。带有参数Executor executor的则用指定的线程池方案,不指定的话则用默认的ForkJoinPool.commonPool()。
说明:(Function<? super T,? extends U> fn)两个任务必须都完成,触发该任务。
thenCombine:组合两个future,获取两个future任务的返回结果,并返回当前任务的返回值 thenAcceptBoth:组合两个future,获取两个future任务的返回结果,然后处理任务,没有返回值。 runAfterBoth:组合两个future,不需要获取future的结果,只需两个future处理完任务后,处理该任务public CompletableFuturethenCombine( CompletionStage other, BiFunction fn);public CompletableFuture thenCombineAsync( CompletionStage other, BiFunction fn);public CompletableFuture thenCombineAsync( CompletionStage other, BiFunction fn, Executor executor); 组合两个future,获取两个future任务的返回结果,并返回当前任务的返回值 public CompletableFuture thenAcceptBoth( CompletionStage other, BiConsumer action);public CompletableFuture thenAcceptBothAsync( CompletionStage other, BiConsumer action);public CompletableFuture thenAcceptBothAsync( CompletionStage other, BiConsumer action, Executor executor);组合两个future,获取两个future任务的返回结果,然后处理任务,没有返回值。public CompletableFuture runAfterBoth(CompletionStage other, Runnable action);public CompletableFuture runAfterBothAsync(CompletionStage other, Runnable action);public CompletableFuture runAfterBothAsync(CompletionStage other, Runnable action, Executor executor);组合两个future,不需要获取future的结果,只需两个future处理完任务后,处理该任务
public static CompletableFutureallOf(CompletableFuture ... cfs);public static CompletableFuture
说明:
allOf:等待所有任务完成 anyOf:只要有一个任务完成package com.atguigu.gulimall.search.thread;import java.util.concurrent.*;/** * @author: kaiyi * @create: 2020-09-04 11:19 */public class ThreadTest { public static ExecutorService executor = Executors.newFixedThreadPool(10); public static void main(String[] args) throws ExecutionException, InterruptedException { // System.out.println("main......start....."); // Thread thread = new Thread01(); // thread.start(); // System.out.println("main......end....."); // Runable01 runable01 = new Runable01(); // new Thread(runable01).start(); // FutureTaskfutureTask = new FutureTask<>(new Callable01()); // new Thread(futureTask).start(); // System.out.println(futureTask.get()); // service.execute(new Runable01()); // Future submit = service.submit(new Callable01()); // submit.get(); System.out.println("main......start....."); // CompletableFuture future = CompletableFuture.runAsync(() -> { // System.out.println("当前线程:" + Thread.currentThread().getId()); // int i = 10 / 2; // System.out.println("运行结果:" + i); // }, executor); /** * 方法完成后的处理 */ // CompletableFuture future = CompletableFuture.supplyAsync(() -> { // System.out.println("当前线程:" + Thread.currentThread().getId()); // int i = 10 / 0; // System.out.println("运行结果:" + i); // return i; // }, executor).whenComplete((res,exception) -> { // //虽然能得到异常信息,但是没法修改返回数据 // System.out.println("异步任务成功完成了...结果是:" + res + "异常是:" + exception); // }).exceptionally(throwable -> { // //可以感知异常,同时返回默认值 // return 10; // }); /** * 方法执行完后端处理 */ // CompletableFuture future = CompletableFuture.supplyAsync(() -> { // System.out.println("当前线程:" + Thread.currentThread().getId()); // int i = 10 / 2; // System.out.println("运行结果:" + i); // return i; // }, executor).handle((result,thr) -> { // if (result != null) { // return result * 2; // } // if (thr != null) { // System.out.println("异步任务成功完成了...结果是:" + result + "异常是:" + thr); // return 0; // } // return 0; // }); /** * 线程串行化 * 1、thenRunL:不能获取上一步的执行结果 * 2、thenAcceptAsync:能接受上一步结果,但是无返回值 * 3、thenApplyAsync:能接受上一步结果,有返回值 * */ CompletableFuture future = CompletableFuture.supplyAsync(() -> { System.out.println("当前线程:" + Thread.currentThread().getId()); int i = 10 / 2; System.out.println("运行结果:" + i); return i; }, executor).thenApplyAsync(res -> { System.out.println("任务2启动了..." + res); return "Hello" + res; }, executor); System.out.println("main......end....." + future.get()); } private static void threadPool() { ExecutorService threadPool = new ThreadPoolExecutor( 200, 10, 10L, TimeUnit.SECONDS, new LinkedBlockingDeque (10000), Executors.defaultThreadFactory(), new ThreadPoolExecutor.AbortPolicy() ); //定时任务的线程池 ExecutorService service = Executors.newScheduledThreadPool(2); } public static class Thread01 extends Thread { @Override public void run() { System.out.println("当前线程:" + Thread.currentThread().getId()); int i = 10 / 2; System.out.println("运行结果:" + i); } } public static class Runable01 implements Runnable { @Override public void run() { System.out.println("当前线程:" + Thread.currentThread().getId()); int i = 10 / 2; System.out.println("运行结果:" + i); } } public static class Callable01 implements Callable { @Override public Integer call() throws Exception { System.out.println("当前线程:" + Thread.currentThread().getId()); int i = 10 / 2; System.out.println("运行结果:" + i); return i; } }}
@Overridepublic SkuItemVo item(Long skuId) { SkuItemVo skuItemVo = new SkuItemVo(); // 1、sku基本信息获取 pms_sku_info SkuInfoEntity info = getById(skuId); skuItemVo.setInfo(info); Long spuId = info.getSpuId(); Long catalogId = info.getCatalogId(); //2、sku的图片信息 pms_sku_images ListimagesEntities = skuImagesService.getImagesBySkuId(skuId); skuItemVo.setImages(imagesEntities); //3、获取spu的销售属性组合 // 分析当前spu有多少个sku(pms_sku_info),然后再查sku有多少个属性组合(pms_sku_sale_attr_value) List saleAttrVos = skuSaleAttrValueService.getSaleAttrBySpuId(spuId); skuItemVo.setSaleAttr(saleAttrVos); //4、获取spu的介绍 pms_spu_info_desc SpuInfoDescEntity spuInfoDescEntity = spuInfoDescService.getById(spuId); skuItemVo.setDesc(spuInfoDescEntity); //5、获取spu的规格参数信息 List attrGroupVos = attrGroupService.getAttrGroupWithAttrsBySpuId(spuId, catalogId); skuItemVo.setGroupAttrs(attrGroupVos); return skuItemVo;}
可以看到上边的商品详情查询逻辑是按照执行顺序查询的,1查完,2才能查,2查完,3才能查,这样比较耗时,我们可以使用异步的方式来查询,上边代码的1,2是没有关系的,3、4、5必须依赖1的结果,下边就用之前学的异步编排的方式来优化代码。
要在项目中使用异步编排,需要先创建线程池配置类,然后直接使用Bean。
1、创建线程池配置类@Configurationpublic class MyThreadConfig { @Bean public ThreadPoolExecutor threadPoolExecutor(ThreadPoolConfigProperties pool) { /* 这里是写死的参数,需要从配置文件里边获取相关参数 return new ThreadPoolExecutor( 20, 200, 10, pool.getKeepAliveTime(), TimeUnit.SECONDS, new LinkedBlockingQueue<>(100000), Executors.defaultThreadFactory(), new ThreadPoolExecutor.AbortPolicy() ); */ return new ThreadPoolExecutor( pool.getCoreSize(), pool.getMaxSize(), pool.getKeepAliveTime(), TimeUnit.SECONDS, new LinkedBlockingQueue<>(100000), Executors.defaultThreadFactory(), new ThreadPoolExecutor.AbortPolicy() ); }}
2、创建线程池属性类
@ConfigurationProperties(prefix = "gulimall.thread")@Component@Datapublic class ThreadPoolConfigProperties { private Integer coreSize; private Integer maxSize; private Integer keepAliveTime;}
3、修改配置文件
添加线程池相关配置我们可以看到,IDEA编辑器会有自定义配置类的相关属性的提示,如果没有看到,则需要添加自定义配置文件依赖。 自定义配置依赖: pom.xmlgulimall.thread.core-size=20
gulimall.thread.max-size=200 gulimall.thread.keep-alive-time=10
org.springframework.boot spring-boot-configuration-processor true
@Service("skuInfoService")public class SkuInfoServiceImpl extends ServiceImplimplements SkuInfoService { @Resource private SkuImagesService skuImagesService; @Resource private SpuInfoDescService spuInfoDescService; @Resource private AttrGroupService attrGroupService; @Resource private SkuSaleAttrValueService skuSaleAttrValueService; @Autowired ThreadPoolExecutor threadPoolExecutor; // 线程池/*** 异步编排获取*/ @Override public SkuItemVo item(Long skuId) throws ExecutionException, InterruptedException { SkuItemVo skuItemVo = new SkuItemVo(); // 异步编排 CompletableFuture infoFuture = CompletableFuture.supplyAsync(() -> { // 1、sku基本信息获取 pms_sku_info SkuInfoEntity info = getById(skuId); skuItemVo.setInfo(info); // 需要将获取的值返回别,后边别人要用 return info; }, threadPoolExecutor); // 接下来做,res表示上一步的返回值 CompletableFuture saleAttrFuture = infoFuture.thenAcceptAsync((res) -> { //3、获取spu的销售属性组合 // 分析当前spu有多少个sku(pms_sku_info),然后再查sku有多少个属性组合(pms_sku_sale_attr_value) List saleAttrVos = skuSaleAttrValueService.getSaleAttrBySpuId(res.getSpuId()); skuItemVo.setSaleAttr(saleAttrVos); }, threadPoolExecutor); CompletableFuture descFuture = infoFuture.thenAcceptAsync((res) -> { //4、获取spu的介绍 pms_spu_info_desc SpuInfoDescEntity spuInfoDescEntity = spuInfoDescService.getById(res.getSpuId()); skuItemVo.setDesc(spuInfoDescEntity); }, threadPoolExecutor); CompletableFuture baseAttrFuture = infoFuture.thenAcceptAsync((res) -> { //5、获取spu的规格参数信息 List attrGroupVos = attrGroupService.getAttrGroupWithAttrsBySpuId(res.getSpuId(), res.getCatalogId()); skuItemVo.setGroupAttrs(attrGroupVos); }, threadPoolExecutor); CompletableFuture imageFuture = CompletableFuture.runAsync(() -> { //2、sku的图片信息 pms_sku_images List imagesEntities = skuImagesService.getImagesBySkuId(skuId); skuItemVo.setImages(imagesEntities); }, threadPoolExecutor); // 等到所有任务都完成,infoFuture 可以不用写 // CompletableFuture.allOf(infoFuture, saleAttrFuture, descFuture, baseAttrFuture, imageFuture).get(); CompletableFuture.allOf(saleAttrFuture, descFuture, baseAttrFuture, imageFuture).get(); return skuItemVo; }}
单元测试:
@Testpublic void itemTest() throws ExecutionException, InterruptedException { Long skuId = 1L; SkuItemVo itemVo = skuInfoService.item(skuId); System.out.println(itemVo); }
转载地址:http://ponti.baihongyu.com/