Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: The local message table supports creation through the Id generator #610

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,13 @@ public class IntegrationEventLog : IHasConcurrencyStamp

public string EventTypeName { get; private set; } = null!;

[NotMapped]
public string EventTypeShortName => EventTypeName.Split('.').Last();
[NotMapped] public string EventTypeShortName => EventTypeName.Split('.').Last();

private object? _event;

[NotMapped] public object Event => _event ??= JsonSerializer.Deserialize<object>(Content)!;

[NotMapped]
public string Topic { get; private set; } = null!;
[NotMapped] public string Topic { get; private set; } = null!;

public IntegrationEventStates State { get; set; } = IntegrationEventStates.NotPublished;

Expand All @@ -37,12 +35,16 @@ public class IntegrationEventLog : IHasConcurrencyStamp

private IntegrationEventLog()
{
Id = Guid.NewGuid();
Initialize();
}

public IntegrationEventLog(IIntegrationEvent @event, Guid transactionId) : this()
public IntegrationEventLog(IIntegrationEvent @event, Guid transactionId) : this(Guid.NewGuid(), @event, transactionId)
{
}

public IntegrationEventLog(Guid id, IIntegrationEvent @event, Guid transactionId) : this()
{
Id = id;
EventId = @event.GetEventId();
CreationTime = @event.GetCreationTime();
ModificationTime = @event.GetCreationTime();
Expand All @@ -64,8 +66,9 @@ public IntegrationEventLog DeserializeJsonContent()
Topic = json!.Topic;
if (Topic.IsNullOrWhiteSpace())
{
Topic = EventTypeShortName;//Used to handle when the Topic is not persisted, it is consistent with the class name by default
Topic = EventTypeShortName; //Used to handle when the Topic is not persisted, it is consistent with the class name by default
}

return this;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,10 @@ public string SectionName
{
if (DbContextType == null)
_sectionName = string.Empty;

return _sectionName ??= ConnectionStringNameAttribute.GetConnStringName(DbContextType!);
}
}

public IIdGenerator<Guid>? IdGenerator { get; set; }
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,14 @@ public static IIntegrationEventOptions UseEventLog<TDbContext>(
option.DbContextType = typeof(TDbContext);
});

options.Services.TryAddScoped<IIntegrationEventLogService>(serviceProvider => new IntegrationEventLogService(
serviceProvider.GetRequiredService<IntegrationEventLogContext>(),
serviceProvider.GetService<ILogger<IntegrationEventLogService>>()));
options.Services.TryAddScoped<IIntegrationEventLogService>(serviceProvider =>
{
var idGenerator = serviceProvider.GetRequiredService<IOptions<LocalMessageTableOptions>>().Value.IdGenerator?? serviceProvider.GetService<IIdGenerator<Guid>>();
return new IntegrationEventLogService(
serviceProvider.GetRequiredService<IntegrationEventLogContext>(),
idGenerator,
serviceProvider.GetService<ILogger<IntegrationEventLogService>>());
});

//Add local message table model mapping
if (!disableEntityTypeConfiguration)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,15 @@ public class IntegrationEventLogService : IIntegrationEventLogService
{
private readonly IntegrationEventLogContext _eventLogContext;
private readonly ILogger<IntegrationEventLogService>? _logger;
private readonly IIdGenerator<Guid>? _idGenerator;

public IntegrationEventLogService(
IntegrationEventLogContext eventLogContext,
IIdGenerator<Guid>? idGenerator,
ILogger<IntegrationEventLogService>? logger = null)
{
_eventLogContext = eventLogContext;
_idGenerator = idGenerator;
_logger = logger;
}

Expand All @@ -33,8 +36,8 @@ public async Task<IEnumerable<IntegrationEventLog>> RetrieveEventLogsFailedToPub
var time = DateTime.UtcNow.AddSeconds(-minimumRetryInterval);
var result = await _eventLogContext.EventLogs
.Where(e => (e.State == IntegrationEventStates.PublishedFailed || e.State == IntegrationEventStates.InProgress) &&
e.TimesSent <= maxRetryTimes &&
e.ModificationTime < time)
e.TimesSent <= maxRetryTimes &&
e.ModificationTime < time)
.OrderBy(e => e.CreationTime)
.Take(retryBatchSize)
.ToListAsync(cancellationToken);
Expand Down Expand Up @@ -80,7 +83,10 @@ public async Task SaveEventAsync(IIntegrationEvent @event, DbTransaction transac
await _eventLogContext.DbContext.Database.UseTransactionAsync(transaction, Guid.NewGuid(),
cancellationToken: cancellationToken);

var eventLogEntry = new IntegrationEventLog(@event, _eventLogContext.DbContext.Database.CurrentTransaction!.TransactionId);
var eventLogEntry = new IntegrationEventLog(
_idGenerator?.NewId() ?? Guid.NewGuid(),
@event,
_eventLogContext.DbContext.Database.CurrentTransaction!.TransactionId);
await _eventLogContext.EventLogs.AddAsync(eventLogEntry, cancellationToken);
await _eventLogContext.DbContext.SaveChangesAsync(cancellationToken);

Expand Down Expand Up @@ -115,6 +121,7 @@ public Task MarkEventAsInProgressAsync(Guid eventId, int minimumRetryInterval, C
throw new UserFriendlyException(
$"Failed to modify the state of the local message table to {IntegrationEventStates.InProgress}, the current State is {eventLog.State}, Id: {eventLog.Id}, Multitasking execution error, waiting for the next retry");
}

if (eventLog.State != IntegrationEventStates.NotPublished &&
eventLog.State != IntegrationEventStates.InProgress &&
eventLog.State != IntegrationEventStates.PublishedFailed)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
global using Microsoft.Extensions.DependencyInjection;
global using Microsoft.Extensions.DependencyInjection.Extensions;
global using Microsoft.Extensions.Logging;
global using Microsoft.Extensions.Options;
global using System;
global using System.Collections.Generic;
global using System.Data.Common;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,14 +93,77 @@ await CreateIntegrationEventLogServiceAsync(_ => Task.CompletedTask,
Assert.AreEqual(1, count);
}

[TestMethod]
public async Task TestSaveEventAsyncByAssignIdGenerator()
{
var guid = Guid.NewGuid();
Mock<IIdGenerator<Guid>> idGenerator = new();
idGenerator.Setup(generator => generator.NewId()).Returns(guid);
var integrationEventLogService =
await CreateIntegrationEventLogServiceAsync(_ => Task.CompletedTask,
false,
idGenerator.Object
);
var serviceProvider = _services.BuildServiceProvider();
var integrationEventLogContext = serviceProvider.GetRequiredService<IntegrationEventLogContext>();

var count = integrationEventLogContext.DbContext.Set<IntegrationEventLog>().Count();
Assert.AreEqual(0, count);

var orderPaymentSucceededIntegrationEvent = new OrderPaymentSucceededIntegrationEvent
{
OrderId = "1234567890123",
PaymentTime = (long)(DateTime.UtcNow - new DateTime(1970, 1, 1, 0, 0, 0, DateTimeKind.Utc)).TotalSeconds
};
await using var transaction = await integrationEventLogContext.DbContext.Database.BeginTransactionAsync();
await integrationEventLogService.SaveEventAsync(orderPaymentSucceededIntegrationEvent, transaction.GetDbTransaction());
await integrationEventLogContext.DbContext.SaveChangesAsync();
await transaction.CommitAsync();
count = integrationEventLogContext.DbContext.Set<IntegrationEventLog>().Count();
Assert.AreEqual(1, count);
Assert.IsTrue(integrationEventLogContext.DbContext.Set<IntegrationEventLog>().Any(log => log.Id == guid));
}

[TestMethod]
public async Task TestSaveEventAsyncByUseGlobalIdGenerator()
{
Mock<IIdGenerator<Guid>> idGenerator = new();
var guid = Guid.NewGuid();
idGenerator.Setup(generator => generator.NewId()).Returns(guid);
_services.AddIdGenerator(generator => generator.Services.AddSingleton(_ => idGenerator.Object));
var integrationEventLogService =
await CreateIntegrationEventLogServiceAsync(_ => Task.CompletedTask,
false
);
var serviceProvider = _services.BuildServiceProvider();
var integrationEventLogContext = serviceProvider.GetRequiredService<IntegrationEventLogContext>();

var count = integrationEventLogContext.DbContext.Set<IntegrationEventLog>().Count();
Assert.AreEqual(0, count);

var orderPaymentSucceededIntegrationEvent = new OrderPaymentSucceededIntegrationEvent
{
OrderId = "1234567890123",
PaymentTime = (long)(DateTime.UtcNow - new DateTime(1970, 1, 1, 0, 0, 0, DateTimeKind.Utc)).TotalSeconds
};
await using var transaction = await integrationEventLogContext.DbContext.Database.BeginTransactionAsync();
await integrationEventLogService.SaveEventAsync(orderPaymentSucceededIntegrationEvent, transaction.GetDbTransaction());
await integrationEventLogContext.DbContext.SaveChangesAsync();
await transaction.CommitAsync();
count = integrationEventLogContext.DbContext.Set<IntegrationEventLog>().Count();
Assert.AreEqual(1, count);
Assert.IsTrue(integrationEventLogContext.DbContext.Set<IntegrationEventLog>().Any(log => log.Id == guid));
}

[DataRow(true)]
[DataRow(false)]
[DataTestMethod]
public async Task TestMarkEventAsPublishedAsync(bool isUseLogger)
{
Guid eventId = default!;
var integrationEventLogService =
await CreateIntegrationEventLogServiceAsync(async eventLogContext => eventId = await InsertDataAsync(eventLogContext, IntegrationEventStates.InProgress),
await CreateIntegrationEventLogServiceAsync(
async eventLogContext => eventId = await InsertDataAsync(eventLogContext, IntegrationEventStates.InProgress),
isUseLogger
);
var serviceProvider = _services.BuildServiceProvider();
Expand All @@ -118,7 +181,8 @@ public async Task TestMarkEventAsPublished2Async(bool isUseLogger)
{
Guid eventId = default!;
var integrationEventLogService =
await CreateIntegrationEventLogServiceAsync(async eventLogContext => eventId = await InsertDataAsync(eventLogContext, IntegrationEventStates.NotPublished),
await CreateIntegrationEventLogServiceAsync(
async eventLogContext => eventId = await InsertDataAsync(eventLogContext, IntegrationEventStates.NotPublished),
isUseLogger
);
await Assert.ThrowsExceptionAsync<UserFriendlyException>(async ()
Expand Down Expand Up @@ -150,7 +214,8 @@ public async Task TestMarkEventAsInProgress2Async(bool isUseLogger)
{
Guid eventId = default!;
var integrationEventLogService =
await CreateIntegrationEventLogServiceAsync(async eventLogContext => eventId = await InsertDataAsync(eventLogContext, IntegrationEventStates.Published),
await CreateIntegrationEventLogServiceAsync(
async eventLogContext => eventId = await InsertDataAsync(eventLogContext, IntegrationEventStates.Published),
isUseLogger
);
await Assert.ThrowsExceptionAsync<UserFriendlyException>(async ()
Expand Down Expand Up @@ -179,7 +244,8 @@ public async Task TestMarkEventAsFailedAsync(bool isUseLogger)
{
Guid eventId = default!;
var integrationEventLogService =
await CreateIntegrationEventLogServiceAsync(async eventLogContext => eventId = await InsertDataAsync(eventLogContext, IntegrationEventStates.InProgress),
await CreateIntegrationEventLogServiceAsync(
async eventLogContext => eventId = await InsertDataAsync(eventLogContext, IntegrationEventStates.InProgress),
isUseLogger
);
var serviceProvider = _services.BuildServiceProvider();
Expand Down Expand Up @@ -223,7 +289,8 @@ await Assert.ThrowsExceptionAsync<ArgumentException>(async ()
public async Task TestDeleteExpiresAsync(bool isUseLogger)
{
var integrationEventLogService =
await CreateIntegrationEventLogServiceAsync(async eventLogContext => await InsertDataAsync(eventLogContext, IntegrationEventStates.Published),
await CreateIntegrationEventLogServiceAsync(
async eventLogContext => await InsertDataAsync(eventLogContext, IntegrationEventStates.Published),
isUseLogger
);
var serviceProvider = _services.BuildServiceProvider();
Expand All @@ -240,7 +307,8 @@ await CreateIntegrationEventLogServiceAsync(async eventLogContext => await Inser
public async Task TestDeleteExpires2Async(bool isUseLogger)
{
var integrationEventLogService =
await CreateIntegrationEventLogServiceAsync(async eventLogContext => await InsertDataAsync(eventLogContext, IntegrationEventStates.NotPublished),
await CreateIntegrationEventLogServiceAsync(
async eventLogContext => await InsertDataAsync(eventLogContext, IntegrationEventStates.NotPublished),
isUseLogger
);
var serviceProvider = _services.BuildServiceProvider();
Expand All @@ -253,15 +321,16 @@ await CreateIntegrationEventLogServiceAsync(async eventLogContext => await Inser

private async Task<IntegrationEventLogService> CreateIntegrationEventLogServiceAsync(
Func<IntegrationEventLogContext, Task> func,
bool isUseLogger)
bool isUseLogger,
IIdGenerator<Guid>? idGenerator = null)
{
if (isUseLogger) _services.AddLogging();
var serviceProvider = _services.BuildServiceProvider();
var integrationEventLogContext = serviceProvider.GetRequiredService<IntegrationEventLogContext>();
await integrationEventLogContext.DbContext.Database.EnsureCreatedAsync();
await func.Invoke(integrationEventLogContext);
var logger = isUseLogger ? serviceProvider.GetRequiredService<ILogger<IntegrationEventLogService>>() : null;
return new IntegrationEventLogService(integrationEventLogContext, logger);
return new IntegrationEventLogService(integrationEventLogContext, idGenerator ?? serviceProvider.GetService<IIdGenerator<Guid>>(), logger);
}

private static async Task<Guid> InsertDataAsync(
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
// Copyright (c) MASA Stack All rights reserved.
// Licensed under the MIT License. See LICENSE.txt in the project root for license information.

global using Masa.BuildingBlocks.Data;
global using Masa.BuildingBlocks.Data.UoW;
global using Masa.BuildingBlocks.Dispatcher.IntegrationEvents;
global using Masa.BuildingBlocks.Dispatcher.IntegrationEvents.Logs;
Expand Down