새소식

RabbitMQ

RabbitMQ를 적용하며 발생한 풍부한 버그와 다양한 삽질의 향연

  • -

 

안녕하세요

 

이번 추석에 레빗 엠큐를 딥하게는 아니고 아주 살짝 맛만 봤습니다

 

'RabbitMQ' 카테고리의 글 목록

 

dockerel.tistory.com

 

근데 레빗 엠큐 이거 좀 맛도리더라구요

 

제일 맘에 드는건 데드레터 처리하고 자동 재시도 <- 이것들이 진짜 군침 싹 포인트였습니다

 

그래서 제가 한 프로젝트 중에 이걸 어디에 한번 적용해보아야 할까 생각하다가 Redis Pub/Sub 기반으로 구현했던 실시간 알림 기능에 한번 도입을 해봤습니다

 

근데 이걸 왜 함?

 

사실 그 당시에는 레디스를 인증 파트에서 refresh token 관리를 위해 사용하고 있었기에 다른 메시지 큐와 같은 새로운 기술 스택을 도입하지 않아도 레디스 선에서 커버칠 수 있다는 생각에 도입했습니다

 

근데 문제는 그렇게 발생했습니다

 

웹 서버를 배포할 때 다중 인스턴스로 배포해버린 겁니다

 

이러면 문제가

  • 기존에 실시간 알림 전달은 SSE 연결을 통해 클라이언트에 실시간으로 넘겨주고 있었음
  • 근데 이 클라이언트와의 SSE 연결은 웹서버측에서 ConcurrentMap으로 관리하고 있음
  • 클라이언트1과의 연결은 서버1에서 관리되고 있는데 레디스 pub/sub에서 발행된 메시지를 서버2에서 소비해버릴 수 있음

결론적으로 메시지가 제대로 처리되지 않고 제대로 전송될 수 있는 SSE 연결을 33% 확률로 죽음의 삼지선다를 하고 있던 겁니다

 

그거 외에도 레디스 pub/sub 특성상 실패 메시지에 대한 보관, 재처리와 데드레터 관리가 불가능합니다

아 그 물론 직접 구현하면 안되는건 없지만 기본 기능으로 없다는 소리입니다

 

그래서 이번에 배운 레빗 엠큐를 제 작고 소중한 알림 시스템에 적용해보기로 했습니다

 

밑밥까는게 좀 길었네요

레지고

동적 큐 이름 설정하기

우선 메시지의 흐름을 생각해봅시다

1. 알림 발행 이벤트 발생

2. 알림 객체 RDB에 저장 (Direct Exchange, 단일 큐 바인딩, 실패 시 데드레터큐로 전송, 성공 시 알림 전송)

3. 알림 전송 (Fanout Exchange, 인스턴스마다 큐 1개씩 바인딩)

 

인데, 저는 저장과 전송이 순차적으로 이루어지게 구현했습니다. 그 이유는

  • 실시간 알림이긴 한데 빨리 전달되는거보다 RDB에 저장되는게 더 중요함
    • 나중에 다시 접속했을 때 이전 알림들도 다 보여줘야 하기 때문
    • 그리고 뭐 친구가 댓글 달았다 이런 알림이 1분 1초 더 빨리 전송해야 할 필요가 없다고 생각함

그래서 저장 안 된 알림들은 싹 다 데드레터로 처리하고 슬랙 메시지로 무슨 오류가 나서 저장 안된건지 보내줄 예정입니다

@Configuration
public class RabbitMQConfig {

    public static final String SAVE_NOTIFICATION_QUEUE = "saveNotificationQueue";
    public static final String SAVE_NOTIFICATION_EXCHANGE = "saveNotificationExchange";

    public static final String PUBLISH_NOTIFICATION_QUEUE = "publishNotificationQueue";
    public static final String PUBLISH_NOTIFICATION_EXCHANGE = "publishNotificationExchange";

    public static final String DEAD_LETTER_QUEUE = "deadLetterQueue";
    public static final String DEAD_LETTER_EXCHANGE = "deadLetterExchange";
    
    ...

 

우선 이렇게 큐에 대해 이름과 익스체인지 이름들을 설정해줬습니다

    // 알림 저장 큐
    @Bean
    public Queue saveNotificationQueue() {
        return QueueBuilder.durable(SAVE_NOTIFICATION_QUEUE)
                .withArgument("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE)
                .withArgument("x-dead-letter-routing-key", DEAD_LETTER_QUEUE)
                .build();
    }

    @Bean
    public DirectExchange saveNotificationExchange() {
        return new DirectExchange(SAVE_NOTIFICATION_EXCHANGE);
    }

    @Bean
    public Binding saveNotificationBinding(
            Queue saveNotificationQueue,
            DirectExchange saveNotificationExchange
    ) {
        return BindingBuilder.bind(saveNotificationQueue).to(saveNotificationExchange).with(SAVE_NOTIFICATION_QUEUE);
    }

    // 알림 저장 데드레터 큐
    @Bean
    public Queue deadLetterQueue() {
        return new Queue(DEAD_LETTER_QUEUE);
    }

    @Bean
    public DirectExchange deadLetterExchange() {
        return new DirectExchange(DEAD_LETTER_EXCHANGE);
    }

    @Bean
    public Binding deadLetterBinding(
            Queue deadLetterQueue,
            DirectExchange deadLetterExchange
    ) {
        return BindingBuilder.bind(deadLetterQueue).to(deadLetterExchange).with(DEAD_LETTER_QUEUE);
    }

 

그리고 알림 저장 큐와 알림 저장 데드레터 큐를 설정해줬습니다

이러면 단일 큐로 생성되서 여러 인스턴스가 생성되어도 한번만 소비되기 때문에 한번만 저장됨이 보장됩니다

    // 알림 발송 큐
    @Bean
    public String dynamicPublishNotificationQueueName() {
        String randomString = UUID.randomUUID().toString();
        return PUBLISH_NOTIFICATION_QUEUE + " : " + randomString;
    }

    @Bean
    public Queue publishNotificationQueue(@Qualifier("dynamicPublishNotificationQueueName") String queueName) {
        return new Queue(queueName, false);
    }

    @Bean
    public FanoutExchange publishNotificationExchange() {
        return new FanoutExchange(PUBLISH_NOTIFICATION_EXCHANGE);
    }

    @Bean
    public Binding publishNotificationBinding(
            Queue publishNotificationQueue,
            FanoutExchange publishNotificationExchange
    ) {
        return BindingBuilder.bind(publishNotificationQueue).to(publishNotificationExchange);
    }

 

그 다음은 알림 발송 큐인데요, 앞에서 제가 레디스 pub/sub 사용 시 한 인스턴스에서만 메시지를 소비하도록 되어 있어 타겟 sse 연결이 없는 곳에서 메시지가 소비될 시 제대로 알림 전송이 되지 않을 수 있다고 했죠?

 

그래서 인스턴스마다 기본 큐 이름 뒤에 랜덤 문자열을 추가시켜 동적으로 큐가 생성되도록 구현했습니다

그리고 이걸 @Qualifier를 통해 주입받은 다음 알림 전송용 fanout exchange에 바인딩 시켜주고 해당 exchange에 전송하면 모든 인스턴스에 한번씩 메시지가 다 전송이 되고 대상 sse 연결이 있는 인스턴스에서 소비되겠죠?

 

자.. 그럼 여기서 의문이 생깁니다

direct exchange에 바인딩한 알림 저장 큐는

    @RabbitListener(queues = RabbitMQConfig.SAVE_NOTIFICATION_QUEUE)
    public void consumeSaveNotificationMessage(PublishNotificationRequest request) {
        // 알림 저장
        Notification notification = notificationService.saveNotification(request);

        // 알림 저장 후 알림 발행
        rabbitTemplate.convertAndSend(RabbitMQConfig.PUBLISH_NOTIFICATION_EXCHANGE, "", notification);
    }

 

이렇게 listen 할 수 있습니다

위에서 말한대로 알림 저장이 성공하면 알림 발행, 실패하면 데드레터 큐로 가게되는 방식입니다

    @RabbitListener(queues = RabbitMQConfig.DEAD_LETTER_QUEUE)
    public void consumeDeadLetterNotificationMessage(PublishNotificationRequest request) {
        slackPublisher.publishSlackMessage(request);
    }

 

물론 데드레터 큐에 대해서도 listen하고 있고 어떤 알림 발행 요청에 대해 문제가 발생했는지를 넘겨줬습니다

 

어.. 그럼 뭐가 문제냐고요?

 

각 인스턴스에서 어떤 큐를 listen할지 어떻게 아나요?

물론 그 아까 랜덤 문자열 붙혀서 만들어 놓은건 아는데 어떻게 그걸 저 @RabbitListener에 박아넣죠?

 

정답은 SpEL에 있었습니다

SpEL

SpEL(Spring Expression Language)은 Spring 프레임워크에서 제공하는 표현식 언어로, 객체의 값을 런타임 시점에 동적으로 평가하거나 조작할 수 있는 기능을 제공합니다

 

8. Spring Expression Language (SpEL)

This section introduces the simple use of SpEL interfaces and its expression language. The complete language reference can be found in the section Language Reference. The following code introduces the SpEL API to evaluate the literal string expression 'Hel

docs.spring.io

 

 

즉 우리는 런타임 시점에 동적으로 만들어지는 큐 이름을 받아와야 하고 이걸 정확하게 지원하는 SpEL 문법이 있습니다

 

SpEL에서 #{} 사용하면 해당 애플리케이션 컨텍스트에 등록된 스프링 빈을 참조할 수 있습니다

    @RabbitListener(queues = "#{@dynamicPublishNotificationQueueName}")
    public void consumePublishNotificationMessage(Notification notification) {
        notificationService.publishNotification(notification);
    }

 

저는 이렇게 사용했는데요, 이러면 현재 등록된 빈 중에서 dynamicPublishNotificationQueueName를 찾아와서 적용해줄겁니다

 

어쨌든 @RabbitLister 사용시 queues에는 컴파일 타임 상수 또는 SpEL 표현식만 허용한다고 하네요

그래서 전 이렇게 해결했습니다

 

그리고 그 다음 문제가 발생했습니다

Retry 설정 미적용

분명히 강의에서는 별다른 구현 없이도 이렇게

spring.rabbitmq.listener.simple.retry.enabled=true
spring.rabbitmq.listener.simple.retry.initial-interval=1000
spring.rabbitmq.listener.simple.retry.max-attempts=3
spring.rabbitmq.listener.simple.retry.max-interval=1000

spring.rabbitmq.listener.simple.default-requeue-rejected=false

 

retry 관련 프로퍼티만 작성해주면 자동으로 재시도가 적용된다고 했는데, 직접 예외를 일으켜서 로그를 찍어보아도 재시도를 안하고 바로 데드레터 큐로 보내버리고 있었습니다

 

왜이런거임 대체;;

이거때매 한 3시간 와리가리 치다가 갑자기 실마리를 찾았습니다

 

챕터18 retry yml 강의 부분 질문 - 인프런 | 커뮤니티 질문&답변

누구나 함께하는 인프런 커뮤니티. 모르면 묻고, 해답을 찾아보세요.

www.inflearn.com

 

에서

@RabbitListener를 사용하면 내부적으로 SimpleMessageListenerContainer가 자동으로 생성되기 때문에 retry 설정을 읽어서 exception 이 발생할 경우 RetryTemplate을 사용해서 자동으로 설정된 속성에 해당하는 작업을 수행하게 됩니다.

 

에... 그러니까

 

내가 정의한 여기에서 SimpleRabbitListenerContainerFactory를 타고 들어가면

 

어?

 

잡았습니다 요놈

 

정리하자면

SimpleRabbitListenerContainerFactory 를 수동으로 정의하고 RabbitListenerContainerFactoryConfigurer.configure 를 별도로 핸들링하지 않으면 Spring boot의 property가 작동하지 않는다고 합니다

 

아... 아니 진짜 이거 때매;;;

 

그래서 저는 왜 SimpleRabbitListenerContainerFactory <- 임마를 수동으로 굳이 설정해서 이 쌩고생을 한거냐고요?

 

왜냐하면 메시지를 전송할 때 String으로 안보내고 객체 자체로 왔다갔다 시켰거든요

그래서 따로

    // 직렬화, 역직렬화 설정
    @Bean
    public MessageConverter messageConverter() {
        return new Jackson2JsonMessageConverter();
    }

    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        rabbitTemplate.setMessageConverter(messageConverter());
        return rabbitTemplate;
    }

    @Bean
    public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(
            ConnectionFactory connectionFactory
    ) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();

        factory.setConnectionFactory(connectionFactory);
        factory.setMessageConverter(messageConverter());

        return factory;
    }

 

이렇게 직렬화, 역직렬화에 대해 따로 messageConverter를 등록해줬습니다

 

그리고 테스트 해보면?

retry 잘되는 모습을 보실 수 있을겁니다!

 

예~~~~~~~

 

넵 마치겠습니다

'RabbitMQ' 카테고리의 다른 글

RabbitMQ에서 Transaction 처리는?  (0) 2025.10.09
DeadLetterQueue와 Retry로 재처리하기 참 쉽죠?  (0) 2025.10.08
RabbitMQ 코드로 몸통박치기  (0) 2025.10.07
RabbitMQ 시작하기  (0) 2025.10.06
Contents

포스팅 주소를 복사했습니다

이 글이 도움이 되었다면 공감 부탁드립니다.