카테고리 없음

IoT 데이터 통신 프로토콜 : MQTT

YGwan 2024. 11. 26. 11:32

 회사에서 IoT 간의 데이터 통신 시스템을 구축할 필요성이 있다고 해 서버에 적용할 필요가 있었습니다. 현재 IoT 데이터는 MQTT라는 프로토콜을 통해 MQTT Broker(AWS IoT)와 연결되어 통신하고 있으며, 이번에 백엔드 서버까지 해당 브로커에 연결되어 IoT 기기와 통신 시스템 구축이 필요하다고 했고 이 파트를 제가 담당하게 되었습니다. MQTT에 대한 기본적인 이해가 잘 되어야 어떤식으로 환경 및 설정을 처리할지 윤곽이 잡히기 때문에 이번 기회에 한번도 경험하지 못한 MQTT 프로토콜이 뭔지 정리하면서, 백엔드 내부 Config 로직까지 정리해보려고 합니다.

 

※  MQTT란

  • MQTT(Message Queue Telemetry Transport)는 기계 간 통신에 사용되는 표준 기반 메세지 프로토콜 중 하나
  • M2M(Machine To Machine), IoT(Internet Of Things) 통신에 적합한 프로토콜이다.
  • Publish / Subscribe 패턴을 사용하여 데이터를 주고 받는다.

 

※  MQTT는 왜 M2M, IoT 통신에 적합한가?

  • M2M과 IoT 환경은 제한된 리소스와 네트워크 환경에서 동작한다.
    • 대부분 비용 절감과 저전력 소비를 위해 고성능 CPU 대신 경량 프로세서를 많이 사용한다.
    • 하드웨어 하나하나의 가격이 비싸기 때문에 저비용 설계를 통해 제조 단가를 낮춰 대량 배포에 유리하게 설계한다.
    • 일부 IoT 네트워크는 낮은 대역폭과 저속 연결을 지원한다.
MQTT는 이러한 제한된 처리 능력, 작은 메모리 용량, 저전력 하드웨어 장치에서 동작하기 위해 설계된 "경량 프로토콜"이기 때문에 적합하다.

 


※  MQTT의 특징

  • 경량 프로토콜이다. -> 최소한의 헤더 정보만을 포함해 데이터 통신 간 오버헤드 감소 (약 80 ~ 100kb 정도의 메모리 크기)
  • 빠른 데이터 전송 -> 데이터가 빠르게 전송되고, 네트워크 대역폭 소비를 최소화
  • 연결 유지 -> 데이터를 주고 받을 때만 연결하는 것이 아닌 지속적인 TCP 연결을 통해 데이터 송수신에 드는 비용을 절약
  • 무선 통신이 가지는 제한된 상황에서 어느정도의 신뢰성을 보장한다. -> QoS(Quality Of Service) 3단계 지원
  • 장치 간 복잡한 네트워크 연결 관리가 필요하지 않다 -> 중앙 브로커를 통해 모든 메세지 관리 (발행 - 구독 모델)

 

그렇다면, MQTT에 관여하는 주체들은 어떤게 있는지 간단히 설명드리도록 하겠습니다.

 

※  통신에 관여하는 주체

  • MQTT Client
    • MQTT 프로토콜을 통해 데이터를 주고 받는 객체들
    • Publisher & Subscriber로 나뉜다.
  • MQTT Broker
    • 서로 다른 클라이언트 간의 메세지를 조정하는 시스템 (서버)
    • 메세지 수신 및 필터링, 각 메세지에 구독된 클라이언트 식별 및 메세지 전송 등의 로직 담당
    • 클라이언트는 서로 연결하지 않고 브로커와만 연결한다.
    • MQTT Client와 Broker는 모두 통신을 위해 TCP/IP 스택 필요
    • 하이브엠큐, 엑티브엠큐, 모스키토, AWS IOT 등
즉, 통신에 관여하는 주체들은 주체들끼리 직접 소통하는 것이 아닌, MQTT Broker에 메세지를 송 / 수신합니다. 따라서 MQTT Broker와 연결을 계속해서 유지하고 있습니다. 그렇다면, 메세지 브로커는 무슨 의미일까요?

 

※  메세지 브로커란

  • 메세지 송신자와 수신자를 중개하는 미들웨어
  • 메세지 형태의 통신에 주로 사용됨
  • 중간에서 송신자와 수신자의 메세지를 관리하기 때문에 시스템 간의 결합도를 완화시킴
  • 메세지 처리 관련 기능을 모아놓는 컴포넌트
  • 주로 pub / sub 구조를 사용하여 메세지를 처리하고, 큐(Queue)나 토픽(Topic)을 사용하여 메시지를 관리한다.

 

그렇다면, 기본적인 MQTT의 모델인 발행 / 구독 모델에 대해서 설명드리도록 하겠습니다.

 

※  발행 (Publish) / 구독(Subscribe) 모델이란

  • 메세지를 수신하는 클라이언트(Subscriber)와 메세지를 송신하는 클라이언트(Publisher)를 분리하는 방식
  • 중간 브로커 (Message Broker)가 존재하여 각각의 클라이언트는 이 브로커와만 통신함
  • Publisher는 특정 "토픽"으로 데이터를 전송하고 Subscriber는 필요한 "토픽"만 구독하여 데이터 수신
  • Publish는 클라이언트가 특정 Topic에 데이터를 보내는 행위이며, Subscribe는 특정 Topic을 구독하여 데이터를 수신하는 행위

 

그렇다면, publish와 subscribe 과정에서 계속해서 등장하는 "토픽"은 무엇일까요?

 

※  Topics란

  • Broker가 연결된 각 클라이언트의 메세지를 필터링 하는데 사용하는 문자열 (메시지 전달을 위한 라우팅 경로)
  • 토픽 단위로 메세지 발행 / 구독이 발생함
  • 하나 이상의 topic 레벨로 구성되고, “/” 로 구분된다. (”/”으로 구분되는 계층 구조)
  • 임의의 publisher가 특정 topic으로 메시지를 발행하면, 해당 topic을 구독 중인 subscriber는 메시지를 실시간으로 수신하여 처리
  • Topic 와일드카드
    • "+" -> topic에서 single level 문자열을 대체하는 문자
    • "#" -> topic에서 multi level 문자열을 대체하는 문자

 

그렇다면 이러한 IoT의 제한된 상황에서 안정적으로 메세지를 송 / 수신 할 수 있도록 신뢰성을 보장할 수 있는 방법이 있을까?

 

 

※  QoS(Quality Of Service)

  • MQTT는 서비스 품질 레벨에 따라서 데이터 송수신에 특정 수준의 성능을 보장해줍니다.
  • 무선 통신이 가진 오류 상황에서 어느정도(3단계) 신뢰성을 보장해준다.
    • Level 0 (At most once) : 메시지는 한 번만 전달된다. ( 발행자는 구독자의 수신 여부와 상관없이 메세지 전송 완료)
    • Level 1 (At least once) : 메시지는 최소 한 번은 전달된다. (발행자는 브로커의 발행 응답을 받은 후 메세지 전송 완료 - 중복 전송 우려)
    • Level 2 (Exactly once) : 메시지는 반드시 한 번 전달된다. (메세지의 핸드쉐이크 과정을 추적 - 고품질을 보장하지만 과부하 우려)
  • QOS 선택 시 고려사항
    • 신뢰성: 데이터가 반드시 전달되어야 한다면 높은 QoS(1 또는 2)를 사용
    • 속도 및 네트워크 효율성: 속도가 중요하고 약간의 데이터 손실을 허용할 수 있다면 QoS 0을 선택
    • 리소스 제약: 네트워크 대역폭이나 디바이스 리소스가 제한적이면 낮은 QoS가 적합
  • Default QoS값은 Level 0이다.

 

그렇다면, MQTT 브로커에 퍼블리시 된 메세지는 어느 시점에 사라질까요?

 

※  MQTT 메세지 제거 시점

  • MQTT 메세지 브로커의 메세지 삭제 시점은 크게 QoS, retain 플래그, 메세지 만료 시간 설정, 클라이언트 연결 설정 등으로 메세지 제거 시점을 결정할 수 있습니다.

 

1. QoS

  • QoS 0 : 메세지가 브로커에서 구독자에게 전달되면 즉시 삭제된다.
  • QoS 1 : 브로커는 구독자로부터 PUBACK(확인 응답)을 받을 때까지 메세지를 유지하고 삭제한다.
  • QoS 2 : 브로커는 구독자로부터 PUBREC → PUBREL → PUBCOMP 3단계의 확인 절차가 완료될 때까지 메시지를 유지하고 삭제한다.

 

2. Retain 플래그

  • Retain=true 시 브로커는 마지막으로 퍼블리시된 메세지를 해당 토픽에 저장하고 새로운 메세지가 같은 토픽에 퍼블리시 되면 이전 메세지를 삭제하고 대체한다.
  • Reain=false 시 메세지는 구독자에게 전달되면 바로 삭제된다.

 

3. 메세지 만료시간 설정

  • MQTT 5.0 이상부터 설정 가능
  • 메세지에 만료 시간(Message Expiry Interval)을 설정해 만료 시간이 지나면 브로커는 메세지를 자동으로 삭제

 

4. 클라이언트 연결 상태에 따른 clean session

  • clean session = true 설정 시, 클라이언트가 연결을 끊으면 브로커는 해당 클라이언트에 대한 모든 메세지와 세선 정보 삭제
  • clean session = false 설정 시,  브로커는 클라이언트의 세션 정보를 유지하고, 오프라인 상태 동안 도착한 메세지를 저장함, 그 후 클라이언트가 다시 연결되면 저장된 메세지를 전송한 후 삭제한다.

 

이렇게 전반적인 MQTT의 특징을 설명드렸습니다. 그렇다면, 왜 MQTT는 경량 프로토콜 일 수 있을까요?

 


※  왜 MQTT는 초경량 프로토콜인가?

 

MQTT 패킷 구조

  • Fixed Header
    • 고정 헤더로 모든 MQTT 패킷에 공통적으로 존재한다.
    • 종류
      • Control Header (1byte)
        • Message type : MQTT 메세지 유형
        • Flags : 패킷 타입에 따라 DUP, QoS, Retain 플래그 설정
      • Remaining Length (1 ~ 4byte)
        • 가변 헤더와 페이로드의 길이를 나타낸다.

 

  • Variable Header
    • 특정 패킷 타입에만 존재한다.
    • 패킷 타입에 따라 표현하는 값이 다르다.
      • ex) CONNECT Type : 프로토콜 이름, 버전, 클라이언트 ID, 인증 정보 등
      • ex) PUBLISH Type :  Topic 이름, Packet Indentifier 등

 

  •  Payload
    • 메세지 데이터가 포함되는 부분

 

 이렇게 간단하게 MQTT 패킷 구조를 살펴봤습니다. 그렇다면, 위의 패킷 구조를 기반으로 왜 MQTT가 경량 프로토콜이 될 수 있을까요? 정답은,

 

고정 영역의 메세지 사이즈가 작다.

 

 입니다. 고정 영역의 길이가 1~4 바이트로 구성되어 있어 다른 프로토콜에 비해 경량 프로토콜로 동작할 수 있습니다. 또한, MQTT는 저전력 및 낮은 대역폭 환경을 염두에 두고 설계된 프로토콜로, 기본적으로 최소한의 정보만 담도록 최적화되어 있습니다. 지속적으로 연결을 유지하기 때문에 연결 시 필요한 데이터를 매번 재전송하거나 저장할 필요가 없어 네트워크 및 리소스의 효율성을 더욱 높일 수 있습니다.

 

그렇다면, HTTP와 MQTT를 간단하게 비교해보도록 하겠습니다.

 

※  HTTP vs MQTT 

특징 HTTP MQTT
프로토콜 유형 요청/응답 (Request/Response) 게시/구독 (Publish/Subscribe)
전송 방식 클라이언트-서버 통신 브로커 기반 통신
기반 프로토콜 TCP TCP
오버헤드 높음 (헤더 크기와 요청 구조가 큼) 낮음 (경량 메시지 형식)
데이터 전송 단방향, 요청이 있어야 응답 가능 양방향, 지속적인 연결 유지
실시간성 낮음 (짧은 지연 시간에 부적합) 높음 (지속적 연결로 빠른 데이터 전송 가능)
보안 HTTPS(TLS) TLS 지원 가능
QoS 지원 없음 QoS (0, 1, 2) 제공
사용 사례 IoT 디바이스와 서버 간 간헐적 데이터 전송 IoT 디바이스 간 빈번하고 실시간 데이터 교환
  • 실제로 HTTP도 간단한 IoT 디바이스 서비스 통신을 할때 사용하는 경우도 많습니다.
  • 별도의 Message Broker 서버도 필요 없으며, 익숙하기 때문에 빠르게 적용할 수 있기 때문입니다.

 

그럼 둘의 가장 큰 차이 중 하나인, Message Broker가 왜 필요한걸까요?

 


※  메세지 브로커 존재 이유

  • 메세지 브로커가 필요한 이유는 크게 시스템의 확장성, 안정성, 유연성을 보장하면서 기기 간 복잡성을 줄이기 위해서입니다. 직접 장치와 장치를 연결하는 Point-to-Point 통신 방식에는 여러가지 한계가 존재하기 때문입니다.

 

1. 느슨한 결합

  • Point-to-Point 방식의 문제점
    • 장치들이 서로 직접 통신하려면, 모든 장치가 서로의 IP 주소, 포트 등을 알아야 합니다.
    • N대의 장치가 통신할 때, 장치 수가 늘어날수록 연결의 복잡도는 기하급수적으로 증가합니다.
    • 하나의 장치라도 다운되면, 해당 장치와 연결된 다른 장치들까지 문제를 발생할 수 있습니다.
  • Message Broker 방식의 장점
    • Message Broker가 있어 모든 장치는 브로커와만 연결되면 돼 장치 수가 늘어나도 복잡성이 많이 증가하지 않습니다.
    • 발행자와 구독자는 서로를 몰라도 메세지를 주고 받을 수 있습니다.
    • 새로운 장치나 서비스가 추가되어도 기존 시스템에 영향을 주지 않습니다.

 

2. 메세지 전달 보장

  • 브로커는 QoS (Quality of Service)를 통해 메시지가 손실되지 않도록 합니다.
  • 네트워크 불안정이나 장애 발생 시에도 브로커가 옵션을 통해 메시지를 저장(Retain)하고 나중에 전달할 수 있습니다.

 

3. 확장성

  • 브로커는 수천~수백만 개의 장치와 동시에 통신할 수 있습니다.
  • 직접 연결로는 이런 확장이 사실상 불가능합니다.
  • 브로커는 트래픽을 균등하게 분산하여 시스템 과부하를 방지합니다.

 

4. 다양한 통신 패턴 지원

  • 브로커는 다양한 통신 패턴을 지원합니다:
    • 1:1 (Direct Messaging)
    • 1:N (Broadcasting)
    • N:N (Multicast)
  • 이를 통해 특정 장치에만 메시지를 보내거나, 전체 네트워크에 알림을 보낼 수 있습니다.

 

MQTT 브로커는 단순한 "중간자"가 아니라,
효율적이고 안정적인 시스템 아키텍처를 유지하기 위한 핵심 요소
이렇게, 간단하게 MQTT 프로토콜에 관련해서 정리를 했습니다. 그렇다면, 지금부터는 실제 Springboot 코드에 적용할 수 있는 방법에 대해서 설명드리도록 하겠습니다.

 


※  개발 환경

springboot 3.3.4
java 17
Kotlin 1.9.20

 

1. 의존성 추가

// mqtt
implementation("org.springframework.integration:spring-integration-mqtt:6.3.4")
  • Spring Integration 프레임워크의 MQTT 프로토콜 지원을 추가하는 라이브러리
  • 이 라이브러리를 사용하면 Spring 기반 애플리케이션에서 MQTT 클라이언트와 브로커 간의 통신을 쉽게 구성할 수 있다.
  • 버전 호환성을 잘 확인해봐야한다. (Spring Boot 버전에 따라 호환되는 버전이 다름)

 

Spring Integration이란?

Spring Integration은 Spring Framework 기반의 엔터프라이즈 애플리케이션 통합(EAI, Enterprise Application Integration)을 간편하게 구현하기 위해 설계된 모듈

 

2. MqttConfig 클래스 생성

@Configuration
class MqttConfig(private val awsKeyManager: AwsKeyManager) {

    @Value("\${mqtt.broker.url}")
    lateinit var mqttBrokerUrl: String

    @Value("\${mqtt.client-id}")
    lateinit var clientId: String

    @Bean
    fun mqttClientFactory(): MqttPahoClientFactory {
        val factory = DefaultMqttPahoClientFactory()
        factory.connectionOptions = mqttConnectOptions()
        return factory
    }

    private fun mqttConnectOptions(): MqttConnectOptions {
        val options = MqttConnectOptions().apply {
            serverURIs = arrayOf(mqttBrokerUrl)
            ...
        }
        return options
    }

    @Bean
    fun mqttOutboundChannel(): MessageChannel = DirectChannel()

    @Bean
    @ServiceActivator(inputChannel = "mqttOutboundChannel")
    fun mqttOutboundHandler(mqttClientFactory: MqttPahoClientFactory): MessageHandler {
        val handler = MqttPahoMessageHandler(clientId + "publish", mqttClientFactory)
        handler.setAsync(true)
        return handler
    }
    
    @Bean
    @ServiceActivator(inputChannel = "mqttInboundChannel")
    fun handler(): MessageHandler = MessageHandler { message -> println(message.payload) }}

    @Bean
    fun mqttInboundAdapter(): MqttPahoMessageDrivenChannelAdapter {
        val adapter = MqttPahoMessageDrivenChannelAdapter(
            clientId + "subscribe",
            mqttClientFactory(),
            "ygwan/test",
        )

        adapter.setConverter(DefaultPahoMessageConverter())
        adapter.setQos(1)
        adapter.outputChannel = mqttInboundChannel()
        return adapter
    }

    
    @Bean
    fun mqttInboundChannel(): MessageChannel = DirectChannel()
    
    @Bean
    fun mqttoutboundChannel(): MessageChannel = DirectChannel()
}

@MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")
interface MqttGateway {
    fun sendToMqtt(@Header("mqtt_topic") topic: String, message: String)
}

 

MqttConfig.class 파일은 크게 3가지 부분으로 나뉠 수 있습니다.

  1. 연결 관련 처리 함수

  2. 메세지 Publishing을 위한 outbound 관련 처리 함수

  3. 메세지 subScrIbe을 위한 inbound 관련 처리 함수

 

특징 Outbound Inbound
의미 애플리케이션 → MQTT 브로커로 메시지 전송 역할 MQTT 브로커 → 애플리케이션으로 메시지 수신 역할
핵심 컴포넌트 mqttOutboundChannel,
MqttPahoMessageHandler
MqttPahoMessageDrivenChannelAdapter
Spring 설정 @ServiceActivator(inputChannel = "mqttOutboundChannel") @Bean을 통해 채널 어댑터를 생성하고 특정 토픽 구독 설정
역할 특정 토픽에 메시지를 발행(publish) 특정 토픽에 메시지를 구독(subscribe)하고 수신한 메시지를 처리
주요 메서드 sendToMqtt(topic, message) handleMessage()

 


1.  연결 관련 처리 함수

※  MqttPahoClientFactory

  • MQTT 클라이언트를 생성하는 데 필요한 MqttPahoClientFactory를 반환합니다.
  • MQTT 통신을 위한 Paho 클라이언트 팩토리 클래스를 반환합니다.
  • MQTT 브로커와의 연결을 관리하기 위해 사용됩니다.
PahoClient 란?
- Eclipse Foundation에서 제공하는 MQTT 프로토콜 구현 라이브러리
- Spring Integration MQTT 모듈은 Paho MQTT 클라이언트를 기반으로 동작함

 

※  MqttConnectOptions

spring-integration-mqtt:6.3.4 MqttConnectOptions 기본 설정값

 

 

2.  메세지 Publish을 위한 outbound 관련 처리 함수

※  mqttOutboundChannel

  • Spring Integration에서 이 채널은 MQTT 메시지를 브로커로 전송(publish)하는 플로우의 시작 지점 역할을 수행합니다.
  • 메시지를 특정 핸들러(mqttOutboundHandler)로 전달하여 최종적으로 MQTT 브로커에 발행(publish)합니다.

 

※  mqttOutboundHandler

  • MQTT 브로커에 메시지를 발행(publish)하는 핸들러를 정의합니다.
  • @ServiceActivator : 특정 채널의 메시지를 처리하는 메서드를 지정합니다.(mqttOutboundChannel 채널의 메시지를 처리)
  • MqttPahoMessageHandler : 클라이언트 ID와 MqttPahoClientFactory를 사용하여 MQTT 브로커와 연결합니다.
  • 메시지 송신 핸들러

 

※  @MessageGateway

  • 메시지 게이트웨이는 애플리케이션과 Spring Integration 메시징 시스템 간의 인터페이스입니다.
  • MQTT를 비롯한 다양한 통신 프로토콜에서 간단하고 추상화된 방식으로 메시지 송수신을 구현할 수 있습니다.
  • 메시지 게이트웨이는 애플리케이션의 일반적인 Java 메서드 호출을 Spring Integration 메시지로 변환하거나, 반대로 Spring Integration 메시지를 일반적인 Java 메서드 응답으로 변환하는 역할을 합니다.

 

3.  메세지 Subscribe을 위한 inbound 관련 처리 함수

※  mqttInboundChannel

  • Spring Integration에서 MQTT 브로커로부터 수신된 메시지를 전달받는 메시지 채널입니다.
  • MQTT Inbound 어댑터가 수신한 메시지를 핸들러로 전달하여, 메시지의 페이로드를 처리하거나 출력하는 데 사용됩니다.
DirectChannel: 메시지를 동기적으로 전송하는 채널입니다. 송신자와 수신자가 한 번에 하나의 메시지를 처리하도록 설정합니다.

 

※  mqttInboundHandler

  • MQTT 브로커에 메시지를 구독(subscribe)하는 핸들러를 정의합니다.
  • @ServiceActivator : 특정 채널의 메시지를 처리하는 메서드를 지정합니다.(mqttInboundChannel 채널의 메시지를 처리)
  • 메시지가 mqttInboundChannel에 도착하면, 이 핸들러가 트리거되어 메시지의 페이로드를 출력합니다. (출력 외 다른 추가적인 작업도 해당 메서드를 수정하면 처리 가능)

 

※  mqttInboundAdapter

  • 지정된 MQTT 브로커에 연결하고, 특정 토픽을 구독하여 수신된 메시지를 mqttInboundChannel로 전달합니다.

 

아래는 이렇게 설정했을 때의 설정 구조입니다.

  • 보시는 바와 같이 publish 전용 Client와 subscribe 전용 Client가 각각 존재합니다. (각각 다른 Client Id를 가짐)

 

 

그런데 같은 Client를 공유해서 쓰면 안되나?

라는 의문점이 들었습니다.

 

※  그것에 관한 spring.io 공식 레포지토리의 질문 & 답 내용

  • 정리하자면, 여러 제한 사항과 여러 유형을 처리할 수 있게 하기 위해서 구현
  • 여러 클라이언트를 효율적으로 관리하기 위해선 ClientManager 도입 가능

 

그래서 저는, 서버 당 inbound & outbound 클라이언트를 분리할 필요성을 잘 못느껴 이를 하나의 공유된 클라이언트로 처리하는게 좋다고 생각해, ClientManager를 도입하기로 했습니다.
(사실 분리했을 때의 이점을 찾기가 힘들어 하나로 합치기로 했습니다. 하나로 합치면 MQTT Broker에 호율적으로 클라이언트를 연결 할 수 있고 하나의 서버 당 하나의 클라이언트가 붙기 때문에 이후에 작업 시 관리가 용이할 것 같다고 생각했기 때문입니다.)

 

설정 코드를 설명하기에 앞서, 앞에 말한 ClientManager가 뭔지 설명드리도록 하겠습니다.

 

※  ClientManager

  • 여러 통합에 단일 MQTT ClientID가 필요한 경우 MQTT 브로커가 ClientID당 연결 수에 제한이 있을 수 있으므로(일반적으로 단일 연결이 허용됨) 여러 MQTT 클라이언트 인스턴스를 사용할 수 없을 때 주로 사용합니다.
  • 다른 채널 어댑터에 단일 클라이언트를 재사용하려면 org.springframework.integration.mqtt.core.ClientManager구성 요소를 사용하여 필요한 모든 채널 어댑터에 전달할 수 있습니다.
  • ClientManager는 MQTT 연결 수명 주기를 관리하고 필요한 경우 자동으로 다시 연결합니다.
  • 다중 클라이언트나 연결 관리가 필요한 경우 이에 대한 작업을 처리해줍니다.

 

※ 예시 설정 코드

@Bean
fun clientManager(): ClientManager<IMqttAsyncClient, MqttConnectOptions> {
    val connectionOptions = mqttConnectOptions()

    val clientManager = Mqttv3ClientManager(connectionOptions, "mqttServerClient")
    clientManager.setPersistence(MqttDefaultFilePersistence())

    return clientManager
}

@Bean
@ServiceActivator(inputChannel = "outboundMessageChannel")
fun mqttOutboundHandler(): MessageHandler {
    val handler = MqttPahoMessageHandler(clientManager())
    handler.setAsync(true)
    return handler
}


@Bean
fun mqttInboundAdapter(): MqttPahoMessageDrivenChannelAdapter {
  val adapter = MqttPahoMessageDrivenChannelAdapter(
      clientManager(),
      "test/topic",
  )

  adapter.setConverter(DefaultPahoMessageConverter())
  adapter.outputChannel = inboundMessageChannel()
  return adapter
}
  • 이전 설정 코드랑 전반적으로 비슷하지만, 큰 다른 점은 ClientManager을 통해 inboundAdapter와 outboundHandler를 정의했다 정도인 것 같습니다.

 

아래는 이렇게 설정했을 때의 설정 구조입니다.

 

이때 inbound message channel과 outboud message channel을 분리해야 하는 이유는, 
각각의 채널의 목적이 다르기 때문입니다.
실제로 저도 이 메세지 채널을 같게 해서 처리를 해봣지만, INFO 에러가 발생하는 것을 확인할 수 있었습니다.
이에 대한 설명을 아래 링크에 첨부하도록 하겠습니다.

 


※  정리

 이렇게하면, 전반적인 환경 설정이 완료됩니다. 아래 동영상은 , MQTT Broker 중 하나인 AWS IoT에 서버를 연결하고 publish & subscribe를 진행하는 테스트에 관한 동영상입니다.

  • Publish는 MQTT 브로커에서 특정 토픽을 구독하고 있는 기기가 있을 때, 서버에서 해당 토픽으로 publish 하면, 이를 구독하고 있는 기기에 이 메세지가 출력되는 과정을 보여주는 플로우 입니다.
  • Subscribe는 서버가 특정 토픽을 구독하고 있을 때, 해당 브로커에서 해당하는 토픽이 publish 됐을 때, 서버에서 이 메세지를 가져와 출력하는 과정을 보여주는 플로우입니다. 

AWS IoT와 서버를 연결하는 방법은 다음번에 정리해보도록 하겠습니다.

 

※  publish test

 

 

※  subscribe test

 


※  출처