Skip to content

Commit

Permalink
Tests pass again
Browse files Browse the repository at this point in the history
  • Loading branch information
halgari committed Jan 18, 2024
1 parent 3930d99 commit 8425515
Show file tree
Hide file tree
Showing 4 changed files with 88 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ public static IServiceCollection AddEntity<T>(this IServiceCollection collection
throw new ArgumentException($"Entity type {type.Name} does not have an EntityAttribute.");
}

EntityStructureRegistry.Register(IEntity.EntityIdAttribute);

foreach (var attributeDefinition in type.GetFields(BindingFlags.NonPublic | BindingFlags.Public | BindingFlags.Static))
{
if (attributeDefinition.GetValue(null) is not IAttribute attributeInstance)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ public static class EntityStructureRegistry
private static readonly ConcurrentDictionary<Type, EntityDefinition> _entityDefinitionsByType = new();
private static readonly ConcurrentDictionary<UInt128, EntityDefinition> _entityDefinitionsByUUID = new();


/// <summary>
/// Register an attribute in the global registry.
/// </summary>
Expand Down
86 changes: 82 additions & 4 deletions src/NexusMods.EventSourcing.RocksDB/RocksDBEventStore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
using System.Buffers;
using System.Buffers.Binary;
using System.Collections.Generic;
using System.Linq;
using NexusMods.EventSourcing.Abstractions;
using NexusMods.EventSourcing.Abstractions.Serialization;
using NexusMods.EventSourcing.Serialization;
Expand All @@ -17,10 +18,10 @@ public sealed class RocksDBEventStore<TSerializer> : AEventStore
private readonly RocksDb _db;
private TransactionId _tx;
private readonly ColumnFamilyHandle _eventsColumn;
private readonly ColumnFamilyHandle _entityIndexColumn;
private readonly TSerializer _serializer;
private readonly SpanDeserializer<TSerializer> _deserializer;
private readonly ColumnFamilyHandle _snapshotColumn;
private readonly Dictionary<IIndexableAttribute,ColumnFamilyHandle> _indexColumns;

public RocksDBEventStore(TSerializer serializer, Settings settings, ISerializationRegistry serializationRegistry) : base(serializationRegistry)
{
Expand All @@ -32,8 +33,17 @@ public RocksDBEventStore(TSerializer serializer, Settings settings, ISerializati
options.SetCreateIfMissing();
_db = RocksDb.Open(options,
settings.StorageLocation.ToString(), new ColumnFamilies());

var indexableAttributes = EntityStructureRegistry.AllIndexableAttributes();

_indexColumns = new Dictionary<IIndexableAttribute, ColumnFamilyHandle>();
foreach (var attr in indexableAttributes.Distinct())
{
_indexColumns.Add(attr,
_db.CreateColumnFamily(new ColumnFamilyOptions(), "index_" + attr.IndexedAttributeId.ToString("X")));
}

_eventsColumn = _db.CreateColumnFamily(new ColumnFamilyOptions(), "events");
_entityIndexColumn = _db.CreateColumnFamily(new ColumnFamilyOptions(), "entityIndex");
_snapshotColumn = _db.CreateColumnFamily(new ColumnFamilyOptions(), "snapshots");
_tx = TransactionId.From(0);

Expand All @@ -42,13 +52,81 @@ public RocksDBEventStore(TSerializer serializer, Settings settings, ISerializati

public override TransactionId Add<T>(T eventEntity, (IIndexableAttribute, IAccumulator)[] indexed)
{
throw new NotImplementedException();
_tx = _tx.Next();

var eventSpan = _serializer.Serialize(eventEntity);

Span<byte> txIdSpan = stackalloc byte[8];
BinaryPrimitives.WriteUInt64BigEndian(txIdSpan, _tx.Value);

_db.Put(txIdSpan, eventSpan, _eventsColumn);

foreach (var (attr, accumulator) in indexed)
{
PutIndex(attr, accumulator, _tx);
}

return _tx;
}

private void PutIndex(IIndexableAttribute attr, IAccumulator accumulator, TransactionId txId)
{
var valueSize = attr.SpanSize();

Span<byte> keySpan = stackalloc byte[valueSize + 8];
attr.WriteTo(keySpan.SliceFast(0, valueSize), accumulator);
BinaryPrimitives.WriteUInt64BigEndian(keySpan.SliceFast(valueSize), txId.Value);

_db.Put(keySpan, ReadOnlySpan<byte>.Empty, _indexColumns[attr]);
}

public override void EventsForIndex<TIngester, TVal>(IIndexableAttribute<TVal> attr, TVal value, TIngester ingester, TransactionId fromTx,
TransactionId toTx)
{
throw new NotImplementedException();
var valueSize = attr.SpanSize();

Span<byte> startKey = stackalloc byte[valueSize + 8];
Span<byte> endKey = stackalloc byte[valueSize + 8];

attr.WriteTo(startKey.SliceFast(0, valueSize), value);
BinaryPrimitives.WriteUInt64BigEndian(startKey.SliceFast(valueSize), fromTx.Value);

attr.WriteTo(endKey.SliceFast(0, valueSize), value);
BinaryPrimitives.WriteUInt64BigEndian(endKey.SliceFast(valueSize), toTx.Value == TransactionId.Max ? toTx.Value : toTx.Value + 1);

var options = new ReadOptions();
unsafe
{
fixed (byte* startKeyPtr = startKey)
{
fixed (byte* endKeyPtr = endKey)
{
options.SetIterateUpperBound(endKeyPtr, (ulong)(valueSize + 8));
options.SetIterateLowerBound(startKeyPtr, (ulong)(valueSize + 8));

using var iterator = _db.NewIterator(_indexColumns[attr], options);

iterator.SeekToFirst();

while (iterator.Valid())
{

var keySpan = iterator.GetKeySpan();

var txSpan = keySpan.SliceFast(valueSize);
var txId = BinaryPrimitives.ReadUInt64BigEndian(txSpan);

var @event = _db.Get(txSpan, _deserializer, _eventsColumn);
if (!ingester.Ingest(TransactionId.From(txId), @event))
break;

iterator.Next();
}
}
}
}


}

public override TransactionId GetSnapshot(TransactionId asOf, EntityId entityId, out IAccumulator loadedDefinition,
Expand Down
3 changes: 3 additions & 0 deletions tests/NexusMods.EventSourcing.TestModel/InMemoryEventStore.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
using System.Buffers;
using System.Buffers.Binary;
using DynamicData;
using NexusMods.EventSourcing.Abstractions;
using NexusMods.EventSourcing.Abstractions.Serialization;
using NexusMods.Hashing.xxHash64;
Expand All @@ -18,6 +19,8 @@ public class InMemoryEventStore<TSerializer> : AEventStore
public InMemoryEventStore(TSerializer serializer, ISerializationRegistry serializationRegistry) : base(serializationRegistry)
{
_serializer = serializer;
// Make the first item 0 so we don't ever issue a TX Id of 0
_events.Add(Array.Empty<byte>());
}

public override TransactionId Add<T>(T entity, (IIndexableAttribute, IAccumulator)[] indexed)
Expand Down

0 comments on commit 8425515

Please sign in to comment.