Reactor 개요 (operator, marble diagram ... )
황정식 - 스프링으로 시작하는 리액티브 프로그래밍
Spring WebFlux를 이용한 Non-Blocking 애플리케이션 구현
책을 통해 공부하면서 개인적으로 정리한 내용입니다.
✅ 개요
이전에 프로젝트를 구현하기 위해 Webflux의 간단한 개요 정도만 이해를 했었는데 이번에는 리액티브 프로그래밍에 (Reactor)에 대해 조금 더 세세하게 개념을 이해해보자.
✅ 리액티브 프로그래밍의 구조
선언형 프로그래밍 방식을 사용한다.
명령형은 순차적으로 위에서 아래로 실행이 되는데,
선언형은 최종연산이 수행되는 메서드를 호출하지 않으면, 메서드 체인들이 실행되지 않는다.
>> 이런작업을 하라는 람다 표현식만 넘기고, 최종 연산에서 동작을 수행함
Java 8부터 지원되는 Stream API의 예
int sum =
numbers.stream()
.filter(number -> number > 4 && (number % 2 == 0))
.mapToInt(number -> number)
.sum();
⏺ Reactor 예제
import reactor.core.publisher.Mono;
public class Example {
public static void main(String[] args) {
// Publisher의 역할 : Mono/Flux
Mono<String> mono = Mono.just("안녕");
//Subscriber의 역할 : subscribe() 메서드 내부 람다 표현식
mono.subscribe(message -> System.out.println(message));
}
}
Mono와 Flux는 퍼블리셔(생산자)의 역할을 하는 객체이다.
즉 비동기/넌블로킹 방식의 리액티브 프로그래밍에서, 데이터를 내보내는 역할이다.
Mono : 0~1개의 반환값을 가짐
Flux : 0~N개의 반환값을 가짐
✍️ Signal : 퍼블리셔(생산자)가 발생시키는 이벤트
emit이 일어날 때, 리액티브 프로그래밍에선 이벤트가 발생하는것을 간주하는데
이 이벤트를 다른 컴포넌트에게 전달하는것을 Signal을 전송한다고 표현한다.
✍️ Operator : 리액티브 프로그래밍에서 어떤 동작을 수행하는 메서드
filter(), reduce() 등 내부적으로 수행하는 메서드들이다!
Flux
.fromIterable(List.of(1, 3, 6, 7, 8, 11))
.filter(number -> number > 4 && (number % 2 == 0))
.reduce((n1, n2) -> n1 + n2)
.subscribe(System.out::println);
✍️ Sequence : Operator 체인으로 표현되는 데이터의 흐름
위에 작성된 예시 코드 자체를 하나의 Sequence로 이해하면 됨
✅ Reactor
⏺ Reactor의 특징
- 리액티브 스트림즈를 구현한 라이브러리
- Non-Blocking 통신 지원
- Publisher type : Mono Flux 지원
- MSA기반 application 구조에 적합한 라이브러리
- Backpressure-ready network
- 만약 Publisher가 emit하는 데이터가 끊임없이 들어오는데, Subscriber의 처리 속도가 느리면?
- 이럴 때 적절하게 제어하는 전략을 의미한다.
✅ Marble Diagram
동그라미 : 하나의 데이터
다이어그램에서 시간의 흐름에 따라 변화되는 데이터의 흐름을 볼 수 있다.
Marble Diagram- Mono 해석
Marble Diagram- Flux 해석
✅ Operator
Reactor에서 제공하는 다양한 Operator가 존재한다.
자주 사용될만한 Operator를 간단하게 알아보자.
⏺ 새로운 Sequence를 생성
- just() : 단순 생성
- fromStream() : 입력으로 Stream 전달
- fromIterable() : 자바의 Iterable을 입력으로 전달받아 emit (List, Map, Set)
- create() : signal 이벤트를 발생시키는 operator,
- signal은 퍼블리셔(생성자)가 발생하는 이벤트로, 한번에 여러건의 데이터를 비동기적으로 emit할 수 있다.
⏺ 기존 Sequence에서 변환하는 작업
- flatMap() : 바깥쪽에서 하나의 데이터가 들어올때마다 새로운 Sequence가 생성된다.
- flatMap 내부에서 정의되는 sequence는 inner sequence이다.
- concat() : 입력으로 전달하는 Publisher의 Sequence를 연결해 차례대로 emit
- concat의 파라미터로 두개의 Flux Sequence가 있다고 가정하면, concat을 통해 논리적으로 하나의 시퀀스로 동작함
- zip() : 입력으로 전달되는 여러개의 Sequence에서 emit된 데이터를 결합하는 Operator
⏺ Sequence의 내부 동작을 확인
- doOnNext() 처리되기전에 무언가 수행하기 위한 목적
- (처리흐름에 영향을 미치지 않고 디버깅이나 로깅같은 사이드 이펙트 생성에 사용)
- log() : publisher에서 발생하는 Signal이벤트를 로그로 출력
⏺ Sequence에서 데이터 필터링
- filter() : 주어진 조건이 참인것만을 통과시키도록 함
- take() : 통과하는 개수를 지정
⏺ 에러 처리
error() : onErrorSignal 이벤트를 발생시킬 때 사용 (의도적인 예외를 던짐)
- timeout() : 주어진 시간동안 emit되는 데이터가 없을때 onErrorSignal 발생
- retry() : Sequence상에서 에러가 발생했을 떄 입력으로 주어진 숫자만큼 재구독/ Sequence를 다시 시작
references
https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Flux.html
https://tech.kakao.com/2018/05/29/reactor-programming/