새소식

카테고리 없음

RabbitMQ 코드로 몸통박치기

  • -

 

안녕하세요.

 

이전 포스팅에서 용어들을 좍 정리해봤습니다.

 

RabbitMQ 시작하기

추석을 맞아 인프런의 향로님께서 추석 완강 챌린지를 열어주셨습니다. 그래서 이참에 관심있었던 강의 하나를 완주해보기로 했습니다. 바로 메시지큐 라이브러리인 RabbitMQ입니다. 사실 예전

dockerel.tistory.com

이제 그럼 코드로 몸통박치기 해봐야겠죠?

 

1. 단순 메시지 전송

간단하게 하나의 큐에서 메시지를 주고 받는 과정을 보겠습니다.

 

메시지 전송 측은

@Component
@RequiredArgsConstructor
public class Sender {

    private final RabbitTemplate rabbitTemplate;

    public void send(String message) {
        rabbitTemplate.convertAndSend(RabbitMQConfig.QUEUE_NAME, message);
        System.out.println("[#] Sent : " + message);
    }
}

 

RabbitTemplate을 사용해서 어떤 큐(RabbitMQConfig.QUEUE_NAME)로 무슨 메시지를 보낼지 convertAndSend를 통해 보내주면 됩니다.

 

그럼 RabbitMQConfig 를 보겠습니다.

@Configuration
public class RabbitMQConfig {

    public static final String QUEUE_NAME = "helloqueue";

    @Bean
    public Queue queue() {
        return new Queue(QUEUE_NAME, false);
    }

    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
        return new RabbitTemplate(connectionFactory);
    }

    @Bean
    public SimpleMessageListenerContainer container(ConnectionFactory connectionFactory,
                                                    MessageListenerAdapter listenerAdapter) {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        container.setQueueNames(QUEUE_NAME);
        container.setMessageListener(listenerAdapter);
        return container;
    }

    @Bean
    public MessageListenerAdapter listenerAdapter(Receiver receiver) {
        return new MessageListenerAdapter(receiver, "receiveMessage");
    }

}

 

우선 큐 이름과 큐가 빈으로 등록되어 있습니다.

 

Queue는 import org.springframework.amqp.core.Queue; <- 이걸 import 해주셔야 하고요, 두번째에 false로 되어 있는 boolean값 인자는 durable이라는 특성을 나타냅니다.

 

보통은 레빗 엠큐 서버를 껐다 키면 큐가 사라지는데요, 큐를 보존하고 싶으면 true로 넣어주시면 됩니다. 지금은 그냥 간단하게 false로 설정했습니다.

 

그리고 container() 메서드와 같이 SimpleMessageListenerContainer에 아무 조건 없이, 즉 빈 문자열의 routingKey로 큐 등록이 가능합니다.

 

listenerAdapter() 메서드는 Receiver 즉 메시지가 들어오면 처리할 수 있는 빈 객체를 인자로 받고 MessageListenerAdapter 생성 시 두번째 인자로 처리할 메서드 명을 적습니다. 현재 설정대로라면 Receiver 클래스의 receiveMessage 메서드로 처리를 하나보네요?

@Component
public class Receiver {
    public void receiveMessage(String message) {
        System.out.println("[#] Received: " + message);
    }
}

 

그쵸. 이렇게 처리할 메서드 명을 적습니다.

 

마지막으로 가장 중요한 레빗 엠큐 설정 정보를 작성해야겠죠?

application.yml에서

spring:
  rabbitmq:
    host: localhost
    port: 5672
    username: guestuser
    password: guestuser
  application:
    name: HelloWorldMessageQueue
server:
  port: 8080

 

처음 설정할 때 포트 2개(5672, 15672)를 열었는데 관리 및 모니터링 포트인 15672 말고 기본 통신 포트인 5672로 작성해주시면 됩니다.

 

그리고 간단한 RestController 하나 만들어서 Sender.send()로 메시지를 전송 해보시면 그대로 메시지가 잘 오는 모습을 볼 수 있습니다.

2. WorkQueue

다음은 여러 Consumer를 두어 메시지 처리를 분산하여 처리 속도를 향상시키는 방식인 WorkQueue에 대해 알아보겠습니다.

 

메시지를 여러 Customer들에게 분배하여 작업을 분산합니다. 이로써 작업 부하를 분산하고, 병렬 처리로 인해 처리 속도가 향상됩니다.

 

그런데 이렇게 메시지 처리 중 예외가 발생해서 제대로 처리가 안된다면 어떻게 될까요?

 

메시지는 생성 시 ready에서 전송이 되면 unacked를 지나 정상 전송이 확인되면 ack 상태로 변하는데, 만약 Consumer가 확인을 보내지 못하거나 연결이 끊어지면 메시지는 다시 ready 상태로 되돌아갑니다.

 

이를 위해서는 전송 확인을 위한

container.setAcknowledgeMode(AcknowledgeMode.AUTO)

해당 코드를 컨테이너에 설정해줘야 합니다.

 

만약 ready가 많은 경우 Consumer의 수를 늘리거나 Consumer의 메시지 처리 속도를 최적화 해야 합니다.

 

하지만 unacked가 많은 경우 대부분은 프로그램 에러일 가능성이 높고 Consumer 코드를 수정해서 재시작하면 unacked에서 처리가 되어 사라집니다.

3. Pub/Sub

Fanout Exchange

다음은 Pub/Sub 구조로 메시지를 중간 브로커인 Exchange를 통해 Subscriber들에게 메시지를 전달하는 방식입니다.

 

레빗 엠큐에서는 메시지를 브로드 캐스트하는 Fanout Exchange를 통해 구현이 가능합니다.

 

이제 부터는 Queue에 Exchange를 정의하여 직접 바인딩하는 과정이 들어갑니다.

@Configuration
public class RabbitMQConfig {
    public static final String QUEUE_NAME = "notificationQueue";
    public static final String FANOUT_EXCHANGE = "notificationExchange";

    @Bean
    public Queue notificationQueue() {
        return new Queue(QUEUE_NAME, false);
    }

    @Bean
    public FanoutExchange fanoutExchange() {
        return new FanoutExchange(FANOUT_EXCHANGE);
    }

    @Bean
    public Binding bindNotification(Queue notificationQueue, FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(notificationQueue).to(fanoutExchange);
    }
}

 

위와 같이 BindingBuilder.bind()를 통해 Queue에 Exchange 바인딩이 가능합니다. 여기서는 fanoutExchange를 notificationQueue에 바인딩 해주었습니다.

 

이를 통해 실시간 채팅, 알림 등이 구현이 가능합니다.

Direct Exchange

Direct Exchange로 Routing Key에 정확히 부합하는 큐에만 메시지를 보낼 수 있습니다.

    public static final String DIRECT_EXCHANGE = "direct_exchange";

    @Bean
    public DirectExchange directExchange() {
        return new DirectExchange(DIRECT_EXCHANGE);
    }

 

다른 코드들과 다를 건 없고 direct exchange를 정의해주고

 

메시지 전송 시

@Component
public class LogPublisher {

    private final RabbitTemplate rabbitTemplate;

    public LogPublisher(RabbitTemplate rabbitTemplate) {
        this.rabbitTemplate = rabbitTemplate;
    }

    public void publish(String routingKey, String message) {
        rabbitTemplate.convertAndSend(RabbitMQConfig.DIRECT_EXCHANGE, routingKey, message);
        System.out.println("message published :" + routingKey + ":" + message);
    }
}

 

어떤 exchange로 어떤 routingKey 기반으로 message를 보낼건지만 잘 지정해주시면 됩니다.

@Component
public class LogConsumer {

    @RabbitListener(queues = RabbitMQConfig.ERROR_QUEUE)
    public void consumeError(String message) {
        System.out.println("[ERROR]를 받음 : " + message);
    }

    @RabbitListener(queues = RabbitMQConfig.WARN_QUEUE)
    public void consumeWarn(String message) {
        System.out.println("[WARN]를 받음 : " + message);
    }

    @RabbitListener(queues = RabbitMQConfig.INFO_QUEUE)
    public void consumeInfo(String message) {
        System.out.println("[INFO]를 받음 : " + message);
    }

}

 

그럼 Consumer 쪽에서는 어떤 Queue를 Listen 할건지 지정해서 깔끔하게 따로 처리가능합니다.

Topic Exchange

Topic Exchange로는 특정 패턴에 맞춰 메시지 수신이 가능합니다.

@Configuration
public class RabbitMQConfig {

    public static final String ERROR_QUEUE = "error_queue";
    public static final String WARN_QUEUE = "warn_queue";
    public static final String INFO_QUEUE = "info_queue";
    
    public static final String ALL_LOG_QUEUE = "all_log_queue";

    public static final String TOPIC_EXCHANGE = "topic_exchange";

    @Bean
    public TopicExchange topicExchange() {
        return new TopicExchange(TOPIC_EXCHANGE);
    }

    @Bean
    public Queue errorQueue() {
        return new Queue(ERROR_QUEUE, false);
    }

    @Bean
    public Queue warnQueue() {
        return new Queue(WARN_QUEUE, false);
    }

    @Bean
    public Queue infoQueue() {
        return new Queue(INFO_QUEUE, false);
    }

    @Bean
    public Queue allLogQueue() {
        return new Queue(ALL_LOG_QUEUE, false);
    }


    @Bean
    public Binding errorBinding() {
        return BindingBuilder.bind(errorQueue()).to(topicExchange()).with("log.error");
    }
    @Bean
    public Binding warnBinding() {
        return BindingBuilder.bind(warnQueue()).to(topicExchange()).with("log.warn");
    }

    @Bean
    public Binding infoBinding() {
        return BindingBuilder.bind(infoQueue()).to(topicExchange()).with("log.info");
    }

    @Bean
    public Binding allLogBinding() {
        return BindingBuilder.bind(allLogQueue()).to(topicExchange()).with("log.*");
    }

}

 

topic exchange에 문자열을 넣어서 direct exchange처럼 쓸 수도 있고 하나의 단어 대체를 뜻하는 * 혹은 0개 이상의 단어 대체를 뜻하는 #를 사용하여 패턴 매칭 기반으로 처리할 수도 있습니다.

 

위 코드는 각 로그를 log.error, log.warn, log.info로 받고 모든 로그에 대해서는 log.* 로 패턴 매칭을 시켜놓았습니다.

 

결국 결론은 도메인마다 해결법은 다르고 정답은 없다.

관건은 fanout, direct, topic exchange를 얼마나 잘 조합해서 큐마다 효율적이게 처리할 수 있을지 고민해보면 좋을 것 같습니다.

 

 

 

Contents

포스팅 주소를 복사했습니다

이 글이 도움이 되었다면 공감 부탁드립니다.