博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
异步与线程池及异步编排
阅读量:4147 次
发布时间:2019-05-25

本文共 19582 字,大约阅读时间需要 65 分钟。

文章目录

一、异步

在业务开发中,有很多异步场景,为了节约时间或或者提高系统的吞吐量,要做一些异步任务,在Java中要实现异步通常都是Thread,开启一个线程Thread,开启线程有四种方式。

1、初始化线程池的4中方式

1)、继承Thread

2)、实现Runnable接口
3)、实现Callable接口+FutureTask(可以拿到返回结果,可以处理异常)
4)、线程池

方式1和方式2:主进程无法获取线程的运算结果。不适合我们当前的场景。

方式3:主进程可以获取线程的运算结果,但是不利于控制服务器中的线程资源。
方式4:通过如下两种方式初始化线程池:

Excutors.newFiexedThreadPool(3);

// 或者
new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime,TimeUnit unit, workQuene,threadFactory,handler);
通过线程池性能稳定,也可以获取执行结果,并捕获异常。但是,在业务复杂情况下,一个异步调用可能会依赖于另一个异步调用的执行结果

2、开启线程测试

package com.atguigu.gulimall.search.thread;/** * @author: kaiyi * @create: 2020-09-04 11:19 */public class ThreadTest {public static void main(String[] args) {   System.out.println("main...start...");    /**     * 1)、继承Thread     *    Thread01 thread = new Thread01();     *     thread.start(); // 启动线程     *     * 2)、实现Runnable接口     *     Runable01 runable01 = new Runable01()     *     new Thread(runable01).start();     *     * 3)、实现Callable接口+FutureTask(可以拿到返回结果,可以处理异常)     *     FutureTask
FutureTask = new FutureTask<>(new Callable01()); * new Thread(futureTask).start(); * // 阻塞等待整个线程执行完成,获取返回结果 * Integer integer = futureTask.get(); * * 4)、线程池[ExecutorService] * 给线程池直接提交任务。 * service.execute(new Runable01()); * 创建: * ①、Excutors * ②、new ThreadPoolExecutor * * Future:可以获取到异步结果 */ Thread01 thread01 = new Thread01(); thread01.start(); // 启动线程 System.out.println("main...end..."); } public static class Thread01 extends Thread{ @Override public void run(){ System.out.println("当前线程:" + Thread.currentThread().getId()); int i = 10 / 2; System.out.println("运行结果:" + i); } }}

执行结果打印:

main...start...main...end...当前线程:10运行结果:5Process finished with exit code 0

可以看到开启线程,进入了异步,主程序已经结束,线程才打印。

二、线程池

1、开发中为什么使用线程池?

降低资源的消耗

通过重复利用已经创建好的线程降低线程的创建和销毁带来的损耗

提高响应速度

因为线程池中的线程数没有超过线程池的最大上限时,有的线程处于等待分配任务的状态,当任务来时无需创建新的线程就能执行。

提高线程的可管理性

线程池会根据当前系统特点对池内的线程进行优化处理,减少线程创建和销毁带来的系统开销。

创建线程池:

new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime,TimeUnit unit, workQuene,threadFactory,handler);

源码:

/**     * Creates a new {@code ThreadPoolExecutor} with the given initial     * parameters.     *     * @param corePoolSize the number of threads to keep in the pool, even     *        if they are idle, unless {@code allowCoreThreadTimeOut} is set     * @param maximumPoolSize the maximum number of threads to allow in the     *        pool     * @param keepAliveTime when the number of threads is greater than     *        the core, this is the maximum time that excess idle threads     *        will wait for new tasks before terminating.     * @param unit the time unit for the {@code keepAliveTime} argument     * @param workQueue the queue to use for holding tasks before they are     *        executed.  This queue will hold only the {@code Runnable}     *        tasks submitted by the {@code execute} method.     * @param threadFactory the factory to use when the executor     *        creates a new thread     * @param handler the handler to use when execution is blocked     *        because the thread bounds and queue capacities are reached     * @throws IllegalArgumentException if one of the following holds:
* {@code corePoolSize < 0}
* {@code keepAliveTime < 0}
* {@code maximumPoolSize <= 0}
* {@code maximumPoolSize < corePoolSize} * @throws NullPointerException if {@code workQueue} * or {@code threadFactory} or {@code handler} is null */ public ThreadPoolExecutor( int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue
workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0) throw new IllegalArgumentException(); if (workQueue == null || threadFactory == null || handler == null) throw new NullPointerException(); this.acc = System.getSecurityManager() == null ? null : AccessController.getContext(); this.corePoolSize = corePoolSize; this.maximumPoolSize = maximumPoolSize; this.workQueue = workQueue; this.keepAliveTime = unit.toNanos(keepAliveTime); this.threadFactory = threadFactory; this.handler = handler; }

创建线程池示例:

private static void threadPool(){    ThreadPoolExecutor threadPool = new ThreadPoolExecutor(        5,        200,        10L,        TimeUnit.SECONDS,        new LinkedBlockingDeque
(10000), Executors.defaultThreadFactory(), new ThreadPoolExecutor.AbortPolicy() ); // 定时任务的线程池 ExecutorService service = Executors.newScheduledThreadPool(2); }

2、线程池七大参数:

  • corePoolSize:[5] 核心线程数[一直存在除非(allowCoreThreadTimeOut)];线程池创建好以后就准备就绪。5个 Thread thread = new Thread(); thread.start();
  • maximumPoolSize:[200] 最大线程数量;控制资源。
  • keepAliveTime:存活时间。如果当前的线程数量大于 core数量。释放空闲的线程(maximumPoolSize - corePoolSize)。只要线程空闲的时间大于指定的keepAliveTime。
  • unit:时间单位
  • BlockingQueue workQueue:阻塞队列。如果任务有很多,就会将目前多的任务放到队列里面。只要有线程空闲,就会去队列里面取出新的任务继续执行。
  • threadFactory:线程的创建工厂。
  • RejectedExecutionHandler handler:如果队列满了,按照我们指定的拒绝策略拒绝执行任务。

3、工作顺序:

1、线程池创建,准备好core数量的核心线程,准备接受任务。

1.1、core满了,就将再进来的任务放入阻塞队列中。空闲的core就会自己去阻塞队列获取任务。
2、阻塞队列满了,就直接开新线程执行,最大只能开到max指定的数量。
3、max满了就用 RejectedExecutionHandler handler 拒绝任务
max都执行完成,有很多空闲,在指定的时间 keepAliveTime 以后,释放 max - core 这些线程。
new LinkedBlockingDeque<>():默认是 Integer 的最大值。会导致内存占满,需要根据自己的业务定。

4、面试:

一个线程池 core 7, max 20, queue:50, 100并发进来怎么分配?

答:先有 7 个能直接得到执行,接下来 50 个进入阻塞队列,再多开10个线程继续执行。现在 70 个被安排上了。剩下 30 个默认拒绝策略。

注意:当 core满了 不会立即创建新的线程,而是将进来的任务放入到阻塞队列中,当阻塞队列满了之后,才会直接新开线程执行,最大只能开到 Max指定的数量。

三、CompletableFuture异步编排

业务场景:

查询商品详情页的逻辑比较复杂,有些数据还需要远程调用,必然需要花费更多的时间。
在这里插入图片描述
假如商品详情页的每个查询,用户需要6.5s 后才能看到商品详情页的内容,这是显然不能接受的。
如果多个线程同时完成这个 6 步操作,也许只需要 1.5s 即可响应完成。

1、创建异步对象

CompletableFuture 提供了四个静态方法来创建一个异步操作。

public static CompletableFuture
runAsync(Runnable runnable);public static CompletableFuture
runAsync(Runnable runnable, Executor executor);public static
CompletableFuture supplyAsync(Supplier supplier) ;public static CompletableFuture supplyAsync(Supplier supplier, Executor executor);

说明:没有指定Executor的方法会使用 ForkJoinPool.commonPool() 作为它的线程池执行异步代码。如果指定线程池,则使用指定的线程池运行。

  • ①、runXxxx都是没有返回结果的,supplyXxx都是可以获取返回结果的
  • ②、可以传入自定义的线程池,否则就用默认的线程池。

示例:

/** * CompletableFuture 提供了四个静态方法来创建一个异步操作。 * */public class CompletableFutureDemo1 {    public static void main(String[] args) {        /** 第一种 runAsync 不支持返回值的 */        CompletableFuture
runAsync = CompletableFuture.runAsync(() -> { System.out.println("runAsync 不支持返回值"); }).whenComplete((t,u)->{ System.out.println("runAsync 完成"); }); /** 第二种 supplyAsync 支持返回值 */ CompletableFuture
supplyAsync = CompletableFuture.supplyAsync(() -> { int i = 10; return i; }).whenComplete((t, u) -> { System.out.println("supplyAsync 完成,返回值是" + t); }); }}

运行结果:

runAsync 不支持返回值
runAsync 完成
supplyAsync 完成,返回值是10

2、计算完成时回调方法

当CompletableFuture的计算结果完成,或者抛出异常的时候,可以执行特定的Action。主要是下面的方法:

public CompletableFuture
whenComplete(BiConsumer
action);返回相同的结果或例外,这一阶段的新completionstage,这个阶段完成时,执行特定动作的结果(或 null如果没有)和异常(或 null如果没有)这个阶段。 public CompletableFuture
whenCompleteAsync(BiConsumer
action);返回相同的结果或例外,这一阶段的新completionstage,这个阶段完成时,执行特定动作执行给定的操作这一阶段的默认的异步执行设施,其结果(或 null如果没有)和异常(或 null如果没有)这个阶段作为参数。public CompletableFuture
whenCompleteAsync(BiConsumer
action, Executor executor);返回相同的结果或例外,这一阶段的新completionstage,这个阶段完成时,执行使用所提供的遗嘱执行人,给出的行动与结果(或 null如果没有)和异常(或 null如果没有)这个阶段作为参数。 public CompletableFuture
exceptionally(Function
fn);返回一个新的completablefuture已经完成与给定值。

3、线程串行化方法

thenApply 方法:当一个线程依赖另一个线程时,获取上一个任务返回的结果,并返回当前任务的返回值。

thenAccept方法:消费处理结果。接收任务的处理结果,并消费处理,无返回结果。
thenRun方法:只要上面的任务执行完成,就开始执行thenRun,只是处理完任务后,执行 thenRun的后续操作

public  CompletableFuture thenApply(Function
fn)public CompletableFuture thenApplyAsync(Function
fn)public CompletableFuture thenApplyAsync(Function
fn, Executor executor)当一个线程依赖另一个线程时,获取上一个任务返回的结果,**并返回当前任务的返回值**。(有返回值)public CompletionStage
thenAccept(Consumer
action);public CompletionStage
thenAcceptAsync(Consumer
action);public CompletionStage
thenAcceptAsync(Consumer
action,Executor executor);消费处理结果。接收任务的处理结果,并消费处理,**无返回结果。public CompletionStage
thenRun(Runnable action);public CompletionStage
thenRunAsync(Runnable action);public CompletionStage
thenRunAsync(Runnable action,Executor executor);只要上面的任务执行完成,就开始执行thenRun,**只是处理完任务后,执行 thenRun的后续操作

每一个方法都对应了三种操作。带有Async默认是异步执行的。这里所谓的异步指的是不在当前线程内执行。带有参数Executor executor的则用指定的线程池方案,不指定的话则用默认的ForkJoinPool.commonPool()。

说明:(Function<? super T,? extends U> fn)

  • T : 上一个任务返回结果的类型
  • U:当前任务的返回值的类型

4、两任务组合 - 都要完成

两个任务必须都完成,触发该任务。

thenCombine:组合两个future,获取两个future任务的返回结果,并返回当前任务的返回值
thenAcceptBoth:组合两个future,获取两个future任务的返回结果,然后处理任务,没有返回值。
runAfterBoth:组合两个future,不需要获取future的结果,只需两个future处理完任务后,处理该任务

public 
CompletableFuture
thenCombine( CompletionStage
other, BiFunction
fn);public
CompletableFuture
thenCombineAsync( CompletionStage
other, BiFunction
fn);public
CompletableFuture
thenCombineAsync( CompletionStage
other, BiFunction
fn, Executor executor); 组合两个future,获取两个future任务的返回结果,并返回当前任务的返回值 public
CompletableFuture
thenAcceptBoth( CompletionStage
other, BiConsumer
action);public
CompletableFuture
thenAcceptBothAsync( CompletionStage
other, BiConsumer
action);public
CompletableFuture
thenAcceptBothAsync( CompletionStage
other, BiConsumer
action, Executor executor);组合两个future,获取两个future任务的返回结果,然后处理任务,没有返回值。public CompletableFuture
runAfterBoth(CompletionStage
other, Runnable action);public CompletableFuture
runAfterBothAsync(CompletionStage
other, Runnable action);public CompletableFuture
runAfterBothAsync(CompletionStage
other, Runnable action, Executor executor);组合两个future,不需要获取future的结果,只需两个future处理完任务后,处理该任务

5、多任务组合

public static CompletableFuture
allOf(CompletableFuture
... cfs);public static CompletableFuture
anyOf(CompletableFuture
... cfs);

说明:

allOf:等待所有任务完成
anyOf:只要有一个任务完成

四、测试代码

package com.atguigu.gulimall.search.thread;import java.util.concurrent.*;/** * @author: kaiyi * @create: 2020-09-04 11:19 */public class ThreadTest {    public static ExecutorService executor = Executors.newFixedThreadPool(10);    public static void main(String[] args) throws ExecutionException, InterruptedException {        // System.out.println("main......start.....");        // Thread thread = new Thread01();        // thread.start();        // System.out.println("main......end.....");        // Runable01 runable01 = new Runable01();        // new Thread(runable01).start();        // FutureTask
futureTask = new FutureTask<>(new Callable01()); // new Thread(futureTask).start(); // System.out.println(futureTask.get()); // service.execute(new Runable01()); // Future
submit = service.submit(new Callable01()); // submit.get(); System.out.println("main......start....."); // CompletableFuture
future = CompletableFuture.runAsync(() -> { // System.out.println("当前线程:" + Thread.currentThread().getId()); // int i = 10 / 2; // System.out.println("运行结果:" + i); // }, executor); /** * 方法完成后的处理 */ // CompletableFuture
future = CompletableFuture.supplyAsync(() -> { // System.out.println("当前线程:" + Thread.currentThread().getId()); // int i = 10 / 0; // System.out.println("运行结果:" + i); // return i; // }, executor).whenComplete((res,exception) -> { // //虽然能得到异常信息,但是没法修改返回数据 // System.out.println("异步任务成功完成了...结果是:" + res + "异常是:" + exception); // }).exceptionally(throwable -> { // //可以感知异常,同时返回默认值 // return 10; // }); /** * 方法执行完后端处理 */ // CompletableFuture
future = CompletableFuture.supplyAsync(() -> { // System.out.println("当前线程:" + Thread.currentThread().getId()); // int i = 10 / 2; // System.out.println("运行结果:" + i); // return i; // }, executor).handle((result,thr) -> { // if (result != null) { // return result * 2; // } // if (thr != null) { // System.out.println("异步任务成功完成了...结果是:" + result + "异常是:" + thr); // return 0; // } // return 0; // }); /** * 线程串行化 * 1、thenRunL:不能获取上一步的执行结果 * 2、thenAcceptAsync:能接受上一步结果,但是无返回值 * 3、thenApplyAsync:能接受上一步结果,有返回值 * */ CompletableFuture
future = CompletableFuture.supplyAsync(() -> { System.out.println("当前线程:" + Thread.currentThread().getId()); int i = 10 / 2; System.out.println("运行结果:" + i); return i; }, executor).thenApplyAsync(res -> { System.out.println("任务2启动了..." + res); return "Hello" + res; }, executor); System.out.println("main......end....." + future.get()); } private static void threadPool() { ExecutorService threadPool = new ThreadPoolExecutor( 200, 10, 10L, TimeUnit.SECONDS, new LinkedBlockingDeque
(10000), Executors.defaultThreadFactory(), new ThreadPoolExecutor.AbortPolicy() ); //定时任务的线程池 ExecutorService service = Executors.newScheduledThreadPool(2); } public static class Thread01 extends Thread { @Override public void run() { System.out.println("当前线程:" + Thread.currentThread().getId()); int i = 10 / 2; System.out.println("运行结果:" + i); } } public static class Runable01 implements Runnable { @Override public void run() { System.out.println("当前线程:" + Thread.currentThread().getId()); int i = 10 / 2; System.out.println("运行结果:" + i); } } public static class Callable01 implements Callable
{ @Override public Integer call() throws Exception { System.out.println("当前线程:" + Thread.currentThread().getId()); int i = 10 / 2; System.out.println("运行结果:" + i); return i; } }}

五、异步编排小案例

1、顺序获取详情、

@Overridepublic SkuItemVo item(Long skuId) {    SkuItemVo skuItemVo = new SkuItemVo();    // 1、sku基本信息获取 pms_sku_info    SkuInfoEntity info = getById(skuId);    skuItemVo.setInfo(info);    Long spuId = info.getSpuId();    Long catalogId = info.getCatalogId();    //2、sku的图片信息    pms_sku_images    List
imagesEntities = skuImagesService.getImagesBySkuId(skuId); skuItemVo.setImages(imagesEntities); //3、获取spu的销售属性组合 // 分析当前spu有多少个sku(pms_sku_info),然后再查sku有多少个属性组合(pms_sku_sale_attr_value) List
saleAttrVos = skuSaleAttrValueService.getSaleAttrBySpuId(spuId); skuItemVo.setSaleAttr(saleAttrVos); //4、获取spu的介绍 pms_spu_info_desc SpuInfoDescEntity spuInfoDescEntity = spuInfoDescService.getById(spuId); skuItemVo.setDesc(spuInfoDescEntity); //5、获取spu的规格参数信息 List
attrGroupVos = attrGroupService.getAttrGroupWithAttrsBySpuId(spuId, catalogId); skuItemVo.setGroupAttrs(attrGroupVos); return skuItemVo;}

可以看到上边的商品详情查询逻辑是按照执行顺序查询的,1查完,2才能查,2查完,3才能查,这样比较耗时,我们可以使用异步的方式来查询,上边代码的1,2是没有关系的,3、4、5必须依赖1的结果,下边就用之前学的异步编排的方式来优化代码。

2、异步编排获取详情

1、线程池配置

要在项目中使用异步编排,需要先创建线程池配置类,然后直接使用Bean。

1、创建线程池配置类

@Configurationpublic class MyThreadConfig {  @Bean  public ThreadPoolExecutor threadPoolExecutor(ThreadPoolConfigProperties pool) {    /*    这里是写死的参数,需要从配置文件里边获取相关参数    return new ThreadPoolExecutor(        20,        200,        10,        pool.getKeepAliveTime(),        TimeUnit.SECONDS,        new LinkedBlockingQueue<>(100000),        Executors.defaultThreadFactory(),        new ThreadPoolExecutor.AbortPolicy()    );     */    return new ThreadPoolExecutor(        pool.getCoreSize(),        pool.getMaxSize(),        pool.getKeepAliveTime(),        TimeUnit.SECONDS,        new LinkedBlockingQueue<>(100000),        Executors.defaultThreadFactory(),        new ThreadPoolExecutor.AbortPolicy()    );  }}

2、创建线程池属性类

@ConfigurationProperties(prefix = "gulimall.thread")@Component@Datapublic class ThreadPoolConfigProperties {   private Integer coreSize;   private Integer maxSize;   private Integer keepAliveTime;}

3、修改配置文件

添加线程池相关配置

gulimall.thread.core-size=20

gulimall.thread.max-size=200
gulimall.thread.keep-alive-time=10

在这里插入图片描述

我们可以看到,IDEA编辑器会有自定义配置类的相关属性的提示,如果没有看到,则需要添加自定义配置文件依赖。
自定义配置依赖:
pom.xml

org.springframework.boot
spring-boot-configuration-processor
true

2、代码优化

@Service("skuInfoService")public class SkuInfoServiceImpl extends ServiceImpl
implements SkuInfoService { @Resource private SkuImagesService skuImagesService; @Resource private SpuInfoDescService spuInfoDescService; @Resource private AttrGroupService attrGroupService; @Resource private SkuSaleAttrValueService skuSaleAttrValueService; @Autowired ThreadPoolExecutor threadPoolExecutor; // 线程池/*** 异步编排获取*/ @Override public SkuItemVo item(Long skuId) throws ExecutionException, InterruptedException { SkuItemVo skuItemVo = new SkuItemVo(); // 异步编排 CompletableFuture
infoFuture = CompletableFuture.supplyAsync(() -> { // 1、sku基本信息获取 pms_sku_info SkuInfoEntity info = getById(skuId); skuItemVo.setInfo(info); // 需要将获取的值返回别,后边别人要用 return info; }, threadPoolExecutor); // 接下来做,res表示上一步的返回值 CompletableFuture
saleAttrFuture = infoFuture.thenAcceptAsync((res) -> { //3、获取spu的销售属性组合 // 分析当前spu有多少个sku(pms_sku_info),然后再查sku有多少个属性组合(pms_sku_sale_attr_value) List
saleAttrVos = skuSaleAttrValueService.getSaleAttrBySpuId(res.getSpuId()); skuItemVo.setSaleAttr(saleAttrVos); }, threadPoolExecutor); CompletableFuture
descFuture = infoFuture.thenAcceptAsync((res) -> { //4、获取spu的介绍 pms_spu_info_desc SpuInfoDescEntity spuInfoDescEntity = spuInfoDescService.getById(res.getSpuId()); skuItemVo.setDesc(spuInfoDescEntity); }, threadPoolExecutor); CompletableFuture
baseAttrFuture = infoFuture.thenAcceptAsync((res) -> { //5、获取spu的规格参数信息 List
attrGroupVos = attrGroupService.getAttrGroupWithAttrsBySpuId(res.getSpuId(), res.getCatalogId()); skuItemVo.setGroupAttrs(attrGroupVos); }, threadPoolExecutor); CompletableFuture
imageFuture = CompletableFuture.runAsync(() -> { //2、sku的图片信息 pms_sku_images List
imagesEntities = skuImagesService.getImagesBySkuId(skuId); skuItemVo.setImages(imagesEntities); }, threadPoolExecutor); // 等到所有任务都完成,infoFuture 可以不用写 // CompletableFuture.allOf(infoFuture, saleAttrFuture, descFuture, baseAttrFuture, imageFuture).get(); CompletableFuture.allOf(saleAttrFuture, descFuture, baseAttrFuture, imageFuture).get(); return skuItemVo; }}

单元测试:

@Testpublic void itemTest() throws ExecutionException, InterruptedException {   Long skuId = 1L;   SkuItemVo itemVo = skuInfoService.item(skuId);   System.out.println(itemVo); }

转载地址:http://ponti.baihongyu.com/

你可能感兴趣的文章
linux进程监控和自动重启的简单实现
查看>>
OpenFeign学习(三):OpenFeign配置生成代理对象
查看>>
OpenFeign学习(四):OpenFeign的方法同步请求执行
查看>>
OpenFeign学习(六):OpenFign进行表单提交参数或传输文件
查看>>
Ribbon 学习(二):Spring Cloud Ribbon 加载配置原理
查看>>
Ribbon 学习(三):RestTemplate 请求负载流程解析
查看>>
深入理解HashMap
查看>>
XML生成(一):DOM生成XML
查看>>
XML生成(三):JDOM生成
查看>>
Ubuntu Could not open lock file /var/lib/dpkg/lock - open (13:Permission denied)
查看>>
collect2: ld returned 1 exit status
查看>>
C#入门
查看>>
C#中ColorDialog需点两次确定才会退出的问题
查看>>
数据库
查看>>
nginx反代 499 502 bad gateway 和timeout
查看>>
linux虚拟机安装tar.gz版jdk步骤详解
查看>>
python实现100以内自然数之和,偶数之和
查看>>
python数字逆序输出及多个print输出在同一行
查看>>
苏宁产品经理面经
查看>>
百度产品经理群面
查看>>