IoT 데이터 통신 프로토콜 : MQTT
회사에서 IoT 간의 데이터 통신 시스템을 구축할 필요성이 있다고 해 서버에 적용할 필요가 있었습니다. 현재 IoT 데이터는 MQTT라는 프로토콜을 통해 MQTT Broker(AWS IoT)와 연결되어 통신하고 있으며, 이번에 백엔드 서버까지 해당 브로커에 연결되어 IoT 기기와 통신 시스템 구축이 필요하다고 했고 이 파트를 제가 담당하게 되었습니다. MQTT에 대한 기본적인 이해가 잘 되어야 어떤식으로 환경 및 설정을 처리할지 윤곽이 잡히기 때문에 이번 기회에 한번도 경험하지 못한 MQTT 프로토콜이 뭔지 정리하면서, 백엔드 내부 Config 로직까지 정리해보려고 합니다.
※ MQTT란
- MQTT(Message Queue Telemetry Transport)는 기계 간 통신에 사용되는 표준 기반 메세지 프로토콜 중 하나
- M2M(Machine To Machine), IoT(Internet Of Things) 통신에 적합한 프로토콜이다.
- Publish / Subscribe 패턴을 사용하여 데이터를 주고 받는다.
※ MQTT는 왜 M2M, IoT 통신에 적합한가?
- M2M과 IoT 환경은 제한된 리소스와 네트워크 환경에서 동작한다.
- 대부분 비용 절감과 저전력 소비를 위해 고성능 CPU 대신 경량 프로세서를 많이 사용한다.
- 하드웨어 하나하나의 가격이 비싸기 때문에 저비용 설계를 통해 제조 단가를 낮춰 대량 배포에 유리하게 설계한다.
- 일부 IoT 네트워크는 낮은 대역폭과 저속 연결을 지원한다.
MQTT는 이러한 제한된 처리 능력, 작은 메모리 용량, 저전력 하드웨어 장치에서 동작하기 위해 설계된 "경량 프로토콜"이기 때문에 적합하다.
※ MQTT의 특징
- 경량 프로토콜이다. -> 최소한의 헤더 정보만을 포함해 데이터 통신 간 오버헤드 감소 (약 80 ~ 100kb 정도의 메모리 크기)
- 빠른 데이터 전송 -> 데이터가 빠르게 전송되고, 네트워크 대역폭 소비를 최소화
- 연결 유지 -> 데이터를 주고 받을 때만 연결하는 것이 아닌 지속적인 TCP 연결을 통해 데이터 송수신에 드는 비용을 절약
- 무선 통신이 가지는 제한된 상황에서 어느정도의 신뢰성을 보장한다. -> QoS(Quality Of Service) 3단계 지원
- 장치 간 복잡한 네트워크 연결 관리가 필요하지 않다 -> 중앙 브로커를 통해 모든 메세지 관리 (발행 - 구독 모델)
그렇다면, MQTT에 관여하는 주체들은 어떤게 있는지 간단히 설명드리도록 하겠습니다.
※ 통신에 관여하는 주체
- MQTT Client
- MQTT 프로토콜을 통해 데이터를 주고 받는 객체들
- Publisher & Subscriber로 나뉜다.
- MQTT Broker
- 서로 다른 클라이언트 간의 메세지를 조정하는 시스템 (서버)
- 메세지 수신 및 필터링, 각 메세지에 구독된 클라이언트 식별 및 메세지 전송 등의 로직 담당
- 클라이언트는 서로 연결하지 않고 브로커와만 연결한다.
- MQTT Client와 Broker는 모두 통신을 위해 TCP/IP 스택 필요
- 하이브엠큐, 엑티브엠큐, 모스키토, AWS IOT 등
즉, 통신에 관여하는 주체들은 주체들끼리 직접 소통하는 것이 아닌, MQTT Broker에 메세지를 송 / 수신합니다. 따라서 MQTT Broker와 연결을 계속해서 유지하고 있습니다. 그렇다면, 메세지 브로커는 무슨 의미일까요?
※ 메세지 브로커란
- 메세지 송신자와 수신자를 중개하는 미들웨어
- 메세지 형태의 통신에 주로 사용됨
- 중간에서 송신자와 수신자의 메세지를 관리하기 때문에 시스템 간의 결합도를 완화시킴
- 메세지 처리 관련 기능을 모아놓는 컴포넌트
- 주로 pub / sub 구조를 사용하여 메세지를 처리하고, 큐(Queue)나 토픽(Topic)을 사용하여 메시지를 관리한다.
그렇다면, 기본적인 MQTT의 모델인 발행 / 구독 모델에 대해서 설명드리도록 하겠습니다.
※ 발행 (Publish) / 구독(Subscribe) 모델이란
- 메세지를 수신하는 클라이언트(Subscriber)와 메세지를 송신하는 클라이언트(Publisher)를 분리하는 방식
- 중간 브로커 (Message Broker)가 존재하여 각각의 클라이언트는 이 브로커와만 통신함
- Publisher는 특정 "토픽"으로 데이터를 전송하고 Subscriber는 필요한 "토픽"만 구독하여 데이터 수신
- Publish는 클라이언트가 특정 Topic에 데이터를 보내는 행위이며, Subscribe는 특정 Topic을 구독하여 데이터를 수신하는 행위
그렇다면, publish와 subscribe 과정에서 계속해서 등장하는 "토픽"은 무엇일까요?
※ Topics란
- Broker가 연결된 각 클라이언트의 메세지를 필터링 하는데 사용하는 문자열 (메시지 전달을 위한 라우팅 경로)
- 토픽 단위로 메세지 발행 / 구독이 발생함
- 하나 이상의 topic 레벨로 구성되고, “/” 로 구분된다. (”/”으로 구분되는 계층 구조)
- 임의의 publisher가 특정 topic으로 메시지를 발행하면, 해당 topic을 구독 중인 subscriber는 메시지를 실시간으로 수신하여 처리
- Topic 와일드카드
- "+" -> topic에서 single level 문자열을 대체하는 문자
- "#" -> topic에서 multi level 문자열을 대체하는 문자
그렇다면 이러한 IoT의 제한된 상황에서 안정적으로 메세지를 송 / 수신 할 수 있도록 신뢰성을 보장할 수 있는 방법이 있을까?
※ QoS(Quality Of Service)
- MQTT는 서비스 품질 레벨에 따라서 데이터 송수신에 특정 수준의 성능을 보장해줍니다.
- 무선 통신이 가진 오류 상황에서 어느정도(3단계) 신뢰성을 보장해준다.
- Level 0 (At most once) : 메시지는 한 번만 전달된다. ( 발행자는 구독자의 수신 여부와 상관없이 메세지 전송 완료)
- Level 1 (At least once) : 메시지는 최소 한 번은 전달된다. (발행자는 브로커의 발행 응답을 받은 후 메세지 전송 완료 - 중복 전송 우려)
- Level 2 (Exactly once) : 메시지는 반드시 한 번 전달된다. (메세지의 핸드쉐이크 과정을 추적 - 고품질을 보장하지만 과부하 우려)
- QOS 선택 시 고려사항
- 신뢰성: 데이터가 반드시 전달되어야 한다면 높은 QoS(1 또는 2)를 사용
- 속도 및 네트워크 효율성: 속도가 중요하고 약간의 데이터 손실을 허용할 수 있다면 QoS 0을 선택
- 리소스 제약: 네트워크 대역폭이나 디바이스 리소스가 제한적이면 낮은 QoS가 적합
- Default QoS값은 Level 0이다.
그렇다면, MQTT 브로커에 퍼블리시 된 메세지는 어느 시점에 사라질까요?
※ MQTT 메세지 제거 시점
- MQTT 메세지 브로커의 메세지 삭제 시점은 크게 QoS, retain 플래그, 메세지 만료 시간 설정, 클라이언트 연결 설정 등으로 메세지 제거 시점을 결정할 수 있습니다.
1. QoS
- QoS 0 : 메세지가 브로커에서 구독자에게 전달되면 즉시 삭제된다.
- QoS 1 : 브로커는 구독자로부터 PUBACK(확인 응답)을 받을 때까지 메세지를 유지하고 삭제한다.
- QoS 2 : 브로커는 구독자로부터 PUBREC → PUBREL → PUBCOMP 3단계의 확인 절차가 완료될 때까지 메시지를 유지하고 삭제한다.
2. Retain 플래그
- Retain=true 시 브로커는 마지막으로 퍼블리시된 메세지를 해당 토픽에 저장하고 새로운 메세지가 같은 토픽에 퍼블리시 되면 이전 메세지를 삭제하고 대체한다.
- Reain=false 시 메세지는 구독자에게 전달되면 바로 삭제된다.
3. 메세지 만료시간 설정
- MQTT 5.0 이상부터 설정 가능
- 메세지에 만료 시간(Message Expiry Interval)을 설정해 만료 시간이 지나면 브로커는 메세지를 자동으로 삭제
4. 클라이언트 연결 상태에 따른 clean session
- clean session = true 설정 시, 클라이언트가 연결을 끊으면 브로커는 해당 클라이언트에 대한 모든 메세지와 세선 정보 삭제
- clean session = false 설정 시, 브로커는 클라이언트의 세션 정보를 유지하고, 오프라인 상태 동안 도착한 메세지를 저장함, 그 후 클라이언트가 다시 연결되면 저장된 메세지를 전송한 후 삭제한다.
이렇게 전반적인 MQTT의 특징을 설명드렸습니다. 그렇다면, 왜 MQTT는 경량 프로토콜 일 수 있을까요?
※ 왜 MQTT는 초경량 프로토콜인가?
- Fixed Header
- 고정 헤더로 모든 MQTT 패킷에 공통적으로 존재한다.
- 종류
- Control Header (1byte)
- Message type : MQTT 메세지 유형
- Flags : 패킷 타입에 따라 DUP, QoS, Retain 플래그 설정
- Remaining Length (1 ~ 4byte)
- 가변 헤더와 페이로드의 길이를 나타낸다.
- Control Header (1byte)
- Variable Header
- 특정 패킷 타입에만 존재한다.
- 패킷 타입에 따라 표현하는 값이 다르다.
- ex) CONNECT Type : 프로토콜 이름, 버전, 클라이언트 ID, 인증 정보 등
- ex) PUBLISH Type : Topic 이름, Packet Indentifier 등
- Payload
- 메세지 데이터가 포함되는 부분
이렇게 간단하게 MQTT 패킷 구조를 살펴봤습니다. 그렇다면, 위의 패킷 구조를 기반으로 왜 MQTT가 경량 프로토콜이 될 수 있을까요? 정답은,
고정 영역의 메세지 사이즈가 작다.
입니다. 고정 영역의 길이가 1~4 바이트로 구성되어 있어 다른 프로토콜에 비해 경량 프로토콜로 동작할 수 있습니다. 또한, MQTT는 저전력 및 낮은 대역폭 환경을 염두에 두고 설계된 프로토콜로, 기본적으로 최소한의 정보만 담도록 최적화되어 있습니다. 지속적으로 연결을 유지하기 때문에 연결 시 필요한 데이터를 매번 재전송하거나 저장할 필요가 없어 네트워크 및 리소스의 효율성을 더욱 높일 수 있습니다.
그렇다면, HTTP와 MQTT를 간단하게 비교해보도록 하겠습니다.
※ HTTP vs MQTT
특징 | HTTP | MQTT |
프로토콜 유형 | 요청/응답 (Request/Response) | 게시/구독 (Publish/Subscribe) |
전송 방식 | 클라이언트-서버 통신 | 브로커 기반 통신 |
기반 프로토콜 | TCP | TCP |
오버헤드 | 높음 (헤더 크기와 요청 구조가 큼) | 낮음 (경량 메시지 형식) |
데이터 전송 | 단방향, 요청이 있어야 응답 가능 | 양방향, 지속적인 연결 유지 |
실시간성 | 낮음 (짧은 지연 시간에 부적합) | 높음 (지속적 연결로 빠른 데이터 전송 가능) |
보안 | HTTPS(TLS) | TLS 지원 가능 |
QoS 지원 | 없음 | QoS (0, 1, 2) 제공 |
사용 사례 | IoT 디바이스와 서버 간 간헐적 데이터 전송 | IoT 디바이스 간 빈번하고 실시간 데이터 교환 |
- 실제로 HTTP도 간단한 IoT 디바이스 서비스 통신을 할때 사용하는 경우도 많습니다.
- 별도의 Message Broker 서버도 필요 없으며, 익숙하기 때문에 빠르게 적용할 수 있기 때문입니다.
그럼 둘의 가장 큰 차이 중 하나인, Message Broker가 왜 필요한걸까요?
※ 메세지 브로커 존재 이유
- 메세지 브로커가 필요한 이유는 크게 시스템의 확장성, 안정성, 유연성을 보장하면서 기기 간 복잡성을 줄이기 위해서입니다. 직접 장치와 장치를 연결하는 Point-to-Point 통신 방식에는 여러가지 한계가 존재하기 때문입니다.
1. 느슨한 결합
- Point-to-Point 방식의 문제점
- 장치들이 서로 직접 통신하려면, 모든 장치가 서로의 IP 주소, 포트 등을 알아야 합니다.
- N대의 장치가 통신할 때, 장치 수가 늘어날수록 연결의 복잡도는 기하급수적으로 증가합니다.
- 하나의 장치라도 다운되면, 해당 장치와 연결된 다른 장치들까지 문제를 발생할 수 있습니다.
- Message Broker 방식의 장점
- Message Broker가 있어 모든 장치는 브로커와만 연결되면 돼 장치 수가 늘어나도 복잡성이 많이 증가하지 않습니다.
- 발행자와 구독자는 서로를 몰라도 메세지를 주고 받을 수 있습니다.
- 새로운 장치나 서비스가 추가되어도 기존 시스템에 영향을 주지 않습니다.
2. 메세지 전달 보장
- 브로커는 QoS (Quality of Service)를 통해 메시지가 손실되지 않도록 합니다.
- 네트워크 불안정이나 장애 발생 시에도 브로커가 옵션을 통해 메시지를 저장(Retain)하고 나중에 전달할 수 있습니다.
3. 확장성
- 브로커는 수천~수백만 개의 장치와 동시에 통신할 수 있습니다.
- 직접 연결로는 이런 확장이 사실상 불가능합니다.
- 브로커는 트래픽을 균등하게 분산하여 시스템 과부하를 방지합니다.
4. 다양한 통신 패턴 지원
- 브로커는 다양한 통신 패턴을 지원합니다:
- 1:1 (Direct Messaging)
- 1:N (Broadcasting)
- N:N (Multicast)
- 이를 통해 특정 장치에만 메시지를 보내거나, 전체 네트워크에 알림을 보낼 수 있습니다.
MQTT 브로커는 단순한 "중간자"가 아니라,
효율적이고 안정적인 시스템 아키텍처를 유지하기 위한 핵심 요소
이렇게, 간단하게 MQTT 프로토콜에 관련해서 정리를 했습니다. 그렇다면, 지금부터는 실제 Springboot 코드에 적용할 수 있는 방법에 대해서 설명드리도록 하겠습니다.
※ 개발 환경
springboot 3.3.4
java 17
Kotlin 1.9.20
1. 의존성 추가
// mqtt
implementation("org.springframework.integration:spring-integration-mqtt:6.3.4")
- Spring Integration 프레임워크의 MQTT 프로토콜 지원을 추가하는 라이브러리
- 이 라이브러리를 사용하면 Spring 기반 애플리케이션에서 MQTT 클라이언트와 브로커 간의 통신을 쉽게 구성할 수 있다.
- 버전 호환성을 잘 확인해봐야한다. (Spring Boot 버전에 따라 호환되는 버전이 다름)
Spring Integration이란?
Spring Integration은 Spring Framework 기반의 엔터프라이즈 애플리케이션 통합(EAI, Enterprise Application Integration)을 간편하게 구현하기 위해 설계된 모듈
2. MqttConfig 클래스 생성
@Configuration
class MqttConfig(private val awsKeyManager: AwsKeyManager) {
@Value("\${mqtt.broker.url}")
lateinit var mqttBrokerUrl: String
@Value("\${mqtt.client-id}")
lateinit var clientId: String
@Bean
fun mqttClientFactory(): MqttPahoClientFactory {
val factory = DefaultMqttPahoClientFactory()
factory.connectionOptions = mqttConnectOptions()
return factory
}
private fun mqttConnectOptions(): MqttConnectOptions {
val options = MqttConnectOptions().apply {
serverURIs = arrayOf(mqttBrokerUrl)
...
}
return options
}
@Bean
fun mqttOutboundChannel(): MessageChannel = DirectChannel()
@Bean
@ServiceActivator(inputChannel = "mqttOutboundChannel")
fun mqttOutboundHandler(mqttClientFactory: MqttPahoClientFactory): MessageHandler {
val handler = MqttPahoMessageHandler(clientId + "publish", mqttClientFactory)
handler.setAsync(true)
return handler
}
@Bean
@ServiceActivator(inputChannel = "mqttInboundChannel")
fun handler(): MessageHandler = MessageHandler { message -> println(message.payload) }}
@Bean
fun mqttInboundAdapter(): MqttPahoMessageDrivenChannelAdapter {
val adapter = MqttPahoMessageDrivenChannelAdapter(
clientId + "subscribe",
mqttClientFactory(),
"ygwan/test",
)
adapter.setConverter(DefaultPahoMessageConverter())
adapter.setQos(1)
adapter.outputChannel = mqttInboundChannel()
return adapter
}
@Bean
fun mqttInboundChannel(): MessageChannel = DirectChannel()
@Bean
fun mqttoutboundChannel(): MessageChannel = DirectChannel()
}
@MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")
interface MqttGateway {
fun sendToMqtt(@Header("mqtt_topic") topic: String, message: String)
}
MqttConfig.class 파일은 크게 3가지 부분으로 나뉠 수 있습니다.
1. 연결 관련 처리 함수
2. 메세지 Publishing을 위한 outbound 관련 처리 함수
3. 메세지 subScrIbe을 위한 inbound 관련 처리 함수
특징 | Outbound | Inbound |
의미 | 애플리케이션 → MQTT 브로커로 메시지 전송 역할 | MQTT 브로커 → 애플리케이션으로 메시지 수신 역할 |
핵심 컴포넌트 | mqttOutboundChannel, MqttPahoMessageHandler |
MqttPahoMessageDrivenChannelAdapter |
Spring 설정 | @ServiceActivator(inputChannel = "mqttOutboundChannel") | @Bean을 통해 채널 어댑터를 생성하고 특정 토픽 구독 설정 |
역할 | 특정 토픽에 메시지를 발행(publish) | 특정 토픽에 메시지를 구독(subscribe)하고 수신한 메시지를 처리 |
주요 메서드 | sendToMqtt(topic, message) | handleMessage() |
1. 연결 관련 처리 함수
※ MqttPahoClientFactory
- MQTT 클라이언트를 생성하는 데 필요한 MqttPahoClientFactory를 반환합니다.
- MQTT 통신을 위한 Paho 클라이언트 팩토리 클래스를 반환합니다.
- MQTT 브로커와의 연결을 관리하기 위해 사용됩니다.
PahoClient 란?
- Eclipse Foundation에서 제공하는 MQTT 프로토콜 구현 라이브러리
- Spring Integration MQTT 모듈은 Paho MQTT 클라이언트를 기반으로 동작함
※ MqttConnectOptions
- MQTT Broker 연결에 필요한 설정을 정의하고 반환합니다.
- 연결할 MQTT Broker URL, isCleanSession, connectionTimeout 등 여러 값들을 정의할 수 있습니다.
- https://eclipse.dev/paho/files/javadoc/org/eclipse/paho/client/mqttv3/MqttConnectOptions.html
2. 메세지 Publish을 위한 outbound 관련 처리 함수
※ mqttOutboundChannel
- Spring Integration에서 이 채널은 MQTT 메시지를 브로커로 전송(publish)하는 플로우의 시작 지점 역할을 수행합니다.
- 메시지를 특정 핸들러(mqttOutboundHandler)로 전달하여 최종적으로 MQTT 브로커에 발행(publish)합니다.
※ mqttOutboundHandler
- MQTT 브로커에 메시지를 발행(publish)하는 핸들러를 정의합니다.
- @ServiceActivator : 특정 채널의 메시지를 처리하는 메서드를 지정합니다.(mqttOutboundChannel 채널의 메시지를 처리)
- MqttPahoMessageHandler : 클라이언트 ID와 MqttPahoClientFactory를 사용하여 MQTT 브로커와 연결합니다.
- 메시지 송신 핸들러
※ @MessageGateway
- 메시지 게이트웨이는 애플리케이션과 Spring Integration 메시징 시스템 간의 인터페이스입니다.
- MQTT를 비롯한 다양한 통신 프로토콜에서 간단하고 추상화된 방식으로 메시지 송수신을 구현할 수 있습니다.
- 메시지 게이트웨이는 애플리케이션의 일반적인 Java 메서드 호출을 Spring Integration 메시지로 변환하거나, 반대로 Spring Integration 메시지를 일반적인 Java 메서드 응답으로 변환하는 역할을 합니다.
3. 메세지 Subscribe을 위한 inbound 관련 처리 함수
※ mqttInboundChannel
- Spring Integration에서 MQTT 브로커로부터 수신된 메시지를 전달받는 메시지 채널입니다.
- MQTT Inbound 어댑터가 수신한 메시지를 핸들러로 전달하여, 메시지의 페이로드를 처리하거나 출력하는 데 사용됩니다.
DirectChannel: 메시지를 동기적으로 전송하는 채널입니다. 송신자와 수신자가 한 번에 하나의 메시지를 처리하도록 설정합니다.
※ mqttInboundHandler
- MQTT 브로커에 메시지를 구독(subscribe)하는 핸들러를 정의합니다.
- @ServiceActivator : 특정 채널의 메시지를 처리하는 메서드를 지정합니다.(mqttInboundChannel 채널의 메시지를 처리)
- 메시지가 mqttInboundChannel에 도착하면, 이 핸들러가 트리거되어 메시지의 페이로드를 출력합니다. (출력 외 다른 추가적인 작업도 해당 메서드를 수정하면 처리 가능)
※ mqttInboundAdapter
- 지정된 MQTT 브로커에 연결하고, 특정 토픽을 구독하여 수신된 메시지를 mqttInboundChannel로 전달합니다.
아래는 이렇게 설정했을 때의 설정 구조입니다.
- 보시는 바와 같이 publish 전용 Client와 subscribe 전용 Client가 각각 존재합니다. (각각 다른 Client Id를 가짐)
그런데 같은 Client를 공유해서 쓰면 안되나?
라는 의문점이 들었습니다.
※ 그것에 관한 spring.io 공식 레포지토리의 질문 & 답 내용
- 정리하자면, 여러 제한 사항과 여러 유형을 처리할 수 있게 하기 위해서 구현
- 여러 클라이언트를 효율적으로 관리하기 위해선 ClientManager 도입 가능
그래서 저는, 서버 당 inbound & outbound 클라이언트를 분리할 필요성을 잘 못느껴 이를 하나의 공유된 클라이언트로 처리하는게 좋다고 생각해, ClientManager를 도입하기로 했습니다.
(사실 분리했을 때의 이점을 찾기가 힘들어 하나로 합치기로 했습니다. 하나로 합치면 MQTT Broker에 호율적으로 클라이언트를 연결 할 수 있고 하나의 서버 당 하나의 클라이언트가 붙기 때문에 이후에 작업 시 관리가 용이할 것 같다고 생각했기 때문입니다.)
설정 코드를 설명하기에 앞서, 앞에 말한 ClientManager가 뭔지 설명드리도록 하겠습니다.
※ ClientManager
- 여러 통합에 단일 MQTT ClientID가 필요한 경우 MQTT 브로커가 ClientID당 연결 수에 제한이 있을 수 있으므로(일반적으로 단일 연결이 허용됨) 여러 MQTT 클라이언트 인스턴스를 사용할 수 없을 때 주로 사용합니다.
- 다른 채널 어댑터에 단일 클라이언트를 재사용하려면 org.springframework.integration.mqtt.core.ClientManager구성 요소를 사용하여 필요한 모든 채널 어댑터에 전달할 수 있습니다.
- ClientManager는 MQTT 연결 수명 주기를 관리하고 필요한 경우 자동으로 다시 연결합니다.
- 다중 클라이언트나 연결 관리가 필요한 경우 이에 대한 작업을 처리해줍니다.
※ 예시 설정 코드
@Bean
fun clientManager(): ClientManager<IMqttAsyncClient, MqttConnectOptions> {
val connectionOptions = mqttConnectOptions()
val clientManager = Mqttv3ClientManager(connectionOptions, "mqttServerClient")
clientManager.setPersistence(MqttDefaultFilePersistence())
return clientManager
}
@Bean
@ServiceActivator(inputChannel = "outboundMessageChannel")
fun mqttOutboundHandler(): MessageHandler {
val handler = MqttPahoMessageHandler(clientManager())
handler.setAsync(true)
return handler
}
@Bean
fun mqttInboundAdapter(): MqttPahoMessageDrivenChannelAdapter {
val adapter = MqttPahoMessageDrivenChannelAdapter(
clientManager(),
"test/topic",
)
adapter.setConverter(DefaultPahoMessageConverter())
adapter.outputChannel = inboundMessageChannel()
return adapter
}
- 이전 설정 코드랑 전반적으로 비슷하지만, 큰 다른 점은 ClientManager을 통해 inboundAdapter와 outboundHandler를 정의했다 정도인 것 같습니다.
아래는 이렇게 설정했을 때의 설정 구조입니다.
이때 inbound message channel과 outboud message channel을 분리해야 하는 이유는,
각각의 채널의 목적이 다르기 때문입니다.
실제로 저도 이 메세지 채널을 같게 해서 처리를 해봣지만, INFO 에러가 발생하는 것을 확인할 수 있었습니다.
이에 대한 설명을 아래 링크에 첨부하도록 하겠습니다.
- https://docs.spring.io/spring-integration/reference/overview.html#overview-components-channel
- https://github.com/spring-projects/spring-integration/discussions/9686
※ 정리
이렇게하면, 전반적인 환경 설정이 완료됩니다. 아래 동영상은 , MQTT Broker 중 하나인 AWS IoT에 서버를 연결하고 publish & subscribe를 진행하는 테스트에 관한 동영상입니다.
- Publish는 MQTT 브로커에서 특정 토픽을 구독하고 있는 기기가 있을 때, 서버에서 해당 토픽으로 publish 하면, 이를 구독하고 있는 기기에 이 메세지가 출력되는 과정을 보여주는 플로우 입니다.
- Subscribe는 서버가 특정 토픽을 구독하고 있을 때, 해당 브로커에서 해당하는 토픽이 publish 됐을 때, 서버에서 이 메세지를 가져와 출력하는 과정을 보여주는 플로우입니다.
AWS IoT와 서버를 연결하는 방법은 다음번에 정리해보도록 하겠습니다.
※ publish test
※ subscribe test