From 3d5f421553d86fea6778e36f8e281e4673cb43f3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=99=BA=E8=83=BD=E5=A4=A7=E7=9F=B3=E5=A4=B4?= Date: Fri, 28 Jun 2024 00:54:37 +0800 Subject: [PATCH] =?UTF-8?q?StarClient=E7=BB=A7=E6=89=BF=E8=87=AAClientBase?= =?UTF-8?q?=EF=BC=8C=E5=90=8E=E8=80=85=E7=9A=84=E5=A4=A7=E9=83=A8=E5=88=86?= =?UTF-8?q?=E5=8A=9F=E8=83=BD=E6=9D=A5=E8=87=AA=E5=89=8D=E8=80=85=E3=80=82?= =?UTF-8?q?=E6=9E=B6=E6=9E=84=E5=8D=87=E7=BA=A7=E5=90=8E=EF=BC=8CStarClien?= =?UTF-8?q?t=E4=B8=AD=E4=BB=85=E5=89=A9=E4=B8=8B=E4=B8=9A=E5=8A=A1?= =?UTF-8?q?=E9=80=BB=E8=BE=91=E4=BB=A3=E7=A0=81=EF=BC=8C=E4=B8=8D=E5=8C=85?= =?UTF-8?q?=E5=90=AB=E9=80=9A=E4=BF=A1=E5=92=8C=E5=9F=BA=E7=A1=80=E7=99=BB?= =?UTF-8?q?=E5=BD=95=E5=BF=83=E8=B7=B3=E7=AD=89=E6=8E=A5=E5=8F=A3=E4=BB=A3?= =?UTF-8?q?=E7=A0=81=E3=80=82?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- ClientTest/StarClientTests.cs | 114 ++--- StarAgent/Program.cs | 15 +- Stardust/LocalStarClient.cs | 7 +- Stardust/Managers/ServiceManager.cs | 2 +- Stardust/StarClient.cs | 679 +++------------------------- Stardust/Stardust.csproj | 8 +- 6 files changed, 138 insertions(+), 687 deletions(-) diff --git a/ClientTest/StarClientTests.cs b/ClientTest/StarClientTests.cs index 7263c188..79255ba6 100644 --- a/ClientTest/StarClientTests.cs +++ b/ClientTest/StarClientTests.cs @@ -3,77 +3,77 @@ using NewLife; using NewLife.Security; using Stardust; +using Stardust.Models; using Xunit; -namespace ClientTest +namespace ClientTest; + +public class StarClientTests { - public class StarClientTests - { - public String Server { get; set; } = "http://localhost:6600"; - //public StarClient Client { get; } + public String Server { get; set; } = "http://localhost:6600"; + //public StarClient Client { get; } - public StarClientTests() - { - //Client = new StarClient(Server); - //Client.Add("default", new Uri(Server)); - } + public StarClientTests() + { + //Client = new StarClient(Server); + //Client.Add("default", new Uri(Server)); + } - [Fact] - public void GetLoginInfoTest() + [Fact] + public void GetLoginInfoTest() + { + var client = new StarClient { - var client = new StarClient - { - Code = Rand.NextString(8), - Secret = Rand.NextString(16) - }; + Code = Rand.NextString(8), + Secret = Rand.NextString(16) + }; - var inf = client.GetLoginInfo(); - Assert.NotNull(inf); - Assert.NotNull(inf.Node); + var inf = client.BuildLoginRequest() as LoginInfo; + Assert.NotNull(inf); + Assert.NotNull(inf.Node); - Assert.Equal(client.Code, inf.Code); - Assert.Equal(client.Secret.MD5(), inf.Secret); + Assert.Equal(client.Code, inf.Code); + Assert.Equal(client.Secret.MD5(), inf.Secret); - var node = client.GetNodeInfo(); - var mi = MachineInfo.Current; - Assert.Equal(mi.UUID, node.UUID); - Assert.Equal(mi.Guid, node.MachineGuid); - } + var node = client.GetNodeInfo(); + var mi = MachineInfo.Current; + Assert.Equal(mi.UUID, node.UUID); + Assert.Equal(mi.Guid, node.MachineGuid); + } - [Theory(DisplayName = "登录测试")] - [InlineData("abcd", "1234")] - [InlineData(null, "1234")] - [InlineData("abcd", null)] - public async Task LoginTest(String code, String secret) + [Theory(DisplayName = "登录测试")] + [InlineData("abcd", "1234")] + [InlineData(null, "1234")] + [InlineData("abcd", null)] + public async Task LoginTest(String code, String secret) + { + var client = new StarClient(Server) { - var client = new StarClient(Server) - { - Code = code, - Secret = secret - }; + Code = code, + Secret = secret + }; - var rs = await client.Login(); - Assert.NotNull(rs); - Assert.NotNull(client.Info); - Assert.True(client.Logined); - } + var rs = await client.Login(); + Assert.NotNull(rs); + //Assert.NotNull(client.Info); + Assert.True(client.Logined); + } - [Fact] - public async Task LogoutTest() - { - var client = new StarClient(Server); + [Fact] + public async Task LogoutTest() + { + var client = new StarClient(Server); - await client.Login(); - await client.Logout("test"); - } + await client.Login(); + await client.Logout("test"); + } - [Fact] - public void GetHeartInfoTest() - { - var client = new StarClient(); - var inf = client.GetHeartInfo(); - Assert.NotNull(inf); - Assert.NotEmpty(inf.Macs); - } + [Fact] + public void GetHeartInfoTest() + { + var client = new StarClient(); + var inf = client.GetHeartInfo(); + Assert.NotNull(inf); + Assert.NotEmpty(inf.Macs); } } \ No newline at end of file diff --git a/StarAgent/Program.cs b/StarAgent/Program.cs index 7d64cc77..d95003a5 100644 --- a/StarAgent/Program.cs +++ b/StarAgent/Program.cs @@ -8,6 +8,7 @@ using NewLife.Reflection; using NewLife.Remoting; using NewLife.Remoting.Clients; +using NewLife.Remoting.Models; using NewLife.Serialization; using NewLife.Threading; using Stardust; @@ -15,9 +16,7 @@ using Stardust.Managers; using Stardust.Models; using Stardust.Plugins; -using Stardust.Services; using IHost = NewLife.Agent.IHost; -using Upgrade = Stardust.Web.Upgrade; namespace StarAgent; @@ -387,7 +386,7 @@ public void StartClient() // 登录后保存证书 client.OnLogined += (s, e) => { - var inf = client.Info; + var inf = e.Response; if (inf != null && !inf.Code.IsNullOrEmpty()) { set.Code = inf.Code; @@ -419,8 +418,8 @@ public void StartClient() _Manager.Attach(client); - // 使用跟踪 - client.UseTrace(); + //// 使用跟踪 + //client.UseTrace(); _Client = client; _container.AddSingleton(client); @@ -626,7 +625,7 @@ private async Task CheckUpgrade(Object data) ug.Trim("StarAgent"); // 检查更新 - var ur = await client.Upgrade(channel, _lastVersion); + var ur = await client.Upgrade(channel); if (ur != null && ur.Version != _lastVersion) { client.WriteInfoEvent("Upgrade", $"准备从[{_lastVersion}]更新到[{ur.Version}],开始下载 {ur.Source}"); @@ -650,11 +649,11 @@ private async Task CheckUpgrade(Object data) } else { - if (!ur.Preinstall.IsNullOrEmpty()) + if (ur is UpgradeInfo ur2 && !ur2.Preinstall.IsNullOrEmpty()) { client.WriteInfoEvent("Upgrade", "执行预安装脚本"); - ug.Run(ur.Preinstall); + ug.Run(ur2.Preinstall); } client.WriteInfoEvent("Upgrade", "解压完成,准备覆盖文件"); diff --git a/Stardust/LocalStarClient.cs b/Stardust/LocalStarClient.cs index 8dbf6aa9..d887739d 100644 --- a/Stardust/LocalStarClient.cs +++ b/Stardust/LocalStarClient.cs @@ -9,6 +9,7 @@ using NewLife.Log; using NewLife.Messaging; using NewLife.Remoting; +using NewLife.Remoting.Clients; using NewLife.Remoting.Models; using Stardust.Models; #if NET45_OR_GREATER || NETCOREAPP || NETSTANDARD @@ -240,7 +241,7 @@ public Boolean ProbeAndInstall(String? url = null, String? version = null, Strin WriteLog("目标:{0}", target); - var ug = new Stardust.Web.Upgrade + var ug = new Upgrade { SourceFile = Path.GetFileName(url).GetFullPath(), DestinationPath = target, @@ -253,6 +254,10 @@ public Boolean ProbeAndInstall(String? url = null, String? version = null, Strin var client = new HttpClient(); client.DownloadFileAsync(url, ug.SourceFile).Wait(); + //var file = ug.SourceFile; + //var tmp = Path.GetTempPath().CombinePath(Path.GetFileNameWithoutExtension(file)); + //file.AsFile().Extract(tmp, true); + //todo 把tmp赋值给ug.TempPath ug.Extract(); ug.Update(); diff --git a/Stardust/Managers/ServiceManager.cs b/Stardust/Managers/ServiceManager.cs index cbd2d572..2f777451 100644 --- a/Stardust/Managers/ServiceManager.cs +++ b/Stardust/Managers/ServiceManager.cs @@ -549,7 +549,7 @@ private async Task DoWork(Object state) // 应用服务的上报和拉取 DeployInfo[]? deploys = null; - if (_client != null && !_client.Token.IsNullOrEmpty()) + if (_client != null && _client.Logined) { // 上传失败不应该影响本地拉起服务 try diff --git a/Stardust/StarClient.cs b/Stardust/StarClient.cs index 6f8dc44b..bf7b0b78 100644 --- a/Stardust/StarClient.cs +++ b/Stardust/StarClient.cs @@ -1,6 +1,4 @@ -using System.Collections.Concurrent; -using System.Diagnostics; -using System.Diagnostics.CodeAnalysis; +using System.Diagnostics; using System.Net.NetworkInformation; using System.Reflection; using System.Runtime.InteropServices; @@ -10,69 +8,30 @@ using NewLife.Log; using NewLife.Reflection; using NewLife.Remoting; -using NewLife.Serialization; -using NewLife.Threading; using Stardust.Managers; using Stardust.Models; -using Stardust.Services; using NewLife.Data; using NewLife.Remoting.Models; using NewLife.Remoting.Clients; - - -#if NET45_OR_GREATER || NETCOREAPP || NETSTANDARD -using System.Net.WebSockets; -using WebSocket = System.Net.WebSockets.WebSocket; -#endif +using NewLife.Model; +using NewLife.Security; namespace Stardust; /// 星星客户端。每个设备节点有一个客户端连接服务端 -public class StarClient : ApiHttpClient, ICommandClient, IEventProvider +public class StarClient : ClientBase, ICommandClient, IEventProvider { #region 属性 - /// 证书 - public String? Code { get; set; } - - /// 密钥 - public String? Secret { get; set; } - /// 产品编码 public String? ProductCode { get; set; } - /// 是否已登录 - public Boolean Logined { get; set; } - - /// 登录完成后触发 - public event EventHandler? OnLogined; - /// 服务迁移 public event EventHandler? OnMigration; - /// 最后一次登录成功后的消息 - public LoginResponse? Info { get; private set; } - - /// 请求到服务端并返回的延迟时间。单位ms - public Int32 Delay { get; set; } - - ///// 本地应用服务管理 - //public ServiceManager Manager { get; set; } - /// 插件列表 - public String[] Plugins { get; set; } - - /// 最大失败数。超过该数时,新的数据将被抛弃,默认10 * 24 * 60 - public Int32 MaxFails { get; set; } = 10 * 24 * 60; - - private readonly ConcurrentDictionary _commands = new(StringComparer.OrdinalIgnoreCase); - /// 命令集合 - public IDictionary Commands => _commands; - - /// 收到命令时触发 - public event EventHandler? Received; + public String[]? Plugins { get; set; } private FrameworkManager _frameworkManager = new(); - private readonly ConcurrentQueue _fails = new(); private readonly ICache _cache = new MemoryCache(); #endregion @@ -80,125 +39,61 @@ public class StarClient : ApiHttpClient, ICommandClient, IEventProvider /// 实例化 public StarClient() { + Features = Features.Login | Features.Logout | Features.Ping | Features.Notify | Features.CommandReply | Features.PostEvent; + SetActions("Node/"); + Log = XTrace.Log; } /// 实例化 /// - public StarClient(String urls) : this() - { - if (!urls.IsNullOrEmpty()) - { - var ss = urls.Split(","); - for (var i = 0; i < ss.Length; i++) - { - Add("service" + (i + 1), new Uri(ss[i])); - } - } - } - - /// 销毁 - /// - protected override void Dispose(Boolean disposing) - { - StopTimer(); - - Logout(disposing ? "Dispose" : "GC").Wait(1_000); - - base.Dispose(disposing); - } + public StarClient(String urls) : this() => Server = urls; #endregion #region 方法 - /// 远程调用拦截,支持重新登录 - /// - /// - /// - /// - /// - /// 取消通知 - /// - [return: MaybeNull] - public override async Task InvokeAsync(HttpMethod method, String action, Object? args = null, Action? onRequest = null, CancellationToken cancellationToken = default) + /// 初始化 + protected override void OnInit() { - var needLogin = !Logined && !action.EqualIgnoreCase("Node/Login", "Node/Logout"); - if (needLogin) - { - await Login(); - } + var provider = ServiceProvider ??= ObjectContainer.Provider; - try + PasswordProvider = new SaltPasswordProvider { Algorithm = "md5", SaltTime = 60 }; + + // 找到容器,注册默认的模型实现,供后续InvokeAsync时自动创建正确的模型对象 + var container = ModelExtension.GetService(provider) ?? ObjectContainer.Current; + if (container != null) { - return await base.InvokeAsync(method, action, args, onRequest, cancellationToken); + container.AddTransient(); + container.AddTransient(); } - catch (Exception ex) - { - var ex2 = ex.GetTrue(); - if (Logined && ex2 is ApiException aex && (aex.Code == 401 || aex.Code == 403) && !action.EqualIgnoreCase("Node/Login", "Node/Logout")) - { - Log?.Debug("{0}", ex); - //XTrace.WriteException(ex); - WriteLog("重新登录!"); - await Login(); - - return await base.InvokeAsync(method, action, args, onRequest, cancellationToken); - } - throw; - } + base.OnInit(); } #endregion #region 登录 /// 登录 /// - public async Task Login() + public override async Task Login() { - XTrace.WriteLine("登录:{0}", Code); - - var info = GetLoginInfo(); + var rs = await base.Login(); - // 登录前清空令牌,避免服务端使用上一次信息 - Token = null; - Logined = false; - Info = null; - - var rs = Info = await LoginAsync(info); - if (rs != null && !rs.Code.IsNullOrEmpty()) - { - XTrace.WriteLine("下发证书:{0}/{1}", rs.Code, rs.Secret); - Code = rs.Code; - Secret = rs.Secret; - } - - // 登录后设置用于用户认证的token - Token = rs?.Token; - Logined = true; - - OnLogined?.Invoke(this, EventArgs.Empty); - - StartTimer(); - - _frameworkManager.Attach(this); + if (Logined) _frameworkManager.Attach(this); return rs; } - /// 获取登录信息 + /// 创建登录请求 /// - public LoginInfo GetLoginInfo() + public override ILoginRequest BuildLoginRequest() { - var di = GetNodeInfo(); - var ext = new LoginInfo + var request = base.BuildLoginRequest(); + if (request is LoginInfo info) { - Code = Code, - Secret = Secret.IsNullOrEmpty() ? null : Secret.MD5(), - ProductCode = ProductCode, - - Node = di, - }; + info.ProductCode = ProductCode; + info.Node = GetNodeInfo(); + } - return ext; + return request; } /// 获取设备信息 @@ -333,16 +228,17 @@ private static void FixOnLinux(NodeInfo di) { di.MaxOpenFiles = Execute("bash", "-c \"ulimit -n\"")?.Trim().ToInt() ?? 0; - var xrandr = Execute("xrandr", "-q"); - if (!xrandr.IsNullOrEmpty()) - { - var current = xrandr.Substring("current", ",").Trim(); - if (!current.IsNullOrEmpty()) - { - var ss = current.SplitAsInt("x"); - if (ss.Length >= 2) di.Resolution = $"{ss[0]}*{ss[1]}"; - } - } + // 很多Linux系统没有xrandr命令 + //var xrandr = Execute("xrandr", "-q"); + //if (!xrandr.IsNullOrEmpty()) + //{ + // var current = xrandr.Substring("current", ",").Trim(); + // if (!current.IsNullOrEmpty()) + // { + // var ss = current.SplitAsInt("x"); + // if (ss.Length >= 2) di.Resolution = $"{ss[0]}*{ss[1]}"; + // } + //} } /// 获取驱动器信息 @@ -391,47 +287,6 @@ public static IList GetDrives() return null; } } - - /// 注销 - /// - /// - public async Task Logout(String reason) - { - if (!Logined) return null; - - Logined = false; - XTrace.WriteLine("注销:{0} {1}", Code, reason); - - try - { - var rs = await LogoutAsync(reason); - - // 更新令牌 - Token = rs?.Token; - - StopTimer(); - - Logined = false; - - return rs; - } - catch (Exception ex) - { - Log?.Debug("{0}", ex); - //XTrace.WriteException(ex); - - return null; - } - } - - /// 登录 - /// 登录信息 - /// - private async Task LoginAsync(LoginInfo inf) => await PostAsync("Node/Login", inf); - - /// 注销 - /// - private async Task LogoutAsync(String reason) => await GetAsync("Node/Logout", new { reason }); #endregion #region 心跳 @@ -520,462 +375,48 @@ public PingInfo GetHeartInfo() /// 心跳 /// - public async Task Ping() + public override async Task Ping() { - try - { - var inf = GetHeartInfo(); - - // 如果网络不可用,直接保存到队列 - if (!NetworkInterface.GetIsNetworkAvailable()) - { - if (_fails.Count < MaxFails) _fails.Enqueue(inf); - return null; - } - - PingResponse? rs = null; - try - { - rs = await PingAsync(inf); - if (rs != null) - { - // 由服务器改变采样频率 - if (rs.Period > 0 && _timer != null) _timer.Period = rs.Period * 1000; - - var dt = rs.Time.ToDateTime(); - if (dt.Year > 2000) - { - // 计算延迟 - var ts = DateTime.UtcNow - dt; - var ms = (Int32)Math.Round(ts.TotalMilliseconds); - Delay = Delay > 0 ? (Delay + ms) / 2 : ms; - } - - // 时间偏移,用于修正本地时间 - dt = rs.ServerTime.ToDateTime(); - if (dt.Year > 2000) _span = dt.AddMilliseconds(Delay / 2) - DateTime.UtcNow; - - // 令牌 - if (!rs.Token.IsNullOrEmpty()) Token = rs.Token; - - // 推队列 - if (rs.Commands != null && rs.Commands.Length > 0) - { - foreach (var model in rs.Commands) - { - await ReceiveCommand(model); - } - } - - // 迁移到新服务器 - if (!rs.NewServer.IsNullOrEmpty()) - { - var arg = new MigrationEventArgs { NewServer = rs.NewServer + "" }; - - OnMigration?.Invoke(this, arg); - if (!arg.Cancel) - { - await Logout("切换新服务器"); - - // 清空原有链接,添加新链接 - Services.Clear(); - - var ss = rs.NewServer.Split(","); - for (var i = 0; i < ss.Length; i++) - { - Add("service" + (i + 1), new Uri(ss[i])); - } - - await Login(); - } - } - } - } - catch - { - if (_fails.Count < MaxFails) _fails.Enqueue(inf); - - throw; - } - - // 上报正常,处理历史,失败则丢弃 - while (_fails.TryDequeue(out var info)) - { - await PingAsync(info); - } - - return rs; - } - catch (Exception ex) - { - var ex2 = ex.GetTrue(); - if (ex2 is ApiException aex && (aex.Code == 401 || aex.Code == 403)) - { - XTrace.WriteLine("重新登录"); - return Login(); - } - - XTrace.WriteLine("心跳异常 {0}", ex.GetTrue().Message); - - throw; - } - } - - /// 心跳 - /// - /// - private async Task PingAsync(PingInfo inf) => await PostAsync("Node/Ping", inf); - - private TimeSpan _span; - /// 获取相对于服务器的当前时间,避免两端时间差 - /// - public DateTime GetNow() => DateTime.Now.Add(_span); - - private TraceService? _trace; - /// 使用追踪服务 - public void UseTrace() - { - _trace = new TraceService(); - _trace.Attach(this); - } - #endregion - - #region 上报 - private readonly ConcurrentQueue _events = new(); - private readonly ConcurrentQueue _failEvents = new(); - private TimerX? _eventTimer; - private String? _eventTraceId; - - /// 批量上报事件 - /// - /// - public async Task PostEvents(params EventModel[] events) => await PostAsync("Node/PostEvents", events); - - async Task DoPostEvent(Object state) - { - if (!NetworkInterface.GetIsNetworkAvailable()) return; - - DefaultSpan.Current = null; - var tid = _eventTraceId; - _eventTraceId = null; - - // 正常队列为空,异常队列有数据,给它一次机会 - if (_events.IsEmpty && !_failEvents.IsEmpty) - { - while (_failEvents.TryDequeue(out var ev)) - { - _events.Enqueue(ev); - } - } - - while (!_events.IsEmpty) + var rs = await base.Ping(); + if (rs != null) { - var max = 100; - var list = new List(); - while (_events.TryDequeue(out var model) && max-- > 0) list.Add(model); - - using var span = Tracer?.NewSpan("PostEvent", list.Count); - if (tid != null) span?.Detach(tid); - try - { - if (list.Count > 0) await PostEvents(list.ToArray()); - - // 成功后读取本地缓存 - while (_failEvents.TryDequeue(out var ev)) - { - _events.Enqueue(ev); - } - } - catch (Exception ex) + // 迁移到新服务器 + if (rs is PingResponse prs && !prs.NewServer.IsNullOrEmpty() && prs.NewServer != Server) { - span?.SetError(ex, null); + var arg = new MigrationEventArgs { NewServer = prs.NewServer }; - // 失败后进入本地缓存 - foreach (var item in list) + OnMigration?.Invoke(this, arg); + if (!arg.Cancel) { - _failEvents.Enqueue(item); - } - } - } - } - - /// 写事件 - /// - /// - /// - public virtual Boolean WriteEvent(String type, String name, String? remark) - { - // 记录追踪标识,上报的时候带上,尽可能让源头和下游串联起来 - _eventTraceId = DefaultSpan.Current?.ToString(); + await Logout("切换新服务器"); - var now = GetNow().ToUniversalTime(); - var ev = new EventModel { Time = now.ToLong(), Type = type, Name = name, Remark = remark }; - _events.Enqueue(ev); - - _eventTimer?.SetNext(1000); - - return true; - } - - /// 上报命令结果,如截屏、抓日志 - /// - /// - /// - private async Task ReportAsync(Int32 id, Byte[] data) => await PostAsync("Node/Report?Id=" + id, data); - - /// 上报服务调用结果 - /// - /// - public virtual async Task CommandReply(CommandReplyModel model) => await PostAsync("Node/CommandReply", model); - #endregion - - #region 长连接 - private TimerX? _timer; - private void StartTimer() - { - if (_timer == null) - { - lock (this) - { - _timer ??= new TimerX(DoPing, null, 1_000, 60_000, "Device") { Async = true }; - _eventTimer = new TimerX(DoPostEvent, null, 3_000, 60_000, "Device") { Async = true }; - } - } - } + // 清空原有链接,添加新链接 + Server = prs.NewServer; + Client = null; - private void StopTimer() - { - _timer.TryDispose(); - _timer = null; - -#if NET45_OR_GREATER || NETCOREAPP || NETSTANDARD - _source?.Cancel(); - try - { - if (_websocket != null && _websocket.State == WebSocketState.Open) - _websocket.CloseAsync(WebSocketCloseStatus.NormalClosure, "finish", default); - } - catch { } - - //_websocket.TryDispose(); - _websocket = null; -#endif - } - - private async Task DoPing(Object state) - { - DefaultSpan.Current = null; - using var span = Tracer?.NewSpan("NodePing"); - try - { - await Ping(); - - if (!NetworkInterface.GetIsNetworkAvailable()) return; - -#if NET45_OR_GREATER || NETCOREAPP || NETSTANDARD - var svc = _currentService; - if (svc == null) return; - - // 使用过滤器内部token,因为它有过期刷新机制 - var token = Token; - if (Filter is NewLife.Http.TokenHttpFilter thf) token = thf.Token?.AccessToken; - span?.AppendTag($"svc={svc.Address} Token=[{token?.Length}] websocket={_websocket?.State}"); - - if (token.IsNullOrEmpty()) return; - - // 定时ws心跳 - if (_websocket != null && _websocket.State == WebSocketState.Open) - { - try - { - // 在websocket链路上定时发送心跳,避免长连接被断开 - var str = "Ping"; - await _websocket.SendAsync(new ArraySegment(str.GetBytes()), WebSocketMessageType.Text, true, default); - } - catch (Exception ex) - { - span?.SetError(ex, null); - WriteLog("{0}", ex); + await Login(); } } - - if (_websocket == null || _websocket.State != WebSocketState.Open) - { - var url = svc.Address.ToString().Replace("http://", "ws://").Replace("https://", "wss://"); - var uri = new Uri(new Uri(url), "/node/notify"); - - using var span2 = Tracer?.NewSpan("WebSocketConnect", uri + ""); - - var client = new ClientWebSocket(); - client.Options.SetRequestHeader("Authorization", "Bearer " + token); - - span?.AppendTag($"WebSocket.Connect {uri}"); - await client.ConnectAsync(uri, default); - - _websocket = client; - - _source = new CancellationTokenSource(); - _ = Task.Run(() => DoPull(client, _source.Token)); - } -#endif - } - catch (Exception ex) - { - span?.SetError(ex, null); - Log?.Debug("{0}", ex); - } - } - -#if NET45_OR_GREATER || NETCOREAPP || NETSTANDARD - private WebSocket? _websocket; - private CancellationTokenSource? _source; - private async Task DoPull(WebSocket socket, CancellationToken cancellationToken) - { - DefaultSpan.Current = null; - try - { - var buf = new Byte[4 * 1024]; - while (!cancellationToken.IsCancellationRequested && socket.State == WebSocketState.Open) - { - var data = await socket.ReceiveAsync(new ArraySegment(buf), cancellationToken); - var txt = buf.ToStr(null, 0, data.Count); - if (txt.StartsWithIgnoreCase("Pong")) - { - } - else - { - var model = txt.ToJsonEntity(); - if (model != null) await ReceiveCommand(model); - } - } - } - catch (WebSocketException) { } - catch (Exception ex) - { - Log?.Debug("{0}", ex); - } - - using var span = Tracer?.NewSpan("NodePull", socket.State + ""); - - if (socket.State == WebSocketState.Open) - await socket.CloseAsync(WebSocketCloseStatus.NormalClosure, "finish", default); - } -#endif - - async Task ReceiveCommand(CommandModel model) - { - if (model == null) return; - - // 去重,避免命令被重复执行 - if (!_cache.Add($"nodecmd:{model.Id}", model, 3600)) return; - - // 埋点,建立调用链 - using var span = Tracer?.NewSpan("cmd:" + model.Command, model); - if (!model.TraceId.IsNullOrEmpty()) span?.Detach(model.TraceId); - try - { - //todo 有效期判断可能有隐患,现在只是假设服务器和客户端在同一个时区,如果不同,可能会出现问题 - var now = GetNow(); - XTrace.WriteLine("Got Command: {0}", model.ToJson()); - if (model.Expire.Year < 2000 || model.Expire > now) - { - // 延迟执行 - var ts = model.StartTime - now; - if (ts.TotalMilliseconds > 0) - { - TimerX.Delay(s => - { - _ = OnReceiveCommand(model); - }, (Int32)ts.TotalMilliseconds); - - var reply = new CommandReplyModel - { - Id = model.Id, - Status = CommandStatus.处理中, - Data = $"已安排计划执行 {model.StartTime.ToFullString()}" - }; - await CommandReply(reply); - } - else - await OnReceiveCommand(model); - } - else - { - var reply = new CommandReplyModel { Id = model.Id, Status = CommandStatus.取消 }; - await CommandReply(reply); - } - } - catch (Exception ex) - { - span?.SetError(ex, null); - } - } - - /// - /// 触发收到命令的动作 - /// - /// - protected virtual async Task OnReceiveCommand(CommandModel model) - { - var e = new CommandEventArgs { Model = model }; - Received?.Invoke(this, e); - - var rs = await this.ExecuteCommand(model); - e.Reply ??= rs; - - if (e.Reply != null && e.Reply.Id > 0) await CommandReply(e.Reply); - } - - /// 向命令引擎发送命令,触发指定已注册动作 - /// - /// - /// - public async Task SendCommand(String command, String argument) => await OnReceiveCommand(new CommandModel { Command = command, Argument = argument }); - #endregion - - #region 更新 - /// 获取更新信息 - /// - /// 最后一次升级的本地版本 - /// - public async Task Upgrade(String channel, String lastVersion) - { - XTrace.WriteLine("检查更新:{0}", channel); - - // 清理 - var ug = new Stardust.Web.Upgrade { Log = XTrace.Log }; - ug.DeleteBackup("."); - - var rs = await UpgradeAsync(channel, lastVersion); - if (rs != null) - { - XTrace.WriteLine("发现更新:{0}", rs.ToJson(true)); } return rs; } - - /// 更新 - /// - /// 最后一次升级的本地版本 - /// - public async Task UpgradeAsync(String channel, String lastVersion) => await GetAsync("Node/Upgrade", new { channel, lastVersion }); #endregion #region 部署 /// 获取分配到本节点的应用服务信息 /// - public async Task GetDeploy() => await GetAsync("Deploy/GetAll"); + public async Task GetDeploy() => await InvokeAsync("Deploy/GetAll"); /// 上传本节点的所有应用服务信息 /// /// - public async Task UploadDeploy(ServiceInfo[] services) => await PostAsync("Deploy/Upload", services); + public async Task UploadDeploy(ServiceInfo[] services) => await InvokeAsync("Deploy/Upload", services); /// 应用心跳。上报应用信息 /// /// - public async Task AppPing(AppInfo inf) => await PostAsync("Deploy/Ping", inf); + public async Task AppPing(AppInfo inf) => await InvokeAsync("Deploy/Ping", inf); #endregion #region 辅助 @@ -986,9 +427,9 @@ protected virtual async Task OnReceiveCommand(CommandModel model) /// public String BuildUrl(String url) { - if (!url.StartsWithIgnoreCase("http://", "https://")) + if (Client is ApiHttpClient client && !url.StartsWithIgnoreCase("http://", "https://")) { - var svr = Services.FirstOrDefault(e => e.Name == Source) ?? Services.FirstOrDefault(); + var svr = client.Services.FirstOrDefault(e => e.Name == client.Source) ?? client.Services.FirstOrDefault(); if (svr != null && svr.Address != null) url = new Uri(svr.Address, url) + ""; } diff --git a/Stardust/Stardust.csproj b/Stardust/Stardust.csproj index 9dfbf236..503da105 100644 --- a/Stardust/Stardust.csproj +++ b/Stardust/Stardust.csproj @@ -42,6 +42,12 @@ true + + + + + + @@ -106,7 +112,7 @@ - +