Java에서 쓰레드 결과를 가져오는 방법
-
java.util.concurrent.Future
- 비동기적인 연산, 작업에 대한 결과를 갖는다.
- 쓰레드가 같은 경우에는 return을 받으면 되지만, 다른 쓰레드에 결과를 받기위한 인터페이스
- Thread Pool
- Thread를 새로 만드는 것은 리소스가 많이 들기 때문에 Pool에 여러 쓰레드를 생성해 두고, 필요할 때 가져다 쓰고 반납하도록 하여 자원 낭비를 최소화하는 방법
public static void main(String[] args) throws InterruptedException, ExecutionException { ExecutorService executorService = Executors.newCachedThreadPool(); Future<String> future = executorService.submit(()->{ Thread.sleep(200); log.info("Async"); return "Hello"; }); log.info("Exit"); log.info(future.get()); }
11:21:05.148 [main] INFO com.example.demo.java.SpringReactorController - Exit 11:21:05.351 [pool-1-thread-1] INFO com.example.demo.java.SpringReactorController - Async 11:21:05.352 [main] INFO com.example.demo.java.SpringReactorController - Hello
- ExecutorService의 submit을 통해 Callable 이나 Runnable을 받을 수 있다.
- Thread는 Callable이나 Runnable의 구현된 메서드를 수행한다는 공통점이 있지만, 아래와 같은 차이점이 있다.
- interface Callable: V call() throws Exception: 리턴값이 존재하며 Exception을 던질 수 있다.
- interface Runnable: void run() : 인자, 결과값 리턴이 없다.
- Blocking : 메인 쓰레드는 pool-1-thread-1이 끝난후에 리턴되는 값을 기다린 후에 결과값을 찍는다. 즉, 다른 쓰레드의 결과를 기다리고 있다.
- Future에서 get()메서드를 통해 결과 값을 가져올 수 있고 isDone()을 통해 자식 쓰레드가 완료되었는지 확인할 수 있다.
- Thread를 새로 만드는 것은 리소스가 많이 들기 때문에 Pool에 여러 쓰레드를 생성해 두고, 필요할 때 가져다 쓰고 반납하도록 하여 자원 낭비를 최소화하는 방법
-
비동기 결과를 가져오는 방법
- java.util.concurrent.FutureTask
- Callback을 사용하여 가져오는 방법
- FutureTask 에 done이라는 메서드를 오버라이딩 하여 get()을 호출할 수 있다.
- done()은 완료될 때 실행되는 메서드로 hook의 일종 ```java @FunctionalInterface interface SuccessCallback { void onSuccess(String result); }
@FunctionalInterface interface ExceeptionCallback { void onError(Throwable t); }
public static class CallbackFutureTask extends FutureTask
{ private SuccessCallback sc; private ExceltionCallback ec; public CallbackFutureTask(Callable callable, SuccessCallback sc, ExceptionCallback ec) { super(callable); this.sc = Objects.requireNonNull(sc); this.ec = Objects.requireNonNull(ec); } @Override protected void done() { // TODO Auto-generated method stub try { sc.onSuccess(get()); } catch (InterruptedException | ExecutionException e) { ec.onError(e); } } } public static void main(String[] args) { ExecutorService executorService = Executors.newCachedThreadPool(); FutureTask
futureTask = new CallbackFutureTask (()-> { Thread.sleep(200); log.info("Async"); return "Hello"; }, log::info, throable -> log.error(throable.getMessage()) ); executorService.execute(futureTask); executorService.shutdown(); } 11:58:56.779 [pool-1-thread-1] INFO com.example.demo.java.SpringReactorController - Async 11:58:56.787 [pool-1-thread-1] INFO com.example.demo.java.SpringReactorController - Hello ```
- SuccessCallback과 ExceptionCallback을 활용하여 callback 을 사용했다.
- 안타까운 부분은 설정 + 비즈니스 로직이 함께 작성되어 있어서 분리하는 것이 필요하다.
- java.util.concurrent.FutureTask
Spring에서 쓰레드 결과를 가져오는 방법
- @EnableAsync + @Async
@SpringBootApplication @EnableAsync @Slf4j public class WebFluxExampleApplication { public static void main(String[] args) { SpringApplication.run(WebFluxExampleApplication.class, args); } @Component public static class MyService { @Async public Future<String> hello() throws InterruptedException { log.info("hello()"); Thread.sleep(1000); return new AsyncResult<String>("Hello"); } } @Bean ThreadPoolTaskExecutor tp() { // @Async 사용하기 위한 Thread 설정 ThreadPoolTaskExecutor te = new ThreadPoolTaskExecutor(); te.setCorePoolSize(10); te.setMaxPoolSize(100); te.setQueueCapacity(200); te.setThreadNamePrefix("mythread"); te.initialize(); return te; } @Autowired MyService myService; @Bean ApplicationRunner run() { return args -> { log.info("Run()"); Future<String> result = myService.hello(); log.info("Exit: " + result.isDone()); log.info("Result: " + result.get()); }; } }
2021-08-15 17:37:17.531 INFO 29808 --- [ main] c.e.demo.WebFluxExampleApplication : Run() 2021-08-15 17:37:17.534 INFO 29808 --- [ main] c.e.demo.WebFluxExampleApplication : Exit: false 2021-08-15 17:37:17.542 INFO 29808 --- [ task-1] c.e.demo.WebFluxExampleApplication : hello() 2021-08-15 17:37:18.545 INFO 29808 --- [ main] c.e.demo.WebFluxExampleApplication : Result: Hello
- Exit이 찍힌 시점이 MyService에 비동기에 대한 결과를 기다리지 않았다.
- result.get()을 할 때에 비동기 작업이 끝난 후에 가져온다.
-
ListenableFuture를 통해 callback을 지정해줄 수 있다
ListenableFuture<String> result = myService.hello(); result.addCallback(item -> log.info(item), e-> log.error(e.getMessage()));
Spring WebMVC
- Servlet을 직접 개발하는 일은 없지만, 3.0 이전에는 Blocking 방식으로 처리하였다.
- IO 작업에 대해 쓰레드를 하나를 생성하여 처리하였다.
- 커넥션당 하나의 쓰레드 할당
- HttpServletRequest, HttpServletResponse는 InputStream/OuptutStream을 통해 구현되었는데 기본적으로 Blocking 방식이다.
- 하지만 외부에 API를 대기하는 작업을 블로킹 방식으로 진행한다면 비효율적이다.
- Req1 –> ServletThread01 - req -> Blocking(DB, API..) - WorkThread -> res(html or json <- AsyncContext)
- 3.0 부터 비동기적으로 Servlet 요청을 처리하는 기능이 추가
- 요청에 대하여 Pool에서 할당받은 Servlet Thread가 Work Thread로 요청을 보내고 반납한다.
- Work Thread가 처리완료 후 Pool에서 할당받은 new Servlet Thread에게 응답을 보내고 Servlet Thread는 NIO Connector에게 응답을 보낸 후 반납한다.
- ServletThread가 작업이 끝날 때까지 대기하지 않고 바로 반납하기 때문에 많은 요청/응답을 처리할 수 있다.
- 3.1 Servlet Non Blocking IO
- Callback
- Callable을 활용한 예시
- Client <-> NIO Connector <-> Servlet Thread(<-> Thread Pool) <-> 작업 쓰레드(<-> Thread Pool)
@GetMapping("/callable") public Callable<String> async() { log.info("callable"); return () -> { log.info("async"); Thread.sleep(2000); return "ok"; }; }
2021-08-16 12:06:34.056 INFO 8796 --- [nio-8080-exec-1] com.example.demo.async.AsyncController : callable 2021-08-16 12:06:34.064 INFO 8796 --- [ task-2] com.example.demo.async.AsyncController : async
- Callable을 만들어 새로운 쓰레드로 ok를 리턴
- callable을 호출한 시간 측정
public static void main(String[] args) throws InterruptedException { ExecutorService executorService = Executors.newFixedThreadPool(200); RestTemplate restTemplate = new RestTemplate(); String uri = "http://localhost:8080/callable"; StopWatch mainWatch = new StopWatch(); mainWatch.start(); for(int i = 0; i < 100; i++) { executorService.execute(() -> { StopWatch subWatch = new StopWatch(); subWatch.start(); restTemplate.getForObject(uri, String.class); subWatch.stop(); log.info("Elapsed -> " + subWatch.getTotalTimeSeconds()); }); } executorService.shutdown(); executorService.awaitTermination(100, TimeUnit.SECONDS); mainWatch.stop(); log.info("Main Watch -> " + mainWatch.getTotalTimeSeconds()); }
- 톰캣에 Thread Pool사이즈를 1개로 줄여도 모든 요청이 2초만에 끝난다.
- NIO Connector 1개로 서블릿 쓰레드만 1개로 사용하고 작업 쓰레드는 100개가 실행되어 2초안에 끝나는 것
- Client <-> NIO Connector <-> Servlet Thread(<-> Thread Pool) <-> 작업 쓰레드(<-> Thread Pool)
- DeferredResult Queue
- Client <-> NIO Connector <-> Servlet Thread(<-> Thread Pool) <-> DeferredResult Queue (<- Event)
```java
Queue<DeferredResult
> results = new ConcurrentLinkedQueue<>();
@GetMapping(“/deferred-result”) public DeferredResult
deferredResult() { log.info("deferred-result"); DeferredResult dr = new DeferredResult<>(); results.add(dr); return dr; } @GetMapping("/deferred-result/count") public String deferredResultCount() { return String.valueOf(results.size()); } @GetMapping("/deferred-result/event") public String deferredResultEvent(String message) { for(DeferredResult dr: results) { dr.setResult("Hello: " + message); results.remove(dr); } return "OK"; } ``` - /deferred-result로 요청을 보내면 바로 응답이 오지 않는다. /deferred-result/event를 요청하면 해당 deferredResult가 값이 세팅되어 리턴된다.
- Servlet Thread가 요청을 받고 DeferredResult를 Queue에 저장한 후 풀에 다시 반납한다.
- DeferredResult Queue에 있는 값은 event 요청이 올때까지 대기한다(따로 Thread를 생성하지 않는다.)
- /event 요청이 오면 Queue에 있는 값들을 전부 제거하고 응답을 전송한다.
- 채팅방에서 메세지를 보내면 내부에 돌고 있는 DeferredResult가 단체인원에게 같은 메시지를 보내주는 등에서 사용한다.
- Client <-> NIO Connector <-> Servlet Thread(<-> Thread Pool) <-> DeferredResult Queue (<- Event)
```java
Queue<DeferredResult
- Emitter
@GetMapping("/emitter") public ResponseBodyEmitter emitter() { ResponseBodyEmitter emitter = new ResponseBodyEmitter(); Executors.newSingleThreadExecutor().submit(() -> { for(int i = 0; i < 50; i++) { try { emitter.send("<p>Stream " + i + " </p>"); Thread.sleep(100); } catch (Exception e) {} } }); return emitter; }
- spring-webmvc에서 HTTP Streaming을 하는 가장 간단한 방법은 SSE(Server-Sent Event)를 사용하는 것이다.
- Spring에서 지원하는 Async 응답 방식인 DeferredResult, Callable과 같다
- 반환형이 Async 응답을 필요로하는 경우, Spring MVC는 request.startAsync()를 호출하여 AsyncContext를 획득하여 저장해둔다.
- AsyncContext는 비동기 처리를 제어할 수 있도록 해준다.
- ex) Sevrlet container thread에서 요청 처리를 재개시키며, 기존의 forward와 같은 동작을 하는 dispatch 메서드를 제공한다.
- 이로 인해 응답을 별도의 thread에서 비동기로 처리할 수 있다.
- 요청 thread를 종료시키되, response는 열어둔다
- 다른 thread에서 AsyncContext를 사용해서 response를 완성시킨다.
- Spring MVC는 Servlet container로 다시 요청을 보내고(dispatch), client로 응답한다.
출처
- 토비의 봄 TV 8회 스프링 리액티프 프로그래밍(4) - 자바와 스프링의 비동기 기술
- https://supawer0728.github.io/2018/03/15/spring-http-stereamings/