Project Reactor
개요
- Reactor 는 Spring Framework 주도하에 개발된 리액티브 스트림즈 구현체
- spring-boot-starter-webflux 내부에 Reactor Core 라이브러리가 포함되어 있다.
- 특징
- Reactive Streams
- Non Blocking
- Java’s Functional API
- Flux/Mono
- Well suited for MSA - Reactor는 마이크로서비스에서 수맣은 서비스들 간에 지속적으로 발생하는 I/O 처리에 적합
- Backpressure-ready Network
- Hello Reactor
Flux<String> sequence = Flux.just("Hello", "Reactor"); sequence.map(data -> data.toLowCase()) .subscribe(System.out::println);
- Flux 는 Reactor에서 Publisher 역할, subscribe에 들어간 인자가 Subscriber 역할을 한다.
- just, map은 operator 메소드로 just는 데이터를 생성하여 제공, map은 전달받은 데이터를 가공한다.
- Reactor는 데이터를 생성하여 제공(1단계), 가공(2단계), 전달받은 데이터 처리(3단계)로 구성
- 마블다이어그램
- 비동기적인 데이터 흐름을 시간의 흐름에 따라 시각적으로 표시한 다이어그램
- Publisher의 타임라인과 Subscriber 타임라인 중간에 operator에 대해 표시하며 marvel(구슬)은 데이터를 의미
Cold Sequence, Hot Sequence
- Cold란 새로 시작한다는 의미, Hot은 새로 시작하지 않는다.
- Cold Sequence
Flux<String> coldFlux = Flux.fromIterable(Arrays.asList("KOREA", "JAPAN", "CHINESE")) .map(String::toLowCase); coldFlux.subscribe(System.out::println); // KOREA, JAPAN, CHINESE 출려 System.out.println("--------------"); Thread.sleep(2000L); coldFlux.subscribe(System.out::println); // KOREA, JAPAN, CHINESE 출려
- Subscriber가 구독할 떄마다 데이터 흐름이 처음부터 다시 시작
- 구독이 발생할 때마다 emit된 데이터를 처음부터 다시 전달
- 결과적으로 Publisher Sequence 타임라인이 구독할때마다 하나씩 더 생긴다.
- Subscriber가 구독할 떄마다 데이터 흐름이 처음부터 다시 시작
- Hot Sequence
String[] singers = {"A", "B", "C", "D", "E"}; Flux<String> concertFlux = Flux.fromArray(singers) .delayElements(Duration.ofSeconds(1)) // 각 데이터의 emit을 일정 시간 지연시킨다. .share(); // cold sequence를 hot sequence로 변환 concertFlux.subscribe(System.out.println); // A ~ E 출력 Thread.sleep(2500); concertFlux.subscribe(System.out.println); // C ~ E 출력 Thread.sleep(3000);
- 구독이 발생한 시점 이전에 emit된 데이터는 Subscriber가 전달받지 못하고 구독이 발생한 이후 emit된 데이터만 전달 받는다.
- 구독이 여러번 발생해도 타임라인은 하나밖에 생성되지 않는다.
- HTTP 요청/응답에서 Cold, Hot Sequence 동작 흐름
public static main(String[] args) throws InterruptedException { // Cold Sequence Mono<String> coldMono = getWordTime(uri); coldMono.subscribe(dateTime -> log.info("# dateTime1: {}". dateTime); Thread.sleep(2000); coldMono.subscribe(dateTime -> log.info("# dateTime2: {}". dateTime); Thread.sleep(2000); // Hot Sequence Mono<String> hotMono = getWordTime(uri).cache(); hotMono.subscribe(dateTime -> log.info("# dateTime1: {}". dateTime); Thread.sleep(2000); hotMono.subscribe(dateTime -> log.info("# dateTime2: {}". dateTime); Thread.sleep(2000); } private static Mono<String> getWordTime(URI wordTimeUri) { return webClient.create() .get().uri(worldTimeUri).retrieve() .bodyToMono(String.class) .ma(response -> { DocumentContext jsonContext = JsonPath.parse(response); return jsonContext.read("$.datetime"); }) }
- Cold Sequence 동작 방식
- 구독이 발생할 때마다 데이터의 emit 과정이 처음부터 새로 시작되는 Cold Sequence 특징으로 인해 두번의 구독이 발생했기 때문에 두번의 HTTP 요청이 이뤄지고 2초 정도 차이나는 응답을 확인할 수 있다.
- Hot Sequence 동작 방식
- cache operator
- hot source(sequence)로 병환하며 마지막 emit된 데이터에 대해 캐시한뒤, 구독이 발생할 때마다 캐시된 데이터를 전달
- Subscriber의 최초 구독이 발생해야 Publisher가 데이터를 emit 하는 warm-up 과 subscriber의 구독 여부와 상관없이 데이터를 emit 하는 Hot으로 구분할 수 있다.
- cache operator
- Cold Sequence 동작 방식
Backpressure
- Backpressure란
- 배압이란 뜻으로 Publisher가 끊임없이 emit 하는 무수한 데이터를 적절하게 제어하여 데이터 처리에 과부하가 발생하지 않도록 제어 하는 것
- Reactor에서 Backpressure 처리 방식
- 데이터 개수 제어: subscriber가 처리할 수 있는 수준의 적절한 데이터 개수 요청
Flux.range(1, 5) .doOnRequest(data -> log.info("# doOnRequest: {}", data)) .subscribe(new BaseSubscriver<Integer>() { @Override protected void hookOnSubscribe(Subscription subscription) { request(1); } @SneakyThrows @Override protected void hookOnNext(Integer value) { Thread.sleep(2000L); log.info("# hookOnNext: {}", value); request(1); } });
- hookOnSubscribe: onSubscribe() 대신하여 구독 시점에 request() 호출하여 최초 데이터 요청 개수 제어
- hookOnNext: onNext 메서드를 대신하여 emit 한 데이터를 처리한 후 Publisher에게 다시 데이터를 요청하는 역할로 request() 로 데이터 요청 개수 제어
- Backpressure 전략 사용
- IGNORE 전략: Backpressure 적용하지 않는다. (IllegalStateException 발생)
- ERROR 전략: Downstream으로 전달할 데이터가 버퍼에 가득 찰 경우 예외 발생 (IllegalStateException 발생)
- DROP 전략: Downstream으로 전달할 데이터가 버퍼에 가득 찰 경우 버퍼 밖에서 대기하는 먼저 emit된 데이터부터 drop
- LATEST 전략: Downstream으로 전달할 데이터가 버퍼에 가득 찰 경우 버퍼 밖에서 대기하는 가장 최근에 emit된 데이터부터 버퍼에 채우는 전략
- BUFFER 전략: Downstream으로 전달할 데이터가 버퍼에 가득찰 경우 버퍼 안에 있는 데이터부터 drop 시키는 전략
- onBackpressureBuffer(int buffer_size, Consume
con, BufferOverflowStrategy strategy) - DROP_LATEST: 가장 최근에 버퍼안에 채워진 데이터를 drop하여 확보된 공간에 emit된 데이터를 채우는 전략
- DROP_OLDEST: 버퍼 안에 채워진 데이터 중 가장 오래된 데이터를 drop 한 후, 확보된 공간에 emit된 데이터를 새우는 정략
- onBackpressureBuffer(int buffer_size, Consume
- 데이터 개수 제어: subscriber가 처리할 수 있는 수준의 적절한 데이터 개수 요청
Sinks
- Sinks 란?
- Reactor에선 Processor 인터페이스를 구현한 구현 클래스인 FluxProcessor, MonoProcessor, EmitterProcessor 등을 지원
- Sinks가 Reactor 3.4.0에 등장하였으며 Processor는 Reactor 3.5.0부터 완전히 제거될 예정
- Sinks는 리액티브 스트림즈의 Signal을 프로그래밍 방식으로 푸쉬할 수 있는 구조이며 Flux, Mono의 의미 체계를 갖는다.
- Flux, Mono는 onNext같은 Signal을 내부적으로 전송해주는 방식이었는데, Sinks를 사용하면 프로그래밍 코드를 통해 명시적으로 Signal을 전송할 수 있다.
- generate, create operator는 싱글스레드 기반에서 Signal을 전송하는 반면 Sinks는 멀티 스레드 방식으로 Signal을 전송해도 스레드 안정성을 보장하기 때문에 예기치 않은 동작으로 이어지는 것을 방지
```java
Sinks.Many
unicastSink = Sinks.many().unicast().onBacpressureBuffer(); Flux flux = unicastSink.asFlux(); IntStream.range(1, 10) .forEach(n -> { try { new Thread(() -> { unicastSink.emitNext(doTask(n), Sinks.EmitFailureHandler.FAIL_FAST); log.info("# emitted: {}", n); }).start(); } catch(InterruptedException e) { log.error(e.getMessage(); } }); flux.publishOn(Schedulers.parallel()) .map(result -> result + " success") .doOnNext(n -> log.info("# map: {}", n) .publishOn(Schedulers.parallel()) .subscribe(data -> log.info("# onNext: {}", data);
Thread.sleep(200L); ```
- doTask() 메서드가 루프를 돌때마다 새로운 스레드에서 실행
- Sink 종류 및 특징
- Sinks.One, Sinks.Many를 사용하여 전송할 수 있다.
- Sinks.One은 한건의 데이터를 프로그래밍 방식으로 emit하는 역할을 하기도 하고, Mono 방식으로 Subscriber가 데이터를 소비할 수 있도록 해주는 Sinks 클래스 내부에서 인터페이스로 정의된 Sinks의 스펙 또는 사양으로 볼 수 있다.
- EmitFailureHandler객체를 통해서 emit 도중 발생한 에러를 실패 처리하며 에러가 발생했을 때 재시도를 하지 않고 즉시 실패 처리 한다.
public interface EmitFailuerHandler { EmitFailureHandler FAIL_FAST = (signalType, emission) -> false; boolean onEmitFailure(SignalType signalType, EmitResult emitResult); }
- EmitFailureHandler객체를 통해서 emit 도중 발생한 에러를 실패 처리하며 에러가 발생했을 때 재시도를 하지 않고 즉시 실패 처리 한다.
- Sinks.Many
- Sinks.Many는 ManySpec을 리턴하며 UnicatSpec, MulticastSpec, MulticastRelpaySpec 를 리턴하는 형태의 추상메소드를 갖는 인터페이스이다.
- UnicastSpec
- onBackpressureBuffer 메서드를 호출하여 사용할 수 있다.
- unicast는 하나의 특정 시스템만 정보를 전달받는 방식으로 단 하나의 Subscriber에게만 데이터를 emit
- 두번째 subscribe를 호출하면 IllegalStateException 발생
- MulticastSpec
- onBackpressureBuffer 메서드를 호출하여 사용할 수 있다.
- 하나 이상의 Subscriber에게 데이터를 emit
- MulticaspReplaySpec
- emit된 데이터 중에서 특정 시점으로 되돌린 데이터부터 emit
Scheduler
- 스레드의 개념 이해
- Scheduler 는 Reactor Sequence에서 사용되는 스레드 관리자 역할
- 물리적인 스레드는 병렬성과 관련이 있으며 논리적인 스레드는 동시성과 관련이 있다.
- Scheduler란?
- 운영체제 레벨에서 Scheduler는 실행되는 프로세스를 선택하고 실행하는 등 프로세스의 라이프사이클을 관리
- Reactor에서도 Scheduler는 비동기 프로그래밍을 위해 사용되는 스레드를 관리해주는 역할
- Ract Condition 등 생길 수 있는 문제를 최소화하여 간결한 코드, 스레드 제어에 대한 부담을 적게 해준다.
- Scheduler를 위한 전용 Operator
- subscribeOn
Flux.fromArray(new Integer[] {1, 3, 5, 7}) .subscribeOn(Schedulers.boundedElastic()) .doOnNext(data -> log.info("# doOnNext: {}", data) .doOnSubscribe(subscription -> log.info("# doOnSubscription")) .subscribe(data -> log.info("#next: {}", data);
- 구독이 발생한 직후에 원본 Publisher의 동작을 처리하기 위해 스레드 할당
- doOnNext, doOnSubscribe는 데이터 emit, 구독 발생 시점에 처리 동작이며 스레드 확인 가능
- doOnSubscribe는 main 스레드 동작, 그 외에는 Scheduler에 의해 새로운 스레드로 동작한다.
- publishOn
Flux.fromArray(new Integer[] {1, 3, 5, 7}) .doOnNext(data -> log.info("# doOnNext: {}", data) .doOnSubscribe(subscription -> log.info("# doOnSubscription")) .publishOn(Schedulers.parallel()) .subscribe(data -> log.info("#next: {}", data);
- Signal을 전송할 때 실행되는 스레드를 제어하는 역할로 Downstream의 실행 스레드 변경
- doOnSubscribe, doOnNext는 메인 스레드에서 동작하며 onNext는 새로운 스레드에서 실행된다.
- parallel()
Flux.fromArray(new Integer[] {1, 3, 5, 7}) .parallel(4) .runOn(Schedulers.parallel()) .subscribe(data -> log.info("#next: {}", data);
- subscribeOn, publishOn은 동시성을 가지는 논리적인 스레드이지만 parallel 은 병렬성을 가지는 물리적인 스레드
- parallel 의 인자로 실행될 스레드의 개수를 지정할 수 있다. (CPU 코어 개수만큼 병렬 처리)
- 실제로 병렬 작업을 수행할 스레드 할당은 runOn() 에서 담당
- subscribeOn, publishOn은 동시성을 가지는 논리적인 스레드이지만 parallel 은 병렬성을 가지는 물리적인 스레드
- subscribeOn
- publishOn, subscribeOn의 동작 이해
Flux.fromArray(new Integer[] {1, 3, 5, 7}) .publishOn(Schedulers.parallel()) // 하위에 새로운 스레드 생성 .doOnNext(data -> log.info("# doOnNext: {}", data) .filter(data -> data > 3) // A thread .doOnNext(data -> log.info("# doOnNext: {}", data) .publishOn(Schedulers.parallel()) // 하위에 새로운 스레드 생성 .map(data -> data * 10); // B thread .subscribe(data -> log.info("#next: {}", data); // B thread
- 여러 operator 가 있을 때 publishOn을 호출하면 하위 downstream에 대해 새로운 스레드로 처리되며 Operator 체인상에 한개 이상 사용 가능하며 실행 목적에 맞게 적절하게 분리할 수 있다.
- subscribe와 publishOn을 함께 사용하여 원본 Publisher에서 데이터를 emit하는 스레드와 emit된 데이터를 가공 처리하는 스레드를 적절하게 분리할 수 있다.
- subscribeOn 체이상에 위치와 상관없이 구독 시점 직후, 즉 Publisher가 데이터를 emit하기 전에 실행 스레드를 변경
- Scheduler 종류
- Schedulers.immediate()
- 별도 스레드 생성없이 현재 스레드에서 작업을 처리하고자 할 때 사용
- Scedulers.single()
- 스레드 하나만 생성하여 Scheduler가 제거되기 전까지 재사용
- 여러번 호출해도 하나만 생성된다(single-1)
- Schedulers.newSingle()
- 호출할 때마다 매번 새로운 스레드 하나 생성
- 파라미터로는 스레드 명과 데몬 스레드 동작 여부 설정
- 데몬 스레드는 보조 스레드라고도 불리며 주 스레드가 종료되면 자동으로 종료된다.
- Schedulers.boundedElastic()
- ExecutorService 기반의 스레드 풀을 생성한 후, 그 안에서 정해진 수만큼의 스레드를 사용하여 작업을 처리하고 작업이 종료된 스레드는 바납하여 재사용하는 방식으로 Blocking-IO에 최적화 되어 있다.
- Schedulers.parallel()
- Non-Blocking IO 에 최적화되어 있는 Scheduler로서 CPU 코어 수만큼 스레드 생성
- Schedulers.fromExcutorService()
- 기존에 사용하고 있는 ExecutorService가 있다면 그로부터 Scheduler를 생성하는 방식, Reactor에서는 이 방식이 권장되지 않는다.
- Schedulers.newXXX()
- single(), boundedElastic(), parallel()은 Reactor에서 제공하는 Scheduler 인스턴스를 사용하지만 newXXX를 사용하면 새로운 Scheduler 인스턴스를 생성할 수 있다.
- 스레드 이름, 디폴트 스레드 개수, 유휴시간, 데몬스레드 여부등을 직접 지정하여 커스텀 스레드 풀로 새로 생성할 수 있다.
- Schedulers.immediate()
Context
- Context란
the situation, events or information that are related to something and that help you to understand it
- Context는 어떤 상황에서 그 상황을 처리하기 위해 필요한 정보
Reactor에서 Context 정의는 아래와 같다. A key/value stroe that is propagated between components such as operators via the context protocol
- propagate: downstream, upstream으로 Context가 전파되어 Operator 체인상의 각 Operator가 해당 Context 정보를 동일하게 이용할 수 있다.
- ThreadLocal과 다소 유사한 면이 있지만 실행 스레드와 매핑하는 것이 아닌 Scheduler와 매핑한다.
- 구독이 발생할 때마다 해당 구독과 연결된 하나의 Context가 생기는 것
Mono.deferContextual(ctx -> Mono.just("Hello " + ctx.get("firstName")) .subscribeOn(Schedulers.boundedElastic()) .publishOn(Schedulers.parallel()) .transformDefferedContextual((mono, ctx) -> mono.map(data -> data + " " + ctx.get("lastName:))) .contextWrite(context -> context.put("lastName", "Jobs")) .contextWrite(context -> context.put("firstName", "Steve")) .subscribe(data -> log.info("# onNext: {}", data));
- 구독이 발생할 때마다 해당 구독과 연결된 하나의 Context가 생기는 것
- Context에 데이터 쓰기
- contextWrite을 통해 Context에 데이터를 쓰고 있다.
- contextWrite 내부 구현을 보면 Function 으로 람다 타입의 Context 파라미터, 리턴으로 사용되고 있다.
- Context에 쓰인 데이터 읽기
- Context에서 데이터를 읽는 방식
- 원본 데이터 소스 레벨에서 읽는 방식
- Operator 체인의 중간에서 읽는 방식
- contextWriter 로 쓰기 작업 가능
- deferContextual operator 원본 데이터 소스 레벨에서 Context 데이터를 읽을 수 있다.
- transformDeferredContextual 을 사용하면 체인 중간에서 데이터를 읽을 수 있다.
- 저장된 데이터를 읽을 때는 ContextView를 사용하며 Reactor에서 Operator 체인상의 서로 다른 스레드들이 Context의 저장된 데이터에 손쉽게 접근할 수 있으며 context.put 은 매번 불변객체로 스레드 안정성을 보장
- Context에서 데이터를 읽는 방식
- Context는 어떤 상황에서 그 상황을 처리하기 위해 필요한 정보
- 자주 사용되는 Context API
- 쓰기
- put(key,value)
- key/value 형태로 context 값을 쓴다.
- of(key1, value1, key2, value2 …)
- key/value 형태로 여러개의 값을 쓴다.
- putAll(ContextView)
- 현재 Context와 입력받은 ContextVlue merge
- delete(key)
- Context에서 key에 해당하는 value를 지운다.
- put(key,value)
- 읽기
- get(key)
- getOrEmpty(key)
- getOrDefault(key, default value)
- hasKey(key)
- isEmpty()
- size()
- 쓰기
- Context 특징
- Context는 구독이 발생할 때마다 하나의 Context가 해당 구독에 연결
- Context는 Operator chain의 아래에서 위로 전파
- 동일한 키에 대한 값을 중복해 저장하면 Operator 체인에서 가장 위쪽에 위치한 contextWrite()로 저장한 값으로 덮어쓴다.
- Inner Sequence 내부에서는 외부 Context에 저장한 데이터를 읽을 수 있지만 Inner Sequence 외부에서는 Inner Sequence 내부 Context에 저장된 데이터를 읽을 수 없다.
Debugging
- Dubug Mode를 사용한 디버깅
Hooks.onOperationDebug();
- 실행 결과에 출력된 에러 메세지 이외에 stacktrace 내부에는 의미있는 내용을 확인하기 어렵기 때문에 디버그모드를 실행하면 Operator 체인상에 에러가 발생한 지점을 정확히 가리키고 있다.
- 디버그 모드를 활성화하는 경우 애플리케이션 내부에 비용이 많이 드는 동작 과정을 거친다.
- 애플리케이션 내에 있는 모든 Operator의 Stacktrace를 캡처한다.
- 에러가 발생하면 캡처한 정보를 기반으로 에러가 발생한 Assembly의 Stacktrace를 원본 Stacktrace 중간에 끼어 넣는다.
- Assembly란 Operator 연산으로 반환되는 Flux/Mono를 말한다.
- 에러가 발생한 operator의 stacktrace를 캡처한 assembly 정보를 traceback 이라 한다.
- checkpoint() operator를 사용한 디버깅
- checkpoint operator를 사용하면 operator 채인 내의 스택트레이스만 캡처
- Traceback 출력
Flux.just(2, 4, 6, 8) .zipWith(Flux.just(1, 2, 3, 0), (x, y) -> x/y) .map(num -> num + 2) .checkpoint() .subscribe( data -> log.info("# onNext: {}", data), error -> log.error("# onError: ", error); );
- map 다음에 추가한 checkpoint 지점까지 에러 전파 예상 가능
- traceback 출력 없이 식별자를 포함한 Description 을 출력하여 에러 발생 지점 예상하는 방법
Flux.just(2, 4, 6, 8) .zipWith(Flux.just(1, 2, 3, 0), (x, y) -> x/y) .checkpoint("zipwith checkpoint") .map(num -> num + 2) .checkpoint("map checkpoint") .subscribe( data -> log.info("# onNext: {}", data), error -> log.error("# onError: ", error); );
- 파라미터로 입력한 description 출력되는 것 확인 가능
- Traceback과 Description 모두 출력하는 방법
Flux.just(2, 4, 6, 8) .zipWith(Flux.just(1, 2, 3, 0), (x, y) -> x/y) .checkpoint("zipwith checkpoint", true) .map(num -> num + 2) .checkpoint("map checkpoint", true) .subscribe( data -> log.info("# onNext: {}", data), error -> log.error("# onError: ", error); );
- checkpoint와 description ahen cnffur
- operator 체인이 조금 복잡해지면 에러 발생 지점을 찾는 것이 쉽지 않다.
- log() operator를 사용한 디버깅
Flux.fromArray(new String[]{ "BANANAS", "APPLES", "PEARS", "MELONS")) .map(String::toLowCase) .map(fruit -> fruit.substring(0, fruit.length() - 1)) .log() .map(fruitMap::get) .subscribe( data -> log.info("# onNext: {}", data), error -> log.error("# onError: ", error); );
- log는 reactor sequence의 동작을 출력하는 데, 이 로그를 통해 디버깅 가능
- log operator를 추가하면 onSubscribe, request, onNext 같은 singal 출력
- log operator에 파라미터 추가 가능
- log(“Fruit.Substring”, Level.FINE)
- Fruit.Substring -> 카테고리 표시, Level.FINE -> 로그 레벨 지정
Testing
- StepVerifier를 사용한 테스팅
StepVerifier.create(Mono.just("Hello Reactor")) // 테스트할 대상 sequence 생성 .expectNext("Hello Reactor") // expectXX를 통해 기댓값 평가 .as("# expect next") // 이전 기댓값 평가 단계에 대한 설명 추가 가능 .expectComplete() .verify(); // 전체 operator chain의 테스트를 트리거
- 가장 일반적인 테스트 방식은 Flux, Mono를 Reactor sequence로 정의한 후, 구독 시점에 해당 operator chain이 시나리오 대로 동작하는 지 테스트
- 다음에 발생할 signal에 대해 데이터 emit, 특정 시간동안 emit된 데이터가 있는 지 등을 단계적으로 테스트할 수 있다. |메소드|설명| |—|—| |expectSubscription|구독이 이뤄짐을 기대| |expectNext|onNext 시그널을 통해 전달된 값이 파라미터로 전달된 값과 같음을 기대| |expectCopmlete|onComplete 시그널 전송되기를 기대| |expectError|onError 시그널 기대| |expectNextCount|구독 시점 또는 이전 expectNext 를 통해 기댓값이 평가된 데이터 이후부터 emit된 수를 기대| |expectNoEvent|주어진 시간동안 signal 이벤트가 발생하지 않았음을 기대| |expectAccessibleContext|구독 시점 이후 context가 전파되었음을 기대| |expectNextSequence|emit된 데이터들이 파라미터로 전달되 iterable 요소와 매치됨을 기대|
메소드 설명 verify 검증을 트리거 verifyComplete 검증 트리거 및 onComplete signal 기대 verifyError 검증 트리거 및 onError signal 기대 verifyTimeout 검증 트리거 및 주어진 시간이 초과되어도 publisher 가 종료되지 않음을 기대 - withVirtualTime(() -> method()).then(() -> VirtualTimeScheduler.get().advanceTimeBy(Duration.ofHours(1)))
- VirtualTimeScheduler 라는 가상 스케줄러의 제어를 받아 특정 시간에 대한 테스트가 가능하다.
- Backpressure 테스트
- verifyThenAssertThat().hasDroppedElements()
- 검증을 트리거한 후, 추가적인 Assertion이 가능한데, hasDroppedElements()를 사용하면 drop된 데이터가 있음을 판단
- Context 테스트
- expectAccessibleContext.hasKey(key1).hasKey(key2).then()
- expectAccessibleContext 를 통해 Context 전파를 확인하고 hasKey를 통해 key가 있는 지 확인할 수 있다.
- then 메소드를 통해 다음 signal의 기대값 평가를 할 수 있다.
- record 기반 테스트
- recordWith를 사용하여 단순 기댓값 평가를 넘어 좀 더 구체적인 조건으로 테스트 가능
- TestPublisher를 사용한 테스팅
- 정상 동작(Well-behaved)하는 TestPublisher
TestPublisher<Integer> source = TestPublisher.create(); StepVerifier.create(method(source.flux())) // 테스트 대상 클래스에 파라미터로 전달하기 위해 변환 .expectSubscription() .then(() -> source.emit(2, 4, 6, 8, 10)) // 필요한 데이터를 emit .expectNext(1, 2, 3, 4) .expectError() .verify();
- TestPublisher를 사용하여 복잡한 로직이 포함된 대상 메서드를 테스트하거나 조건에 따라 signal을 변경해야 되는 등의 특정 상황을 테스트하기 용이하게 한다.
- 오동작(Misbehaving)하는 TestPublisher
// 데이터가 null이어도 동작하는 TestPublisher 생성 TestPublisher<Integer> source = TestPublisher.createNoncompliant(TestPublisher.Violation.ALLOW_NULL); StepVerifier.create(method(source.flux())) // 테스트 대상 클래스에 파라미터로 전달하기 위해 변환 .expectSubscription() .then(() -> { getDataSource().stream().forEach(data -> source.next(data))); source.complete(); }).expectNext(1, 2, 3, 4) .expectError() .verify();
- 정상 동작(Well-behaved)하는 TestPublisher
- PublisherProbe를 사용한 테스팅
PublisherProbe<String> probe = PublisherProbe.of(method()); StepVerifier.create(method(probe.mono)) .expectNextCount(1) .verifyComplete(); probe.assertWasSubscribed(); // 해당 파라미터가 구독이 되었고, 요청을 했는지, 중간에 취소되지 않았는 지 확인할 수 있다. probe.assertWasRequested(); probe.assertWasNotCanceled();
- PublisherProbe를 통해 Sequence의 실행 경로를 테스트할 수 있다.
Operators
- Sequence를 생성하기 위한 operator
- justOrEmpty
- just의 확장 operator로 emit할 데이터가 null일 경우 NullPointerException이 발생하지 않고 onComplete signal 전송
- fromIterable
- iterable에 포함된 데이터를 emit하는 Flux 생성
- fromStream
- stream에 포함된 데이터를 emit하는 flux를 생성
- java stream 특성 상 Stream 은 재사용할 수 없으며 cancel, error, complete 시 자동으로 닫히게 된다.
- range
- n부터 1씩 증가한 연속된 수를 m개 emit 하는 flux를 생성
- for문처럼 특정 횟수만큼 어떤 작업을 처리하고자 할 경우에 주로 사용
- defer
System.out.println(LocalDateTime.now()); // 2025.06.01 20:54:00 Mono<LocalDateTime> justMono = Muno.just(LocalDateTime.now()); Mono<LocalDateTime> deferMono = Muno.defer(() -> Mono.just(LocalDateTime.now())); Thread.sleep(2000L); justMono.subscribe(System.out::println); // 2025.06.01 20:54:00 deferMono.subscribe(System.out::println); // 2025.06.01 20:56:00 Thread.sleep(2000L); justMono.subscribe(System.out::println); // 2025.06.01 20:54:00 deferMono.subscribe(System.out::println); // 2025.06.01 20:58:00
- 구독하는 시점에 데이터를 emit 하는 flux, mono 생성
- emit을 지연시키기 때문에 꼭 필요한 시점에 데이터를 emit하여 불필요한 프로세스를 줄일 수 있다.
- just는 hot publisher 이고, defer는 cold publisher 이다.
- using
Path path = Paths.get("./example.txt"); Flux.using(() -> Files.lines(path), Flux::fromStream, Stream::close) .subscribe(log::info)
- 파라미터로 전달받은 resource를 emit하는 flux 생성
- 첫번째 파라미터는 읽어 올 resource이고, 두번째 파라미터는 읽어 온 resource를 emit하는 flux, 세번째는 종료 signal이 발생할 경우 resources를 해제하는 등의 후처리를 할 수 있게 한다.
- generate
Flux.generate(() -> 0, (state, sink) -> { sink.next(state); if(state == 10) sink.complete(); return ++state; }).subscribe(log::info)
- 프로그래밍 방식으로 signal 이벤트를 발생시키며 특히 동기적으로 데이터를 하나씩 순차적으로 emit할 때 사용
- 첫번째 파라미터는 emit할 숫자의 초깃값 지정, 두번째는 상태를 지정한다.
- create
- generate와 동일하게 프로그래밍 방식으로 signal 이벤트를 발생시키지만 차이가 있다.
- generate는 동기적으로 한번에 한건씩 발생하지만 create는 한번에 여러건을 비동기적으로 emit할 수 있다.
- justOrEmpty
- Sequence 필터링 Operator
- filter
Flux.range(1, 30) .filter(num -> num % 2 != 0) .subscribe(log::info)
- Upstream에서 emit된 데이터 중에서 조건에 일치하는 데이터만 Downstream으로 emit
- filterWhen
Flux.fromIterable(method()) .filterWhen(vaccine -> Mono.just(vaccineMap.get(vacchine).getT2() >= 23_000_000) .publishOn(Scheduelrs.parallel())) // filterWhen operator 의 inner sequence를 통해 백신명에 해당하는 수량이 3_000_000개 이상이라면 해당 백신명을 Subscriber에게 emit .subscribe(log::info)
- 비동기적으로 필터링을 수행한다.
- filterWhen() operator는 내부에서 Inner Sequence를 통해 조건에 맞는 데이터인지를 비동기적으로 테스트한 후, 테스트 결과가 true 라면 downstream으로 emit
- skip
Flux.interval(Duration.ofSeconds(1)) // 5500ms 동안 0 ~ 5 까지 emit .skip(2) // 0, 1 skip .subscribe(log::info); // 2, 3, 4 출력 Threa.sleep(5500L);
Flux.interval(Duration.ofMIllis(300)) // 2000ms 동안 0 ~ 5 까지 emit .skip(Duration.ofSeconds(1)) // 1000ms 동안(0, 1, 2) emit skip .subscribe(log::info); // 3, 4, 5 출력 Threa.sleep(2000L);
- skip operator는 upstream에서 emit된 데이터 중 파라미터로 입력받은 숫자만큼 건너뛴 후 나머지를 emit
- take
Flux.interval(Duration.ofSeconds(1)) // 4000ms 동안 0 ~ 3 까지 emit .take(3) // 0 ~ 2 까지 take // take(Duration.ofMillis(2500) - 2500ms 동안 emit된 데이터 take .subscibe(log::info); // 0, 1, 2 출력 Threa.sleep(4000L);
- upstream에서 emit되는 데이터 중 파라미터로 입력받은 숫자만큼 downstream으로 emit
- takeXXX
Flux.fromIterable(method()) .takeLast(2) // .takeUntil(tuple -> tuple.getT2() > 20_000_000) // .takeWhile(tuple -> tuple.gett2() < 20_000_000) .subscribe(log::info);
- takeLast: upstream에서 emit된 데이터 중 가장 마지막에 emit된 데이터를 downstream으로 emit
- takeUntil: 파라미터로 입력한 람다표현식을 true가 될때까지 upstream에서 emit된 데이터를 downstream으로 emit
- takeWhile: takeUntil과 반대로 표현식이 true일 때까지 upstream에서 emit된 데이터를 downstream으로 emit
- next
Flux.fromIterable(method()) .next() .subscribe(log::info);
- upstream에서 emit된 데이터 중 첫번째 데이터만 downstream으로 emit
- filter
- Sequence 변환 Operator
- map
- upstream에서 emit된 데이터를 mapper function 을 사용하여 변환한 후 downstream으로 emit
- flatMap
Flux.just("Good", "Bad") .flatMap(feel -> Flux.just("Morning", "Afternoon", "Evening") .map(time -> feeling + " " + time)) .subscribe(log::info); //Good Morning, Good Afternoon, Good Evening, Bad ~ 출력
- upstream에서 emit 한 데이터는 flatMap 내부 inner sequence를 생성하여 1개 이상의 변환된 데이터를 emit
- flatMap 내부 sequence에서 Scheduler를 설정하여 비동기적으로 데이터를 emit할 수 있다.
- concat
Flux.concat(Flux.just(1, 2, 3), Flux.just(4, 5, 6)) .subscribe(log::info); // 1, 2, 3, 4, 5, 6 출력
- 파라미터로 입력되는 publisher sequence를 연결하여 데이터를 순차적으로 emit
- 먼저 입력되는 publisher가 종료될 때까지 나머지 publisher가 대기하는 특성을 갖는다.
- merge
Flux.merge( Flux.just(1, 2, 3, 4).delayElements(Duration.ofMillis(300L)) Flux.just(5, 6, 7).delayElements(Duration.ofMillis(500L)) ).subscribe(log::info); // 1(300ms), 5(500ms), 2(600ms), 3(900ms), 6(1000ms), 4(1200ms), 7(1500ms) 출력 Thread.sleep(2000L);
- merge operator의 동작을 이해하기 위해선 emit되는 시간 주기를 다르게 설정하는 것이 좋다.
- publisher가 emit하는 시간이 빠른 데이터부터 차례대로 emit한다.
- zip
Flux.zip( Flux.just(1, 2, 3).delayElements(Duration.ofMillis(300L)) Flux.just(4, 5, 6).delayElements(Duration.ofMillis(500L)) // (n1, n2) -> n1 * n2 ).subscribe(log::info); // [1,4], [2,5], [3,6] // 4, 10, 18 출력 Thread.sleep(2000L);
- emit된 데이터를 결합하는 데, 하나씩 emit을 기다렸다가 결합, Tuple로 묶어서 전달
- 세번째 파라미터로 tuple이 아닌 최종 변환된 데이터를 전달할 수 있도록 한다.
- and
Mono.just("Task 1") .delayElements(Duration.ofSeconds(1)) .doOnNext(data -> log.info("mono: {}", data)) .and(Flux.just("Task 2", "Task 3") .delayElements(Duration.ofMillis(600)) .doOnNext(data -> log.info("flux: {}", data)) .subscribe(log::info); // flux: Task 2, Mono: Task 1, Flux: Task 3 출력 Thread.sleep(5000);
- Mono의 complete signal과 파라미터로 입력된 publisher 의 complete signal을 결합하여 새로운 Mono
반환
- Mono의 complete signal과 파라미터로 입력된 publisher 의 complete signal을 결합하여 새로운 Mono
- collectList
- Flux에서 데이터를 모아 List로 변환하여 Mono<List
> 로 반환
- Flux에서 데이터를 모아 List로 변환하여 Mono<List
- collectMap
Flux.range(0, 26) .collectMap(key -> methodKey(key), value -> methodValue(value)) .subscribe(log::info);
- Flux에서 emit된 데이터를 기반으로 key, value를 생성하여 Mono<Map<T, V»로 반환
- map
-
Sequence 내부 동작 확인하기 위한 Operator |operator|description| |—|—| |doOnSubscribe()|Publisher가 구독중일 때 트리거되는 동작을 추가할 수 있다.| |doOnRequest()|Publisher가 요청을 수신할 때 트리거되는 동작을 추가할 수 있다.| |doOnNext()|Publisher가 데이터를 emit할 때 트리거되는 동작을 추가할 수 있다.| |doOnComplete()|Publisher가 성공적으로 완료되었을 때 트리거되는 동작을 추가할 수 있다.| |doOnError()|Publisher가 에러가 발생한 상태로 종료되었을 때 트리거되는 동작을 추가할 수 있다.| |doOnCancel()|Publisher가 취소되었을 때 트리거되는 동작을 추가할 수 있다.| |doOnTerminate()|Publisher가 성공적으로 완료되었을 떄 또는 에러가 발생한 상태로 종료되었을 때 트리거되는 동작을 추가할 수 있다.| |doOnEach()|Publisher가 데이털르 emit할 때, 성공적으로 완료되었을 때, 에러가 발생한 상태로 종료되었을 때 트리거되는 동작을 추가할 수 있다.| |doOnDiscard()|upstream에 있는 전체 operator 체인의 동작 중 operator에 의해 폐기되는 요소를 조건부로 정리할 수 있다.| |doAfterTerminate()|downstream을 성공적으로 완료한 직후 또는 에러가 발생하여 publisher가 종료된 직후에 트리거되는 동작을 추가할 수 있다.| |doFirst()|publisher가 구독되기 전에 트리거되는 동작을 추가할 수 있다.| |doFinally()|에러를 포함해 어떤 이유든 publisher가 종료된 후 트리거되는 동작을 추가할 수 있다.|
- 예외처리를 위한 Operator
- error
Flux.range(1, 5) .flatMap(num -> { if(num % 2 == 0) return Flux.error(IllegalArgumentException::new); return Mono.just(num * 2); }).subscribe(data -> log.info("next: {}", data)); // next: 1, onError 발생
- error operator는 파라미터로 지정된 에러로 종료하는 flux 생성
- throw 키워드로 예외를 의도적으로 던지는 것과 같은 역할을 한다.
- onErrorReturn
getBooks() .map(book -> book.getName().toUpperCase()) .onErrorReturn(NullPointerException.class, "No pen name") // onErrorReturn("No pen name") 가능 .onErrorReturn(IllegalFormatException.class, "Illegal pen name") .subscribe(log::info); // NPE가 발생하면 No pen name 출력, IllegalFormatException 발생 시 Illegal pen name 출력
- 에러 이벤트가 발생했을 때 Downstream으로 전파하지 않고 대체값을 emit
- onErrorResume
getBooksFromCache() .onErrorResume(error -> getBooksFromDb()) .subscribe(log::info, log::error);
- 에러 이벤트가 발생했을 때 Downstream으로 전파하지 않고 대체 publisher emit
- try catch문의 catch 블록에서 예외가 발새앟여 또 다른 메서드를 호출하는 형태
- onErrorContinue
Flux.just(1, 2, 4, 0, 12) .map(num -> 12 / num) .onErrorContinue((error, num) -> log.error("e: {}, num: {}", e.getMessage(), num)) .subscribe(log::info); // 12, 6, 3, 1 출력
- 에러가 발생했을 때, 에러 영역 내에 있는 데이터는 제거하고, upstream에서 후속 데이터를 emit하는 방식으로 에러 복구
- retry
- 에러가 발생하면 파라미터로 입력한 횟수만큼 원본 Flux의 Sequence를 다시 구독, 만약 Long.MAX_VALUE를 입력하면 무한 반복된다.
- error
- Sequence의 동작 시간 측정을 위한 Operator
- elasped
Flux.range(1, 3) .delayElements(Duration.ofSeconds(1)) .elasped() .subscribe(data -> log.info("onNext: {}, time: {}", data.getT2(), data.getT1()); // onNext: 1, time: 1029, // onNext: 2, time: 1005, // onNext: 3, time: 1001
- emit된 데이터 사이의 경과 시간을 측정하여 Tuple<Long, T> 형태로 Downstream에 emit
- elasped
- Flux Sequence 분할을 위한 Operator
- window
Flux.range(1, 7) .window(3) .flatMap(flux -> { log.info("============="); return flux; }).subscribe(new BaseSubscriber<>()) { @Override protected void hookOnSubscirbe(Subscription subscription) { subscription.request(2); } @Override protected void hookOnNext(Integer value) { log.info("# onNext: {}", value); request(2); } }); // ======= 1, 2, 3 ======= 4, 5, 6 ======= 7
- Upstream에서 emit되는 첫번째 데이터부터 maxSize 숫자만큼의 데이터를 포함한 새로운 Flux로 분할
- buffer
Flux.range(1, 7) .buffer(3) .subscribe(buffer -> log.info("onNext: {}", buffer); // onNext 1, 2, 3 // onNext: 4, 5, 6 // onNext: 7
- upstream에서 emit되는 첫번째 데이터부터 maxSize 숫자만큼 데이터를 list 버퍼로 한번에 emit
- bufferTimeout
Flux.range(1, 7) .map(num -> { try { if(num < 5) Thread.sleep(100L); else Thread.sleep(300L); } catch(InterruptedException e) {} return num; }) .bufferTimeout(3, Duration.ofMillis(450L)) .subscribe(buffer -> log.info("onNext: {}", buffer); // onNext: 1, 2, 3 // onNext: 4, 5 // onNext: 6 // onNext: 7
- upstream에서 emit되는 첫번째 데이터부터 maxSize 숫자만큼 데이터 또는 maxTime 내에 emit되는 데이터를 list 버퍼로 한번에 emit
- groupBy
Flux.fromIterable(SampleData.books) .groupBy(book -> book.getAuthorName()) .flatMap(groupedFlux -> groupedFlux.map(book -> book.getName() + " " + book.getAuthorName()).collectList()) .subscribe(log::info);
- emit되는 keyMapper로 생성한 key를 기준으로 그룹화한 GroupedFlux를 리턴하며 이 GroupedFlux를 통해서 그룹별로 작업ㅇ르 수행할 수 있다.
- window
- 다수의 Subscriber에게 Flux를 멀티캐스팅(Multicasting)
- publish
- autoConnect
-
refCount