Framework/Spring

[Spring Boot]SSE 구현

  • -
반응형

예약 관련 서비스를 개발하다보니 관리자 페이지에서 예약 현황을 확인할 때 실시간으로 확인할 수 있는 기능이 필요했습니다. 예약 내용을 누가 확인하는지를 실시간으로 확인할 수 있어야했는데 "알림에 주로 사용되는 SSE(Server Sent Event)를 사용해보면 어떨까"라는 생각이 들엇습니다. 

브라우저 접속 후 서버와 커넥션을 열어두고 A 직원이 예약 내용을 확인할 때 서버에 이벤트를 발행하고, 서버에서는 커넥션되어있는 클라이언트에게 알림을 보내주는 형태로(B, C 직원에게 알림) 말이죠. 고민 끝에 웹 소켓보다 가볍고 구현이 쉬운 SSE를 도입했었습니다.

오늘 포스팅할 내용은 해당 기능을 구현하며 만든 예제입니다.

※ 테스트 환경
  • Spring Boot
  • Gradle
  • React
  • Postman

1. 프론트엔드 구성

1-1) 리액트 프로젝트 생성

본 예제에서는 특정 코드를 입력해 서버와 SSE 연결에 성공하면 입력한 코드에 해당하는 메세지를 수신할 수 있도록 했습니다.

 

1-2) 화면 추가

리액트 기본 프로젝트 생성 후 아래와 같이 App.js 파일만 수정해줍니다.

import './App.css';

function init() {
    const connectStore = () => {
      const storeCode = document.getElementById('storeCode').value;
      if (storeCode === "") return;

      document.getElementById('connectMessage').innerText = `${storeCode} 에 연결되었습니다.`;
      document.getElementById('storeCode').disabled = true;
      
      const eventSource = new EventSource(`http://localhost:8080/notify/connect/${storeCode}`);
    
      eventSource.onopen = (e) => {
        // console.log(e);
      };	
      eventSource.onerror = (e) => {
        // console.log(e);
      };	
      eventSource.onmessage = (e) => {
        // console.log(e.data);

        if (e.data !== "tick") {
          const result = JSON.parse(e.data);

          document.getElementById('messageArea').innerHTML += `<p>! ${result.message} !</p>`;
        }
      };
    };

    return (
    <div>   
      <div>
        <label>가게 코드 : </label><input type='text' id='storeCode' placeholder='가게 코드를 입력해주세요.'/>
        <button type="button" onClick={connectStore}>접속</button> 
        <span id='connectMessage'></span>   
      </div>      
      <hr></hr>
      <div>메세지</div>
      <div id="messageArea"></div>
    </div>            
  );
}

export default init;

SSE를 사용하기 위해 서버와 연결을 하기 위해서는 자바스크립트에서 제공하는 EventSource라는 인터페이스를 이용합니다. 여기서 입력한 코드를 통해(/notify/connect/${storeCode}) 서버와 연결을 맺고 데이터가 오는 것을 체크해 화면에 알림을 띄우게 될 것 입니다.

 

2. 백엔드 구성

2-1) CORS 설정

먼저 클라이언트와 연결이 가능하도록 CORS를 추가해줍니다.

package com.example.webflux.config;

import org.springframework.context.annotation.Configuration;
import org.springframework.web.servlet.config.annotation.CorsRegistry;
import org.springframework.web.servlet.config.annotation.WebMvcConfigurer;

@Configuration
public class WebConfig implements WebMvcConfigurer {

    @Override
    public void addCorsMappings(CorsRegistry registry) {
        registry.addMapping("/**")
                .allowedOrigins("http://localhost:3000");
    }
}

 

2-2) SSE 채널 생성

SseChannel은 연결된 채널 관리와 이벤트를 발행합니다.

Spring에서는 이벤트를 스트림에 푸쉬하는데 사용할 수 있는 FluxSink를 제공합니다. 여기서 EmitterProcessor는 FlutterProcessor의 하위 클래스로 sink() 메소드를 이용해 스트림에 푸쉬를 사용할 수 있는 FluxSink 인스턴스를 반환합니다. 그리고 FluxSink의 sink() 메소드를 이용해 생성된 스트림의 (추가되어 있는)다음 항목을 방출합니다.

package com.example.webflux.component;

import com.example.webflux.notify.dto.NotifyDTO;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import reactor.core.publisher.EmitterProcessor;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;

import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;

@Slf4j
@Component
public class SseChannel {

    private final ConcurrentHashMap<String, StoreChannel> channelMap = new ConcurrentHashMap<>();

    public StoreChannel connect(String storeCode) {
        /**
         * 스토어 채널이 없는 경우 해당 채널맵을 생성하고,
         * 채널맵의 스트림이 끝나면 StoreChannel 객체를 제거한다.
         */
        return channelMap.computeIfAbsent(storeCode, key -> new StoreChannel()
                                                                    .onClose(() -> channelMap.remove(storeCode)));
    }

    public void post(NotifyDTO notifyDTO) {
        //연결된 스토어 채널이 있는 경우 StoreChannel로 데이터를 전송한다.
        Optional.ofNullable(channelMap.get(notifyDTO.getStoreCode()))
                .ifPresent(ch -> ch.send(notifyDTO));
    }

    public static class StoreChannel {
        private final EmitterProcessor<NotifyDTO> processor;
        private final Flux<NotifyDTO> flux;
        private final FluxSink<NotifyDTO> sink;
        private Runnable closeCallback;

        public StoreChannel() {
            //클라이언트에게 데이터 전달을 하기 위한 프로세서(EmitterProcessor) 생성
            processor = EmitterProcessor.create();

            this.sink = processor.sink();
            this.flux = processor
                                //클라이언트의 연결이 끊기면 발생하는 이벤트
                                .doOnCancel(() -> {
                                    log.info("doOnCancel, downstream - {}", processor.downstreamCount());

                                    if (processor.downstreamCount() == 1) close();
                                })
                                .doOnTerminate(() -> {
                                    log.info("doOnTerminate, downstream - {}", processor.downstreamCount());
                                });
        }

        public void send(NotifyDTO notifyDTO) {
            log.info("message : {}", notifyDTO.getMessage());

            sink.next(notifyDTO);
        }

        public Flux<NotifyDTO> toFlux() {
            return flux;
        }

        private void close() {
            if (closeCallback != null) closeCallback.run();
            sink.complete();
        }

        public StoreChannel onClose(Runnable closeCallback) {
            this.closeCallback = closeCallback;
            return this;
        }
    }
}

 

2-3) DTO 추가

package com.example.webflux.notify.dto;

import lombok.Builder;
import lombok.Data;

@Builder
@Data
public class NotifyDTO {

    String storeCode;
    String message;

}

 

2-4) Controller 추가

NotifyController는 SSE 채널을 연결하고 메세지를 송신하는 기능을 제공합니다.

  • connect() : storeCode를 통해 특정 채널에 사용자를 연결시키고 1초마다 이벤트를 발생하는 스트림을 생성합니다. ServerSentEvent 엔티티를 선언함으로써 미디어 타입(TEXT_EVENT_STREAM_VALUE)을 정의하지 않아도 됩니다.
  • reserve() : NotifyDTO에 포함된 채널 코드를 통해 해당 채널에 메세지를 송신합니다.
package com.example.webflux.notify.controller;

import com.example.webflux.component.SseChannel;
import com.example.webflux.notify.dto.NotifyDTO;
import lombok.extern.slf4j.Slf4j;
import org.springframework.http.codec.ServerSentEvent;
import org.springframework.web.bind.annotation.*;
import reactor.core.publisher.Flux;

import java.time.Duration;

@Slf4j
@RestController
public class NotifyController {

    private final SseChannel sseChannel = new SseChannel();

    @GetMapping("/notify/connect/{storeCode}")
    public Flux<ServerSentEvent<Object>> connect(@PathVariable("storeCode") String storeCode) {
        Flux<NotifyDTO> userStream = sseChannel.connect(storeCode).toFlux();

        //1초 간격으로 이벤트를 발생하는 스트림
        Flux<String> tickStream = Flux.interval(Duration.ofSeconds(1)).map(tick -> "tick");

        return Flux.merge(userStream, tickStream)
                    .map(str -> ServerSentEvent.builder(str).build());
    }

    @PostMapping("/notify/reserve")
    public void reserve(@RequestBody NotifyDTO notifyDTO) {
        sseChannel.post(notifyDTO);
    }

}

 

3. 테스트

그럼 이제 각각 다른 브라우저를 열어 테스트해보겠습니다.

먼저 채널 코드 입력 후 같은 채널에 연결합니다. => T01

그리고 Postman을 이용해 해당 채널에 메세지를 보냅니다.

두 브라우저에서 같은 메세지가 수신된다면 성공입니다.

 

 

참고자료
 

스프링 웹플럭스(WebFlux) SSE(Server Sent Event) 구현 1

Server Sent Event, 줄여서 SSE는 웹 서버에서 웹 브라우저로 이벤트를 푸시하고 싶을 때 유용하게 사용할 수 있다. 스프링 웹플럭스를 사용하면 간단하게 SSE를 구현할 수 있다. 이 글에서는 간단한 예

javacan.tistory.com

 

스프링 웹플럭스(WebFlux) SSE(Server Sent Event) 구현 2

[수정] 2020-05-01 : 종료 처리를 위한 내용 추가. UnicastProcessor이 아닌 Emitterprocessor로 구현 변경 이전 글(스프링 웹플럭스(WebFlux) SSE(Server Sent Event) 구현 1)에서 작성한 SSE 예제는 현실적이지 않다. 서

javacan.tistory.com


+ 전체 소스는 GitHub에서 확인하실 수 있습니다.

반응형
Contents

포스팅 주소를 복사했습니다.

이 글이 도움이 되었다면 공감 부탁드립니다.