Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ADO.NET Grain Directory #9263

Open
wants to merge 21 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions Orleans.sln
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,8 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "ChaoticCluster.ServiceDefau
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "TestSerializerExternalModels", "test\Misc\TestSerializerExternalModels\TestSerializerExternalModels.csproj", "{5D587DDE-036D-4694-A314-8DDF270AC031}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Orleans.GrainDirectory.AdoNet", "src\AdoNet\Orleans.GrainDirectory.AdoNet\Orleans.GrainDirectory.AdoNet.csproj", "{159A802E-2172-4BDE-8218-81C4CA224FAC}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
Expand Down Expand Up @@ -672,6 +674,10 @@ Global
{5D587DDE-036D-4694-A314-8DDF270AC031}.Debug|Any CPU.Build.0 = Debug|Any CPU
{5D587DDE-036D-4694-A314-8DDF270AC031}.Release|Any CPU.ActiveCfg = Release|Any CPU
{5D587DDE-036D-4694-A314-8DDF270AC031}.Release|Any CPU.Build.0 = Release|Any CPU
{159A802E-2172-4BDE-8218-81C4CA224FAC}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{159A802E-2172-4BDE-8218-81C4CA224FAC}.Debug|Any CPU.Build.0 = Debug|Any CPU
{159A802E-2172-4BDE-8218-81C4CA224FAC}.Release|Any CPU.ActiveCfg = Release|Any CPU
{159A802E-2172-4BDE-8218-81C4CA224FAC}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
Expand Down Expand Up @@ -796,6 +802,7 @@ Global
{76A549FA-69F1-4967-82B6-161A8B52C86B} = {2579A7F6-EBE8-485A-BB20-A5D19DB5612B}
{4004A79F-B6BB-4472-891B-AD1348AE3E93} = {2579A7F6-EBE8-485A-BB20-A5D19DB5612B}
{5D587DDE-036D-4694-A314-8DDF270AC031} = {70BCC54E-1618-4742-A079-07588065E361}
{159A802E-2172-4BDE-8218-81C4CA224FAC} = {EB2EDE59-5021-42EE-A97A-D59939B39C66}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {7BFB3429-B5BB-4DB1-95B4-67D77A864952}
Expand Down
199 changes: 199 additions & 0 deletions src/AdoNet/Orleans.GrainDirectory.AdoNet/AdoNetGrainDirectory.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,199 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using Orleans.Configuration;
using Orleans.GrainDirectory.AdoNet.Storage;
using Orleans.Runtime;

namespace Orleans.GrainDirectory.AdoNet;

internal sealed partial class AdoNetGrainDirectory(string name, AdoNetGrainDirectoryOptions options, ILogger<AdoNetGrainDirectory> logger, IOptions<ClusterOptions> clusterOptions, IHostApplicationLifetime lifetime) : IGrainDirectory
{
private readonly ILogger _logger = logger;
private readonly string _clusterId = clusterOptions.Value.ClusterId;
private RelationalOrleansQueries _queries;

/// <summary>
/// Looks up a grain activation.
/// </summary>
/// <param name="grainId">The grain identifier.</param>
/// <returns>The grain address if found or null if not found.</returns>
public async Task<GrainAddress> Lookup(GrainId grainId)
{
try
{
var queries = await GetQueriesAsync();

var entry = await queries
.LookupGrainActivationAsync(_clusterId, name, grainId.ToString())
.WaitAsync(lifetime.ApplicationStopping);

return entry?.ToGrainAddress();
}
catch (Exception ex)
{
LogFailedToLookup(ex, _clusterId, grainId);
throw;
}
}

/// <summary>
/// Registers a new grain activation.
/// </summary>
/// <param name="address">The grain address.</param>
/// <returns>The new or current grain address.</returns>
public async Task<GrainAddress> Register(GrainAddress address)
{
ArgumentNullException.ThrowIfNull(address);

try
{
var queries = await GetQueriesAsync();

// this call is expected to register a new entry or return the existing one if found in a thread safe manner
var entry = await queries
.RegisterGrainActivationAsync(_clusterId, name, address.GrainId.ToString(), address.SiloAddress.ToParsableString(), address.ActivationId.ToParsableString())
.WaitAsync(lifetime.ApplicationStopping);

LogRegistered(_clusterId, address.GrainId, address.SiloAddress, address.ActivationId);

return entry.ToGrainAddress();
}
catch (Exception ex)
{
LogFailedToRegister(ex, _clusterId, address.GrainId, address.SiloAddress, address.ActivationId);
throw;
}
}

/// <summary>
/// Unregister an existing grain activation.
/// </summary>
/// <param name="address">The grain address.</param>
public async Task Unregister(GrainAddress address)
{
ArgumentNullException.ThrowIfNull(address);

try
{
var queries = await GetQueriesAsync();

var count = await queries
.UnregisterGrainActivationAsync(_clusterId, name, address.GrainId.ToString(), address.ActivationId.ToParsableString())
.WaitAsync(lifetime.ApplicationStopping);

if (count > 0)
{
LogUnregistered(_clusterId, address.GrainId, address.SiloAddress, address.ActivationId);
}
}
catch (Exception ex)
{
LogFailedToUnregister(ex, _clusterId, address.GrainId, address.SiloAddress, address.ActivationId);
throw;
}
}

/// <summary>
/// Unregisters all grain activations in the specified set of silos.
/// </summary>
/// <param name="siloAddresses">The set of silos.</param>
public async Task UnregisterSilos(List<SiloAddress> siloAddresses)
{
ArgumentNullException.ThrowIfNull(siloAddresses);

if (siloAddresses.Count == 0)
{
return;
}

try
{
var queries = await GetQueriesAsync();

var count = await queries
.UnregisterGrainActivationsAsync(_clusterId, name, GetSilosAddressesAsString(siloAddresses))
.WaitAsync(lifetime.ApplicationStopping);

if (count > 0)
{
LogUnregisteredSilos(count, _clusterId, siloAddresses);
}
}
catch (Exception ex)
{
LogFailedToUnregisterSilos(ex, _clusterId, siloAddresses);
throw;
}

static string GetSilosAddressesAsString(IEnumerable<SiloAddress> siloAddresses) => string.Join('|', siloAddresses.Select(x => x.ToParsableString()));
}

/// <summary>
/// Unfortunate implementation detail to account for lack of async lifetime.
/// Ideally this concern will be moved upstream so this won't be needed.
/// </summary>
private readonly SemaphoreSlim _semaphore = new(1);

/// <summary>
/// Ensures queries are loaded only once while allowing for recovery if the load fails.
/// </summary>
private ValueTask<RelationalOrleansQueries> GetQueriesAsync()
{
// attempt fast path
return _queries is not null ? new(_queries) : new(CoreAsync());

// slow path
async Task<RelationalOrleansQueries> CoreAsync()
{
await _semaphore.WaitAsync(lifetime.ApplicationStopping);
try
{
// attempt fast path again
if (_queries is not null)
{
return _queries;
}

// slow path - the member variable will only be set if the call succeeds
return _queries = await RelationalOrleansQueries
.CreateInstance(options.Invariant, options.ConnectionString)
.WaitAsync(lifetime.ApplicationStopping);
}
finally
{
_semaphore.Release();
}
}
}

#region Logging

[LoggerMessage(1, LogLevel.Error, "Failed to lookup({ClusterId}, {GrainId})")]
private partial void LogFailedToLookup(Exception ex, string clusterId, GrainId grainId);

[LoggerMessage(2, LogLevel.Debug, "Registered ({ClusterId}, {GrainId}, {SiloAddress}, {ActivationId})")]
private partial void LogRegistered(string clusterId, GrainId grainId, SiloAddress siloAddress, ActivationId activationId);

[LoggerMessage(3, LogLevel.Error, "Failed to register ({ClusterId}, {GrainId}, {SiloAddress}, {ActivationId})")]
private partial void LogFailedToRegister(Exception ex, string clusterId, GrainId grainId, SiloAddress siloAddress, ActivationId activationId);

[LoggerMessage(4, LogLevel.Debug, "Unregistered ({ClusterId}, {GrainId}, {SiloAddress}, {ActivationId})")]
private partial void LogUnregistered(string clusterId, GrainId grainId, SiloAddress siloAddress, ActivationId activationId);

[LoggerMessage(5, LogLevel.Error, "Failed to unregister ({ClusterId}, {GrainId}, {SiloAddress}, {ActivationId})")]
private partial void LogFailedToUnregister(Exception ex, string clusterId, GrainId grainId, SiloAddress siloAddress, ActivationId activationId);

[LoggerMessage(6, LogLevel.Debug, "Unregistered {Count} activations from silos {SiloAddresses} in cluster {ClusterId}")]
private partial void LogUnregisteredSilos(int count, string clusterId, IEnumerable<SiloAddress> siloAddresses);

[LoggerMessage(7, LogLevel.Error, "Failed to unregister silos {SiloAddresses} in cluster {ClusterId}")]
private partial void LogFailedToUnregisterSilos(Exception ex, string clusterId, IEnumerable<SiloAddress> siloAddresses);

#endregion Logging
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
using System;
using Orleans.Runtime;

namespace Orleans.GrainDirectory.AdoNet;

/// <summary>
/// The model that represents a grain activation in an ADONET grain directory.
/// </summary>
internal sealed record AdoNetGrainDirectoryEntry(
string ClusterId,
string ProviderId,
string GrainId,
string SiloAddress,
string ActivationId)
{
public AdoNetGrainDirectoryEntry() : this("", "", "", "", "")
{
}

public GrainAddress ToGrainAddress() => new()
{
GrainId = Runtime.GrainId.Parse(GrainId),
SiloAddress = Runtime.SiloAddress.FromParsableString(SiloAddress),
ActivationId = Runtime.ActivationId.FromParsableString(ActivationId)
};

public static AdoNetGrainDirectoryEntry FromGrainAddress(string clusterId, string providerId, GrainAddress address)
{
ArgumentNullException.ThrowIfNull(clusterId);
ArgumentNullException.ThrowIfNull(address);

return new AdoNetGrainDirectoryEntry(
clusterId,
providerId,
address.GrainId.ToString(),
address.SiloAddress.ToParsableString(),
address.ActivationId.ToParsableString());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
namespace Orleans.GrainDirectory.AdoNet;

/// <summary>
/// Options for the ADO.NET Grain Directory.
/// </summary>
public class AdoNetGrainDirectoryOptions
{
/// <summary>
/// Gets or sets the ADO.NET invariant.
/// </summary>
public string Invariant { get; set; }

/// <summary>
/// Gets or sets the connection string.
/// </summary>
[Redact]
public string ConnectionString { get; set; }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
// This file is used by Code Analysis to maintain SuppressMessage
// attributes that are applied to this project.
// Project-level suppressions either have no target or are given
// a specific target and scoped to a namespace, type, member, etc.

using System.Diagnostics.CodeAnalysis;

[assembly: SuppressMessage("Style", "IDE0130:Namespace does not match folder structure", Justification = "N/A", Scope = "namespace", Target = "~N:Orleans.Configuration")]
[assembly: SuppressMessage("Style", "IDE0130:Namespace does not match folder structure", Justification = "N/A", Scope = "namespace", Target = "~N:Orleans.Hosting")]
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
using System;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Options;
using Orleans.GrainDirectory;
using Orleans.GrainDirectory.AdoNet;

namespace Orleans.Hosting;

public static class AdoNetGrainDirectorySiloBuilderExtensions
{
public static ISiloBuilder UseAdoNetGrainDirectoryAsDefault(
this ISiloBuilder builder,
Action<AdoNetGrainDirectoryOptions> configureOptions)
{
return builder.UseAdoNetGrainDirectoryAsDefault(ob => ob.Configure(configureOptions));
}

public static ISiloBuilder UseAdoNetGrainDirectoryAsDefault(
this ISiloBuilder builder,
Action<OptionsBuilder<AdoNetGrainDirectoryOptions>> configureOptions)
{
return builder.ConfigureServices(services => services.AddAdoNetGrainDirectory(GrainDirectoryAttribute.DEFAULT_GRAIN_DIRECTORY, configureOptions));
}

public static ISiloBuilder AddAdoNetGrainDirectory(
this ISiloBuilder builder,
string name,
Action<AdoNetGrainDirectoryOptions> configureOptions)
{
return builder.AddAdoNetGrainDirectory(name, ob => ob.Configure(configureOptions));
}

public static ISiloBuilder AddAdoNetGrainDirectory(
this ISiloBuilder builder,
string name,
Action<OptionsBuilder<AdoNetGrainDirectoryOptions>> configureOptions)
{
return builder.ConfigureServices(services => services.AddAdoNetGrainDirectory(name, configureOptions));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
using System;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Options;
using Orleans;
using Orleans.GrainDirectory.AdoNet;
using Orleans.Hosting;
using Orleans.Providers;
using static System.String;

[assembly: RegisterProvider("AdoNet", "GrainDirectory", "Silo", typeof(AdoNetGrainDirectoryProviderBuilder))]

namespace Orleans.Hosting;

internal sealed class AdoNetGrainDirectoryProviderBuilder : IProviderBuilder<ISiloBuilder>
{
public void Configure(ISiloBuilder builder, string name, IConfigurationSection configurationSection)
{
builder.AddAdoNetGrainDirectory(name, (OptionsBuilder<AdoNetGrainDirectoryOptions> optionsBuilder) =>
optionsBuilder.Configure<IServiceProvider>((options, services) =>
{
var invariant = configurationSection["Invariant"];
if (!IsNullOrEmpty(invariant))
{
options.Invariant = invariant;
}

var connectionString = configurationSection["ConnectionString"];
var connectionName = configurationSection["ConnectionName"];
if (!IsNullOrEmpty(connectionString))
{
options.ConnectionString = connectionString;
}
else if (!IsNullOrEmpty(connectionName))
{
options.ConnectionString = services.GetRequiredService<IConfiguration>().GetConnectionString(connectionName);
}
}));
}
}
Loading
Loading