백엔드/스프링, 스프링부트, JPA, Spring Webflux

[Spring WebFlux] Flux를 이용한 SSE(Server Sent Event) 처리하기

Kangjieun11 2023. 6. 6. 23:03
728x90

 

 


황정식 - 스프링으로 시작하는 리액티브 프로그래밍

Spring WebFlux를 이용한 Non-Blocking 애플리케이션 구현

 

 

책을 통해 공부하면서 개인적으로 정리한 내용입니다.

 

 


 

 

✅ 개요

스프링 웹플럭스는 스프링5에 추가된 Reactive 스택이다. 

Reactor는 두가지 타입을 지원한다.

  • Mono :  0개 또는 1개의 데이터만 emit 
  • Flux :  n개의 데이터를 emit한다

 

 

오늘은 N개의 데이터를 emit하는 FLUX를 이용해

스트리밍으로 데이터를 처리하는 것에 대해 알아보자.

 

 

✅ Reacive Streaming 데이터처리

  • Spring Webflux는 SSE를 이용하여 데이터를 스트리밍 할 수 있다. 
  • SSE는 Spring 4.2부터 지원되었으며
  • Spring 5에서부턴, Reactor의 Publisher타입인 Flux를 이용해 SSE를 편리하게 사용할 수 있다.

 

✍️ SSE (Server Sent Event)

클라이언트가 서버로부터 전송되는 업데이트 데이터를 지속적으로 수신 가능한 단방향 서버 push 기술

 

지속적으로 스트리밍을 하는 프로토콜

  • 데이터를 여러번에 나누어 보낼 수 있다.
  • 연결선 (Stream)을 유지할 수 있다.

 

 

일반적인 통신은 1개의 요청당 1개의 응답이 돌아갔지만,

SSE를 이용하면 한번의 커넥션에, 서버가 지속적으로 데이터를 보내줄 수 있다.

 

 

✍️ 스트리밍 형태로 반환하려면? 

결국 Flux 타입을 이용해야한다.

FLUX를 이요해서 여러번의 응답 데이터를 나누어 보내줄 수 있다.

public class BookService {
	private final @NonNull R2dbcEntityTemplate template;

	//생략
    
    public Flux<Book> stermingBooks() {
    	return template
        .select(Book.class)
        .all()
        .delayElements(Duration.ofSeconds(2L));
    }
    
    //생략 
}

 

 

 

✍️ Flux를 이용해 SSE  2번 전송 예제

 

Flux를 이용해 간편하게 SSE를 실습해보자.

 

아래는 그림과 같은 상황을 가정한다.

 

  1. 클라이언트는 백엔드 서버에 요청을 한다.
  2. 외부 서버로의 요청은, 응답까지 5초가 걸린다고 가정한다.
  3. 백엔드서버는 클라이언트의 요청을 받으면 외부서버로 요청을 보내놓고,   바로 첫번째 응답을 준다.
  4. 백엔드 서버로 외부서버의 응답을 받으면, 클라이언트에게 두번째 응답을 돌려 준다.

 

@RestController
@RequestMapping("/test")
public class TestController {

    @GetMapping("/data")
    public Flux<ServerSentEvent<String>> getData() {
		
        Mono<String> firstResponse = Mono.just("첫번째 응답입니다😆");
        Mono<String> secondResponse = Mono.just("두번째 응답입니다😎");
        
        Flux<ServerSentEvent<String>> responseStream = Flux.concat(
                firstResponse.map(data -> ServerSentEvent.<String>builder().data(data).build()),
                secondResponse.delayElement(Duration.ofSeconds(5)).map(data -> ServerSentEvent.<String>builder().data(data).build())
        );

        return responseStream;
    }
    
}

 


Mono secondResponse는 현재 Mono.just로 문자열을 넣어놨지만  외부서버로의 요청으로 대체하면 된다. 

여기에서는 외부 API를 호출하지 않기 때문에, 첫번째응답과 5초의 딜레이를 따로 주었다.

 

 

😁 결과

 

요청을 보내보았다.

 

Connected되고, 첫번째 응답, 5초후의 2번째응답이 돌아온다

2번째응답이 돌아옴과 동시에 Connection은 닫혔다.

 

 


references 

 

황정식 - 스프링으로 시작하는 리액티브 프로그래밍

 

스프링으로 시작하는 리액티브 프로그래밍 | 황정식 - 교보문고

스프링으로 시작하는 리액티브 프로그래밍 | *리액티브 프로그래밍의 기본기를 확실하게 다진다*리액티브 프로그래밍은 적은 컴퓨팅 파워로 대량의 요청 트래픽을 효과적으로 처리할 수 있는

product.kyobobook.co.kr

 

https://www.saichoiblog.com/flux-sse/