개발/java

CompletableFuture

캐리캐리 2024. 9. 5. 14:19

multi Thread를 통한 성능 개선 

 

단일 Thread를 통해 작업을 수행하게 되면 많은 서비스를 수행하는 로직에서는 동작이 다소 지연 되는 경우가 있다.

이 경우 Thread를 추가하여 작업을 main Thread가 아닌 다른 Thread에 작업을 배정함으로써 속도를 향상 시킬 수 있다 

그 중 CompletableFuture를 통해 성능을 개선한 사례를 소개하려 한다 

 

CompletableFuture의 경우 ForkJoin을 통해 스레드 풀을 할당받으며 반환 값이 있는 supplayAsync method와 반환 값이 없는 runAsync를 통해 다중 스레드로 작업을 할 당 할 수 있다 또한 아래와 같이 ThreadPoolTaskExecutor를 직접 생성하여 작업별 할당 스레드를 직접 선언하는 것도 가능하다 

 

@Bean(name = "defaultExecutor")
public ThreadPoolTaskExecutor defaultExecutor() {

    ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
    //스레드 풀의 기본 크기 , 처음 시작할 때 생성되는 스레드의 수이며 이 수는 계속 유지한다 
    taskExecutor.setCorePoolSize(15);
    //작업이 많아 기본 크기 초과로 들어올 경우 추가로 생성할 수 있는 값이다 
    taskExecutor.setMaxPoolSize(20);
    //스레드풀이 모든 스레드를 사용중이면 추가로 들어오는 작업이 해당 큐에서 대기한다 
    taskExecutor.setQueueCapacity(25);
    //스레드 명 
    taskExecutor.setThreadNamePrefix("defaultExecutor-"); 
    // 스레드가 부족할 경우 코어 스레드를 빌려와 해당 작업 완료
    taskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); 
    taskExecutor.initialize();
    
    return taskExecutor;
}

 

방식은 개별 서비스를 수행할때 map을 통해 CompletableFuture로 변환하는 과정을 거치고 이후 이를 join을 통해 스레드별 작업을 수행하는 방식이다 

public class OrderInfo{
    private String orderName;
    private List<Product> product;

}

public class Product{
	private String productName;
    private int price;
}

@Test
void purchaseTest(){
    //스타벅스가 카페라떼 구매 희망 
    Product caffeLatte = new Product("카페라떼",3000);
 	OrderInfo starbucks = new OrderInfo("스타벅스",caffeLatte);
    
    //할리스가 카페라떼 구매 희망  
    Product caffeLatte = new Product("카페라떼",3000);
 	OrderInfo hollys = new OrderInfo("할리스",caffeLatte);
    
    List<OrderInfo> orderInfoList = new ArrayList<>();
    orderInfoList.add(starbucks)
    orderInfoList.add(hollys);
    
    //각각의 주문을 실질 구매 
    for(OrderInfo orderInfo: orderInfoList){
    	this.purchase(orderInfo);
    }

}


private int purchase(OrderInfo orderInfo){
	
    int orderNumber = orderService.purchase(orderInfo);
    
    return orderNumber;
}

 

해당 코드와 같이 purchase()를 통해 각각의 주문을 한다고 생각을 해보자

이 경우 각각의 주문은 main Thread에서 수행을 하므로 스타벅스 구매가 진행 될때 할리스 구매는 대기 상태에 빠지게 된다 

이를 개선하기 위해서는  구매하는 과정을 별도 스레드를 생성하여 각각 작업을 위임 하여야 한다 

 

우선 주문정보리스트로 구매를 진행할때 진행을 CompletableFuture로 위임하고 이 작업을 join을 통해 병렬로 수행하는 로직을 구현한다 

	//각각의 작업 수행의 결과를 supplyAsync를 통해 수행한다 
	List<CompletableFuture<int> compleatblePurchaseList = orderInfoList.stream
    .map(orderInfo -> CompletableFuture.supplyAsync(
    	()-> purchase(orderInfo)
    ,defaultExecutor).collect(Collectors.toList));
    
    //join을 통해 각각의 작업을  Thread 별 수행하며 완료 처리 한다 
  	int orderSize =compleatblePurchaseList.stream.map(CompletableFuture::join).count();

 

이때 ThreadPoolTaskExecutor의 경우 작업이 얼마나 많이 처리되는지를 확인하여 적절한 값을 배정하며 RejectedExecutionHandler를 통해 할당 스레드의 가용량이 초과 될 경우 어떻게 작업을 처리할 것인지도 설정할 수 있다. 

해당 서비스에 맞는 정책을 설정하여 가용량 초과로 인한 작업이 거부 발생 시 대응할 수 있다. 

 

  • AbortPolicy:  default 정책이며 가용량 초과하면 오류를 반환한다
  • CallerRunsPolicy: 작업이 거부되면 main Thread에서 작업을 수행한다 
  • DiscardPolicy:  가용량이 초과하면 오류를 반환하지 않고 무시한다 
  • DiscardOldestPolicy: 가용량 초과로 작업이 거부되면 가장 오래된 작업을 제거하고 거부된 작업을 수행한다