Skip to content

Commit

Permalink
Can join into child entities and link entities via reverse lookups
Browse files Browse the repository at this point in the history
  • Loading branch information
halgari committed Feb 20, 2024
1 parent e00df59 commit 257dc3d
Show file tree
Hide file tree
Showing 21 changed files with 503 additions and 33 deletions.
16 changes: 10 additions & 6 deletions src/NexusMods.EventSourcing.Abstractions/Datom.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,11 @@ public interface IDatom
void Emit<TSink>(ref TSink sink) where TSink : IDatomSink;

/// <summary>
/// Duplicates the datom with a new entity id
/// The datom should call the remap function on each entity id it contains
/// to remap the entity ids to actual ids
/// </summary>
/// <param name="newId"></param>
/// <returns></returns>
IDatom RemapEntityId(ulong newId);
/// <param name="remapFn"></param>
void Remap(Func<EntityId, EntityId> remapFn);
}

public interface IDatomWithTx : IDatom
Expand All @@ -36,9 +36,13 @@ public void Emit<TSink>(ref TSink sink) where TSink : IDatomSink
}

/// <inheritdoc />
public IDatom RemapEntityId(ulong newId)
public void Remap(Func<EntityId, EntityId> remapFn)
{
return new AssertDatom<TAttr, TVal>(newId, v);
e = remapFn(EntityId.From(e)).Value;
if (v is EntityId entityId)
{
v = (TVal) (object) EntityId.From(remapFn(entityId).Value);
}
}
}

Expand Down
5 changes: 5 additions & 0 deletions src/NexusMods.EventSourcing.Abstractions/IDatomStore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -42,4 +42,9 @@ public interface IDatomStore : IDisposable
/// <typeparam name="TAttribute"></typeparam>
/// <returns></returns>
Expression GetValueReadExpression(Type attribute, Expression valueSpan, out ulong attributeId);

/// <summary>
/// Gets all the entities that reference the given entity id with the given attribute.
/// </summary>
IEnumerable<EntityId> ReverseLookup<TAttribute>(TxId txId) where TAttribute : IAttribute<EntityId>;
}
19 changes: 17 additions & 2 deletions src/NexusMods.EventSourcing.Abstractions/IDb.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,24 @@ public IIterator Where<TAttr>()
/// <summary>
/// Returns a read model for each of the given entity ids.
/// </summary>
/// <param name="ids"></param>
public IEnumerable<TModel> Get<TModel>(IEnumerable<EntityId> ids)
where TModel : IReadModel;


/// <summary>
/// Gets a read model for the given entity id.
/// </summary>
/// <param name="id"></param>
/// <typeparam name="TModel"></typeparam>
/// <returns></returns>
public IEnumerable<TModel> Get<TModel>(IEnumerable<EntityId> ids)
public TModel Get<TModel>(EntityId id)
where TModel : IReadModel;

/// <summary>
/// Gets a read model for every enitity that references the given entity id
/// with the given attribute.
/// </summary>
public IEnumerable<TModel> GetReverse<TAttribute, TModel>(EntityId id)
where TModel : IReadModel
where TAttribute : IAttribute<EntityId>;
}
5 changes: 5 additions & 0 deletions src/NexusMods.EventSourcing.Abstractions/ITransaction.cs
Original file line number Diff line number Diff line change
Expand Up @@ -40,4 +40,9 @@ void Add<TAttribute, TVal>(EntityId entityId, TVal val, bool isAssert = true)
/// Commits the transaction
/// </summary>
ICommitResult Commit();

/// <summary>
/// Gets the temporary id for the transaction
/// </summary>
public TxId ThisTxId { get; }
}
37 changes: 36 additions & 1 deletion src/NexusMods.EventSourcing.Abstractions/Models/AReadModel.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
namespace NexusMods.EventSourcing.Abstractions.Models;
using System;
using System.Collections.Generic;

namespace NexusMods.EventSourcing.Abstractions.Models;

/// <summary>
/// Base class for all read models.
Expand All @@ -23,8 +26,40 @@ internal AReadModel(EntityId id)
Id = id;
}

/// <summary>
/// Retrieves the read model from the database
/// </summary>
/// <param name="tx"></param>
/// <typeparam name="TReadModel"></typeparam>
/// <returns></returns>
/// <exception cref="NotImplementedException"></exception>
protected TReadModel Get<TReadModel>(EntityId entityId)
where TReadModel : AReadModel<TReadModel>, IReadModel
{
return Db.Get<TReadModel>(entityId);
}

/// <summary>
/// Retrieves the matching read models from the database via the specified reverse lookup attribute
/// </summary>
/// <typeparam name="TAttribute"></typeparam>
/// <typeparam name="TReadModel"></typeparam>
/// <returns></returns>
/// <exception cref="NotImplementedException"></exception>
protected IEnumerable<TReadModel> GetReverse<TAttribute, TReadModel>()
where TReadModel : AReadModel<TReadModel>, IReadModel
where TAttribute : ScalarAttribute<TAttribute, EntityId>
{
return Db.GetReverse<TAttribute, TReadModel>(Id);
}

/// <summary>
/// The base identifier for the entity.
/// </summary>
public EntityId Id { get; internal set; }

/// <summary>
/// The database this read model is associated with.
/// </summary>
public IDb Db { get; internal set; } = null!;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
using System;

namespace NexusMods.EventSourcing.Abstractions.Models;

/// <summary>
/// Defines a backwards lookup attribute
/// </summary>
public class ReverseLookupAttribute<TAttribute> : Attribute
where TAttribute : ScalarAttribute<TAttribute, EntityId>
{

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
using System;
using System.Buffers;
using System.Buffers.Binary;
using NexusMods.EventSourcing.Abstractions;

namespace NexusMods.EventSourcing.DatomStore.BuiltInSerializers;

public class EntityIdSerialzer : IValueSerializer<EntityId>
{
public Type NativeType => typeof(EntityId);

public static readonly UInt128 Id = "E2C3185E-C082-4641-B25E-7CEC803A2F48".ToUInt128Guid();
public UInt128 UniqueId => Id;
public int Compare(ReadOnlySpan<byte> a, ReadOnlySpan<byte> b)
{
return BinaryPrimitives.ReadUInt64LittleEndian(a).CompareTo(BinaryPrimitives.ReadUInt64LittleEndian(b));
}

public void Write<TWriter>(EntityId value, TWriter buffer) where TWriter : IBufferWriter<byte>
{
var span = buffer.GetSpan(8);
BinaryPrimitives.WriteUInt64LittleEndian(span, value.Value);
buffer.Advance(8);
}

public int Read(ReadOnlySpan<byte> buffer, out EntityId val)
{
val = EntityId.From(BinaryPrimitives.ReadUInt64LittleEndian(buffer));
return 8;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
using System;
using System.Buffers;
using System.Buffers.Binary;
using NexusMods.EventSourcing.Abstractions;
namespace NexusMods.EventSourcing.DatomStore.BuiltInSerializers;

public class TxIdSerializer : IValueSerializer<TxId>
{
public Type NativeType => typeof(TxId);

public static readonly UInt128 Id = "BB2B2BAF-9AA8-4DB0-8BFC-A0A853ED9BA0".ToUInt128Guid();
public UInt128 UniqueId => Id;
public int Compare(ReadOnlySpan<byte> a, ReadOnlySpan<byte> b)
{
return BinaryPrimitives.ReadUInt64LittleEndian(a).CompareTo(BinaryPrimitives.ReadUInt64LittleEndian(b));
}

public void Write<TWriter>(TxId value, TWriter buffer) where TWriter : IBufferWriter<byte>
{
var span = buffer.GetSpan(8);
BinaryPrimitives.WriteUInt64LittleEndian(span, value.Value);
buffer.Advance(8);
}

public int Read(ReadOnlySpan<byte> buffer, out TxId val)
{
val = TxId.From(BinaryPrimitives.ReadUInt64LittleEndian(buffer));
return 8;
}
}
135 changes: 135 additions & 0 deletions src/NexusMods.EventSourcing.DatomStore/Indexes/AVTEIndex.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Runtime.InteropServices;
using NexusMods.EventSourcing.Abstractions;
using Reloaded.Memory.Extensions;
using RocksDbSharp;

namespace NexusMods.EventSourcing.DatomStore.Indexes;

public class AVTEIndex(AttributeRegistry registry) :
AIndexDefinition<AVTEIndex>(registry, "avte"), IComparatorIndex<AVTEIndex>
{
public static unsafe int Compare(AIndexDefinition<AVTEIndex> idx, KeyHeader* a, uint aLength, KeyHeader* b, uint bLength)
{
// Attribute, Value, TX, Entity
var cmp = KeyHeader.CompareAttribute(a, b);
if (cmp != 0) return cmp;
cmp = KeyHeader.CompareValues(idx.Registry, a, aLength, b, bLength);
if (cmp != 0) return cmp;
cmp = KeyHeader.CompareTx(a, b);
if (cmp != 0) return cmp;
cmp = KeyHeader.CompareEntity(a, b);
if (cmp != 0) return cmp;
return KeyHeader.CompareIsAssert(a, b);
}

public unsafe struct AVTEIterator : IDisposable
{
private readonly KeyHeader* _key;
private KeyHeader* _current;
private UIntPtr _currentLength;
private readonly Iterator _iterator;
private readonly AttributeRegistry _registry;
private bool _needsSeek;

public AVTEIterator(ulong txId, AttributeRegistry registry, AVTEIndex idx)
{
_registry = registry;
_iterator = idx.Db.NewIterator(idx.ColumnFamilyHandle);
_key = (KeyHeader*)Marshal.AllocHGlobal(KeyHeader.Size);
_key->Entity = ulong.MaxValue;
_key->AttributeId = ulong.MaxValue;
_key->Tx = txId;
_key->IsAssert = true;
_needsSeek = true;
}


public void Set<TAttribute>() where TAttribute : IAttribute
{
_key->Entity = ulong.MaxValue;
_key->AttributeId = _registry.GetAttributeId<TAttribute>();
_needsSeek = true;
}

public IDatom Current
{
get
{
Debug.Assert(!_needsSeek, "Must call Next() before accessing Current");
var currentValue = new ReadOnlySpan<byte>((byte*)_current + KeyHeader.Size, (int)_currentLength - KeyHeader.Size);
return _registry.ReadDatom(ref *_current, currentValue);
}
}

public EntityId EntityId => EntityId.From(_current->Entity);

public TValue GetValue<TAttribute, TValue>()
where TAttribute : IAttribute<TValue>
{
Debug.Assert(!_needsSeek, "Must call Next() before accessing GetValue");
var currentValue = new ReadOnlySpan<byte>((byte*)_current + KeyHeader.Size, (int)_currentLength - KeyHeader.Size);
return _registry.ReadValue<TAttribute, TValue>(ref *_current, currentValue);

}

public ulong AttributeId
{
get
{
Debug.Assert(!_needsSeek, "Must call Next() before accessing AttributeId");
return _current->AttributeId;
}
}

public ReadOnlySpan<byte> ValueSpan => _iterator.GetKeySpan().SliceFast(KeyHeader.Size);

public bool Next()
{
if (_needsSeek)
{
_iterator.SeekForPrev((byte*)_key, KeyHeader.Size);
_needsSeek = false;
}
else
{
_key->Entity = _current->Entity - 1;
_iterator.Prev();
}

while (true)
{

if (!_iterator.Valid()) return false;

_current = (KeyHeader*)Native.Instance.rocksdb_iter_key(_iterator.Handle, out _currentLength);

Debug.Assert(_currentLength < KeyHeader.Size, "Key length is less than KeyHeader.Size");

if (_current->AttributeId != _key->AttributeId)
return false;

if (_current->Tx > _key->Tx)
{
_iterator.Prev();
continue;
}

if (_current->Entity > _key->Entity)
{
_iterator.Prev();
continue;
}

return true;
}
}
public void Dispose()
{
_iterator.Dispose();
Marshal.FreeHGlobal((IntPtr)_key);
}
}
}
14 changes: 14 additions & 0 deletions src/NexusMods.EventSourcing.DatomStore/RocksDBDatomStore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ public class RocksDBDatomStore : IDatomStore
private ulong _tx;
private readonly AETVIndex _avetIndex;
private readonly EATVIndex _eatvIndex;
private readonly AVTEIndex _avteIndex;

public RocksDBDatomStore(ILogger<RocksDBDatomStore> logger, AttributeRegistry registry, DatomStoreSettings settings)
{
Expand All @@ -42,6 +43,8 @@ public RocksDBDatomStore(ILogger<RocksDBDatomStore> logger, AttributeRegistry re
_eatvIndex.Init(_db);
_avetIndex = new AETVIndex(_registry);
_avetIndex.Init(_db);
_avteIndex = new AVTEIndex(_registry);
_avteIndex.Init(_db);

_pooledWriter = new PooledMemoryBufferWriter(128);

Expand All @@ -68,6 +71,7 @@ private void Serialize<TAttr, TVal>(WriteBatch batch, ulong e, TVal val, ulong t
var span = _pooledWriter.GetWrittenSpan();
_eatvIndex.Put(batch, span);
_avetIndex.Put(batch, span);
_avteIndex.Put(batch, span);
}

private struct TransactSink(RocksDBDatomStore store, WriteBatch batch, ulong tx) : IDatomSink
Expand Down Expand Up @@ -130,6 +134,16 @@ public Expression GetValueReadExpression(Type attribute, Expression valueSpan, o
return _registry.GetReadExpression(attribute, valueSpan, out attributeId);
}

public IEnumerable<EntityId> ReverseLookup<TAttribute>(TxId txId) where TAttribute : IAttribute<EntityId>
{
using var iterator = new AVTEIndex.AVTEIterator(txId.Value, _registry, _avteIndex);
iterator.Set<TAttribute>();
while (iterator.Next())
{
yield return iterator.EntityId;
}
}

public void Dispose()
{
_db.Dispose();
Expand Down
4 changes: 3 additions & 1 deletion src/NexusMods.EventSourcing.DatomStore/Services.cs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,9 @@ public static IServiceCollection AddDatomStore(this IServiceCollection services)
.AddValueSerializer<BoolSerializer>()
.AddValueSerializer<SymbolSerializer>()
.AddValueSerializer<StringSerializer>()
.AddValueSerializer<UInt64Serializer>();
.AddValueSerializer<UInt64Serializer>()
.AddValueSerializer<EntityIdSerialzer>()
.AddValueSerializer<TxIdSerializer>();
return services;
}

Expand Down
Loading

0 comments on commit 257dc3d

Please sign in to comment.