티스토리 뷰

반응형

최신의 스프링 부트 애플리케이션에서 Async, Scheduled 등을 사용하면 가상 스레딩을 활용하지 않는 이상 ThreadPoolTaskExecutor가 구현체로 선택된다. 기본적인 설정, 종료 시점의 예약 작업 핸들링 설정 등을 설명한다.

 


 

Task Executor

 

스프링 어플리케이션에서 @EnableAsync 설정을 추가하고 @Async가 붙은 메서드를 런타임에 호출 시 Runnable 혹은 Callable의 형태로 스레드 풀의 Blocking Queue에 작업을 등록한 뒤 비동기로 처리된다.

 

비동기 처리 시 작업을 등록할 스레드 풀이 필요한데, 스프링 부트가 아닌 순수 스프링 환경에서는 별도의 설정이 없다면  AsyncExecutionInterceptor에 의해 요청마다 스레드를 새로 생성하는 SimpleAsyncTaskExecutor를 Executor로 등록한다.

 

스프링 부트에서는 Java 21 이상을 사용해 가상 스레드를 사용하도록 설정하는 경우에만(spring.threads.virtual.enabled = true) 스레드의 생성 작업이 비교적 가벼워 SimpleAsyncTaskExecutor를 기본 Task Executor로 등록, 이외에는 설정에 따라 풀링을 하는 ThreadPoolTaskExecutor를 등록한다.

 

 

ThreadPoolTaskExecutor

 

스프링에서 제공하는 Task Executor 클래스이나 내부적으로는 JDK의 스레드 풀인 ThreadPoolExecutor를 사용하며 부가 기능을 일부 포함한다. 실제 풀링 기능의 동작 방식은 JDK의 ThreadPoolExecutor를 따른다.

 

protected ExecutorService initializeExecutor(ThreadFactory threadFactory, RejectedExecutionHandler rejectedExecutionHandler) {
    ...
    executor = new ThreadPoolExecutor(this.corePoolSize, this.maxPoolSize, (long)this.keepAliveSeconds, TimeUnit.SECONDS, queue, threadFactory, rejectedExecutionHandler);
    ...
}

 

 

 

기본 설정과 동작 방식

 

Spring Boot를 통해 기본 생성되는 ThreadPoolTaskExecutor의 주요 풀링 관련 설정(=ThreadPoolExecutor 설정)은 아래와 같다

  • corePoolSize: 8
  • maxPoolSize: Integer.MAX_VALUE
  • queueCapacity: Integer.MAX_VALUE
  • keepAliveSeconds: 60

이 환경에서의 동작 방식은 ThreadPoolExecutor의 JavaDoc을 보면 자세히 알 수 있다.

  1. 런타임에서는 최초 크기 0의 스레드 풀로 시작한다.
  2. 스레드 풀에 작업이 등록되면 corePoolSize까지의 요청은 바로 신규 스레드가 생성되어 처리된다.
  3. corePoolSize 이상의 요청이 동시 등록되면 Blocking Queue에 우선 작업이 쌓인다.
  4. Blocking Queue 크기 이상의 작업이 요청되었다면 순차적으로 maxPoolSize 크기 이하의 스레드를 생성하여 작업들을 할당한다.
  5. 설정된 한도만큼의 스레드를 사용 중이며 Blocking Queue도 가득 찼으나 새로운 작업이 등록되면 RejectedExecutionHandler에 의해 처리된다(기본: RejectedExecutionException 발생)
  6. corePoolSize 이상으로 늘어난 스레드는 이후 요청이 감소하면 keepAliveSeconds만큼 대기했다가 정리된다.

 

유의할 점은 corePoolSize만큼의 스레드가 동시 처리를 하는 중 신규 작업이 추가되었을 때 maxPoolSize까지 신규 스레드를 생성하는 것이 아닌, Blocking Queue에 작업을 우선적으로 등록한다는 것.

하지만 queueCapacity가 기본 Integer.MAX_VALUE이므로 기본 설정에서는 corePoolSize 크기 이상의 스레드가 할당될 일이 사실상 없고 모두 Blocking Queue에서 대기하게 된다.

 

이로 인해 기본 설정에서는 동시 요청이 많은 순간 @Async 메서드 콜이 몰려도 8개의 스레드에서만 작업을 처리해 비동기 작업이 지연될 수 있다.

 

 

설정 변경

 

두 가지 방법이 존재한다.

application.yaml 혹은 application.properties의 spring.task.execution.pool 설정을 바꿔 spring boot가 지정한 설정의 pool을 생성하도록 할 수 있다.

 

# application.yaml
spring:
  task:
    execution:
      pool:
        max-size: 16
        queue-capacity: 100
        keep-alive: "10s"

 

 

혹은 설정 파일에서 Executor 관련 Bean을 직접 주입하면 @Async 처리 과정에서 이 스레드 풀을 사용한다.

아래의 방법으로는 스레드 풀을 여럿 설정하여 @Async 호출 시 원하는 스레드 풀을 사용하도록 선택할 수도 있다.

 

@Config
public class BeanConfig {

    @Bean
    public TaskExecutor taskExecutor() {
        ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
        taskExecutor.setCorePoolSize(8);
        taskExecutor.setQueueCapacity(Integer.MAX_VALUE);
        taskExecutor.setMaxPoolSize(Integer.MAX_VALUE);
        taskExecutor.initialize();
        return taskExecutor;
    }
}

 

 

만약 요청이 급증할 때 Blocking Queue를 거치지 않고 바로 신규 스레드를 생성하여 비동기 작업들을 빠르게 처리해야 한다면 queueCapacity를 0으로 설정하면 된다.

 

 

종료 시의 예약된 작업 핸들링

 

앱이 종료 시그널을 받고 종료되는 상황에서 기존 작업이 어떻게 정리되는지는 매우 중요한 문제다.

완전하진 않지만 아래의 두 설정을 통해 이미 예약된 작업들의 처리를 어느 정도 수행할 수 있다.

 

setWaitForTasksToCompleteOnShutDown()

true로 설정 시 스레드 풀을 종료하는 과정에서 ExecutorService의 shutdown()을 수행, 기본 값 false로 설정 시 shutdownNow()를 수행한다.

// ExecutorConfigurationSupport.class
public void shutdown() {
	...
    if (this.executor != null) {
        if (this.waitForTasksToCompleteOnShutdown) {
            this.executor.shutdown();
        } else {
            Iterator var1 = this.executor.shutdownNow().iterator();
            ...
        }
        this.awaitTerminationIfNecessary(this.executor);
    }
}

 

 

 

ExecutorService의 shutdown()은 예약된 작업들의 완료를 보장하지는 않으며, 신규 작업의 예약만 추가적으로 받아들이지 않는다.

 

 

반면 ExecutorService의 shutdownNow()는 처리 중 및 대기 중인 작업 모두를 중단 시도하고 중단된 작업들을 반환한다.

 

setAwaitTerminationMillis(), setAwaitTerminationSeconds()

위의 ExecutorConfigurationSupport 클래스에서 볼 수 있듯, 스레드 풀을 종료하는 과정에서는 awaitTerminationIfNecessary가 호출된다. 이 곳이 지정된 시간만큼 처리 중인 작업들이 완료되기를 기다리는 블로킹 구간이다.

두 메서드로 설정한 시간만큼 작업들의 완료를 기다리며, 시간이 초과될 시 할당되었던 스레드는 Interrupt 시킨다.

 

 

스레드 풀을 닫을 때 이미 등록된 작업의 무조건적 완료를 보장하는 방법은 제공하지 않으며, 위의 두 가지 설정을 조합하여 지정된 최대 시간 동안의 대기까지는 허용할 수 있다.

 

 

 

추가적인 설정

 

이외에도 부가적인 ThreadPoolTaskExecutor 설정 몇 개를 소개한다.

 

setTaskDecorator()

TaskDecorator 설정을 추가하면 스레드 풀을 통해 작업이 처리될 때 decorator의 runnable이 추가적으로 수행된다. (기본 null)

 

protected ExecutorService initializeExecutor(ThreadFactory threadFactory, RejectedExecutionHandler rejectedExecutionHandler) {
    ...
    if (this.taskDecorator != null) {
        executor = new ThreadPoolExecutor(this.corePoolSize, this.maxPoolSize, (long)this.keepAliveSeconds, TimeUnit.SECONDS, queue, threadFactory, rejectedExecutionHandler) {
            public void execute(Runnable command) {
                Runnable decorated = ThreadPoolTaskExecutor.this.taskDecorator.decorate(command);
                if (decorated != command) {
                    ThreadPoolTaskExecutor.this.decoratedTaskMap.put(decorated, command);
                }

                super.execute(decorated);
            }
        };
    }
    ...
}

 

일례로, 아래와 같이 사용 시 스레드 풀의 작업 처리 과정에서 스레드 별 로그가 출력된다.

 

TaskDecorator taskDecorator = runnable -> () -> log.debug("decorator");
taskExecutor.setTaskDecorator(taskDecorator);

 

 

setAllowCoreThreadTimeOut()

기본 옵션 false에서 corePool 크기 이상으로 만들어진 스레드는 유휴 상태에서 keepAliveSeconds(기본 60초) 이후 자동으로 정리되나,

corePool 크기 만큼의 스레드들(Core Thread)은 생성 이후 정리되지 않는다.

이 옵션을 true로 설정 시 Core Thread 또한 유휴 상태가 지속되면 정리된다.

 

 

setPrestartAllCoreThreads()

위에서 설명했듯, 기본 옵션 false에서는 최초 런타임에서는 비동기 요청이 시작되어야 Core Thread들이 생성된다.

true로 설정 시 마치 HikariCP와 같은 Connection Pool이 미리 pool size만큼의 커넥션을 최초 런타임에 한 번에 맺듯,

런타임 초기에 Core Thread들을 corePoolSize만큼 바로 생성해 서버의 warm up 영향을 줄일 수 있다.

 

 


Reference

https://github.com/spring-projects/spring-framework/blob/5e808ad0183a63944ab08017a13cbc7ed75bc581/spring-aop/src/main/java/org/springframework/aop/interceptor/AsyncExecutionInterceptor.java#L156

https://docs.spring.io/spring-framework/reference/integration/scheduling.html

반응형