@Async
アノテーションとは
非同期に処理を実行できるようにする仕組み。
参考 7. Task Execution and Scheduling
参考 Spring MVC(+Spring Boot)上での非同期リクエストを理解する -前編-
簡単なサンプル
pom.xml
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.0.2.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
非同期にする処理に@Async
を付与する。(@Async
はメソッド単位でもクラス単位でも付与できる)
@Slf4j
@Component
class HeavyJob {
@Async
public void execute() {
log.info("before heavy task");
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
log.info("after heavy task");
}
}
@EnableAsync
で、非同期処理を有効化する。 @EnableAsync
アノテーションをつけると有効になる仕組みは、前に書きました。
参照 アノテーションを処理するには
@EnableAsync
@SpringBootApplication
@Slf4j
@RequiredArgsConstructor
public class DemoApplication {
public static void main(String[] args) {
SpringApplication.run(DemoApplication.class, args);
}
private final HeavyJob heavyJob;
@Bean
public CommandLineRunner getCommandLineRunner() {
return args -> {
log.info("before heavyJob.execute()");
heavyJob.execute();
log.info("after heavyJob.execute()");
};
}
}
すると、以下のように別スレッドで実行される。
2018-06-02 13:20:17.559 INFO 16620 --- [ main] com.example.demo.DemoApplication : before heavyJob.execute()
2018-06-02 13:20:17.561 INFO 16620 --- [ main] .s.a.AnnotationAsyncExecutionInterceptor : No task executor bean found for async processing: no bean of type TaskExecutor and no bean named 'taskExecutor' either
2018-06-02 13:20:17.562 INFO 16620 --- [ main] com.example.demo.DemoApplication : after heavyJob.execute()
2018-06-02 13:20:17.564 INFO 16620 --- [cTaskExecutor-1] com.example.demo.HeavyJob : before heavy task
2018-06-02 13:20:18.564 INFO 16620 --- [cTaskExecutor-1] com.example.demo.HeavyJob : after heavy task
注意点
スレッドの生成
デフォルトのスレッド生成クラスはSimpleAsyncTaskExecutorで、要求ごとにスレッドを生成する。そのため、ThreadPoolTaskExecutorなどを利用して、スレッドを生成しすぎないように、また、再利用するように設定すべき。
参考 7.2.1. TaskExecutor types
参考 Spring MVC(+Spring Boot)上での非同期リクエストを理解する -前編-
例外ハンドリング
デフォルトだと、@Async
メソッドで発生した例外は例外トレースを出力するだけ。
@Slf4j
@Component
@RequiredArgsConstructor
class HeavyJob {
@Async
public void execute() {
log.info("before exception");
throw new RuntimeException("oops");
}
}
以下のように、例外トレースが出力される。
2018-06-02 13:23:19.098 INFO 19120 --- [ main] com.example.demo.DemoApplication : before heavyJob.execute()
2018-06-02 13:23:19.100 INFO 19120 --- [ main] .s.a.AnnotationAsyncExecutionInterceptor : No task executor bean found for async processing: no bean of type TaskExecutor and no bean named 'taskExecutor' either
2018-06-02 13:23:19.102 INFO 19120 --- [ main] com.example.demo.DemoApplication : after heavyJob.execute()
2018-06-02 13:23:19.104 INFO 19120 --- [cTaskExecutor-1] com.example.demo.HeavyJob : before exception
2018-06-02 13:23:19.107 ERROR 19120 --- [cTaskExecutor-1] .a.i.SimpleAsyncUncaughtExceptionHandler : Unexpected error occurred invoking async method 'public void com.example.demo.HeavyJob.execute()'.
java.lang.RuntimeException: oops
at com.example.demo.HeavyJob.execute(HeavyJob.java:16) ~[classes/:na]
at com.example.demo.HeavyJob$$FastClassBySpringCGLIB$$bbd7ff10.invoke(<generated>) ~[classes/:na]
at org.springframework.cglib.proxy.MethodProxy.invoke(MethodProxy.java:204) ~[spring-core-5.0.6.RELEASE.jar:5.0.6.RELEASE]
at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.invokeJoinpoint(CglibAopProxy.java:746) ~[spring-aop-5.0.6.RELEASE.jar:5.0.6.RELEASE]
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163) ~[spring-aop-5.0.6.RELEASE.jar:5.0.6.RELEASE]
at org.springframework.aop.interceptor.AsyncExecutionInterceptor.lambda$invoke$0(AsyncExecutionInterceptor.java:115) ~[spring-aop-5.0.6.RELEASE.jar:5.0.6.RELEASE]
at org.springframework.aop.interceptor.AsyncExecutionInterceptor$$Lambda$313/1027200.call(Unknown Source) [spring-aop-5.0.6.RELEASE.jar:5.0.6.RELEASE]
at java.util.concurrent.FutureTask.run(FutureTask.java:266) [na:1.8.0_25]
at java.lang.Thread.run(Thread.java:745) [na:1.8.0_25]
独自のハンドリングを入れたければ、AsyncUncaughtExceptionHandler を実装する。
@Slf4j
@Configuration
public class MyAsyncConfig extends AsyncConfigurerSupport {
@Override
public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
return (ex, method, params) -> {
log.info("-----------------");
log.info("handle exception");
log.info("-----------------");
};
}
}
2018-06-02 13:25:07.260 INFO 9428 --- [ main] com.example.demo.DemoApplication : before heavyJob.execute()
2018-06-02 13:25:07.262 INFO 9428 --- [ main] .s.a.AnnotationAsyncExecutionInterceptor : No task executor bean found for async processing: no bean of type TaskExecutor and no bean named 'taskExecutor' either
2018-06-02 13:25:07.263 INFO 9428 --- [ main] com.example.demo.DemoApplication : after heavyJob.execute()
2018-06-02 13:25:07.265 INFO 9428 --- [cTaskExecutor-1] com.example.demo.HeavyJob : before exception
2018-06-02 13:25:07.265 INFO 9428 --- [cTaskExecutor-1] com.example.demo.MyAsyncConfig : -----------------
2018-06-02 13:25:07.265 INFO 9428 --- [cTaskExecutor-1] com.example.demo.MyAsyncConfig : handle exception
2018-06-02 13:25:07.265 INFO 9428 --- [cTaskExecutor-1] com.example.demo.MyAsyncConfig : -----------------
スレッドローカルへのアクセス
@Async
を付けたメソッドは別スレッドで実行されるため、スレッドローカルで管理している値は参照できない。 例えば、@RequestScope
のBeanや@SessionScope
のBeanはDIできないし、RequestContextHolderも利用できない。 どうしても処理で使いたいなら、メソッド引数として渡す必要がある。
2018-06-02 13:30:24.631 ERROR 17784 --- [cTaskExecutor-1] .a.i.SimpleAsyncUncaughtExceptionHandler : Unexpected error occurred invoking async method 'public void com.example.demo.HeavyJob.execute()'.
org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'scopedTarget.user': Scope 'request' is not active for the current thread; consider defining a scoped proxy for this bean if you intend to refer to it from a singleton; nested exception is java.lang.IllegalStateException: No thread-bound request found: Are you referring to request attributes outside of an actual web request, or processing a request outside of the originally receiving thread? If you are actually operating within a web request and still receive this message, your code is probably running outside of DispatcherServlet/DispatcherPortlet: In this case, use RequestContextListener or RequestContextFilter to expose the current request.
at org.springframework.beans.factory.support.AbstractBeanFactory.doGetBean(AbstractBeanFactory.java:362) ~[spring-beans-5.0.6.RELEASE.jar:5.0.6.RELEASE]
at org.springframework.beans.factory.support.AbstractBeanFactory.getBean(AbstractBeanFactory.java:199) ~[spring-beans-5.0.6.RELEASE.jar:5.0.6.RELEASE]
at org.springframework.aop.target.SimpleBeanTargetSource.getTarget(SimpleBeanTargetSource.java:35) ~[spring-aop-5.0.6.RELEASE.jar:5.0.6.RELEASE]
at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:672) ~[spring-aop-5.0.6.RELEASE.jar:5.0.6.RELEASE]
at com.example.demo.User$$EnhancerBySpringCGLIB$$da7d348c.getName(<generated>) ~[classes/:na]
at com.example.demo.HeavyJob.execute(HeavyJob.java:19) ~[classes/:na]
at com.example.demo.HeavyJob$$FastClassBySpringCGLIB$$bbd7ff10.invoke(<generated>) ~[classes/:na]
at org.springframework.cglib.proxy.MethodProxy.invoke(MethodProxy.java:204) ~[spring-core-5.0.6.RELEASE.jar:5.0.6.RELEASE]
at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.invokeJoinpoint(CglibAopProxy.java:746) ~[spring-aop-5.0.6.RELEASE.jar:5.0.6.RELEASE]
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163) ~[spring-aop-5.0.6.RELEASE.jar:5.0.6.RELEASE]
at org.springframework.aop.interceptor.AsyncExecutionInterceptor.lambda$invoke$0(AsyncExecutionInterceptor.java:115) ~[spring-aop-5.0.6.RELEASE.jar:5.0.6.RELEASE]
at org.springframework.aop.interceptor.AsyncExecutionInterceptor$$Lambda$321/1810268094.call(Unknown Source) [spring-aop-5.0.6.RELEASE.jar:5.0.6.RELEASE]
at java.util.concurrent.FutureTask.run(FutureTask.java:266) [na:1.8.0_25]
at java.lang.Thread.run(Thread.java:745) [na:1.8.0_25]
Caused by: java.lang.IllegalStateException: No thread-bound request found: Are you referring to request attributes outside of an actual web request, or processing a request outside of the originally receiving thread? If you are actually operating within a web request and still receive this message, your code is probably running outside of DispatcherServlet/DispatcherPortlet: In this case, use RequestContextListener or RequestContextFilter to expose the current request.
at org.springframework.web.context.request.RequestContextHolder.currentRequestAttributes(RequestContextHolder.java:131) ~[spring-web-5.0.6.RELEASE.jar:5.0.6.RELEASE]
at org.springframework.web.context.request.AbstractRequestAttributesScope.get(AbstractRequestAttributesScope.java:42) ~[spring-web-5.0.6.RELEASE.jar:5.0.6.RELEASE]
at org.springframework.beans.factory.support.AbstractBeanFactory.doGetBean(AbstractBeanFactory.java:350) ~[spring-beans-5.0.6.RELEASE.jar:5.0.6.RELEASE]
... 13 common frames omitted
トランザクション
DataSourceTransactionManagerはスレッドごとにJDBCコネクションを管理するため、@Async
でトランザクションが分かれる。
参考 DataSourceTransactionManager
Binds a JDBC Connection from the specified DataSource to the current thread, potentially allowing for one thread-bound Connection per DataSource. Note: The DataSource that this transaction manager operates on needs to return independent Connections. The Connections may come from a pool (the typical case), but the DataSource must not return thread-scoped / request-scoped Connections or the like. This transaction manager will associate Connections with thread-bound transactions itself, according to the specified propagation behavior. It assumes that a separate, independent Connection can be obtained even during an ongoing transaction.
検証する
pom.xml
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.0.2.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
- 「タスクを登録する」ボタンを押すと、サーバ側でDBに0-10件のランダムなタスクを登録する。
- サーバ側では、
@Async
アノテーションをつけたクラスを用意し、非同期にタスクを消化する。 - 非同期処理を開始したら、クライアントにレスポンスをとりあえず返す
- タスクは1sごとに1件消化し、その都度コミットする。
- dbはh2を利用する。
h2のデフォルトのisolationレベルは「READ COMMITTED」
- Txがコミットされたタイミングで、別のTxから参照可能になる
Springの
@Transactional
のpropagationはREQUIRED- Txが既に開始されていれば、それに参加する
- JDBCコネクションの管理がスレッドごとなら、別Txになるはず
したがって、画面から見たときにリアルタイムに更新されれば、Txが呼び出し元とは分かれていると言えるはず。
ソースコード
登録と非同期更新部分を抜粋。全ソースコードはgithubを参照してください。
参考 github
@Slf4j
@RestController
@AllArgsConstructor
@RequestMapping("api/tasks")
public class TaskRestController {
final TaskService service;
...
@PostMapping
public Task execute() {
Task task = service.register();
service.execute(task.getId());
return task;
}
}
非同期に、1sごとに1タスクを消化する。
@Slf4j
@Service
@AllArgsConstructor
public class TaskServiceImpl implements TaskService {
...
@Override
@Transactional
public void execute(int id) {
Task task = taskMapper.findOne(id);
log.info(task.toString());
// 1sごとにtaskを1こずつ消化していく
while (task.getDone() < task.getAmount()) {
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
service.execute(task);
}
}
}
メソッドに@Transactional
をつけ、親Txに参加するかどうか調べる。
@Slf4j
@Service
@AllArgsConstructor
public class ExecuteServiceImpl implements ExecuteService {
final TaskMapper taskMapper;
@Override
@Async
@Transactional
public void execute(Task task) {
task.setDone(task.getDone() + 1);
taskMapper.update(task);
log.info(task.toString());
}
}
検証結果
@Async
でTxが分かれる。