diff --git a/src/BuildingBlocks/Dispatcher/Masa.BuildingBlocks.Dispatcher.IntegrationEvents/Logs/IntegrationEventLog.cs b/src/BuildingBlocks/Dispatcher/Masa.BuildingBlocks.Dispatcher.IntegrationEvents/Logs/IntegrationEventLog.cs index 5e291c979..1f480811b 100644 --- a/src/BuildingBlocks/Dispatcher/Masa.BuildingBlocks.Dispatcher.IntegrationEvents/Logs/IntegrationEventLog.cs +++ b/src/BuildingBlocks/Dispatcher/Masa.BuildingBlocks.Dispatcher.IntegrationEvents/Logs/IntegrationEventLog.cs @@ -11,15 +11,13 @@ public class IntegrationEventLog : IHasConcurrencyStamp public string EventTypeName { get; private set; } = null!; - [NotMapped] - public string EventTypeShortName => EventTypeName.Split('.').Last(); + [NotMapped] public string EventTypeShortName => EventTypeName.Split('.').Last(); private object? _event; [NotMapped] public object Event => _event ??= JsonSerializer.Deserialize(Content)!; - [NotMapped] - public string Topic { get; private set; } = null!; + [NotMapped] public string Topic { get; private set; } = null!; public IntegrationEventStates State { get; set; } = IntegrationEventStates.NotPublished; @@ -37,12 +35,16 @@ public class IntegrationEventLog : IHasConcurrencyStamp private IntegrationEventLog() { - Id = Guid.NewGuid(); Initialize(); } - public IntegrationEventLog(IIntegrationEvent @event, Guid transactionId) : this() + public IntegrationEventLog(IIntegrationEvent @event, Guid transactionId) : this(Guid.NewGuid(), @event, transactionId) { + } + + public IntegrationEventLog(Guid id, IIntegrationEvent @event, Guid transactionId) : this() + { + Id = id; EventId = @event.GetEventId(); CreationTime = @event.GetCreationTime(); ModificationTime = @event.GetCreationTime(); @@ -64,8 +66,9 @@ public IntegrationEventLog DeserializeJsonContent() Topic = json!.Topic; if (Topic.IsNullOrWhiteSpace()) { - Topic = EventTypeShortName;//Used to handle when the Topic is not persisted, it is consistent with the class name by default + Topic = EventTypeShortName; //Used to handle when the Topic is not persisted, it is consistent with the class name by default } + return this; } diff --git a/src/BuildingBlocks/Dispatcher/Masa.BuildingBlocks.Dispatcher.IntegrationEvents/Options/LocalMessageTableOptions.cs b/src/BuildingBlocks/Dispatcher/Masa.BuildingBlocks.Dispatcher.IntegrationEvents/Options/LocalMessageTableOptions.cs index f84959d06..3c16e7b16 100644 --- a/src/BuildingBlocks/Dispatcher/Masa.BuildingBlocks.Dispatcher.IntegrationEvents/Options/LocalMessageTableOptions.cs +++ b/src/BuildingBlocks/Dispatcher/Masa.BuildingBlocks.Dispatcher.IntegrationEvents/Options/LocalMessageTableOptions.cs @@ -17,8 +17,10 @@ public string SectionName { if (DbContextType == null) _sectionName = string.Empty; - + return _sectionName ??= ConnectionStringNameAttribute.GetConnStringName(DbContextType!); } } + + public IIdGenerator? IdGenerator { get; set; } } diff --git a/src/Contrib/Dispatcher/IntegrationEvents/Masa.Contrib.Dispatcher.IntegrationEvents.EventLogs.EFCore/IntegrationEventOptionsExtensions.cs b/src/Contrib/Dispatcher/IntegrationEvents/Masa.Contrib.Dispatcher.IntegrationEvents.EventLogs.EFCore/Extensions/IntegrationEventOptionsExtensions.cs similarity index 81% rename from src/Contrib/Dispatcher/IntegrationEvents/Masa.Contrib.Dispatcher.IntegrationEvents.EventLogs.EFCore/IntegrationEventOptionsExtensions.cs rename to src/Contrib/Dispatcher/IntegrationEvents/Masa.Contrib.Dispatcher.IntegrationEvents.EventLogs.EFCore/Extensions/IntegrationEventOptionsExtensions.cs index 5f1ff638c..5e5840cc7 100644 --- a/src/Contrib/Dispatcher/IntegrationEvents/Masa.Contrib.Dispatcher.IntegrationEvents.EventLogs.EFCore/IntegrationEventOptionsExtensions.cs +++ b/src/Contrib/Dispatcher/IntegrationEvents/Masa.Contrib.Dispatcher.IntegrationEvents.EventLogs.EFCore/Extensions/IntegrationEventOptionsExtensions.cs @@ -30,9 +30,14 @@ public static IIntegrationEventOptions UseEventLog( option.DbContextType = typeof(TDbContext); }); - options.Services.TryAddScoped(serviceProvider => new IntegrationEventLogService( - serviceProvider.GetRequiredService(), - serviceProvider.GetService>())); + options.Services.TryAddScoped(serviceProvider => + { + var idGenerator = serviceProvider.GetRequiredService>().Value.IdGenerator?? serviceProvider.GetService>(); + return new IntegrationEventLogService( + serviceProvider.GetRequiredService(), + idGenerator, + serviceProvider.GetService>()); + }); //Add local message table model mapping if (!disableEntityTypeConfiguration) diff --git a/src/Contrib/Dispatcher/IntegrationEvents/Masa.Contrib.Dispatcher.IntegrationEvents.EventLogs.EFCore/IntegrationEventLogService.cs b/src/Contrib/Dispatcher/IntegrationEvents/Masa.Contrib.Dispatcher.IntegrationEvents.EventLogs.EFCore/IntegrationEventLogService.cs index 335031a89..71f77d8ef 100644 --- a/src/Contrib/Dispatcher/IntegrationEvents/Masa.Contrib.Dispatcher.IntegrationEvents.EventLogs.EFCore/IntegrationEventLogService.cs +++ b/src/Contrib/Dispatcher/IntegrationEvents/Masa.Contrib.Dispatcher.IntegrationEvents.EventLogs.EFCore/IntegrationEventLogService.cs @@ -7,12 +7,15 @@ public class IntegrationEventLogService : IIntegrationEventLogService { private readonly IntegrationEventLogContext _eventLogContext; private readonly ILogger? _logger; + private readonly IIdGenerator? _idGenerator; public IntegrationEventLogService( IntegrationEventLogContext eventLogContext, + IIdGenerator? idGenerator, ILogger? logger = null) { _eventLogContext = eventLogContext; + _idGenerator = idGenerator; _logger = logger; } @@ -33,8 +36,8 @@ public async Task> RetrieveEventLogsFailedToPub var time = DateTime.UtcNow.AddSeconds(-minimumRetryInterval); var result = await _eventLogContext.EventLogs .Where(e => (e.State == IntegrationEventStates.PublishedFailed || e.State == IntegrationEventStates.InProgress) && - e.TimesSent <= maxRetryTimes && - e.ModificationTime < time) + e.TimesSent <= maxRetryTimes && + e.ModificationTime < time) .OrderBy(e => e.CreationTime) .Take(retryBatchSize) .ToListAsync(cancellationToken); @@ -80,7 +83,10 @@ public async Task SaveEventAsync(IIntegrationEvent @event, DbTransaction transac await _eventLogContext.DbContext.Database.UseTransactionAsync(transaction, Guid.NewGuid(), cancellationToken: cancellationToken); - var eventLogEntry = new IntegrationEventLog(@event, _eventLogContext.DbContext.Database.CurrentTransaction!.TransactionId); + var eventLogEntry = new IntegrationEventLog( + _idGenerator?.NewId() ?? Guid.NewGuid(), + @event, + _eventLogContext.DbContext.Database.CurrentTransaction!.TransactionId); await _eventLogContext.EventLogs.AddAsync(eventLogEntry, cancellationToken); await _eventLogContext.DbContext.SaveChangesAsync(cancellationToken); @@ -115,6 +121,7 @@ public Task MarkEventAsInProgressAsync(Guid eventId, int minimumRetryInterval, C throw new UserFriendlyException( $"Failed to modify the state of the local message table to {IntegrationEventStates.InProgress}, the current State is {eventLog.State}, Id: {eventLog.Id}, Multitasking execution error, waiting for the next retry"); } + if (eventLog.State != IntegrationEventStates.NotPublished && eventLog.State != IntegrationEventStates.InProgress && eventLog.State != IntegrationEventStates.PublishedFailed) diff --git a/src/Contrib/Dispatcher/IntegrationEvents/Masa.Contrib.Dispatcher.IntegrationEvents.EventLogs.EFCore/_Imports.cs b/src/Contrib/Dispatcher/IntegrationEvents/Masa.Contrib.Dispatcher.IntegrationEvents.EventLogs.EFCore/_Imports.cs index 3094bb7eb..d2524f143 100644 --- a/src/Contrib/Dispatcher/IntegrationEvents/Masa.Contrib.Dispatcher.IntegrationEvents.EventLogs.EFCore/_Imports.cs +++ b/src/Contrib/Dispatcher/IntegrationEvents/Masa.Contrib.Dispatcher.IntegrationEvents.EventLogs.EFCore/_Imports.cs @@ -10,6 +10,7 @@ global using Microsoft.Extensions.DependencyInjection; global using Microsoft.Extensions.DependencyInjection.Extensions; global using Microsoft.Extensions.Logging; +global using Microsoft.Extensions.Options; global using System; global using System.Collections.Generic; global using System.Data.Common; diff --git a/src/Contrib/Dispatcher/IntegrationEvents/Tests/Masa.Contrib.Dispatcher.IntegrationEvents.EventLogs.EFCore.Tests/IntegrationEventLogServiceTest.cs b/src/Contrib/Dispatcher/IntegrationEvents/Tests/Masa.Contrib.Dispatcher.IntegrationEvents.EventLogs.EFCore.Tests/IntegrationEventLogServiceTest.cs index 9a579936c..e2ed399cc 100644 --- a/src/Contrib/Dispatcher/IntegrationEvents/Tests/Masa.Contrib.Dispatcher.IntegrationEvents.EventLogs.EFCore.Tests/IntegrationEventLogServiceTest.cs +++ b/src/Contrib/Dispatcher/IntegrationEvents/Tests/Masa.Contrib.Dispatcher.IntegrationEvents.EventLogs.EFCore.Tests/IntegrationEventLogServiceTest.cs @@ -93,6 +93,68 @@ await CreateIntegrationEventLogServiceAsync(_ => Task.CompletedTask, Assert.AreEqual(1, count); } + [TestMethod] + public async Task TestSaveEventAsyncByAssignIdGenerator() + { + var guid = Guid.NewGuid(); + Mock> idGenerator = new(); + idGenerator.Setup(generator => generator.NewId()).Returns(guid); + var integrationEventLogService = + await CreateIntegrationEventLogServiceAsync(_ => Task.CompletedTask, + false, + idGenerator.Object + ); + var serviceProvider = _services.BuildServiceProvider(); + var integrationEventLogContext = serviceProvider.GetRequiredService(); + + var count = integrationEventLogContext.DbContext.Set().Count(); + Assert.AreEqual(0, count); + + var orderPaymentSucceededIntegrationEvent = new OrderPaymentSucceededIntegrationEvent + { + OrderId = "1234567890123", + PaymentTime = (long)(DateTime.UtcNow - new DateTime(1970, 1, 1, 0, 0, 0, DateTimeKind.Utc)).TotalSeconds + }; + await using var transaction = await integrationEventLogContext.DbContext.Database.BeginTransactionAsync(); + await integrationEventLogService.SaveEventAsync(orderPaymentSucceededIntegrationEvent, transaction.GetDbTransaction()); + await integrationEventLogContext.DbContext.SaveChangesAsync(); + await transaction.CommitAsync(); + count = integrationEventLogContext.DbContext.Set().Count(); + Assert.AreEqual(1, count); + Assert.IsTrue(integrationEventLogContext.DbContext.Set().Any(log => log.Id == guid)); + } + + [TestMethod] + public async Task TestSaveEventAsyncByUseGlobalIdGenerator() + { + Mock> idGenerator = new(); + var guid = Guid.NewGuid(); + idGenerator.Setup(generator => generator.NewId()).Returns(guid); + _services.AddIdGenerator(generator => generator.Services.AddSingleton(_ => idGenerator.Object)); + var integrationEventLogService = + await CreateIntegrationEventLogServiceAsync(_ => Task.CompletedTask, + false + ); + var serviceProvider = _services.BuildServiceProvider(); + var integrationEventLogContext = serviceProvider.GetRequiredService(); + + var count = integrationEventLogContext.DbContext.Set().Count(); + Assert.AreEqual(0, count); + + var orderPaymentSucceededIntegrationEvent = new OrderPaymentSucceededIntegrationEvent + { + OrderId = "1234567890123", + PaymentTime = (long)(DateTime.UtcNow - new DateTime(1970, 1, 1, 0, 0, 0, DateTimeKind.Utc)).TotalSeconds + }; + await using var transaction = await integrationEventLogContext.DbContext.Database.BeginTransactionAsync(); + await integrationEventLogService.SaveEventAsync(orderPaymentSucceededIntegrationEvent, transaction.GetDbTransaction()); + await integrationEventLogContext.DbContext.SaveChangesAsync(); + await transaction.CommitAsync(); + count = integrationEventLogContext.DbContext.Set().Count(); + Assert.AreEqual(1, count); + Assert.IsTrue(integrationEventLogContext.DbContext.Set().Any(log => log.Id == guid)); + } + [DataRow(true)] [DataRow(false)] [DataTestMethod] @@ -100,7 +162,8 @@ public async Task TestMarkEventAsPublishedAsync(bool isUseLogger) { Guid eventId = default!; var integrationEventLogService = - await CreateIntegrationEventLogServiceAsync(async eventLogContext => eventId = await InsertDataAsync(eventLogContext, IntegrationEventStates.InProgress), + await CreateIntegrationEventLogServiceAsync( + async eventLogContext => eventId = await InsertDataAsync(eventLogContext, IntegrationEventStates.InProgress), isUseLogger ); var serviceProvider = _services.BuildServiceProvider(); @@ -118,7 +181,8 @@ public async Task TestMarkEventAsPublished2Async(bool isUseLogger) { Guid eventId = default!; var integrationEventLogService = - await CreateIntegrationEventLogServiceAsync(async eventLogContext => eventId = await InsertDataAsync(eventLogContext, IntegrationEventStates.NotPublished), + await CreateIntegrationEventLogServiceAsync( + async eventLogContext => eventId = await InsertDataAsync(eventLogContext, IntegrationEventStates.NotPublished), isUseLogger ); await Assert.ThrowsExceptionAsync(async () @@ -150,7 +214,8 @@ public async Task TestMarkEventAsInProgress2Async(bool isUseLogger) { Guid eventId = default!; var integrationEventLogService = - await CreateIntegrationEventLogServiceAsync(async eventLogContext => eventId = await InsertDataAsync(eventLogContext, IntegrationEventStates.Published), + await CreateIntegrationEventLogServiceAsync( + async eventLogContext => eventId = await InsertDataAsync(eventLogContext, IntegrationEventStates.Published), isUseLogger ); await Assert.ThrowsExceptionAsync(async () @@ -179,7 +244,8 @@ public async Task TestMarkEventAsFailedAsync(bool isUseLogger) { Guid eventId = default!; var integrationEventLogService = - await CreateIntegrationEventLogServiceAsync(async eventLogContext => eventId = await InsertDataAsync(eventLogContext, IntegrationEventStates.InProgress), + await CreateIntegrationEventLogServiceAsync( + async eventLogContext => eventId = await InsertDataAsync(eventLogContext, IntegrationEventStates.InProgress), isUseLogger ); var serviceProvider = _services.BuildServiceProvider(); @@ -223,7 +289,8 @@ await Assert.ThrowsExceptionAsync(async () public async Task TestDeleteExpiresAsync(bool isUseLogger) { var integrationEventLogService = - await CreateIntegrationEventLogServiceAsync(async eventLogContext => await InsertDataAsync(eventLogContext, IntegrationEventStates.Published), + await CreateIntegrationEventLogServiceAsync( + async eventLogContext => await InsertDataAsync(eventLogContext, IntegrationEventStates.Published), isUseLogger ); var serviceProvider = _services.BuildServiceProvider(); @@ -240,7 +307,8 @@ await CreateIntegrationEventLogServiceAsync(async eventLogContext => await Inser public async Task TestDeleteExpires2Async(bool isUseLogger) { var integrationEventLogService = - await CreateIntegrationEventLogServiceAsync(async eventLogContext => await InsertDataAsync(eventLogContext, IntegrationEventStates.NotPublished), + await CreateIntegrationEventLogServiceAsync( + async eventLogContext => await InsertDataAsync(eventLogContext, IntegrationEventStates.NotPublished), isUseLogger ); var serviceProvider = _services.BuildServiceProvider(); @@ -253,7 +321,8 @@ await CreateIntegrationEventLogServiceAsync(async eventLogContext => await Inser private async Task CreateIntegrationEventLogServiceAsync( Func func, - bool isUseLogger) + bool isUseLogger, + IIdGenerator? idGenerator = null) { if (isUseLogger) _services.AddLogging(); var serviceProvider = _services.BuildServiceProvider(); @@ -261,7 +330,7 @@ private async Task CreateIntegrationEventLogServiceA await integrationEventLogContext.DbContext.Database.EnsureCreatedAsync(); await func.Invoke(integrationEventLogContext); var logger = isUseLogger ? serviceProvider.GetRequiredService>() : null; - return new IntegrationEventLogService(integrationEventLogContext, logger); + return new IntegrationEventLogService(integrationEventLogContext, idGenerator ?? serviceProvider.GetService>(), logger); } private static async Task InsertDataAsync( diff --git a/src/Contrib/Dispatcher/IntegrationEvents/Tests/Masa.Contrib.Dispatcher.IntegrationEvents.EventLogs.EFCore.Tests/_Imports.cs b/src/Contrib/Dispatcher/IntegrationEvents/Tests/Masa.Contrib.Dispatcher.IntegrationEvents.EventLogs.EFCore.Tests/_Imports.cs index c37193402..e2a2cda5c 100644 --- a/src/Contrib/Dispatcher/IntegrationEvents/Tests/Masa.Contrib.Dispatcher.IntegrationEvents.EventLogs.EFCore.Tests/_Imports.cs +++ b/src/Contrib/Dispatcher/IntegrationEvents/Tests/Masa.Contrib.Dispatcher.IntegrationEvents.EventLogs.EFCore.Tests/_Imports.cs @@ -1,6 +1,7 @@ // Copyright (c) MASA Stack All rights reserved. // Licensed under the MIT License. See LICENSE.txt in the project root for license information. +global using Masa.BuildingBlocks.Data; global using Masa.BuildingBlocks.Data.UoW; global using Masa.BuildingBlocks.Dispatcher.IntegrationEvents; global using Masa.BuildingBlocks.Dispatcher.IntegrationEvents.Logs;