Skip to content

Commit

Permalink
Merge pull request #24 from RayTale/develop
Browse files Browse the repository at this point in the history
Improve storage scalability
  • Loading branch information
u-less authored Nov 18, 2020
2 parents 4af43d7 + b612755 commit 37b89fa
Show file tree
Hide file tree
Showing 12 changed files with 138 additions and 64 deletions.
23 changes: 16 additions & 7 deletions src/Storage/Vertex.Storage.Linq2db/Core/EventStorageAttribute.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,33 +5,42 @@
namespace Vertex.Storage.Linq2db.Core
{
[AttributeUsage(AttributeTargets.Class, AllowMultiple = false, Inherited = false)]
public class EventStorageAttribute : Attribute
public class EventStorageAttribute : EventStorageBaseAttribute
{
private readonly Func<string, string> shardingFunc;
private readonly string optionName;

public EventStorageAttribute(string optionName, string name, int sharding = 0)
{
this.Name = name;
this.OptionName = optionName;
this.optionName = optionName;
if (sharding < 0)
{
throw new ArgumentOutOfRangeException("sharding must be greater than 0");
}

if (sharding == 0)
{
this.ShardingFunc = actorId => $"Vertex_Event_{name}".ToLower();
this.shardingFunc = actorId => $"Vertex_Event_{name}".ToLower();
}
else
{
var tableNames = Enumerable.Range(0, sharding).Select(index => $"Vertex_Event_{name}_{index}".ToLower()).ToList();
var hash = new ConsistentHash(tableNames, tableNames.Count * 10);
this.ShardingFunc = actorId => hash.GetNode(actorId);
this.shardingFunc = actorId => hash.GetNode(actorId);
}
}

public string OptionName { get; init; }

public string Name { get; init; }

public Func<string, string> ShardingFunc { get; init; }
public override string GetTableName(string actorId)
{
return this.shardingFunc(actorId);
}

public override string GetOptionName(string actorId)
{
return this.optionName;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
using System;

namespace Vertex.Storage.Linq2db.Core
{
[AttributeUsage(AttributeTargets.Class, AllowMultiple = false, Inherited = false)]
public abstract class EventStorageBaseAttribute : Attribute
{
public abstract string GetTableName(string actorId);

public abstract string GetOptionName(string actorId);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,33 +5,42 @@
namespace Vertex.Storage.Linq2db.Core
{
[AttributeUsage(AttributeTargets.Class, AllowMultiple = false, Inherited = false)]
public class SnapshotStorageAttribute : Attribute
public class SnapshotStorageAttribute : SnapshotStorageBaseAttribute
{
private readonly Func<string, string> shardingFunc;
private readonly string optionName;

public SnapshotStorageAttribute(string optionName, string name, int sharding = 0)
{
this.Name = name;
this.OptionName = optionName;
this.optionName = optionName;
if (sharding < 0)
{
throw new ArgumentOutOfRangeException("sharding must be greater than 0");
}

if (sharding == 0)
{
this.ShardingFunc = actorId => $"Vertex_Snapshot_{name}".ToLower();
this.shardingFunc = actorId => $"Vertex_Snapshot_{name}".ToLower();
}
else
{
var tableNames = Enumerable.Range(0, sharding).Select(index => $"Vertex_Snapshot_{name}_{index}".ToLower()).ToList();
var hash = new ConsistentHash(tableNames, tableNames.Count * 10);
this.ShardingFunc = actorId => hash.GetNode(actorId);
this.shardingFunc = actorId => hash.GetNode(actorId);
}
}

public string OptionName { get; init; }

public string Name { get; init; }

public Func<string, string> ShardingFunc { get; init; }
public override string GetOptionName(string actorId)
{
return this.optionName;
}

public override string GetTableName(string actorId)
{
return this.shardingFunc(actorId);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
using System;

namespace Vertex.Storage.Linq2db.Core
{
[AttributeUsage(AttributeTargets.Class, AllowMultiple = false, Inherited = false)]
public abstract class SnapshotStorageBaseAttribute : Attribute
{
public abstract string GetTableName(string actorId);

public abstract string GetOptionName(string actorId);
}
}
23 changes: 16 additions & 7 deletions src/Storage/Vertex.Storage.Linq2db/Core/TxEventStorageAttribute.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,33 +5,42 @@
namespace Vertex.Storage.Linq2db.Core
{
[AttributeUsage(AttributeTargets.Class, AllowMultiple = false, Inherited = false)]
public class TxEventStorageAttribute : Attribute
public class TxEventStorageAttribute : TxEventStorageBaseAttribute
{
private readonly Func<string, string> shardingFunc;
private readonly string optionName;

public TxEventStorageAttribute(string optionName, string name, int sharding = 0)
{
this.Name = name;
this.OptionName = optionName;
this.optionName = optionName;
if (sharding < 0)
{
throw new ArgumentOutOfRangeException("sharding must be greater than 0");
}

if (sharding == 0)
{
this.ShardingFunc = actorId => $"Vertex_TxEvent_{name}".ToLower();
this.shardingFunc = actorId => $"Vertex_TxEvent_{name}".ToLower();
}
else
{
var tableNames = Enumerable.Range(0, sharding).Select(index => $"Vertex_TxEvent_{name}_{index}".ToLower()).ToList();
var hash = new ConsistentHash(tableNames, tableNames.Count * 10);
this.ShardingFunc = actorId => hash.GetNode(actorId);
this.shardingFunc = actorId => hash.GetNode(actorId);
}
}

public string OptionName { get; init; }

public string Name { get; init; }

public Func<string, string> ShardingFunc { get; init; }
public override string GetOptionName(string actorId)
{
return this.optionName;
}

public override string GetTableName(string actorId)
{
return this.shardingFunc(actorId);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
using System;

namespace Vertex.Storage.Linq2db.Core
{
[AttributeUsage(AttributeTargets.Class, AllowMultiple = false, Inherited = false)]
public abstract class TxEventStorageBaseAttribute : Attribute
{
public abstract string GetTableName(string actorId);

public abstract string GetOptionName(string actorId);
}
}
1 change: 0 additions & 1 deletion src/Storage/Vertex.Storage.Linq2db/Storage/EventStorage.cs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Runtime.CompilerServices;
using System.Threading.Tasks;
using LinqToDB;
using LinqToDB.Data;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@ namespace Vertex.Storage.Linq2db.Storage
{
public class EventStorageFactory : IEventStorageFactory
{
private readonly ConcurrentDictionary<Type, EventStorageAttribute> typeAttributes = new ConcurrentDictionary<Type, EventStorageAttribute>();
private readonly ConcurrentDictionary<string, Lazy<Task<object>>> eventStorageDict = new ConcurrentDictionary<string, Lazy<Task<object>>>();
private readonly ConcurrentDictionary<Type, EventStorageBaseAttribute> typeAttributes = new();
private readonly ConcurrentDictionary<string, Lazy<Task<object>>> eventStorageDict = new();
private readonly DbFactory dbFactory;
private readonly IGrainFactory grainFactory;
private readonly IServiceProvider serviceProvider;
Expand All @@ -33,28 +33,31 @@ public async ValueTask<IEventStorage<TPrimaryKey>> Create<TPrimaryKey>(IActor<TP
{
var attribute = this.typeAttributes.GetOrAdd(actor.GetType(), key =>
{
var attributes = key.GetCustomAttributes(typeof(EventStorageAttribute), false);
if (attributes.Length > 0)
var attributes = key.GetCustomAttributes(false);
var attribute = attributes.SingleOrDefault(att => typeof(EventStorageBaseAttribute).IsAssignableFrom(att.GetType()));
if (attribute != default)
{
return attributes.First() as EventStorageAttribute;
return attribute as EventStorageBaseAttribute;
}
else
{
throw new MissingAttributeException($"{nameof(EventStorageAttribute)}=>{key.Name}");
throw new MissingAttributeException($"{nameof(EventStorageBaseAttribute)}=>{key.Name}");
}
});
var tableName = attribute.ShardingFunc(actor.ActorId.ToString());
var storage = await this.eventStorageDict.GetOrAdd($"{attribute.OptionName}_{tableName}", key =>
var actorId = actor.ActorId.ToString();
var tableName = attribute.GetTableName(actorId);
var optionName = attribute.GetOptionName(actorId);
var storage = await this.eventStorageDict.GetOrAdd($"{optionName}_{tableName}", key =>
new Lazy<Task<object>>(async () =>
{
using var db = this.dbFactory.GetEventDb(attribute.OptionName);
using var db = this.dbFactory.GetEventDb(optionName);
await db.CreateTableIfNotExists<EventEntity<TPrimaryKey>>(this.grainFactory, key, tableName, async () =>
{
var indexGenerator = db.GetGenerator();
await indexGenerator.CreateUniqueIndexIfNotExists(db, tableName, $"{tableName}_event_unique", nameof(EventEntity<TPrimaryKey>.ActorId).ToLower(), nameof(EventEntity<TPrimaryKey>.Version).ToLower());
await indexGenerator.CreateUniqueIndexIfNotExists(db, tableName, $"{tableName}_event_flow_unique", nameof(EventEntity<TPrimaryKey>.ActorId).ToLower(), nameof(EventEntity<TPrimaryKey>.Name).ToLower(), nameof(EventEntity<TPrimaryKey>.FlowId).ToLower());
});
return new EventStorage<TPrimaryKey>(this.serviceProvider, this.dbFactory, attribute.OptionName, tableName);
return new EventStorage<TPrimaryKey>(this.serviceProvider, this.dbFactory, optionName, tableName);
})).Value;
return storage as IEventStorage<TPrimaryKey>;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@ namespace Vertex.Storage.Linq2db.Storage
{
public class SnapshotStorageFactory : ISnapshotStorageFactory
{
private readonly ConcurrentDictionary<Type, SnapshotStorageAttribute> typeAttributes = new ConcurrentDictionary<Type, SnapshotStorageAttribute>();
private readonly ConcurrentDictionary<string, Lazy<Task<object>>> eventStorageDict = new ConcurrentDictionary<string, Lazy<Task<object>>>();
private readonly ConcurrentDictionary<Type, SnapshotStorageBaseAttribute> typeAttributes = new();
private readonly ConcurrentDictionary<string, Lazy<Task<object>>> eventStorageDict = new();
private readonly DbFactory dbFactory;
private readonly IGrainFactory grainFactory;
private readonly IServiceProvider serviceProvider;
Expand All @@ -31,23 +31,26 @@ public async ValueTask<ISnapshotStorage<TPrimaryKey>> Create<TPrimaryKey>(IActor
{
var attribute = this.typeAttributes.GetOrAdd(actor.GetType(), key =>
{
var attributes = key.GetCustomAttributes(typeof(SnapshotStorageAttribute), false);
if (attributes.Length > 0)
var attributes = key.GetCustomAttributes(false);
var attribute = attributes.SingleOrDefault(att => typeof(SnapshotStorageBaseAttribute).IsAssignableFrom(att.GetType()));
if (attribute != default)
{
return attributes.First() as SnapshotStorageAttribute;
return attribute as SnapshotStorageBaseAttribute;
}
else
{
throw new MissingAttributeException($"{nameof(SnapshotStorageAttribute)}=>{key.Name}");
throw new MissingAttributeException($"{nameof(SnapshotStorageBaseAttribute)}=>{key.Name}");
}
});
var tableName = attribute.ShardingFunc(actor.ActorId.ToString());
var storage = await this.eventStorageDict.GetOrAdd($"{attribute.OptionName}_{tableName}", key =>
var actorId = actor.ActorId.ToString();
var tableName = attribute.GetTableName(actorId);
var optionName = attribute.GetOptionName(actorId);
var storage = await this.eventStorageDict.GetOrAdd($"{optionName}_{tableName}", key =>
new Lazy<Task<object>>(async () =>
{
using var db = this.dbFactory.GetEventDb(attribute.OptionName);
using var db = this.dbFactory.GetEventDb(optionName);
await db.CreateTableIfNotExists<SnapshotEntity<TPrimaryKey>>(this.grainFactory, key, tableName);
return new SnapshotStorage<TPrimaryKey>(this.serviceProvider, this.dbFactory, attribute.OptionName, tableName);
return new SnapshotStorage<TPrimaryKey>(this.serviceProvider, this.dbFactory, optionName, tableName);
})).Value;

return storage as ISnapshotStorage<TPrimaryKey>;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@ namespace Vertex.Storage.Linq2db.Storage
{
public class SubSnapshotStorageFactory : ISubSnapshotStorageFactory
{
private readonly ConcurrentDictionary<Type, SnapshotStorageAttribute> typeAttributes = new ConcurrentDictionary<Type, SnapshotStorageAttribute>();
private readonly ConcurrentDictionary<string, Lazy<Task<object>>> eventStorageDict = new ConcurrentDictionary<string, Lazy<Task<object>>>();
private readonly ConcurrentDictionary<Type, SnapshotStorageBaseAttribute> typeAttributes = new();
private readonly ConcurrentDictionary<string, Lazy<Task<object>>> eventStorageDict = new();
private readonly DbFactory dbFactory;
private readonly IGrainFactory grainFactory;
private readonly IServiceProvider serviceProvider;
Expand All @@ -31,23 +31,26 @@ public async ValueTask<ISubSnapshotStorage<TPrimaryKey>> Create<TPrimaryKey>(IAc
{
var attribute = this.typeAttributes.GetOrAdd(actor.GetType(), key =>
{
var attributes = key.GetCustomAttributes(typeof(SnapshotStorageAttribute), false);
if (attributes.Length > 0)
var attributes = key.GetCustomAttributes(false);
var attribute = attributes.SingleOrDefault(att => typeof(SnapshotStorageBaseAttribute).IsAssignableFrom(att.GetType()));
if (attribute != default)
{
return attributes.First() as SnapshotStorageAttribute;
return attribute as SnapshotStorageBaseAttribute;
}
else
{
throw new MissingAttributeException($"{nameof(SnapshotStorageAttribute)}=>{key.Name}");
throw new MissingAttributeException($"{nameof(SnapshotStorageBaseAttribute)}=>{key.Name}");
}
});
var tableName = attribute.ShardingFunc(actor.ActorId.ToString());
var storage = await this.eventStorageDict.GetOrAdd($"{attribute.OptionName}_{tableName}", key =>
var actorId = actor.ActorId.ToString();
var tableName = attribute.GetTableName(actorId);
var optionName = attribute.GetOptionName(actorId);
var storage = await this.eventStorageDict.GetOrAdd($"{optionName}_{tableName}", key =>
new Lazy<Task<object>>(async () =>
{
using var db = this.dbFactory.GetEventDb(attribute.OptionName);
using var db = this.dbFactory.GetEventDb(optionName);
await db.CreateTableIfNotExists<SubSnapshotEntity<TPrimaryKey>>(this.grainFactory, key, tableName);
return new SubSnapshotStorage<TPrimaryKey>(this.serviceProvider, this.dbFactory, attribute.OptionName, tableName);
return new SubSnapshotStorage<TPrimaryKey>(this.serviceProvider, this.dbFactory, optionName, tableName);
})).Value;

return storage as ISubSnapshotStorage<TPrimaryKey>;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@ namespace Vertex.Storage.Linq2db.Storage
{
public class TxEventStorageFactory : ITxEventStorageFactory
{
private readonly ConcurrentDictionary<Type, TxEventStorageAttribute> typeAttributes = new ConcurrentDictionary<Type, TxEventStorageAttribute>();
private readonly ConcurrentDictionary<string, Lazy<Task<object>>> eventStorageDict = new ConcurrentDictionary<string, Lazy<Task<object>>>();
private readonly ConcurrentDictionary<Type, TxEventStorageBaseAttribute> typeAttributes = new();
private readonly ConcurrentDictionary<string, Lazy<Task<object>>> eventStorageDict = new();
private readonly DbFactory dbFactory;
private readonly IGrainFactory grainFactory;
private readonly IServiceProvider serviceProvider;
Expand All @@ -32,28 +32,31 @@ public async ValueTask<ITxEventStorage<TPrimaryKey>> Create<TPrimaryKey>(IActor<
{
var attribute = this.typeAttributes.GetOrAdd(actor.GetType(), key =>
{
var attributes = key.GetCustomAttributes(typeof(TxEventStorageAttribute), false);
if (attributes.Length > 0)
var attributes = key.GetCustomAttributes(false);
var attribute = attributes.SingleOrDefault(att => typeof(TxEventStorageBaseAttribute).IsAssignableFrom(att.GetType()));
if (attribute != default)
{
return attributes.First() as TxEventStorageAttribute;
return attribute as TxEventStorageBaseAttribute;
}
else
{
throw new MissingAttributeException($"{nameof(TxEventStorageAttribute)}=>{key.Name}");
throw new MissingAttributeException($"{nameof(TxEventStorageBaseAttribute)}=>{key.Name}");
}
});
var tableName = attribute.ShardingFunc(actor.ActorId.ToString());
var storage = await this.eventStorageDict.GetOrAdd($"{attribute.OptionName}_{tableName}", key =>
var actorId = actor.ActorId.ToString();
var tableName = attribute.GetTableName(actorId);
var optionName = attribute.GetOptionName(actorId);
var storage = await this.eventStorageDict.GetOrAdd($"{optionName}_{tableName}", key =>
new Lazy<Task<object>>(async () =>
{
using var db = this.dbFactory.GetEventDb(attribute.OptionName);
using var db = this.dbFactory.GetEventDb(optionName);
await db.CreateTableIfNotExists<EventEntity<TPrimaryKey>>(this.grainFactory, key, tableName, async () =>
{
var indexGenerator = db.GetGenerator();
await indexGenerator.CreateUniqueIndexIfNotExists(db, tableName, $"{tableName}_event_unique", nameof(EventEntity<TPrimaryKey>.ActorId).ToLower(), nameof(EventEntity<TPrimaryKey>.Version).ToLower());
await indexGenerator.CreateUniqueIndexIfNotExists(db, tableName, $"{tableName}_event_flow_unique", nameof(EventEntity<TPrimaryKey>.ActorId).ToLower(), nameof(EventEntity<TPrimaryKey>.Name).ToLower(), nameof(EventEntity<TPrimaryKey>.FlowId).ToLower());
});
return new TxEventStorage<TPrimaryKey>(this.serviceProvider, this.dbFactory, attribute.OptionName, tableName);
return new TxEventStorage<TPrimaryKey>(this.serviceProvider, this.dbFactory, optionName, tableName);
})).Value;

return storage as ITxEventStorage<TPrimaryKey>;
Expand Down
Loading

0 comments on commit 37b89fa

Please sign in to comment.