관리 메뉴

JIE0025

Reactor 개요 (operator, marble diagram ... ) 본문

백엔드/스프링, 스프링부트, JPA, Spring Webflux

Reactor 개요 (operator, marble diagram ... )

Kangjieun11 2023. 6. 11. 18:00
728x90

 


황정식 - 스프링으로 시작하는 리액티브 프로그래밍

Spring WebFlux를 이용한 Non-Blocking 애플리케이션 구현

 

 

책을 통해 공부하면서 개인적으로 정리한 내용입니다.

 

 


 

 

✅ 개요

이전에 프로젝트를 구현하기 위해 Webflux의 간단한 개요 정도만 이해를 했었는데 이번에는 리액티브 프로그래밍에 (Reactor)에 대해 조금 더 세세하게 개념을 이해해보자.

✅ 리액티브 프로그래밍의 구조

선언형 프로그래밍 방식을 사용한다. 

 

명령형은 순차적으로 위에서 아래로 실행이 되는데, 

선언형은 최종연산이 수행되는 메서드를 호출하지 않으면, 메서드 체인들이 실행되지 않는다.

>> 이런작업을 하라는 람다 표현식만 넘기고, 최종 연산에서 동작을 수행함

 

Java 8부터 지원되는 Stream API의 예

int sum =
                numbers.stream()
                        .filter(number -> number > 4 && (number % 2 == 0))
                        .mapToInt(number -> number)
                        .sum();

 

 

⏺ Reactor 예제  

import reactor.core.publisher.Mono;


public class Example {
    public static void main(String[] args) {
    
        // Publisher의 역할 : Mono/Flux
        Mono<String> mono = Mono.just("안녕");

        //Subscriber의 역할 :  subscribe() 메서드 내부 람다 표현식
        mono.subscribe(message -> System.out.println(message));
    }
}

 

Mono와 Flux는 퍼블리셔(생산자)의 역할을 하는 객체이다. 

즉 비동기/넌블로킹 방식의 리액티브 프로그래밍에서,  데이터를 내보내는 역할이다. 

 

 

Mono  : 0~1개의 반환값을 가짐

 

 

Flux :  0~N개의 반환값을 가짐

 

✍️ Signal : 퍼블리셔(생산자)가 발생시키는 이벤트

emit이 일어날 때, 리액티브 프로그래밍에선 이벤트가 발생하는것을 간주하는데

이 이벤트를 다른 컴포넌트에게 전달하는것을 Signal을 전송한다고 표현한다.

 

✍️ Operator : 리액티브 프로그래밍에서 어떤 동작을 수행하는 메서드

filter(), reduce() 등 내부적으로 수행하는 메서드들이다!

        Flux
            .fromIterable(List.of(1, 3, 6, 7, 8, 11))
            .filter(number -> number > 4 && (number % 2 == 0))
            .reduce((n1, n2) -> n1 + n2)
            .subscribe(System.out::println);

 

✍️ Sequence : Operator 체인으로 표현되는 데이터의 흐름

위에 작성된 예시 코드 자체를 하나의 Sequence로 이해하면 됨

 

 

 

✅ Reactor

⏺ Reactor의 특징

  1. 리액티브 스트림즈를 구현한 라이브러리
  2. Non-Blocking 통신 지원
  3. Publisher type : Mono Flux 지원
  4. MSA기반 application 구조에 적합한 라이브러리
  5. Backpressure-ready network  
    • 만약 Publisher가 emit하는 데이터가 끊임없이 들어오는데, Subscriber의 처리 속도가 느리면?
    • 이럴 때 적절하게 제어하는 전략을 의미한다. 

 

 

✅ Marble Diagram 

동그라미 : 하나의 데이터

다이어그램에서 시간의 흐름에 따라 변화되는 데이터의 흐름을 볼 수 있다.

 

 

Marble Diagram- Mono 해석

 

Marble Diagram- Flux 해석

 

✅ Operator

Reactor에서 제공하는 다양한 Operator가 존재한다. 

자주 사용될만한 Operator를 간단하게 알아보자. 

 

 

⏺ 새로운 Sequence를 생성

  • just() : 단순 생성
  • fromStream() : 입력으로 Stream 전달
  • fromIterable() : 자바의 Iterable을 입력으로 전달받아 emit (List, Map, Set)
  • create() : signal 이벤트를 발생시키는 operator, 
    • signal은 퍼블리셔(생성자)가 발생하는 이벤트로, 한번에 여러건의 데이터를 비동기적으로 emit할 수 있다.

 

⏺ 기존 Sequence에서 변환하는 작업

  • flatMap() : 바깥쪽에서 하나의 데이터가 들어올때마다 새로운 Sequence가 생성된다.
    • flatMap 내부에서 정의되는 sequence는 inner sequence이다.
  • concat() : 입력으로 전달하는 Publisher의 Sequence를 연결해 차례대로 emit 
    • concat의 파라미터로 두개의 Flux Sequence가 있다고 가정하면, concat을 통해 논리적으로 하나의 시퀀스로 동작함
  • zip() : 입력으로 전달되는 여러개의 Sequence에서 emit된 데이터를 결합하는 Operator

 

⏺ Sequence의 내부 동작을 확인

  • doOnNext() 처리되기전에 무언가 수행하기 위한 목적
    • (처리흐름에 영향을 미치지 않고 디버깅이나 로깅같은 사이드 이펙트 생성에 사용)
  • log() : publisher에서 발생하는 Signal이벤트를 로그로 출력

 

Sequence에서 데이터 필터링

  • filter() : 주어진 조건이 참인것만을 통과시키도록 함
  • take() : 통과하는 개수를 지정 

 

⏺ 에러 처리

error() : onErrorSignal 이벤트를 발생시킬 때 사용 (의도적인 예외를 던짐)

  • timeout() : 주어진 시간동안 emit되는 데이터가 없을때 onErrorSignal 발생
  • retry() : Sequence상에서 에러가 발생했을 떄 입력으로 주어진 숫자만큼 재구독/ Sequence를 다시 시작

 

 


references 

 

황정식 - 스프링으로 시작하는 리액티브 프로그래밍

 

스프링으로 시작하는 리액티브 프로그래밍 | 황정식 - 교보문고

스프링으로 시작하는 리액티브 프로그래밍 | *리액티브 프로그래밍의 기본기를 확실하게 다진다*리액티브 프로그래밍은 적은 컴퓨팅 파워로 대량의 요청 트래픽을 효과적으로 처리할 수 있는

product.kyobobook.co.kr

 

https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Flux.html

https://tech.kakao.com/2018/05/29/reactor-programming/

https://projectreactor.io/