새소식

RabbitMQ

RabbitMQ에서 Transaction 처리는?

  • -

 

이번에는 메시지 단위의 트랜잭션 처리에 대해 알아보겠습니다.

 

이전에 백기선님의 예외와 트랜잭션에 관한 영상을 본 적이 있습니다.

 

그 당시에 볼 때는 이게 무슨 소리지 했었지만

공부를 제대로 안했나 봅니다.

이제는 무슨 소리인지 알죠.

 

간단히 설명드리자면 모든 블로그에서 언체크드 예외와 체크드 예외에 대해 무조건 롤백을 한다 / 롤백을 하지 않는다 로 작성해 놓은 글에 대해 비판하시는 영상이었는데요, 그 이유는

 

트랜잭션에는 DB 트랜잭션만 있는것이 아니고 그리고 설사 DB 트랜잭션을 의미하는 것일지라도 기본적으로 런타임 계열의 언체크드 예외는 롤백, 그 외에 체크드 예외에는 롤백을 하지 않고 커밋되는 것이 맞지만 이것도 사실 rollbackFor 혹은 try-catch 등의 처리를 통해 충분히 롤백할 수 있다는 의미였습니다.

 

갑자기 그래서 이 영상이 생각이 난 이유는

DB 트랜잭션 말고도 다른 트랜잭션이 있나? 라는 의문이 그 당시에 들었기 때문입니다.

 

레빗 엠큐에서도 메시지에 대해 트랜잭션이 존재합니다.

트랜잭션 메시징

@Configuration
public class RabbitMQConfig {

    @Bean
    public Queue queue() {
        return new Queue("transactionQueue", true);
    }

    @Bean
    public MessageConverter messageConverter() {
        return new Jackson2JsonMessageConverter();
    }

    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory, MessageConverter messageConverter) {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        
        rabbitTemplate.setMessageConverter(messageConverter); // JSON 변환기 등록
        
        rabbitTemplate.setChannelTransacted(true);  // 트랜잭션 활성화

        return rabbitTemplate;
    }

}

 

일단 위와 같이 RabbitTemplate에 setChannelTransacted를 true로 설정해 트랜잭션을 활성화해줍니다.

 

그리고 이번에는 단순히 메시지 전송이 아닌 엔티티를 보내볼 거기 때문에 따로 엔티티 JSON 컨버터를 설정해줬습니다.

@Entity
public class StockEntity {

    @Id
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    private Long id;

    private String userId;
    private int stock;

    private boolean processed; // RabbitMQ Consumer에서의 처리 여부
    
    private LocalDateTime createdAt;
    private LocalDateTime updatedAt;

 

이런 엔티티를 전송해보겠습니다.

 

아무튼

@Component
@AllArgsConstructor
public class MessageProducer {
    private final StockRepository stockRepository;
    private final RabbitTemplate rabbitTemplate;

    @Transactional
    public void sendMessage(StockEntity stockEntity, String testCase) {
        rabbitTemplate.execute(channel -> {
            try {
                channel.txSelect(); // 메시지 트랜잭션 시작
                stockEntity.setProcessed(false);
                stockEntity.setCreatedAt(LocalDateTime.now());
                StockEntity stockEntitySaved = stockRepository.save(stockEntity);

                System.out.println("Stock Saved : " + stockEntitySaved);

                // 메시지 발행
                rabbitTemplate.convertAndSend("transactionQueue", stockEntitySaved);

                if ("fail".equalsIgnoreCase(testCase)) {
                    throw new RuntimeException("메시지 트랜잭션 작업중에 에러 발생");
                }

                channel.txCommit();
                System.out.println("메시지 트랜잭션이 정상적으로 처리 되었음 (발행된 메시지 전송)");
            } catch (Exception e) {
                System.out.println("메시지 트랜잭션 실패 : " + e.getMessage());
                channel.txRollback();
                throw new RuntimeException("메시지 트랜잭션 롤백 완료 (전송 취소) ", e);
            } finally {
                if (channel != null) {
                    try {
                        channel.close();
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            }
            return null;
        });
    }
}

 

위와 같이 메시지에 대한 트랜잭션을 조작할 수 있습니다.

 

RabbitTemplate에 대해 channel.select()로 트랜잭션을 시작합니다. 이때 시작하는건 "메시지"에 대한 트랜잭션이지 DB의 트랜잭션과는 별도의 개념입니다.

 

그리고 convertAndSend를 통해 큐에 엔티티를 전송합니다.

 

이때 메시지는 바로 큐에 발행되지 않고 대기합니다. 그리고 정상 처리된 후 channel.txCommit()이 호출되어야 비로소 브로커에 커밋됩니다. 반대로 예외 발생 후 channel.txRollback() 호출 시 해당 채널 트랜잭션 범위에서 "발행 예정"이던 메시지가 취소됩니다.

 

이를 JPA에 비유하자면 convertAndSend는 영속성 컨텍스트에 엔티티를 넣는것과 유사하고 채널의 txCommit 시점에 flush/commit이 일어나 큐에 확정되는 것입니다.

 

다시 한번 말하지만 DB와 메시지 큐의 트랜잭션은 별도로 일어납니다.

 

그런데 이런 트랜잭션 메시징에는 한계가 존재합니다.

 

첫 번째로 성능상 오버헤드가 존재합니다.

 

트랜잭션을 시작, 커밋, 롤백하며 어플리케이션과 레빗 엠큐 사이에 왕복 네트워크가 증가하고 브로커와 클라이언트 모두 추가 연산을 수행하게 되기 때문입니다. 이에 대량 메시지 처리 작업에는 부적합한 특성을 가지게 됩니다.

 

또한 트랜잭션 처리를 잘못 구성하면 메시지가 중복 처리되거나 손실될 가능성이 있고 데이터베이스 작업과 메시지 사이에 완벽한 원자성을 보장하지는 않습니다. 만약 둘 사이에 원자성을 위해서는 2 Phase Commit 또는 보상 트랜잭션 등을 필요로 합니다.

 

그래서 레빗 엠큐에서는 트랜잭션 대신 Publisher Confirms 방식을 사용하는 것이 더 효율적이라 합니다.

Publisher Confirms

Publisher Confirms 방식은 메시지가 레빗 엠큐 메시지 브로커에 성공적으로 도달했는지 확인하는 기법입니다.

 

메시지는 메시지 브로커에 도달하여 익스체인지로 그리고 익스체인지에서 큐로 전달되어 각 소비자에서 처리되는데 이 중간 과정(프로듀서 -> 익스체인지, 익스체인지 -> 큐) 에 대해 정상적인 전달이 이루어졌는지 확인하는 방법입니다.

@Configuration
public class RabbitMQConfig {

    // ConfirmCallback, ReturnsCallback 설정
    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        rabbitTemplate.setMandatory(true);  // ReturnCallback 활성화

        // confirmCallback 설정
        rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
            if (ack) {
                System.out.println("#### [Message confirmed]: " +
                        (correlationData != null ? correlationData.getId() : "null"));
            } else {
                System.out.println("#### [Message not confirmed]: " +
                        (correlationData != null ? correlationData.getId() : "null") + ", Reason: " + cause);

                // 실패 메시지에 대한 추가 처리 로직 (예: 로그 기록, DB 적재, 관리자 알림 등)
            }
        });

        // ReturnCallback 설정
        rabbitTemplate.setReturnsCallback(returned -> {
            System.out.println("Return Message: " + returned.getMessage().getBody());
            System.out.println("Exchange : " + returned.getExchange());
            System.out.println("RoutingKey : " + returned.getRoutingKey());

            // 데드레터 설정 추가
        });
        return rabbitTemplate;
    }

}

 

위 코드에서 confirmCallback이 Producer에서 Exchange 까지를 확인하고, returnCallback이 Exchange에서 Queue까지의 라우팅 여부를 확인합니다.

ConfirmCallback

// confirmCallBack 설정
rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
    if (ack) {
        System.out.println("#### [Message confirmed]: " +
                (correlationData != null ? correlationData.getId() : "null"));
    } else {
        System.out.println("#### [Message not confirmed]: " +
                (correlationData != null ? correlationData.getId() : "null") + ", Reason: " + cause);
    }
});

 

위 코드와 같이 메시지 고유 번호를 나타내는 correlationData, 정상 처리를 의미하는 ack 그리고 오류시 원인을 뜻하는 cause를 파라미터로 받으면 됩니다.

 

이때 ConfirmCallback을 사용하기 위해서는 설정이 필요한데

spring:
  rabbitmq:
    publisher-confirm-type: correlated # Enable Publisher Confirms

 

위와 같이 설정하거나

@Bean
public ConnectionFactory connectionFactory() {
    CachingConnectionFactory cf = new CachingConnectionFactory();
    cf.setPublisherConfirmType(CachingConnectionFactory.ConfirmType.CORRELATED);
    return cf;
}

 

이렇게 correlated 설정을 빈으로 등록해주면 됩니다.

ReturnCallback

rabbitTemplate.setMandatory(true); // ReturnCallback 활성화

// ReturnCallback 설정
rabbitTemplate.setReturnsCallback(returned -> {
    System.out.println("Return Message: " + returned.getMessage().getBody());
    System.out.println("Exchange : " + returned.getExchange());
    System.out.println("RoutingKey : " + returned.getRoutingKey());

    // 데드레터 설정 추가
});

 

ReturnCallback은 위와 같이 setMandatory를 true로 설정해주어야 합니다. 이때 returned 파라미터에는 반환된 메시지, 무슨 익스체인지에서 반환되었는지, 어떤 라우팅 키로 보냈을 때 반환되었는지 등의 정보가 존재합니다.

 

이렇게 ConfirmCallback과 ReturnCallback로 구현하면 개별 메시지 단위로 전송 후 브로커로부터 ack을 받기 때문에 가볍고 빠르다는 특징이 있습니다.  여러 메시지를 한 묶음으로 처리하는 트랜잭션과는 다르게 미세한 조정을 필요로 합니다.

 

이는 대규모 메시지 전송이나 메시지 손실 방지가 중요한 결제나 알림 시스템 등에 적합합니다. 컨펌 콜백으로 전송 보장, 리턴 콜백으로 라우팅 실패 탐지, 소비 측 수동 ack/DLQ로 최종 일관성을 확보하는 조합이 효과적입니다.

 

그런데 이게 비동기의 이점을 활용은 하지만 대용량의 데이터에 있어서 속도에서 이점이 있는지는 크게 모르겠다...는 그런 생각 있습니다.

 

그럴 때 사용하는 방법이 TCC 기법이라 생각합니다.

TCC

TCC 방식은 Try-Confirm-Cancel을 의미합니다. 즉 시도 후 확인 혹은 취소 이런 느낌인데, 사실 메시지 큐를 활용하여 다른 시스템에 메시지를 전달하였을 때 이는 비동기이기 때문에 어떤 트랜잭션으로 묶는다는 말 자체가 말이 안됩니다. 즉 일반적인 케이스에서 코드가 서로 다른 서버에 존재하고 트랜잭션 처리를 한다는 것이 비동기의 사상와 맞지 않습니다.

 

비동기의 사상이란 “비동기”는 호출 시점에서의 즉시 일관성을 버리고, 각 단계가 독립적으로 진행되며 나중에 수렴한다는 철학을 전제로 하기 때문에, 여러 시스템을 하나의 동기적 트랜잭션처럼 묶으려는 발상과 충돌한다는 뜻입니다.

 

어쨌든 이를 보완하기 위해 분산 서버간의 트랜잭션 처리를 TCC를 통해 취소 및 확인 단계를 두는 방식으로 처리하는 방법이 있습니다.

 

TCC의 진행 프로세스는 다음과 같습니다.

  • 전송되는 데이터의 원본 데이터를 저장하고 호출이 정상 종료일 경우 확정, 비정상일 경우에는 취소 처리
  • Consumer쪽에서 확정된 데이터, 그리고 예외가 발생한 cancel된 데이터들을 한 곳의 Queue에 정리
  • 경계(서비스↔브로커↔서비스)마다 요청/응답의 원본 데이터를 기록으로 남겨 재처리 및 멱등 판정

사실 느낌이 잘 안오는데요, 주문(Order)/배송(Shipping) 시나리오로 한번 보겠습니다.

inflearn - RabbitMQ를 이용한 비동기 아키텍처 한방에 해결하기

 

1. Order 도메인 : 주문 상태를 초기 상태로 설정하고 메시지 발행

2. Shipping 도메인 : Consumer 역할로 주문 상태를 CONFIRMED 또는 CANCEL로 변경하여 메시지 발행

3. 최종 큐 : CONFIRM/CANCEL 상태 메시지는 Order 도메인이 바라보는 큐로 수신. 즉 Shipping에서 발행한 메시지로 Order의 주문 완료 / 취소 결정

4. 배치 처리 : Order와 Shipping 도메인에서 메시지를 대조하여 보정 (주문 완료이지만 배송 불가인 경우 혹은 주문 취소이지만 배송 확정 등)

 

자 그래서 결론은

 

이것 또한 정답이 없고 도메인 혹은 비즈니스 상황에 맞는 적절한 방식을 선택해서 도입해보면 좋을 것 같습니다.

 

Contents

포스팅 주소를 복사했습니다

이 글이 도움이 되었다면 공감 부탁드립니다.