물리적인 쓰레드와 논리적인 쓰레드의 이해

  • 물리적인 쓰레드는 하드웨어와 관련이 있고, 논리적인 쓰레드는 소프트웨어와 관련이 있다.
  • 물리적 쓰레드
    • 물리적인 쓰레드를 이해하기 위해서는 CPU의 코어를 먼저 알아야 한다.
    • 코어란?
      • CPU의 명령어를 처리하는 반도체 유닛
      • 코어의 갯수가 많으면 명령어를 병렬로(parallel) 더 많이 더 빠르게 처리할 수 있다.
    • 물리적인 쓰레드는 물리적인 코어를 논리적으로 쪼객 논리적 코어다.
  • 논리적 쓰레드
    • 논리적인 쓰레드는 프로세스 내에서 실행되는 세부 작업의 단위이다.
    • 프로세스는 컴퓨터에서 실행할 수 있는 실행 파일(프로그램)을 실행하면 생기는 인스턴스이다.
    • 논리적인 쓰레드의 생성 개수는 이론적으로는 제한이 없지만 실제로는 물리적인 쓰레드의 가용 범위내에서 생성할 수 있다.

image

  • 물리적인 쓰레드는 CPU가 가지고 있는 코어에 개수 등을 말하며 논리적인 쓰레드는 애플리케이션 단에서 실행되는 스레드이다.
  • 물리적인 쓰레드: 병렬성
    • 코어 개수와 스레드 개수만큼 할당되어 있기 때문에 병렬적으로 처리한다.
  • 논리적인 쓰레드: 동시성
    • 병렬적으로 작업하는 것으로 보이지만, 실제 물리적인 쓰레드에서 할당되어 실행되며 병렬적으로 실행되는 모습을 보이지만 동시에 진행한다.

스케쥴러(Scheduler)

  • RxJava에서의 스케쥴러는 RxJava 비동기 프로그래밍을 위한 쓰레드(Thread) 관리자이다.
  • 즉, 스케쥴러를 이용해서 어떤 쓰레드에서 무엇을 처리할 지에 대해서 제어할 수 있다.
  • 스케쥴러를 이용해서 데이터를 통지하는 쪽과 데이터를 처리하는 쪽 쓰레드를 별도로 지정해서 분리할 수 있다.
  • RxJava의 스케쥴러를 통해 쓰레드를 위한 코드의 간결성 및 쓰레드 관리의 복잡함을 줄일 수 있다.
  • RxJava에서 스케쥴러를 지정하기 위해서 subscribeOn( ), observeOn( ) 유틸리티 연산자를 사용한다.
  • 생산자쪽의 데이터 흐름을 제어하기 위해서는 subscribeOn( ) 연산자를 사용한다.
  • 소비자쪽에서 전달받은 데이터 처리를 제어하기 위해서는 observeOn( ) 연산자를 사용한다.
  • subscribeOn( ), observeOn( ) 연산자는 각각 파라미터로 Scheduler를 지정해야 한다.

스케줄러의 종류

  • Schedulers.io()
    • I/O 처리 작업을 할 때 사용하는 스케쥴러
    • 네트워크 요청 처리, 각종 입/출력 작업, 데이터베이스 쿼리 등에 사용
    • 쓰레드 풀에서 쓰레드를 가져오거나 가져올 쓰레드가 없으면 새로운 쓰레드를 생성한다.
    • 예시
      File[] files = new File("src/main/java/com/example").listFiles();
      		
      Observable.fromArray(files)
          .doOnNext(data-> System.out.println(Thread.currentThread().getName() + ", ON NEXT: " + data))
          .filter(data -> data.isDirectory())
          .map(dir -> dir.getName())
          .subscribeOn(Schedulers.io())
          .subscribe(data -> System.out.println(Thread.currentThread().getName() + ", SUBSCRIBE: " + data));
      
      Thread.sleep(1000L);
      
      // 출력
      RxCachedThreadScheduler-1, ON NEXT: src\main\java\com\example\RxJava02Application.java
      RxCachedThreadScheduler-1, ON NEXT: src\main\java\com\example\scheduler
      RxCachedThreadScheduler-1, SUBSCRIBE: scheduler
      RxCachedThreadScheduler-1, ON NEXT: src\main\java\com\example\subject
      RxCachedThreadScheduler-1, SUBSCRIBE: subject
      
      • Publisher에서 생성한 데이터를 넘길 때 새로운 스레드(RxCachedThreadScheduler-1)을 생성하여 제공
      • file io가 있기 때문에 해당 스케줄러를 사용했다.
  • Schedulers.computation()
    • 논리적인 연산 처리 시, 사용하는 스케쥴러
    • CPU 코어의 물리적 쓰레드 수를 넘지 않는 범위에서 쓰레드를 생성한다.
    • 대기 시간 없이 빠르게 계산 작업을 수행하기 위해 사용한다.
    • 예시
      File[] files = new File("src/main/java/com/example").listFiles();
      		
      Observable.fromArray(files)
          .doOnNext(data-> System.out.println(Thread.currentThread().getName() + ", ON NEXT: " + data))
          .subscribeOn(Schedulers.io())
          .observeOn(Schedulers.computation())
          .filter(data -> data.isDirectory())
          .map(dir -> dir.getName())
          .subscribe(data -> System.out.println(Thread.currentThread().getName() + ", SUBSCRIBE: " + data));
      		
      Thread.sleep(1000L);
      
      // 출력
      RxCachedThreadScheduler-1, ON NEXT: src\main\java\com\example\RxJava02Application.java
      RxCachedThreadScheduler-1, ON NEXT: src\main\java\com\example\scheduler
      RxCachedThreadScheduler-1, ON NEXT: src\main\java\com\example\subject
      RxComputationThreadPool-1, SUBSCRIBE: scheduler
      RxComputationThreadPool-1, SUBSCRIBE: subject
      
      • subscribe에는 IO로 observe에 경우 computation으로 지정하였다.
      • 다른 Thread가 생성되어 사용된 것을 확인할 수 있다.
        • RxCachedThreadScheduler-1, RxComputationThreadPool-1
    • 예시 2
      File[] files = new File("src/main/java/com/example").listFiles();
      		
      Observable.fromArray(files)
          .doOnNext(data-> System.out.println(Thread.currentThread().getName() + ", #데이터 통지"))
          .subscribeOn(Schedulers.io())
          .observeOn(Schedulers.computation())
          .filter(data -> data.isDirectory())
          .doOnNext(data-> System.out.println(Thread.currentThread().getName() + ", #필터 거침"))
          .map(dir -> dir.getName())
          .doOnNext(data-> System.out.println(Thread.currentThread().getName() + ", #map 거침"))
          .observeOn(Schedulers.computation())
          .subscribe(data -> System.out.println(Thread.currentThread().getName() + ", SUBSCRIBE: " + data));
      
      Thread.sleep(1000L);
      
      // 출력
      RxCachedThreadScheduler-1, #데이터 통지
      RxCachedThreadScheduler-1, #데이터 통지
      RxCachedThreadScheduler-1, #데이터 통지
      RxComputationThreadPool-1, #필터 거침
      RxComputationThreadPool-1, #map 거침
      RxComputationThreadPool-1, #필터 거침
      RxComputationThreadPool-2, SUBSCRIBE: scheduler
      RxComputationThreadPool-1, #map 거침
      RxComputationThreadPool-2, SUBSCRIBE: subject
      
      • map을 거친 후 다시 observeOn으로 새로운 Scheduler를 설정하였고, computation은 물리적 스레드(코어, 스래드 개수)에 맞게 새로 생성한다.
      • subscribe 지점에서 2번째 스레드가 생성된 것을 확인할 수 있다.
    • 예시
      Observable<Integer> observable1 = Observable.just(1, 3, 5, 7);
      Observable<Integer> observable2 = Observable.just(2, 4, 6, 8);
      Observable<Integer> observable3 = Observable.just(10, 20, 30, 40);
      Observable<Integer> observable4 = Observable.range(1, 24);
      		
      Observable source = Observable.zip(observable1, observable2, observable3, observable4,
          (data1, data2, data3, hour) -> hour +"시: " + Collections.max(Arrays.asList(data1, data2, data3)));
      		
      source.subscribeOn(Schedulers.computation())
          .subscribe(data-> System.out.println(Thread.currentThread().getName() + ", #First: " + data));
      source.subscribeOn(Schedulers.computation())
          .subscribe(data-> System.out.println(Thread.currentThread().getName() + ", #Second: " + data));
      		
      Thread.sleep(1000L);
      
      // 출력
      RxComputationThreadPool-1, #First: 1시: 10
      RxComputationThreadPool-1, #First: 2시: 20
      RxComputationThreadPool-2, #Second: 1시: 10
      RxComputationThreadPool-2, #Second: 2시: 20
      RxComputationThreadPool-1, #First: 3시: 30
      RxComputationThreadPool-2, #Second: 3시: 30
      RxComputationThreadPool-2, #Second: 4시: 40
      RxComputationThreadPool-1, #First: 4시: 40
      
      • First와 Second에 다른 Thread가 실행된 것을 확인할 수 있다.
  • Schedulers.newThread()
    • 요청시마다 매번 새로운 쓰레드를 생성한다.
    • 매번 생성되면 쓰레드 비용도 많이 들고, 재사용도 되지 않는다.
    • 예시
      Observable<String> observable = Observable.just("1", "2", "3", "4", "5");
      		
      observable.subscribeOn(Schedulers.newThread())
          .map(data -> "## " + data + " ##")
          .subscribe(data -> System.out.println(Thread.currentThread().getName() + " : " + data));
      		
      observable.subscribeOn(Schedulers.newThread())
          .map(data -> "## " + data + " ##")
          .subscribe(data -> System.out.println(Thread.currentThread().getName() + " : " + data));
      		
      Thread.sleep(300L);
      
      // 출력
      RxNewThreadScheduler-2 : ## 1 ##
      RxNewThreadScheduler-2 : ## 2 ##
      RxNewThreadScheduler-1 : ## 1 ##
      RxNewThreadScheduler-2 : ## 3 ##
      RxNewThreadScheduler-1 : ## 2 ##
      RxNewThreadScheduler-2 : ## 4 ##
      RxNewThreadScheduler-1 : ## 3 ##
      RxNewThreadScheduler-2 : ## 5 ##
      RxNewThreadScheduler-1 : ## 4 ##
      RxNewThreadScheduler-1 : ## 5 ##
      
  • Schedulers.trampoline()
    • 현재 실행되고 있는 쓰레드에 큐(Queue)를 생성하여 처리할 작업들을 큐에 넣고 순서대로 처리한다.
      • FIFO
    • 예시
      Observable<String> observable = Observable.just("1", "2", "3", "4", "5");
      		
      observable.subscribeOn(Schedulers.trampoline())
          .map(data-> "## " + data + " ##")
          .subscribe(data -> System.out.println(Thread.currentThread().getName() + ", ON NEXT: " + data));
      
      observable.subscribeOn(Schedulers.trampoline())
          .map(data-> "$$ " + data + " $$")
          .subscribe(data -> System.out.println(Thread.currentThread().getName() + ", ON NEXT: " + data));
      
      // 출력
      main, ON NEXT: ## 1 ##
      main, ON NEXT: ## 2 ##
      main, ON NEXT: ## 3 ##
      main, ON NEXT: ## 4 ##
      main, ON NEXT: ## 5 ##
      main, ON NEXT: $$ 1 $$
      main, ON NEXT: $$ 2 $$
      main, ON NEXT: $$ 3 $$
      main, ON NEXT: $$ 4 $$
      main, ON NEXT: $$ 5 $$
      
      • trampoline은 새로운 스레드를 생성하지 않고 main 스레드로 실행한다.
  • Schedulers.single()
    • 단일 쓰레드를 생성하여 처리 작업을 진행한다
    • 여러번 구독해도 공통으로 사용한다.
    • 예시
      Observable<String> observable = Observable.just("1", "2", "3", "4", "5");
      		
      observable.subscribeOn(Schedulers.single())
          .map(data-> "## " + data + " ##")
          .subscribe(data -> System.out.println(Thread.currentThread().getName() + ", ON NEXT: " + data));
      
      observable.subscribeOn(Schedulers.single())
          .map(data-> "$$ " + data + " $$")
          .subscribe(data -> System.out.println(Thread.currentThread().getName() + ", ON NEXT: " + data));
      
      Thread.sleep(1000L);
      
      // 출력
      RxSingleScheduler-1, ON NEXT: ## 1 ##
      RxSingleScheduler-1, ON NEXT: ## 2 ##
      RxSingleScheduler-1, ON NEXT: ## 3 ##
      RxSingleScheduler-1, ON NEXT: ## 4 ##
      RxSingleScheduler-1, ON NEXT: ## 5 ##
      RxSingleScheduler-1, ON NEXT: $$ 1 $$
      RxSingleScheduler-1, ON NEXT: $$ 2 $$
      RxSingleScheduler-1, ON NEXT: $$ 3 $$
      RxSingleScheduler-1, ON NEXT: $$ 4 $$
      RxSingleScheduler-1, ON NEXT: $$ 5 $$
      
      • single은 하나의 새로운 thread를 생성하여 사용한다.
  • Schedulers.from(executor)
    • Executor를 사용해서 생성한 쓰레드를 사용한다.
    • RxJava의 Scheduler와 Executor의 동작 방식이 다르므로 자주 사용되지 않음.

정리

  • 다양한 Scheduler 함수가 존재하지만, 주로 Schedulers.io(), Schedulers.computation() 을 사용한다.

출처

Kevin의 알기 쉬운 RxJava 2부