Skip to content

Commit

Permalink
异步操作加上 CancellationToken
Browse files Browse the repository at this point in the history
  • Loading branch information
nnhy committed Dec 11, 2024
1 parent c1c0b38 commit dc2fb9c
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 24 deletions.
55 changes: 32 additions & 23 deletions NewLife.MQTT/MqttClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -196,8 +196,9 @@ private void Init()
/// <summary>发送命令</summary>
/// <param name="msg">消息</param>
/// <param name="waitForResponse">是否等待响应</param>
/// <param name="cancellationToken"></param>
/// <returns></returns>
protected virtual async Task<MqttMessage?> SendAsync(MqttMessage msg, Boolean waitForResponse = true)
protected virtual async Task<MqttMessage?> SendAsync(MqttMessage msg, Boolean waitForResponse, CancellationToken cancellationToken)
{
if (msg is MqttIdMessage idm && idm.Id == 0 && (msg.Type != MqttType.Publish || msg.QoS > 0))
idm.Id = (UInt16)Interlocked.Increment(ref g_id);
Expand Down Expand Up @@ -226,7 +227,7 @@ private void Init()
return null;
}

var rs = await client.SendMessageAsync(msg).ConfigureAwait(false);
var rs = await client.SendMessageAsync(msg, cancellationToken).ConfigureAwait(false);

// 重置
_taskCanceledCount = 0;
Expand Down Expand Up @@ -357,8 +358,9 @@ private void Client_Received(Object sender, ReceivedEventArgs e)
public event EventHandler<EventArgs>? Connected;

/// <summary>连接服务端</summary>
/// <param name="cancellationToken"></param>
/// <returns></returns>
public Task<ConnAck> ConnectAsync()
public Task<ConnAck> ConnectAsync(CancellationToken cancellationToken = default)
{
if (ClientId.IsNullOrEmpty()) throw new ArgumentNullException(nameof(ClientId));

Expand All @@ -370,13 +372,14 @@ public Task<ConnAck> ConnectAsync()
CleanSession = CleanSession,
};

return ConnectAsync(message);
return ConnectAsync(message, cancellationToken);
}

/// <summary>连接服务端</summary>
/// <param name="message"></param>
/// <param name="cancellationToken"></param>
/// <returns></returns>
public async Task<ConnAck> ConnectAsync(ConnectMessage message)
public async Task<ConnAck> ConnectAsync(ConnectMessage message, CancellationToken cancellationToken = default)
{
if (message == null) throw new ArgumentNullException(nameof(message));

Expand All @@ -391,7 +394,7 @@ public async Task<ConnAck> ConnectAsync(ConnectMessage message)
// 心跳
if (KeepAlive > 0 && message.KeepAliveInSeconds == 0) message.KeepAliveInSeconds = (UInt16)KeepAlive;

var rs = (await SendAsync(message).ConfigureAwait(false)) as ConnAck;
var rs = (await SendAsync(message, true, cancellationToken).ConfigureAwait(false)) as ConnAck;

// 判断响应,是否成功连接
if (rs!.ReturnCode != ConnectReturnCode.Accepted)
Expand All @@ -414,7 +417,7 @@ public async Task<ConnAck> ConnectAsync(ConnectMessage message)
Requests = _subs.Values.ToArray(),
};

var rs2 = (await SendAsync(message2).ConfigureAwait(false)) as SubAck;
var rs2 = (await SendAsync(message2, true, cancellationToken).ConfigureAwait(false)) as SubAck;
if (rs2 == null) _subs.Clear();
}

Expand All @@ -423,11 +426,11 @@ public async Task<ConnAck> ConnectAsync(ConnectMessage message)

/// <summary>断开连接</summary>
/// <returns></returns>
public async Task DisconnectAsync()
public async Task DisconnectAsync(CancellationToken cancellationToken = default)
{
var message = new DisconnectMessage();

await SendAsync(message, false).ConfigureAwait(false);
await SendAsync(message, true, cancellationToken).ConfigureAwait(false);

var e = new EventArgs();
Disconnected?.Invoke(this, e);
Expand All @@ -448,7 +451,7 @@ private void Client_Closed(Object sender, EventArgs e)
if (Disposed || !Reconnect) return;

WriteLog("尝试重新连接");
ConnectAsync().GetAwaiter();
ConnectAsync().Wait(Timeout);
}
#endregion

Expand Down Expand Up @@ -476,17 +479,18 @@ private void Client_Closed(Object sender, EventArgs e)

/// <summary>发布消息</summary>
/// <param name="message"></param>
/// <param name="cancellationToken"></param>
/// <returns></returns>
public async Task<MqttIdMessage?> PublishAsync(PublishMessage message)
public async Task<MqttIdMessage?> PublishAsync(PublishMessage message, CancellationToken cancellationToken = default)
{
if (message == null) throw new ArgumentNullException(nameof(message));

var rs = (await SendAsync(message, message.QoS != QualityOfService.AtMostOnce).ConfigureAwait(false)) as MqttIdMessage;
var rs = (await SendAsync(message, message.QoS != QualityOfService.AtMostOnce, cancellationToken).ConfigureAwait(false)) as MqttIdMessage;

if (rs is PubRec)
{
var rel = new PubRel();
var cmp = (await SendAsync(rel, true).ConfigureAwait(false)) as PubComp;
var cmp = (await SendAsync(rel, true, cancellationToken).ConfigureAwait(false)) as PubComp;
return cmp;
}

Expand All @@ -498,30 +502,33 @@ private void Client_Closed(Object sender, EventArgs e)
/// <summary>订阅主题</summary>
/// <param name="topicFilter">主题过滤器</param>
/// <param name="callback">收到该主题消息时的回调</param>
/// <param name="cancellationToken"></param>
/// <returns></returns>
public Task<SubAck?> SubscribeAsync(String topicFilter, Action<PublishMessage>? callback = null)
public Task<SubAck?> SubscribeAsync(String topicFilter, Action<PublishMessage>? callback = null, CancellationToken cancellationToken = default)
{
var subscription = new Subscription(topicFilter, QualityOfService.AtMostOnce);

return SubscribeAsync([subscription], callback);
return SubscribeAsync([subscription], callback, cancellationToken);
}

/// <summary>订阅主题</summary>
/// <param name="topicFilters">主题过滤器</param>
/// <param name="qos">服务质量</param>
/// <param name="cancellationToken"></param>
/// <returns></returns>
public Task<SubAck?> SubscribeAsync(String[] topicFilters, QualityOfService qos = QualityOfService.AtMostOnce)
public Task<SubAck?> SubscribeAsync(String[] topicFilters, QualityOfService qos = QualityOfService.AtMostOnce, CancellationToken cancellationToken = default)
{
var subscriptions = topicFilters.Select(e => new Subscription(e, qos)).ToList();

return SubscribeAsync(subscriptions);
return SubscribeAsync(subscriptions, null, cancellationToken);
}

/// <summary>订阅主题</summary>
/// <param name="subscriptions">订阅集合</param>
/// <param name="callback">收到该主题消息时的回调</param>
/// <param name="cancellationToken"></param>
/// <returns></returns>
public async Task<SubAck?> SubscribeAsync(IList<Subscription> subscriptions, Action<PublishMessage>? callback = null)
public async Task<SubAck?> SubscribeAsync(IList<Subscription> subscriptions, Action<PublishMessage>? callback = null, CancellationToken cancellationToken = default)
{
// 已订阅,不重复
subscriptions = subscriptions.Where(e => !_subs.ContainsKey(e.TopicFilter)).ToList();
Expand All @@ -532,7 +539,7 @@ private void Client_Closed(Object sender, EventArgs e)
Requests = subscriptions,
};

var rs = (await SendAsync(message).ConfigureAwait(false)) as SubAck;
var rs = (await SendAsync(message, true, cancellationToken).ConfigureAwait(false)) as SubAck;
if (rs != null)
{
foreach (var item in subscriptions)
Expand All @@ -547,15 +554,16 @@ private void Client_Closed(Object sender, EventArgs e)

/// <summary>取消订阅主题</summary>
/// <param name="topicFilters">主题过滤器</param>
/// <param name="cancellationToken"></param>
/// <returns></returns>
public async Task<UnsubAck?> UnsubscribeAsync(params String[] topicFilters)
public async Task<UnsubAck?> UnsubscribeAsync(String[] topicFilters, CancellationToken cancellationToken = default)
{
var message = new UnsubscribeMessage
{
TopicFilters = topicFilters,
};

var rs = (await SendAsync(message).ConfigureAwait(false)) as UnsubAck;
var rs = (await SendAsync(message, true, cancellationToken).ConfigureAwait(false)) as UnsubAck;
if (rs != null)
{

Expand All @@ -571,8 +579,9 @@ private void Client_Closed(Object sender, EventArgs e)

#region 心跳
/// <summary>心跳</summary>
/// <param name="cancellationToken"></param>
/// <returns></returns>
public async Task<PingResponse?> PingAsync()
public async Task<PingResponse?> PingAsync(CancellationToken cancellationToken = default)
{
if (!IsConnected)
{
Expand All @@ -581,7 +590,7 @@ private void Client_Closed(Object sender, EventArgs e)

var message = new PingRequest();

var rs = (await SendAsync(message).ConfigureAwait(false)) as PingResponse;
var rs = (await SendAsync(message, true, cancellationToken).ConfigureAwait(false)) as PingResponse;
return rs;
}

Expand Down
2 changes: 1 addition & 1 deletion NewLife.MqttServer/NewLife.MqttServer.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@

<ItemGroup>
<PackageReference Include="NewLife.Core" Version="11.1.2024.1206" />
<PackageReference Include="NewLife.Stardust" Version="3.2.2024.1203" />
<PackageReference Include="NewLife.Stardust" Version="3.2.2024.1211" />
</ItemGroup>

<ItemGroup>
Expand Down

0 comments on commit dc2fb9c

Please sign in to comment.