Skip to content

Commit

Permalink
对“Redis多消费组可重复消费的队列”的消息消费确认提供了外部业务逻辑手动确认的途径AutoConfirmConsumption,优化了…
Browse files Browse the repository at this point in the history
…MultipleConsumer的示例
  • Loading branch information
e4ky committed Nov 9, 2023
1 parent f548d98 commit b4fba81
Show file tree
Hide file tree
Showing 3 changed files with 48 additions and 18 deletions.
52 changes: 41 additions & 11 deletions NewLife.Redis/Queues/MultipleConsumerGroupsQueue.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
using NewLife.Log;
using System.Text;
using System.Threading;
using NewLife.Log;

namespace NewLife.Caching.Queues;

Expand Down Expand Up @@ -48,6 +50,14 @@ public class MultipleConsumerGroupsQueue<T> : IDisposable
/// </summary>
public Boolean IgnoreErrMsg { set; get; } = true;

/// <summary>
/// 订阅消费消息后自动确认
/// </summary>
/// <remarks>
/// 如改为false请确保手动调用Acknowledge确认消费。
/// </remarks>
public Boolean AutoConfirmConsumption { set; get; } = true;

/// <summary>
/// 日志对像
/// </summary>
Expand Down Expand Up @@ -125,6 +135,7 @@ public void Connect(String connStr, String queueName)
/// <exception cref="NullReferenceException"></exception>
public void Connect(FullRedis redis, String queueName)
{
_Redis = redis;
if (_Redis != null)
{
_Queue = _Redis.GetStream<T>(queueName);
Expand Down Expand Up @@ -158,7 +169,10 @@ public void Subscribe(String subscribeAppName)
_Cts = new CancellationTokenSource();

if (_Redis == null || _Queue == null)
{
OnDisconnected("订阅时列队对像为Null。");
return;
}

//尝试创建消费组
try
Expand All @@ -182,6 +196,15 @@ public void Subscribe(String subscribeAppName)
Task.Run(() => getSubscribe(subscribeAppName), _Cts.Token);
}

/// <summary>
/// 确认消费消息
/// </summary>
/// <param name="msgIds">消息编号</param>
public void Acknowledge(params string[] msgIds)
{
_Queue?.Acknowledge(msgIds);
}

/// <summary>
/// 取消订阅
/// </summary>
Expand All @@ -193,6 +216,11 @@ public void Subscribe(String subscribeAppName)
/// <param name="subscribeAppName">订阅APP名称</param>
private async Task getSubscribe(String subscribeAppName)
{
if (_Redis == null)
{
OnDisconnected("Redis对像为Null。");
return;
}
if (_Queue == null)
{
_Cts.Cancel();
Expand All @@ -201,23 +229,23 @@ private async Task getSubscribe(String subscribeAppName)
}
while (!_Cts.IsCancellationRequested)
{
if (_Redis == null || _Queue == null)
OnDisconnected("获取订阅消息时列队对像为Null。");


var msg = await _Queue.TakeMessageAsync(10);
if (msg != null)
{
try
{
var data = msg.GetBody<T>();
_Queue.Acknowledge(msg.Id);
//通知订阅者
OnReceived(data);
OnReceived(msg.Id, data);
if (AutoConfirmConsumption)
{
_Queue.Acknowledge(msg.Id);
}
}
catch (Exception err)
{

if (XTrace.Debug) XTrace.WriteException(err);
//多消费组中,假如当前消息解析异常,原因大多是因为新增加消息格式等原因导致
//所以都可以正常忽略,如有特殊需要配置IgnoreErrMsg为false
Expand All @@ -231,7 +259,7 @@ private async Task getSubscribe(String subscribeAppName)
OnStopSubscribe(err.Message);
return;
}

}
}

Expand All @@ -253,8 +281,9 @@ public void Dispose()
/// <summary>
/// 通知订阅者接收到新消息
/// </summary>
/// <param name="msgId">消息ID</param>
/// <param name="data">命令</param>
public delegate void ReceivedHandler(T data);
public delegate void ReceivedHandler(string msgId, T data);

/// <summary>
/// 通知订阅者接收到新消息
Expand All @@ -264,8 +293,9 @@ public void Dispose()
/// <summary>
/// 通知订阅者接收到新消息
/// </summary>
/// <param name="data"></param>
protected void OnReceived(T data) => Received?.Invoke(data);
/// <param name="msgId">消息ID</param>
/// <param name="data">消息实体信息</param>
protected void OnReceived(string msgId, T data) => Received?.Invoke(msgId, data);

/// <summary>
/// 通知订阅者停止订阅
Expand Down
10 changes: 5 additions & 5 deletions QueueDemo/MultipleConsumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ namespace QueueDemo;

class MultipleConsumer
{
public static void Start(FullRedis redis, String connStr)
public static void Start(FullRedis redis)
{
var topic = "MultipleConsumer";

Expand All @@ -17,10 +17,10 @@ public static void Start(FullRedis redis, String connStr)
//不同版本的redis错误消息关键词可能不一样,这里注意设置合适的关键词
ConsumeGroupExistErrMsgKeyWord = "exist"
};
mq.Connect(connStr, "BCGCommandQueue");
mq.Received += (data) =>
mq.Connect(redis, topic);
mq.Received += (msgId,data) =>
{
XTrace.WriteLine($"[Redis多消费组]收到列队消息:{data.Data.ToJson()}");
XTrace.WriteLine($"[Redis多消费组]收到列队消息,ID:{msgId},内容{data.Data.ToJson()}");
};
mq.StopSubscribe += (msg) =>
{
Expand All @@ -35,7 +35,7 @@ public static void Start(FullRedis redis, String connStr)
{
//一般不会进入这里。(可能这个事件还可以再优化一下)
XTrace.WriteLine($"因“{msg}”断开连接,进入重连模式。");
mq.Connect(connStr, "BCGCommandQueue");
mq.Connect(redis, topic);
};
mq.Subscribe(consumerName); //开始订阅消息

Expand Down
4 changes: 2 additions & 2 deletions QueueDemo/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -46,10 +46,10 @@ static void Main(String[] args)
Console.WriteLine("完整队列 RedisStream");
FullQueue.Start(redis);

//Redis多消费组可重复消费的队列
//Redis多消费组可重复消费的队列(完整队列的封装)
Console.WriteLine();
Console.WriteLine("多消费组 MultipleConsumerGroupsQueue");
MultipleConsumer.Start(redis, connStr);
MultipleConsumer.Start(redis);

Console.WriteLine("Finish!");
Console.ReadLine();
Expand Down

0 comments on commit b4fba81

Please sign in to comment.