Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

MQTT服务器是否可以主动向客户端发送消息 #6

Open
omgcwz opened this issue Nov 14, 2024 · 4 comments
Open

MQTT服务器是否可以主动向客户端发送消息 #6

omgcwz opened this issue Nov 14, 2024 · 4 comments

Comments

@omgcwz
Copy link

omgcwz commented Nov 14, 2024

MQTT服务器是否可以主动向客户端发送消息,如果可以该如何做

@nnhy
Copy link
Member

nnhy commented Nov 15, 2024

看IMqttHandler的定义,其中的Publish就是用下向下发布消息。

`

///

MQTT处理器
///
public interface IMqttHandler
{
/// 处理消息
/// 消息
///
MqttMessage? Process(MqttMessage message);

/// <summary>发布消息</summary>
/// <param name="topic">主题</param>
/// <param name="data">消息数据</param>
/// <param name="qos">服务质量</param>
/// <returns></returns>
Task<MqttIdMessage?> PublishAsync(String topic, Object data, QualityOfService qos = QualityOfService.AtMostOnce);

/// <summary>发布消息</summary>
/// <param name="topic">主题</param>
/// <param name="data">消息数据</param>
/// <param name="qos">服务质量</param>
/// <param name="allowExchange">允许消息交换</param>
/// <returns></returns>
Task<MqttIdMessage?> PublishAsync(String topic, Object data, Boolean allowExchange, QualityOfService qos = QualityOfService.AtMostOnce);

/// <summary>发布消息</summary>
/// <param name="message">消息</param>
/// <returns></returns>
Task<MqttIdMessage?> PublishAsync(PublishMessage message);

/// <summary>关闭连接。网络连接被关闭时触发</summary>
/// <param name="reason"></param>
void Close(String reason);

}
`

@omgcwz
Copy link
Author

omgcwz commented Nov 18, 2024

感谢作者的指点,但是我还是没有解决,我不知道如何获取到MqttHandler的实例服务,我用下面的方法写的会出现
One or more errors occurred. (Object reference not set to an instance of an object.)的报错信息
是否有相对完整的案例,麻烦你了

public void PublishMessage(string topic,string message)
{
var mqttHandler = NewLife.Model.ModelExtension.GetService(mqttServer.ServiceProvider);
mqttHandler.PublishAsync(topic, message).Wait();
Console.WriteLine("发");
}

@nnhy
Copy link
Member

nnhy commented Nov 18, 2024

以下代码,来自商用版FIoT平台。

using System.Text.RegularExpressions;
using IoT.Data;
using IoTCore.Models;
using IoTMqtt.Services;
using IoTServer.Models;
using NewLife;
using NewLife.Data;
using NewLife.IoT.Models;
using NewLife.IoT.ThingModels;
using NewLife.Log;
using NewLife.MQTT.Handlers;
using NewLife.MQTT.Messaging;
using NewLife.Net;
using NewLife.Serialization;
using XCode;
using LogLevel = NewLife.Log.LogLevel;

namespace IoTServer.Services;

///

MQTT控制器。处理业务逻辑
public class MqttController : MqttHandler
{
#region 属性
private readonly MyDeviceService _deviceService;
private readonly ThingService _thingService;
private readonly MqttService _mqttService;
private readonly QueueService _queue;
private CancellationTokenSource _source;
#endregion

#region 构造
/// <summary>实例化</summary>
/// <param name="deviceService"></param>
/// <param name="thingService"></param>
/// <param name="queue"></param>
/// <param name="tracer"></param>
/// <param name="log"></param>
public MqttController(MyDeviceService deviceService, ThingService thingService, MqttService mqttService, QueueService queue, ITracer tracer, ILog log)
{
    Tracer = tracer;
    Log = log;
    _deviceService = deviceService;
    _thingService = thingService;
    _mqttService = mqttService;
    _queue = queue;
}
#endregion

#region 接收指令
/// <summary>当客户端连接时</summary>
/// <param name="session"></param>
/// <param name="message"></param>
/// <returns></returns>
protected override ConnAck OnConnect(ConnectMessage message)
{
    using var span = Tracer?.NewSpan("MqttLogin", message);

    var session = Session;
    var clientId = message.ClientId;
    WriteLog("客户端[{0}]连接 user={1} pass={2} clientId={3}", session.Remote.EndPoint, message.Username, message.Password, clientId);
    if (clientId.IsNullOrEmpty()) clientId = message.Username;

    try
    {
        var dvSession = _mqttService.MqttLogin(new MqttAuthReq
        {
            ClientId = clientId,
            Username = message.Username,
            Password = message.Password,
            RemoteIp = session.Remote.Host
        });

        // 存储客户端信息
        if (session is IExtend extend) extend["DeviceSession"] = dvSession;

        var device = dvSession.Device;

        _source = new CancellationTokenSource();
        _ = Task.Run(() => ConsumeMessage(device, session.Remote.Host, _source));

        return base.OnConnect(message);
    }
    catch (Exception ex)
    {
        span?.SetError(ex, null);

        // 登录出错时,向MQTT客户端发送错误响应
        return new ConnAck { ReturnCode = ConnectReturnCode.RefusedBadUsernameOrPassword };
    }
}

/// <summary>
/// 检查是否登录,如果未登录则断开会话
/// </summary>
/// <param name="session"></param>
/// <returns></returns>
private MqttDeviceSession CheckLogin(INetSession session)
{
    if (session is IExtend extend && extend["DeviceSession"] is MqttDeviceSession ss) return ss;

    // 未登录
    session.TryDispose();

    return null;
}

/// <summary>当客户端断开连接时</summary>
/// <param name="session"></param>
/// <param name="message"></param>
/// <returns></returns>
protected override MqttMessage OnDisconnect(DisconnectMessage message)
{
    _source?.Cancel();

    var session = Session;
    WriteLog("客户端[{0}]断开", session.Remote);

    var dvSession = CheckLogin(session);
    if (dvSession == null) return null;

    _deviceService.Logout(dvSession.Device, "OnDisconnect", "Mqtt", session.Remote.Host);

    return base.OnDisconnect(message);
}

/// <summary>当收到客户端发布的消息时</summary>
/// <param name="session"></param>
/// <param name="message"></param>
/// <returns></returns>
protected override MqttIdMessage OnPublish(PublishMessage message)
{
    var session = Session;
    var dvSession = CheckLogin(session);
    if (dvSession == null) return null;

    using var span = Tracer?.NewSpan(nameof(OnPublish));

    var dv = dvSession.Device;
    //var deviceCode = dv.Code;
    //var productKey = dvSession.ProductKey;

    // TODO 通过产品查询判断上报的topic是否合规
    //var serviceReplyTopic = $"sys/{productKey}/{deviceCode}/thing/service/post_reply";
    //var propertyPostTopic = $"sys/{productKey}/{deviceCode}/thing/property/post";
    //var pingPostTopic = $"sys/{productKey}/{deviceCode}/thing/event/ping/post";

    var ip = session.Remote.Host;
    if (dv == null)
    {
        //throw new Exception($"非法设备编码,产品[{productKey}]-设备[{deviceCode}]不存在!");
        session.TryDispose();
        return null;
    }

    // 更新设备在线状态。网关上线
    _deviceService.SetDeviceOnline(dv, ip, nameof(OnPublish));

    var topic = message.Topic;
    var msg = message.Payload.ToStr();
    span?.SetTag(topic + Environment.NewLine + msg);

    try
    {
        var segs = topic.Split('/', StringSplitOptions.RemoveEmptyEntries);
        //if (segs == null || segs.Length < 4) throw new Exception($"收到过短Topic[{topic}],Payload={msg}");
        if (segs != null && segs.Length >= 4)
        {
            var productKey = segs[1];
            var deviceCode = segs[2];
            var child = Device.FindByCode(deviceCode);
            if (child == null || child.Id != dv.Id && child.ParentId != dv.Id)
                throw new Exception($"非法设备编码,[{deviceCode}]并非当前登录设备[{dv}]的子设备");

            // 默认使用登录设备,然后使用Topic中的设备
            dv = child;

            var topic2 = segs.Skip(3).Join("/");

            // 数据上报
            if (topic2.EqualIgnoreCase("thing/property/post"))
            {
                OnPostProperty(msg, dv, productKey, deviceCode, ip);
            }
            // 服务调用结果
            else if (topic2.EqualIgnoreCase("thing/service/post_reply"))
            {
                OnServiceReply(msg, dv, productKey, deviceCode, ip);
            }
            // 事件上报
            else if (topic2.EqualIgnoreCase("thing/event/info/post", "thing/event/alert/post", "thing/event/error/post"))
            {
                var eType = segs[2];

                OnPostEvent(msg, dv, productKey, deviceCode, ip, eType);
            }
            // ping上报
            else if (topic2.EqualIgnoreCase("thing/event/ping/post"))
            {
                OnPing(msg, dv, productKey, deviceCode, ip);
            }
            // OTA升级
            else if (topic2.EqualIgnoreCase("thing/ota/firmware/get"))
            {
                OnUpgrade(msg, dv, productKey, deviceCode, ip);
            }

            //else
            //    throw new Exception($"收到非法Topic[{topic}],Payload={msg}");
        }
    }
    catch (Exception ex)
    {
        span?.SetError(ex, null);
        Log?.Error(ex.Message);

        // 收到无法识别的Topic
        var ev = new EventModel
        {
            Type = "error",
            Name = "非法发布",
            Remark = ex.Message,
            Data = topic,
            Time = DateTime.UtcNow.ToLong(),
        };

        _deviceService.WriteHistory(dv, ev.Name, false, ex.Message, ip);
        _thingService.PostEvent(dv, ev, ip);

        return null;
    }

    return base.OnPublish(message);
}

/// <summary>当收到客户端的心跳时</summary>
/// <param name="session"></param>
/// <param name="message"></param>
/// <returns></returns>
protected override NewLife.MQTT.Messaging.PingResponse OnPing(PingRequest message)
{
    var session = Session;
    var dvSession = CheckLogin(session);
    if (dvSession == null) return null;

    //var clientId = dvSession.ClientId;
    _deviceService.Ping(dvSession.Device, null, null, "Mqtt", session.Remote.Host);

    //if (_Logins[clientId] >= 1)
    //    _Logins[clientId]--;

    return base.OnPing(message);
}

/// <summary>当收到客户端的订阅时</summary>
/// <param name="session"></param>
/// <param name="message"></param>
/// <returns></returns>
protected override SubAck OnSubscribe(SubscribeMessage message)
{
    var session = Session;
    WriteLog("客户端[{0}]订阅主题[{1}]", session.Remote, String.Join(", ", message.Requests.Select(p => p.TopicFilter)));

    var dvSession = CheckLogin(session);
    if (dvSession == null) return null;

    var ip = session.Remote.Host;
    var ev = new EventModel
    {
        Type = "info",
        Name = "订阅",
        Remark = message.Requests.ToJson(),
        Time = DateTime.UtcNow.ToLong(),
    };

    _thingService.PostEvent(dvSession.Device, ev, ip);

    return base.OnSubscribe(message);
}

/// <summary>取消订阅</summary>
/// <param name="session"></param>
/// <param name="message"></param>
/// <returns></returns>
protected override UnsubAck OnUnsubscribe(UnsubscribeMessage message)
{
    var session = Session;
    WriteLog("客户端[{0}]取消订阅主题[{1}]", session.Remote, String.Join(", ", message.TopicFilters));

    var dvSession = CheckLogin(session);
    if (dvSession == null) return null;

    var ip = session.Remote.Host;
    var ev = new EventModel
    {
        Type = "info",
        Name = "取消订阅",
        Remark = message.TopicFilters.ToJson(),
        Time = DateTime.UtcNow.ToLong(),
    };

    _thingService.PostEvent(dvSession.Device, ev, ip);

    return base.OnUnsubscribe(message);
}
#endregion

#region 业务处理
private void OnPostProperty(String msg, Device dv, String productKey, String deviceCode, String ip)
{
    var model = msg.ToJsonEntity<DataModels>();
    if (model != null && model.Items != null)
    {
        _thingService.PostData(dv, model, "MqttPost", ip);

        _ = PublishAsync($"sys/{productKey}/{deviceCode}/thing/property/post_reply", "ok");
    }
}

private void OnServiceReply(String msg, Device dv, String productKey, String deviceCode, String ip)
{
    var model = msg.ToJsonEntity<ServiceReplyModel>();
    _thingService.ServiceReply(dv, model);
}

private void OnPing(String msg, Device dv, String productKey, String deviceCode, String ip)
{
    var model = msg.ToJsonEntity<PingInfo>();

    _deviceService.Ping(dv, model, null, "Mqtt", ip);

    var rs = new NewLife.IoT.Models.PingResponse
    {
        Time = model?.Time ?? 0,
        ServerTime = DateTime.UtcNow.ToLong(),
    };

    _ = PublishAsync($"sys/{productKey}/{deviceCode}/thing/event/ping/post_reply", rs);
}

private void OnPostEvent(String msg, Device dv, String productKey, String deviceCode, String ip, String eventType)
{
    var model = msg.ToJsonEntity<EventModel>();

    _thingService.PostEvent(dv, model, ip);

    var rs = new EventResponse()
    {
        Time = model?.Time ?? 0,
        ServerTime = DateTime.UtcNow.ToLong(),
        EventType = eventType,
        Status = "ok",
        Name = model?.Name
    };

    _ = PublishAsync($"sys/{productKey}/{deviceCode}/thing/event/{eventType}/post_reply", rs);
}

private void OnUpgrade(String msg, Device dv, String productKey, String deviceCode, String ip)
{
    // 应用过滤规则,使用最新的一个版本
    var pv = _deviceService.Upgrade(dv, ip);
    if (pv == null) return;

    //todo 需要处理url为完整http地址
    var rs = new UpgradeInfo
    {
        Version = pv.Version,
        Source = pv.Source,
        Executor = pv.Executor,
        Force = pv.Force,
        FileSize = pv.Size,
        FileHash = pv.FileHash,
        Description = pv.Remark,
    };

    _ = PublishAsync($"sys/{productKey}/{deviceCode}/thing/ota/firmware/get_reply", rs);
}

private async Task ConsumeMessage(Device device, String ip, CancellationTokenSource source)
{
    DefaultSpan.Current = null;
    var cancellationToken = source.Token;
    var queue = _queue.GetQueue(device.Code);
    try
    {
        while (!cancellationToken.IsCancellationRequested && !Session.Disposed)
        {
            ISpan span = null;
            var mqMsg = await queue.TakeOneAsync(30);
            if (mqMsg != null)
            {
                // 埋点
                span = Tracer?.NewSpan($"mqtt:ServiceQueue", mqMsg);
                if (Log != null && Log.Level <= LogLevel.Debug) WriteLog("消费到下发指令消息:{0}", mqMsg);

                // 解码
                var dic = JsonParser.Decode(mqMsg);
                span?.Detach(dic);
                var msg = JsonHelper.Convert<ServiceModel>(dic);

                if (msg == null || msg.Id == 0 || msg.Expire.Year > 2000 && msg.Expire < DateTime.Now)
                    _deviceService.WriteHistory(device, "Mqtt发送", false, "消息无效。" + mqMsg, ip);
                else
                {
                    _deviceService.WriteHistory(device, "Mqtt发送", true, mqMsg, ip);

                    // 向客户端传递埋点信息,构建完整调用链
                    msg.TraceId = span + "";

                    var log = DeviceServiceLog.FindById(msg.Id);
                    if (log != null)
                    {
                        if (log.TraceId.IsNullOrEmpty()) log.TraceId = span?.TraceId;
                        log.Status = ServiceStatus.处理中;
                        log.Update();
                    }

                    var topic = $"sys/{device.Product.Code}/{device.Code}/thing/service/post";
                    var data = msg.ToDictionary();
                    data["id"] = msg.Id.ToString();

                    await PublishAsync(topic, data);
                }
            }
            else
            {
                await Task.Delay(100, cancellationToken);
            }
            span?.Dispose();
        }
    }
    catch (TaskCanceledException) { }
    catch (Exception ex)
    {
        XTrace.WriteException(ex);
    }
    finally
    {
        source.Cancel();
    }
}
#endregion

#region 辅助
///// <summary>写日志</summary>
///// <param name="format"></param>
///// <param name="args"></param>
//private void WriteLog(String format, params Object[] args) => Log?.Info($"[MqttServer]{format}", args);
#endregion

}

@omgcwz
Copy link
Author

omgcwz commented Nov 18, 2024

上面的案例是在实现MqttHandler接口的情况下去做方式数据,我希望做到的在业务中去使用PublishAsync方法去主动发送数据而不是在连接或断开事件后被动触发

我想实现的是一个线程,定时发布一条数据数据

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

2 participants