DeadLetterQueue와 Retry로 재처리하기 참 쉽죠?
- -
메시지들이 발행되고 ready에서 제대로 처리되지 못한 경우 unacked 상태가 되는데 만약 클라이언트에서 메시지 처리를 성공한 경우 ack 그 외에는 nack 혹은 reject 상태가 됩니다.
이렇게 메시지가 nack, reject 상태가 되거나 ttl 만료, 길이 초과 그리고 큐 설정 초과 등 특정 Dead Letter 조건이 충족한 경우 DLQ(Dead Letter Queue)로 이동됩니다.
@Bean
public Queue orderQueue() {
return QueueBuilder.durable(MAIN_QUEUE)
.withArgument("x-dead-letter-exchange", DLX)
.withArgument("x-dead-letter-routing-key", DLQ)
.build();
}
위와 같이 DLX(Dead Letter Exchange)를 설정하여 문제 메시지들을 DLQ로 이동시킨 후 에러의 원인을 분석하고 재처리를 시도할 수 있습니다.
@Bean
public Queue deadLetterQueue() {
return new Queue(DLQ);
}
@Bean
public TopicExchange deadLetterExchange() {
return new TopicExchange(DLX);
}
@Bean
public Binding deadLetterBinding() {
return BindingBuilder.bind(deadLetterQueue()).to(deadLetterExchange()).with(DLQ);
}
물론 DLQ와 DLX를 생성하고 바인딩을 해줘야 합니다.
이렇게 DLQ에 도착한 메시지들을 확인하고 만약 코드 문제라면 개발자는 이를 잘 해결하여 재처리를 해주어야 합니다.
이때 DLQ에 전송되기 전 타겟 큐에 일정 횟수 재시도 후에 DLQ로 보내야 합니다. 왜냐하면 코드의 문제가 아니라 단순한 네트워크 문제와 같은 외부적인 원인 때문일 수도 있기 때문이죠.
그럼 이렇게 재시도 하는 방식에 대해 알아보겠습니다.
첫 번째는 바로 for문으로 직접 구현하는 방법입니다.
@Component
public class OrderConsumer {
private static final int MAX_RETRIES = 3; // 총 시도 제한 수
private int retryCount = 0; // 현재 재시도 횟수
@RabbitListener(queues = RabbitMQConfig.ORDER_COMPLETED_QUEUE, containerFactory = "rabbitListenerContainerFactory")
public void processOrder(String message, Channel channel, @Header("amqp_deliveryTag") long tag) {
try {
// 실패 유발
if ("fail".equalsIgnoreCase(message)) {
if (retryCount < MAX_RETRIES) {
System.err.println("#### Fail & Retry = " + retryCount);
retryCount++;
throw new RuntimeException(message);
} else {
System.err.println("#### 최대 횟수 초과, DLQ 이동 시킴 ");
retryCount = 0;
channel.basicNack(tag, false, false);
return;
}
}
// 성공 처리
System.out.println("# 성공 : " + message);
channel.basicAck(tag, false);
retryCount = 0;
} catch (Exception e) {
System.err.println("# error 발생 : " + e.getMessage());
try {
// 실패 시 basicReject 재처리 전송
channel.basicReject(tag, true);
} catch (IOException e1) {
System.err.println("# fail & reject message : " + e1.getMessage());
}
}
}
}
크게 보면 3번 시도 후 모두 실패 시 DLQ로 이동 같은데, 위 방식에서는 메시지 처리 성공 시 직접 ack을 쏴줍니다.
// 성공 처리 밑에 있는 channel.basicAck(tag, false)에서 이를 볼 수 있습니다. basicAck의 첫 번째 인자는 메시지의 식별자를 의미하고, 두 번째 boolean은 multiple 플래그로, 특정 deliveryTag 하나만 ack할지, 그 태그 이하로 이 채널에서 전달된 모든 미확인 메시지를 한꺼번에 ack할지를 의미합니다.
그런데 이렇게 직접 ack을 설정해 줄 때는 우리가 직접 manual하게 ack을 설정하겠다고 해줘야 합니다. 즉 레빗 엠큐의 리스너의 설정을 바꿔줘야 하는데요,
@EnableRabbit
@Configuration
public class RabbitMQManualConfig {
@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
// 수정 모드 설정이 들어가야 한다.
factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
return factory;
}
}
위 코드와 같이 ack 모드를 MANUAL로 설정해줘야 직접 ack을 전송 가능합니다.
그렇지 않으면 기본 AUTO 모드에서 컨테이너가 자동으로 ack/nack를 처리합니다.
그런데 우리는 spring-retry 로 이런 반복 작업을 더 깔끔하게 처리가능합니다. retryTemplate을 잘 활용하면 되는데요, 사실 레빗 엠큐에서 이런것들을 이미 다 만들어놨습니다.
spring:
rabbitmq:
host: localhost
port: 5672
username: guestuser
password: guestuser
listener:
simple:
retry:
enabled: true # 재시도 활성화
initial-interval: 1000 # 첫 재시도 대기 시간 1초
max-attempts: 3 # 최대 재시도 횟수
max-interval: 1000 # 시도간 최대 대기시간
default-requeue-rejected: false # 재시도 실패 시 다시 큐에 넣을지
위와 같이 yml에 설정해주면
@Component
@RequiredArgsConstructor
public class OrderConsumer {
private final RabbitTemplate rabbitTemplate;
private int retryCount;
@RabbitListener(queues = RabbitMQConfig.ORDER_COMPLETED_QUEUE)
public void processMessage(String message) {
System.out.println("Received message: " + message + "count : " + retryCount++);
if ("fail".equals(message)) {
throw new RuntimeException("- Processing failed. Retry");
}
System.out.println("Message processed successfully: " + message);
}
}
위와 같이 따로 재처리 로직 없이도 재처리가 시도됩니다.
DLQ에 대해서는 앞에서 했던 것 처럼
@Component
@RequiredArgsConstructor
public class OrderDeadLetterConsumer {
private final RabbitTemplate rabbitTemplate;
@RabbitListener(queues = RabbitMQConfig.DLQ)
public void processDeadLetter(String message) {
System.out.println("[DLQ Received]: " + message);
try {
if ("fail".equalsIgnoreCase(message)) {
message = "success";
System.out.println("[DLQ] Message fixed: " + message);
} else {
System.err.println("[DLQ] Message already fixed. Ignoring: " + message);
return;
}
rabbitTemplate.convertAndSend(RabbitMQConfig.ORDER_TOPIC_EXCHANGE, "order.completed", message);
System.out.println("[DLQ] Message requested to original queue: " + message);
} catch (Exception e) {
System.err.println("[DLQ] Failed to reprocess message: " + e.getMessage());
}
}
}
DLQ를 listen하는 Consumer를 만들어준 다음 재처리 후 다시 원래 큐에 쏴주는 로직으로 구현하면 됩니다.
끝
'RabbitMQ' 카테고리의 다른 글
RabbitMQ를 적용하며 발생한 풍부한 버그와 다양한 삽질의 향연 (1) | 2025.10.17 |
---|---|
RabbitMQ에서 Transaction 처리는? (0) | 2025.10.09 |
RabbitMQ 코드로 몸통박치기 (0) | 2025.10.07 |
RabbitMQ 시작하기 (0) | 2025.10.06 |
소중한 공감 감사합니다