Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow injecting ICustomStorageInterface implementation from outside Grain #8807

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
using System;
using Orleans;
using Orleans.Runtime;

namespace OrleansEventSourcing.CustomStorage;

/// <summary>
/// Exception thrown whenever a grain call is attempted with a bad / missing custom storage provider configuration settings for that grain.
/// </summary>
[GenerateSerializer, Serializable]
public sealed class BadCustomStorageProviderConfigException : OrleansException
{
public BadCustomStorageProviderConfigException()
{
}

public BadCustomStorageProviderConfigException(string msg)
: base(msg)
{
}

public BadCustomStorageProviderConfigException(string msg, Exception exc)
: base(msg, exc)
{
}
}
46 changes: 46 additions & 0 deletions src/Orleans.EventSourcing/CustomStorage/CustomStorageHelpers.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
using System;
using System.Diagnostics.CodeAnalysis;
using Microsoft.Extensions.DependencyInjection;
using Orleans.EventSourcing.CustomStorage;
using Orleans.Runtime;

namespace OrleansEventSourcing.CustomStorage;

public static class CustomStorageHelpers
{
public static ICustomStorageInterface<TState, TDelta> GetCustomStorage<TState, TDelta>(object hostGrain, GrainId grainId, IServiceProvider services)
where TState : class, new()
where TDelta : class
{
ArgumentNullException.ThrowIfNull(hostGrain);

if (hostGrain is ICustomStorageInterface<TState, TDelta> hostGrainCustomStorage)
{
return hostGrainCustomStorage;
}

var grainType = hostGrain.GetType();
var attrs = grainType.GetCustomAttributes(typeof(CustomStorageProviderAttribute), true);
var attr = attrs.Length > 0 ? (CustomStorageProviderAttribute)attrs[0] : null;
var storageFactory = attr != null
? services.GetKeyedService<ICustomStorageFactory>(attr.ProviderName)
: services.GetService<ICustomStorageFactory>();

if (storageFactory == null)
{
ThrowMissingProviderException(grainType, attr?.ProviderName);
}

return storageFactory.CreateCustomStorage<TState, TDelta>(grainId);
}

[DoesNotReturn]
private static void ThrowMissingProviderException(Type grainType, string name)
{
var grainTypeName = grainType.FullName;
var errMsg = string.IsNullOrEmpty(name)
? $"No default custom storage provider found loading grain type {grainTypeName} and grain does not implement ICustomStorageInterface."
: $"No custom storage provider named \"{name}\" found loading grain type {grainTypeName} and grain does not implement ICustomStorageInterface.";
throw new BadCustomStorageProviderConfigException(errMsg);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
using System;
using Orleans.Providers;

namespace OrleansEventSourcing.CustomStorage;

[AttributeUsage(AttributeTargets.Class)]
public class CustomStorageProviderAttribute : Attribute
{
public string ProviderName { get; set; } = ProviderConstants.DEFAULT_STORAGE_PROVIDER_NAME;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
using Orleans.EventSourcing.CustomStorage;
using Orleans.Runtime;

namespace OrleansEventSourcing.CustomStorage;

public interface ICustomStorageFactory
{
public ICustomStorageInterface<TState, TDelta> CreateCustomStorage<TState, TDelta>(GrainId grainId);
}
15 changes: 13 additions & 2 deletions src/Orleans.EventSourcing/CustomStorage/ICustomStorageInterface.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using System.Collections.Generic;
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

namespace Orleans.EventSourcing.CustomStorage
Expand All @@ -18,11 +19,21 @@ public interface ICustomStorageInterface<TState, TDelta>
Task<KeyValuePair<int,TState>> ReadStateFromStorage();

/// <summary>
/// Applies the given array of deltas to storage, and returns true, if the version in storage matches the expected version.
/// Applies the given array of deltas to storage, and returns true, if the version in storage matches the expected version.
/// Otherwise, does nothing and returns false. If successful, the version of storage must be increased by the number of deltas.
/// </summary>
/// <returns>true if the deltas were applied, false otherwise</returns>
Task<bool> ApplyUpdatesToStorage(IReadOnlyList<TDelta> updates, int expectedversion);

/// <summary>
/// Attempt to retrieve a segment of the log, possibly from storage. Throws <see cref="NotSupportedException"/> if
/// the log cannot be read, which depends on the providers used and how they are configured.
/// </summary>
/// <param name="fromVersion">the start position </param>
/// <param name="toVersion">the end position</param>
/// <returns></returns>
Task<IReadOnlyList<TDelta>> RetrieveLogSegment(int fromVersion, int toVersion) =>
throw new NotSupportedException();
}

}
16 changes: 10 additions & 6 deletions src/Orleans.EventSourcing/CustomStorage/LogConsistencyProvider.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,20 +3,22 @@
using System;
using Microsoft.Extensions.Options;
using Microsoft.Extensions.DependencyInjection;
using OrleansEventSourcing.CustomStorage;

namespace Orleans.EventSourcing.CustomStorage
{
/// <summary>
/// A log-consistency provider that relies on grain-specific custom code for
/// A log-consistency provider that relies on grain-specific custom code for
/// reading states from storage, and appending deltas to storage.
/// Grains that wish to use this provider must implement the <see cref="ICustomStorageInterface{TState, TDelta}"/>
/// interface, to define how state is read and how deltas are written.
/// If the provider attribute "PrimaryCluster" is supplied in the provider configuration, then only the specified cluster
/// accesses storage, and other clusters may not issue updates.
/// accesses storage, and other clusters may not issue updates.
/// </summary>
public class LogConsistencyProvider : ILogViewAdaptorFactory
{
private readonly CustomStorageLogConsistencyOptions options;
private readonly IServiceProvider serviceProvider;

/// <summary>
/// Specifies a clusterid of the primary cluster from which to access storage exclusively, null if
Expand All @@ -26,18 +28,20 @@ public class LogConsistencyProvider : ILogViewAdaptorFactory

/// <inheritdoc/>
public bool UsesStorageProvider => false;
public LogConsistencyProvider(CustomStorageLogConsistencyOptions options)

public LogConsistencyProvider(CustomStorageLogConsistencyOptions options, IServiceProvider serviceProvider)
{
this.options = options;
this.serviceProvider = serviceProvider;
}

/// <inheritdoc/>
public ILogViewAdaptor<TView, TEntry> MakeLogViewAdaptor<TView, TEntry>(ILogViewAdaptorHost<TView, TEntry> hostgrain, TView initialstate, string graintypename, IGrainStorage grainStorage, ILogConsistencyProtocolServices services)
where TView : class, new()
where TEntry : class
{
return new CustomStorageAdaptor<TView, TEntry>(hostgrain, initialstate, services, PrimaryCluster);
var customStorage = CustomStorageHelpers.GetCustomStorage<TView, TEntry>(hostgrain, services.GrainId, serviceProvider);
return new CustomStorageAdaptor<TView, TEntry>(hostgrain, initialstate, services, PrimaryCluster, customStorage);
}
}

Expand All @@ -46,7 +50,7 @@ public static class LogConsistencyProviderFactory
public static ILogViewAdaptorFactory Create(IServiceProvider services, string name)
{
var optionsMonitor = services.GetRequiredService<IOptionsMonitor<CustomStorageLogConsistencyOptions>>();
return ActivatorUtilities.CreateInstance<LogConsistencyProvider>(services, optionsMonitor.Get(name));
return ActivatorUtilities.CreateInstance<LogConsistencyProvider>(services, optionsMonitor.Get(name), services);
}
}
}
27 changes: 16 additions & 11 deletions src/Orleans.EventSourcing/CustomStorage/LogViewAdaptor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,12 @@
using System.Linq;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
using Orleans.Storage;
using Orleans.EventSourcing.Common;

namespace Orleans.EventSourcing.CustomStorage
{
/// <summary>
/// A log consistency adaptor that uses the user-provided storage interface <see cref="ICustomStorageInterface{T,E}"/>.
/// A log consistency adaptor that uses the user-provided storage interface <see cref="ICustomStorageInterface{T,E}"/>.
/// This interface must be implemented by any grain that uses this log view adaptor.
/// </summary>
/// <typeparam name="TLogView">log view type</typeparam>
Expand All @@ -23,15 +22,15 @@ internal class CustomStorageAdaptor<TLogView, TLogEntry> : PrimaryBasedLogViewAd
/// Initialize a new instance of CustomStorageAdaptor class
/// </summary>
public CustomStorageAdaptor(ILogViewAdaptorHost<TLogView, TLogEntry> host, TLogView initialState,
ILogConsistencyProtocolServices services, string primaryCluster)
ILogConsistencyProtocolServices services, string primaryCluster, ICustomStorageInterface<TLogView, TLogEntry> customStorage)
: base(host, initialState, services)
{
if (!(host is ICustomStorageInterface<TLogView, TLogEntry>))
throw new BadProviderConfigException("Must implement ICustomStorageInterface<TLogView,TLogEntry> for CustomStorageLogView provider");
this.customStorage = customStorage;
this.primaryCluster = primaryCluster;
}

private readonly string primaryCluster;
private readonly ICustomStorageInterface<TLogView, TLogEntry> customStorage;

private TLogView cached;
private int version;
Expand Down Expand Up @@ -114,7 +113,7 @@ protected override async Task ReadAsync()
try
{
// read from storage
var result = await ((ICustomStorageInterface<TLogView, TLogEntry>)Host).ReadStateFromStorage();
var result = await customStorage.ReadStateFromStorage();
version = result.Key;
cached = result.Value;

Expand Down Expand Up @@ -152,7 +151,7 @@ protected override async Task<int> WriteAsync()

try
{
writesuccessful = await ((ICustomStorageInterface<TLogView,TLogEntry>) Host).ApplyUpdatesToStorage(updates, version);
writesuccessful = await customStorage.ApplyUpdatesToStorage(updates, version);

LastPrimaryIssue.Resolve(Host, Services);
}
Expand Down Expand Up @@ -197,7 +196,7 @@ protected override async Task<int> WriteAsync()

try
{
var result = await ((ICustomStorageInterface<TLogView, TLogEntry>)Host).ReadStateFromStorage();
var result = await customStorage.ReadStateFromStorage();
version = result.Key;
cached = result.Value;

Expand Down Expand Up @@ -225,6 +224,12 @@ protected override async Task<int> WriteAsync()
return writesuccessful ? updates.Count : 0;
}

/// <inheritdoc/>
public override Task<IReadOnlyList<TLogEntry>> RetrieveLogSegment(int fromVersion, int toVersion)
{
return customStorage.RetrieveLogSegment(fromVersion, toVersion);
}

/// <summary>
/// Describes a connection issue that occurred when updating the primary storage.
/// </summary>
Expand Down Expand Up @@ -279,7 +284,7 @@ public override string ToString()
return string.Format("v{0} ({1} updates)", Version, Updates.Count);
}
}

private readonly SortedList<long, UpdateNotificationMessage> notifications = new SortedList<long,UpdateNotificationMessage>();

/// <inheritdoc/>
Expand Down Expand Up @@ -309,7 +314,7 @@ protected override void ProcessNotifications()
var updatenotification = notifications.ElementAt(0).Value;
notifications.RemoveAt(0);

// Apply all operations in pending
// Apply all operations in pending
foreach (var u in updatenotification.Updates)
try
{
Expand All @@ -328,7 +333,7 @@ protected override void ProcessNotifications()
Services.Log(LogLevel.Trace, "unprocessed notifications in queue: {0}", notifications.Count);

base.ProcessNotifications();

}

[Conditional("DEBUG")]
Expand Down
75 changes: 74 additions & 1 deletion test/Grains/TestGrains/LogTestGrainVariations.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
using Orleans.EventSourcing.CustomStorage;
using Orleans.Providers;
using Orleans.Runtime;
using Orleans.Serialization;
using OrleansEventSourcing.CustomStorage;
using UnitTests.GrainInterfaces;

namespace TestGrains
Expand Down Expand Up @@ -48,7 +51,7 @@ private ILogTestGrain GetStorageGrain()
}
return storagegrain;
}


public Task<bool> ApplyUpdatesToStorage(IReadOnlyList<object> updates, int expectedversion)
{
Expand Down Expand Up @@ -120,5 +123,75 @@ public Task<KeyValuePair<int, MyGrainState>> ReadStateFromStorage()
}
}

// use the explicitly specified "CustomStorage" log-consistency provider with a separate ICustomStorageInterface implementation
[LogConsistencyProvider(ProviderName = "CustomStorage")]
public class LogTestGrainSeparateCustomStorage : LogTestGrain
{
public class SeparateCustomStorageFactory : ICustomStorageFactory
{
private readonly DeepCopier deepCopier;

public SeparateCustomStorageFactory(DeepCopier deepCopier)
{
this.deepCopier = deepCopier;
}

public ICustomStorageInterface<TState, TDelta> CreateCustomStorage<TState, TDelta>(GrainId grainId)
{
return new SeparateCustomStorage<TState, TDelta>(deepCopier);
}
}

public class SeparateCustomStorage<TState, TDelta> : ICustomStorageInterface<TState, TDelta>
{
private readonly DeepCopier copier;

// we use fake in-memory state as the storage
private TState state;
private int version;

public SeparateCustomStorage(DeepCopier copier)
{
this.copier = copier;
}

public Task<KeyValuePair<int, TState>> ReadStateFromStorage()
{
if (state == null)
{
state = Activator.CreateInstance<TState>();
version = 0;
}
return Task.FromResult(new KeyValuePair<int, TState>(version, this.copier.Copy(state)));
}

public Task<bool> ApplyUpdatesToStorage(IReadOnlyList<TDelta> updates, int expectedversion)
{
if (state == null)
{
state = Activator.CreateInstance<TState>();
version = 0;
}

if (expectedversion != version)
return Task.FromResult(false);

foreach (var u in updates)
{
this.TransitionState(state, u);
version++;
}

return Task.FromResult(true);
}

protected virtual void TransitionState(TState state, object @event)
{
dynamic s = state;
dynamic e = @event;
s.Apply(e);
}
}
}

}
Loading