Microsoft MVP성태의 닷넷 이야기
글쓴 사람
정성태 (techsharer at outlook.com)
홈페이지
첨부 파일
(연관된 글이 1개 있습니다.)

C# - MQTT를 이용한 클라이언트/서버(Broker) 통신 예제

예전에 소개한 CoAP처럼,

C# - CoAP 서버 및 클라이언트 제작 (UDP 소켓 통신)
; https://www.sysnet.pe.kr/2/0/12629

MQTT 또한 사물 인터넷 용으로 나온 프로토콜이긴 하지만 여타 응용 프로그램에서 사용하지 않을 이유가 없습니다. ^^ 그리고 CoAP와 다른 점이 있는데요. 일반적인 서버/클라이언트 개념으로 보면, 서버 역할을 MQTT broker가, 클라이언트 역할을 MQTT client가 수행합니다. 하지만, 서버/클라이언트처럼 서버 측에서 서비스를 제공하는 방식이 아니고, broker와 연결된 특정 클라이언트가 서비스를 수행하는 주체가 될 수 있습니다. 즉, MQTT broker는 말 그대로 통신 간의 중계 역할에 더 초점을 둡니다.

비슷한 예로, WPF의 Prism 프레임워크에서 사용하는 Event Subscriber/Publisher의 개념과 유사하다고 보면 되겠습니다.

실제 코드로 간단한 예를 들면 더 편하겠지요. ^^ .NET에서 MQTT 프로토콜을 구현한 MQTTnet 라이브러리를 사용하면 더 쉽게 접근할 수 있으니,

chkr1011/MQTTnet
; https://github.com/chkr1011/MQTTnet

Getting Started with MQTT
; https://www.hivemq.com/blog/how-to-get-started-with-mqtt/

Example of MQTT communication using MQTTnet
; https://www.programmersought.com/article/65964914395/

landbroken/MQTTLearning
; https://github.com/landbroken/MQTTLearning

이것을 이용해 예제를 만들어 보겠습니다. 그런데, 의외로 웹 상에 공개된 예제들이 2.x 버전의 것이라 현재 3.x 버전으로는 빌드가 안 됩니다. 하위 호환성이 ^^; 꽝인데 업그레이드 가이드에서 기록을 찾아볼 수 있습니다.

이를 기반으로 우선 서버 먼저 다음과 같이 간단하게 만들 수 있습니다.

// Install-Package MQTTnet

using MQTTnet;
using MQTTnet.Client.Receiving;
using MQTTnet.Server;
using System;
using System.Text;
using System.Threading.Tasks;

namespace MqttSample
{
    class Program
    {
        static async Task Main(string[] args)
        {
            var optionsBuilder = new MqttServerOptionsBuilder()
                    .WithConnectionBacklog(100)
                    .WithDefaultEndpointPort(8222);

            IMqttServer broker = new MqttFactory().CreateMqttServer();

            broker.ApplicationMessageReceivedHandler = new MqttApplicationMessageReceivedHandlerDelegate(e => OnMessageReceived(e));
            broker.ClientConnectedHandler = new MqttServerClientConnectedHandlerDelegate(e => OnClientConnected(e));
            broker.ClientDisconnectedHandler = new MqttServerClientDisconnectedHandlerDelegate(e => OnClientDisconnected(e));
            broker.ClientSubscribedTopicHandler = new MqttServerClientSubscribedHandlerDelegate(e => OnClientSubscribedToTopic(e));
            broker.ClientUnsubscribedTopicHandler = new MqttServerClientUnsubscribedTopicHandlerDelegate(e => OnClientUnsubscribedToTopic(e));

            await broker.StartAsync(optionsBuilder.Build());

            Console.WriteLine("Press any key to exit...");
            Console.ReadLine();
        }

        private static void OnClientUnsubscribedToTopic(MqttServerClientUnsubscribedTopicEventArgs e)
        {
            Console.WriteLine($"OnClientUnsubscribedToTopic: {e.ClientId} - {e.TopicFilter}");
        }

        private static void OnClientSubscribedToTopic(MqttServerClientSubscribedTopicEventArgs e)
        {
            Console.WriteLine($"OnClientSubscribedToTopic: {e.ClientId} - {e.TopicFilter.Topic}");
        }

        private static void OnMessageReceived(MqttApplicationMessageReceivedEventArgs e)
        {
            Console.WriteLine($"OnMessageReceived: {e.ApplicationMessage.Topic}, {Encoding.UTF8.GetString(e.ApplicationMessage.Payload)}");
        }

        private static void OnClientConnected(MqttServerClientConnectedEventArgs e)
        {
            Console.WriteLine($"OnClientConnected: {e.ClientId}");
        }

        private static void OnClientDisconnected(MqttServerClientDisconnectedEventArgs e)
        {
            Console.WriteLine($"OnClientDisconnected: {e.ClientId} - {e.DisconnectType}");
        }
    }
}

그런데, 디버깅 용도의 로그 정보 출력에 해당하는 이벤트 구독은 안 해도 됩니다. 그래서 서버는 다음과 같이 간단하게 만들어도 무방합니다.

using MQTTnet;
using MQTTnet.Server;
using System;
using System.Threading.Tasks;

namespace MqttSample
{
    class Program
    {
        static async Task Main(string[] args)
        {
            var optionsBuilder = new MqttServerOptionsBuilder()
                    .WithConnectionBacklog(100)
                    .WithDefaultEndpointPort(8222);

            IMqttServer broker = new MqttFactory().CreateMqttServer();

            await broker.StartAsync(optionsBuilder.Build());

            Console.WriteLine("Press any key to exit...");
            Console.ReadLine();
        }
    }
}

그다음, 위의 Broker에 Topic을 발행하고/구독할 클라이언트를 만들어야 하는데요. 이것은 MQTTnet을 기반으로 클라이언트를 좀 더 만들기 쉽게 래퍼한 MQTTnet.Extensions.ManagedClient 패키지를 이용하면 더 쉽게 만들 수 있습니다. 따라서 다음의 소스 코드를 기반으로,

MQTTnet/Tests/MQTTnet.TestApp.NetCore/ManagedClientTest.cs /
; https://github.com/chkr1011/MQTTnet/blob/master/Tests/MQTTnet.TestApp.NetCore/ManagedClientTest.cs

이렇게 만들 수 있습니다.

// Install-Package MQTTnet
// Install-Package MQTTnet.Extensions.ManagedClient

using MQTTnet;
using MQTTnet.Client.Options;
using MQTTnet.Client.Receiving;
using MQTTnet.Extensions.ManagedClient;
using MQTTnet.Protocol;
using System;
using System.Threading.Tasks;

namespace MqttClient
{
    class Program
    {
        static async Task Main(string[] args)
        {
            ManagedMqttClientOptions options = new ManagedMqttClientOptions();

            options.ClientOptions = new MqttClientOptions()
            {
                ClientId = "MyMqttClient",
                ChannelOptions = new MqttClientTcpOptions
                {
                    Server = "localhost",
                    Port = 8222,
                }
            };

            options.AutoReconnectDelay = TimeSpan.FromSeconds(1);

            try
            {
                var managedClient = new MqttFactory().CreateManagedMqttClient();
                managedClient.ApplicationMessageReceivedHandler = new MqttApplicationMessageReceivedHandlerDelegate(e =>
                {
                    Console.WriteLine(">> RECEIVED: " + e.ApplicationMessage.Topic);
                });

                await managedClient.StartAsync(options);

                await managedClient.PublishAsync(builder => builder.WithTopic("Step").WithPayload("1"));
                await managedClient.PublishAsync(builder => builder.WithTopic("Step").WithPayload("2").WithAtLeastOnceQoS());

                await managedClient.SubscribeAsync(new MqttTopicFilter { Topic = "xyz", QualityOfServiceLevel = MqttQualityOfServiceLevel.AtMostOnce });
                await managedClient.SubscribeAsync(new MqttTopicFilter { Topic = "abc", QualityOfServiceLevel = MqttQualityOfServiceLevel.AtMostOnce });

                await managedClient.PublishAsync(builder => builder.WithTopic("Step").WithPayload("3"));

                Console.WriteLine("Managed client started.");
                Console.ReadLine();
                await managedClient.UnsubscribeAsync("xyz", "abc");
                await managedClient.StopAsync();
            }
            catch (Exception e)
            {
                Console.WriteLine(e);
            }
        }
    }
}

위의 클라이언트는 "Step" 토픽을 만들어 각각 "1", "2", "3"에 해당하는 문자열을 발행하고 있습니다. 또한 토픽 "xyz", "abc"를 구독하고 있는데요, 만약 구독 토픽에 해당하는 메시지가 수신되면 managedClient.ApplicationMessageReceivedHandler에 등록된 이벤트 핸들러의 코드로 인해 화면에는 "RECEIVED: ..." 문자열이 출력될 것입니다.

일단, 위의 상태로 Broker와 Client를 실행시키면 Broker 측의 콘솔 화면에 다음과 같은 출력을 확인할 수 있습니다.

Press any key to exit...
OnClientConnected: MyMqttClient
OnMessageReceived: Step, 1
OnClientSubscribedToTopic: MyMqttClient - xyz
OnClientSubscribedToTopic: MyMqttClient - abc
OnMessageReceived: Step, 2
OnMessageReceived: Step, 3

// [MqttClient를 종료하면]

OnClientUnsubscribedToTopic: MyMqttClient - xyz
OnClientUnsubscribedToTopic: MyMqttClient - abc
OnClientDisconnected: MyMqttClient - Clean

이처럼, broker를 통해 이벤트를 브로드캐스팅할 수 있는 구조를 제공하는데 WPF의 Prism이 이벤트를 exe 프로세스 내에서 중계하는 구조라면 MQTT는 그 범위가 네트워크로 확장되었다고 보면 됩니다. 어쩌면 살짝 UDP 브로드캐스팅과도 닮았는데요, 물론 기존의 UDP 브로드캐스팅은 연결되지 않았어도 서브넷 범위에만 있다면 통신을 할 수 있는 반면 MQTT는 일단 접속해야 한다는 차이점은 있습니다.

(첨부 파일은 이 글의 예제 코드를 포함합니다.)




실습하면서 짐작할 수 있겠지만, MQTT broker는 그 자체로 특정 서비스에 종속적이지 않게 동작할 수 있습니다. 이런 특성 때문에 검색해 보면 MQTT broker를 클라우드 규모로 확장해 서비스하는 제품들이 있습니다. 그중 하나가 HiveMQ인데요,

Getting Started with MQTT
; https://www.hivemq.com/blog/how-to-get-started-with-mqtt/

HiveMQ 사이트에 보면 MQTT에 대해 자세하게 설명하는 문서들을 찾아볼 수 있습니다. 시간이 되면 저도 ^^ 천천히 한 번 읽어봐야겠습니다.

Everything you need to know about the latest version of MQTT: MQTT 5
; https://www.hivemq.com/mqtt-5/

MQTT Basics
; https://www.hivemq.com/mqtt-essentials/

    Introducing the MQTT Protocol - MQTT Essentials: Part 1
    ; https://www.hivemq.com/blog/mqtt-essentials-part-1-introducing-mqtt/

    Publish & Subscribe - MQTT Essentials: Part 2
    ; https://www.hivemq.com/blog/mqtt-essentials-part2-publish-subscribe/

    MQTT Client and Broker and MQTT Server and Connection Establishment Explained - MQTT Essentials: Part 3
    ; https://www.hivemq.com/blog/mqtt-essentials-part-3-client-broker-connection-establishment/

    MQTT Publish, Subscribe & Unsubscribe - MQTT Essentials: Part 4
    ; https://www.hivemq.com/blog/mqtt-essentials-part-4-mqtt-publish-subscribe-unsubscribe/

    MQTT Topics & Best Practices - MQTT Essentials: Part 5
    ; https://www.hivemq.com/blog/mqtt-essentials-part-5-mqtt-topics-best-practices/
    ; https://medium.com/@jspark141515/mqtt-%EC%8B%A4%EC%A0%84-%EC%9D%91%EC%9A%A9%ED%95%98%EA%B8%B0-wildcard-9643877262f2

    Quality of Service 0,1 & 2 - MQTT Essentials: Part 6
    ; https://www.hivemq.com/blog/mqtt-essentials-part-6-mqtt-quality-of-service-levels/
    ; https://medium.com/@jspark141515/mqtt%EB%9E%80-314472c246ee

    Persistent Session and Queuing Messages - MQTT Essentials: Part 7
    ; https://www.hivemq.com/blog/mqtt-essentials-part-7-persistent-session-queuing-messages/

    Retained Messages - MQTT Essentials: Part 8
    ; https://www.hivemq.com/blog/mqtt-essentials-part-8-retained-messages/

    Last Will and Testament - MQTT Essentials: Part 9
    ; https://www.hivemq.com/blog/mqtt-essentials-part-9-last-will-and-testament/

    Keep Alive and Client Take-Over - MQTT Essentials Part 10
    ; https://www.hivemq.com/blog/mqtt-essentials-part-10-alive-client-take-over/

그 외, 기타.
ESP8266 and Node-RED with MQTT (Publish and Subscribe)
; https://randomnerdtutorials.com/esp8266-and-node-red-with-mqtt/

Send and Receive Messages to your IoT Devices using MQTT
; https://www.digikey.com/en/maker/projects/send-and-receive-messages-to-your-iot-devices-using-mqtt/39ed5690cc46473abe8904c8f960341f




[이 글에 대해서 여러분들과 의견을 공유하고 싶습니다. 틀리거나 미흡한 부분 또는 의문 사항이 있으시면 언제든 댓글 남겨주십시오.]

[연관 글]






[최초 등록일: ]
[최종 수정일: 7/14/2021]

Creative Commons License
이 저작물은 크리에이티브 커먼즈 코리아 저작자표시-비영리-변경금지 2.0 대한민국 라이센스에 따라 이용하실 수 있습니다.
by SeongTae Jeong, mailto:techsharer at outlook.com

비밀번호

댓글 작성자
 



2021-06-05 11시51분
[사용자] 시간되실때 Prism 프레임워크 강의 좀 해주시면 안되나요? 국내에서는 WPF와 함께 가장 많이 사용되기는한데 처음 접하는 사람 입장에서는 도대체가 이해가 안가네요. 공식 사이트나 유튜브를 봐도 뭔지 모르겠고, 본문에서 언급한 Event Subscriber/Publisher 방법도 모르겠고...
[guest]
2021-06-07 11시07분
제가 WPF를 다룬 것이 거의 12년 정도 전이라서 이제는 어떻게 바뀌었는지도 잘 모릅니다. ^^ 다른 현업에 계신 분들이 더 실력이 좋기 때문에 그분들의 강의를 기대하는 것이 더 좋을 듯합니다.
정성태
2021-06-18 11시32분
정성태
2022-04-29 01시16분
MQTTnet 소개
; https://forum.dotnetdev.kr/t/mqttnet/3469

dotnet/MQTTnet
; https://github.com/dotnet/MQTTnet

MQTT 의 이해부터 테스트까지 (feat. POS 연동)
; https://techblog.tabling.co.kr/mqtt-의-이해부터-테스트까지-feat-pos-연동-73455d2d5532

-----------------

It's C# All The Way Down! Using .NET for home automation with IoT devices | .NET Conf 2023
  - 90s-era Gravis Gamepad with legacy game port interface
  - ESP32-S2 based microcontroller running .NET nanoFramework
  - Raspberry Pi 4 running full .NET runtime (ASP.NET Core)
; https://youtu.be/zwkspYxtFAE?t=408

Sample Project - https://github.com/burkenyo/Burkenyo.Iot
정성태

... 16  17  18  19  20  21  22  23  [24]  25  26  27  28  29  30  ...
NoWriterDateCnt.TitleFile(s)
13030정성태4/15/20226495오류 유형: 804. 정규 표현식 오류 - Quantifier {x,y} following nothing.
13029정성태4/14/20226898Windows: 203. iisreset 후에도 이전에 설정한 전역 환경 변수가 w3wp.exe에 적용되는 문제
13028정성태4/13/20226808.NET Framework: 1193. (appsettings.json처럼) web.config의 Debug/Release에 따른 설정 적용
13027정성태4/12/20227102.NET Framework: 1192. C# - 환경 변수의 변화를 알리는 WM_SETTINGCHANGE Win32 메시지 사용법파일 다운로드1
13026정성태4/11/20228631.NET Framework: 1191. C 언어로 작성된 FFmpeg Examples의 C# 포팅 전체 소스 코드 [3]
13025정성태4/11/20227965.NET Framework: 1190. C# - ffmpeg(FFmpeg.AutoGen)를 이용한 vaapi_encode.c, vaapi_transcode.c 예제 포팅
13024정성태4/7/20226459.NET Framework: 1189. C# - 런타임 환경에 따라 달라진 AppDomain.GetCurrentThreadId 메서드
13023정성태4/6/20226764.NET Framework: 1188. C# - ffmpeg(FFmpeg.AutoGen)를 이용한 transcoding.c 예제 포팅 [3]
13022정성태3/31/20226667Windows: 202. 윈도우 11 업그레이드 - "PC Health Check"를 통과했지만 여전히 업그레이드가 안 되는 경우 해결책
13021정성태3/31/20226839Windows: 201. Windows - INF 파일을 이용한 장치 제거 방법
13020정성태3/30/20226577.NET Framework: 1187. RDP 접속 시 WPF UserControl의 Unloaded 이벤트 발생파일 다운로드1
13019정성태3/30/20226574.NET Framework: 1186. Win32 Message를 Code로부터 메시지 이름 자체를 구하고 싶다면?파일 다운로드1
13018정성태3/29/20227116.NET Framework: 1185. C# - Unsafe.AsPointer가 반환한 포인터는 pinning 상태일까요? [5]
13017정성태3/28/20226916.NET Framework: 1184. C# - GC Heap에 위치한 참조 개체의 주소를 알아내는 방법 - 두 번째 이야기 [3]
13016정성태3/27/20227768.NET Framework: 1183. C# 11에 추가된 ref 필드의 (우회) 구현 방법파일 다운로드1
13015정성태3/26/20229121.NET Framework: 1182. C# 11 - ref struct에 ref 필드를 허용 [1]
13014정성태3/23/20227693VC++: 155. CComPtr/CComQIPtr과 Conformance mode 옵션의 충돌 [1]
13013정성태3/22/20226024개발 환경 구성: 641. WSL 우분투 인스턴스에 파이썬 2.7 개발 환경 구성하는 방법
13012정성태3/21/20225348오류 유형: 803. C# - Local '...' or its members cannot have their address taken and be used inside an anonymous method or lambda expression
13011정성태3/21/20226826오류 유형: 802. 윈도우 운영체제에서 웹캠 카메라 인식이 안 되는 경우
13010정성태3/21/20225751오류 유형: 801. Oracle.ManagedDataAccess.Core - GetTypes 호출 시 "Could not load file or assembly 'System.DirectoryServices.Protocols...'" 오류
13009정성태3/20/20227380개발 환경 구성: 640. docker - ibmcom/db2 컨테이너 실행
13008정성태3/19/20226673VS.NET IDE: 176. 비주얼 스튜디오 - 솔루션 탐색기에서 프로젝트를 선택할 때 csproj 파일이 열리지 않도록 만드는 방법
13007정성태3/18/20226259.NET Framework: 1181. C# - Oracle.ManagedDataAccess의 Pool 및 그것의 연결 개체 수를 알아내는 방법파일 다운로드1
13006정성태3/17/20227335.NET Framework: 1180. C# - ffmpeg(FFmpeg.AutoGen)를 이용한 remuxing.c 예제 포팅
13005정성태3/17/20226199오류 유형: 800. C# - System.InvalidOperationException: Late bound operations cannot be performed on fields with types for which Type.ContainsGenericParameters is true.
... 16  17  18  19  20  21  22  23  [24]  25  26  27  28  29  30  ...