前言
MQTT 广泛应用于工业物联网、智能家居、各类智能制造或各类自动化场景等。MQTT 是一个基于客户端-服务器的消息发布/订阅传输协议,在很多受限的环境下,比如说机器与机器通信、机器与物联网通信等。
好了,科普的废话不多说,下面直接通过.NET 环境来实现一套 MQTT 通信 demo,实现服务端与客户端的双边消息发布与订阅的功能和演示。
开发环境
VS 2022 + .NET 6 + WebAPI / 控制台
1、新建一个 WebAPI 项目,用来后面做测试使用
2、新建一个继承自 IHostedService 的服务,用于随着 webapi 程序的启动而自动执行。(最终代码在文末)
3、引入 MQTTNet 包,该项目提供了.net 环境下的 MQTT 通信协议支持,这款框架很优秀,此处直接引用它来进行使用。
4、在上面的 MqttHostService 类里面,开始方法里面新增初始化 MQTT 服务端的一些功能,例如 IP、端口号、事件等等。
5、mqtt 服务端支持的一系列功能很多,大佬们可以自行去尝试一些新发现,此处只使用若干个简单功能。
6、添加客户端连接事件、连接关闭事件
7、由于事件要用的可能有点多,此处就不一一例举了,可以直接看以下的代码,以及有关注释来理解。
8、事件触发时候,打印输出
9、输出之前,记录一个当前事件名称标记一下,用于可以更加清楚看出是哪个事件输出的。
10、对 MqttHostService 类进行注册,用于程序启动时候跟随启动。
11、上面貌似设计的不是很友好,所以把 mqtt 服务实例单独弄出来,写入到单独的类里面做成属性,供方便调用。
12、把先前的一些东西改一下,换成使用上面步骤的属性来直接调用使用。
13、运行一下,看看是否可以成功,显示服务已启动,说明服务启动时 OK 的了.
14、新增一个控制台程序 MqttClient,用于模拟客户端。
15、创建客户端启动以及有关配置信息和有关事件,如图。具体使用可以看代码注释,就不过多解释了。
16、在 program 类里面,调用客户端启动方法,用于测试使用。
17、上面客户端对应的三个事件的实现如图,同时进行有关信息的打印输出。
18、启动服务端,然后启动客户端,可以看到服务端有一个连接失败的消息,这个是因为上面配置的客户端用户名是 admin,密码是 1234567,而服务端配置的规则是,用户名是 admin 密码是 123456
19、密码改回正常匹配项以后,再重新运行试试看,可以看到客户端与服务端连接上了。
20、如果关闭客户端,也可以看到服务端会进入客户端关闭事件内。
21、把上面主题订阅的内容写到连接成功以后的事件里面,不然客户端连接期间,可能就执行了主题订阅,会存在订阅失败的情况。改为写入到连接成功以后的事件里面,可以保证主题订阅肯定是在客户端连接成功以后才执行的。
22、接下来测试服务端消息推送,在 MqttService 服务里面,新增一个方法,用来执行 mqtt 服务端发布主题消息使用。有关配置信息和消息格式,如图所示。
23、新增一个 API 控制器,用来测试使用。API 参数直接拿来进行消息的推送使用。
24、运行服务端和客户端,并访问刚刚新增的 api 接口,手动随意输入一条消息,可以看到客户端订阅的主题消息已经被实时接收到了。
25、接下来对客户端新增一个消息推送的方法,用来测试客户端消息发布的功能。有关消息格式和调用,如图所示,以及注释部分的说明。
26、客户端 program 类里面,客户端连接以后,通过手动回车,来执行客户端发布消息。
27、再次启动服务端和客户端
28、然后客户端内按一下回车,执行消息发布功能。可以看到,服务端成功接收到了客户端发过来的主题消息。
29、接下来测试客户端与客户端之间的消息发布与订阅,为了模拟多客户端效果,把上面客户端已经编译好的文件拷贝一份出来。
30、然后本地的代码进行一些修改,用来当做第二个客户端程序。所以客户端 id 也进行变更为 testclient02
31、对客户端订阅的主题,也改成 topic_02
32、启动服务端,以及拷贝出来的客户端 1,和上面修改了部分代码的客户端 2,保证都已经连接上服务端。
33、调用服务端的 api 接口,由于服务端发布的消息是发布给 topic_01 的,所以只有客户端 1 可以接收到消息。
34、客户端 1 执行回车,用于发布一段消息给主题 topic_02,可以看到客户端 01 发布的消息,同时被服务端和客户端 02 接收到了。因为服务端是总指挥,所以客户端发布的消息都会经过服务端,从而服务端都可以接收到连接的客户端发布的所有消息。
35、测试数据保持,下面先对客户端 1 进行断开,然后再重新连接客户端 1,可以看到客户端 1 直接接收到了它订阅的主题的上一次最新的消息内容,这个就是消息里面,Retain 属性设为 True 的结果,用于让服务端记忆该主题消息使用的。如果设为 false,就没有这个效果了,大佬们也可以自己尝试。
36、最终的服务端代码:
MqttHostService:
public class MqttHostService : IHostedService, IDisposable
{
public void Dispose()
{
}
const string ServerClientId = "SERVER";
public Task StartAsync(CancellationToken cancellationToken)
{
MqttServerOptionsBuilder optionsBuilder = new MqttServerOptionsBuilder();
optionsBuilder.WithDefaultEndpoint();
optionsBuilder.WithDefaultEndpointPort(10086); // 设置 服务端 端口号
optionsBuilder.WithConnectionBacklog(1000); // 最大连接数
MqttServerOptions options = optionsBuilder.Build();
MqttService._mqttServer = new MqttFactory().CreateMqttServer(options);
MqttService._mqttServer.ClientConnectedAsync += _mqttServer_ClientConnectedAsync; //客户端连接事件
MqttService._mqttServer.ClientDisconnectedAsync += _mqttServer_ClientDisconnectedAsync; // 客户端关闭事件
MqttService._mqttServer.ApplicationMessageNotConsumedAsync += _mqttServer_ApplicationMessageNotConsumedAsync; // 消息接收事件
MqttService._mqttServer.ClientSubscribedTopicAsync += _mqttServer_ClientSubscribedTopicAsync; // 客户端订阅主题事件
MqttService._mqttServer.ClientUnsubscribedTopicAsync += _mqttServer_ClientUnsubscribedTopicAsync; // 客户端取消订阅事件
MqttService._mqttServer.StartedAsync += _mqttServer_StartedAsync; // 启动后事件
MqttService._mqttServer.StoppedAsync += _mqttServer_StoppedAsync; // 关闭后事件
MqttService._mqttServer.InterceptingPublishAsync += _mqttServer_InterceptingPublishAsync; // 消息接收事件
MqttService._mqttServer.ValidatingConnectionAsync += _mqttServer_ValidatingConnectionAsync; // 用户名和密码验证有关
MqttService._mqttServer.StartAsync();
return Task.CompletedTask;
}
/// <summary>
/// 客户端订阅主题事件
/// </summary>
/// <param name="arg"></param>
/// <returns></returns>
private Task _mqttServer_ClientSubscribedTopicAsync(ClientSubscribedTopicEventArgs arg)
{
Console.WriteLine($"ClientSubscribedTopicAsync:客户端 ID=【{arg.ClientId}】订阅的主题=【{arg.TopicFilter}】 ");
return Task.CompletedTask;
}
/// <summary>
/// 关闭后事件
/// </summary>
/// <param name="arg"></param>
/// <returns></returns>
private Task _mqttServer_StoppedAsync(EventArgs arg)
{
Console.WriteLine($"StoppedAsync:MQTT 服务已关闭……");
return Task.CompletedTask;
}
/// <summary>
/// 用户名和密码验证有关
/// </summary>
/// <param name="arg"></param>
/// <returns></returns>
private Task _mqttServer_ValidatingConnectionAsync(ValidatingConnectionEventArgs arg)
{
arg.ReasonCode = MqttConnectReasonCode.Success;
if ((arg.Username ?? string.Empty)!="admin" || (arg.Password??String.Empty)!="123456")
{
arg.ReasonCode = MqttConnectReasonCode.Banned;
Console.WriteLine($"ValidatingConnectionAsync:客户端 ID=【{arg.ClientId}】用户名或密码验证错误 ");
}
return Task.CompletedTask;
}
/// <summary>
/// 消息接收事件
/// </summary>
/// <param name="arg"></param>
/// <returns></returns>
private Task _mqttServer_InterceptingPublishAsync(InterceptingPublishEventArgs arg)
{
if (string.Equals(arg.ClientId, ServerClientId))
{
return Task.CompletedTask;
}
Console.WriteLine($"InterceptingPublishAsync:客户端 ID=【{arg.ClientId}】 Topic 主题=【{arg.ApplicationMessage.Topic}】 消息=【{Encoding.UTF8.GetString(arg.ApplicationMessage.Payload)}】 qos 等级=【{arg.ApplicationMessage.QualityOfServiceLevel}】");
return Task.CompletedTask;
}
/// <summary>
/// 启动后事件
/// </summary>
/// <param name="arg"></param>
/// <returns></returns>
private Task _mqttServer_StartedAsync(EventArgs arg)
{
Console.WriteLine($"StartedAsync:MQTT 服务已启动……");
return Task.CompletedTask;
}
/// <summary>
/// 客户端取消订阅事件
/// </summary>
/// <param name="arg"></param>
/// <returns></returns>
private Task _mqttServer_ClientUnsubscribedTopicAsync(ClientUnsubscribedTopicEventArgs arg)
{
Console.WriteLine($"ClientUnsubscribedTopicAsync:客户端 ID=【{arg.ClientId}】已取消订阅的主题=【{arg.TopicFilter}】 ");
return Task.CompletedTask;
}
private Task _mqttServer_ApplicationMessageNotConsumedAsync(ApplicationMessageNotConsumedEventArgs arg)
{
Console.WriteLine($"ApplicationMessageNotConsumedAsync:发送端 ID=【{arg.SenderId}】 Topic 主题=【{arg.ApplicationMessage.Topic}】 消息=【{Encoding.UTF8.GetString(arg.ApplicationMessage.Payload)}】 qos 等级=【{arg.ApplicationMessage.QualityOfServiceLevel}】");
return Task.CompletedTask;
}
/// <summary>
/// 客户端断开时候触发
/// </summary>
/// <param name="arg"></param>
/// <returns></returns>
/// <exception cref="NotImplementedException"></exception>
private Task _mqttServer_ClientDisconnectedAsync(ClientDisconnectedEventArgs arg)
{
Console.WriteLine($"ClientDisconnectedAsync:客户端 ID=【{arg.ClientId}】已断开, 地址=【{arg.Endpoint}】 ");
return Task.CompletedTask;
}
/// <summary>
/// 客户端连接时候触发
/// </summary>
/// <param name="arg"></param>
/// <returns></returns>
private Task _mqttServer_ClientConnectedAsync(ClientConnectedEventArgs arg)
{
Console.WriteLine($"ClientConnectedAsync:客户端 ID=【{arg.ClientId}】已连接, 用户名=【{arg.UserName}】地址=【{arg.Endpoint}】 ");
return Task.CompletedTask;
}
public Task StopAsync(CancellationToken cancellationToken)
{
return Task.CompletedTask;
}
}
Service:
public class MqttService
{
public static MqttServer _mqttServer { get; set; }
public static void PublishData(string data)
{
var message = new MqttApplicationMessage
{
Topic = "topic_01",
Payload = Encoding.Default.GetBytes(data),
QualityOfServiceLevel = MqttQualityOfServiceLevel.AtLeastOnce,
Retain = true // 服务端是否保留消息。true 为保留,如果有新的订阅者连接,就会立马收到该消息。
};
_mqttServer.InjectApplicationMessage(new InjectedMqttApplicationMessage(message) // 发送消息给有订阅 topic_01 的客户端
{
SenderClientId = "Server_01"
}).GetAwaiter().GetResult();
}
}
37、最终的客户端代码:
public class MqttClientService
{
public static IMqttClient _mqttClient;
public void MqttClientStart()
{
var optionsBuilder = new MqttClientOptionsBuilder()
.WithTcpServer("127.0.0.1", 10086) // 要访问的 mqtt 服务端的 ip 和 端口号
.WithCredentials("admin", "123456") // 要访问的 mqtt 服务端的用户名和密码
.WithClientId("testclient02") // 设置客户端 id
.WithCleanSession()
.WithTls(new MqttClientOptionsBuilderTlsParameters
{
UseTls = false // 是否使用 tls 加密
});
var clientOptions = optionsBuilder.Build();
_mqttClient = new MqttFactory().CreateMqttClient();
_mqttClient.ConnectedAsync += _mqttClient_ConnectedAsync; // 客户端连接成功事件
_mqttClient.DisconnectedAsync += _mqttClient_DisconnectedAsync; // 客户端连接关闭事件
_mqttClient.ApplicationMessageReceivedAsync += _mqttClient_ApplicationMessageReceivedAsync; // 收到消息事件
_mqttClient.ConnectAsync(clientOptions);
}
/// <summary>
/// 客户端连接关闭事件
/// </summary>
/// <param name="arg"></param>
/// <returns></returns>
private Task _mqttClient_DisconnectedAsync(MqttClientDisconnectedEventArgs arg)
{
Console.WriteLine($"客户端已断开与服务端的连接……");
return Task.CompletedTask;
}
/// <summary>
/// 客户端连接成功事件
/// </summary>
/// <param name="arg"></param>
/// <returns></returns>
private Task _mqttClient_ConnectedAsync(MqttClientConnectedEventArgs arg)
{
Console.WriteLine($"客户端已连接服务端……");
// 订阅消息主题
// MqttQualityOfServiceLevel: (QoS): 0 最多一次,接收者不确认收到消息,并且消息不被发送者存储和重新发送提供与底层 TCP 协议相同的保证。
// 1: 保证一条消息至少有一次会传递给接收方。发送方存储消息,直到它从接收方收到确认收到消息的数据包。一条消息可以多次发送或传递。
// 2: 保证每条消息仅由预期的收件人接收一次。级别 2 是最安全和最慢的服务质量级别,保证由发送方和接收方之间的至少两个请求/响应(四次握手)。
_mqttClient.SubscribeAsync("topic_02", MqttQualityOfServiceLevel.AtLeastOnce);
return Task.CompletedTask;
}
/// <summary>
/// 收到消息事件
/// </summary>
/// <param name="arg"></param>
/// <returns></returns>
private Task _mqttClient_ApplicationMessageReceivedAsync(MqttApplicationMessageReceivedEventArgs arg)
{
Console.WriteLine($"ApplicationMessageReceivedAsync:客户端 ID=【{arg.ClientId}】接收到消息。Topic 主题=【{arg.ApplicationMessage.Topic}】 消息=【{Encoding.UTF8.GetString(arg.ApplicationMessage.Payload)}】 qos 等级=【{arg.ApplicationMessage.QualityOfServiceLevel}】");
return Task.CompletedTask;
}
public void Publish(string data)
{
var message = new MqttApplicationMessage
{
Topic = "topic_02",
Payload = Encoding.Default.GetBytes(data),
QualityOfServiceLevel = MqttQualityOfServiceLevel.AtLeastOnce,
Retain = true // 服务端是否保留消息。true 为保留,如果有新的订阅者连接,就会立马收到该消息。
};
_mqttClient.PublishAsync(message);
}
}
总结
MQTT 以上演示已经完毕,可以看到它的一些特性,跟 websocket 很接近,但是又比 websocket 通信更加灵活。其实,实际上 MQTT 的客户端在现实生产环境场景下,并不需要咱们开发者进行开发,很多硬件设备都支持提供 MQTT 协议的通信客户端,所以只需要自己搭建一个服务端,就可以实现实时监控各种设备推送过来的各种信号数据。
同时客户端支持发布消息给其他客户端,所以就实现了设备与设备之间的一对一信号通信的效果了。
如果需要下发信号给硬件设备,MQTT 服务端也可以直接下发给某个指定设备来进行实现即可。上面案例只提供入门方案,如果有感兴趣的大佬,可以自己去拓展一下,来达到更好的效果。
来源:cnblogs.com/weskynet/p/16441219.html