Reactor core

  • Reactor core는 Reactor3의 핵심 모듈
  • Java 7 버전 이하에서는 reactor 1, 2 사용이 가능하나 Java 8 버전, reactor 3 사용 권장
  • dependency
    <!-- https://mvnrepository.com/artifact/io.projectreactor/reactor-core -->
    <dependency>
      <groupId>io.projectreactor</groupId>
      <artifactId>reactor-core</artifactId>
      <version>3.3.18.RELEASE</version>
    </dependency>
    
    • 만약 프로젝트에 spring-webflux가 추가되어 있으면 reactor-core를 추가할 필요가 없다.

Flux와 Mono

  • Flux와 Mono는 Publisher 구현체며 전송하는 데이터의 갯수의 차이가 존재한다.
  • Flux: 0~N개의 데이터 전달
    • public abstract class Flux implements CorePublisher
    • public abstract void subscribe(CoreSubscriber<? super T> actual);
      • subscribe 메소드를 추상 메소드로 지정하고 있다.
  • Mono: 0~1개의 데이터 전달
    • public abstract class Mono implements CorePublisher
    • public abstract void subscribe(CoreSubscriber<? super T> actual);

Flux

image

** 이미지 출처: https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Flux.html

  • Flux는 Publisher의 구현체로 0~N개의 데이터를 전달할 수 있다.
    • 데이터를 전달할 때마다 onNext 이벤트가 발생
    • 모든 데이터의 전달 처리가 완료되면 onComplete 이벤트가 발생
    • 데이터 전달 과정에서 오류가 발생하면 onError 이벤트가 발생
  • Flux를 생성하는 방법
    • just
      Flux<String> flux = Flux.jst("Hello", "Reactor", "Flux", "Mono");
      
      • just 내에 들어가는 값들로 Flux를 생성한다.
      • Hot Publisher며 처음 방출된 값을 cache해두고 다음 구독자에게 cache된 값을 방출한다.
    • range
      Flux<Integer> flux = Flux.range(0, 10);
      
      • int 범위를 지정하여 순차적인 데이터를 생성한다.
    • fromArray, fromIterable, fromStream
      • fromArray: 이미 생성되어 있는 Array의 데이터를 사용하여 Flux를 생성한다.
      • fromIterable: 이미 생성되어 있는 Collections 구현체와 같은 Iterable의 데이터를 사용하여 Flux를 생성한다.
      • fromStream: stream을 통하여 Flux를 생성할 수 있다.
    • empty
      • 아무값도 전달하지 않는 빈데이터의 Flux를 만들 수 있다.
    • generate
      Flux<Integer> flux = Flux.generate(sink -> { // SynchronousSink
        sink.onRequest(request -> {
          // request는 subscriber가 요청한 갯수
          int data = rand.nextInt(100) + 1;
          sink.next(data);
          if(emitCount == request) {
            sink.complete();
          }
        });
      });
      
      • 동기 방식으로 한번에 1개의 데이터를 생성할 때 사용한다.
      • subscriber로부터 요청이 왔을 때 신호를 생성하며, 실행할 때 인자로 SynchronousSink로 전달한다.
      • next(), complete, error 신호를 발생하여 1번에 1개의 next() 신호만을 발생할 수 있다.
    • create
      Flux<Integer> flux = Flux.create(sink -> { // FluxSink
        sink.onRequest(request -> {
          // request는 subscriber가 요청한 갯수
          for(int i = 0; i < request; i++) {
            sink.next(i);
          }
          if(emitCount == request) {
            sink.complete();
          }
        });
      });
      
      • Cold Publisher를 생성하며 subscribe 하지 않으면 데이터를 방출하지 않는다.
      • create를 사용하면 Subscriber의 요청과 상관없이 비동기로 데이터를 발생할 수 있다.
      • Consumer<? super FluxSink> emitter를 파라미터로 받는 데, Consumer는 람다이고, FluxSink를 살펴보자.
        • FluxSink: 0/1개의 onError/onComplete 이벤트 다음에 임의의 수의 다음 신호를 방출하기 위한 다운스트림 가입자
        • generate와 달리 한번에 한 개 이상의 next() 신호를 발생할 수 있다.
        • 해당 차이는 배압(backpressure)를 일으키며 이에 대한 전략을 설정해주어야 한다.
        • 배압전략 1. IGNORE : Subscriber의 요청 무시하고 발생(Subscriber의 큐가 다 차면 IllegalStateException 발생)
        • 배압전략 2. ERROR : 익셉션(IllegalStateException) 발생
        • 배압전략 3. DROP : Subscriber가 데이터를 받을 준비가 안 되어 있으면 데이터 발생 누락
        • 배압전략 4. LATEST : 마지막 신호만 Subscriber에 전달
        • 배압전략 5. BUFFER : 버퍼에 저장했다가 Subscriber 요청시 전달. 버퍼 제한이 없으므로 OutOfMemoryError 발생 가능
  • Flux 실행 순서
Flux<Integer> flux = Flux.range(0, 10).filter(i -> i % 3 == 0);
flux.subscribe(item->System.out.println("onNext: " + item),
  e -> System.out.println("onError: " + e.getMessage()),
  () -> System.out.println("onComplete")
);
  • 출력
    onNext: 0
    onNext: 3
    onNext: 6
    onNext: 9
    onComplete
    
  • 실행 순서
    • range를 통해 Flux를 생성한다. filter를 통해 3의 배수만 전송할 수 있도록 한다.
    • subscribe를 하기 전까지는 데이터를 전달하지 않는다.
    • subscribe() 메서드를 실행하면, Subscriber가 등록되고 데이터를 전송해달라는 요청을 보낸다.
      • Publisher의 subscribe를 호출할 때 인자로 받은 Subscriber객체의 onSubscribe 메서드가 실행되며, 이 때 인자로 받은 Subscription을 활용한다(request, cancel)
    • 데이터를 전달하면서 onNext() 메서드가 실행되고 만약 오류가 발생하면 onError() 메서드가 실행, 정상 완료되면 onComplete() 메서드가 실행된다.

Flux 특징과 활용방식

  • Mono : 데이터를 한개만 보낼 때 사용
  • Flux: 여러개의 데이터를 보낼 때 사용한다.
    @Data 
    @AllArgsConstructor
    public class Event {
    	private long id;
      private String value;
    }
    
    @RestController
    @Slf4j
    public class MyController {
    	@GetMapping("/event/{id}")
    	public Mono<Event> helloMono(@PathVariable Long id) {
    		return Mono.just(new Event(id, "event: " + id));
    	}
    	@GetMapping("/events")
    	public Flux<Event> helloFlux() {
    		return Flux.just(new Event(1, "event: 1"), new Event(2, "event: 2"));
    	}
    }
    
  • Mono에 단일 데이터를 Collection으로 넣어서 여러개의 데이터를 전달할 수 있지만, Flux의 fromIterable을 사용하면, Stream 형태에 데이터를 가공할 수 있다.
    • Mono로 Collection을 보내면 한번에 보낸다, Flux는 Iterable하게 데이터를 순차적으로 보낼 수 있다.
      @RestController
      @Slf4j
      public class MyController {
        @GetMapping("/event/{id}")
        public Mono<List<Event>> helloMono(@PathVariable Long id) {
            List<Event> list = Arrays.asList(new Event(1, "event: 1"), new Event(2, "event: 2"));
            return Mono.just(list);
        }
        @GetMapping(value = "/events", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
        public Flux<Event> helloFlux() {
            List<Event> list = Arrays.asList(new Event(1, "event: 1"), new Event(2, "event: 2"));
            return Flux.fromIterable(list).filter(event -> event.getId() % 2 == 0);
        }
      }
      
    • text/event-stream: event stream 타입으로 “data: “ 형식의 prefix가 붙는다. SSE(Server Sent Event) 참고
    • fromIterable

      image

      • 이미지 출처: https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Flux.html
    • fromStream, take
      @GetMapping(value = "/events", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
      public Flux<Event> helloFlux() {
      	return Flux
      		.fromStream(Stream.generate(() -> new Event(System.currentTimeMillis(), "events")))
      		.delayElements(Duration.ofSeconds(1))
      		.take(10);
      }
      
      • take: 지정된 숫자만큼 데이터를 보낸 후 cancel()을 호출한다.
      • delayElements: 지정된 시간만큼 대기 후 데이터 전달
        • 메서드를 생성하는 쓰레드가 10초를 대기하는 것이 아닌 delay하는 쓰레드가 새롭게 생성되어 delay한다.
    • 두개의 Flux 묶기
      @GetMapping(value = "/events", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
      public Flux<Event> helloFlux() {
      	Flux<Event> f = Flux.<Event, Long>generate(() -> 1L, (id, sink) -> {
      	sink.next(new Event(id, "events: " + id));
      		return id + 1;
      	});
      	Flux<Long> interval = Flux.interval(Duration.ofSeconds(1));
      	return Flux.zip(f, interval).map(tu -> tu.getT1());
      }
      
      • interval은 정해진 시간동안 대기 후 데이터를 전달한다.

Mono

image

** 이미지 출처: https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Mono.html

  • Mono는 Publisher의 구현체로 0~1개의 데이터를 전달할 수 있다.
    • 데이터를 전달할 때마다 onNext 이벤트가 발생
    • 모든 데이터의 전달 처리가 완료되면 onComplete 이벤트가 발생
    • 데이터 전달 과정에서 오류가 발생하면 onError 이벤트가 발생
  • Mono를 생성하는 방법
    • just
      Mono<Integer> mono = Mono.just(10).filter(i -> i % 3 != 0);
      mono.subscribe(item -> System.out.println("onNext: " + item),
        throwable -> System.out.println("onError: " + throwable.getMessage()),
        () -> System.out.println("onComplete")
      );
      
      • just 내에 들어가는 값으로 Mono를 생성한다.
    • empty
      • 아무값도 전달하지 않는 빈데이터의 Mono를 만들 수 있다.

Mono의 동작방식과 block()

  • NonBlocking과 Sync
    @GetMapping("/")
    public Mono<String> hello() {
        log.info("pos0");
        // Publisher -> (Publisher) -> (Publisher) -> Subscriber
        Mono m = Mono.just("Hello WebFlux").log(); 
        log.info("pos1");
        return m;
    }
    
    2021-09-08 20:45:10.081  INFO 38476 --- [ctor-http-nio-2] com.example.demo.mono.MyController       : pos0
    2021-09-08 20:45:10.082  INFO 38476 --- [ctor-http-nio-2] com.example.demo.mono.MyController       : pos1
    2021-09-08 20:45:10.094  INFO 38476 --- [ctor-http-nio-2] reactor.Mono.Just.1                      : | onSubscribe([Synchronous Fuseable] Operators.ScalarSubscription)
    2021-09-08 20:45:10.096  INFO 38476 --- [ctor-http-nio-2] reactor.Mono.Just.1                      : | request(unbounded)
    2021-09-08 20:45:10.097  INFO 38476 --- [ctor-http-nio-2] reactor.Mono.Just.1                      : | onNext(Hello WebFlux)
    2021-09-08 20:45:10.100  INFO 38476 --- [ctor-http-nio-2] reactor.Mono.Just.1                      : | onComplete()
    
    • Subscriber는 Spring이 자동으로 걸어주게 되어 있다.
    • pos0 다음에 Mono에 대한 로그, pos1 로그가 찍히는 것이 아닌, pos0, pos1 후에 Mono에 대한 로그가 찍혔다.
      • Spring에서 Subscriber가 subscribe 를 할 때 Mono가 실행되기 때문이다.
    • just에 service를 호출한다면 service에 메소드가 먼저 실행된 후 결과값이 just에 들어간다.
      @GetMapping("/")
      public Mono<String> hello() {
          log.info("pos0");
          // Publisher -> (Publisher) -> (Publisher) -> Subscriber
          Mono m = Mono.just(myService.findById(1L)).log(); 
          log.info("pos1");
          return m;
      }
      
      2021-09-08 20:51:47.814  INFO 18036 --- [ctor-http-nio-2] com.example.demo.mono.MyController       : pos0
      2021-09-08 20:51:47.814  INFO 18036 --- [ctor-http-nio-2] com.example.demo.mono.MyService          : service: 1
      2021-09-08 20:51:47.817  INFO 18036 --- [ctor-http-nio-2] com.example.demo.mono.MyController       : pos1
      2021-09-08 20:51:47.829  INFO 18036 --- [ctor-http-nio-2] reactor.Mono.Just.1                      : | onSubscribe([Synchronous Fuseable] Operators.ScalarSubscription)
      2021-09-08 20:51:47.830  INFO 18036 --- [ctor-http-nio-2] reactor.Mono.Just.1                      : | request(unbounded)
      2021-09-08 20:51:47.831  INFO 18036 --- [ctor-http-nio-2] reactor.Mono.Just.1                      : | onNext(1)
      2021-09-08 20:51:47.834  INFO 18036 --- [ctor-http-nio-2] reactor.Mono.Just.1                      : | onComplete()
      
      • just 내부에 있는 myService에 메소드도 미리 실행된다.
  • NonBlocking과 Async
    @GetMapping("/")
    public Mono<String> hello() {
    	log.info("pos0");
    	// Publisher -> (Publisher) -> (Publisher) -> Subscriber
    	Mono<String> m = Mono.fromSupplier(() -> myService.findById(1L)).log(); 
    	log.info("pos1");
    	return m;
    }
    
    2021-09-08 20:55:18.650  INFO 20448 --- [ctor-http-nio-3] com.example.demo.mono.MyController       : pos0
    2021-09-08 20:55:18.653  INFO 20448 --- [ctor-http-nio-3] com.example.demo.mono.MyController       : pos1
    2021-09-08 20:55:18.664  INFO 20448 --- [ctor-http-nio-3] reactor.Mono.Supplier.1                  : | onSubscribe([Fuseable] Operators.MonoSubscriber)
    2021-09-08 20:55:18.667  INFO 20448 --- [ctor-http-nio-3] reactor.Mono.Supplier.1                  : | request(unbounded)
    2021-09-08 20:55:18.668  INFO 20448 --- [ctor-http-nio-3] com.example.demo.mono.MyService          : service: 1
    2021-09-08 20:55:18.668  INFO 20448 --- [ctor-http-nio-3] reactor.Mono.Supplier.1                  : | onNext(1)
    2021-09-08 20:55:18.671  INFO 20448 --- [ctor-http-nio-3] reactor.Mono.Supplier.1                  : | onComplete()
    
    • fromSupplier를 통해 callback 형식으로 작성해 주었다.
    • 동작 순서를 보면 Subscribe된 상태에서 service가 실행되는 것을 확인할 수 있다.
  • return하기 전에 subscribe() 호출
    @GetMapping("/")
    public Mono<String> hello() {
    	log.info("pos0");
      // Publisher -> (Publisher) -> (Publisher) -> Subscriber
      Mono<String> m = Mono.fromSupplier(() -> myService.findById(1L))
          		.doOnNext(c -> log.info(c))
          		.log(); 
    	    
      m.subscribe(); // 먼저 subscribe
      log.info("pos1");
      return m;
    }
    
    2021-09-08 21:01:59.063  INFO 31860 --- [ctor-http-nio-3] com.example.demo.mono.MyController       : pos0
    2021-09-08 21:01:59.067  INFO 31860 --- [ctor-http-nio-3] reactor.Mono.PeekFuseable.1              : | onSubscribe([Fuseable] FluxPeekFuseable.PeekFuseableSubscriber)
    2021-09-08 21:01:59.070  INFO 31860 --- [ctor-http-nio-3] reactor.Mono.PeekFuseable.1              : | request(unbounded)
    2021-09-08 21:01:59.070  INFO 31860 --- [ctor-http-nio-3] com.example.demo.mono.MyService          : service: 1
    2021-09-08 21:01:59.070  INFO 31860 --- [ctor-http-nio-3] com.example.demo.mono.MyController       : 1
    2021-09-08 21:01:59.070  INFO 31860 --- [ctor-http-nio-3] reactor.Mono.PeekFuseable.1              : | onNext(1)
    2021-09-08 21:01:59.071  INFO 31860 --- [ctor-http-nio-3] reactor.Mono.PeekFuseable.1              : | onComplete()
    2021-09-08 21:01:59.071  INFO 31860 --- [ctor-http-nio-3] com.example.demo.mono.MyController       : pos1
    2021-09-08 21:01:59.081  INFO 31860 --- [ctor-http-nio-3] reactor.Mono.PeekFuseable.1              : | onSubscribe([Fuseable] FluxPeekFuseable.PeekFuseableSubscriber)
    2021-09-08 21:01:59.081  INFO 31860 --- [ctor-http-nio-3] reactor.Mono.PeekFuseable.1              : | request(unbounded)
    2021-09-08 21:01:59.081  INFO 31860 --- [ctor-http-nio-3] com.example.demo.mono.MyService          : service: 1
    2021-09-08 21:01:59.081  INFO 31860 --- [ctor-http-nio-3] com.example.demo.mono.MyController       : 1
    2021-09-08 21:01:59.081  INFO 31860 --- [ctor-http-nio-3] reactor.Mono.PeekFuseable.1              : | onNext(1)
    2021-09-08 21:01:59.086  INFO 31860 --- [ctor-http-nio-3] reactor.Mono.PeekFuseable.1              : | onComplete()
    
    • pos0로그 후 subscribe()를 하기 Mono에 대한 로그가 나오고 pos1이 찍힌다. 그 다음 return으로 다시 Mono에 대한 subscribe가 진행된다.
    • Publisher는 1개이지만 여러개의 subscriber를 둘 수 있다.
  • Hot 과 Cold
    • Cold 방식은 subscribe할 때마다 매번 독립적인 데이터를 발행한다.
    • Hot 방식은 subscribe 할때 마다, 새로운 데이터를 발행이나 동작하지 않는 방식이다.
      • subscribe를 매번 할때마다 미리 생성해 둔 데이터로 동작을 한다. 그리고 subscribe 와 무관하게 즉시 데이터 발행과 동작도 가능한 방식이다.
  • block()
    • publisher가 제공하는 결과값을 꺼내서 Mono나 Flux 같은 컨테이너를 제거하고 값을 넘겨주는 것이 목적.
    • block으로 값을 빼왔으면, return에서 다시 Mono를 호출해서 처음부터 끝까지 재 생성 작업을 거치지 말고, 결과값을 Mono.just() 로 감싸서 전달하는 것이 훨씬 효율적
    • Mono 작업들이 DB 조회, Api 요청 등 고 비용 작업인 경우를 생각해보면 명확함

출처

  • https://brunch.co.kr/@springboot/154
  • https://projectreactor.io/docs/core/release/api/overview-summary.html
  • https://javacan.tistory.com/tag/Flux.fromStream
  • https://projectreactor.io/docs/core/release/api/reactor/core/publisher/FluxSink.html
  • 토비의 봄 TV 13회 스프링 리액티프 프로그래밍(9) Mono의 동작방식과 block()
  • 토비의 봄 TV 14회 스프링 리액티프 프로그래밍(10) Flux의 특징과 활용방식