Skip to content

Commit

Permalink
重构应用心跳接口,支持心跳响应携带令牌和命令
Browse files Browse the repository at this point in the history
  • Loading branch information
nnhy committed Aug 11, 2024
1 parent 4047658 commit f256bd8
Show file tree
Hide file tree
Showing 5 changed files with 179 additions and 90 deletions.
14 changes: 14 additions & 0 deletions Stardust.Data/Entity/应用命令.Biz.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
using NewLife.Data;
using NewLife.Log;
using NewLife.Remoting.Models;
using Stardust.Data.Nodes;
using XCode;

namespace Stardust.Data;
Expand Down Expand Up @@ -117,6 +118,19 @@ public static IList<AppCommand> Search(Int32 appId, String command, DateTime sta
#endregion

#region 业务操作
/// <summary>获取有效命令</summary>
/// <param name="appId"></param>
/// <param name="count"></param>
/// <returns></returns>
public static IList<AppCommand> AcquireCommands(Int32 appId, Int32 count = 100)
{
var exp = new WhereExpression();
if (appId > 0) exp &= _.AppId == appId;
exp &= _.Status <= CommandStatus.处理中;

return FindAll(exp, _.Id.Asc(), null, 0, count);
}

/// <summary>转为模型</summary>
/// <returns></returns>
public CommandModel ToModel()
Expand Down
43 changes: 36 additions & 7 deletions Stardust.Server/Controllers/AppController.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
using System.Net.WebSockets;
using System.Net;
using System.Net.WebSockets;
using System.Xml.Linq;
using Microsoft.AspNetCore.Authorization;
using Microsoft.AspNetCore.Mvc;
using NewLife;
Expand All @@ -11,6 +13,7 @@
using NewLife.Serialization;
using Stardust.Data;
using Stardust.Data.Configs;
using Stardust.Data.Nodes;
using Stardust.Models;
using Stardust.Server.Services;
using WebSocket = System.Net.WebSockets.WebSocket;
Expand Down Expand Up @@ -115,17 +118,41 @@ public String Register(AppModel inf)
[HttpPost(nameof(Ping))]
public PingResponse Ping(AppInfo inf)
{
var app = _app;
var rs = new PingResponse
{
//Time = inf.Time,
Time = inf.Time,
ServerTime = DateTime.UtcNow.ToLong(),
Period = _app.Period,
};

var online = _registryService.Ping(_app, inf, UserHost, _clientId, Token);

var online = _registryService.Ping(app, inf, UserHost, _clientId, Token);
_deployService.UpdateDeployNode(online);

if (app != null)
{
rs.Period = app.Period;

// 令牌有效期检查,10分钟内到期的令牌,颁发新令牌,以获取业务的连续性。
//todo 这里将来由客户端提交刷新令牌,才能颁发新的访问令牌。
var set = _setting;
var tm = _tokenService.ValidAndIssueToken(app.Name, Token, set.TokenSecret, set.TokenExpire);
if (tm != null)
{
using var span = _tracer?.NewSpan("RefreshAppToken", new { app.Name, app.DisplayName });

rs.Token = tm.AccessToken;

//app.WriteHistory("刷新令牌", true, tm.ToJson(), ip);
}

if (!app.Version.IsNullOrEmpty() && Version.TryParse(app.Version, out var ver))
{
// 拉取命令
if (ver.Build >= 2024 && ver.Revision >= 801)
rs.Commands = _registryService.AcquireAppCommands(app.Id);
}
}

return rs;
}

Expand Down Expand Up @@ -173,7 +200,9 @@ private async Task Handle(WebSocket socket, App app, String clientId)
if (app == null) throw new ApiException(401, "未登录!");

var sid = Rand.Next();
WriteHistory("WebSocket连接", true, $"State={socket.State} sid={sid}", clientId);
var connection = HttpContext.Connection;
var remote = new IPEndPoint(connection.RemoteIpAddress, connection.RemotePort);
WriteHistory("WebSocket连接", true, $"State={socket.State} sid={sid} Remote={remote}", clientId);

var olt = AppOnline.FindByClient(clientId);
if (olt != null)
Expand Down Expand Up @@ -202,7 +231,7 @@ await socket.WaitForClose(txt =>
}
}, source);

WriteHistory("WebSocket断开", true, $"State={socket.State} CloseStatus={socket.CloseStatus} sid={sid}", clientId);
WriteHistory("WebSocket断开", true, $"State={socket.State} CloseStatus={socket.CloseStatus} sid={sid} Remote={remote}", clientId);
if (olt != null)
{
olt.WebSocket = false;
Expand Down
49 changes: 45 additions & 4 deletions Stardust.Server/Controllers/NodeController.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using System.Net.WebSockets;
using System.Net;
using System.Net.WebSockets;
using Microsoft.AspNetCore.Authorization;
using Microsoft.AspNetCore.Mvc;
using NewLife;
Expand Down Expand Up @@ -117,7 +118,45 @@ public LoginResponse Logout(String reason)

#region 心跳保活
[HttpPost(nameof(Ping))]
public PingResponse Ping(PingInfo inf) => _nodeService.Ping(_node, inf, Token, UserHost, _setting);
public PingResponse Ping(PingInfo inf)
{
var node = _node;
var rs = new PingResponse
{
Time = inf.Time,
ServerTime = DateTime.UtcNow.ToLong(),
};

var online = _nodeService.Ping(node, inf, Token, UserHost);

if (node != null)
{
rs.Period = node.Period;
rs.NewServer = !node.NewServer.IsNullOrEmpty() ? node.NewServer : node.Project?.NewServer;

// 令牌有效期检查,10分钟内到期的令牌,颁发新令牌,以获取业务的连续性。
//todo 这里将来由客户端提交刷新令牌,才能颁发新的访问令牌。
var set = _setting;
var tm = _tokenService.ValidAndIssueToken(node.Code, Token, set.TokenSecret, set.TokenExpire);
if (tm != null)
{
using var span = _tracer?.NewSpan("RefreshNodeToken", new { node.Code, node.Name });

rs.Token = tm.AccessToken;

//node.WriteHistory("刷新令牌", true, tm.ToJson(), ip);
}

if (!node.Version.IsNullOrEmpty() && Version.TryParse(node.Version, out var ver))
{
// 拉取命令
if (ver.Build >= 2023 && ver.Revision >= 107)
rs.Commands = _nodeService.AcquireNodeCommands(node.ID);
}
}

return rs;
}

[AllowAnonymous]
[HttpGet(nameof(Ping))]
Expand Down Expand Up @@ -286,7 +325,9 @@ private async Task Handle(WebSocket socket, String token, String ip)
if (error != null) throw error;

var sid = Rand.Next();
WriteHistory(node, "WebSocket连接", true, $"State={socket.State} sid={sid}");
var connection = HttpContext.Connection;
var remote = new IPEndPoint(connection.RemoteIpAddress, connection.RemotePort);
WriteHistory(node, "WebSocket连接", true, $"State={socket.State} sid={sid} Remote={remote}");

var olt = _nodeService.GetOrAddOnline(node, token, ip);
if (olt != null)
Expand All @@ -313,7 +354,7 @@ await socket.WaitForClose(txt =>
}
}, source);

WriteHistory(node, "WebSocket断开", true, $"State={socket.State} CloseStatus={socket.CloseStatus} sid={sid}");
WriteHistory(node, "WebSocket断开", true, $"State={socket.State} CloseStatus={socket.CloseStatus} sid={sid} Remote={remote}");
if (olt != null)
{
olt.WebSocket = false;
Expand Down
120 changes: 41 additions & 79 deletions Stardust.Server/Services/NodeService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -402,91 +402,62 @@ private String BuildCode(NodeInfo di, String productCode, StarServerSetting set)
#endregion

#region 心跳
public PingResponse Ping(Node node, PingInfo inf, String token, String ip, StarServerSetting set)
public NodeOnline Ping(Node node, PingInfo inf, String token, String ip)
{
var rs = new PingResponse
{
Time = inf.Time,
ServerTime = DateTime.UtcNow.ToLong(),
};
if (node == null) return null;

if (node != null)
{
if (!inf.IP.IsNullOrEmpty()) node.IP = inf.IP;
node.UpdateIP = ip;
node.FixArea();
node.FixNameByRule();
if (!inf.IP.IsNullOrEmpty()) node.IP = inf.IP;
node.UpdateIP = ip;
node.FixArea();
node.FixNameByRule();

// 在心跳中更新客户端所有的框架。因此客户端长期不重启,而中途可能安装了新版NET运行时
if (!inf.Framework.IsNullOrEmpty())
// 在心跳中更新客户端所有的框架。因此客户端长期不重启,而中途可能安装了新版NET运行时
if (!inf.Framework.IsNullOrEmpty())
{
//node.Framework = inf.Framework?.Split(',').LastOrDefault();
node.Frameworks = inf.Framework;
// 选取最大的版本,而不是最后一个,例如6.0.3字符串大于6.0.13
Version max = null;
var fs = inf.Framework.Split(',');
if (fs != null)
{
//node.Framework = inf.Framework?.Split(',').LastOrDefault();
node.Frameworks = inf.Framework;
// 选取最大的版本,而不是最后一个,例如6.0.3字符串大于6.0.13
Version max = null;
var fs = inf.Framework.Split(',');
if (fs != null)
foreach (var f in fs)
{
foreach (var f in fs)
{
if (System.Version.TryParse(f, out var v) && (max == null || max < v))
max = v;
}
node.Framework = max?.ToString();
if (System.Version.TryParse(f, out var v) && (max == null || max < v))
max = v;
}
node.Framework = max?.ToString();
}
}

// 每10分钟更新一次节点信息,确保活跃
if (node.LastActive.AddMinutes(10) < DateTime.Now) node.LastActive = DateTime.Now;
//node.SaveAsync();
node.Update();

rs.Period = node.Period;
rs.NewServer = !node.NewServer.IsNullOrEmpty() ? node.NewServer : node.Project?.NewServer;

var online = GetOrAddOnline(node, token, ip);
online.Name = node.Name;
online.ProjectId = node.ProjectId;
online.ProductCode = node.ProductCode;
online.Category = node.Category;
online.Version = node.Version;
online.CompileTime = node.CompileTime;
online.OSKind = node.OSKind;
online.ProvinceID = node.ProvinceID;
online.CityID = node.CityID;
online.Save(null, inf, token, ip);

// 令牌有效期检查,10分钟内到期的令牌,颁发新令牌,以获取业务的连续性。
//todo 这里将来由客户端提交刷新令牌,才能颁发新的访问令牌。
var tm = _tokenService.ValidAndIssueToken(node.Code, token, set.TokenSecret, set.TokenExpire);
if (tm != null)
{
using var span = _tracer?.NewSpan("RefreshToken", new { node.Code, node.Name });

rs.Token = tm.AccessToken;

//node.WriteHistory("刷新令牌", true, tm.ToJson(), ip);
}

if (!node.Version.IsNullOrEmpty() && Version.TryParse(node.Version, out var ver))
{
// 拉取命令
if (ver.Build >= 2003 && ver.Revision >= 107)
rs.Commands = AcquireCommands(node.ID);
}
// 每10分钟更新一次节点信息,确保活跃
if (node.LastActive.AddMinutes(10) < DateTime.Now) node.LastActive = DateTime.Now;
//node.SaveAsync();
node.Update();

//// 下发部署的应用服务
//rs.Services = GetServices(node.ID);
}
var online = GetOrAddOnline(node, token, ip);
online.Name = node.Name;
online.ProjectId = node.ProjectId;
online.ProductCode = node.ProductCode;
online.Category = node.Category;
online.Version = node.Version;
online.CompileTime = node.CompileTime;
online.OSKind = node.OSKind;
online.ProvinceID = node.ProvinceID;
online.CityID = node.CityID;
online.Save(null, inf, token, ip);

//// 下发部署的应用服务
//rs.Services = GetServices(node.ID);

return rs;
return online;
}

private static Int32 _totalCommands;
private static IList<NodeCommand> _commands;
private static DateTime _nextTime;

private CommandModel[] AcquireCommands(Int32 nodeId)
public CommandModel[] AcquireNodeCommands(Int32 nodeId)
{
// 缓存最近1000个未执行命令,用于快速过滤,避免大量节点在线时频繁查询命令表
if (_nextTime < DateTime.Now || _totalCommands != NodeCommand.Meta.Count)
Expand All @@ -499,7 +470,7 @@ private CommandModel[] AcquireCommands(Int32 nodeId)
// 是否有本节点
if (!_commands.Any(e => e.NodeID == nodeId)) return null;

using var span = _tracer?.NewSpan(nameof(AcquireCommands), new { nodeId });
using var span = _tracer?.NewSpan(nameof(AcquireNodeCommands), new { nodeId });

var cmds = NodeCommand.AcquireCommands(nodeId, 100);
if (cmds.Count == 0) return null;
Expand All @@ -524,15 +495,6 @@ private CommandModel[] AcquireCommands(Int32 nodeId)
return rs.ToArray();
}

public PingResponse Ping()
{
return new PingResponse
{
Time = 0,
ServerTime = DateTime.UtcNow.ToLong(),
};
}

public NodeOnline GetOrAddOnline(Node node, String token, String ip)
{
var localIp = node?.IP;
Expand Down
43 changes: 43 additions & 0 deletions Stardust.Server/Services/RegistryService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
using Stardust.Data.Nodes;
using Stardust.Data.Platform;
using Stardust.Models;
using XCode;

namespace Stardust.Server.Services;

Expand Down Expand Up @@ -392,6 +393,48 @@ public AppOnline Ping(App app, AppInfo inf, String ip, String clientId, String t
return online;
}

private static Int32 _totalCommands;
private static IList<AppCommand> _commands;
private static DateTime _nextTime;

public CommandModel[] AcquireAppCommands(Int32 appId)
{
// 缓存最近1000个未执行命令,用于快速过滤,避免大量节点在线时频繁查询命令表
if (_nextTime < DateTime.Now || _totalCommands != AppCommand.Meta.Count)
{
_totalCommands = AppCommand.Meta.Count;
_commands = AppCommand.AcquireCommands(-1, 1000);
_nextTime = DateTime.Now.AddMinutes(1);
}

// 是否有本节点
if (!_commands.Any(e => e.AppId == appId)) return null;

using var span = _tracer?.NewSpan(nameof(AcquireAppCommands), new { appId });

var cmds = AppCommand.AcquireCommands(appId, 100);
if (cmds.Count == 0) return null;

var rs = new List<CommandModel>();
foreach (var item in cmds)
{
if (item.Times > 10 || item.Expire.Year > 2000 && item.Expire < DateTime.Now)
item.Status = CommandStatus.取消;
else
{
if (item.Status == CommandStatus.处理中 && item.UpdateTime.AddMinutes(10) < DateTime.Now) continue;

item.Times++;
item.Status = CommandStatus.处理中;
rs.Add(item.ToModel());
}
item.UpdateTime = DateTime.Now;
}
cmds.Update(false);

return rs.ToArray();
}

/// <summary>向应用发送命令</summary>
/// <param name="app"></param>
/// <param name="model"></param>
Expand Down

0 comments on commit f256bd8

Please sign in to comment.