단잠 알림 도입기 2편입니다.
이전 포스팅에서 이론 위주로 알아보았기 때문에
이번 포스팅에서는 개발 위주로 정리를 해보려고 합니다.
이전 포스팅에 대해서 아래에 링크를 걸어두겠습니다.
단잠에 대해서 궁금하시다면 ?
https://ryudain.tistory.com/33
[단잠] 대학교 기숙사 메이트 매칭 및 대학 생활 커뮤니티 서비스 회고
✨ 단잠 / 각기 다른 사람들이 모여 단잠에 들다 ✨ 단잠은 대학교 기숙사 메이트 매칭 및 대학 생활 커뮤니티 서비스입니다. 기숙사 메이트 매칭 외에도사용자의 성향에 맞는다양한 운동, 산
ryudain.tistory.com
단잠 알림 도입기 1편 (왜 SSE를 채택하였는 지)
https://ryudain.tistory.com/34
[단잠] 알림 도입기 (feat. AMQP, RabbitMQ, SSE) [1편]
서론단잠 알림 도입기 포스팅 입니다. 사실 FCM 으로 진행하기 위해 코드들을 모두 작성하여서웹으로 모두 테스트까지 완료했었지만,,,, xCode 베타 버전 문제인지 ...........FCM 문제인지 .........
ryudain.tistory.com
💭 요구사항
저는 RabbitMQ와 SSE를 사용하여 알림기능을 구현하고자 합니다.
제가 현재 구현할 알림의 경우 아래와 같이 항상 상단 오른쪽에서 알림이 보여져야합니다.
현재 기능이 전부 개발된 상태는 아니여서 제가 구현해야할 알림 부분은 회원가입 승인 알림입니다.
하지만 추후에 개발 될 부분들을 고려해서 개발을 진행하도록 하겠습니다.
⚒️ Controller
먼저 SSE 연결을 위한 Controller부터 작성하겠습니다.
produces = MediaType.TEXT_EVENT_STREAM_VALUE로 지정하여, 서버가 실시간 데이터를 스트리밍하기 위해 SSE 방식을 사용하고, 클라이언트와 지속적인 연결을 유지하면서 데이터를 전송할 것임을 명시합니다.
Last-Event-Id를 통해 클라이언트가 마지막으로 수신한 이벤트의 ID를 서버로 전달합니다.
Event 목록이 존재할 경우 서버가 클라이언트에게 놓친 이벤트가 있다는 것을 의미합니다.
놓친 이벤트를 찾아 재전송하여 Event 유실을 예방합니다.
/**
* SSE 관련 Controller
*/
@RestController
@RequiredArgsConstructor
@RequestMapping("/api/notice")
public class NotificationController {
private final NotificationService notificationService;
/**
* SSE 연결 구독을 진행합니다.
* @RequestHeader Last-Event-Id<String> lastEventId 클라이언트가 마지막으로 수신한 데이터의 ID 값입니다. (항상 들어오는 값이 아닙니다.)
*/
@GetMapping(value = "/connect", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public ResponseEntity<SseEmitter> subscribe(@AuthenticationPrincipal CustomUserDetails customUserDetails,
@RequestHeader(value = "Last-Event-Id", required = false, defaultValue = "") String lastEventId) {
return ResponseEntity.ok(notificationService.subscribe(customUserDetails, lastEventId));
}
}
⚒️ enums 정의
추후 추가 될 예정입니다. 현재는 회원가입 승인을 위한 ADMIN, 전체 공지를 위한 NOTICE, 채팅방 알림을 위한 CHATROOM을 enum으로 지정했습니다.
public enum NotificationType {
ADMIN, NOTICE, CHATROOM;
}
public enum NotificationStatus {
PENDING, // 대기
DONE, // 완료
DISMISSED; // 삭제
}
🤔 Entity & 흐름도
다음으로 Entity 정의 입니다.
Redis로 알림 내역을 저장할 지, RDBMS로 저장할지 고민을 많이 했습니다만 ,,,,,,,,,
저희는 RabbitMq를 사용하고 있고,
추후에 알림페이지 페이지네이션 처리를 고려하여서 RDBMS를 선택하기로 결정했습니다.
1. 메시지를 발행
2. 메시지를 RDBMS에 저장
3. RabbitMQ에 해당 메시지를 전송
서버는 이후에 처리를 신경쓰지 않습니다.
4. SSE 를 통해 실시간 스트림 클라이언트에 전송
전체 흐름도 입니다.
[알림 요청]
|
v
[RDBMS 저장] -------> [RabbitMQ 발행] --------> [SSE로 클라이언트에 전송]
| \
v v
[데이터 조회 및 추가 처리(페이지네이션 등)] [즉시 클라이언트에 알림 업데이트]
데이터베이스는 설계는 아래와 같습니다.
ErdCloud를 사용하였습니다. 저희 팀은 데이터베이스에서 camelCase 를 사용하고 있습니다.
(개인적으론 snake_case를 좋아합니다............)
테이블 생성 ddl 입니다.
CREATE TABLE noti (
id BIGINT AUTO_INCREMENT PRIMARY KEY,
userId BIGINT NOT NULL, -- User 테이블의 외래 키
type ENUM('ADMIN', 'NOTICE', 'CHATROOM') NOT NULL, -- NotificationType Enum
status ENUM('PENDING', 'DONE', 'DISMISSED') NOT NULL, -- NotificationStatus Enum
title VARCHAR(50),
content TEXT, -- 알림 내용
toName VARCHAR(50), -- 발신인
url TEXT, -- URL
FOREIGN KEY (userId) REFERENCES user(id) ON DELETE CASCADE
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci;
💭 DTO
다음으로 사용하는 Dto 항목들입니다.
메시지를 전달하기 위한 DTO 입니다.
@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
public class NotificationMessage {
private List<String> receiverNames;
private NotificationType type;
private NotificationStatus status;
private String title;
private String content;
private String url;
private String fromName;
}
메시지의 Response 형식입니다.
@Getter
@Builder
public class NotificationResponse {
private Long id; // 알림 고유 ID
private Long receiverId; // 수신자 ID
private NotificationType type; // 알림 타입
private NotificationStatus status; // 알림 상태
private String title; // 제목
private String content; // 알림 내용
private String fromName; // 발신인 이름
private String url; // 관련 URL(link 이동)
public static NotificationResponse from(Notification notification) {
return NotificationResponse.builder()
.id(notification.getId())
.receiverId(notification.getId())
.type(notification.getType())
.status(notification.getStatus())
.title(notification.getTitle())
.content(notification.getContent())
.fromName(notification.getFromName())
.url(notification.getUrl())
.build();
}
}
알림 처리를 위한 DTO 입니다.
@Getter
public class NotificationStatusDto {
@NotNull
@NotEmpty(message = "알림의 ID 값은 필수입니다.")
private List<Long> ids;
@NotNull(message = "알림의 상태 값은 필수입니다.")
private NotificationStatus status;
}
⚒️ SSE 설정
다음으로 SSE 설정입니다.
인 메모리 기반의 저장소를 구현하기 위한 emitter repository를 생성합니다.
클라이언트와 서버 간의 지속적인 연결을 관리하기 위해 SseEmitter 객체를 저장하고 관리합니다.
자세한 설명은 주석문을 달아두었습니다.
EmitterRepository
@Configuration
public class EmitterRepositoryImpl implements EmitterRepository {
// 클라이언트와 서버 간 연결 정보를 저장하는 Map
private final Map<String, SseEmitter> emitters = new ConcurrentHashMap<>();
// 각 연결의 마지막 이벤트 데이터를 캐시로 저장하는 Map
private final Map<String, Object> eventCache = new ConcurrentHashMap<>();
/**
* 특정 emitterId를 키로 하고, 해당 SseEmitter를 저장합니다.
*
* @param emitterId SseEmitter와 연결된 고유 ID(주로 사용자 또는 연결 ID)
* @param sseEmitter 클라이언트와 연결된 SseEmitter 객체
* @return 저장된 SseEmitter 객체
*/
@Override
public SseEmitter save(String emitterId, SseEmitter sseEmitter) {
emitters.put(emitterId, sseEmitter); // emitterId와 SseEmitter를 Map에 저장
return sseEmitter; // 저장된 SseEmitter를 반환
}
/**
* 특정 emitterId와 연결된 마지막 이벤트 데이터를 저장합니다.
*
* @param emitterId SseEmitter와 연결된 고유 ID
* @param event 저장할 이벤트 데이터
*/
@Override
public void saveEventCache(String emitterId, Object event) {
eventCache.put(emitterId, event); // 이벤트 데이터를 캐시에 저장
}
/**
* 특정 userId로 시작하는 모든 SseEmitter를 반환합니다.
*
* @param userId 사용자 ID
* @return userId로 시작하는 SseEmitter가 포함된 Map
*/
@Override
public Map<String, SseEmitter> findAllEmitterStartWithById(String userId) {
// emitters Map에서 userId로 시작하는 키를 필터링하여 반환
return emitters.entrySet().stream()
.filter(entry -> entry.getKey().startsWith(userId)) // 키가 userId로 시작하는지 확인
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); // 필터링된 결과를 새로운 Map으로 변환
}
/**
* 모든 SseEmitter를 반환합니다.
*
* @return 저장된 모든 SseEmitter가 포함된 Map
*/
@Override
public Map<String, SseEmitter> getAllEmitters() {
return emitters; // emitters Map 전체 반환
}
/**
* 특정 ID와 연결된 SseEmitter를 삭제합니다.
*
* @param id 삭제할 SseEmitter의 ID
*/
@Override
public void deleteById(String id) {
emitters.remove(id); // emitters Map에서 해당 ID의 SseEmitter 제거
}
/**
* 특정 userId로 시작하는 모든 SseEmitter를 삭제합니다.
*
* @param userId 삭제할 사용자 ID
*/
@Override
public void deleteAllEmitterStartWithId(String userId) {
// emitters Map의 각 항목을 확인하여 userId로 시작하는 키를 삭제
emitters.forEach(
(key, emitter) -> {
if (key.startsWith(userId)) { // 키가 userId로 시작하는지 확인
emitters.remove(key); // 해당 키와 연결된 SseEmitter 삭제
}
}
);
}
/**
* 특정 userId로 시작하는 모든 이벤트 캐시를 삭제합니다.
*
* @param userId 삭제할 사용자 ID
*/
@Override
public void deleteAllEventCacheStartWithId(String userId) {
// eventCache Map의 각 항목을 확인하여 userId로 시작하는 키를 삭제
eventCache.forEach(
(key, emitter) -> {
if (key.startsWith(userId)) { // 키가 userId로 시작하는지 확인
eventCache.remove(key); // 해당 키와 연결된 이벤트 캐시 삭제
}
}
);
}
}
🤔 ConcurrentHashMap()
ConcurrentHashMap은 Java의 java.util.concurrent 패키지에 포함된 클래스로, 멀티스레드 환경에서 동시성 문제 해결과 성능을 유지하기 위해 설계된 스레드 안전한 해시맵 입니다.
🤔 emitters()
String 타입의 키와 SseEmitter 타입의 값으로, 사용자 별 SseEmitter 객체를 저장합니다.
private String makeTimeIncludeUserId(Long userId) {
return userId + "_" + System.currentTimeMillis();
}
구독을 요청하면, 위 id 값을 통해서 사용자별로 고유한 SseEmitte 객체를 생성합니다.
🤔 eventCache()
String 타입의 키와 Object 타입의 값으로, 앞서 생성되는 사용자별 고유한 키를 기반으로 구성됩니다. 이를 통해 특정 사용자에게 전송되지 못한 이벤트를 캐시에 저장하고, 이벤트 유실 시 이를 재전송하는 데 사용됩니다.
⚒️ Service
다음으로 서비스 단 입니다.
RabbitMQ의 Queue 이름은 지워두었습니다.
알림을 Emitter와 전송 전에 데이터베이스에 저장함으로서 영속성을 확보합니다.
@Service
@RequiredArgsConstructor
@Slf4j
public class NotificationService {
private final EmitterRepository emitterRepository;
private final NotificationRepository notificationRepository;
private final UserRepository userRepository;
// 연결 지속시간 (한시간)
private static final Long DEFAULT_TIMEOUT = 60L * 1000 * 1000;
@MethodDescription(description = "sse 구독을 진행합니다.")
public SseEmitter subscribe(CustomUserDetails customUserDetails, String lastEventId) {
Long userId = customUserDetails.getId();
// emitterUserId Create
String emitterId = makeTimeIncludeUserId(userId);
// SseEmitter 객체를 만들고 반환, key: id, value: SseEmitter
SseEmitter emitter = emitterRepository.save(emitterId, new SseEmitter(DEFAULT_TIMEOUT));
// 시간 초과, 비동기 요청이 되지 않으면 자동으로 삭제
emitter.onCompletion(() -> emitterRepository.deleteById(emitterId));
emitter.onTimeout(() -> emitterRepository.deleteById(emitterId));
// 503 오류 방지를 위한 더미데이터 전송
sendNotification(emitter, emitterId, "EventStream Created. [user = " + userId + " ]");
if (hasLostData(lastEventId)) {
sendLostData(lastEventId, userId, emitter);
}
return emitter;
}
@MethodDescription(description = "SSE 연결을 통해 클라이언트에게 데이터를 전송합니다. 실패 시 Emitter를 삭제합니다.")
private void sendNotification(SseEmitter emitter, String emitterId, Object data) {
try {
emitter.send(SseEmitter.event()
.name("notification")
.id(emitterId)
.data(data)
);
} catch (IOException e) {
emitterRepository.deleteById(emitterId);
throw new SseEmitterException("메시지 전송을 실패했습니다.");
}
}
@MethodDescription(description = "사용자의 ID와 현재 시간을 조합하여 고유한 emitter ID를 생성합니다.")
private String makeTimeIncludeUserId(Long userId) {
return userId + "_" + System.currentTimeMillis();
}
@MethodDescription(description = "Last-Event-Id 가 존재하는 지 확인합니다.")
private Boolean hasLostData(String lastEventId) {
return !lastEventId.isEmpty();
}
@MethodDescription(description = "마지막으로 수신한 이벤트 ID 이후의 데이터를 검색하고, SSE 연결을 통해 클라이언트로 전송합니다.")
private void sendLostData(String lastEventId, Long userId, SseEmitter emitter) {
Map<String, SseEmitter> eventCaches = emitterRepository.findAllEmitterStartWithById(String.valueOf(userId));
eventCaches.entrySet().stream()
.filter(entry -> lastEventId.compareTo(entry.getKey()) < 0)
.forEach(entry -> sendNotification(emitter, entry.getKey(), entry.getValue()));
}
@MethodDescription(description = "특정 사용자들에게 알림을 전송합니다.")
@Async
public void send(List<String> toName, NotificationType type, NotificationStatus status, String title, String content, String fromName, String url) {
List<User> receivers = toName.stream()
.map(userRepository::findByUsername)
.flatMap(Optional::stream)
.toList();
receivers.forEach(receiver -> {
Notification noti = notificationRepository.save(createNotification(receiver, type, status, title, content, url, fromName));
Map<String, SseEmitter> sseEmitters = emitterRepository.findAllEmitterStartWithById(String.valueOf(receiver.getId()));
sseEmitters.forEach(
(key, emitter) -> {
emitterRepository.saveEventCache(key, noti);
sendNotification(emitter, key, NotificationResponse.from(noti));
}
);
});
}
@MethodDescription(description = "알림 객체를 생성합니다.")
public Notification createNotification(User receiver, NotificationType type, NotificationStatus status, String title, String content, String url, String fromName) {
return Notification.builder()
.receiver(receiver)
.type(type)
.status(status)
.title(title)
.content(content)
.url(url)
.fromName(fromName)
.build();
}
@MethodDescription(description = "RabbitMQ Consumer 를 통해서 메시지를 받습니다.")
@RabbitListener(queues = "")
public void consumeNotificationMessage(NotificationMessage message) {
try {
log.info("Received message: {}", message);
send(message.getReceiverNames(),
message.getType(),
message.getStatus(),
message.getTitle(),
message.getContent(),
message.getFromName(),
message.getUrl()
);
} catch (Exception e) {
log.error("Error deserializing message from RabbitMQ: {}", e.getMessage());
throw new RabbitMqException("RabbitMQ 메시지 역질렬화 중 에러가 발생했습니다.");
}
}
@MethodDescription(description = "알림의 상태를 바꿉니다.")
@Transactional
public ApiResponseMessage processNotificationStatus(CustomUserDetails customUserDetails, List<Long> ids, NotificationStatus status) {
Long userId = customUserDetails.getId();
try {
int updatedCount = notificationRepository.updateNotificationStatus(userId, ids, status);
if (updatedCount == 0) {
throw new InvalidResourceException(ResultCode.ALREADY_EXIST_NOTIFICATION);
}
return ApiResponseMessage.of("알림 상태가 정상적으로 업데이트 되었습니다.");
} catch (Exception e) {
log.error("Error updating notification status: {}", e.getMessage());
throw new InvalidResourceException(ResultCode.ALREADY_EXIST_NOTIFICATION);
}
}
@MethodDescription(description = "사용자가 삭제하지 않은 모든 알림을 조회합니다.")
@Transactional(readOnly = true)
public List<NotificationResponse> readNotifications(CustomUserDetails customUserDetails) {
String username = customUserDetails.getUsername();
User user = userRepository.findByUsername(username).orElseThrow(() -> new UserNotFoundException("유저가 존재하지 않습니다."));
List<Notification> notifications = notificationRepository.findByReceiverId(user.getId());
return notifications.stream()
.map(NotificationResponse::from)
.collect(Collectors.toList());
}
}
- Emitter ID 생성: 사용자 ID와 현재 시간을 결합하여 고유한 Emitter ID를 생성합니다.
- SseEmitter 객체 생성: 생성된 Emitter ID를 키로 하여 SseEmitter 객체를 생성하고 이를 저장합니다.
- 연결 안정성 유지: 연결이 정상적으로 완료되거나 시간 초과가 발생하면 Emitter를 자동으로 삭제합니다.
- 더미 데이터 전송: 연결 초기화 시 503 오류 방지를 위해 더미 데이터를 전송합니다.
- 유실 데이터 처리: 이전 이벤트(Last-Event-Id)를 기반으로 유실된 데이터가 있는지 확인하고, 유실된 데이터가 있다면 이를 재전송합니다.
- 실시간 이벤트 전송: RabbitMQ로부터 전달받은 데이터를 NotificationResponse DTO를 통해 sendNotification 메서드로 클라이언트에게 실시간으로 이벤트를 전송합니다.
⚒️ RabbitMq 데이터 전송
지정된 routingKey 로 데이터를 전송합니다.
@MethodDescription(description = "메시지를 RabbitMQ Queue 에 발행합니다.")
private void sendNotification(String toName, String title, String content) {
String routingKey = "";
NotificationMessage message = NotificationMessage.builder()
.receiverNames(List.of(toName))
.type(NotificationType.ADMIN)
.status(NotificationStatus.PENDING)
.title(title)
.content(content)
.fromName("ADMIN")
.url("")
.build();
try {
notificationProducer.sendNotification(routingKey, message);
} catch (Exception e) {
log.error("Failed to send message to RabbitMQ", e);
}
}
💭 POSTMAN SSE 연결
헤더에 Accept를 text/event-stream으로 지정하고, GET 요청을 보냅니다.
지정된 eventName 이 보이면서 SSE가 연결된 것을 볼 수 있습니다.
해당 유저에게 알림을 전송하면 아래와 같이 실시간으로 JSON 형식의 알림을 받아오는 것을 볼 수 있습니다.
마치며
기존의 SSE와 kafka 를 이용한 알림 기능을 구현해본 적이 있습니다.
rabbitMQ도 거의 유사하게 동작하여서 코드를 작성하는 데 큰 어려움은 없었습니다만 ,,,,,,,,,,,
SSE 연결을 진행하면서 발생한 트러블 슈팅이 있었습니다. 아래 링크는 해당 트러블 슈팅에 대한 블로그입니다.
https://ryudain.tistory.com/36
[단잠] 트러블슈팅: SSE(Server-Sent-Events) 첫 연결 이후 바로 Connection closed가 되는 현상
💭문제상황SSE 연결 시, 첫 연결 더미데이터 전송 이후에 Connection이 바로 closed 되는 현상이 발생했습니다. 💭원인 분석처음엔 RabbitMq 로 알림 데이터를 받아오고 있기 때문에 JSON 파싱 문제인
ryudain.tistory.com
-
예전에 구현할 땐 동작 방식에 대해서 제대로 이해하지 못하고 한 것 같은 데, 다시한번 차근차근 복기하면서 코드를 구현하다보니 전보다 많은 부분이 이해가 됐던 것 같습니다. ( emitterRepository를 구현하는 방식에서 ConcureentHashMap()을 사용하는 이유 등등)
항상 기능을 구현하기 급급하다보니 이러한 시간들에서 정말 많은 배움을 얻고 가는 거 같습니다.
앞으로도 해왔던 걸 다시한번 복기하며, 차근차근 저의 것으로 만드는 배움의 시간들을 많이 기르고 싶습니다.
포스팅 읽어주셔서 감사합니다.
REPERENCE
[SpringBoot] SSE를 이용한 실시간 알림 구현하기!
SSE를 이용한 실시간 알림을 구현해 보자 들어가며팀 프로젝트를 진행하던 중, 댓글이나 좋아요가 달렸을 때 사용자에게 실시간으로 알림을 제공하는 기능의 필요성을 느끼게 되었습니다. 인
back-stead.tistory.com
[Project] SSE(Servcer-Sent-Events)로 실시간 알림 기능 구현하기 !
1\. Server-Sent Events (SSE) 프로토콜 지원SseEmitter는 Spring에서 SSE 프로토콜을 지원하기 위한 클래스이므로이를 통해 실시간으로 업데이트되는 데이터나 알림과 같은 이벤트를 클라이언트에게 전달
velog.io
[Spring] kafka와 SSE(Server Send Event)를 사용한 실시간 알림 전송 방법
Kafka 선택 이유알림 전송에 Kafka를 도입한 이유는 다음과 같다. 필자의 과일샵 쇼핑몰 프로젝트는 아래 처럼 회원 서버와 어드민 서버로 나뉘어져 있다.두 서버가 DB는 공유하고 있지만 ,어드민
hstory0208.tistory.com
'개발 > 프로젝트' 카테고리의 다른 글
[단잠] 팀 컨벤션을 위한 Wiki 스타일 지정 (3) | 2024.12.17 |
---|---|
[단잠] 모놀리식 아키텍처에서 MSA 로 마이그레이션 과정 [1편] (4) | 2024.12.05 |
[단잠] 트러블슈팅: SSE(Server-Sent-Events) 첫 연결 이후 바로 Connection closed가 되는 현상 (0) | 2024.11.26 |
[단잠] 알림 도입기 (feat. AMQP, RabbitMQ, SSE) [1편] (4) | 2024.11.19 |
[단잠] 대학교 기숙사 메이트 매칭 및 대학 생활 커뮤니티 서비스 회고 (2) | 2024.11.14 |