ThreadPool Hell

  • 아주 빠르게 무언가를 계산하고 해당 처리를 끝내는 경우라면 굳이 비동기 MVC(서블릿)를 사용하지 않아도 문제가 없다.
  • 하지만 분산 환경에서 여러 서비스에 대한 호출을 하거나 외부 서비스에 호출을 하는 경우가 빈번해지면서 비동기로 처리하기 부분이 존재하게 된다.
    • NIO Connector를 사용하여 요청에 대한 Thread는 바로 반납이 가능하지만 Worker Thread에 경우 계속해서 생겨나야 한다.
    • 비동기 서블릿을 사용하더라도 하나의 요청을 처리하는 동안 하나의 작업(워커) 스레드는 그 시간동안 대기상태에 빠지게 되어 결국에는 스레드 풀의 가용성이 떨어지게 된다.
  • Thread Pool Hell이란 풀 안에 있는 스레드에 대한 사용 요청이 급격하게 증가해 추가적인 요청이 들어올 때, 사용 가능한 스레드 풀의 스레드가 없기 때문에 대기 상태에 빠져 요청에 대한 응답이 느려지게 되는 상태

  • 예시
    • localhost:8080/rest/hello 를 호출하면 핸들러 내에서 localhost:8081/anoter-service를 호출하여 리턴한다.
    • localhost:8080/rest/hello
      RestTemplate restTemplate = new RestTemplate();
      @GetMapping("/rest/hello")
      public String hello(@RequestParam int index) {
      	String returnValue = restTemplate.getForObject("http://localhost:8081/another-service?req=${req}", String.class, "hello " + index);
      	return returnValue;
      }
      
      • 톰캣에서 생성하는 Thread는 1개로 설정하였다.
        server.tomcat.threads.max=1
        
    • localhost:8081/another-service
      • 새로운 서비스를 생성하기 위해서 @SpringBootApplication을 사용하였고, property(port, max-thread) 를 따로 생성하였다.
      • 해당 클래스를 따로 spring boot application을 run 하면 된다.
        @SpringBootApplication
        public class AnotherService {	
          @RestController
          public static class MyController {
              @GetMapping("/another-service")
              public String anotherService(@RequestParam String req) {
                  return "another-service " + req;
              }
          }
          public static void main(String[] args) {
              System.setProperty("server.port", "8081");
              System.setProperty("server.tomcat.threads.max", "1");
              SpringApplication.run(AnotherService.class, args);
          }
        }
        
    • Load Test
      public static void main(String[] args) throws InterruptedException {
      // 100개 쓰레드 생성
      	ExecutorService executorService = Executors.newFixedThreadPool(100);	
      	RestTemplate restTemplate = new RestTemplate();
      	String url = "http://localhost:8080/rest/hello?index={index}";
      	CyclicBarrier barrier = new CyclicBarrier(100); // 동기화
      	StopWatch mainStopWatch = new StopWatch();
      	mainStopWatch.start();
      	for(int i = 0; i < 100; i++) {
      		executorService.submit(() -> {
      			int index = counter.addAndGet(1);
      			barrier.await(); // 생성 당시 정해놓은 partition까지 synchronize. 
      			StopWatch subStopWatch = new StopWatch();
      			log.info("Thread {}", index);
      			subStopWatch.start();
      			String returnValue = restTemplate.getForObject(url, String.class, index);
      			subStopWatch.stop();
      			log.info("Elapsed: {}, {} / {}", index, subStopWatch.getTotalTimeSeconds(), returnValue);
      			return "good";
      		});
      	}	
      	executorService.shutdown();
      	executorService.awaitTermination(100, TimeUnit.SECONDS);
      	mainStopWatch.stop();
      	log.info("Terminated: {}", mainStopWatch.getTotalTimeSeconds());
      }
      
      • CyclicBarrier를 통해서 동기화를 만들었다.
        • 동작 방식은 await()을 만나면 생성자에서 설정한 partition개수 만큼에 Thread를 대기한 후, 실행한다.
    • 간단한 예시지만 hello 핸들러가 처리하는 Worker Thread 내부에서는 anoter-service를 호출하는 과정에서 blocking 되기 때문에 CPU는 놀고 있지만 요청을 빠르게 처리하지 못하게 된다.

AsyncRestTemplate

  • API를 호출하는 작업을 비동기적으로 처리하는 방법으로 AsyncRestTemplate은 RestTemplate을 비동기로 지원하며 Non-blocking이다.
    AsyncRestTemplate asyncRestTemplate = new AsyncRestTemplate();
    @GetMapping("/rest/hello")
    public ListenableFuture<ResponseEntity<String>> hello(@RequestParam int index) {
    	return asyncRestTemplate.getForEntity("http://localhost:8081/another-service?req=${req}", String.class, "hello " + index);
    }
    
    • ListenableFuture을 바로 리턴이 가능하다.
    • tomcat 스레드는 요청에 대한 작업을 다 끝내기 전에 반환을 해서 바로 다음 요청을 처리하도록 사용
    • 외부 서비스로부터 실제 결과를 받고 클라이언트의 요청에 응답을 보내기 위해서는 새로운 스레드를 할당 받아 사용
    • 외부 서비스로부터 실제 결과를 받고 클라이언트에 응답을 보내기 위해서는 새로운 스레드를 할당 받아야 하지만, 외부 API를 호출하는 동안은 스레드(tomcat) 자원을 낭비하고 싶지 않다는 것이 목적
  • Spring Boot 2.0 부터 Deprecated되었으며 WebClient를 사용해야 한다.

DeferredResult

  • DeferredResult를 사용
    • AsyncRestTemplate으로 가져온 결과는 ListenableFuture이다.
    • callback을 활용하여 DeferredResult에 결과를 저장하여 사용할 수 있다.
      AsyncRestTemplate asyncRestTemplate = new AsyncRestTemplate();
      @GetMapping("/rest/hello")
      public DeferredResult<String> hello(@RequestParam int index) {
        DeferredResult<String> result = new DeferredResult<>();
        ListenableFuture<ResponseEntity<String>> future = asyncRestTemplate.getForEntity("http://localhost:8081/another-service?req=${req}", String.class, "hello " + index);
        future.addCallback(s-> {
            result.setResult(s.getBody());
        }, e -> {
            result.setErrorResult(e.getMessage());
        });
        return result;
      }
      

Callback Hell

  • AsyncRestTemplate을 활용하여 여러 서비스를 호출한다면, Callback을 계속 걸어주어야 하는 현상이 발생한다.
    AsyncRestTemplate asyncRestTemplate = new AsyncRestTemplate();
    @GetMapping("/rest/hello")
    public DeferredResult<String> hello(@RequestParam int index) {
    	DeferredResult<String> result = new DeferredResult<>();
    	ListenableFuture<ResponseEntity<String>> future1 = asyncRestTemplate.getForEntity("http://localhost:8081/another-service?req=${req}", String.class, "hello " + index);
    	future1.addCallback(s-> {
    		result.setResult(s.getBody());
          ListenableFuture<ResponseEntity<String>> future2 = asyncRestTemplate.getForEntity("http://localhost:8081/another-service2", String.class);
          future2.addCallback(s2 -> {
              result.setResult(s2.getBody());
          }, e2 -> {
              result.setErrorResult(e2.getMessage());
          }); 
    	}, e -> {
    		result.setErrorResult(e.getMessage());
    	});	
    	return result;
    }
    
    • 계속해서 service를 호출하게 되면 callback 안으로 들어가야 한다.

AsyncRestTemplate의 콜백 헬 해결

  • AsyncRestTemplate으로 서비스를 호출하여 결과를 받아오면 ListenableFuture 타입이다.
  • 해당 콜백을 설정할 때, 콜백안에서 다시 새로운 서비스를 호출하여야 하며 계속 콜백을 만들어야하는 콜백 헬이 발생한다.
  • 해결 방법으로 ListenableFuture Wrapping Class를 정의하여 method chain을 생성하도록 한다.
public static class Completion {
	Completion next;	
	static Completion from(ListenableFuture<ResponseEntity<String>> future) {
		Completion c = new Completion();
		future.addCallback(s-> c.complete(s), e -> c.error(e));
		return c;
	}	
	public void complete(ResponseEntity<String> s) {
		run(s);
	}	
	public void error(Throwable e) {
		if(next != null) next.error(e);
	}
	public void andAccept(Consumer<ResponseEntity<String>> consumer) {
		Completion c = new AcceptCompletion(consumer);
		this.next = c;
	}
	public Completion andError(Consumer<Throwable> consumer) {
		Completion c = new ErrorCompletion(consumer);
		this.next = c;
		return c;
	}
	public Completion andApply(Function<ResponseEntity<String>, ListenableFuture<ResponseEntity<String>>> function) {
		Completion c = new ApplyCompletion(function);
		this.next = c;
		return c;
	}
	public void run(ResponseEntity<String> s) {}		
}
	
public static class ApplyCompletion extends Completion {
	Function<ResponseEntity<String>, ListenableFuture<ResponseEntity<String>>> function;
	public ApplyCompletion(Function<ResponseEntity<String>, ListenableFuture<ResponseEntity<String>>> function) {
		this.function = function;
	}
	@Override
	public void run(ResponseEntity<String> value) {
		ListenableFuture<ResponseEntity<String>> future = function.apply(value);
		future.addCallback(s -> this.next.complete(s), e -> this.next.error(e));
	}
}

public static class AcceptCompletion extends Completion {
	Consumer<ResponseEntity<String>> consumer;		
	public AcceptCompletion(Consumer<ResponseEntity<String>> consumer) {
		this.consumer = consumer;
	}		
	@Override
	public void run(ResponseEntity<String> value) {
		this.consumer.accept(value);
	}
}

public static class ErrorCompletion extends Completion {
	Consumer<Throwable> consumer;
	public ErrorCompletion(Consumer<Throwable> consumer) {
		this.consumer = consumer;
	}
	@Override
	public void run(ResponseEntity<String> s) {
		if(next != null) next.run(s);
	}
	@Override
	public void error(Throwable e) {
		consumer.accept(e);
	}
}
  • 다형성 활용
    • Completion을 결과를 받아서 사용만 하고 끝나는 Accept 처리를 하는 AcceptCompletion
    • 또 다른 비동기 작업을 수행하고 그 결과를 반환하는 ApplyCompletion
    • 예외가 발생하는 결과를 실행하는 ErrorCompletion으로 분리하여 구현
  • Generic 적용 필요
    • 먼저 Class 기준으로 제네릭 변수를 사용할 것인지, 아니면 메소드 범위에 변수를 사용할 것인지 판단 필요

출처

  • 토비의 봄 TV 9회 스프링 리액티프 프로그래밍(5) - 비동기 RestTemplate과 비동기 MVC/Servlet
  • 토비의 봄 TV 10회 스프링 리액티프 프로그래밍(6) - AsyncRestTemplate의 콜백 헬과 중복작업 문제