Skip to content

Commit

Permalink
v2.6 使用v11核心库的IPacket
Browse files Browse the repository at this point in the history
  • Loading branch information
nnhy committed Sep 17, 2024
1 parent 394791f commit 0a17b2e
Show file tree
Hide file tree
Showing 10 changed files with 26 additions and 17 deletions.
6 changes: 3 additions & 3 deletions NewLife.RocketMQ/ClusterClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -307,12 +307,12 @@ private Command CreateCommand(RequestCode request, Object body, Object extFields
};

// 主体
if (body is Packet pk)
if (body is IPacket pk)
cmd.Payload = pk;
else if (body is Byte[] buf)
cmd.Payload = buf;
cmd.Payload = (ArrayPacket)buf;
else if (body != null)
cmd.Payload = Config.JsonHost.Write(body, false, false, false).GetBytes();
cmd.Payload = (ArrayPacket)Config.JsonHost.Write(body, false, false, false).GetBytes();

if (extFields != null)
{
Expand Down
4 changes: 2 additions & 2 deletions NewLife.RocketMQ/Consumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -789,7 +789,7 @@ private async Task InitOffsetAsync(CancellationToken cancellationToken = default
/// </summary>
/// <param name="payload">负载数据</param>
/// <returns></returns>
private ConsumerStatesModel ConsumerStatesSpecialJsonHandler(Packet payload)
private ConsumerStatesModel ConsumerStatesSpecialJsonHandler(IPacket payload)
{
#region <cmd formate/>

Expand Down Expand Up @@ -960,7 +960,7 @@ private Command GetConsumerRunningInfo(Command cmd)

var rs = cmd.CreateReply() as Command;
rs.Header.Language = "DOTNET";
rs.Payload = sb.ToString().GetBytes();
rs.Payload = (ArrayPacket)sb.ToString().GetBytes();

return rs;
}
Expand Down
3 changes: 2 additions & 1 deletion NewLife.RocketMQ/NameClient.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using NewLife.Log;
using NewLife.Data;
using NewLife.Log;
using NewLife.Net;
using NewLife.RocketMQ.Client;
using NewLife.RocketMQ.Protocol;
Expand Down
4 changes: 2 additions & 2 deletions NewLife.RocketMQ/NewLife.RocketMQ.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<Description>纯托管轻量级RocketMQ客户端,支持发布消息、消费消息、负载均衡等核心功能!</Description>
<Company>新生命开发团队</Company>
<Copyright>©2002-2024 新生命开发团队</Copyright>
<VersionPrefix>2.5</VersionPrefix>
<VersionPrefix>2.6</VersionPrefix>
<VersionSuffix>$([System.DateTime]::Now.ToString(`yyyy.MMdd`))</VersionSuffix>
<Version>$(VersionPrefix).$(VersionSuffix)</Version>
<FileVersion>$(Version)</FileVersion>
Expand Down Expand Up @@ -59,7 +59,7 @@
</ItemGroup>

<ItemGroup>
<PackageReference Include="NewLife.Core" Version="10.10.2024.803" />
<PackageReference Include="NewLife.Core" Version="11.0.2024.917-beta0004" />
</ItemGroup>

</Project>
15 changes: 11 additions & 4 deletions NewLife.RocketMQ/Protocol/Command.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ public class Command : IAccessor, IMessage

/// <summary>主体</summary>
[XmlIgnore, IgnoreDataMember]
public Packet Payload { get; set; }
public IPacket Payload { get; set; }
#endregion

#region 扩展属性
Expand All @@ -31,6 +31,13 @@ public class Command : IAccessor, IMessage
Boolean IMessage.Error { get => throw new NotImplementedException(); set => throw new NotImplementedException(); }
#endregion

#region 构造

/// <summary>销毁。回收内存</summary>
public void Dispose() => Payload.TryDispose();

#endregion

#region 读写
/// <summary>从数据流中读取</summary>
/// <param name="stream"></param>
Expand Down Expand Up @@ -69,7 +76,7 @@ public Boolean Read(Stream stream, Object context = null)
// 读取主体
if (len > 4 + headerLen)
{
Payload = bn.ReadBytes(len - 4 - headerLen);
Payload = (ArrayPacket)bn.ReadBytes(len - 4 - headerLen);
}
}
else if (type == SerializeType.ROCKETMQ)
Expand Down Expand Up @@ -247,7 +254,7 @@ public Boolean Write(Stream stream, Object context = null)

/// <summary>命令转字节数组</summary>
/// <returns></returns>
public Packet ToPacket()
public IPacket ToPacket()
{
var ms = new MemoryStream();
Write(ms, null);
Expand Down Expand Up @@ -277,7 +284,7 @@ public IMessage CreateReply()
return cmd;
}

Boolean IMessage.Read(Packet pk) => Read(pk.GetStream());
Boolean IMessage.Read(IPacket pk) => Read(pk.GetStream());
#endregion

#region 辅助
Expand Down
4 changes: 2 additions & 2 deletions NewLife.RocketMQ/Protocol/Message.cs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ public class Message
public void SetBody(Object body)
{
_BodyString = null;
if (body is Packet pk)
if (body is IPacket pk)
Body = pk.ReadBytes();
else if (body is Byte[] buf)
Body = buf;
Expand Down Expand Up @@ -88,7 +88,7 @@ public IDictionary<String, String> ParseProperties(String properties)
{
if (properties.IsNullOrEmpty()) return null;

var dic = properties.SplitAsDictionaryT('\u0001', '\u0002');
var dic = properties.SplitAsDictionary("\u0001", "\u0002");

if (TryGetAndRemove(dic, nameof(Tags), out var str)) Tags = str;
if (TryGetAndRemove(dic, nameof(Keys), out str)) Keys = str;
Expand Down
2 changes: 1 addition & 1 deletion NewLife.RocketMQ/Protocol/MessageExt.cs
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ public Boolean Read(Stream stream, Object context = null)
/// <summary>读取所有消息</summary>
/// <param name="body"></param>
/// <returns></returns>
public static IList<MessageExt> ReadAll(Packet body)
public static IList<MessageExt> ReadAll(IPacket body)
{
//var ms = new MemoryStream(body);
var ms = body.GetStream();
Expand Down
2 changes: 1 addition & 1 deletion NewLife.RocketMQ/Protocol/MqCodec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ protected override void AddToQueue(IHandlerContext context, Command msg)
/// <param name="context"></param>
/// <param name="pk"></param>
/// <returns></returns>
protected override IList<Command> Decode(IHandlerContext context, Packet pk)
protected override IList<Command> Decode(IHandlerContext context, IPacket pk)
{
var ss = context.Owner as IExtend;
if (ss["Codec"] is not PacketCodec pc)
Expand Down
1 change: 1 addition & 0 deletions XUnitTestRocketMQ/CommandTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
using System.Text;
using System.Threading.Tasks;
using NewLife;
using NewLife.Data;
using NewLife.RocketMQ.Protocol;
using NewLife.Serialization;
using Xunit;
Expand Down
2 changes: 1 addition & 1 deletion XUnitTestRocketMQ/XUnitTestRocketMQ.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
</PropertyGroup>

<ItemGroup>
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="17.11.0" />
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="17.11.1" />
<PackageReference Include="xunit" Version="2.9.0" />
<PackageReference Include="xunit.runner.visualstudio" Version="2.8.2">
<PrivateAssets>all</PrivateAssets>
Expand Down

0 comments on commit 0a17b2e

Please sign in to comment.