[Spring Boot]SSE 구현
예약 관련 서비스를 개발하다보니 관리자 페이지에서 예약 현황을 확인할 때 실시간으로 확인할 수 있는 기능이 필요했습니다. 예약 내용을 누가 확인하는지를 실시간으로 확인할 수 있어야했는데 "알림에 주로 사용되는 SSE(Server Sent Event)를 사용해보면 어떨까"라는 생각이 들엇습니다.
브라우저 접속 후 서버와 커넥션을 열어두고 A 직원이 예약 내용을 확인할 때 서버에 이벤트를 발행하고, 서버에서는 커넥션되어있는 클라이언트에게 알림을 보내주는 형태로(B, C 직원에게 알림) 말이죠. 고민 끝에 웹 소켓보다 가볍고 구현이 쉬운 SSE를 도입했었습니다.
오늘 포스팅할 내용은 해당 기능을 구현하며 만든 예제입니다.
※ 테스트 환경
- Spring Boot
- Gradle
- React
- Postman
1. 프론트엔드 구성
1-1) 리액트 프로젝트 생성
본 예제에서는 특정 코드를 입력해 서버와 SSE 연결에 성공하면 입력한 코드에 해당하는 메세지를 수신할 수 있도록 했습니다.
1-2) 화면 추가
리액트 기본 프로젝트 생성 후 아래와 같이 App.js 파일만 수정해줍니다.
import './App.css';
function init() {
const connectStore = () => {
const storeCode = document.getElementById('storeCode').value;
if (storeCode === "") return;
document.getElementById('connectMessage').innerText = `${storeCode} 에 연결되었습니다.`;
document.getElementById('storeCode').disabled = true;
const eventSource = new EventSource(`http://localhost:8080/notify/connect/${storeCode}`);
eventSource.onopen = (e) => {
// console.log(e);
};
eventSource.onerror = (e) => {
// console.log(e);
};
eventSource.onmessage = (e) => {
// console.log(e.data);
if (e.data !== "tick") {
const result = JSON.parse(e.data);
document.getElementById('messageArea').innerHTML += `<p>! ${result.message} !</p>`;
}
};
};
return (
<div>
<div>
<label>가게 코드 : </label><input type='text' id='storeCode' placeholder='가게 코드를 입력해주세요.'/>
<button type="button" onClick={connectStore}>접속</button>
<span id='connectMessage'></span>
</div>
<hr></hr>
<div>메세지</div>
<div id="messageArea"></div>
</div>
);
}
export default init;
SSE를 사용하기 위해 서버와 연결을 하기 위해서는 자바스크립트에서 제공하는 EventSource라는 인터페이스를 이용합니다. 여기서 입력한 코드를 통해(/notify/connect/${storeCode}) 서버와 연결을 맺고 데이터가 오는 것을 체크해 화면에 알림을 띄우게 될 것 입니다.
2. 백엔드 구성
2-1) CORS 설정
먼저 클라이언트와 연결이 가능하도록 CORS를 추가해줍니다.
package com.example.webflux.config;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.servlet.config.annotation.CorsRegistry;
import org.springframework.web.servlet.config.annotation.WebMvcConfigurer;
@Configuration
public class WebConfig implements WebMvcConfigurer {
@Override
public void addCorsMappings(CorsRegistry registry) {
registry.addMapping("/**")
.allowedOrigins("http://localhost:3000");
}
}
2-2) SSE 채널 생성
SseChannel은 연결된 채널 관리와 이벤트를 발행합니다.
Spring에서는 이벤트를 스트림에 푸쉬하는데 사용할 수 있는 FluxSink를 제공합니다. 여기서 EmitterProcessor는 FlutterProcessor의 하위 클래스로 sink() 메소드를 이용해 스트림에 푸쉬를 사용할 수 있는 FluxSink 인스턴스를 반환합니다. 그리고 FluxSink의 sink() 메소드를 이용해 생성된 스트림의 (추가되어 있는)다음 항목을 방출합니다.
package com.example.webflux.component;
import com.example.webflux.notify.dto.NotifyDTO;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import reactor.core.publisher.EmitterProcessor;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
@Slf4j
@Component
public class SseChannel {
private final ConcurrentHashMap<String, StoreChannel> channelMap = new ConcurrentHashMap<>();
public StoreChannel connect(String storeCode) {
/**
* 스토어 채널이 없는 경우 해당 채널맵을 생성하고,
* 채널맵의 스트림이 끝나면 StoreChannel 객체를 제거한다.
*/
return channelMap.computeIfAbsent(storeCode, key -> new StoreChannel()
.onClose(() -> channelMap.remove(storeCode)));
}
public void post(NotifyDTO notifyDTO) {
//연결된 스토어 채널이 있는 경우 StoreChannel로 데이터를 전송한다.
Optional.ofNullable(channelMap.get(notifyDTO.getStoreCode()))
.ifPresent(ch -> ch.send(notifyDTO));
}
public static class StoreChannel {
private final EmitterProcessor<NotifyDTO> processor;
private final Flux<NotifyDTO> flux;
private final FluxSink<NotifyDTO> sink;
private Runnable closeCallback;
public StoreChannel() {
//클라이언트에게 데이터 전달을 하기 위한 프로세서(EmitterProcessor) 생성
processor = EmitterProcessor.create();
this.sink = processor.sink();
this.flux = processor
//클라이언트의 연결이 끊기면 발생하는 이벤트
.doOnCancel(() -> {
log.info("doOnCancel, downstream - {}", processor.downstreamCount());
if (processor.downstreamCount() == 1) close();
})
.doOnTerminate(() -> {
log.info("doOnTerminate, downstream - {}", processor.downstreamCount());
});
}
public void send(NotifyDTO notifyDTO) {
log.info("message : {}", notifyDTO.getMessage());
sink.next(notifyDTO);
}
public Flux<NotifyDTO> toFlux() {
return flux;
}
private void close() {
if (closeCallback != null) closeCallback.run();
sink.complete();
}
public StoreChannel onClose(Runnable closeCallback) {
this.closeCallback = closeCallback;
return this;
}
}
}
2-3) DTO 추가
package com.example.webflux.notify.dto;
import lombok.Builder;
import lombok.Data;
@Builder
@Data
public class NotifyDTO {
String storeCode;
String message;
}
2-4) Controller 추가
NotifyController는 SSE 채널을 연결하고 메세지를 송신하는 기능을 제공합니다.
- connect() : storeCode를 통해 특정 채널에 사용자를 연결시키고 1초마다 이벤트를 발생하는 스트림을 생성합니다. ServerSentEvent 엔티티를 선언함으로써 미디어 타입(TEXT_EVENT_STREAM_VALUE)을 정의하지 않아도 됩니다.
- reserve() : NotifyDTO에 포함된 채널 코드를 통해 해당 채널에 메세지를 송신합니다.
package com.example.webflux.notify.controller;
import com.example.webflux.component.SseChannel;
import com.example.webflux.notify.dto.NotifyDTO;
import lombok.extern.slf4j.Slf4j;
import org.springframework.http.codec.ServerSentEvent;
import org.springframework.web.bind.annotation.*;
import reactor.core.publisher.Flux;
import java.time.Duration;
@Slf4j
@RestController
public class NotifyController {
private final SseChannel sseChannel = new SseChannel();
@GetMapping("/notify/connect/{storeCode}")
public Flux<ServerSentEvent<Object>> connect(@PathVariable("storeCode") String storeCode) {
Flux<NotifyDTO> userStream = sseChannel.connect(storeCode).toFlux();
//1초 간격으로 이벤트를 발생하는 스트림
Flux<String> tickStream = Flux.interval(Duration.ofSeconds(1)).map(tick -> "tick");
return Flux.merge(userStream, tickStream)
.map(str -> ServerSentEvent.builder(str).build());
}
@PostMapping("/notify/reserve")
public void reserve(@RequestBody NotifyDTO notifyDTO) {
sseChannel.post(notifyDTO);
}
}
3. 테스트
그럼 이제 각각 다른 브라우저를 열어 테스트해보겠습니다.
먼저 채널 코드 입력 후 같은 채널에 연결합니다. => T01
그리고 Postman을 이용해 해당 채널에 메세지를 보냅니다.
두 브라우저에서 같은 메세지가 수신된다면 성공입니다.
참고자료
+ 전체 소스는 GitHub에서 확인하실 수 있습니다.