Skip to content

Commit

Permalink
Merge pull request #88 from yimiaoxiehou/master
Browse files Browse the repository at this point in the history
[fix] 参照java client,修改获取 topic queue 的逻辑,当 topic 不存在时,返回 TBW102 的信息,以支持自动创建主题
  • Loading branch information
nnhy authored Dec 13, 2024
2 parents 93cbb18 + 8c5237e commit 8b30292
Show file tree
Hide file tree
Showing 3 changed files with 38 additions and 3 deletions.
11 changes: 10 additions & 1 deletion NewLife.RocketMQ/MqBase.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.Reflection;
using System.Security.Authentication;
Expand All @@ -24,9 +25,15 @@ public abstract class MqBase : DisposeBase
/// <remarks>阿里云目前需要在Group前面带上实例ID并用【%】连接,组成路由Group[用来路由到实例Group]</remarks>
public String Group { get; set; } = "DEFAULT_PRODUCER";

/// <summary>rocketmq 默认主题</summary>
public static String DefaultTopic { get; } = "TBW102";

/// <summary>主题</summary>
/// <remarks>阿里云目前需要在Topic前面带上实例ID并用【%】连接,组成路由Topic[用来路由到实例Topic]</remarks>
public String Topic { get; set; } = "TBW102";
public String Topic { get; set; } = DefaultTopic;

/// <summary>默认的主题队列数量</summary>
public Int32 DefaultTopicQueueNums { get; set; } = 4;

/// <summary>本地IP地址</summary>
public String ClientIP { get; set; } = NetHelper.MyIP() + "";
Expand Down Expand Up @@ -236,6 +243,8 @@ protected virtual void OnStart()

// 阻塞获取Broker地址,确保首次使用之前已经获取到Broker地址
var rs = client.GetRouteInfo(Topic);
DefaultTopicQueueNums = Math.Min(DefaultTopicQueueNums, rs.Where(e => e.Permission.HasFlag(Permissions.Write) && e.WriteQueueNums > 0).Select(e => e.WriteQueueNums).First());

foreach (var item in rs)
{
XTrace.WriteLine("发现Broker[{0}]: {1}, reads={2}, writes={3}", item.Name, item.Addresses.Join(), item.ReadQueueNums, item.WriteQueueNums);
Expand Down
27 changes: 26 additions & 1 deletion NewLife.RocketMQ/NameClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -143,9 +143,34 @@ public IList<BrokerInfo> GetRouteInfo(String topic)

// 有改变,重新平衡队列
OnBrokerChange?.Invoke(this, EventArgs.Empty);

return list.OrderBy(t => t.Name).ToList();
}
catch (ResponseException ex)
{
if (!topic.Equals(MqBase.DefaultTopic) && ResponseCode.TOPIC_NOT_EXIST.Equals(ex.Code))
{
WriteLog("未能找到主题[{0}],将读取默认主题[TBW102]的替代。", topic);
var rs = GetRouteInfo(MqBase.DefaultTopic);
if (rs != null && rs.Count() > 0)
{
if (rs[0].ReadQueueNums > Config.DefaultTopicQueueNums)
{
foreach (var item in rs)
{
item.WriteQueueNums = Config.DefaultTopicQueueNums;
item.ReadQueueNums = Config.DefaultTopicQueueNums;
}
}
}
return rs;
}
else
{
span?.SetError(ex, null);
throw;
}
}
catch (Exception ex)
{
span?.SetError(ex, null);
Expand Down
3 changes: 2 additions & 1 deletion NewLife.RocketMQ/Producer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -422,7 +422,8 @@ private SendMessageRequestHeader CreateHeader(Message message)
Properties = message.GetProperties(),
ReconsumeTimes = 0,
UnitMode = UnitMode,
DefaultTopic = "TBW102"
DefaultTopic = DefaultTopic,
DefaultTopicQueueNums = DefaultTopicQueueNums
};

return smrh;
Expand Down

0 comments on commit 8b30292

Please sign in to comment.