RxJava의 Operator
- RxJava에서의 연산자는 메서드다.
- 연산자를 이용하여 데이터를 생성하고 통지하는 Flowable이나 Observable 등의 생산자를 생성할 수 있다.
- Flowable이나 Observable에서 통지한 데이터를 다양한 연산자를 사용하여 가공 처리하여 결과값을 만들어 낸다.
- 목차
- Flowable/Observable 생성 연산자
- 통지된 데이터를 필터링 해주는 연산자
- 통지된 데이터를 변환해주는 연산자
- 여러 개의 Flowable/Observable을 결합하는 연산자
- 에러 처리 연산자
- 유틸리티 연산자
- 조건과 불린 연산자
- 통지된 데이터를 집계해주는 연산자
Flowable/Observable 생성 연산자
- interval(Long initialDelay, Long period, TimeUnit timeUnit)
Observable.interval(0L, 1000L, TimeUnit.MILLISECONDS) .map(num -> num + " count") .subscribe(System.out::println); Thread.sleep(3000);-
독립된 스레드를 실행하기 때문에 main thread에 sleep을 사용
- time interval 시간만큼 대기한 후에 반복적으로 계속 통지한다.
- 완료없이 계속 통지한다.
- 지정한 시간 간격마다 0부터 시작하는 숫자(Long)을 통지
- initialDelay 파라미터를 이용하여 최초 통지에 대한 대기 시간을 지정할 수 있다.
- 완료 없이 계속 통지한다.
- 호출한 스레드와는 별도의 스레드에서 실행(독립된 스레드)
- polling 용도의 작업을 수행할 때 활용할 수 있다.
-
- range(int start, int count)
Observable.range(0, 5) .subscribe(System.out::println);- 지정한 start 부터 count 개의 숫자를 통지한다.
- for, while 등 반복문을 대체할 수 있다.
- timer(long time, TimeUnit timeUnit)
Observable.timer(2000, TimeUnit.MILLISECONDS) .map(count -> "Do work!") .subscribe(System.out::println); Thread.sleep(3000L);-
독립된 스레드를 실행하기 때문에 main thread에 sleep을 사용
- 지정한 시간이 지나면 0을 통지한다.
- 0을 통지하고 onComplete() 이벤트가 발생하여 종료한다.
- 호출한 스레드와는 별도의 스레드에서 실행된다.
- 특정 시간을 대기한 후에 어떤 처리를 하고자 할 때 활용할 수 있다.
-
-
defer
Observable<LocalTime> observable = Observable.defer(() -> { LocalTime currentTime = LocalTime.now(); return Observable.just(currentTime); }); Observable<LocalTime> observableJust = Observable.just(LocalTime.now()); observable.subscribe(time -> { System.out.println("# defer() 구독1의 구독 시간: " + time); }); observableJust.subscribe(time -> { System.out.println("# just() 구독1의 구독 시간: " + time); }); Thread.sleep(3000L); observable.subscribe(time -> { System.out.println("# defer() 구독2의 구독 시간: " + time); }); observableJust.subscribe(time -> { System.out.println("# just() 구독2의 구독 시간: " + time); });- 2번째 구독 시간을 확인하면 defer에 경우 3초 지연 시간 이후에 찍히고, just는 처음 데이터를 전송한 후에 바로 전송한 것
-
즉, just는 바로 데이터를 전송한 것이고, defer는 구독할 때 전송한 것이다.
- 구독이 발생할 때마자 즉, subscribe()가 호출될 때마다 새로운 Observable을 생성한다.
- 선언한 시점의 데이터를 통지하는 것이 아닌 호출 시점의 데이터를 통지한다.
- 데이터 생성을 미루는 효과가 있기 때문에 최신 데이터를 얻고자할 때 활용할 수 있다.
- fromIterable
List<String> countries = Arrays.asList("Korea", "Canada", "Italy"); Observable.fromIterable(countries) .subscribe(System.out::println);- Iterable 구현 객체를 파라미터로 입력 받아서 순서대로 통지하는 연산자이다.
- Iterable에 담긴 데이터를 순서대로 통지한다.
- fromFuture()
public static void main(String[] args) { System.out.println("#start time"); // 긴 처리가 걸리는 작업 Future<Double> future = longTimeWork(); // 짧은 처리 작업 shortTimeWork(); Observable.fromFuture(future) .subscribe(System.out::println); } public static Future<Double> longTimeWork() { return CompletableFuture.supplyAsync(()-> calculate()); } public static double calculate() { System.out.println("#긴 처리 시간이 걸리는 작업"); try { Thread.sleep(3000L); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } return 10000000000L; } public static void shortTimeWork() { System.out.println("#짧은 처리 시간이 걸리는 작업"); }- Future 인터페이스는 자바 5에서 비동기 처리를 위해 추가된 동시성 API이다.
- 시간이 오래 걸리는 작업은 Future를 반환하는 ExcutorService에게 맡기고 비동기로 다른 작업을 수행할 수 있다.
- Java 8에서는 CompletableFuture 클래스를 통해 구현이 간결해졌다.
-
Future를 사용하면 여러 작업들을 비동기로 수행할 수 있다.
- Future 학습
데이터 필터링 연산자
- filter
- 전달받은 데이터가 조건에 맞는지 확인한 후 결과가 true인 데이터만 통지한다.
- filter라는 단어의 산전적인 의미가 무언가를 걸러낸다는 의미
- 파라미터로 받는 Predicate 함수형 인터페이스에서 조건을 확인
Observable.fromIterable(SampleData.carList) .filter(car -> car.getCarMaker() == CarMaker.CHEVOLET) .subscribe(System.out::println); - distinct
- 이미 통지된 동일한 데이터가 있다면 이후의 동일한 데이터는 통지하지 않는다.
- distinct의 사전적 의미는 명확하게 구별되는 이라는 뜻을 포함하고 있다.
Observable.fromArray(SimpleData.carMakersDuplicated) .distinct(Car::getCarMaker) .subscribe(System.out::println);- distinct 내에 람다를 입력할 수 있는데, 입력한 값에 중복을 확인한다.
- take
- 파라미터로 지정한 개수나 기간이 될 때까지 데이터를 통지한다.
- 지정한 범위가 통지 데이터보다 클 경우 데이터를 모두 통지하고 완료한다. ```java Observable.just(“a”, “b”, “c”, “d”) .take(2) .subscribe(System.out::println);
Observable.interval(1000L, TimeUnit.MILLISECONDS) .take(3500L, TimeUnit.MILLISECONDS) .subscribe(System.out::println); Thread.sleep(3500L); ```
- a, b 만 전달한다
- 시간을 지정할 수 있다.
- takeUntil 첫번째 유형
- 파라미터로 지정한 조건이 true가 될 때까지 데이터를 계속 통지한다.
- true가 된 데이터까지 전송한 후에 완료한다.
Observable.fromIterable(SampleData.carList) .takeUntil(car -> car.getName().equals("트랙스")) .subscribe(System.out::println);- 데이터 중 트랙스 데이터가 있을 때까지 데이터가 전송된다.
- 말리부, K3, Q3, 트랙스 출력
-
takeUntil 두번째 유형

- 파라미터로 지정한 Observable이 최초 데이터를 통지할 때까지 데이터를 계속 통지한다.
Observable.interval(1000L, TimeUnit.MILLISECONDS) .takeUntil(Observable.timer(5500L, TimeUnit.MILLISECONDS) .subscribe(System.out::println); Thread.sleep(5500L);- 새로운 Observable에 timer를 5500로 세팅했기 때문에 5초까지 데이터가 전송된다.
- 5.5초에 파라미터로 넘긴 Observable에 데이터가 전송되기 때문에 완료된다.
- 파라미터로 지정한 Observable이 최초 데이터를 통지할 때까지 데이터를 계속 통지한다.
- skip 첫번째 유형
- 파라미터로 지정한 숫자만큼 데이터를 건너뛴 후 나머지 데이터를 통지한다.
Observable.range(1, 15) .skip(3) .subscribe(System.out::println); - 1, 2, 3은 skip하고 4부터 출력된다.
- 파라미터로 지정한 숫자만큼 데이터를 건너뛴 후 나머지 데이터를 통지한다.
- skip 두번째 유형
- 파라미터로 지정한 시간만큼 데이터를 건너뛴 후 지정한 시간 이 후에 나머지 데이터를 통지한다.
Observable.interval(300L, TimeUnit.MILLISECONDS) .skip(1000L, TimeUnit.MILLISECONDS) .subscribe(System.out::println); Thread.sleep(5500L); - 0.3, 0.6, 0.9 초까지는 건너뛴 후, 1.2초부터 3 데이터가 출력된다.
- 파라미터로 지정한 시간만큼 데이터를 건너뛴 후 지정한 시간 이 후에 나머지 데이터를 통지한다.
- 추가: takeWhile, skipUntil, skipLast
데이터 변환 연산자
- map
- 원본 Observable 에서 통지하는 데이터를 원하는 값으로 변환 후 통지한다.
- 변환 전, 후 데이터 타입은 달라도 상관없다.
- null을 반환하면 NullpointException이 발생하므로 데이터 하나를 반드시 반환해야 한다.
- 예제
List<Integer> oddList = Arrays.asList(1, 3, 5, 7); Observable.fromIterable(oddList) .map(data -> data + 1) .subscribe(System.out::println);
- flatMap 첫번째 유형
- 원본 데이터를 원하는 값으로 변환 후 통지하는 것은 map과 같다
- map이 1대 1 변환인 것과 달리 flatMap은 1대 다 변환이므로 데이터 한개로 여러 데이터를 통지할 수 있다.
- map은 변환된 데이터를 반환하지만 flatMap은 변환된 여러개의 데이터를 담고 있는 새로운 Observable을 반환
- 예제
Observable.just("Hello") .flatMap(hello -> Observable.just("Java", "JPA", "RxJava", "Spring").map(lang -> hello + " " + lang)) .subscribe(System.out::println);
- flatMap 두번째 유형
- 원본 데이터와 변환된 데이터를 조합하여 새로운 데이터를 통지
- 즉, Observable에 원본 + 변환 = 최종 데이터를 실어서 반환
- 예제
Observable.range(2, 1) .flatMap( data -> Observable.range(1, 9), (source, transform) -> { return source + " * " + transform + " = " + (source*transform); } ) .subscribe(System.out::println);
- concatMap
- flatMap과 마찬가지로 받은 데이터를 변환하여 새로운 Observable로 반환
- 반환된 새로운 Observable을 하나씩 순서대로 실행하는 것이 flatMap과 다르다.
- 즉, 데이터의 처리 순서는 보장하지만 처리중인 Observable의 처리가 끝나야 다음 Observable이 실행되므로 처리 성능에 영향을 줄 수 있다.
- 예제
Observable.interval(100L, TimeUnit.MILLISECONDS) .take(4) .skip(2) .concatMap(num -> Observable.interval(200L, TimeUnit.MILLISECONDS).take(10).skip(1) .map(row -> num + " * " + row + " = " + (row*num))) .subscribe(System.out::println, error->{}, ()-> System.out.println("OnComplete")); Thread.sleep(5000L);- concatMap을 사용하면 concatMap 안에 Observable에서 보내는 순서를 보장한다.
- flatMap을 사용하면 flatMap 안에 Observable에서 보내는 순서를 보장하지 않는다.
- switchMap
- concatMap과 마찬가지로 받은 데이터를 변환하여 새로운 Observable로 반환
- concatMap과 다른 점은 switchMap은 순서를 보장하지만 새로운 데이터가 통지되면 현재 처리중이던 작업을 바로 중단한다.
- 예제
Observable.interval(100L, TimeUnit.MILLISECONDS) .take(4) .skip(2) .switchMap(num -> Observable.interval(200L, TimeUnit.MILLISECONDS).take(10).skip(1) .map(row -> num + " * " + row + " = " + (row*num))) .subscribe(System.out::println, error->{}, ()-> System.out.println("OnComplete")); Thread.sleep(5000L);- 2단은 출력되지 않고, 3단은 출력된다.
- 2단 출력될 시점에 3단관련 데이터를 전송하여 중단되었다.
- groupBy
- 하나의 Observable을 여러개의 새로운 GroupedByObservable로 만든다.
- 원본 Observable의 데이터를 그룹별로 묶는다기보다는 각각의 데이터들이 그룹에 해당하는 Key를 가지게 된다.
- GropuedByObsrevable은 getKey()를 통해 구분된 그룹을 알 수 있게 해준다.
- 예제
Observable<GroupedObservable<CarMaker, Car>> observable = Observable.fromIterable(SampleData.carList).groupBy(car -> car.getCarMaker()); observable.subscribe(groupedObservable -> gropuedObservable .filter(data -> gropuedObservable.getKey().equals(CarMaker.CHEVOLET) .subscribe(data -> { System.out.println(groupedObservable.getKey() + " " + data); // key가 Carmaker()가 된다 ));
- toList
- 통지되는 데이터를 모두 List에 담아 통지한다.
- 원본 Observable에서 완료 통지를 받는 즉시 리스트를 통지한다.
- 통지되는 데이터는 원본 데이터를 담은 리스트 하나이므로 Single로 반환된다.
- 예시
Single<List<Integer>> single = Observable.just(1, 3, 5, 7, 9).toList(); single.subscribe(System.out::println);Observable.fromIterable(SampleData.carList) .toList() .subscribe(System.out::println);- 1, 3, 5, 7, 9가 리스트로 한번만 전송된다.
- carList가 하나씩 전송되는 것이 아닌 리스트로 한번만 전송된다.
- toMap
- 통지되는 데이터를 모두 Map에 담아 통지한다.
- 원본 Observable에서 완료 통지를 받는 즉시 Map을 통지한다.
- 이미 사용중인 key(키)를 또 생성하면 기존에 있던 key(키)와 value(값)을 덮어쓴다.
- 통지되는 데이터는 원본 데이터를 담은 Map하나이므로 Single로 반환된다.
- 예시
Single<Map<String, String>> single = Observable.just("a-Alpha", "b-Beta", "c-Charlie", "e-Echo") .toMap( data -> data.split("-")[0], // 반환값이 key가 된다. data -> data.split("-")[1] // 반환값이 value가 된다. ); single.subscribe(System.out::println);- a: Alpha .. 식의 key, value 형태의 Map을 한번에 전송한다.
- toMap의 key만 파라미터로 넣을 수 있다. (key or key, value)
데이터 결합 연산자
- merge
- 다수의 Observable에서 통지된 데이터를 받아 다시 하나의 Observable로 통지한다.
- 통지 시점이 빠른 Observable의 데이터부터 순차적으로 통지되고 통지 시점이 같을 경우에는 merge() 함수의 파라미터로 먼저 지정된 Observable 데이터로부터 통지된다.
-
예시
- 0.2s 0.4s 0.6s 0.8s 1s 1.2s 1.4s 1.6s 1.8s 2s Observable1 0 1 2 3 4 - - - - - Observable2 - 1000 - 1001 - 1002 - 1003 - 1004 Observable 0 1,1000 2, 3,1001 4 1002 - 1003 - 1004 Observable<Long> observable1 = Observable.interval(200L, TimeUnit.MILLISECONDS) .take(5); Observable<Long> observable2 = Observable.interval(400L, TimeUnit.MILLISECONDS) .take(5) .map(data -> data + 1000); Observable.merge(observable1, observable2) .subscribe(System.out::println); Thread.sleep(4000);
- concat
- 다수의 Observable에서 통지된 데이터를 받아 다시 하나의 Observable로 통지
- 하나의 Observable에서 통지가 끝나면 다음 Observable에서 연이어 통지가 된다.
- 각 Observable의 통지시점과 상관없이 concat() 함수의 파라미터로 먼저 입력된 Observable의 데이터부터 모두 통지된 후, 다음 Observable의 데이터가 통지된다.
- 예시
Observable<Long> observable1 = Observable.interval(500L, TimeUnit.MILLISECONDS) .take(4); Observable<Long> observable2 = Observable.interval(300L, TimeUnit.MILLISECONDS) .take(5) .map(data -> data + 1000); Observable.concat(observable1, observable2) .subscribe(System.out::println); Thread.sleep(3500L);- observable2가 먼저 통지되지만 observable1이 먼저 파라미터로 입력되었기 때문에 observable1이 먼저 통지된다.
- 0, 1, 2, 3, 1000, 1001, 1002, 1003 순으로 입력된다.
- concat에 파라미터로 List와 같은 Collection도 가능하다
- zip
- 다수의 Observable에서 통지된 데이터를 받아 다시 하나의 Observable로 통지
- 각 Observable에서 통지된 데이터가 모두 모이면 각 Observable에서 동일한 index의 데이터로 새로운 데이터를 생성한 후 통지한다.
- 통지하는 데이터 개수가 가장 적은 Observable의 통지 시점에 완료 통지 시점을 맞춘다.
- 예시
Observable<Long> observable1 = Observable.interval(200L, TimeUnit.MILLISECONDS) .take(4); Observable<Long> observable2 = Observable.interval(400L, TimeUnit.MILLISECONDS) .take(6) .map(data -> data + 1000); Observable.zip(observable1, observable2, (data1, data2) -> data1 + data2) .subscribe(System.out::println); Thread.sleep(3500L);- 4개만 출력된다. observable1에 개수가 적기 때문에
- 1000, 1002, 1004, 1006 이 출력된다.
- combineLatest
- 다수의 Observable에서 통지된 데이터를 받아 다시 하나의 Observable로 통지
- 각 Observable에서 데이터를 통지할 때마다 모든 Observable에서 마지막으로 통지한 각 데이터를 함수형 인터페이스에 전달하고, 새로운 데이터를 생성해 통지한다.
- 예제
Observable<Long> observable1 = Observable.interval(500L, TimeUnit.MILLISECONDS) .take(4); Observable<Long> observable2 = Observable.interval(700L, TimeUnit.MILLISECONDS) .take(4); Observable.combineLatest(observable1, observable2, (data1, data2) -> "data1: " + data1 +", data2: " + data2) .subscribe(System.out::println); Thread.sleep(3500L);// 출력 data1: 0, data2: 0 data1: 1, data2: 0 data1: 1, data2: 1 data1: 2, data2: 1 data1: 3, data2: 1 data1: 3, data2: 2 data1: 3, data2: 3- 0.7s에 observable2에는 0이 마지막으로 도착하고, observable1에는 0이 마지막으로 도착했다.
- 1.0s에 observable1에는 1이 마지막으로 도착하고, observable2에는 0이 마지막으로 도착했다.
- 1.4s에 observable2에는 1이 마지막으로 도착하고, observable1에는 1이 마지막으로 도착했다.
- … 이런 방식으로 계속 출력되는 것
에러 처리 연산자
- RxJava에서는 Try-Catch를 할 수 있다.
try { Observable.just(2) .map(num -> num / 0) .subscribe(System.out::println); } catch(Exception e) { System.out.println("에러 처리 필요: " + e.getMessage()); // 출력되지 않는다. }- divide zero로 오류가 발생할 수 있지만, catch 문을 타지 않는다.
- console에 찍힌 오류문을 확인해보면 subscribe에 onError 문을 작성하지 않았다는 내용이 적혀 있다.
- RxJava에서는 onError를 작성하여야 한다.
Observable.just(5) .flatMap(num -> { return Observable.interval(200L, TimeUnit.MILLISECONDS) .doOnNext(data-> System.out.println("do on next: " + data)) .take(5) .map(i -> num / i); }) .subscribe( data -> System.out.println("on next: " + data), error -> System.out.println("error: " + error), () -> System.out.println("on complete!")); Thread.sleep(1000L);- do on next에서 0이 통지한다. map에서 divide zero exception이 발생하고, subscribe에서 error를 타고 종료한다.
- onErrorReturn
- 에러가 발생했을 때 에러를 의미하는 데이터로 대체할 수 있다.
- onErrorReturn()을 호출하면 onError 이벤트는 발생하지 않는다.
- 예제
Observable.just(5) .flatMap(num -> { return Observable.interval(200L, TimeUnit.MILLISECONDS) .take(5) .map(i -> num / i) .onErrorReturn(exception -> { if(exception instanceof ArithmeticException) System.out.println("ArithmeticException: " + exception.getMessage()); return -1L; }); }) .subscribe( data -> System.out.println("on next: " + data), error -> System.out.println("error: " + error), () -> System.out.println("on complete!")); Thread.sleep(1000L);// 출력 ArithmeticException: / by zero on next: -1 on complete!
- onErrorResumeNext
- 에러가 발생했을 때 에러를 의미하는 Observable로 대체할 수 있다.
- Observable로 대체할 수 있으므로 데이터 교체와 더불어 에러 처리를 위한 추가 작업을 할 수 있다.
- 예제
Observable.just(5) .flatMap(num -> { return Observable.interval(200L, TimeUnit.MILLISECONDS) .take(5) .map(i -> num / i) .onErrorResumeNext(exception -> { System.out.println("EXCEPTION: " + exception.getMessage()); return Observable.interval(200L, TimeUnit.MILLISECONDS) .take(5).skip(1).map(i -> num / i); }); }) .subscribe(data -> System.out.println("on next: " + data)); Thread.sleep(2000L);// 출력 EXCEPTION: / by zero on next: 5 on next: 2 on next: 1 on next: 1- Exception이 발생하여도 onErrorResumeNext에서 전달한 Observable에 데이터가 전송된다.
- retry
- 데이터 통지 중 에러가 발생했을 때, 데이터 통지를 재시도한다.
- 즉, onError 이벤트가 발생하면 subscribe()를 다시 호출하여 재구독한다.
- 에러가 발생한 시점에 통지에 실패한 데이터만 다시 통지되는 것이 아니라 처음부터 다시 통지된다.
- 예제
Observable.just(10, 12, 15, 16) .zipWith(Observable.just(1, 2, 0, 4), (a, b) -> { int result; try { result = a / b; return result; } catch(ArithmeticException e) { System.out.println(e.getMessage()); throw e; } }) .retry(3) .onErrorReturn(throwable -> -1) .subscribe( data -> System.out.println("ON_NEXT: " + data), error -> System.out.println("ON_ERROR: " + error), ()-> System.out.println("ON_COMPLETE") );// 출력 ON_NEXT: 10 ON_NEXT: 6 / by zero ... // 3번 반복 ... ON_NEXT: -1 ON_COMPLETE- EXCEPTION이 발생한 후 retry를 3번 반복하여 발생한다.
- 시도한 후에 onErrorReturn을 통해 -1을 전송한다.
- Exception이 발생한 시점부터 다시 값을 보내는 것이 아닌, 처음부터 값을 전송한다.
유틸리티 연산자
- delay 첫번째 유형
- 생산자가 데이터를 생성 및 통지를 하지만 설정한 시간만큼 소비자쪽으로의 데이터 전달을 지연시킨다.
- 예시
Observable.just(1, 3, 4, 5) .doOnNext(data -> System.out.println("DO ON NEXT: " + data)) .delay(2000L, TimeUnit.MILLISECONDS) .subscribe(data -> System.out.println("ON NEXT: " + data)); Thread.sleep(2500L);- 2초 정도 delay가 지난 후에 ON NEXT에 수신된다.
- delay 두번째 유형
- 파라미터로 생성되는 Observable이 데이터를 통지할 때까지 각각의 원본 데이터의 통지를 지연시킨다.
- 예시
Observable.just(1, 3, 5, 7) .delay(item -> { Thread.sleep(1000L); return Observable.just(item); }).subscribe(data-> System.out.println("ON NEXT: " + data));
- delaySubscription
- 생산자가 데이터의 생성 및 동지 자체를 설정한 시간만큼 지연시킨다.
- 즉, 소비자가 구독을 해도 구독 시점 자체가 지연된다.
- delay에 경우 구독 즉시 호출되지만, 구독 시점은 delay 시간 후에 진행된다.
- delaySubscription에 경우 delay 시간 후에 호출되고 바로 구독된다.
- 예시
Observable.just(1, 3, 4, 5) .doOnNext(data -> System.out.println("DO ON NEXT: " + data)) .delaySubscription(2000L, TimeUnit.MILLISECONDS) .subscribe(data -> System.out.println("ON NEXT: " + data)); Thread.sleep(2500L);
- timeout
- 각각의 데이터 통지 시, 지정된 시간안에 통지가 되지 않으면 에러를 통지한다.
- Timeout 시간 안에 1번에 호출이 발생해야 한다.
- 에러 통지 시 전달되는 에러 객체는 TimeoutException이다.
- 예시
Observable.range(1, 5) .map(num -> { long time = 1000L; if(num == 4) { time = 1500L; } Thread.sleep(time); return num; }) .timeout(1200L, TimeUnit.MILLISECONDS) .subscribe( data -> System.out.println("ON_NEXT: " + data), error -> System.out.println("ON_ERROR: " + error) );// 출력 ON_NEXT: 1 ON_NEXT: 2 ON_NEXT: 3 ON_ERROR: java.util.concurrent.TimeoutException:- 1, 2, 3 을 진행할 때에는 1초 간격으로 값을 던졌기 때문에 1.2초 내에 timeout 시간 안에 동작되었다.
- 4를 전송할 차례에서는 1.2초 이내에 진행되지 않아 오류가 발생된다.
- 각각의 데이터 통지 시, 지정된 시간안에 통지가 되지 않으면 에러를 통지한다.
- timeInterval
- 각각의 데이터가 통지되는 데 걸린 시간을 통지한다.
- 통지된 데이터와 데이터가 통지되는 데 걸린 시간을 소비자쪽에서 모두 처리할 수 있다.
- 예시
Observable.just(1, 3, 5, 7, 9) .delay(item -> { Thread.sleep((int)(Math.random()*1000)); return Observable.just(item); }) .timeInterval() .subscribe(timed -> System.out.println("통지 걸린 시간:" + timed.time() +", 데이터: " + timed));- 보내준 객체로 value, interval time 등을 알 수 있다.
- 출력
통지 걸린 시간:148, 데이터: Timed[time=148, unit=MILLISECONDS, value=1] 통지 걸린 시간:11, 데이터: Timed[time=11, unit=MILLISECONDS, value=3] 통지 걸린 시간:197, 데이터: Timed[time=197, unit=MILLISECONDS, value=5] 통지 걸린 시간:222, 데이터: Timed[time=222, unit=MILLISECONDS, value=7] 통지 걸린 시간:109, 데이터: Timed[time=109, unit=MILLISECONDS, value=9]
- materialize / dematerialize
- materialize: 통지된 데이터와 통지된 데이터의 통지 타입 자체를 Notification 객체에 담고 이 Notification 객체를 통지한다.
- 즉, 통지 데이터의 메타 데이터를 포함하여 통지한다고 볼 수있다.
- dematerialize: 통지된 Notification 객체를 원래의 통지 데이터로 변환하여 통지한다.
- materialize 예시
Observable.just(1, 2, 3) .materialize() .subscribe(notification-> { String notificationType = notification.isOnNext() ? "onNext()" : (notification.isOnError() ? "onError()" : "onComplete()"); System.out.println("notification 타입: " + notificationType); System.out.println("on next: " + notification.getValue()); });// 출력 notification 타입: onNext() on next: 1 notification 타입: onNext() on next: 2 notification 타입: onNext() on next: 3 notification 타입: onNext() notification 타입: onComplete() on next: null
- materialize: 통지된 데이터와 통지된 데이터의 통지 타입 자체를 Notification 객체에 담고 이 Notification 객체를 통지한다.
조건과 불린 연산자
- all
- 통지되는 모든 데이터가 설정한 조건에 맞는지를 판단한다.
- 결과 값을 한번만 토지하면 되기때문에 true/false 값을 Single로 반환한다.
- 통지된 데이터가 조건에 맞지 않는다면 이후 데이터는 구독 해지되어 통지되지 않는다.
- 예시
Observable.just(1, 3, 5, 7, 8, 10) .doOnNext(num -> System.out.println("Do on next: " + num)) .all(num -> num < 6) .subscribe(data -> System.out.println("Do on next: " + data));Do on next: 1 Do on next: 3 Do on next: 5 Do on next: 7 on next: false- false 조건이 나올 때까지는 정상 전달된다.
- false가 되면 Single
타입으로 전송된다.
- amb
- 여러 개의 Observable 중 최초 통지 시점이 가장 빠른 Observable의 데이터만 통지되고, 나머비 Observable은 무시된다.
- 즉, 가장 먼저 통지를 시작한 Observable의 데이터만 통지된다.
- 예시
List<Observable<Integer>> observables = Arrays.asList( Observable.just(1, 3, 5, 7) .delay(100L, TimeUnit.MILLISECONDS).doOnComplete(()->System.out.println("completeA")), Observable.just(2, 4, 6, 8) .delay(200L, TimeUnit.MILLISECONDS).doOnComplete(()->System.out.println("completeB")), Observable.just(10, 11, 12, 13) .delay(300L, TimeUnit.MILLISECONDS).doOnComplete(()->System.out.println("completeC")) ); Observable.amb(observables) .doOnComplete(()-> System.out.println("COMPLETE")) .subscribe(data->System.out.println("NEXT: " + data)); Thread.sleep(1000L);// 출력 NEXT: 1 NEXT: 3 NEXT: 5 NEXT: 7 completeA COMPLETE- 가장 먼저 도착한 홀수 데이터를 갖는 Observable만 통지된다.
- contains
- 파라미터의 데이터가 Observable에 포함되어 있는지를 판단한다.
- 결과값을 한번만 통지하면 되기 때문에 t/f 값은 Single로 반환한다.
- 결과 통지 시점은 Observable에 포함된 데이터를 통지하거나 완료를 통지할 때이다
- 예시
Observable.just(1, 3, 5, 7) .doOnNext(data -> System.out.println("next: " + data)) .contains(3) .subscribe(data -> System.out.println("ON NEXT: " + data))//출력 next: 1 next: 3 ON NEXT: true- contains에 부합되는 3이 통지된 후에 true를 리턴하고 종료된다.
Observable.just(1, 2, 5, 7) .doOnNext(data -> System.out.println("next: " + data)) .contains(3) .subscribe(data -> System.out.println("ON NEXT: " + data));// 출력 next: 1 next: 2 next: 5 next: 7 ON NEXT: false- 전체를 통지 받은 후 포함되어 있지 않다면 false를 Single 타입으로 보낸다.
- defaultIfEmpty
- 통지할 데이터가 없을 경우 파라미터로 입력된 값을 통지한다.
- 즉, 연산자 이름 의미 그대로 Observable에 통지할 데이터가 없이 비어 있는 상태일 때 디폴트 값을 통지한다.
- 예시
Observable.just(1, 2, 3, 4, 5) .filter(num -> num > 10) .defaultIfEmpty(10) .subscribe(System.out::println);- 10이 출력된다.
- sequenceEqual
- 두 Observable이 동일한 순서로 동일한 갯수의 같은 데이터를 통지하는지 판단한다.
- 통지 시점과 무관하게 데이터의 정합성만 판단하므로 통지 시점이 다르더라도 조건이 맞는다면 true를 통지한다.
- 예시
Observable<Integer> observable1 = Observable.just(1, 3, 5, 7, 9) .subscribeOn(Schedulers.computation()) .delay(num -> { Thread.sleep(500L); return Observable.just(num); }).doOnNext(num -> System.out.println("observable1: " + num)); Observable<Integer> observable2 = Observable.just(1, 3, 5, 6, 9) .subscribeOn(Schedulers.computation()) .delay(num -> { Thread.sleep(700L); return Observable.just(num); }).doOnNext(num -> System.out.println("observable2: " + num)); Observable.sequenceEqual(observable1, observable2) .subscribe(data-> System.out.println("ON NEXT: " + data)); Thread.sleep(6000L);- 7과 6이 다르기 때문에 false를 반환한다.
데이터 집계 연산자
- count
- Observable이 통지한 데이터의 총 개수를 통지한다.
- 총 개수만 통지하면 되므로 결과값은 Single로 반환한다.
- 데이터의 총 개수를 통지하는 시점은 완료 통지를 받은 시점이다.
- 여러 Observable을 사용하면 모든 Observable의 데이터 개수를 확인한다.
- 예시
Observable.concat(Arrays.asList( Observable.just(1, 3, 5, 7, 9), Observable.just(2, 4, 6, 8), Observable.just(10, 11, 12) )) .count() .subscribe(data-> System.out.println("ON NEXT: " + data));- 12가 출력된다.
- reduce
- Observable이 통지한 데이터를 이용하여 어떤 결과를 일정한 방식으로 합성한 후, 최종 결과 반환
- Observable이 통지한 데이터가 숫자일 경우 파라미터로 지정한 함수형 인터페이스에 정의된 계산 방식으로 값을 집계할 수 있다.
- 예시
Observable.range(1, 10) .doOnNext(System.out::println) .reduce((x, y) -> x + y) .subscribe(data -> System.out.println("누적 합계: " + data));- x가 totalNum으로 누적합을 나타내며, y는 현재 받는 데이터의 값을 나타낸다.
Observable.range(1, 10) .doOnNext(System.out::println) .reduce(10, (x, y) -> x + y) .subscribe(data -> System.out.println("누적 합계: " + data)); - 10이 초기값으로 입력되며 65가 출력된다.
Observable.just("a", "b", "c") .doOnNext(System.out::println) .reduce((x, y) -> "(" + x + "," + y + ")" ) .subscribe(data -> System.out.println("ON NEXT: " + data)); - 문자열에 경우 string 또한 중첩되어 출력된다.
- ON NEXT: ((a,b),c)이 출력된다.
- x가 totalNum으로 누적합을 나타내며, y는 현재 받는 데이터의 값을 나타낸다.
- scan
- reduce와 같은 방식으로 생성하지만 차이점이 있다.
- 차이점은 reduce는 마지막 최종 값만 보여주지만, scan에 경우 과정을 확인할 수 있다.
- 예시
Observable.just("a", "b", "c") .scan((x, y) -> "(" + x + "," + y + ")" ) .subscribe(data -> System.out.println("ON NEXT: " + data));// 출력 ON NEXT: a ON NEXT: (a,b) ON NEXT: ((a,b),c)- 문자열에 경우 string 또한 중첩되어 출력된다.
- doOnNext를 출력하지 않았지만 출력되는 결과를 계속 확인할 수 있다.
출처
- Kevin의 알기쉬운 RxJava 1부