RabbitMQ란?
RabbitMQ는 AMQP 메시지 브로커이며, 생산자(producer)로부터 메시지를 받아 소비자(consumer)에게 전송하기 위한 오픈소스 기반의 소프트웨어이다.
- AMQP(Advanced Message Queuing Protocol)란 MQ를 오픈 소스에 기반한 표준 프로토콜이며, Client application과 Middleware broker와의 메세지를 주고 받기 위한 프로토콜이기도 하다.
- 메시지 브로커는 대규모 메시지 기반 미들웨어 아키텍쳐에서 사용되며, RabbitMQ, Redis 같은 기술들이 사용된다.
RabbitMQ 특징
- Appliation Layer AMQP 구현
- 비동기 처리를 위한 메시지 브로커 - Queue 라는 임시 저장소가 있기 때문에 나중에 처리 가능
- 내장 모니터링 페이지(Management UI) 제공
- erlang과 java 언어로 만들어짐
- 분산처리를 고려한 MQ (Cluster, Federation)
- 고가용성 보장 (HA(High Availability))
- Publish/Subscribe 방식 지원
- 다양한 plugin 지원
RabbitMQ 구성요소
- Producer : 메시지를 보내는 Application
- Consumer : 메시지를 수신하는 Application이며, 수신받은 메시지를 사용
- Queue : 메시지를 메모리나 디스크에 저장했다가, Consumer에게 메시지를 전달하는 역할을 함
- Exchange : Producer로부터 전달받은 메시지를 Queue에 분배하는 라우터의 역할(어떤 큐들에게 발송할지 결정)
- Binding : Exchange와 Queue의 관계를 정의
- Channel : 연결 내부의 가상 연결. 하나의 물리적인 connection 내에 생성되는 여러 가상 논리적인 connection 존재. Queues에서 메시지를 publish 하거나 consume 할 때 모든 작업이 Channel을 통해 이루어짐
- Routing : Exchange가 Queue에 메시지를 전달하는 과정
- Routing key : 메시지를 Qeues로 라우팅하는 방법을 결정하기 위해 Exchange에서 확인하는 키
How to use
- 코드에서는 application.yml 같은 애플리케이션 설정 파일에서 각 exchage, binding, queue의 name을 적고, RabbitMQ 설정 시 가져다 사용할 수 있다.
rabbitmq:
exchange: exchange.event.product
binding: bind.event.product
queue: event.product.status.update
Exchange Type
Exchange는 Queues에게 메시지를 어떤 방법으로 라우팅할지 결정하는 것이며, 이 방법에는 4가지 타입이 있다.
- Direct :
- 메시지는 바인딩 키가 메시지의 라우팅 키와 정확히 일치하는 Queue로 라우팅 됨. 즉, 지정된 라우팅 키를 가진 Queue에만 메시지 전달. unicast 방식
- Fanout :
- 1:N 관계로, Exchange에 바인딩 된 모든 Queue에 동일한 메시지를 전달. broadcast 방식
- Topic :
- Exchange에 바인딩 된 Queue 중에서, 메시지의 라우팅 키가 패턴에 맞는 Queue에게 모두 메시지를 전달. multicast 방식
- Headers :
- 라우팅 키 대신, 메시지 헤더에 속성을 추가하여 속성이 매칭되는 큐에 메시지 전달. multicast 방식
How to use 'Exchange Type'
- RabbitMQ config 설정 시, Spring AMQP 에서 제공하는 ExchangeBuilder 클래스를 사용하여 Exchange Type을 설정할 수 있다. 아래 코드에서는 directExchange를 사용하여 Direct 방법을 사용한 것을 볼 수 있다.
@Bean
public Exchange exchange() {
return ExchangeBuilder
.directExchange("exchange.event.test")
.durable(true)
.build();
}
- ExchangeBuilder에 대한 설명은 아래 spring-amqp 문서에서 살펴볼 수 있다.
메시지 수신 통보 (Acknowledgment)
메시지 전달 보장을 위해 Acknowlegment(ACK)라는 개념을 사용한다.
Queue는 Consumer에게 데이터를 전달하고나면 Queue에서 메시지를 삭제하므로, Consumer가 전달 받았지만 처리도중 오류가 발생하여 재처리해야하는 경우를 위해 보관 유예기간을 두는 용도로 이용된다. 즉, ACK이 온 message만 Queue에서 삭제 처리 하는 것이다.
How to use 'ACK'
com.rabbitmq.client.Channel 의 basicAck 메소드를 사용하여 하나 또는 여러 개의 수신된 메시지를 확인하는 것이 가능하다.
@RabbitListener(queues = "${rabbitmq.queue}")
public void listen(Message message, Channel channel) {
long deliveryTag = message.getMessageProperties().getDeliveryTag();
channel.basicAck(deliveryTag, false);
}
메시지 보존
1. durable
RabbitMQ 서버가 다운되거나 재기동되면, 기본적으로 Queue는 모두 제거 되며 이는 message 유실로 이어질 수 있다. 이를 방지하기 위해 Queue를 생성 할 때 durable을 true로 설정하면, 래빗엠큐 서버다운 후에도 Queue가 제거 되는 것을 막을 수 있으며 message를 보존할 수 있다. 다만, I/O 성능이 충분하지 않은 서버의 경우 메시지 저장 때문에 성능 저하가 발생할 수 있다.
코드에서는 다음과 같이 channel에 Queue를 만들 때 durable에 true값을 주어 해당 Queue를 계속 기억하도록 설정한다. 다만, 이미 durable 속성이 false인데 true로 변경할 경우에는 해당 Queue를 삭제하고 다시 생성해주어야 한다.
channel.queueDeclare(QUEUE_NAME, true, false, false, null); // 두번째 인자값이 durable이며, true로 설정
2. delivery-mode
Queue에 있는 메시지를 보존하는 속성인 delivery-mode는
1로 설정하면 메모리에서 메시지를 관리하고,
2로 설정하면 RabbitMQ는 디스크에 메시지를 영속화시킨다(메시지는 RabbitMQ 브로커 디스크에 저장됨).
기본값은 1로 설정되어 있다.
메모리의 경우 RabbitMQ 노드의 데이터가 메모리에만 상주하여 작동 속도가 빠르고, 디스크의 경우 노드의 데이터들이 디스크에 기록되며 작동된다.
RabbitMQ 서버 다운 후에도 메시지를 보존하는 설정 방법
RabbitMQ를 서버 다운 후 큐에 쌓여 있던 메시지가 사라지지 않기 위해서는 다음과 같이 설정해볼 수 있다.
- durable = true
- delivery-mode = 2(persistent)
실제로 durable 값이 true인 Queue로 delivery-mode가 2(persistent)로 설정된 메시지를 생성하여 테스트를 해보았다. Payload에 {"title":"persistent","messge":"test"} 를 입력하고 메시지를 publish 하였다.
그 다음 RabbitMQ 서버를 재기동하고
publish된 메시지를 받아보았다.
메시지를 살펴보면,
- Propertis가 delivery_mode가 2 이고
- Payload가 {"title":"persistent","messge":"test"}
인 것을 확인할 수 있다.
이런 설정은 상황에 따라 달라질 수 있는데, 금융 거래 이벤트와 같이 비즈니스와 밀접하게 연관된 지속성 메시지와 로그인 이벤트와 같이 비즈니스에 영향을 주지 않는 비지속성 메시지를 구분하여 값을 설정할 수 있다.
또한 메시지를 디스크에 저장할 경우 서버의 성능 저하를 가져올 수 있기 때문에 실패한 메시지만을 저장하는 DLQ(Dead-Letter-Queue)를 만들어 관리하는 방법도 구성해볼 수 있다.
Management UI
RabbitMQ는 기본적으로 내장 모니터링 페이지를 제공한다.
로컬환경에서 RabbitMQ 서버를 띄우고 난 후, http://localhost:15672 로 내장 모니터링 페이지에 접근 가능하다. 모니터링할 유저도 추가 및 삭제가 가능하다.
유저 및 권한 추가하기
유저는 rabbitmqctl 을 이용하여 추가할 수 있다. rabbitmqctl 은 RabbitMQ broker의 command line tool 이며, 다음과 같은 경로에 위치해 있다.
- $RABBITMQ_HOME/sbin/rabbitmqctl
다음과 같이 rabbitmqctl을 이용하여 유저, vhost, 권한을 차례대로 추가할 수 있다.
rabbitmqctl add_user <user_name> <password>
rabbitmqctl set_user_tags <tag_name> administrator
rabbitmqctl add_vhost <vhost_name>
rabbitmqctl list_vhosts
rabbitmqctl set_permissions -p <vhost_name> <user_name> ".*" ".*" ".*" # conf, wirte, read 권한 추가
RabbitMQ HA(High Availability)
- RabbitMQ의 고가용성(HA)은 클러스터에 모든 RabbitMQ 노드의 Queue들을 미러링(Mirroring) 하는 것이다.
- RabbitMQ의 고가용성(HA)을 사용하기 위해서는 클러스터를 구성해야 한다. 이 RabbitMQ 노드들은 보안을 위해 erlang의 cookie를 사용하여 클러스터와 통신할 수 있다.
- HA가 설정된 큐에 트랜잭션 메시지를 보내면 모든 활성노드에 메시지가 동기화된 후에 성공응답을 보낸다.
큐 미러링 (Queue Mirroring)
- 큐 미러링은 모든 노드에 메시지를 동기화 하는 것이다. 즉, 큐에 있는 동안 메시지가 손실되지 않도록, 큐를 모든 서버에 복사본을 저장하는 것이다.
- 미러링을 하는 이유는 만약 특정 큐를 가지고 있는 RabbitMQ 노드가 죽는다면, 이 노드에 있는 메시지는 노드가 살아나기 전까지 사용하지 못하게 된다. 이럴 때 미러링을 사용하여 해결할 수 있다.
- Queues는 다른 클러스터 노드에서 미러(추가 복제본, additional replicas)를 실행시킬 수 있다.
- 큐의 기본 복제본(the primary replica)은 "리더"로, 보조 복제본(secondary replicas)은 "미러"로 불린다.
- 각각의 미러링된 큐는 하나의 리더 복제본(replica)과 하나 혹은 그 이상의 미러들(replicas)로 구성된다. Each mirrored queue consists of one leader replica and one or more mirrors(replicas).
- 큐에 publish 된 메시지는 모든 미러에 복제된다. Consumer는 연결된 노드에 관계없이 리더에 연결되며, 리더에서 확인된 메시지는 미러에서 삭제된다.
- 큐 리더(Queue Leader)를 호스트하는 노드가 실패하면, 가장 오래된 동기화된 미러가 새 리더로 승격된다. 큐 미러링 매개변수에 따라 비동기화된 미러도 승격될 수 있다.
- HA 큐로 메시지가 발행되면 HA 큐를 담당하는 클러스터의 각 서버로 메시지 동기화되며, 메시지를 소비하면 다른 노드의 모든 메시지 복사본이 제거된다.
공식문서 링크