抱歉,您的浏览器无法访问本站
本页面需要浏览器支持(启用)JavaScript
了解详情 >

摘要:本文介绍了Spring的异步任务。

环境

Windows 10 企业版 LTSC 21H2
Java 1.8
Maven 3.6.3
Spring 5.3.23

1 配置依赖

添加依赖:

pom.xml
1
2
3
4
5
6
<!-- Spring Context -->
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context</artifactId>
<version>5.3.23</version>
</dependency>

2 配置方式

2.1 编程式异步任务

2.1.1 XML配置

创建任务类:

java
1
2
3
4
5
6
7
8
9
10
11
12
public class DemoTask {
// 异步任务
public void asyncTask(long millis) {
System.out.println(LocalDateTime.now() + " " + Thread.currentThread().getName() + " 执行异步任务开始");
try {
Thread.sleep(millis);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println(LocalDateTime.now() + " " + Thread.currentThread().getName() + " 执行异步任务结束");
}
}

创建配置文件:

spring.xml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd">
<!-- 任务对象 -->
<bean id="demoTask" class="com.example.task.DemoTask"/>

<!-- 任务执行器 -->
<bean id="taskExecutor" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
<!-- 核心线程数 -->
<property name="corePoolSize" value="5"/>
<!-- 最大线程数 -->
<property name="maxPoolSize" value="10"/>
<!-- 队列容量 -->
<property name="queueCapacity" value="25"/>
<!-- 线程名称前缀 -->
<property name="threadNamePrefix" value="async-task-"/>
<!-- 线程池关闭时等待所有任务完成 -->
<property name="waitForTasksToCompleteOnShutdown" value="true"/>
<!-- 线程池关闭时等待所有任务完成超时时间,单位秒 -->
<property name="awaitTerminationSeconds" value="60"/>
<!-- 拒绝策略,由调用者线程执行,避免任务丢失 -->
<property name="rejectedExecutionHandler">
<bean class="java.util.concurrent.ThreadPoolExecutor$CallerRunsPolicy"/>
</property>
</bean>
</beans>

创建启动类:

java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
public class DemoApplication {
public static void main(String[] args) {
// 执行主任务
System.out.println(LocalDateTime.now() + " " + Thread.currentThread().getName() + " 执行主任务");
// 加载配置文件
ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("spring.xml");
// 获取任务对象
DemoTask demoTask = context.getBean(DemoTask.class);
// 获取任务执行器
ThreadPoolTaskExecutor taskExecutor = context.getBean(ThreadPoolTaskExecutor.class);
// 执行异步任务
for (int i = 1; i <= 5; i++) {
long millis = i * 1000;
taskExecutor.execute(() -> demoTask.asyncTask(millis));
}
// 等待3秒,观察任务执行情况
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
// 取消异步任务,关闭线程池
taskExecutor.shutdown();
System.out.println("取消异步任务");
// 等待3秒,观察任务是否继续执行
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
// 关闭容器
context.close();
}
}

2.1.2 注解配置

修改任务类:

java
1
2
3
4
5
6
7
8
9
10
11
12
13
@Component
public class DemoTask {
// 异步任务
public void asyncTask(long millis) {
System.out.println(LocalDateTime.now() + " " + Thread.currentThread().getName() + " 执行异步任务开始");
try {
Thread.sleep(millis);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println(LocalDateTime.now() + " " + Thread.currentThread().getName() + " 执行异步任务结束");
}
}

创建配置类:

java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
@Configuration
@ComponentScan("com.example")
public class DemoConfig {
// 任务执行器
@Bean
public ThreadPoolTaskExecutor taskExecutor() {
// 创建任务执行器
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
// 核心线程数
executor.setCorePoolSize(5);
// 最大线程数
executor.setMaxPoolSize(10);
// 队列容量
executor.setQueueCapacity(25);
// 线程名称前缀
executor.setThreadNamePrefix("async-task-");
// 线程池关闭时等待所有任务完成
executor.setWaitForTasksToCompleteOnShutdown(true);
// 线程池关闭时等待所有任务完成超时时间,单位秒
executor.setAwaitTerminationSeconds(60);
// 拒绝策略,由调用者线程执行,避免任务丢失
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
// 初始化任务执行器
executor.initialize();
// 返回任务执行器
return executor;
}
}

修改启动类:

java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
public class DemoApplication {
public static void main(String[] args) {
// 执行主任务
System.out.println(LocalDateTime.now() + " " + Thread.currentThread().getName() + " 执行主任务");
// 加载配置类
AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(DemoConfig.class);
// 获取任务对象
DemoTask demoTask = context.getBean(DemoTask.class);
// 获取任务执行器
ThreadPoolTaskExecutor taskExecutor = context.getBean(ThreadPoolTaskExecutor.class);
// 执行异步任务
for (int i = 1; i <= 5; i++) {
long millis = i * 1000;
taskExecutor.execute(() -> demoTask.asyncTask(millis));
}
// 等待3秒,观察任务执行情况
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
// 取消异步任务,关闭线程池
taskExecutor.shutdown();
System.out.println("取消异步任务");
// 等待3秒,观察任务是否继续执行
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
// 关闭容器
context.close();
}
}

2.2 声明式异步任务

2.2.1 半注解配置

在XML配置文件中使用task:annotation-driven标签启用任务执行注解,支持通过@Async注解完成声明式异步任务的配置,代替在XML配置文件中配置异步任务。

使用@Async注解管理异步任务,可以在方法上使用,这是声明式异步任务最常用的方式。

使用@Async注解管理异步任务需要使用执行器:

  • 支持在@Async注解中指定执行器名称,这种优先级最高。
  • 如果没有指定执行器名称,会查找TaskExecutor类型的执行器,只有单个才会使用。
  • 如果没有单个TaskExecutor类型的执行器,会查找名称为taskExecutor的执行器。
  • 如果没有找到执行器,会使用SimpleAsyncTaskExecutor类型的执行器,不支持线程池配置。

创建任务类:

java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
@Component
public class DemoTask {
// 异步任务
@Async("taskExecutor")
public void asyncTask(long millis) {
System.out.println(LocalDateTime.now() + " " + Thread.currentThread().getName() + " 执行异步任务开始");
try {
Thread.sleep(millis);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println(LocalDateTime.now() + " " + Thread.currentThread().getName() + " 执行异步任务结束");
}
}

创建配置文件:

spring.xml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:task="http://www.springframework.org/schema/task"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd
http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task.xsd">
<!-- 启用注解扫描 -->
<context:component-scan base-package="com.example"/>

<!-- 任务执行器 -->
<bean id="taskExecutor" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
<!-- 核心线程数 -->
<property name="corePoolSize" value="5"/>
<!-- 最大线程数 -->
<property name="maxPoolSize" value="10"/>
<!-- 队列容量 -->
<property name="queueCapacity" value="25"/>
<!-- 线程名称前缀 -->
<property name="threadNamePrefix" value="async-task-"/>
<!-- 线程池关闭时等待所有任务完成 -->
<property name="waitForTasksToCompleteOnShutdown" value="true"/>
<!-- 线程池关闭时等待所有任务完成超时时间,单位秒 -->
<property name="awaitTerminationSeconds" value="60"/>
<!-- 拒绝策略,由调用者线程执行,避免任务丢失 -->
<property name="rejectedExecutionHandler">
<bean class="java.util.concurrent.ThreadPoolExecutor$CallerRunsPolicy"/>
</property>
</bean>

<!-- 启用任务执行注解 -->
<task:annotation-driven executor="taskExecutor"/>
</beans>

创建启动类:

java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
public class DemoApplication {
public static void main(String[] args) {
// 执行主任务
System.out.println(LocalDateTime.now() + " " + Thread.currentThread().getName() + " 执行主任务");
// 加载配置文件
ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("spring.xml");
// 获取任务对象
DemoTask demoTask = context.getBean(DemoTask.class);
// 获取任务执行器
ThreadPoolTaskExecutor taskExecutor = context.getBean(ThreadPoolTaskExecutor.class);
// 执行异步任务
for (int i = 1; i <= 5; i++) {
long millis = i * 1000;
demoTask.asyncTask(millis);
}
// 等待3秒,观察任务执行情况
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
// 取消异步任务,关闭线程池
taskExecutor.shutdown();
System.out.println("取消异步任务");
// 等待3秒,观察任务是否继续执行
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
// 关闭容器
context.close();
}
}

2.2.2 全注解配置

使用@EnableAsync注解启用任务执行注解,代替在XML配置文件中使用task:annotation-driven标签。

创建配置类:

java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
@Configuration
@EnableAsync
@ComponentScan("com.example")
public class DemoConfig {
// 任务执行器
@Bean
public ThreadPoolTaskExecutor taskExecutor() {
// 创建任务执行器
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
// 核心线程数
executor.setCorePoolSize(5);
// 最大线程数
executor.setMaxPoolSize(10);
// 队列容量
executor.setQueueCapacity(25);
// 线程名称前缀
executor.setThreadNamePrefix("async-task-");
// 线程池关闭时等待所有任务完成
executor.setWaitForTasksToCompleteOnShutdown(true);
// 线程池关闭时等待所有任务完成超时时间,单位秒
executor.setAwaitTerminationSeconds(60);
// 拒绝策略,由调用者线程执行,避免任务丢失
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
// 初始化任务执行器
executor.initialize();
// 返回任务执行器
return executor;
}
}

修改启动类:

java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
public class DemoApplication {
public static void main(String[] args) {
// 执行主任务
System.out.println(LocalDateTime.now() + " " + Thread.currentThread().getName() + " 执行主任务");
// 加载配置类
AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(DemoConfig.class);
// 获取任务对象
DemoTask demoTask = context.getBean(DemoTask.class);
// 获取任务执行器
ThreadPoolTaskExecutor taskExecutor = context.getBean(ThreadPoolTaskExecutor.class);
// 执行异步任务
for (int i = 1; i <= 5; i++) {
long millis = i * 1000;
demoTask.asyncTask(millis);
}
// 等待3秒,观察任务执行情况
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
// 取消异步任务,关闭线程池
taskExecutor.shutdown();
System.out.println("取消异步任务");
// 等待3秒,观察任务是否继续执行
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
// 关闭容器
context.close();
}
}

3 获取结果

使用CompletableFuture对象作为返回值,可以获取异步任务的执行结果。

创建任务类:

java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
@Component
public class DemoTask {
// 异步任务
@Async
public CompletableFuture<String> asyncTask() {
System.out.println(LocalDateTime.now() + " " + Thread.currentThread().getName() + " 执行异步任务开始");
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println(LocalDateTime.now() + " " + Thread.currentThread().getName() + " 执行异步任务结束");
// 返回异步任务的执行结果
return CompletableFuture.completedFuture("执行成功");
}
}

创建配置类:

java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
@Configuration
@EnableAsync
@ComponentScan("com.example")
public class DemoConfig {
// 任务执行器
@Bean
public ThreadPoolTaskExecutor taskExecutor() {
// 创建任务执行器
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
// 核心线程数
executor.setCorePoolSize(5);
// 最大线程数
executor.setMaxPoolSize(10);
// 队列容量
executor.setQueueCapacity(25);
// 线程名称前缀
executor.setThreadNamePrefix("async-task-");
// 线程池关闭时等待所有任务完成
executor.setWaitForTasksToCompleteOnShutdown(true);
// 线程池关闭时等待所有任务完成超时时间,单位秒
executor.setAwaitTerminationSeconds(60);
// 拒绝策略,由调用者线程执行,避免任务丢失
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
// 初始化任务执行器
executor.initialize();
// 返回任务执行器
return executor;
}
}

创建启动类:

java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public class DemoApplication {
public static void main(String[] args) {
// 执行主任务
System.out.println(LocalDateTime.now() + " " + Thread.currentThread().getName() + " 执行主任务");
// 加载配置类
AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(DemoConfig.class);
// 获取任务对象
DemoTask demoTask = context.getBean(DemoTask.class);
// 执行异步任务
CompletableFuture<String> future = demoTask.asyncTask();
// 获取异步任务执行结果
String result;
try {
result = future.get();
System.out.println(LocalDateTime.now() + " " + Thread.currentThread().getName() + " 执行异步任务结果:" + result);
} catch (Exception e) {
System.out.println(LocalDateTime.now() + " " + Thread.currentThread().getName() + " 执行异步任务异常:" + e.getCause().getMessage());
}
// 关闭容器
context.close();
}
}

4 异常处理

4.1 有返回值

如果异步任务有返回值,可以在获取结果时处理异常。

创建任务类:

java
1
2
3
4
5
6
7
8
@Component
public class DemoTask {
// 异步任务
@Async
public CompletableFuture<String> asyncTask() {
throw new RuntimeException("异步任务执行失败");
}
}

创建配置类:

java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
@Configuration
@EnableAsync
@ComponentScan("com.example")
public class DemoConfig {
// 任务执行器
@Bean
public ThreadPoolTaskExecutor taskExecutor() {
// 创建任务执行器
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
// 核心线程数
executor.setCorePoolSize(5);
// 最大线程数
executor.setMaxPoolSize(10);
// 队列容量
executor.setQueueCapacity(25);
// 线程名称前缀
executor.setThreadNamePrefix("async-task-");
// 线程池关闭时等待所有任务完成
executor.setWaitForTasksToCompleteOnShutdown(true);
// 线程池关闭时等待所有任务完成超时时间,单位秒
executor.setAwaitTerminationSeconds(60);
// 拒绝策略,由调用者线程执行,避免任务丢失
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
// 初始化任务执行器
executor.initialize();
// 返回任务执行器
return executor;
}
}

创建启动类:

java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public class DemoApplication {
public static void main(String[] args) {
// 执行主任务
System.out.println(LocalDateTime.now() + " " + Thread.currentThread().getName() + " 执行主任务");
// 加载配置类
AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(DemoConfig.class);
// 获取任务对象
DemoTask demoTask = context.getBean(DemoTask.class);
// 执行异步任务
CompletableFuture<String> future = demoTask.asyncTask();
// 获取异步任务执行结果
String result;
try {
result = future.get();
System.out.println(LocalDateTime.now() + " " + Thread.currentThread().getName() + " 执行异步任务结果:" + result);
} catch (Exception e) {
System.out.println(LocalDateTime.now() + " " + Thread.currentThread().getName() + " 执行异步任务异常:" + e.getCause().getMessage());
}
// 关闭容器
context.close();
}
}

4.2 无返回值

如果异步任务无返回值,需要实现AsyncConfigurer接口自定义异常处理。

创建任务类:

java
1
2
3
4
5
6
7
8
@Component
public class DemoTask {
// 异步任务
@Async
public void asyncTask() {
throw new RuntimeException("异步任务执行失败");
}
}

创建配置类:

java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
@Configuration
@EnableAsync
@ComponentScan("com.example")
public class DemoConfig implements AsyncConfigurer {
// 任务执行器
@Bean
public ThreadPoolTaskExecutor taskExecutor() {
// 创建任务执行器
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
// 核心线程数
executor.setCorePoolSize(5);
// 最大线程数
executor.setMaxPoolSize(10);
// 队列容量
executor.setQueueCapacity(25);
// 线程名称前缀
executor.setThreadNamePrefix("async-task-");
// 线程池关闭时等待所有任务完成
executor.setWaitForTasksToCompleteOnShutdown(true);
// 线程池关闭时等待所有任务完成超时时间,单位秒
executor.setAwaitTerminationSeconds(60);
// 拒绝策略,由调用者线程执行,避免任务丢失
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
// 初始化任务执行器
executor.initialize();
// 返回任务执行器
return executor;
}
// 异步任务异常处理
@Override
public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
return (ex, method, params) -> {
System.err.println("异步方法执行异常:");
System.err.println("方法名称: " + method.getName());
System.err.println("方法参数: " + Arrays.asList(params));
System.err.println("异常信息: " + ex.getMessage());
ex.printStackTrace();
};
}
}

创建启动类:

java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
public class DemoApplication {
public static void main(String[] args) {
// 执行主任务
System.out.println(LocalDateTime.now() + " " + Thread.currentThread().getName() + " 执行主任务");
// 加载配置类
AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(DemoConfig.class);
// 获取任务对象
DemoTask demoTask = context.getBean(DemoTask.class);
// 执行异步任务
demoTask.asyncTask();
// 关闭容器
context.close();
}
}

5 失效情况

异步任务失效情况:

  • 只有外部的方法调用被AOP拦截才会异步执行,同一个类内部的方法调用不会异步执行。
  • 使用@Async注解标记的方法必须是public方法。
  • 使用@Async注解必须在配置类中使用@EnableAsync注解开启基于注解的异步任务。

评论