현재 개발 중인 서비스에서 댓글이나 좋아요 등 전체적인 알림 기능 구현을 맡았다.
예전에 했던 서비스에서는 단순한 생각으로 전형적인 클라이언트-서버 구조의 요청, 응답을 통해 알림 테이블에 유저마다 알림을 DB에 저장하고, 알림 페이지를 조회하는 api를 또 개발하고..
이런 방식으로 개발했었다..!
이번에 마침 기회가 되어 실시간 알림 기능을 구현하기로 했다.
이런 방식으로 개발했을 때 제일 큰 문제는, 클라이언트의 요청이 있어야만 서버가 응답을 할 수 있다는 것이다.
그래서 도입될 수 있던 개발 방식들이,
1. 클라이언트가 설정한 주기마다 서버로 요청을 보내는 Short Polling - 지나친 요청으로 인한 불필요한 서버의 부하가 단점
2. 클라이언트가 요청을 보내면 서버는 대기하였다가 요청한 데이터가 업데이트 되면 응답하는 Long Polling - 1번과 같은 단점
3. 클라이언트가 서버를 구독(연결)하면, 서버는 변동사항이 생길 때마다 데이터를 전송해주는 Server-Sent-Events - 연결 수 최대 제한이 있다는 단점
4. 클라이언트와 서버가 양방향 통신 가능한 웹 소켓 - 초기 세팅 및 구현에 많은 투자가 필요하며 트래픽 양이 많아진다면 서버에 큰 부담이 됨
이처럼 다양한 방식이 있지만, 이번 실시간 알림에 적용하기에는 불필요한 서버 부하를 야기하는 1번과 2번을 제외하고 웹 소켓과 SSE를 비교하자면 리소스 및 비용 대비, 그리고 채팅과 같이 클라이언트에서 데이터 전송이 필요없는 기능이기 때문에 SSE 방식을 선택했다.
@Operation(
summary = "알림 구독",
description = "알림을 위한 SSE 구독"
)
@GetMapping(value = "/subscribe", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public SseEmitter subscribe(
@Parameter(hidden = true) @LoginMember Member loginMember,
@RequestHeader(value = "Last-Event-ID", required = false, defaultValue = "") String lastEventId
) {
return notificationService.subscribe(loginMember, lastEventId);
}
먼저 연결이다.
SSE를 연결하기 위해서는 text/event-stream 타입으로 설정해서 서버가 클라이언트에게 이벤트 스트림을 전송한다는 것을 명시해야 한다. 클라이언트는 이 타입의 응답을 통해 서버가 이벤트 스트림을 전송할 준비가 되어있음을 인지하고 서버로부터 데이터를 전달 받을 수 있게 된다.
그리고 Last-Event-ID(SSE 연결이 끊어졌을 경우, 클라이언트가 수신한 마지막 이벤트 ID)를 받는 이유는, 만약 해당 사용자에 대한 미수신 알림이 있다면 보내주기 위함이다. -> 중단된 연결에서 이벤트 전송을 재개할 수 있도록 함
서비스 레이어
@Service
@RequiredArgsConstructor
public class NotificationService {
private static final Long DEFAULT_TIMEOUT = 60L * 1000 * 60;
private final EmitterRepository emitterRepository;
public SseEmitter subscribe(Member member, String lastEventId) {
Long memberId = member.getId();
String emitterId = makeTimeIncludeId(memberId);
SseEmitter emitter = emitterRepository.save(emitterId, new SseEmitter(DEFAULT_TIMEOUT));
// Emitter가 완료될 때(모든 데이터가 성공적으로 전송된 상태) Emitter를 삭제한다.
emitter.onCompletion(() -> emitterRepository.deleteById(emitterId));
// Emitter가 타임아웃 되었을 때(지정된 시간동안 어떠한 이벤트도 전송되지 않았을 때) Emitter를 삭제한다.
emitter.onTimeout(() -> {
emitter.complete();
emitterRepository.deleteById(emitterId);
});
// 첫 연결 시 503 Service Unavailable 방지용 더미 Event 전송
sendToClient(emitter, emitterId, "알림 서버 연결 성공. [memberId=" + memberId + "]");
if (!lastEventId.isEmpty()) {
sendLostData(lastEventId, memberId, emitter);
}
return emitter;
}
private String makeTimeIncludeId(Long memberId) { // (3)
return memberId + "_" + System.currentTimeMillis();
}
private void sendToClient(SseEmitter emitter, String emitterId, Object data) {
try {
emitter.send(SseEmitter.event()
.id(emitterId)
.name("sse")
.data(data)
);
} catch (IOException exception) {
emitterRepository.deleteById(emitterId);
emitter.completeWithError(exception);
}
}
private void sendLostData(String lastEventId, Long memberId, SseEmitter emitter) {
// eventCache에서 해당 memberId로 시작하는 이벤트들 가져오기
Map<String, Object> eventCaches = emitterRepository.findAllEventCacheStartWithByMemberId(String.valueOf(memberId));
// lastEventId 이후의 이벤트만 필터링
eventCaches.entrySet().stream()
.filter(entry -> lastEventId.compareTo(entry.getKey()) < 0) // lastEventId 이후의 이벤트만
.forEach(entry -> sendToClient(emitter, entry.getKey(), entry.getValue())); // 필터링된 이벤트를 클라이언트로 전송
}
}
- memberId 값을 통해 이벤트(알림) 데이터를 구분합니다.
- 최초 연결 시 첫 더미데이터를 생성 및 보내지 않으면, 503 에러가 발생하여 연결이 실패합니다...!
Repo
@Repository
@RequiredArgsConstructor
public class EmitterRepository {
private final Map<String, SseEmitter> emitters = new ConcurrentHashMap<>();
private final Map<String, Object> eventCache = new ConcurrentHashMap<>();
public SseEmitter save(String emitterId, SseEmitter sseEmitter) {
emitters.put(emitterId, sseEmitter);
return sseEmitter;
}
public void deleteById(String emitterId) {
emitters.remove(emitterId);
}
public Map<String, Object> findAllEventCacheStartWithByMemberId(String memberId) {
return eventCache.entrySet().stream()
.filter(entry -> entry.getKey().startsWith(memberId)) // memberId로 시작하는 이벤트들 필터링
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); // 필터링된 이벤트를 Map으로 반환
}
}
- findAllEventCacheStartWithByMemberId() -> 사용자 ID로 시작하는 캐시 데이터를 해당 map에서 찾아 반환합니다.
- HashMap은 멀티스레드 환경에서 동시에 수정을 시도하는 경우 예상하지 못한 결과가 발생할 수 있습니다. 멀티 스레드 환경에서 컬렉션을 안전하게 사용하기 위해 java에서는 concurrent 패키지를 제공합니다.
-> ConcurrentHashMap을 사용하면 thread-safe가 보장됩니다.
SSE 연결 테스트 - POSTMAN
위와 같이 디폴트 Accept 요청은 비활성화 후, 새롭게 헤더에 Accept를 Key에 넣고 Value에 text/event-stream을 넣어 요청합니다.
정상적으로 작동되면 최초 더미 데이터와 함께 연결이 성공적으로 이뤄진 것을 확인할 수 있습니다.
추가로, 포스트맨에서는 위와 같이 Close 버튼을 통해 연결을 끊을 수도 있네요!
Close 후
연결 끝
현재 엔드포인트가 존재하는 기능은,
1. admin 유저가 직접 api를 호출하는 전체 사용자 알림 전송 (ex. 공지사항, 상품 추천(광고)) -> NotificationType에 따라 달라짐
2. 사용자의 모든 알림 목록 조회
3. 사용자가 클릭한 알림 읽음 처리
4. 사용자의 모든 알림을 읽음 처리
5. 사용자가 선택한 알림 삭제
6. 사용자의 모든 알림 삭제
이렇게 되어있다. 이는 모두 NotificationRepository의 메소드를 이용해서 구현한다!
모두 비슷한 방식이기 때문에 예시로 사용자의 모든 알림 목록 조회만 기록하려고 한다.
Controller
@Operation(
summary = "알림 목록 조회",
description = "사용자의 모든 알림 메시지를 조회합니다."
)
@GetMapping()
public ResponseEntity<CommonResponse<NotificationListResponse>> getNotifications(
@Parameter(hidden = true) @LoginMember Member loginMember,
@Valid NotificationListRequest request
) {
return CommonResponse.success(SuccessCode.SUCCESS, notificationService.getNotifications(loginMember, request));
}
Service
@Transactional(readOnly = true)
public NotificationListResponse getNotifications(Member member, NotificationListRequest request) {
Slice<NotificationSummary> notificationList =
notificationReader.getAllByMemberId(member.getId(), request.lastNotificationId(), request.pageSize());
return new NotificationListResponse(notificationList);
}
Reader (현재 서비스에서는 Reader와 Writer로 구분하여 구조를 갖는다.)
public Slice<NotificationSummary> getAllByMemberId(
Long memberId, Long lastNotificationId, int pageSize) {
return notificationQueryRepository.getAllByMemberId(memberId, lastNotificationId, pageSize);
}
QueryRepository
public Slice<NotificationSummary> getAllByMemberId(Long memberId, Long lastNotificationId, int pageSize) {
List<NotificationSummary> notificationSummaryList = jpaQueryFactory
.selectDistinct(
Projections.constructor(
NotificationSummary.class,
notification.id,
notification.relatedUrl,
notification.content,
notification.notificationType,
notification.isRead,
notification.createdAt
)
)
.from(notification)
.where(
notification.receiver.id.eq(memberId),
noOffsetByNotificationId(notification, lastNotificationId)
)
.orderBy(notification.id.desc())
.limit(pageSize + 1L)
.fetch();
boolean hasNext = notificationSummaryList.size() > pageSize;
if (hasNext) {
notificationSummaryList.remove(pageSize);
}
return new SliceImpl<>(notificationSummaryList, PageRequest.ofSize(pageSize), hasNext);
}
보이는 그대로 DTO 조회이고, 알림을 받는 사용자 ID와 로그인한 사용자의 ID를 비교하여 조회시킨다.
api로 분리되는 기능 외에 알림이 실행되는 로직은 댓글 작성, 좋아요 등등 여러 곳에 존재한다.
그 중 게시글에 댓글 작성 api를 호출했을 때 실행되는 알림 로직을 기록하려고 한다.
* 우리 서비스에서는 DailyLife가 곧 -> Post 이다.
Controller
@Operation(
summary = "일상생활 댓글 작성",
description = "전통주 일상생활 게시글에 댓글을 작성한다."
)
@PostMapping("/{dailyLifeId}/comments")
public ResponseEntity<CommonResponse<WriteDailyLifeCommentResponse>> writeComment(
@Parameter(hidden = true) @LoginMember Member loginMember,
@Valid @RequestBody WriteDailyLifeCommentRequest request,
@PathVariable Long dailyLifeId
) {
return CommonResponse.success(SuccessCode.SUCCESS_INSERT, dailyLifeService.writeComment(loginMember, request, dailyLifeId));
}
Service
@Transactional
public WriteDailyLifeCommentResponse writeComment(
Member member,
WriteDailyLifeCommentRequest request,
Long dailyLifeId
) {
DailyLife dailyLife = getDailyLife(dailyLifeId);
DailyLifeComment dailyLifeComment = createCommentOrReply(request, member, dailyLife);
DailyLifeComment comment = dailyLifeCommentWriter.store(dailyLifeComment);
String notificationRelatedUrl = getRelatedUrl(dailyLifeId);
String notificationMessage;
if (Objects.isNull(request.parentCommentId())) {
notificationMessage = member.getNickname() + "님이 내 게시물에 댓글을 남겼어요.";
notificationService.sendCommentNotification(dailyLife.getMember(), notificationRelatedUrl, notificationMessage, comment.getId());
} else {
notificationMessage = member.getNickname() + "님이 내 댓글에 답글을 남겼어요.";
notificationService.sendCommentNotification(comment.getMember(), notificationRelatedUrl, notificationMessage, comment.getId());
}
return new WriteDailyLifeCommentResponse(
comment.getContent(),
dailyLife.getId(),
new MemberInfo(member.getId(), member.getNickname(), member.getProfileImage()));
}
우리 댓글 엔티티는, Parent가 존재하는지에 따라 댓글과 답글로 구분한다.
이에 따라 메세지 내용도 달라진다.
sendCommentNotification 메소드
@Transactional
public void sendCommentNotification(Member author, String relatedUrl, String message, Long commentId) {
Notification notification = notificationWriter.save(
Notification.createWithCommentId(author, NotificationType.COMMENT, message, relatedUrl, commentId)
);
sendNotification(author, notification);
}
sendNotification 메소드
private void sendNotification(Member author, Notification notification) {
Long receiverId = author.getId();
String eventId = makeTimeIncludeId(receiverId);
Map<String, SseEmitter> emitters = emitterRepository.findAllEmitterStartWithByMemberId(String.valueOf(receiverId));
emitters.forEach(
(key, emitter) -> {
emitterRepository.saveEventCache(key, notification);
sendToClient(emitter, eventId,
new NotificationSummary(
notification.getId(),
notification.getRelatedUrl(),
notification.getContent(),
notification.getNotificationType(),
notification.isRead(),
notification.getCreatedAt()
));
}
);
}
좋아요와 같은 기능은 commentId가 Null인 상태로 Notification이 생성되기 때문에 댓글 작성 알림과 로직을 분리해줬다.
Notification에 commentId가 있는 이유는,
사용자가 댓글을 작성해서 알림을 보냈다가 댓글을 삭제하려는 과정에서 아래 메소드를 호출하는데,
emitterRepository.deleteEventCache(author.getId() + "_" + relatedUrl);
이렇게 되면 3개의 댓글을 작성했다가 1개만 지우려고 해도 알림은 3개가 모두 삭제된다.
이는 commentId에 따른 구분값이 없어서 일어난 문제이므로, commentId를 붙여서 아래와 같이 변경하여 문제를 해결했다.
emitterRepository.deleteEventCache(author.getId() + "_" + relatedUrl + "_" + commentId);
이후에 배포 서버에 적용하면서, Nginx 및 HTTP 1.0, 1.1 버전에 대한 문제점을 수정하면서 추가로 기록할 예정이다!
'대충 넘어가지 않는 습관을 위한 기록' 카테고리의 다른 글
API 요청 방식 변경 : RestTemplate -> FeignClient (6) | 2024.11.12 |
---|---|
Gemini api pro 연동 기록 - RestTemplate (0) | 2024.11.12 |
no-offset 적용 기록 (1) | 2024.08.19 |
ArgumentResolver 응용 기록 (0) | 2024.08.13 |
GitHub Actions 워크플로우 이벤트 수동 전환 (0) | 2024.07.18 |