diff --git a/NewLife.Core/Data/IPacket.cs b/NewLife.Core/Data/IPacket.cs index 11464e030..8d47fba4a 100644 --- a/NewLife.Core/Data/IPacket.cs +++ b/NewLife.Core/Data/IPacket.cs @@ -54,8 +54,8 @@ public interface IPacket /// 拥有管理权的数据包。使用完以后需要释放 public interface IOwnerPacket : IPacket, IDisposable { - /// 是否拥有管理权 - Boolean HasOwner { get; set; } + ///// 是否拥有管理权 + //Boolean HasOwner { get; set; } } /// 内存包辅助类 @@ -80,7 +80,7 @@ public static IPacket Append(this IPacket pk, IPacket next) /// public static IPacket Append(this IPacket pk, Byte[] next) => Append(pk, new ArrayPacket(next)); - /// 转字符串并释放 + /// 转字符串 /// /// /// @@ -95,7 +95,7 @@ public static String ToStr(this IPacket pk, Encoding? encoding = null, Int32 off if (span.Length > count) span = span[..count]; var rs = span.ToStr(encoding); - pk.TryDispose(); + //pk.TryDispose(); return rs; } @@ -107,7 +107,7 @@ public static String ToStr(this IPacket pk, Encoding? encoding = null, Int32 off if (span.Length > count) span = span[..count]; sb.Append(span.ToStr(encoding)); - p.TryDispose(); + //p.TryDispose(); count -= span.Length; } @@ -297,6 +297,8 @@ public static Boolean TryGetArray(this IPacket pk, out ArraySegment segmen { if (pk.Next == null) { + if (pk is OwnerPacket op && op.TryGetArray(out segment)) return true; + if (pk is ArrayPacket ap) { segment = new ArraySegment(ap.Buffer, ap.Offset, ap.Length); @@ -316,12 +318,16 @@ public static Boolean TryGetArray(this IPacket pk, out ArraySegment segmen /// /// 使用时务必明确所有权归属,用完后及时释放。 /// -public struct OwnerPacket : IDisposable, IPacket, IOwnerPacket +public class OwnerPacket : MemoryManager, IPacket, IOwnerPacket { #region 属性 - private IMemoryOwner _owner; - /// 内存所有者 - public IMemoryOwner Owner => _owner; + private Byte[] _buffer; + /// 缓冲区 + public Byte[] Buffer => _buffer; + + private readonly Int32 _offset; + /// 数据偏移 + public Int32 Offset => _offset; private readonly Int32 _length; /// 数据长度 @@ -330,10 +336,7 @@ public struct OwnerPacket : IDisposable, IPacket, IOwnerPacket /// 获取/设置 指定位置的字节 /// /// - public Byte this[Int32 index] { get => _owner.Memory.Span[index]; set => _owner.Memory.Span[index] = value; } - - /// 是否拥有管理权 - public Boolean HasOwner { get; set; } + public Byte this[Int32 index] { get => _buffer[_offset + index]; set => _buffer[_offset + index] = value; } /// 下一个链式包 public IPacket? Next { get; set; } @@ -342,6 +345,7 @@ public struct OwnerPacket : IDisposable, IPacket, IOwnerPacket public Int32 Total => Length + (Next?.Total ?? 0); #endregion + #region 构造 /// 实例化指定长度的内存包,从共享内存池中借出 /// 长度 /// @@ -350,46 +354,50 @@ public OwnerPacket(Int32 length) if (length < 0) throw new ArgumentOutOfRangeException(nameof(length), "Length must be non-negative and less than or equal to the memory owner's length."); - _owner = MemoryPool.Shared.Rent(length); + _buffer = ArrayPool.Shared.Rent(length); + _offset = 0; _length = length; - HasOwner = true; } /// 实例化内存包,指定内存所有者和长度 - /// 内存所有者 + /// 缓冲区 + /// /// 长度 /// - public OwnerPacket(IMemoryOwner memoryOwner, Int32 length) + private OwnerPacket(Byte[] buffer, Int32 offset, Int32 length) { - if (length < 0 || length > memoryOwner.Memory.Length) + if (offset < 0 || length < 0 || offset + length > buffer.Length) throw new ArgumentOutOfRangeException(nameof(length), "Length must be non-negative and less than or equal to the memory owner's length."); - _owner = memoryOwner; + _buffer = buffer; + _offset = offset; _length = length; } - /// 释放 - public void Dispose() + /// 销毁释放 + /// + protected override void Dispose(Boolean disposing) { - //if (!HasOwner) throw new InvalidOperationException("Has not owner."); - - var owner = _owner; - if (HasOwner && owner != null) + var buffer = _buffer; + if (buffer != null) { // 释放内存所有者以后,直接置空,避免重复使用 - _owner = null!; - owner.Dispose(); - HasOwner = false; + _buffer = null!; + + ArrayPool.Shared.Return(buffer); } + + Next.TryDispose(); } + #endregion /// 获取分片包。在管理权生命周期内短暂使用 /// - public Span GetSpan() => _owner.Memory.Span[.._length]; + public override Span GetSpan() => new(_buffer, _offset, _length); /// 获取内存包。在管理权生命周期内短暂使用 /// - public Memory GetMemory() => _owner.Memory[.._length]; + public Memory GetMemory() => new(_buffer, _offset, _length); /// 切片得到新数据包,同时转移内存管理权,当前数据包应尽快停止使用 /// @@ -398,29 +406,47 @@ public void Dispose() /// /// 偏移 /// 个数。默认-1表示到末尾 - public IPacket Slice(Int32 offset, Int32 count) + IPacket IPacket.Slice(Int32 offset, Int32 count) => Slice(offset, count); + + /// 切片得到新数据包,同时转移内存管理权,当前数据包应尽快停止使用 + /// + /// 可能是引用同一块内存,也可能是新的内存。 + /// 可能就是当前数据包,也可能引用相同的所有者或数组。 + /// + /// 偏移 + /// 个数。默认-1表示到末尾 + public OwnerPacket Slice(Int32 offset, Int32 count) { var remain = _length - offset; if (count < 0 || count > remain) count = remain; - if (offset == 0 && count == _length) return this; - if (offset == 0) - { - var pk = new OwnerPacket(_owner, count) { HasOwner = HasOwner }; - HasOwner = false; - return pk; - } + return new OwnerPacket(_buffer, _offset + offset, count); + } - // 当前数据包可能会释放,必须拷贝数据 - //return new MemoryPacket(_owner.Memory.Slice(offset, count), count); - var rs = new ArrayPacket(count); - GetSpan().CopyTo(rs.GetSpan()); - return rs; + /// 尝试获取数据段 + /// + /// + protected override Boolean TryGetArray(out ArraySegment segment) + { + segment = new ArraySegment(_buffer, _offset, _length); + return true; } + /// 钉住内存 + /// + /// + /// + public override MemoryHandle Pin(Int32 elementIndex = 0) => throw new NotSupportedException(); + + /// 取消钉内存 + /// + public override void Unpin() => throw new NotImplementedException(); + + #region 重载运算符 /// 已重载 /// - public override String ToString() => $"[{_owner.Memory.Length}](0, {_length})" + (Next == null ? "" : $"<{Total}>"); + public override String ToString() => $"[{_buffer.Length}]({_offset}, {_length})" + (Next == null ? "" : $"<{Total}>"); + #endregion } /// 内存包 @@ -511,11 +537,6 @@ public struct ArrayPacket : IDisposable, IPacket, IOwnerPacket /// 数据长度 public Int32 Length => _length; - ///// 获取/设置 指定位置的字节 - ///// - ///// - //public Byte this[Int32 index] { get => _buffer[_offset + index]; set => _buffer[_offset + index] = value; } - /// 是否拥有管理权。Dispose时,若有管理权则还给池里 public Boolean HasOwner { get; set; } @@ -660,6 +681,8 @@ public void Dispose() Pool.Shared.Return(buf); HasOwner = false; } + + Next.TryDispose(); } #endregion diff --git a/NewLife.Core/Http/HttpBase.cs b/NewLife.Core/Http/HttpBase.cs index fcc06046f..7ed70c01d 100644 --- a/NewLife.Core/Http/HttpBase.cs +++ b/NewLife.Core/Http/HttpBase.cs @@ -1,4 +1,5 @@ -using System.Text; +using System.Buffers; +using System.Text; using NewLife.Buffers; using NewLife.Collections; using NewLife.Data; @@ -133,23 +134,16 @@ public virtual IOwnerPacket Build() var header = BuildHeader(len); // 从内存池申请缓冲区,Slice后管理权转移,外部使用完以后释放 - using var pk = new ArrayPacket(Encoding.UTF8.GetByteCount(header) + len); + //using var pk = new ArrayPacket(Encoding.UTF8.GetByteCount(header) + len); + len += Encoding.UTF8.GetByteCount(header); + var pk = new OwnerPacket(len); var writer = new SpanWriter(pk.GetSpan()); - //BuildHeader(writer, len); writer.Write(header, -1); if (body != null) writer.Write(body.GetSpan()); return pk.Slice(0, writer.Position); - - //var header = BuildHeader(len); - //var rs = new Packet(header.GetBytes()) - //{ - // Next = data - //}; - - //return rs; } /// 创建头部 diff --git a/NewLife.Core/Http/TinyHttpClient.cs b/NewLife.Core/Http/TinyHttpClient.cs index b0c0236be..b6794dddd 100644 --- a/NewLife.Core/Http/TinyHttpClient.cs +++ b/NewLife.Core/Http/TinyHttpClient.cs @@ -135,7 +135,7 @@ protected virtual async Task SendDataAsync(Uri? uri, IPacket? requ if (request != null) await request.CopyToAsync(ns).ConfigureAwait(false); // 接收 - using var pk = new ArrayPacket(BufferSize); + var pk = new OwnerPacket(BufferSize); using var source = new CancellationTokenSource(Timeout); #if NETCOREAPP || NETSTANDARD2_1 @@ -162,7 +162,7 @@ protected virtual async Task SendDataAsync(Uri? uri, IPacket? requ while (retry-- > 0) { // 发出请求 - using var rs2 = await SendDataAsync(uri, req).ConfigureAwait(false); + var rs2 = await SendDataAsync(uri, req).ConfigureAwait(false); if (rs2 == null || rs2.Length == 0) return null; // 解析响应 @@ -204,7 +204,7 @@ protected virtual async Task SendDataAsync(Uri? uri, IPacket? requ var last = rs; while (total < res.ContentLength) { - //todo 这里的IPacket.Append可能有问题,因为last本质上是结构体 + // 这里的IPacket.Append可能有问题,因为last不能是结构体 var pk = await SendDataAsync(null, null).ConfigureAwait(false); last.Append(pk); @@ -343,7 +343,7 @@ private IPacket ParseChunk(IPacket rs, out Int32 offset, out Int32 octets) RequestUri = new Uri(url), }; - var rs = (await SendAsync(request).ConfigureAwait(false)); + using var rs = (await SendAsync(request).ConfigureAwait(false)); return rs?.Body?.ToStr(); } #endregion @@ -360,7 +360,7 @@ private IPacket ParseChunk(IPacket rs, out Int32 offset, out Int32 octets) var baseAddress = BaseAddress ?? throw new ArgumentNullException(nameof(BaseAddress)); var request = BuildRequest(baseAddress, method, action, args); - var rs = await SendAsync(request); + using var rs = await SendAsync(request); if (rs == null || rs.Body == null || rs.Body.Length == 0) return default; diff --git a/NewLife.Core/Http/WebSocketMessage.cs b/NewLife.Core/Http/WebSocketMessage.cs index 784376a1f..3476d669c 100644 --- a/NewLife.Core/Http/WebSocketMessage.cs +++ b/NewLife.Core/Http/WebSocketMessage.cs @@ -140,7 +140,7 @@ public virtual IOwnerPacket ToPacket() if (!StatusDescription.IsNullOrEmpty()) len += Encoding.UTF8.GetByteCount(StatusDescription); } - var rs = new ArrayPacket(1 + 1 + 8 + 4 + len); + var rs = new OwnerPacket(1 + 1 + 8 + 4 + len); var writer = new SpanWriter(rs.GetSpan()) { IsLittleEndian = false @@ -212,7 +212,7 @@ public virtual IOwnerPacket ToPacket() if (!StatusDescription.IsNullOrEmpty()) writer.Write(StatusDescription, -1); } - return (rs.Slice(0, writer.Position) as IOwnerPacket)!; + return rs.Slice(0, writer.Position); } #endregion } \ No newline at end of file diff --git a/NewLife.Core/Net/Handlers/LengthFieldCodec.cs b/NewLife.Core/Net/Handlers/LengthFieldCodec.cs index 4b30d04d1..cb889791a 100644 --- a/NewLife.Core/Net/Handlers/LengthFieldCodec.cs +++ b/NewLife.Core/Net/Handlers/LengthFieldCodec.cs @@ -39,7 +39,7 @@ protected override Object Encode(IHandlerContext context, IPacket msg) } else { - msg = new ArrayPacket(len) { Next = msg }; + msg = new OwnerPacket(len) { Next = msg }; } var writer = new SpanWriter(msg.GetSpan()) { IsLittleEndian = Size > 0 }; diff --git a/NewLife.Core/Net/Handlers/MessageCodec.cs b/NewLife.Core/Net/Handlers/MessageCodec.cs index fb6762a41..709abb968 100644 --- a/NewLife.Core/Net/Handlers/MessageCodec.cs +++ b/NewLife.Core/Net/Handlers/MessageCodec.cs @@ -47,11 +47,15 @@ public class MessageCodec : Handler /// public override Object? Write(IHandlerContext context, Object message) { + // 谁申请,谁归还 + IOwnerPacket? owner = null; if (message is T msg) { var rs = Encode(context, msg); if (rs == null) return null; + message = rs; + owner = rs as IOwnerPacket; // 加入队列,忽略请求消息 if (message is IMessage msg2) @@ -62,7 +66,15 @@ public class MessageCodec : Handler AddToQueue(context, msg); } - return base.Write(context, message); + try + { + return base.Write(context, message); + } + finally + { + // 下游可能忘了释放内存,这里兜底释放 + owner?.Dispose(); + } } /// 编码消息,一般是编码为Packet后传给下一个处理器 diff --git a/NewLife.Core/Net/Handlers/WebSocketCodec.cs b/NewLife.Core/Net/Handlers/WebSocketCodec.cs index 2b2d9c4a7..70c13b6d5 100644 --- a/NewLife.Core/Net/Handlers/WebSocketCodec.cs +++ b/NewLife.Core/Net/Handlers/WebSocketCodec.cs @@ -56,15 +56,7 @@ public override Boolean Close(IHandlerContext context, String reason) } } - try - { - return base.Read(context, message); - } - finally - { - // 下游可能忘了释放内存,这里兜底释放 - message.TryDispose(); - } + return base.Read(context, message); } /// 发送消息时,写入数据 @@ -76,8 +68,10 @@ public override Boolean Close(IHandlerContext context, String reason) if (UserPacket && message is IPacket pk) message = new WebSocketMessage { Type = WebSocketMessageType.Binary, Payload = pk }; + // 谁申请,谁归还 + IOwnerPacket? owner = null; if (message is WebSocketMessage msg) - message = msg.ToPacket(); + message = owner = msg.ToPacket(); try { @@ -86,7 +80,7 @@ public override Boolean Close(IHandlerContext context, String reason) finally { // 下游可能忘了释放内存,这里兜底释放 - message.TryDispose(); + owner?.Dispose(); } } } diff --git a/NewLife.Core/Net/SessionBase.cs b/NewLife.Core/Net/SessionBase.cs index f5a3ad14d..51ff44316 100644 --- a/NewLife.Core/Net/SessionBase.cs +++ b/NewLife.Core/Net/SessionBase.cs @@ -271,7 +271,7 @@ public Int32 Send(IPacket data) using var span = Tracer?.NewSpan($"net:{Name}:Receive", BufferSize + ""); try { - using var pk = new ArrayPacket(BufferSize); + var pk = new OwnerPacket(BufferSize); var size = Client.Receive(pk.Buffer, SocketFlags.None); if (span != null) span.Value = size; @@ -295,7 +295,7 @@ public Int32 Send(IPacket data) using var span = Tracer?.NewSpan($"net:{Name}:ReceiveAsync", BufferSize + ""); try { - using var pk = new ArrayPacket(BufferSize); + var pk = new OwnerPacket(BufferSize); #if NETFRAMEWORK || NETSTANDARD2_0 var ar = Client.BeginReceive(pk.Buffer, 0, pk.Length, SocketFlags.None, null, Client); var size = ar.IsCompleted ? diff --git a/NewLife.Core/Net/TcpSession.cs b/NewLife.Core/Net/TcpSession.cs index 53e229185..94c2e6bb7 100644 --- a/NewLife.Core/Net/TcpSession.cs +++ b/NewLife.Core/Net/TcpSession.cs @@ -387,7 +387,7 @@ protected override Int32 OnSend(IPacket pk) using var span = Tracer?.NewSpan($"net:{Name}:ReceiveAsync", BufferSize + ""); try { - using var pk = new ArrayPacket(BufferSize); + var pk = new OwnerPacket(BufferSize); var size = await ss.ReadAsync(pk.Buffer, 0, pk.Length, cancellationToken); if (span != null) span.Value = size; diff --git a/NewLife.Core/Net/UdpSession.cs b/NewLife.Core/Net/UdpSession.cs index fb78ae137..36aacee34 100644 --- a/NewLife.Core/Net/UdpSession.cs +++ b/NewLife.Core/Net/UdpSession.cs @@ -256,7 +256,7 @@ public IOwnerPacket Receive() try { var ep = Remote.EndPoint as EndPoint; - using var pk = new ArrayPacket(Server.BufferSize); + var pk = new OwnerPacket(Server.BufferSize); var size = Server.Client.ReceiveFrom(pk.Buffer, ref ep); if (span != null) span.Value = size; @@ -280,7 +280,7 @@ public IOwnerPacket Receive() try { var ep = Remote.EndPoint as EndPoint; - using var pk = new ArrayPacket(Server.BufferSize); + var pk = new OwnerPacket(Server.BufferSize); var socket = Server.Client; #if NETFRAMEWORK || NETSTANDARD2_0 var ar = socket.BeginReceiveFrom(pk.Buffer, 0, pk.Length, SocketFlags.None, ref ep, null, socket); diff --git a/Test/Program.cs b/Test/Program.cs index 8119cf857..4f6e1cba0 100644 --- a/Test/Program.cs +++ b/Test/Program.cs @@ -13,6 +13,7 @@ using System.Threading.Tasks; using NewLife; using NewLife.Caching; +using NewLife.Collections; using NewLife.Common; using NewLife.Data; using NewLife.Http; @@ -76,7 +77,7 @@ private static async Task Main(String[] args) try { #endif - Test4(); + Test1(); #if !DEBUG } catch (Exception ex) @@ -98,12 +99,32 @@ private static async Task Main(String[] args) static StarClient _client; private static void Test1() { - var client = new WebClientX { Log = XTrace.Log }; - client.AuthKey = "NewLife"; - //var rs = client.DownloadLink("http://sh03.newlifex.com,http://x.newlifex.com", "ip.gz", "tt/"); - //var rs = client.DownloadLink("http://sh03.newlifex.com,http://x.newlifex.com", "leaf", "tt/"); - var rs = client.DownloadLink("http://sh03.newlifex.com,https://x.newlifex.com/dotNet/8.0.7", "dotnet-runtime-8.0.7-linux-x64", "tt/"); - XTrace.WriteLine(rs); + var pool = Pool.Shared; + var buf = pool.Rent(1000); + + buf[3] = 0x35; + Thread.Sleep(1000); + + pool.Return(buf); + pool.Return(buf); + + buf = pool.Rent(1000); + XTrace.WriteLine(buf.ToHex("-", 0, 8)); + + var buf2 = pool.Rent(800); + XTrace.WriteLine(buf2.ToHex("-", 0, 8)); + + buf[4] = 0x37; + buf2[4] = 0x39; + XTrace.WriteLine(buf.ToHex("-", 0, 8)); + XTrace.WriteLine(buf2.ToHex("-", 0, 8)); + + //var client = new WebClientX { Log = XTrace.Log }; + //client.AuthKey = "NewLife"; + ////var rs = client.DownloadLink("http://sh03.newlifex.com,http://x.newlifex.com", "ip.gz", "tt/"); + ////var rs = client.DownloadLink("http://sh03.newlifex.com,http://x.newlifex.com", "leaf", "tt/"); + //var rs = client.DownloadLink("http://sh03.newlifex.com,https://x.newlifex.com/dotNet/8.0.7", "dotnet-runtime-8.0.7-linux-x64", "tt/"); + //XTrace.WriteLine(rs); } private static void Test2()