Skip to content

Commit

Permalink
Merge pull request #49 from RayTale/develop
Browse files Browse the repository at this point in the history
Add snapshot cache function
  • Loading branch information
u-less authored Jan 14, 2021
2 parents ae0ee07 + 5b9f914 commit beec340
Show file tree
Hide file tree
Showing 4 changed files with 160 additions and 37 deletions.
9 changes: 5 additions & 4 deletions Packages.props
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,9 @@
<PackageReference Update="Microsoft.Extensions.Logging" Version="5.0.0" />
<PackageReference Update="Microsoft.Extensions.Logging.Debug" Version="5.0.0" />
<PackageReference Update="Microsoft.Extensions.DependencyModel" Version="5.0.0" />
<PackageReference Update="Microsoft.Extensions.Caching.Abstractions" Version="5.0.0" />
<PackageReference Update="Microsoft.Extensions.Logging.Abstractions" Version="5.0.0" />
<PackageReference Update="Microsoft.Extensions.ObjectPool" Version="5.0.1" />
<PackageReference Update="Microsoft.Extensions.ObjectPool" Version="5.0.2" />
<PackageReference Update="Microsoft.Extensions.Options" Version="5.0.0" />
<PackageReference Update="Microsoft.Extensions.Hosting.Abstractions" Version="5.0.0" />
<PackageReference Update="BenchmarkDotNet" Version="0.12.1" />
Expand All @@ -26,7 +27,7 @@

<PackageReference Update="System.Buffers" Version="4.5.1" />
<PackageReference Update="System.Runtime.Loader" Version="4.3.0" />
<PackageReference Update="System.Text.Json" Version="5.0.0" />
<PackageReference Update="System.Text.Json" Version="5.0.1" />
<PackageReference Update="System.Threading.Tasks.Dataflow" Version="5.0.0" />
<PackageReference Update="Confluent.Kafka" Version="1.5.3" />
<PackageReference Update="RabbitMQ.Client" Version="6.2.1" />
Expand All @@ -36,7 +37,7 @@

<PackageReference Update="Lindhart.Analyser.MissingAwaitWarning" Version="2.0.0" PrivateAssets="All" />
<PackageReference Update="Microsoft.CodeAnalysis.CSharp" Version="3.8.0" PrivateAssets="All" />
<PackageReference Update="Microsoft.Data.Sqlite" Version="5.0.1" />
<PackageReference Update="Microsoft.Data.Sqlite" Version="5.0.2" />
<PackageReference Update="Nerdbank.GitVersioning" Version="3.3.37" PrivateAssets="All" />
<PackageReference Update="Microsoft.CodeQuality.Analyzers" Version="3.3.2">
<PrivateAssets>all</PrivateAssets>
Expand All @@ -58,7 +59,7 @@
<PrivateAssets>all</PrivateAssets>
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
</PackageReference>
<PackageReference Update="coverlet.collector" Version="1.3.0">
<PackageReference Update="coverlet.collector" Version="3.0.0">
<PrivateAssets>all</PrivateAssets>
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
</PackageReference>
Expand Down
166 changes: 136 additions & 30 deletions src/Vertex.Runtime/Actor/FlowActor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
using System.Reflection.Emit;
using System.Runtime.CompilerServices;
using System.Threading.Tasks;
using Microsoft.Extensions.Caching.Distributed;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
Expand All @@ -30,11 +31,12 @@ namespace Vertex.Runtime.Actor
{
public abstract class FlowActor<TPrimaryKey> : ActorBase<TPrimaryKey>, IFlowActor
{
private static readonly ConcurrentDictionary<Type, Func<object, IEvent, EventMeta, Task>> GrainHandlerDict = new ConcurrentDictionary<Type, Func<object, IEvent, EventMeta, Task>>();
private static readonly ConcurrentDictionary<Type, EventDiscardAttribute> EventDiscardAttributeDict = new ConcurrentDictionary<Type, EventDiscardAttribute>();
private static readonly ConcurrentDictionary<Type, StrictHandleAttribute> EventStrictAttributerAttributeDict = new ConcurrentDictionary<Type, StrictHandleAttribute>();
private static readonly ConcurrentDictionary<Type, Func<object, IEvent, EventMeta, Task>> GrainHandlerDict = new();
private static readonly ConcurrentDictionary<Type, EventDiscardAttribute> EventDiscardAttributeDict = new();
private static readonly ConcurrentDictionary<Type, StrictHandleAttribute> EventStrictAttributerAttributeDict = new();
private readonly Func<object, IEvent, EventMeta, Task> handlerInvokeFunc;
private readonly EventDiscardAttribute discardAttribute;
private string snapshotCacheKey;

public FlowActor()
{
Expand Down Expand Up @@ -287,14 +289,16 @@ static void GetInheritor(SwitchMethodEmit from, List<SwitchMethodEmit> list, Lis
}

#region property
protected FlowActorOptions VertexOptions { get; private set; }
protected FlowActorOptions FlowOptions { get; private set; }

public abstract IVertexActor Vertex { get; }

protected ILogger Logger { get; private set; }

protected ISerializer Serializer { get; private set; }

protected IDistributedCache SnapshotCache { get; private set; }

protected IEventTypeContainer EventTypeContainer { get; private set; }

/// <summary>
Expand All @@ -307,6 +311,11 @@ static void GetInheritor(SwitchMethodEmit from, List<SwitchMethodEmit> list, Lis
/// </summary>
protected long ActivateSnapshotVersion { get; private set; }

/// <summary>
/// The event version number of the snapshot cache
/// </summary>
protected long ActivateSnapshotCacheVersion { get; private set; }

public ISubSnapshotStorage<TPrimaryKey> SnapshotStorage { get; private set; }

/// <summary>
Expand All @@ -332,11 +341,17 @@ static void GetInheritor(SwitchMethodEmit from, List<SwitchMethodEmit> list, Lis
/// <returns></returns>
protected virtual async ValueTask DependencyInjection()
{
this.VertexOptions = this.ServiceProvider.GetService<IOptionsMonitor<FlowActorOptions>>().Get(this.ActorType.FullName);
this.FlowOptions = this.ServiceProvider.GetService<IOptionsMonitor<FlowActorOptions>>().Get(this.ActorType.FullName);
this.Serializer = this.ServiceProvider.GetService<ISerializer>();
this.EventTypeContainer = this.ServiceProvider.GetService<IEventTypeContainer>();
this.Logger = (ILogger)this.ServiceProvider.GetService(typeof(ILogger<>).MakeGenericType(this.ActorType));

if (this.FlowOptions.EnableSnapshotCache)
{
this.SnapshotCache = this.ServiceProvider.GetRequiredService<IDistributedCache>();
this.snapshotCacheKey = $"vertex_flowactor_snapshot_{this.ActorType.Name}_{this.ActorId}";
}

var snapshotStorageFactory = this.ServiceProvider.GetService<ISubSnapshotStorageFactory>();
this.SnapshotStorage = await snapshotStorageFactory.Create(this);
}
Expand All @@ -354,7 +369,7 @@ public override async Task OnActivateAsync()
try
{
await this.ReadSnapshotAsync();
if (this.Snapshot.Version != 0 || this.VertexOptions.InitType == FlowInitType.ZeroVersion)
if (this.Snapshot.Version != 0 || this.FlowOptions.InitType == FlowInitType.ZeroVersion)
{
await this.Recovery();
}
Expand All @@ -373,7 +388,7 @@ public override async Task OnActivateAsync()

public override async Task OnDeactivateAsync()
{
await this.SaveSnapshotAsync(true);
await this.SaveSnapshotAsync(isDeactivate: true);

if (this.Logger.IsEnabled(LogLevel.Trace))
{
Expand All @@ -385,13 +400,28 @@ protected virtual async Task ReadSnapshotAsync()
{
try
{
this.Snapshot = await this.SnapshotStorage.Get(this.ActorId);
if (this.Snapshot == null)
if (this.FlowOptions.EnableSnapshotCache)
{
var snapshotBytes = await this.SnapshotCache.GetAsync(this.snapshotCacheKey);
if (snapshotBytes != default)
{
this.Snapshot = this.Serializer.Deserialize<SubSnapshot<TPrimaryKey>>(snapshotBytes);
}
}

if (this.Snapshot == default)
{
this.Snapshot = await this.SnapshotStorage.Get(this.ActorId);
}

if (this.Snapshot == default)
{
await this.CreateSnapshot();
}

this.ActivateSnapshotVersion = this.Snapshot.Version;
this.ActivateSnapshotCacheVersion = this.Snapshot.Version;

if (this.Logger.IsEnabled(LogLevel.Trace))
{
this.Logger.LogTrace("ReadSnapshot completed: {0}->{1}", this.ActorType.FullName, this.Serializer.Serialize(this.Snapshot));
Expand Down Expand Up @@ -425,10 +455,10 @@ private async Task Recovery()
{
while (true)
{
var documentList = await this.Vertex.GetEventDocuments(this.Snapshot.Version + 1, this.Snapshot.Version + this.VertexOptions.EventPageSize);
var documentList = await this.Vertex.GetEventDocuments(this.Snapshot.Version + 1, this.Snapshot.Version + this.FlowOptions.EventPageSize);
var evtList = this.ConvertToEventUnitList(documentList);
await this.UnsafeTell(evtList);
if (documentList.Count < this.VertexOptions.EventPageSize)
if (documentList.Count < this.FlowOptions.EventPageSize)
{
break;
}
Expand Down Expand Up @@ -587,7 +617,7 @@ protected async ValueTask Tell(EventUnit<TPrimaryKey> eventUnit)
}
catch
{
await this.SaveSnapshotAsync(true);
await this.SaveSnapshotAsync(isError: true);
throw;
}
}
Expand Down Expand Up @@ -620,6 +650,7 @@ protected async Task ConcurrentTell(IEnumerable<EventUnit<TPrimaryKey>> inputs)
{
await Task.WhenAll(this.UnprocessedEventList.Select(@event => this.EventDelivered(@event).AsTask()));
this.Snapshot.UnsafeUpdateVersion(this.UnprocessedEventList.Last().Meta);

await this.SaveSnapshotAsync();

this.UnprocessedEventList.Clear();
Expand Down Expand Up @@ -671,44 +702,119 @@ protected virtual ValueTask OnEventDelivered(EventUnit<TPrimaryKey> eventUnit)
return new ValueTask(this.handlerInvokeFunc(this, eventUnit.Event, eventUnit.Meta));
}

/// <summary>
/// Custom save items
/// </summary>
/// <returns></returns>
[MethodImpl(MethodImplOptions.AggressiveInlining)]
protected virtual ValueTask OnSaveSnapshot() => ValueTask.CompletedTask;

[MethodImpl(MethodImplOptions.AggressiveInlining)]
protected virtual ValueTask OnSavedSnapshot() => ValueTask.CompletedTask;

protected virtual async ValueTask SaveSnapshotAsync(bool force = false)
protected virtual async ValueTask SaveSnapshotAsync(bool isDeactivate = false, bool isError = false)
{
if ((force && this.Snapshot.Version > this.ActivateSnapshotVersion) ||
(this.Snapshot.Version - this.ActivateSnapshotVersion >= this.VertexOptions.SnapshotVersionInterval))
if (isDeactivate)
{
try
if (this.FlowOptions.EnableSnapshotCache)
{
if (this.Snapshot.Version - this.ActivateSnapshotCacheVersion > this.FlowOptions.MinSnapshotCacheVersionInterval)
{
await this.OnSaveSnapshot(); // Custom save items
await this.SaveSnapshotToCacheAsync();
}
}
else if (this.Snapshot.Version - this.ActivateSnapshotVersion > this.FlowOptions.MinSnapshotVersionInterval)
{
await this.OnSaveSnapshot(); // Custom save items

if (this.ActivateSnapshotVersion == 0)
await this.SaveSnapshotToDbAsync();
}
}
else if (isError)
{
if (this.FlowOptions.EnableSnapshotCache)
{
if (this.Snapshot.Version > this.ActivateSnapshotCacheVersion)
{
await this.SnapshotStorage.Insert(this.Snapshot);
await this.OnSaveSnapshot(); // Custom save items
await this.SaveSnapshotToCacheAsync();
}
else
}
else if (this.Snapshot.Version > this.ActivateSnapshotVersion)
{
await this.OnSaveSnapshot(); // Custom save items
await this.SaveSnapshotToDbAsync();
}
}
else
{
if (this.FlowOptions.EnableSnapshotCache)
{
if (this.Snapshot.Version - this.ActivateSnapshotCacheVersion >= this.FlowOptions.SnapshotCacheVersionInterval)
{
await this.SnapshotStorage.Update(this.Snapshot);
await this.OnSaveSnapshot();
await this.SaveSnapshotToCacheAsync();

// Synchronize to DB
if (this.Snapshot.Version - this.ActivateSnapshotVersion >= this.FlowOptions.SnapshotVersionInterval)
{
await this.SaveSnapshotToDbAsync();
}
}
}
else if (this.Snapshot.Version - this.ActivateSnapshotVersion >= this.FlowOptions.SnapshotVersionInterval)
{
await this.OnSaveSnapshot();
await this.SaveSnapshotToDbAsync();
}
}
}

this.ActivateSnapshotVersion = this.Snapshot.Version;
await this.OnSavedSnapshot();
protected virtual async Task SaveSnapshotToDbAsync()
{
try
{
if (this.ActivateSnapshotVersion == 0)
{
await this.SnapshotStorage.Insert(this.Snapshot);
}
else
{
await this.SnapshotStorage.Update(this.Snapshot);
}

if (this.Logger.IsEnabled(LogLevel.Trace))
{
this.Logger.LogTrace("SaveSnapshot completed: {0}->{1}", this.ActorType.FullName, this.Serializer.Serialize(this.Snapshot));
}
this.ActivateSnapshotVersion = this.Snapshot.Version;

if (this.Logger.IsEnabled(LogLevel.Trace))
{
this.Logger.LogTrace("Save snapshot to db completed: {0}->{1}", this.ActorType.FullName, this.Serializer.Serialize(this.Snapshot));
}
catch (Exception ex)
}
catch (Exception ex)
{
this.Logger.LogCritical(ex, "Save snapshot to db failed: {0}->{1}", this.ActorType.FullName, this.ActorId.ToString());
throw;
}
}

protected virtual async Task SaveSnapshotToCacheAsync()
{
try
{
await this.SnapshotCache.SetAsync(this.snapshotCacheKey, this.Serializer.SerializeToUtf8Bytes(this.Snapshot));

this.ActivateSnapshotCacheVersion = this.Snapshot.Version;

if (this.Logger.IsEnabled(LogLevel.Trace))
{
this.Logger.LogCritical(ex, "SaveSnapshot failed: {0}->{1}", this.ActorType.FullName, this.ActorId.ToString());
throw;
this.Logger.LogTrace("Save snapshot to cache completed: {0}->{1}", this.ActorType.FullName, this.Serializer.Serialize(this.Snapshot));
}
}
catch (Exception ex)
{
this.Logger.LogCritical(ex, "Save snapshot to cache failed: {0}->{1}", this.ActorType.FullName, this.ActorId.ToString());
throw;
}
}
}
}
19 changes: 17 additions & 2 deletions src/Vertex.Runtime/Options/FlowActorOptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,12 @@
public class FlowActorOptions
{
/// <summary>
/// Event Version interval of RayGrain saving snapshot
/// Event Version interval of FlowActor saving snapshot
/// </summary>
public int SnapshotVersionInterval { get; set; } = 500;

/// <summary>
/// The minimum event Version interval for saving snapshots when RayGrain is deactivated
/// The minimum event Version interval for saving snapshot when FlowActor is deactivated
/// </summary>
public int MinSnapshotVersionInterval { get; set; } = 1;

Expand All @@ -18,5 +18,20 @@ public class FlowActorOptions
public int EventPageSize { get; set; } = 1000;

public FlowInitType InitType { get; set; } = FlowInitType.FirstReceive;

/// <summary>
/// Enable snapshot caching
/// </summary>
public bool EnableSnapshotCache { get; set; }

/// <summary>
/// Event Version interval of FlowActor snapshot cache
/// </summary>
public int SnapshotCacheVersionInterval { get; set; }

/// <summary>
/// The minimum event Version interval for saving snapshot cache when FlowActor is deactivated
/// </summary>
public int MinSnapshotCacheVersionInterval { get; set; } = 1;
}
}
3 changes: 2 additions & 1 deletion src/Vertex.Runtime/Vertex.Runtime.csproj
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
<Project Sdk="Microsoft.NET.Sdk">
<Project Sdk="Microsoft.NET.Sdk">
<Sdk Name="Microsoft.Build.CentralPackageVersions" />
<ItemGroup>
<PackageReference Include="IdGen" />
<PackageReference Include="Microsoft.Extensions.Caching.Abstractions" />
<PackageReference Include="Microsoft.Extensions.DependencyInjection" />
<PackageReference Include="Microsoft.Orleans.CodeGenerator.MSBuild" />
<PackageReference Include="Microsoft.Orleans.Core" />
Expand Down

0 comments on commit beec340

Please sign in to comment.