SIer だけど技術やりたいブログ

Spring @Asyncで非同期処理をするときの注意点

Java Spring

@Asyncアノテーションとは

非同期に処理を実行できるようにする仕組み。
参考 7. Task Execution and Scheduling
参考 Spring MVC(+Spring Boot)上での非同期リクエストを理解する -前編-

簡単なサンプル

pom.xml

 <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.0.2.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>

非同期にする処理に@Asyncを付与する。(@Asyncはメソッド単位でもクラス単位でも付与できる)

@Slf4j
@Component
class HeavyJob {

    @Async
    public void execute() {
        log.info("before heavy task");
        try {
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        log.info("after heavy task");
    }
}

@EnableAsyncで、非同期処理を有効化する。 @EnableAsyncアノテーションをつけると有効になる仕組みは、前に書きました。
参照 アノテーションを処理するには

@EnableAsync
@SpringBootApplication
@Slf4j
@RequiredArgsConstructor
public class DemoApplication {

    public static void main(String[] args) {
        SpringApplication.run(DemoApplication.class, args);
    }

    private final HeavyJob heavyJob;

    @Bean
    public CommandLineRunner getCommandLineRunner() {
        return args -> {
            log.info("before heavyJob.execute()");
            heavyJob.execute();
            log.info("after heavyJob.execute()");
        };
    }

}

すると、以下のように別スレッドで実行される。

2018-06-02 13:20:17.559  INFO 16620 --- [           main] com.example.demo.DemoApplication         : before heavyJob.execute()
2018-06-02 13:20:17.561  INFO 16620 --- [           main] .s.a.AnnotationAsyncExecutionInterceptor : No task executor bean found for async processing: no bean of type TaskExecutor and no bean named 'taskExecutor' either
2018-06-02 13:20:17.562  INFO 16620 --- [           main] com.example.demo.DemoApplication         : after heavyJob.execute()
2018-06-02 13:20:17.564  INFO 16620 --- [cTaskExecutor-1] com.example.demo.HeavyJob                : before heavy task
2018-06-02 13:20:18.564  INFO 16620 --- [cTaskExecutor-1] com.example.demo.HeavyJob                : after heavy task

注意点

スレッドの生成

デフォルトのスレッド生成クラスはSimpleAsyncTaskExecutorで、要求ごとにスレッドを生成する。そのため、ThreadPoolTaskExecutorなどを利用して、スレッドを生成しすぎないように、また、再利用するように設定すべき。
参考 7.2.1. TaskExecutor types
参考 Spring MVC(+Spring Boot)上での非同期リクエストを理解する -前編-

例外ハンドリング

デフォルトだと、@Asyncメソッドで発生した例外は例外トレースを出力するだけ。

@Slf4j
@Component
@RequiredArgsConstructor
class HeavyJob {

    @Async
    public void execute() {
        log.info("before exception");
        throw new RuntimeException("oops");
    }
}

以下のように、例外トレースが出力される。

2018-06-02 13:23:19.098  INFO 19120 --- [           main] com.example.demo.DemoApplication         : before heavyJob.execute()
2018-06-02 13:23:19.100  INFO 19120 --- [           main] .s.a.AnnotationAsyncExecutionInterceptor : No task executor bean found for async processing: no bean of type TaskExecutor and no bean named 'taskExecutor' either
2018-06-02 13:23:19.102  INFO 19120 --- [           main] com.example.demo.DemoApplication         : after heavyJob.execute()
2018-06-02 13:23:19.104  INFO 19120 --- [cTaskExecutor-1] com.example.demo.HeavyJob                : before exception
2018-06-02 13:23:19.107 ERROR 19120 --- [cTaskExecutor-1] .a.i.SimpleAsyncUncaughtExceptionHandler : Unexpected error occurred invoking async method 'public void com.example.demo.HeavyJob.execute()'.

java.lang.RuntimeException: oops
    at com.example.demo.HeavyJob.execute(HeavyJob.java:16) ~[classes/:na]
    at com.example.demo.HeavyJob$$FastClassBySpringCGLIB$$bbd7ff10.invoke(<generated>) ~[classes/:na]
    at org.springframework.cglib.proxy.MethodProxy.invoke(MethodProxy.java:204) ~[spring-core-5.0.6.RELEASE.jar:5.0.6.RELEASE]
    at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.invokeJoinpoint(CglibAopProxy.java:746) ~[spring-aop-5.0.6.RELEASE.jar:5.0.6.RELEASE]
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163) ~[spring-aop-5.0.6.RELEASE.jar:5.0.6.RELEASE]
    at org.springframework.aop.interceptor.AsyncExecutionInterceptor.lambda$invoke$0(AsyncExecutionInterceptor.java:115) ~[spring-aop-5.0.6.RELEASE.jar:5.0.6.RELEASE]
    at org.springframework.aop.interceptor.AsyncExecutionInterceptor$$Lambda$313/1027200.call(Unknown Source) [spring-aop-5.0.6.RELEASE.jar:5.0.6.RELEASE]
    at java.util.concurrent.FutureTask.run(FutureTask.java:266) [na:1.8.0_25]
    at java.lang.Thread.run(Thread.java:745) [na:1.8.0_25]

独自のハンドリングを入れたければ、AsyncUncaughtExceptionHandler を実装する。

@Slf4j
@Configuration
public class MyAsyncConfig extends AsyncConfigurerSupport {
    @Override
    public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
        return (ex, method, params) -> {
            log.info("-----------------");
            log.info("handle exception");
            log.info("-----------------");
        };
    }
}
2018-06-02 13:25:07.260  INFO 9428 --- [           main] com.example.demo.DemoApplication         : before heavyJob.execute()
2018-06-02 13:25:07.262  INFO 9428 --- [           main] .s.a.AnnotationAsyncExecutionInterceptor : No task executor bean found for async processing: no bean of type TaskExecutor and no bean named 'taskExecutor' either
2018-06-02 13:25:07.263  INFO 9428 --- [           main] com.example.demo.DemoApplication         : after heavyJob.execute()
2018-06-02 13:25:07.265  INFO 9428 --- [cTaskExecutor-1] com.example.demo.HeavyJob                : before exception
2018-06-02 13:25:07.265  INFO 9428 --- [cTaskExecutor-1] com.example.demo.MyAsyncConfig           : -----------------
2018-06-02 13:25:07.265  INFO 9428 --- [cTaskExecutor-1] com.example.demo.MyAsyncConfig           : handle exception
2018-06-02 13:25:07.265  INFO 9428 --- [cTaskExecutor-1] com.example.demo.MyAsyncConfig           : -----------------

スレッドローカルへのアクセス

@Asyncを付けたメソッドは別スレッドで実行されるため、スレッドローカルで管理している値は参照できない。 例えば、@RequestScopeのBeanや@SessionScopeのBeanはDIできないし、RequestContextHolderも利用できない。 どうしても処理で使いたいなら、メソッド引数として渡す必要がある。

2018-06-02 13:30:24.631 ERROR 17784 --- [cTaskExecutor-1] .a.i.SimpleAsyncUncaughtExceptionHandler : Unexpected error occurred invoking async method 'public void com.example.demo.HeavyJob.execute()'.

org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'scopedTarget.user': Scope 'request' is not active for the current thread; consider defining a scoped proxy for this bean if you intend to refer to it from a singleton; nested exception is java.lang.IllegalStateException: No thread-bound request found: Are you referring to request attributes outside of an actual web request, or processing a request outside of the originally receiving thread? If you are actually operating within a web request and still receive this message, your code is probably running outside of DispatcherServlet/DispatcherPortlet: In this case, use RequestContextListener or RequestContextFilter to expose the current request.
    at org.springframework.beans.factory.support.AbstractBeanFactory.doGetBean(AbstractBeanFactory.java:362) ~[spring-beans-5.0.6.RELEASE.jar:5.0.6.RELEASE]
    at org.springframework.beans.factory.support.AbstractBeanFactory.getBean(AbstractBeanFactory.java:199) ~[spring-beans-5.0.6.RELEASE.jar:5.0.6.RELEASE]
    at org.springframework.aop.target.SimpleBeanTargetSource.getTarget(SimpleBeanTargetSource.java:35) ~[spring-aop-5.0.6.RELEASE.jar:5.0.6.RELEASE]
    at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:672) ~[spring-aop-5.0.6.RELEASE.jar:5.0.6.RELEASE]
    at com.example.demo.User$$EnhancerBySpringCGLIB$$da7d348c.getName(<generated>) ~[classes/:na]
    at com.example.demo.HeavyJob.execute(HeavyJob.java:19) ~[classes/:na]
    at com.example.demo.HeavyJob$$FastClassBySpringCGLIB$$bbd7ff10.invoke(<generated>) ~[classes/:na]
    at org.springframework.cglib.proxy.MethodProxy.invoke(MethodProxy.java:204) ~[spring-core-5.0.6.RELEASE.jar:5.0.6.RELEASE]
    at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.invokeJoinpoint(CglibAopProxy.java:746) ~[spring-aop-5.0.6.RELEASE.jar:5.0.6.RELEASE]
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163) ~[spring-aop-5.0.6.RELEASE.jar:5.0.6.RELEASE]
    at org.springframework.aop.interceptor.AsyncExecutionInterceptor.lambda$invoke$0(AsyncExecutionInterceptor.java:115) ~[spring-aop-5.0.6.RELEASE.jar:5.0.6.RELEASE]
    at org.springframework.aop.interceptor.AsyncExecutionInterceptor$$Lambda$321/1810268094.call(Unknown Source) [spring-aop-5.0.6.RELEASE.jar:5.0.6.RELEASE]
    at java.util.concurrent.FutureTask.run(FutureTask.java:266) [na:1.8.0_25]
    at java.lang.Thread.run(Thread.java:745) [na:1.8.0_25]
Caused by: java.lang.IllegalStateException: No thread-bound request found: Are you referring to request attributes outside of an actual web request, or processing a request outside of the originally receiving thread? If you are actually operating within a web request and still receive this message, your code is probably running outside of DispatcherServlet/DispatcherPortlet: In this case, use RequestContextListener or RequestContextFilter to expose the current request.
    at org.springframework.web.context.request.RequestContextHolder.currentRequestAttributes(RequestContextHolder.java:131) ~[spring-web-5.0.6.RELEASE.jar:5.0.6.RELEASE]
    at org.springframework.web.context.request.AbstractRequestAttributesScope.get(AbstractRequestAttributesScope.java:42) ~[spring-web-5.0.6.RELEASE.jar:5.0.6.RELEASE]
    at org.springframework.beans.factory.support.AbstractBeanFactory.doGetBean(AbstractBeanFactory.java:350) ~[spring-beans-5.0.6.RELEASE.jar:5.0.6.RELEASE]
    ... 13 common frames omitted

トランザクション

DataSourceTransactionManagerはスレッドごとにJDBCコネクションを管理するため、@Asyncでトランザクションが分かれる。

参考 DataSourceTransactionManager

Binds a JDBC Connection from the specified DataSource to the current thread, potentially allowing for one thread-bound Connection per DataSource. Note: The DataSource that this transaction manager operates on needs to return independent Connections. The Connections may come from a pool (the typical case), but the DataSource must not return thread-scoped / request-scoped Connections or the like. This transaction manager will associate Connections with thread-bound transactions itself, according to the specified propagation behavior. It assumes that a separate, independent Connection can be obtained even during an ongoing transaction.

検証する

pom.xml

 <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.0.2.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
  1. 「タスクを登録する」ボタンを押すと、サーバ側でDBに0-10件のランダムなタスクを登録する。
  2. サーバ側では、@Asyncアノテーションをつけたクラスを用意し、非同期にタスクを消化する。
  3. 非同期処理を開始したら、クライアントにレスポンスをとりあえず返す
  4. タスクは1sごとに1件消化し、その都度コミットする。
  5. dbはh2を利用する。

  • h2のデフォルトのisolationレベルは「READ COMMITTED

    • Txがコミットされたタイミングで、別のTxから参照可能になる
  • Springの@TransactionalのpropagationはREQUIRED

    • Txが既に開始されていれば、それに参加する
    • JDBCコネクションの管理がスレッドごとなら、別Txになるはず

したがって、画面から見たときにリアルタイムに更新されれば、Txが呼び出し元とは分かれていると言えるはず。

ソースコード

登録と非同期更新部分を抜粋。全ソースコードはgithubを参照してください。
参考 github

@Slf4j
@RestController
@AllArgsConstructor
@RequestMapping("api/tasks")
public class TaskRestController {
  final TaskService service;
  ...
  @PostMapping
  public Task execute() {
    Task task = service.register();
    service.execute(task.getId());
    return task;
  }
}

非同期に、1sごとに1タスクを消化する。

@Slf4j
@Service
@AllArgsConstructor
public class TaskServiceImpl implements TaskService {
    ...
    @Override
    @Transactional
    public void execute(int id) {
        Task task = taskMapper.findOne(id);
        log.info(task.toString());

        // 1sごとにtaskを1こずつ消化していく
        while (task.getDone() < task.getAmount()) {
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            service.execute(task);
        }
    }
}

メソッドに@Transactionalをつけ、親Txに参加するかどうか調べる。

@Slf4j
@Service
@AllArgsConstructor
public class ExecuteServiceImpl implements ExecuteService {
  final TaskMapper taskMapper;

    @Override
    @Async
    @Transactional
    public void execute(Task task) {
        task.setDone(task.getDone() + 1);
        taskMapper.update(task);
        log.info(task.toString());
    }
}

検証結果

@AsyncでTxが分かれる。