Skip to content

Commit

Permalink
Support customization of both document id and partition key for Cosmo…
Browse files Browse the repository at this point in the history
…s DB grain persistence
  • Loading branch information
ReubenBond committed Nov 1, 2023
1 parent b4ef03e commit 774631d
Show file tree
Hide file tree
Showing 6 changed files with 94 additions and 82 deletions.
32 changes: 10 additions & 22 deletions src/Azure/Orleans.Persistence.Cosmos/CosmosGrainStorage.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,23 +3,21 @@
using System.Diagnostics;
using Microsoft.Extensions.DependencyInjection;
using Orleans.Storage;
using static Orleans.Persistence.Cosmos.CosmosIdSanitizer;

namespace Orleans.Persistence.Cosmos;

internal class CosmosGrainStorage : IGrainStorage, ILifecycleParticipant<ISiloLifecycle>
{
private const string ANY_ETAG = "*";
private const string KEY_STRING_SEPARATOR = "__";
private const string GRAINTYPE_PARTITION_KEY_PATH = "/GrainType";
private readonly ILogger _logger;
private readonly CosmosGrainStorageOptions _options;
private readonly string _name;
private readonly IServiceProvider _serviceProvider;
private readonly string _serviceId;
private string _partitionKeyPath;
private readonly IPartitionKeyProvider _partitionKeyProvider;
private readonly ICosmosOperationExecutor _executor;
private readonly IDocumentIdProvider _documentIdProvider;
private CosmosClient _client = default!;
private Container _container = default!;

Expand All @@ -29,23 +27,21 @@ public CosmosGrainStorage(
ILoggerFactory loggerFactory,
IServiceProvider serviceProvider,
IOptions<ClusterOptions> clusterOptions,
IPartitionKeyProvider partitionKeyProvider
)
IDocumentIdProvider documentIdProvider)
{
_logger = loggerFactory.CreateLogger<CosmosGrainStorage>();
_options = options;
_name = name;
_serviceProvider = serviceProvider;
_serviceId = clusterOptions.Value.ServiceId;
_partitionKeyProvider = partitionKeyProvider;
_executor = options.OperationExecutor;
_partitionKeyPath = _options.PartitionKeyPath;
_documentIdProvider = documentIdProvider;
}

public async Task ReadStateAsync<T>(string grainType, GrainId grainId, IGrainState<T> grainState)
{
var id = GetKeyString(grainId);
var partitionKey = await BuildPartitionKey(grainType, grainId);
var (id, partitionKey) = await _documentIdProvider.GetDocumentId(grainType, grainId);

if (_logger.IsEnabled(LogLevel.Trace))
{
Expand Down Expand Up @@ -105,9 +101,7 @@ public async Task ReadStateAsync<T>(string grainType, GrainId grainId, IGrainSta

public async Task WriteStateAsync<T>(string grainType, GrainId grainId, IGrainState<T> grainState)
{
var id = GetKeyString(grainId);

var partitionKey = await BuildPartitionKey(grainType, grainId);
var (id, partitionKey) = await _documentIdProvider.GetDocumentId(grainType, grainId);

if (_logger.IsEnabled(LogLevel.Trace))
{
Expand Down Expand Up @@ -188,8 +182,7 @@ public async Task WriteStateAsync<T>(string grainType, GrainId grainId, IGrainSt

public async Task ClearStateAsync<T>(string grainType, GrainId grainId, IGrainState<T> grainState)
{
var id = GetKeyString(grainId);
var partitionKey = await BuildPartitionKey(grainType, grainId);
var (id, partitionKey) = await _documentIdProvider.GetDocumentId(grainType, grainId);
if (_logger.IsEnabled(LogLevel.Trace))
{
_logger.LogTrace(
Expand Down Expand Up @@ -262,11 +255,6 @@ public void Participate(ISiloLifecycle lifecycle)
lifecycle.Subscribe(OptionFormattingUtilities.Name<CosmosGrainStorage>(_name), _options.InitStage, Init);
}

private string GetKeyString(GrainId grainId) => $"{Sanitize(_serviceId)}{KEY_STRING_SEPARATOR}{Sanitize(grainId.Type.ToString()!)}{SeparatorChar}{Sanitize(grainId.Key.ToString()!)}";

private ValueTask<string> BuildPartitionKey(string grainType, GrainId grainId) =>
_partitionKeyProvider.GetPartitionKey(grainType, grainId);

private async Task Init(CancellationToken ct)
{
var stopWatch = Stopwatch.StartNew();
Expand Down Expand Up @@ -368,7 +356,7 @@ private async Task TryCreateResources()
var container = containerResponse.Resource;
_partitionKeyPath = container.PartitionKeyPath;
if (_partitionKeyPath == GRAINTYPE_PARTITION_KEY_PATH &&
_partitionKeyProvider is not DefaultPartitionKeyProvider)
_documentIdProvider is not DefaultDocumentIdProvider)
throw new OrleansConfigurationException("Custom partition key provider is not compatible with partition key path set to /GrainType");
}

Expand Down Expand Up @@ -404,8 +392,8 @@ public static class CosmosStorageFactory
public static IGrainStorage Create(IServiceProvider services, string name)
{
var optionsMonitor = services.GetRequiredService<IOptionsMonitor<CosmosGrainStorageOptions>>();
var partitionKeyProvider = services.GetServiceByName<IPartitionKeyProvider>(name)
?? services.GetRequiredService<IPartitionKeyProvider>();
var documentIdProvider = services.GetServiceByName<IDocumentIdProvider>(name)
?? services.GetRequiredService<IDocumentIdProvider>();
var loggerFactory = services.GetRequiredService<ILoggerFactory>();
var clusterOptions = services.GetRequiredService<IOptions<ClusterOptions>>();
return new CosmosGrainStorage(
Expand All @@ -414,6 +402,6 @@ public static IGrainStorage Create(IServiceProvider services, string name)
loggerFactory,
services,
clusterOptions,
partitionKeyProvider);
documentIdProvider);
}
}
29 changes: 29 additions & 0 deletions src/Azure/Orleans.Persistence.Cosmos/DefaultDocumentIdProvider.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
using static Orleans.Persistence.Cosmos.CosmosIdSanitizer;

namespace Orleans.Persistence.Cosmos;

/// <summary>
/// The default implementation of <see cref="IDocumentIdProvider"/>.
/// </summary>
public sealed class DefaultDocumentIdProvider : IDocumentIdProvider
{
private const string KEY_STRING_SEPARATOR = "__";
private readonly ClusterOptions _options;

/// <summary>
/// Initializes a new instance of the <see cref="DefaultDocumentIdProvider"/> class.
/// </summary>
/// <param name="options">The cluster options.</param>
public DefaultDocumentIdProvider(IOptions<ClusterOptions> options)
{
_options = options.Value;
}

/// <inheritdoc/>
public ValueTask<(string DocumentId, string PartitionKey)> GetDocumentId(string stateName, GrainId grainId)
{
var documentId = $"{Sanitize(_options.ServiceId)}{KEY_STRING_SEPARATOR}{Sanitize(grainId.Type.ToString()!)}{SeparatorChar}{Sanitize(grainId.Key.ToString()!)}";
var partitionKey = Sanitize(stateName);
return new((documentId, partitionKey));
}
}
76 changes: 38 additions & 38 deletions src/Azure/Orleans.Persistence.Cosmos/HostingExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,65 +12,65 @@ namespace Orleans.Hosting;
public static class HostingExtensions
{
/// <summary>
/// Configure silo to use Azure Cosmos DB storage as the default grain storage using a custom Partition Key Provider.
/// Configure silo to use Azure Cosmos DB storage as the default grain storage using a custom document id provider.
/// </summary>
/// <typeparam name="TPartitionKeyProvider">The custom partition key provider type.</typeparam>
/// <typeparam name="TDocumentIdProvider">The document id provider.</typeparam>
/// <param name="builder">The silo builder.</param>
/// <param name="configureOptions">The delegate used to configure the provider.</param>
public static ISiloBuilder AddCosmosGrainStorageAsDefault<TPartitionKeyProvider>(
public static ISiloBuilder AddCosmosGrainStorageAsDefault<TDocumentIdProvider>(
this ISiloBuilder builder,
Action<CosmosGrainStorageOptions> configureOptions) where TPartitionKeyProvider : class, IPartitionKeyProvider
Action<CosmosGrainStorageOptions> configureOptions) where TDocumentIdProvider : class, IDocumentIdProvider
{
return builder.AddCosmosGrainStorage<TPartitionKeyProvider>(ProviderConstants.DEFAULT_STORAGE_PROVIDER_NAME, configureOptions);
return builder.AddCosmosGrainStorage<TDocumentIdProvider>(ProviderConstants.DEFAULT_STORAGE_PROVIDER_NAME, configureOptions);
}

/// <summary>
/// Configure silo to use Azure Cosmos DB storage for grain storage using a custom Partition Key Provider.
/// Configure silo to use Azure Cosmos DB storage for grain storage using a custom document id provider.
/// </summary>
/// <typeparam name="TPartitionKeyProvider">The custom partition key provider type.</typeparam>
/// <typeparam name="TDocumentIdProvider">The document id provider.</typeparam>
/// <param name="builder">The silo builder.</param>
/// <param name="name">The storage provider name.</param>
/// <param name="configureOptions">The delegate used to configure the provider.</param>
public static ISiloBuilder AddCosmosGrainStorage<TPartitionKeyProvider>(
public static ISiloBuilder AddCosmosGrainStorage<TDocumentIdProvider>(
this ISiloBuilder builder,
string name,
Action<CosmosGrainStorageOptions> configureOptions) where TPartitionKeyProvider : class, IPartitionKeyProvider
Action<CosmosGrainStorageOptions> configureOptions) where TDocumentIdProvider : class, IDocumentIdProvider
{
builder.Services.AddSingletonNamedService<IPartitionKeyProvider, TPartitionKeyProvider>(name);
builder.Services.AddSingletonNamedService<IDocumentIdProvider, TDocumentIdProvider>(name);
builder.Services.AddCosmosGrainStorage(name, configureOptions);
return builder;
}

/// <summary>
/// Configure silo to use Azure Cosmos DB storage as the default grain storage using a custom Partition Key Provider.
/// Configure silo to use Azure Cosmos DB storage as the default grain storage using a custom document id provider.
/// </summary>
/// <param name="builder">The silo builder.</param>
/// <param name="configureOptions">The delegate used to configure the provider.</param>
/// <param name="customPartitionKeyProviderType">The custom partition key provider type.</param>
/// <param name="customDocumentIdProviderType">The document id provider.</param>
public static ISiloBuilder AddCosmosGrainStorageAsDefault(
this ISiloBuilder builder,
Action<CosmosGrainStorageOptions> configureOptions,
Type customPartitionKeyProviderType)
Type customDocumentIdProviderType)
{
return builder.AddCosmosGrainStorage(ProviderConstants.DEFAULT_STORAGE_PROVIDER_NAME, configureOptions, customPartitionKeyProviderType);
return builder.AddCosmosGrainStorage(ProviderConstants.DEFAULT_STORAGE_PROVIDER_NAME, configureOptions, customDocumentIdProviderType);
}

/// <summary>
/// Configure silo to use Azure Cosmos DB storage for grain storage using a custom Partition Key Provider.
/// Configure silo to use Azure Cosmos DB storage for grain storage using a custom document id provider.
/// </summary>
/// <param name="builder">The silo builder.</param>
/// <param name="name">The storage provider name.</param>
/// <param name="configureOptions">The delegate used to configure the provider.</param>
/// <param name="customPartitionKeyProviderType">The custom partition key provider type.</param>
/// <param name="customDocumentIdProviderType">The document id provider.</param>
public static ISiloBuilder AddCosmosGrainStorage(
this ISiloBuilder builder,
string name,
Action<CosmosGrainStorageOptions> configureOptions,
Type customPartitionKeyProviderType)
Type customDocumentIdProviderType)
{
if (customPartitionKeyProviderType != null)
if (customDocumentIdProviderType != null)
{
builder.Services.TryAddSingleton(typeof(IPartitionKeyProvider), customPartitionKeyProviderType);
builder.Services.TryAddSingleton(typeof(IDocumentIdProvider), customDocumentIdProviderType);
}

builder.Services.AddCosmosGrainStorage(name, configureOptions);
Expand Down Expand Up @@ -105,64 +105,64 @@ public static ISiloBuilder AddCosmosGrainStorage(
}

/// <summary>
/// Configure silo to use Azure Cosmos DB storage as the default grain storage using a custom Partition Key Provider.
/// Configure silo to use Azure Cosmos DB storage as the default grain storage using a custom document id provider.
/// </summary>
/// <typeparam name="TPartitionKeyProvider">The custom partition key provider type.</typeparam>
/// <typeparam name="TDocumentIdProvider">The document id provider.</typeparam>
/// <param name="builder">The silo builder.</param>
/// <param name="configureOptions">The delegate used to configure the provider.</param>
public static ISiloBuilder AddCosmosGrainStorageAsDefault<TPartitionKeyProvider>(
public static ISiloBuilder AddCosmosGrainStorageAsDefault<TDocumentIdProvider>(
this ISiloBuilder builder,
Action<OptionsBuilder<CosmosGrainStorageOptions>>? configureOptions = null) where TPartitionKeyProvider : class, IPartitionKeyProvider
Action<OptionsBuilder<CosmosGrainStorageOptions>>? configureOptions = null) where TDocumentIdProvider : class, IDocumentIdProvider
{
return builder.AddCosmosGrainStorage<TPartitionKeyProvider>(ProviderConstants.DEFAULT_STORAGE_PROVIDER_NAME, configureOptions);
return builder.AddCosmosGrainStorage<TDocumentIdProvider>(ProviderConstants.DEFAULT_STORAGE_PROVIDER_NAME, configureOptions);
}

/// <summary>
/// Configure silo to use Azure Cosmos DB storage for grain storage using a custom Partition Key Provider.
/// Configure silo to use Azure Cosmos DB storage for grain storage using a custom document id provider.
/// </summary>
/// <typeparam name="TPartitionKeyProvider">The custom partition key provider type.</typeparam>
/// <typeparam name="TDocumentIdProvider">The document id provider.</typeparam>
/// <param name="builder">The silo builder.</param>
/// <param name="name">The storage provider name.</param>
/// <param name="configureOptions">The delegate used to configure the provider.</param>
public static ISiloBuilder AddCosmosGrainStorage<TPartitionKeyProvider>(
public static ISiloBuilder AddCosmosGrainStorage<TDocumentIdProvider>(
this ISiloBuilder builder,
string name,
Action<OptionsBuilder<CosmosGrainStorageOptions>>? configureOptions = null) where TPartitionKeyProvider : class, IPartitionKeyProvider
Action<OptionsBuilder<CosmosGrainStorageOptions>>? configureOptions = null) where TDocumentIdProvider : class, IDocumentIdProvider
{
builder.Services.AddSingletonNamedService<IPartitionKeyProvider, TPartitionKeyProvider>(name);
builder.Services.AddSingletonNamedService<IDocumentIdProvider, TDocumentIdProvider>(name);
builder.Services.AddCosmosGrainStorage(name, configureOptions);
return builder;
}

/// <summary>
/// Configure silo to use Azure Cosmos DB storage as the default grain storage using a custom Partition Key Provider.
/// Configure silo to use Azure Cosmos DB storage as the default grain storage using a custom document id provider.
/// </summary>
/// <param name="builder">The silo builder.</param>
/// <param name="customPartitionKeyProviderType">The custom partition key provider type.</param>
/// <param name="customDocumentIdProviderType">The document id provider.</param>
/// <param name="configureOptions">The delegate used to configure the provider.</param>
public static ISiloBuilder AddCosmosGrainStorageAsDefault(
this ISiloBuilder builder,
Type customPartitionKeyProviderType,
Type customDocumentIdProviderType,
Action<OptionsBuilder<CosmosGrainStorageOptions>>? configureOptions = null)
{
return builder.AddCosmosGrainStorage(ProviderConstants.DEFAULT_STORAGE_PROVIDER_NAME, customPartitionKeyProviderType, configureOptions);
return builder.AddCosmosGrainStorage(ProviderConstants.DEFAULT_STORAGE_PROVIDER_NAME, customDocumentIdProviderType, configureOptions);
}

/// <summary>
/// Configure silo to use Azure Cosmos DB storage for grain storage using a custom Partition Key Provider.
/// Configure silo to use Azure Cosmos DB storage for grain storage using a custom document id provider.
/// </summary>
/// <param name="builder">The silo builder.</param>
/// <param name="name">The storage provider name.</param>
/// <param name="configureOptions">The delegate used to configure the provider.</param>
public static ISiloBuilder AddCosmosGrainStorage(
this ISiloBuilder builder,
string name,
Type customPartitionKeyProviderType,
Type customDocumentIdProviderType,
Action<OptionsBuilder<CosmosGrainStorageOptions>>? configureOptions = null)
{
if (customPartitionKeyProviderType != null)
if (customDocumentIdProviderType != null)
{
builder.Services.AddSingletonNamedService<IPartitionKeyProvider>(name, customPartitionKeyProviderType);
builder.Services.AddSingletonNamedService<IDocumentIdProvider>(name, customDocumentIdProviderType);
}

builder.Services.AddCosmosGrainStorage(name, configureOptions);
Expand Down Expand Up @@ -252,7 +252,7 @@ public static IServiceCollection AddCosmosGrainStorage(
name));
services.ConfigureNamedOptionForLogging<CosmosGrainStorageOptions>(name);
services.TryAddSingleton(sp => sp.GetServiceByName<IGrainStorage>(ProviderConstants.DEFAULT_STORAGE_PROVIDER_NAME));
services.TryAddSingleton<IPartitionKeyProvider, DefaultPartitionKeyProvider>();
services.TryAddSingleton<IDocumentIdProvider, DefaultDocumentIdProvider>();
return services.AddSingletonNamedService(name, CosmosStorageFactory.Create)
.AddSingletonNamedService(name, (s, n) => (ILifecycleParticipant<ISiloLifecycle>)s.GetRequiredServiceByName<IGrainStorage>(n));
}
Expand Down
15 changes: 15 additions & 0 deletions src/Azure/Orleans.Persistence.Cosmos/IDocumentIdProvider.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
namespace Orleans.Persistence.Cosmos;

/// <summary>
/// Gets document and partition identifiers for grain state documents.
/// </summary>
public interface IDocumentIdProvider
{
/// <summary>
/// Gets the document identifier for the specified grain.
/// </summary>
/// <param name="stateName">The grain state name.</param>
/// <param name="grainId">The grain identifier.</param>
/// <returns>The document id and partition key.</returns>
ValueTask<(string DocumentId, string PartitionKey)> GetDocumentId(string stateName, GrainId grainId);
}
20 changes: 0 additions & 20 deletions src/Azure/Orleans.Persistence.Cosmos/IPartitionKeyProvider.cs

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -46,10 +46,10 @@ private async Task<CosmosGrainStorage> InitializeStorage()

options.ConfigureTestDefaults();

var pkProvider = new DefaultPartitionKeyProvider();
var clusterOptions = Options.Create(new ClusterOptions { ClusterId = _clusterId, ServiceId = _serviceId });
var idProvider = new DefaultDocumentIdProvider(clusterOptions);

var store = ActivatorUtilities.CreateInstance<CosmosGrainStorage>(providerRuntime.ServiceProvider, options, clusterOptions, "TestStorage", pkProvider);
var store = ActivatorUtilities.CreateInstance<CosmosGrainStorage>(providerRuntime.ServiceProvider, options, clusterOptions, "TestStorage", idProvider);
var lifecycle = ActivatorUtilities.CreateInstance<SiloLifecycleSubject>(providerRuntime.ServiceProvider);
store.Participate(lifecycle);
await lifecycle.OnStart();
Expand Down

0 comments on commit 774631d

Please sign in to comment.