Skip to content

Commit

Permalink
使用IPacketEncoder编码器来序列化消息编解码
Browse files Browse the repository at this point in the history
  • Loading branch information
nnhy committed Jun 29, 2024
1 parent e98581a commit 3e7e10e
Show file tree
Hide file tree
Showing 7 changed files with 26 additions and 44 deletions.
20 changes: 4 additions & 16 deletions NewLife.MQTT/Handlers/IMqttHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,8 @@ public class MqttHandler : IMqttHandler, ITracerFeature, ILogFeature
/// <summary>集群消息交换机</summary>
public ClusterExchange? ClusterExchange { get; set; }

/// <summary>Json序列化主机</summary>
public IJsonHost JsonHost { get; set; } = null!;
/// <summary>编码器。决定对象存储序列化格式</summary>
public IPacketEncoder Encoder { get; set; } = null!;

#region 接收消息
/// <summary>处理消息</summary>
Expand Down Expand Up @@ -194,7 +194,7 @@ protected virtual UnsubAck OnUnsubscribe(UnsubscribeMessage message)
public async Task<MqttIdMessage?> PublishAsync(String topic, Object data, QualityOfService qos = QualityOfService.AtMostOnce)
{
var pk = data as Packet;
if (pk == null && data != null) pk = Serialize(data);
if (pk == null && data != null) pk = Encoder.Encode(data);
if (pk == null) throw new ArgumentNullException(nameof(data));

var message = new PublishMessage
Expand All @@ -216,7 +216,7 @@ protected virtual UnsubAck OnUnsubscribe(UnsubscribeMessage message)
public async Task<MqttIdMessage?> PublishAsync(String topic, Object data, Boolean allowExchange, QualityOfService qos = QualityOfService.AtMostOnce)
{
var pk = data as Packet;
if (pk == null && data != null) pk = Serialize(data);
if (pk == null && data != null) pk = Encoder.Encode(data);
if (pk == null) throw new ArgumentNullException(nameof(data));

var message = new PublishMessage
Expand Down Expand Up @@ -302,18 +302,6 @@ protected virtual UnsubAck OnUnsubscribe(UnsubscribeMessage message)
throw;
}
}

/// <summary>把对象序列化为数据,字节数组和字符串以外的复杂类型,走Json序列化</summary>
/// <param name="data"></param>
/// <returns></returns>
protected virtual Packet Serialize(Object data)
{
if (data is Packet pk) return pk;
if (data is Byte[] buf) return buf;
if (data is String str) return str.GetBytes();

return JsonHost.Write(data).GetBytes();
}
#endregion

#region 辅助
Expand Down
21 changes: 4 additions & 17 deletions NewLife.MQTT/MqttClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,8 @@ public class MqttClient : DisposeBase
/// </summary>
public Boolean CleanSession { get; set; } = true;

/// <summary>Json序列化主机</summary>
public IJsonHost JsonHost { get; set; } = JsonHelper.Default;
/// <summary>编码器。决定对象存储序列化格式,默认json</summary>
public IPacketEncoder Encoder { get; set; } = new DefaultPacketEncoder();

/// <summary>
/// 断开后是否自动重连
Expand Down Expand Up @@ -209,8 +209,7 @@ private void Init()

Init();

var client = _Client;
if (client == null) throw new ArgumentNullException(nameof(_Client));
var client = _Client ?? throw new ArgumentNullException(nameof(_Client));
try
{
// 断开消息没有响应
Expand Down Expand Up @@ -455,7 +454,7 @@ private void Client_Closed(Object sender, EventArgs e)
public async Task<MqttIdMessage?> PublishAsync(String topic, Object data, QualityOfService qos = QualityOfService.AtMostOnce)
{
var pk = data as Packet;
if (pk == null && data != null) pk = Serialize(data);
if (pk == null && data != null) pk = Encoder.Encode(data);
if (pk == null) throw new ArgumentNullException(nameof(data));

var message = new PublishMessage
Expand Down Expand Up @@ -486,18 +485,6 @@ private void Client_Closed(Object sender, EventArgs e)

return rs;
}

/// <summary>把对象序列化为数据,字节数组和字符串以外的复杂类型,走Json序列化</summary>
/// <param name="data"></param>
/// <returns></returns>
protected virtual Packet Serialize(Object data)
{
if (data is Packet pk) return pk;
if (data is Byte[] buf) return buf;
if (data is String str) return str.GetBytes();

return JsonHost.Write(data).GetBytes();
}
#endregion

#region 订阅
Expand Down
17 changes: 12 additions & 5 deletions NewLife.MQTT/MqttServer.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using NewLife.Log;
using NewLife.Data;
using NewLife.Log;
using NewLife.Model;
using NewLife.MQTT.Clusters;
using NewLife.MQTT.Handlers;
Expand All @@ -23,8 +24,8 @@ public class MqttServer : NetServer<MqttSession>
/// <summary>集群服务端</summary>
public ClusterServer? Cluster { get; set; }

/// <summary>Json序列化主机</summary>
public IJsonHost? JsonHost { get; set; }
/// <summary>编码器。决定对象存储序列化格式,默认json</summary>
public IPacketEncoder Encoder { get; set; } = null!;

/// <summary>实例化MQTT服务器</summary>
public MqttServer() => Port = 1883;
Expand All @@ -40,7 +41,13 @@ protected override void OnStart()

Name = $"Mqtt{Port}";

JsonHost ??= ServiceProvider.GetService<IJsonHost>() ?? JsonHelper.Default;
//JsonHost ??= ServiceProvider.GetService<IJsonHost>() ?? JsonHelper.Default;
Encoder ??= ServiceProvider.GetService<IPacketEncoder>() ?? new DefaultPacketEncoder();
if (Encoder is DefaultPacketEncoder encoder)
{
var jsonHost = ServiceProvider.GetService<IJsonHost>();
if (jsonHost != null) encoder.JsonHost = jsonHost;
}

var exchange = Exchange;
exchange ??= ServiceProvider.GetService<IMqttExchange>();
Expand Down Expand Up @@ -130,7 +137,7 @@ protected override void OnConnected()
mqttHandler.Session = this;
mqttHandler.Exchange = Host.Exchange;
mqttHandler.ClusterExchange = Host.Cluster?.ClusterExchange;
mqttHandler.JsonHost = Host.JsonHost ?? ServiceProvider?.GetService<IJsonHost>() ?? JsonHelper.Default;
mqttHandler.Encoder = Host.Encoder;
}

MqttHandler = handler;
Expand Down
4 changes: 2 additions & 2 deletions NewLife.MQTT/NewLife.MQTT.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,8 @@
</ItemGroup>

<ItemGroup>
<PackageReference Include="NewLife.Core" Version="10.10.2024.601" />
<PackageReference Include="NewLife.Remoting" Version="2.8.2024.402" />
<PackageReference Include="NewLife.Core" Version="10.10.2024.629-beta1346" />
<PackageReference Include="NewLife.Remoting" Version="3.0.2024.628-beta0626" />
</ItemGroup>

<ItemGroup>
Expand Down
4 changes: 2 additions & 2 deletions NewLife.MqttServer/NewLife.MqttServer.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@
</PropertyGroup>

<ItemGroup>
<PackageReference Include="NewLife.Core" Version="10.10.2024.601" />
<PackageReference Include="NewLife.Stardust" Version="2.9.2024.402" />
<PackageReference Include="NewLife.Core" Version="10.10.2024.629-beta1346" />
<PackageReference Include="NewLife.Stardust" Version="3.0.2024.628-beta0628" />
</ItemGroup>

<ItemGroup>
Expand Down
2 changes: 1 addition & 1 deletion Test/Test.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
<LangVersion>latest</LangVersion>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="NewLife.Core" Version="10.10.2024.601" />
<PackageReference Include="NewLife.Core" Version="10.10.2024.629-beta1346" />
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\NewLife.MQTT\NewLife.MQTT.csproj" />
Expand Down
2 changes: 1 addition & 1 deletion XUnitTestClient/XUnitTestClient.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@

<ItemGroup>
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="17.10.0" />
<PackageReference Include="NewLife.Core" Version="10.10.2024.601" />
<PackageReference Include="NewLife.Core" Version="10.10.2024.629-beta1346" />
<PackageReference Include="NewLife.UnitTest" Version="1.0.2024.102-beta0146" />
<PackageReference Include="xunit" Version="2.8.1" />
<PackageReference Include="xunit.runner.visualstudio" Version="2.8.1">
Expand Down

0 comments on commit 3e7e10e

Please sign in to comment.