Spring comes with a thread pool is very convenient to use, but in relatively complex concurrent programming scenarios, the use of the scenario still requires careful consideration of the configuration, or you may encounter the pitfalls mentioned in this article.

Specific code reference sample project

1. Overview

ThredPoolTaskExcutor has 2 core configurations, one is the thread pool size and one is the queue size. The processing flow of ThredPoolTaskExcutor: New threads are created and requests are processed until the thread count size is equal to corePoolSize; requests are put into workQueue and free threads in the thread pool go to workQueue to fetch tasks and process them; when workQueue is full, new threads are created and requests are processed and when the thread pool size is equal to maximumPoolSize, RejectedExecutionHandler will be used to do the rejection process.

There are four types of Reject policies.

  • AbortPolicy policy, which is the default policy, rejects the request and throws an exception RejectedExecutionException.
  • CallerRunsPolicy policy , the task is executed by the calling thread.
  • DiscardPolicy policy that rejects the request without throwing an exception.
  • DiscardOldestPolicy policy to discard the first task to enter the queue.

2. Exceptions where multiple asynchronous processes share the same thread pool

Simulates a time-consuming operation that is set to execute asynchronously by the @Async annotation. async will use a thread pool named taskExecutor by default. The operation returns a CompletableFuture, and subsequent processing waits for the asynchronous operation to complete.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
@Service
public class DelayService {
    @Async
    public CompletableFuture<String> delayFoo(String v) {
        try {
            Thread.sleep(1000L);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println(v + " runs in thread: " + Thread.currentThread().getName());
        return CompletableFuture.completedFuture(v);
    }
}

Set the thread pool size to 2 and the queue to a value larger than the thread pool, in this case 10. When the queue size is larger than or equal to the thread pool size, the program will block as in this article.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
    @Bean
    public Executor taskExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(2);
        executor.setMaxPoolSize(2);
        executor.setQueueCapacity(10);
        executor.setThreadNamePrefix("taskExecutor-");
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        executor.setWaitForTasksToCompleteOnShutdown(true);
        executor.initialize();
        return executor;
    }
}

Concurrency processing.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
    while (true) {
        try {
            CompletableFuture.runAsync(
                () -> CompletableFuture.allOf(Stream.of("1", "2", "3")
                .map(v -> delayService.delayFoo(v))
                .toArray(CompletableFuture[]::new)) // 将数组中的任务提交到线程池中
                .join(), taskExecutor); // 通过join方法等待任务完成
            } catch (Exception e) {
                e.printStackTrace();
            }
        }

3. Problem Analysis

After the program starts, it blocks soon. Checking the thread status through jstack, we find that taskExecutor-1, taskExecutor-2 and main are in WAITING state, waiting for the execution of CompletableFuture.join method to finish.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
priority:5 - threadId:0x00007f7f8eb36800 - nativeId:0x3e03 - nativeId (decimal):15875 - state:WAITING
stackTrace:
java.lang.Thread.State: WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for <0x00000007961fe548> (a java.util.concurrent.CompletableFuture$Signaller)
at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
at java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1693)
at java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3323)
at java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1729)
at java.util.concurrent.CompletableFuture.join(CompletableFuture.java:1934)

By analyzing the execution process of the program, it is not difficult to find the reason of blocking. As the size of the Queue set by the thread pool is larger than the size of the thread pool, when the thread pool is full, the delayFoo method will be in the queue, and as the program is executed, there will always be a situation where the thread pool is full of CompletableFuture.join methods and the queue is full of delayFoo methods.

At this time, the join method in the thread is waiting for the execution of the delayFoo method in the queue to complete, and the delayFoo method in the queue cannot be executed because it cannot wait for the available threads, and the whole program is in a deadlock state.

The solution is simple: set the size of the queue to a size smaller than the number of threads, so that the methods in the queue will have a chance to get threads, and thus will not enter a deadlock state because the threads are full.

Reference http://springcamp.cn/java-concurrent-thread-block/