MQTT 기반 IoT 제어에서 비동기 요청을 동기적으로 처리하는 방법
기본적으로 서버에서 IoT 기기들과 통신할 때, MQTT 프로토콜을 사용해서 통신합니다. 이 때 서버와 IoT 기기들끼리 1 : 1 통신을 하는 것이 아닌 중간 중개자인 MQTT Broker에 PUB / SUB 방식으로 통신을 진행합니다. 예를 들어, 서버가 특정 IoT 기기에게 메세지(신호)를 보내고 싶다면, IoT 기기가 구독하고 있는 토픽에 보낼 메세지를 추가하여 MQTT Broker에 Publish 합니다. 그러면, 해당 토픽을 구독하고 있는 IoT 기기가 해당 토픽을 Subscribe 한 후 메세지를 받습니다. 기본적으로 이러한 형식으로 서로 통신합니다.
결국, MQTT Pub-Sub 기반 IoT 제어 및 응답 흐름은,
- 서버는 IoT 기기가 구독하는 특정 토픽에 요청 메시지를 담아 MQTT Broker에 발행(Publish)한다.
- IoT 기기는 MQTT Broker에서 구독(Subscribe)한 메시지를 수신하여 처리한다.
- 처리 결과는 서버가 구독하는 특정 토픽에 메시지로 담아 MQTT Broker에 발행한다.
- 서버는 MQTT Broker에서 해당 메시지를 수신하고 처리한다.
MQTT 자체가 기본적으로 비동기 메세징 방식을 사용하기 때문에 이러한 과정은 기본적으로 비동기 방식으로 처리됩니다. 그래서인지 제가 기아 차 제어 어플인 Kia Connect를 사용하고 있는데, 차량 제어 (시동 끄기, 문 잠그기 등)을 하려고 하면 일단은 "차량 제어 명령이 전달됐습니다. " 라는 응답이 먼저 나오고, 제어가 잘 처리되면 완료되었다는 푸시 알림이 발생합니다. (IoT 제어 & 응답이 비동기로 처리됨)
저희도 IoT 기기와 통신할 때 기본적으로는 비동기 방식으로 처리됩니다. 하지만, 특정 명령의 경우 사용자가 제어 요청을 보냈을 때 동기적으로 요청이 완료되었다는 응답까지 한번에 보내야 할 니즈가 있어 이를 처리해야 했습니다. 그래서 이러한 처리를 어떻게 했으며, 어떤 고민을 했는지 정리하려고 합니다.
고민 1 : 비동기를 어떻게 동기적으로 처리할까?
기본적으로 MQTT는 비동기적으로 처리됩니다. 최종적인 결과까지 IoT기기에 받아 응답을 보내려면 이러한 과정을 하나로 묶어 동기 방식으로 처리해야 됩니다. 비동기적인 처리를 단순히 동기적으로 한다고 해도(Future, CountDownLatch 등) 사용자 요청에 대한 응답은, MQTT Broker로 메세지 발행까지이기 때문에 결과까지를 하나로 묶기 위해선 추가적인 방법이 필요합니다.
이러한 방법을 고민하던 끝에 제가 내린 결론은 "표시"를 하는 것이었습니다. 사용자가 명령을 입력하면, 이를 MQTT에 발행하기 전에 먼저 "이러한 명령이 들어왔어"라는 상태를 표시합니다. 이후 IoT가 명령을 처리하고 결과를 발행하면, 서버가 이를 수신하여 핸들러에서 이전에 표시한 곳을 업데이트해 "최종적으로 명령의 결과가 들어왔어"라고 사용자에게 알리는 방식으로 구현했습니다.
이 "표시"는 DB를 통해 구현했습니다. 사용자가 명령을 입력하면 해당 명령을 DB에 저장하고, 서버는 별도의 스레드에서 DB를 폴링하여 값의 변경 여부를 확인합니다. 이후 IoT 명령어 처리 결과가 서버로 수신되면, 해당 결과값을 DB에 업데이트합니다. 그러면 DB 폴링이 종료되고 최종적으로 사용자에게 응답하는 형식으로 로직을 구성했습니다.
이렇게해서, 비동기를 동기적으로 처리하는 문제는 해결했습니다. 하지만 추가적인 문제가 있었습니다.
고민 2 : 명령들을 어떻게 구분하며, 요청들을 어떻게 묶을 수 있을까?
현재 내부 구조는 MQTT Broker에 여러 IoT 기기들이 연결되어 있습니다. 사용자가 프론트 화면을 통해 서버에 특정 기기에 대한 특정 요청을 보낼 때, 서버에서 이 기기들을 구분할 수 있는 방법은 기기마다 고유의 ID가 존재해 해당 ID을 사용해 구분합니다. 하지만 MQTT는 비동기 방식으로 요청을 처리하기 때문에 사용자가 요청한 요청을 구분할 수 있는 방법이 없습니다.
사용자가 한명일 때는 결국 처리해야되는 명령어는 하나밖에 없으니 문제가 되지 않지만, 여러 사용자가 있는 경우 현재 명령어를 구분할 수 없기 때문에 IoT의 결과값이 들어와도 이 결과값이 어떤 명령에 대한 결과 값인지 알 수 있는 방법이 없습니다. 단순히 명령어 명과 기기 ID만을 가지고 구분하기에는 사람들이 동일한 명령어 명과 기기 ID에 요청을 보냈을 경우 이를 구분할 수 없기 때문입니다. 그래서 구분을 위한 방법이 필요했습니다. 또한 하나의 명령어에 여러 요청 (위의 1 ~ 4 단계)가 존재하기 때문에 이를 묶을 방법이 필요했습니다.
이러한 문제를 해결하기 위해 correlation ID을 필드 값으로 추가해 명령어마다의 고유한 UUID 값을 통해 명령어를 구분했습니다. 예를 들어, 서버가 IoT 기기에 요청을 보낼 때 correlation ID를 함께 포함시키면, IoT 기기는 해당 key를 결과 메시지에도 포함하여 응답할 수 있습니다. 이렇게 하면, 서버는 수신한 응답에서 어떤 요청에 대한 결과인지를 명확하게 식별할 수 있게 됩니다. 또한 하나의 명령어에 속한 요청들은 동일한 correlation ID 값을 공유하기 때문에 요청들을 하나의 명령어로 묶을 수 있었습니다.
Correlation ID란 분산 시스템이나 비동기 메시징 환경에서 관련된 요청과 응답, 혹은 여러 메시지를 하나의 흐름으로 묶어 추적할 수 있도록 하는 고유 식별자를 의미한다.
요청-응답 추적
: 서버가 클라이언트나 IoT 기기에 요청을 보낼 때, 해당 요청에 고유한 Correlation ID를 포함시킵니다. 이 ID는 이후 발생하는 모든 관련 응답이나 이벤트에도 함께 전달되어, 어떤 요청에 대한 결과인지 쉽게 파악할 수 있게 한다.
비동기 통신 관리
: MQTT와 같이 비동기 방식으로 메시지를 처리하는 환경에서는 요청과 응답의 순서가 보장되지 않을 수 있습니다. 이때 Correlation ID를 사용하면 어떤 응답이 어떤 요청에 해당하는지 명확히 할 수 있다.
즉,Correlation ID는 분산 환경에서 서로 관련된 여러 메시지를 한 그룹으로 묶어 추적할 수 있도록 도와줄 수 있다.
- 이런식으로, Correlation ID을 통해 여러 요청들을 하나로 묶을 수 있는 값을 만들고 해당 값으로 구분하는 방식으로 구현
- 이를 통해 요청들마다 개별적인 값이 존재하기 때문에 요청들을 구분할 수 있고, 하나의 명령어에 속해 있는 요청들을 같은 Correlation ID 값을 공유하기 때문에 여러 요청들을 하나로 묶을 수 있다.
고민 3 : IoT가 응답을 제대로 하지 않으면...?
현재 DB 폴링 방식은 IoT가 응답을 보내지 않으면 계속해서 폴링해 엄청난 서버 리소스를 잡아먹습니다. IoT가 응답을 보내지 못하는 경우는 생각보다 많이 발생할 수 있습니다. IoT 기기 자체가 제한된 리소스를 기반으로 동작하고, 기기 자체가 꺼져 있거나 하는 문제가 발생할 수 있기 때문에 이러한 문제에 대한 대비가 필요합니다. 따라서 저는 Timeout을 도입해 이러한 문제를 해결했습니다. 물론 명령어마다 처리하는 시간은 상의하기 때문에 만약 요청에 같이 Timeout 시간도 같이 보내준다면 해당 시간으로 설정하고 없다면 미리 설정한 Default 값으로 Timeout 시간을 설정해 무한정 폴링을 막았습니다. 저는 Kotlin을 사용했기 때문에 이를 corutine을 통해 구현했습니다.
fun waitForCommandResult(
timeout: Duration,
...
): Boolean {
var success: Boolean
try {
runBlocking {
withTimeout(timeout.toMillis()) {
// DB 폴링
}
}
} catch (e: TimeoutCancellationException) {
// Timeout 에러 발생
}
}
이렇게 제가 구현하면서 고민했던 점에 대해서 정리했습니다.
따라서 최종적인 저의 플로우를 도식도를 통해 정리하자면,
- 사용자의 IoT 제어 명령 요청을 백엔드 서버에 전달
- 사용자에게 받은 명령어를 저장 -> 이 때, 명령어 결과 & 결과 응답 시간은 null인 상태로 저장
- Server -> MQTT Broker로 IoT 기기가 인식할 수 있는 명령어 요청 토픽에 사용자 명령어를 메세지로 넣어 발행
- 서버는 특정 시간동안 2번에서 저장한 디비 컬럼의 명령어 결과 & 결과 응답시간이 null이 아닌지 확인 (DB Polling)
- 3번에서 발행한 메세지를 IoT 기기가 수신 ( MQTT Broker -> IoT )
- IoT기기가 해당 명령어를 처리
- 명령어 결과를 IoT -> MQTT Broker로 Server가 인식할 수 있는 명령어 결과 관련 토픽에 명령어 결과를 메세지로 넣어 발행
- 7번에서 발행한 메세지를 서버가 수신 ( MQTT Broker -> Server )
- 서버가 해당 메세지를 수신하면, 해당 토픽을 처리하는 Handler에서 명령어에 맞는 컬럼을 찾아 명령어 결과와 응답 시간 업데이트
- 4번에서 계속해서 진행하고 있는 DB 폴링이 9번을 통해 끝나고, 명령어 결과 응답 값을 사용자에게 전달
PS) 만약 IoT 기기가 요청을 제대로 받지 못했거나, 응답을 제대로 못했을 경우 미리 설정된 Timeout 시간에 따라 해당 시간이 넘어가면 자동으로 에러를 발생시킴
이러한 과정을 통해 사용자가 IoT 기기에 요청을 보내면 최종적인 응답 값을 리턴하는 동기적 로직에 대한 처리를 끝냈습니다.
아쉬운 점
1. DB 폴링 방식의 비효율성
- 지속적인 DB 폴링은 서버 리소스를 과도하게 소모할 가능성이 큽니다.
- 요청마다 별도의 쓰레드를 두고, DB 커넥션도 가져가게 되므로 자원 낭비가 크다.
이 부분에 대한 저만의 답은 Redis & Kafka를 사용한 이벤트 기반 아키텍처를 통해 Pub/Sub 방식으로 우회하는 것이였습니다. 제가 DB 폴링 방식으로 처리한 이유는, 이벤트 기반 아키텍처를 사용하기 위해서는 추가적인 infra 리소스가 들어가는데 이를 처리하기보단 일단은 간단하게 구현하고 이후에 업그레이드 하는 것이 그때 상황에서는 최선의 방향성이였기 때문입니다. 또한, 최대한 IoT기기의 처리를 최소화해야 했기 때문입니다. 하지만, 현재는 Kafka를 도입해 쓰고 있습니다. 따라서 이후에는 이벤트 기반 처리로 DB 커넥션과 쓰레드 풀 관리를 효율적으로 처리하고 나아가 성능적으로도 향상시키는 방향으로 업그레이드 할 예정입니다.
2. 적절한 timeout 값 설정
- Timeout이 설정된다고 해도, IoT 기기가 늦게 응답하면 유실될 가능성이 있다.
- 명령어 별로 timeout이 다르기 때문에 최적의 timeout 값을 찾기 어렵다.
사실 이 부분은 고민이 많았습니다. IoT 기기의 상황은 제가 예상할 수 있는 범위가 아니였기 때문에 상황에 따라 빠르게 응답을 할 수도 있지만, 빠르게 응답을 못할 경우도 존재하기 때문입니다. 그렇다고 무작정 사용자가 기다리는 것은 사용자 경험 상 안좋을 것이 뻔하기 때문에 이 부분에 대한 고민이 많았습니다. 그래서 다른 어플은 명령어와 실행을 분리한 것인가를 생각하기도 했습니다. 하지만, 저희 상황 상 한번에 처리하는 것을 요구했기 때문에 제가 내린 결론은, 무작정 기다릴 수는 없으니 명령어에 따라 적당한 timeout만을 구현하고 유실된 값은 dashboard을 통해 추가적으로 확인할 수 있게 끔 구현하는 것이였습니다. 이는 사실 어쩔 수 없는 IoT의 제한된 환경이라고 생각해 이정도로 마무리지었지만, 더 좋은 방법은 계속해서 고민해 볼 예정입니다.