Spring Boot Server Sent Event(SSE) 간단 샘플


Server Sent Event (SSE)는 웹 어플리케이션에서 실시간 업데이트를 제공하는 데 사용되는 웹 기술 중 하나이다. Spring에서는 Spring MVC에서 사용되는 SseEmitter 클래스를 제공하여 손쉽게 SSE 스트림을 생성하고 클라이언트에게 이벤트를 푸시할 수 있도록 한다. 이를 통해 클라이언트가 연결을 유지하면서 서버로부터 데이터를 지속적으로 수신할 수 있도록 한다. 이번 포스팅에서는 Spring에서 SSE를 사용하는 방법에 대해서 정리해보고자 한다.

Server Sent Event 특징

  • 서버 -> 클라이언트 단방향으로 이벤트를 스트리밍 할 수 있도록 대부분의 브라우저에서 채택한 사양
  • 이벤트 데이터는 UTF-8로 인코딩된 텍스트 데이터 스트림
  • 이벤트 데이터 형식은 줄 바꿈으로 구분된 일련의 키-값(id, retry, data, event)으로 구성
  • 데이터 페이로드 형식을 어떤 식으로든 제한하지 않으며, 간단한 문자열이나 복잡한 JSON 또는 XML 구조를 사용가능
  • 데이터 전송 MIME Type은 text/event-stream
    • event stream 데이터는 항상 UTF-8로 인코딩 해야 한다.

자세한 내용은
SSE spec 정의 문서를 참고

Server Sent Event 서버 구현

백문이 불여일견!! 간단한 Server Sent Event 서버 샘플을 구현해보자.
필요한 것은 다음과 같다.

  • Server Sent Event 연결 처리를 위한 Controller
  • SseEmitter 관리 Service
  • Server Sent Event DTO 정의

Server Sent Event는 SseEmitter를 통해서 client subscription을 관리한다.
client와 연결을 맺고 이벤트를 broadcasting 하는 Controller

package com.example.stream.controller;

import com.example.stream.dto.EventPayload;
import com.example.stream.service.SseEmitterService;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.http.MediaType;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;

import java.util.UUID;

@RestController
@RequiredArgsConstructor
@Slf4j
public class SSEController {
    private final SseEmitterService sseEmitterService;

    //응답 mime type 은 반드시 text/event-stream 이여야 한다.
    //클라이언트로 부터 SSE subscription 을 수락한다.
    @GetMapping(path = "/v1/sse/subscribe", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public ResponseEntity<SseEmitter> subscribe() {
        String sseId = UUID.randomUUID().toString();
        SseEmitter emitter = sseEmitterService.subscribe(sseId);
        return ResponseEntity.ok(emitter);
    }

    //eventPayload 를 SSE 로 연결된 모든 클라이언트에게 broadcasting 한다.
    @PostMapping(path = "/v1/sse/broadcast")
    public ResponseEntity<Void> broadcast(@RequestBody EventPayload eventPayload) {
        sseEmitterService.broadcast(eventPayload);
        return ResponseEntity.ok().build();
    }
}
Java

SseEmitter를 관리하는 Service

package com.example.stream.service;

import com.example.stream.dto.EventPayload;
import lombok.extern.slf4j.Slf4j;
import org.springframework.http.MediaType;
import org.springframework.stereotype.Service;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;

import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

@Service
@Slf4j
public class SseEmitterService {
    // thread-safe 한 컬렉션 객체로 sse emitter 객체를 관리해야 한다.
    private final Map<String, SseEmitter> emitterMap = new ConcurrentHashMap<>();
    private static final long TIMEOUT = 60 * 1000;
    private static final long RECONNECTION_TIMEOUT = 1000L;

    public SseEmitter subscribe(String id) {
        SseEmitter emitter = createEmitter();
        //연결 세션 timeout 이벤트 핸들러 등록
        emitter.onTimeout(() -> {
            log.info("server sent event timed out : id={}", id);
            //onCompletion 핸들러 호출
            emitter.complete();
        });

        //에러 핸들러 등록
        emitter.onError(e -> {
            log.info("server sent event error occurred : id={}, message={}", id, e.getMessage());
            //onCompletion 핸들러 호출
            emitter.complete();
        });

        //SSE complete 핸들러 등록
        emitter.onCompletion(() -> {
            if (emitterMap.remove(id) != null) {
                log.info("server sent event removed in emitter cache: id={}", id);
            }

            log.info("disconnected by completed server sent event: id={}", id);
        });

        emitterMap.put(id, emitter);

        //초기 연결시에 응답 데이터를 전송할 수도 있다.
        try {
            SseEmitter.SseEventBuilder event = SseEmitter.event()
                //event 명 (event: event example)
                .name("event example")
                //event id (id: id-1) - 재연결시 클라이언트에서 `Last-Event-ID` 헤더에 마지막 event id 를 설정
                .id(String.valueOf("id-1"))
                //event data payload (data: SSE connected)
                .data("SSE connected")
                //SSE 연결이 끊어진 경우 재접속 하기까지 대기 시간 (retry: <RECONNECTION_TIMEOUT>)
                .reconnectTime(RECONNECTION_TIMEOUT);
            emitter.send(event);
        } catch (IOException e) {
            log.error("failure send media position data, id={}, {}", id, e.getMessage());
        }
        return emitter;
    }

    public void broadcast(EventPayload eventPayload) {
        emitterMap.forEach((id, emitter) -> {
            try {
                emitter.send(SseEmitter.event()
                    .name("broadcast event")
                    .id("broadcast event 1")
                    .reconnectTime(RECONNECTION_TIMEOUT)
                    .data(eventPayload, MediaType.APPLICATION_JSON));
                log.info("sended notification, id={}, payload={}", id, eventPayload);
            } catch (IOException e) {
                //SSE 세션이 이미 해제된 경우
                log.error("fail to send emitter id={}, {}", id, e.getMessage());
            }
        });
    }

    private SseEmitter createEmitter() {
        return new SseEmitter(TIMEOUT);
    }
}
Java

연결 세션에 대한 timeout 은 1분을 설정하였다. 1분 동안 아무런 데이터 전송이 없는 경우 SseEmitter의 onTimeout 핸들러가 호출되고 아래와 같은 로그가 출력된다.

c.e.stream.service.SseEmitterService     : server sent event timed out : id=db76eb66-a8ed-4dc9-890f-4dcd71018821
c.e.stream.service.SseEmitterService     : server sent event removed in emitter cache: id=db76eb66-a8ed-4dc9-890f-4dcd71018821
c.e.stream.service.SseEmitterService     : disconnected by completed server sent event: id=db76eb66-a8ed-4dc9-890f-4dcd71018821
Plaintext

Server Sent Event payload 정의

package com.example.stream.dto;

import com.fasterxml.jackson.annotation.JsonProperty;

public record EventPayload(@JsonProperty("memberId") String memberId,
                           @JsonProperty("memberName") String memberName,
                           @JsonProperty("memberAge") String memberAge) {
}
Java

테스트

위 예제 코드의 서버 포트는 8081이다.

postman 테스트

1. GET http://localhost:80801/v1/sse/subscribe를 호출하여 연결을 수립한다.

Server Sent Event Connect 설정

Server Sent Event 응답헤더 화면

2. 이벤트를 발생시켜 본다.

POST
http://localhost:8081/v1/sse/broadcast 호출하고 아래와 같은 JSON body 를 payload 로 전송한다.

{
    "memberId": "test-id",
    "memberName": "test-name",
    "memberAge": "test-age"
}
JSON

1번에서 연결된 세션으로 발생시킨 이벤트를 수신한다.

Server Sent Event - 수신된 event 데이터 payload

브라우저 테스트

두 개의 크롬 브라우저 창을 열어 각각 /v1/sse/subscribe 로 연결을 수립한다.

Server Sent Event 연결

이벤트를 발생시키면 두개의 브라우저에서 이벤트 메시지를 수신한다.

Server Sent Event 이벤트 발생

서버 로그는 아래와 같이 출력된다.

c.e.stream.service.SseEmitterService     : sended notification, id=bf166289-a20c-4434-93b6-e51843a1afb9, payload=EventPayload[memberId=test-id, memberName=test-name, memberAge=test-age]
c.e.stream.service.SseEmitterService     : sended notification, id=dbe4b71a-1b12-48a7-b029-bfd17bd067f8, payload=EventPayload[memberId=test-id, memberName=test-name, memberAge=test-age]
Plaintext

마지막으로 포스팅에서 작성된 예제는 단일 서버 환경에서는 크게 문제가 없겠지만 멀티 서버 환경의 경우에는 SseEmitter를 통합 관리할 수 있는 방법을 고민해봐야 한다.