Future

  • Future (java)
    • Future는 비동기 처리 결과를 표현하기 위해서 사용된다. 비동기 처리가 완료되었는지 확인하고, 처리 완료를 기다리고, 처리 결과를 리턴하는 메소드를 제공한다.
    • get() 메서드를 통해 결과를 가져온다. 동기식으로 결과가 끝날때까지 대기한다.
    • Future 구현체로 FutureTask가 있다.
  • ListenableFuture (spring)
    • 비동기 task 처리가 가능하며 callback method를 통해 task가 성공했을 때, 실패했을 때의 작업을 미리 지정해놓을 수 있다.
    • completable() 메서드를 통해서 CompletableFuture로 변환할 수 있다.
  • CompletableFuture
    • Java 8에서 나왔으며 간단하게 비동기적인 결과를 담고 있고, 가져올 수 있다.
    • Future와 CompletionStage 를 상속한다
    • CompletableFuture 리스트의 모든값이 완료될 때까지 기다릴지 아니면 하나의 값만 완료되길 기다릴지 선택할 수 있다는 것이 장점
    • 병렬성과(Parallelism)과 동시성(Concurrency)에서 CompletableFuture가 중요한데, 여러개의 cpu core 사이에 지연 실행이나 예외를 callable하게 처리할 수 있어서 명시적인 처리가 가능해진다.
  • CompletionStage
    • 함수형 스타일 또는 선언형 스타일로 코드를 작성할 수 있으며 코드가 깔끔해지고 결과를 비동기적으로 처리 가능하다

CompletableFuture 사용

  • 예시
    CompletableFuture<String> cf1 = new CompletableFuture<>();
    cf1.complete("Hello");
    log.info(cf1.get()); // Hello 출력
      
    CompletableFuture<String> cf2 = new CompletableFuture<>();
    cf2.completeExceptionally(new RuntimeException());
    log.error(cf2.get()); // 예외 발생
    
    • CompletableFuture에는 비동기 Task에 대한 결과가 담기며, get() 메서드를 호출해서 가져올 수 있다.
    • complete()를 통해 완료와 함께 결과값을 지정할 수 있고, completeExceptionally() 를 통해 완료와 함께 예외를 지정할 수 있다.
  • runAsync & thenRun
    CompletableFuture
        .runAsync(() -> log.info("runAsync"))
        .thenRun(() -> log.info("thenRun"))
        .thenRun(() -> log.info("thenRun"));
    log.info("exit");
    ForkJoinPool.commonPool().shutdown();
    ForkJoinPool.commonPool().awaitTermination(10, TimeUnit.SECONDS);
    
    22:10:31.377 [main] INFO com.example.demo.completable.Example - exit
    22:10:31.377 [ForkJoinPool.commonPool-worker-3] INFO com.example.demo.completable.Example - runAsync
    22:10:31.381 [ForkJoinPool.commonPool-worker-3] INFO com.example.demo.completable.Example - thenRun
    22:10:31.381 [ForkJoinPool.commonPool-worker-3] INFO com.example.demo.completable.Example - thenRun
    
    • 의존적인 비동기작업을 간단하게 구현할 수 있다.
    • 두개 이상의 비동기작업을 실행하여 끝나면 결과를 사용하도록도 구현할 수 있다.
    • ForkJoinPool
      • Java 7에서 새로 지원하는 fork-join 풀은 기본적으로 큰 업무를 작은 업무로 나누어 배분해서 , 일을 한 후에 일을 취합하는 형태입니다.
      • Fork 를 통해서 업무를 분담하고 Join 을 통해서 업무를 취합합니다.
      • 쓰레드별 큐를 추가하여 업무를 분담하고 나누어 처리하여 효율을 높일 때 사용

      image

      • 이미지 출처: https://hamait.tistory.com/612 [HAMA 블로그]
  • supplyAsync, thenApply, thenAccept
    CompletableFuture
        .supplyAsync(() -> { 
            log.info("supplyAsync");
            return 1;
        }).thenApply(s -> { 
            log.info("thenApply {}", s); return s + 1; 
        }).thenAccept(s -> log.info("thenAccept {}", s)); 
    log.info("exit");
    ForkJoinPool.commonPool().shutdown();
    ForkJoinPool.commonPool().awaitTermination(10, TimeUnit.SECONDS);
    
    22:15:38.440 [main] INFO com.example.demo.completable.Example - exit
    22:15:38.440 [ForkJoinPool.commonPool-worker-3] INFO com.example.demo.completable.Example - supplyAsync
    22:15:38.445 [ForkJoinPool.commonPool-worker-3] INFO com.example.demo.completable.Example - thenApply 1
    22:15:38.447 [ForkJoinPool.commonPool-worker-3] INFO com.example.demo.completable.Example - thenAccept 2
    
    • thenApply(Function<T, R>) : 앞의 비동기 작업의 결과를 받아 사용해 새로운 값을 return.
    • thenAccept(Consumer) : 앞의 비동기 작업의 결과를 받아 사용하며 return이 없다.
  • thenCompose
    CompletableFuture
        .supplyAsync(() -> {
            log.info("supplyAsync");
            return 1;
        }).thenCompose(s -> { 
            log.info("thenApply {}", s);
            return CompletableFuture.completedFuture(s + 1);
        }).thenApply(s -> {
            log.info("thenApply {}", s);
            return s + 1;
        }).thenAccept(s -> log.info("thenAccept {}", s));
    log.info("exit");
    ForkJoinPool.commonPool().shutdown();
    ForkJoinPool.commonPool().awaitTermination(10, TimeUnit.SECONDS);
    
    22:56:16.613 [main] INFO com.example.demo.completable.Example - exit
    22:56:16.613 [ForkJoinPool.commonPool-worker-3] INFO com.example.demo.completable.Example - supplyAsync
    22:56:16.618 [ForkJoinPool.commonPool-worker-3] INFO com.example.demo.completable.Example - thenApply 1
    22:56:16.621 [ForkJoinPool.commonPool-worker-3] INFO com.example.demo.completable.Example - thenApply 2
    22:56:16.621 [ForkJoinPool.commonPool-worker-3] INFO com.example.demo.completable.Example - thenAccept 3
    
    • thenCompose(Function<T, R>): return이 CompletableFuture인 경우 thenCompose를 사용한다.
  • exceptionally
    CompletableFuture
        .supplyAsync(() -> {
            log.info("supplyAsync");
            return 1;
        }).thenCompose(s -> { 
            log.info("thenApply {}", s);
            if (1 == 1) throw new RuntimeException();
            return CompletableFuture.completedFuture(s + 1);
        }).exceptionally(e -> {
            log.info("exceptionally");
            return -10;
        }).thenApply(s -> {
            log.info("thenApply {}", s);
            return s + 1;
        }).thenAccept(s -> log.info("thenAccept {}", s));
    log.info("exit");
    ForkJoinPool.commonPool().shutdown();
    ForkJoinPool.commonPool().awaitTermination(10, TimeUnit.SECONDS);
    
    23:00:17.344 [main] INFO com.example.demo.completable.Example - exit
    23:00:17.344 [ForkJoinPool.commonPool-worker-3] INFO com.example.demo.completable.Example - supplyAsync
    23:00:17.350 [ForkJoinPool.commonPool-worker-3] INFO com.example.demo.completable.Example - thenApply 1
    23:00:17.352 [ForkJoinPool.commonPool-worker-3] INFO com.example.demo.completable.Example - exceptionally
    23:00:17.352 [ForkJoinPool.commonPool-worker-3] INFO com.example.demo.completable.Example - thenAccept -10
    
    • Exception이 발생하면 바로 exceptionally를 타게 된다.
  • thenApplyAsync
    ExecutorService es = Executors.newFixedThreadPool(10);
    CompletableFuture
        .supplyAsync(() -> {
            log.info("supplyAsync");
            return 1;
        }, es).thenCompose(s -> {
            log.info("thenApply {}", s);
            return CompletableFuture.completedFuture(s + 1);
        }).thenApply(s -> {
            log.info("thenApply {}", s);
            return s + 2;
        }).thenApplyAsync(s -> {
            log.info("thenApply {}", s);
            return s + 3;
        }, es).exceptionally(e -> {
            log.info("exceptionally");
            return -10;
        }).thenAcceptAsync(s -> log.info("thenAccept {}", s), es);
    log.info("exit");
    ForkJoinPool.commonPool().shutdown();
    ForkJoinPool.commonPool().awaitTermination(10, TimeUnit.SECONDS);
    
    23:03:34.965 [main] INFO com.example.demo.completable.Example - exit
    23:03:34.964 [pool-1-thread-1] INFO com.example.demo.completable.Example - supplyAsync
    23:03:34.970 [pool-1-thread-1] INFO com.example.demo.completable.Example - thenApply 1
    23:03:34.972 [pool-1-thread-1] INFO com.example.demo.completable.Example - thenApply 2
    23:03:34.973 [pool-1-thread-2] INFO com.example.demo.completable.Example - thenApply 4
    23:03:34.974 [pool-1-thread-3] INFO com.example.demo.completable.Example - thenAccept 7
    
    • 이 작업은 다른 스레드에서 처리를 하려고 할 때, thenApplyAsync를 사용한다.
    • 스레드의 사용을 더 효율적으로 하고 자원을 더 효율적으로 사용한다.
    • 현재 스레드 풀의 정책에 따라서 새로운 스레드를 할당하거나 대기중인 스레드를 사용한다. (스레드 풀 전략에 따라 다르다.)

Callback Hell 개선 - ListenableFuture to CompletableFuture

@Autowired
MyService myService;
AsyncRestTemplate rt = new AsyncRestTemplate(new Netty4ClientHttpRequestFactory(new NioEventLoopGroup(1)));

@GetMapping("/rest")
public DeferredResult<String> rest(int idx) {
    DeferredResult<String> dr = new DeferredResult<>();
    toCf(rt.getForEntity("http://localhost:8081/service?req={req}", String.class, "hello " + idx))
        .thenCompose(s -> toCf(rt.getForEntity("http://localhost:8081/service2?req={req}", String.class, s.getBody())))
        .thenCompose(s -> toCf(myService.work(s.getBody())))
        .thenAccept(s -> dr.setResult(s))
        .exceptionally(e -> {
            dr.setErrorResult(e.getMessage());
            return null;
        });

    return dr;
}

public <T> CompletableFuture<T> toCf(ListenableFuture<T> lf) {
    CompletableFuture<T> cf = new CompletableFuture<>();
    lf.addCallback(s -> cf.complete(s), e -> cf.completeExceptionally(e));
    return cf;
}
  • ListenableFuture을 wrapper하여 CompletableFuture를 생성하여 사용한다.

출처

  • 토비의 봄 TV 11회 스프링 리액티프 프로그래밍(7) - CompletableFuture
  • https://gunju-ko.github.io/java/2018/07/05/Future.html
  • https://hamait.tistory.com/612 [HAMA 블로그]