Thread Scheduling

  • Reactor는 비동기 실행을 강행하지 않는다.
    Flux<Integer> flux = Flux.range(0, 5);
    flux.subscribe(item -> log.info("onNext: " + item),
      throwable -> log.info("onError: " + throwable.getMessage()),
      () -> log.info("onComplete"));
    
    22:02:38.940 [main] INFO com.example.demo.iterable.SchedulerController - onNext: 0
    22:02:38.941 [main] INFO com.example.demo.iterable.SchedulerController - onNext: 1
    22:02:38.941 [main] INFO com.example.demo.iterable.SchedulerController - onNext: 2
    22:02:38.941 [main] INFO com.example.demo.iterable.SchedulerController - onNext: 3
    22:02:38.941 [main] INFO com.example.demo.iterable.SchedulerController - onNext: 4
    22:02:38.941 [main] INFO com.example.demo.iterable.SchedulerController - onComplete
    
    • 모두 main 메서드에서 실행한다.
    • Publisher(subscribe)가 Subscribe(onSubscribe)를 호출하고, Subscribe는 Subscription을 통해 onNext, onError, onComplete가 실행되는 일련의 동작이 하나의 main 메서드를 통해 실행된다.
    • 실제로는 Pub과 Sub이 직렬적으로 돌아가는 구현하지 않는다.
      • Scheduler의 publishOn, subscribeOn을 통해 새로운 쓰레드를 구성한다.
  • subscribeOn

    image

    ** 이미지 출처: https://tech.kakao.com/2018/05/29/reactor-programming/

    • Publisher에서 subscribe(Subscriber) 신호를 별도 Scheduler로 처리한다
    • 전형적으로 blocking IO와 같이 publisher가 느리고 consumer가 빠른 경우에 사용
    • 예시
      • Flux 활용
        flux.subscribeOn(Scehdulers.single()).subscribe();
        
      • Publisher 구현체 활용
        Publisher<Integer> publisher = subscriber -> {
          subscriber.onSubscribe(new Subscription() {
            @Override
            public void request(long n) { 
              Arrays.asList(1, 2, 3, 4, 5).forEach(i -> subscriber.onNext(i));
              subscriber.onComplete();
            }
            @Override
            public void cancel() {}
          });
        };
        Publisher<Integer> subOnPub = subscriber -> {
          ExecutorService executorService = Executors.newSingleThreadExecutor();
          executorService.execute(() -> publisher.subscribe(subscriber));
        };
        Subscriber<Integer> subscriber = new Subscriber<Integer>() {
          public void onSubscribe(Subscription subscription) {log.info("onSubscribe"); subscription.request(Long.MAX_VALUE);};
          public void onNext(Integer item) {log.info("onNext: " + item);};
          public void onError(Throwable throwable) {log.info("onError: " + throwable.getMessage());};
          public void onComplete() {log.info("onComplete");};
        };
        subOnPub.subscribe(subscriber);
        
        22:37:35.742 [pool-1-thread-1] INFO com.example.demo.iterable.SchedulerController - onSubscribe
        22:37:35.748 [pool-1-thread-1] INFO com.example.demo.iterable.SchedulerController - onNext: 1
        22:37:35.748 [pool-1-thread-1] INFO com.example.demo.iterable.SchedulerController - onNext: 2
        22:37:35.748 [pool-1-thread-1] INFO com.example.demo.iterable.SchedulerController - onNext: 3
        22:37:35.748 [pool-1-thread-1] INFO com.example.demo.iterable.SchedulerController - onNext: 4
        22:37:35.748 [pool-1-thread-1] INFO com.example.demo.iterable.SchedulerController - onNext: 5
        22:37:35.748 [pool-1-thread-1] INFO com.example.demo.iterable.SchedulerController - onComplete
        
        
        • single()을 사용했기 때문에 동일한 Thread를 사용하였지만 Main method를 사용하지는 않는다.
  • publishOn

    image

    ** 이미지 출처: https://tech.kakao.com/2018/05/29/reactor-programming/

    • publishOn은 Subscriber에 onNext, onComplete, onError을 호출할 때 별도 쓰레드를 생성한다.
    • publishOn() 메서드를 이용하면 next, complete, error신호를 별도 쓰레드로 처리할 수 있다.
    • map(), flatMap() 등의 변환도 publishOn()이 지정한 쓰레드를 이용해서 처리한다.
    • Publisher는 빠르게 진행되며 Subscriber가 상대적으로 느린 경우 사용한다.
      flux.publishOn(Schedulers.single()).subscribe()
      
      Publisher<Integer> publisher = subscriber -> {
        subscriber.onSubscribe(new Subscription() {
          @Override
          public void request(long n) { 
            Arrays.asList(1, 2, 3, 4, 5).forEach(item -> subscriber.onNext(item));
            subscriber.onComplete();
          }
          @Override
          public void cancel() {}
        });
      };		
      Publisher<Integer> pubOnPub = sub -> {
        publisher.subscribe(new Subscriber<Integer>() {
          ExecutorService executorService = Executors.newSingleThreadExecutor();
          public void onSubscribe(Subscription subscription) {sub.onSubscribe(subscription);}
          public void onNext(Integer item) { executorService.execute(()-> sub.onNext(item)); }
          public void onError(Throwable throwable) {executorService.execute( () -> sub.onError(throwable)); }
          public void onComplete() { executorService.execute(()-> sub.onComplete());}
        });
      };
      Subscriber<Integer> subscriber = new Subscriber<Integer>() {
        public void onSubscribe(Subscription subscription) {log.info("onSubscribe"); subscription.request(Long.MAX_VALUE);};
        public void onNext(Integer item) {log.info("onNext: " + item);};
        public void onError(Throwable throwable) {log.info("onError: " + throwable.getMessage());};
        public void onComplete() {log.info("onComplete");};
      };
      pubOnPub.subscribe(subscriber);
      
      11:16:26.826 [main] INFO com.example.demo.iterable.SchedulerController - onSubscribe
      11:16:26.839 [pool-1-thread-1] INFO com.example.demo.iterable.SchedulerController - onNext: 1
      11:16:26.840 [pool-1-thread-1] INFO com.example.demo.iterable.SchedulerController - onNext: 2
      11:16:26.840 [pool-1-thread-1] INFO com.example.demo.iterable.SchedulerController - onNext: 3
      11:16:26.841 [pool-1-thread-1] INFO com.example.demo.iterable.SchedulerController - onNext: 4
      11:16:26.841 [pool-1-thread-1] INFO com.example.demo.iterable.SchedulerController - onNext: 5
      11:16:26.841 [pool-1-thread-1] INFO com.example.demo.iterable.SchedulerController - onComplete
      
  • publishOn과 subscribeOn 모두 적용 가능하다.
    Publisher<Integer> publisher = subscriber -> {
      subscriber.onSubscribe(new Subscription() {
        @Override
        public void request(long n) { 
          Arrays.asList(1, 2, 3, 4, 5).forEach(item -> subscriber.onNext(item));
          subscriber.onComplete();
        }				
        @Override
        public void cancel() {}
      });
    };
    Publisher<Integer> subOnPub = sub -> {
      ExecutorService executorService = Executors.newSingleThreadExecutor(new CustomizableThreadFactory() {
        @Override
        public String getThreadNamePrefix() { return "subon-"; }
      });
      executorService.execute(() -> publisher.subscribe(sub));
    };
    Publisher<Integer> pubOnPub = sub -> {
      subOnPub.subscribe(new Subscriber<Integer>() {
        ExecutorService executorService = Executors.newSingleThreadExecutor(new CustomizableThreadFactory() {
          @Override
          public String getThreadNamePrefix() { return "pubon-"; }
        });
        public void onSubscribe(Subscription subscription) {sub.onSubscribe(subscription);}
        public void onNext(Integer item) { executorService.execute(()-> sub.onNext(item)); }
        public void onError(Throwable throwable) {executorService.execute( () -> sub.onError(throwable)); }
        public void onComplete() { executorService.execute(()-> sub.onComplete());}
      });
    };
    Subscriber<Integer> subscriber = new Subscriber<Integer>() {
      public void onSubscribe(Subscription subscription) {log.info("onSubscribe"); subscription.request(Long.MAX_VALUE);};
      public void onNext(Integer item) {log.info("onNext: " + item);};
      public void onError(Throwable throwable) {log.info("onError: " + throwable.getMessage());};
      public void onComplete() {log.info("onComplete");};
    };
    pubOnPub.subscribe(subscriber);
    
    Flux.range(1, 5)
      .log()
      .subscribeOn(Schedulers.newSingle("subOn-"))
      .publishOn(Schedulers.newSingle("pubOn-"))
      .subscribe(item -> log.info("onNext: " + item),
        throwable -> log.info("onComplete: " + throwable.getMessage()),
        () -> log.info("onComplete"));
    
    11:25:48.183 [subon-1] INFO com.example.demo.iterable.SchedulerController - onSubscribe
    11:25:48.190 [pubon-1] INFO com.example.demo.iterable.SchedulerController - onNext: 1
    11:25:48.190 [pubon-1] INFO com.example.demo.iterable.SchedulerController - onNext: 2
    11:25:48.190 [pubon-1] INFO com.example.demo.iterable.SchedulerController - onNext: 3
    11:25:48.190 [pubon-1] INFO com.example.demo.iterable.SchedulerController - onNext: 4
    11:25:48.190 [pubon-1] INFO com.example.demo.iterable.SchedulerController - onNext: 5
    11:25:48.194 [pubon-1] INFO com.example.demo.iterable.SchedulerController - onComplete
    
    • onSubscribe, request에서는 subscribeOn에서 생성한 스레드, onNext, onError, onComplete는 publishOn에서 생성한 스레드로 실행된다.

interval

Flux.interval(Duration.ofMillis(200L))
  .take(5)
  .subscribe(item -> log.info("onNext: " + item));
		
Thread.sleep(1200);
13:21:43.819 [main] DEBUG reactor.util.Loggers - Using Slf4j logging framework
13:21:44.066 [parallel-1] INFO com.example.demo.iterable.SchedulerController - onNext: 0
13:21:44.267 [parallel-1] INFO com.example.demo.iterable.SchedulerController - onNext: 1
13:21:44.454 [parallel-1] INFO com.example.demo.iterable.SchedulerController - onNext: 2
13:21:44.656 [parallel-1] INFO com.example.demo.iterable.SchedulerController - onNext: 3
13:21:44.858 [parallel-1] INFO com.example.demo.iterable.SchedulerController - onNext: 4
  • interval에 설정한 주기마다 데이터를 전송한다.
  • Schedulers.parallel()를 사용해서 신호를 주기적으로 발생한다. 다른 스케줄러를 사용하고 싶다면 interval(Duration, Scheduler) 메서드를 사용하면 된다.
  • 주의할 점은 메인메서드가 해당 주기 이상 살아있어야 한다.
  • 쓰레드를 살펴보면 메인쓰레드가 아닌 다른 쓰레드에서 실행되는 것을 확인할 수 있다.
  • Publisher와 Subscriber 로 구현해보기
    • subscription
      new Subscription() {
        int no = 0;
        volatile boolean cancelled = false;
        public void request(long n) {
          // ExecutorService es = Executors.newSingleTrheadExecutor();
          // es.execute(() -> sub.onNex(i);
          ScheduledExecutorService exec = Executors.newSingleThreadScheduledExecutor();
          exec.scheduleAtFixedRate( () -> {
            if(cancelled) {
              exec.shutdown();
              return;
            }
            sub.onNext(i);
          }, 0, 300, TimeUnit.MILLISECONDS);
        }
            
        public void cancel() {
          cancelled = true;
        }
      }  
      
      • ExecutorService 대신 ScheduledExecutorService를 사용했다.
      • scheduleAtFixedRate를 사용하여 지정된 기간만큼 대기 후에 전송한다.
    • Subscriber 수정
      Subscriber<Integer> subscriber = new Subscriber<Integer>() {
        int count = 0;
        Subscription subsc;
        public void onSubscribe(Subscription subscription) {
          log.info("onSubscribe"); 
          subsc = subscription;
          subscription.request(Long.MAX_VALUE);
        };
        public void onNext(Integer item) {
          log.info("onNext: " + item);
          if(++count >= 5) {
            subscription.cancel();
          }
        };
        public void onError(Throwable throwable) {log.info("onError: " + throwable.getMessage());};
        public void onComplete() {log.info("onComplete");};
      };
      
      • subscription의 cancel을 호출하여 중단해준다

Schedulers의 메서드 정리

  • immediate()
    • 현재 쓰레드에서 실행한다.
  • single()
    • 쓰레드가 한 개인 쓰레드 풀을 이용해서 실행한다. 즉 한 쓰레드를 공유한다.
  • parallel()
    • 고정 크기 쓰레드 풀을 이용해서 실행한다. 병렬 작업에 적합하다.
    • ExecutorService기반으로 단일 스레드 고정 크기(Fixed) 스레드 풀을 사용하여 병렬 작업에 적합함.
  • elastic()
    • 쓰레드 풀을 이용해서 실행한다. 블로킹 IO를 리액터로 처리할 때 적합하다. 쓰레드가 필요하면 새로 생성하고 일정 시간(기본 60초) 이상 유휴 상태인 쓰레드는 제거한다. 데몬 쓰레드를 생성한다.
    • 스레드 갯수는 무한정으로 증가할 수 있고 수행시간이 오래걸리는 블로킹 작업에 대한 대안으로 사용할 수 있게 최적화 되어있다.
  • single, parallel, elastic
    • 매번 새로운 쓰레드 풀을 만들지 않고 동일한 쓰레드 풀을 리턴한다.
    • 해당 메서드가 생성하는 쓰레드는 데몬 쓰레드로서 main 쓰레드가 종료되면 함께 종료된다.
  • newSingle(), newParallel(), newElastic()
    • 같은 종류의 쓰레드 풀인데 새로 생성하고 싶다면 해당 메서드를 사용하면 된다.
    • 기본으로 데몬 쓰레드가 아니기 때문에 어플리케이션 종료시에는 다음과 같이 dispose() 메서드를 호출해서 쓰레드를 종료시켜 주어야 한다. 그렇지 않으면 어플리케이션이 종료되지 않는 문제가 발생할 수 있다.
    • 파라미터
      • name : 쓰레드 이름으로 사용할 접두사이다.
      • daemon : 데몬 쓰레드 여부를 지정한다. 지정하지 않으면 false이다. 데몬 쓰레드가 아닌 경우 JVM 종료시에 생성한 스케줄러의 dispose()를 호출해서 풀에 있는 쓰레드를 종료해야 한다.
      • ttlSeconds : elastic 쓰레드 풀의 쓰레드 유휴 시간을 지정한다. 지정하지 않으면 60(초)이다.
      • parallelism : 작업 쓰레드 개수를 지정한다. 지정하지 않으면 Runtime.getRuntime().availableProcessors()이 리턴한 값을 사용한다.
  • 예시
    Flux.range(0, 5)
      .subscribeOn(Schedulers.newParallel("SUB1"))
      .log()
      .map(item -> item * 10)
      .publishOn(Schedulers.newParallel("PUB1"))
      .log()
      .subscribe(item -> log.info("first onNext:" + item));
    
    20:41:30.064 [main] DEBUG reactor.util.Loggers - Using Slf4j logging framework
    20:41:30.124 [main] INFO reactor.Flux.SubscribeOn.1 - onSubscribe(FluxSubscribeOn.SubscribeOnSubscriber)
    20:41:30.130 [main] INFO reactor.Flux.PublishOn.2 - | onSubscribe([Fuseable] FluxPublishOn.PublishOnSubscriber)
    20:41:30.131 [main] INFO reactor.Flux.PublishOn.2 - | request(unbounded)
    20:41:30.135 [main] INFO reactor.Flux.SubscribeOn.1 - request(256)
    20:41:30.141 [SUB1-2] INFO reactor.Flux.SubscribeOn.1 - onNext(0)
    20:41:30.142 [SUB1-2] INFO reactor.Flux.SubscribeOn.1 - onNext(1)
    20:41:30.143 [SUB1-2] INFO reactor.Flux.SubscribeOn.1 - onNext(2)
    20:41:30.143 [SUB1-2] INFO reactor.Flux.SubscribeOn.1 - onNext(3)
    20:41:30.143 [SUB1-2] INFO reactor.Flux.SubscribeOn.1 - onNext(4)
    20:41:30.148 [PUB1-1] INFO reactor.Flux.PublishOn.2 - | onNext(0)
    20:41:30.148 [PUB1-1] INFO com.example.demo.iterable.SchedulerController - first onNext:0
    20:41:30.148 [PUB1-1] INFO reactor.Flux.PublishOn.2 - | onNext(10)
    20:41:30.148 [PUB1-1] INFO com.example.demo.iterable.SchedulerController - first onNext:10
    20:41:30.149 [PUB1-1] INFO reactor.Flux.PublishOn.2 - | onNext(20)
    20:41:30.149 [PUB1-1] INFO com.example.demo.iterable.SchedulerController - first onNext:20
    20:41:30.149 [PUB1-1] INFO reactor.Flux.PublishOn.2 - | onNext(30)
    20:41:30.149 [PUB1-1] INFO com.example.demo.iterable.SchedulerController - first onNext:30
    20:41:30.149 [PUB1-1] INFO reactor.Flux.PublishOn.2 - | onNext(40)
    20:41:30.149 [PUB1-1] INFO com.example.demo.iterable.SchedulerController - first onNext:40
    20:41:30.153 [SUB1-2] INFO reactor.Flux.SubscribeOn.1 - onComplete()
    20:41:30.154 [PUB1-1] INFO reactor.Flux.PublishOn.2 - | onComplete()
    

출처

  • 토비의 봄 TV 7회 스프링 리액티브 프로그래밍(3) - Reactive Streams - Scheduler
  • https://javacan.tistory.com/entry/Reactor-Start-6-Thread-Scheduling