Framework/Spring

[Spring Boot]SSE 구현

  • -
반응형

예약 관련 서비스를 개발하다보니 관리자 페이지에서 예약 현황을 확인할 때 실시간으로 확인할 수 있는 기능이 필요했습니다. 예약 내용을 누가 확인하는지를 실시간으로 확인할 수 있어야했는데 "알림에 주로 사용되는 SSE(Server Sent Event)를 사용해보면 어떨까"라는 생각이 들엇습니다. 

브라우저 접속 후 서버와 커넥션을 열어두고 A 직원이 예약 내용을 확인할 때 서버에 이벤트를 발행하고, 서버에서는 커넥션되어있는 클라이언트에게 알림을 보내주는 형태로(B, C 직원에게 알림) 말이죠. 고민 끝에 웹 소켓보다 가볍고 구현이 쉬운 SSE를 도입했었습니다.

오늘 포스팅할 내용은 해당 기능을 구현하며 만든 예제입니다.

※ 테스트 환경
  • Spring Boot
  • Gradle
  • React
  • Postman

본 예제에서는 특정 코드를 입력해 서버와 SSE 연결에 성공하면 입력한 코드에 해당하는 메세지를 수신할 수 있도록 했습니다.

 

리액트 기본 프로젝트 생성 후 아래와 같이 App.js 파일만 수정해줍니다.

import './App.css'; function init() { const connectStore = () => { const storeCode = document.getElementById('storeCode').value; if (storeCode === "") return; document.getElementById('connectMessage').innerText = `${storeCode} 에 연결되었습니다.`; document.getElementById('storeCode').disabled = true; const eventSource = new EventSource(`http://localhost:8080/notify/connect/${storeCode}`); eventSource.onopen = (e) => { // console.log(e); }; eventSource.onerror = (e) => { // console.log(e); }; eventSource.onmessage = (e) => { // console.log(e.data); if (e.data !== "tick") { const result = JSON.parse(e.data); document.getElementById('messageArea').innerHTML += `<p>! ${result.message} !</p>`; } }; }; return ( <div> <div> <label>가게 코드 : </label><input type='text' id='storeCode' placeholder='가게 코드를 입력해주세요.'/> <button type="button" onClick={connectStore}>접속</button> <span id='connectMessage'></span> </div> <hr></hr> <div>메세지</div> <div id="messageArea"></div> </div> ); } export default init;

SSE를 사용하기 위해 서버와 연결을 하기 위해서는 자바스크립트에서 제공하는 EventSource라는 인터페이스를 이용합니다. 여기서 입력한 코드를 통해(/notify/connect/${storeCode}) 서버와 연결을 맺고 데이터가 오는 것을 체크해 화면에 알림을 띄우게 될 것 입니다.

 

먼저 클라이언트와 연결이 가능하도록 CORS를 추가해줍니다.

package com.example.webflux.config; import org.springframework.context.annotation.Configuration; import org.springframework.web.servlet.config.annotation.CorsRegistry; import org.springframework.web.servlet.config.annotation.WebMvcConfigurer; @Configuration public class WebConfig implements WebMvcConfigurer { @Override public void addCorsMappings(CorsRegistry registry) { registry.addMapping("/**") .allowedOrigins("http://localhost:3000"); } }

 

SseChannel은 연결된 채널 관리와 이벤트를 발행합니다.

Spring에서는 이벤트를 스트림에 푸쉬하는데 사용할 수 있는 FluxSink를 제공합니다. 여기서 EmitterProcessor는 FlutterProcessor의 하위 클래스로 sink() 메소드를 이용해 스트림에 푸쉬를 사용할 수 있는 FluxSink 인스턴스를 반환합니다. 그리고 FluxSink의 sink() 메소드를 이용해 생성된 스트림의 (추가되어 있는)다음 항목을 방출합니다.

package com.example.webflux.component; import com.example.webflux.notify.dto.NotifyDTO; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component; import reactor.core.publisher.EmitterProcessor; import reactor.core.publisher.Flux; import reactor.core.publisher.FluxSink; import java.util.Optional; import java.util.concurrent.ConcurrentHashMap; @Slf4j @Component public class SseChannel { private final ConcurrentHashMap<String, StoreChannel> channelMap = new ConcurrentHashMap<>(); public StoreChannel connect(String storeCode) { /** * 스토어 채널이 없는 경우 해당 채널맵을 생성하고, * 채널맵의 스트림이 끝나면 StoreChannel 객체를 제거한다. */ return channelMap.computeIfAbsent(storeCode, key -> new StoreChannel() .onClose(() -> channelMap.remove(storeCode))); } public void post(NotifyDTO notifyDTO) { //연결된 스토어 채널이 있는 경우 StoreChannel로 데이터를 전송한다. Optional.ofNullable(channelMap.get(notifyDTO.getStoreCode())) .ifPresent(ch -> ch.send(notifyDTO)); } public static class StoreChannel { private final EmitterProcessor<NotifyDTO> processor; private final Flux<NotifyDTO> flux; private final FluxSink<NotifyDTO> sink; private Runnable closeCallback; public StoreChannel() { //클라이언트에게 데이터 전달을 하기 위한 프로세서(EmitterProcessor) 생성 processor = EmitterProcessor.create(); this.sink = processor.sink(); this.flux = processor //클라이언트의 연결이 끊기면 발생하는 이벤트 .doOnCancel(() -> { log.info("doOnCancel, downstream - {}", processor.downstreamCount()); if (processor.downstreamCount() == 1) close(); }) .doOnTerminate(() -> { log.info("doOnTerminate, downstream - {}", processor.downstreamCount()); }); } public void send(NotifyDTO notifyDTO) { log.info("message : {}", notifyDTO.getMessage()); sink.next(notifyDTO); } public Flux<NotifyDTO> toFlux() { return flux; } private void close() { if (closeCallback != null) closeCallback.run(); sink.complete(); } public StoreChannel onClose(Runnable closeCallback) { this.closeCallback = closeCallback; return this; } } }

 

package com.example.webflux.notify.dto; import lombok.Builder; import lombok.Data; @Builder @Data public class NotifyDTO { String storeCode; String message; }

 

NotifyController는 SSE 채널을 연결하고 메세지를 송신하는 기능을 제공합니다.

  • connect() : storeCode를 통해 특정 채널에 사용자를 연결시키고 1초마다 이벤트를 발생하는 스트림을 생성합니다. ServerSentEvent 엔티티를 선언함으로써 미디어 타입(TEXT_EVENT_STREAM_VALUE)을 정의하지 않아도 됩니다.
  • reserve() : NotifyDTO에 포함된 채널 코드를 통해 해당 채널에 메세지를 송신합니다.
package com.example.webflux.notify.controller; import com.example.webflux.component.SseChannel; import com.example.webflux.notify.dto.NotifyDTO; import lombok.extern.slf4j.Slf4j; import org.springframework.http.codec.ServerSentEvent; import org.springframework.web.bind.annotation.*; import reactor.core.publisher.Flux; import java.time.Duration; @Slf4j @RestController public class NotifyController { private final SseChannel sseChannel = new SseChannel(); @GetMapping("/notify/connect/{storeCode}") public Flux<ServerSentEvent<Object>> connect(@PathVariable("storeCode") String storeCode) { Flux<NotifyDTO> userStream = sseChannel.connect(storeCode).toFlux(); //1초 간격으로 이벤트를 발생하는 스트림 Flux<String> tickStream = Flux.interval(Duration.ofSeconds(1)).map(tick -> "tick"); return Flux.merge(userStream, tickStream) .map(str -> ServerSentEvent.builder(str).build()); } @PostMapping("/notify/reserve") public void reserve(@RequestBody NotifyDTO notifyDTO) { sseChannel.post(notifyDTO); } }

 

그럼 이제 각각 다른 브라우저를 열어 테스트해보겠습니다.

먼저 채널 코드 입력 후 같은 채널에 연결합니다. => T01

그리고 Postman을 이용해 해당 채널에 메세지를 보냅니다.

두 브라우저에서 같은 메세지가 수신된다면 성공입니다.

 

 

참고자료
 

스프링 웹플럭스(WebFlux) SSE(Server Sent Event) 구현 1

Server Sent Event, 줄여서 SSE는 웹 서버에서 웹 브라우저로 이벤트를 푸시하고 싶을 때 유용하게 사용할 수 있다. 스프링 웹플럭스를 사용하면 간단하게 SSE를 구현할 수 있다. 이 글에서는 간단한 예

javacan.tistory.com

 

스프링 웹플럭스(WebFlux) SSE(Server Sent Event) 구현 2

[수정] 2020-05-01 : 종료 처리를 위한 내용 추가. UnicastProcessor이 아닌 Emitterprocessor로 구현 변경 이전 글(스프링 웹플럭스(WebFlux) SSE(Server Sent Event) 구현 1)에서 작성한 SSE 예제는 현실적이지 않다. 서

javacan.tistory.com


+ 전체 소스는 GitHub에서 확인하실 수 있습니다.

반응형

포스팅 주소를 복사했습니다.

이 글이 도움이 되었다면 공감 부탁드립니다.