MQTT 实战-.NET 实现 Broker(服务端)

作者:vkvi 来源:ITPOW(原创) 日期:2022-5-1

新建一个 .NET 项目,NuGet 搜索 mqtt,会发现很多:

  • DotNetty.Codecs.Mqtt,作者微软,是 Netty 的 .NET 版本,不单是 Mqtt。

  • MQTTnet,作者 The contributors of MQTTnet,适用于 .NET Framework。最新发布 2022/1/28。

  • Mqtt,作者 Mqtt,适用于 .NET Core。最新发布 2021/7/25。

  • 其他,像 M2Mqtt 都没人维护了。

本文以 .NET Framework 为例,所以选取的是 MQTTnet

才版本的图标:

MQTTnet

新版本的图标:

MQTTnet

【注意】本文的版本是 3.1.2,2022 年 6 月 4 日,出了个 4.0 版本,与本文可能并不兼容。

using MQTTnet;
using MQTTnet.Protocol;
using MQTTnet.Server;
using System;
using System.Threading.Tasks;

namespace Mqtt
{
    class Program
    {
        private static void MqqtServerTest()
        {

            var options = new MqttServerOptions();
            options.DefaultEndpointOptions.Port = 1883;
            options.ConnectionValidator = new MqttServerConnectionValidatorDelegate((context) => {
                if ((context.Username != "itpow" || context.Password != "123456") &&
                    (context.Username != "itpow2" || context.Password != "123456"))
                {
                    context.ReasonCode = MqttConnectReasonCode.BadUserNameOrPassword;
                }
            });

            var mqttServer = new MqttFactory().CreateMqttServer();
            mqttServer.UseClientConnectedHandler(ClientConnected);
            mqttServer.UseClientDisconnectedHandler(ClientDisconnected);
            mqttServer.ClientSubscribedTopicHandler = new MqttServerClientSubscribedTopicHandlerDelegate(ClientSubscribed);
            mqttServer.UseApplicationMessageReceivedHandler(MessageReceived);
            mqttServer.StartAsync(options);
        }
        
        
        private Task ClientConnected(MqttServerClientConnectedEventArgs args)
        {
            return Task.Run(() => {
                // args.ClientId
            });
        }


        private Task ClientDisconnected(MqttServerClientDisconnectedEventArgs args)
        {
            return Task.Run(() => {
                // args.ClientId
            });
        }
        
        
        private void ClientSubscribed(MqttServerClientSubscribedTopicEventArgs args)
        {
			// args.ClientId
            // args.TopicFilter.Topic
        }


        private Task MessageReceived(MqttApplicationMessageReceivedEventArgs args)
        {
            return Task.Run(() => {
                // args.ClientId
				// args.ApplicationMessage.Topic
                // args.ApplicationMessage.ConvertPayloadToString()
            });
        }

        static void Main(string[] args)
        {
            Task.Run(() =>
            {
                MqqtServerTest();
            });
            Console.ReadLine();
        }
    }
}

创建服务端挺简单的,如上,做了个简单的用户名、密码认证。

使用 MQTT.fx 测试,正常连接,正常收发信息。

如何让订阅者只能订阅规定的主题?

在订阅成功后,遇到不符合的主题,将其退订。

如何让发布者只能发布规定的主题?

在收到消息后,遇到不符合的主题,将 Topic 置为 null。

调用 MqttServer.StopAsync() 会触发 ClientDisconnectedHandler 吗?

不会,重要

Topic 的通配符 #,有效吗?

有效,比如订阅 itpow/#,可以收到 itpow/1/ 的发布。

相关阅读


相关文章