새소식

RabbitMQ

DeadLetterQueue와 Retry로 재처리하기 참 쉽죠?

  • -

 

메시지들이 발행되고 ready에서 제대로 처리되지 못한 경우 unacked 상태가 되는데 만약 클라이언트에서 메시지 처리를 성공한 경우 ack 그 외에는 nack 혹은 reject 상태가 됩니다.

 

이렇게 메시지가 nack, reject 상태가 되거나 ttl 만료, 길이 초과 그리고 큐 설정 초과 등 특정 Dead Letter 조건이 충족한 경우 DLQ(Dead Letter Queue)로 이동됩니다.

@Bean
public Queue orderQueue() {
    return QueueBuilder.durable(MAIN_QUEUE)
        .withArgument("x-dead-letter-exchange", DLX)
        .withArgument("x-dead-letter-routing-key", DLQ)
        .build();
}

 

위와 같이 DLX(Dead Letter Exchange)를 설정하여 문제 메시지들을 DLQ로 이동시킨 후 에러의 원인을 분석하고 재처리를 시도할 수 있습니다.

@Bean
public Queue deadLetterQueue() {
    return new Queue(DLQ);
}

@Bean
public TopicExchange deadLetterExchange() {
    return new TopicExchange(DLX);
}

@Bean
public Binding deadLetterBinding() {
    return BindingBuilder.bind(deadLetterQueue()).to(deadLetterExchange()).with(DLQ);
}

 

물론 DLQ와 DLX를 생성하고 바인딩을 해줘야 합니다.

 

이렇게 DLQ에 도착한 메시지들을 확인하고 만약 코드 문제라면 개발자는 이를 잘 해결하여 재처리를 해주어야 합니다.

 

이때 DLQ에 전송되기 전 타겟 큐에 일정 횟수 재시도 후에 DLQ로 보내야 합니다. 왜냐하면 코드의 문제가 아니라 단순한 네트워크 문제와 같은 외부적인 원인 때문일 수도 있기 때문이죠.

 

그럼 이렇게 재시도 하는 방식에 대해 알아보겠습니다.

 

첫 번째는 바로 for문으로 직접 구현하는 방법입니다.

@Component
public class OrderConsumer {
    private static final int MAX_RETRIES = 3; // 총 시도 제한 수
    private int retryCount = 0; // 현재 재시도 횟수

    @RabbitListener(queues = RabbitMQConfig.ORDER_COMPLETED_QUEUE, containerFactory = "rabbitListenerContainerFactory")
    public void processOrder(String message, Channel channel, @Header("amqp_deliveryTag") long tag) {
        try {
            // 실패 유발
            if ("fail".equalsIgnoreCase(message)) {
                if (retryCount < MAX_RETRIES) {
                    System.err.println("#### Fail  & Retry = " + retryCount);
                    retryCount++;
                    throw new RuntimeException(message);
                } else {
                    System.err.println("#### 최대 횟수 초과, DLQ 이동 시킴 ");
                    retryCount = 0;
                    channel.basicNack(tag, false, false);
                    return;
                }
            }
            // 성공 처리
            System.out.println("# 성공 : " + message);
            channel.basicAck(tag, false);
            retryCount = 0;
        } catch (Exception e) {
            System.err.println("# error 발생 : " + e.getMessage());
            try {
                // 실패 시 basicReject 재처리 전송
                channel.basicReject(tag, true);
            } catch (IOException e1) {
                System.err.println("# fail & reject message : " + e1.getMessage());
            }
        }
    }
}

 

크게 보면 3번 시도 후 모두 실패 시 DLQ로 이동 같은데, 위 방식에서는 메시지 처리 성공 시 직접 ack을 쏴줍니다.

 

// 성공 처리 밑에 있는 channel.basicAck(tag, false)에서 이를 볼 수 있습니다. basicAck의 첫 번째 인자는 메시지의 식별자를 의미하고, 두 번째 boolean은 multiple 플래그로, 특정 deliveryTag 하나만 ack할지, 그 태그 이하로 이 채널에서 전달된 모든 미확인 메시지를 한꺼번에 ack할지를 의미합니다.

 

그런데 이렇게 직접 ack을 설정해 줄 때는 우리가 직접 manual하게 ack을 설정하겠다고 해줘야 합니다. 즉 레빗 엠큐의 리스너의 설정을 바꿔줘야 하는데요,

@EnableRabbit
@Configuration
public class RabbitMQManualConfig {

    @Bean
    public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        // 수정 모드 설정이 들어가야 한다.
        factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
        return factory;
    }

}

 

위 코드와 같이 ack 모드를 MANUAL로 설정해줘야 직접 ack을 전송 가능합니다.

 

그렇지 않으면 기본 AUTO 모드에서 컨테이너가 자동으로 ack/nack를 처리합니다.

 

그런데 우리는 spring-retry 로 이런 반복 작업을 더 깔끔하게 처리가능합니다. retryTemplate을 잘 활용하면 되는데요, 사실 레빗 엠큐에서 이런것들을 이미 다 만들어놨습니다.

spring:
    rabbitmq:
        host: localhost
        port: 5672
        username: guestuser
        password: guestuser
        listener:
            simple:
            	retry:
            		enabled: true # 재시도 활성화
            		initial-interval: 1000 # 첫 재시도 대기 시간 1초
            		max-attempts: 3 # 최대 재시도 횟수
            		max-interval: 1000 # 시도간 최대 대기시간
            	default-requeue-rejected: false # 재시도 실패 시 다시 큐에 넣을지

 

위와 같이 yml에 설정해주면

@Component
@RequiredArgsConstructor
public class OrderConsumer {

    private final RabbitTemplate rabbitTemplate;

	private int retryCount;

    @RabbitListener(queues = RabbitMQConfig.ORDER_COMPLETED_QUEUE)
    public void processMessage(String message) {
        System.out.println("Received message: " + message + "count : " + retryCount++);
        if ("fail".equals(message)) {
            throw new RuntimeException("- Processing failed. Retry");
        }
        System.out.println("Message processed successfully: " + message);
    }
}

 

위와 같이 따로 재처리 로직 없이도 재처리가 시도됩니다.

 

DLQ에 대해서는 앞에서 했던 것 처럼

@Component
@RequiredArgsConstructor
public class OrderDeadLetterConsumer {

    private final RabbitTemplate rabbitTemplate;

    @RabbitListener(queues = RabbitMQConfig.DLQ)
    public void processDeadLetter(String message) {
        System.out.println("[DLQ Received]: " + message);

        try {
            if ("fail".equalsIgnoreCase(message)) {
                message = "success";
                System.out.println("[DLQ] Message fixed: " + message);
            } else {
                System.err.println("[DLQ] Message already fixed. Ignoring: " + message);
                return;
            }

            rabbitTemplate.convertAndSend(RabbitMQConfig.ORDER_TOPIC_EXCHANGE, "order.completed", message);
            System.out.println("[DLQ] Message requested to original queue: " + message);
        } catch (Exception e) {
            System.err.println("[DLQ] Failed to reprocess message: " + e.getMessage());
        }
    }
}

 

DLQ를 listen하는 Consumer를 만들어준 다음 재처리 후 다시 원래 큐에 쏴주는 로직으로 구현하면 됩니다.

 

 

Contents

포스팅 주소를 복사했습니다

이 글이 도움이 되었다면 공감 부탁드립니다.