Skip to content

Commit

Permalink
Finish renaming chunk->node
Browse files Browse the repository at this point in the history
  • Loading branch information
halgari committed Mar 7, 2024
1 parent b16aa81 commit bc1b4b5
Show file tree
Hide file tree
Showing 25 changed files with 113 additions and 118 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,13 @@
namespace NexusMods.EventSourcing.Storage.Benchmarks;

[MemoryDiagnoser]
public class AppendableChunkBenchmarks
public class AppendableNodeBenchmarks
{
private readonly IServiceProvider _services;
private readonly AttributeRegistry _registry;
private AppendableNode _node = null!;

public AppendableChunkBenchmarks()
public AppendableNodeBenchmarks()
{
var host = Host.CreateDefaultBuilder()
.ConfigureServices(s =>
Expand Down Expand Up @@ -73,7 +73,7 @@ public void IterationSetup()
public ulong EntityCount { get; set; }

[Benchmark]
public void SortChunk()
public void SortNode()
{
var comparator = new EATV(_registry);
_node.Sort(comparator);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ namespace NexusMods.EventSourcing.Storage.Benchmarks;

public class IndexBenchmarks : AStorageBenchmark
{
private List<AppendableNode> _chunks = null!;
private List<AppendableNode> _nodes = null!;
private IDatomComparator _sorter = null!;
private IDataNode _preBuilt = null!;
private Datom _midPoint;
Expand All @@ -28,17 +28,17 @@ public class IndexBenchmarks : AStorageBenchmark
[GlobalSetup]
public void GlobalSetup()
{
_chunks = new List<AppendableNode>();
_nodes = new List<AppendableNode>();

for (ulong chunk = 0; chunk < TxCount; chunk++)
for (ulong node = 0; node < TxCount; node++)
{
_chunks.Add(new AppendableNode());
_nodes.Add(new AppendableNode());
}

var emitters = new Action<EntityId, TxId, ulong>[]
{
(e, tx, v) => _registry.Append<TestAttributes.FileHash, ulong>(_chunks[(int)tx.Value], e, tx, DatomFlags.Added, v),
(e, tx, v) => _registry.Append<TestAttributes.FileName, string>(_chunks[(int)tx.Value], e, tx, DatomFlags.Added, "file " + v),
(e, tx, v) => _registry.Append<TestAttributes.FileHash, ulong>(_nodes[(int)tx.Value], e, tx, DatomFlags.Added, v),
(e, tx, v) => _registry.Append<TestAttributes.FileName, string>(_nodes[(int)tx.Value], e, tx, DatomFlags.Added, "file " + v),
};

for (ulong e = 0; e < Count; e++)
Expand All @@ -54,9 +54,9 @@ public void GlobalSetup()

_sorter = _registry.CreateComparator(SortOrder);

foreach (var chunk in _chunks)
foreach (var node in _nodes)
{
chunk.Sort(_sorter);
node.Sort(_sorter);
}

_preBuilt = IndexAll().Flush(NodeStore);
Expand All @@ -67,9 +67,9 @@ public void GlobalSetup()
public AppendableIndexNode IndexAll()
{
var index = new AppendableIndexNode(_sorter);
foreach (var chunk in _chunks)
foreach (var node in _nodes)
{
index = index.Ingest(chunk);
index = index.Ingest(node);
}

return index;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,22 +29,22 @@ public void GlobalSetup()

var emitters = new Action<AppendableNode, EntityId, TxId, ulong>[]
{
(chunk, e, tx, v) => _registry.Append<TestAttributes.FileHash, ulong>(chunk, e, tx, DatomFlags.Added, v),
(chunk, e, tx, v) => _registry.Append<TestAttributes.FileName, string>(chunk, e, tx, DatomFlags.Added, "file " + v),
(node, e, tx, v) => _registry.Append<TestAttributes.FileHash, ulong>(node, e, tx, DatomFlags.Added, v),
(node, e, tx, v) => _registry.Append<TestAttributes.FileName, string>(node, e, tx, DatomFlags.Added, "file " + v),
};

for (ulong tx = 0; tx < TxCount; tx++)
{
var chunk = new AppendableNode();
var node = new AppendableNode();
for (ulong e = 0; e < Count; e++)
{
for (var a = 0; a < 2; a++)
{
emitters[a](chunk, EntityId.From(e), TxId.From(tx), tx);
emitters[a](node, EntityId.From(e), TxId.From(tx), tx);
}
}
chunk.Sort(_sorter);
_index = _index.Ingest(chunk);
node.Sort(_sorter);
_index = _index.Ingest(node);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,12 @@ public class SortBenchmarks : AStorageBenchmark
[GlobalSetup]
public void GlobalSetup()
{
var chunk = new AppendableNode();
var node = new AppendableNode();

var emitters = new Action<EntityId, TxId, ulong>[]
{
(e, tx, v) => _registry.Append<TestAttributes.FileHash, ulong>(chunk, e, tx, DatomFlags.Added, v),
(e, tx, v) => _registry.Append<TestAttributes.FileName, string>(chunk, e, tx, DatomFlags.Added, "file " + v),
(e, tx, v) => _registry.Append<TestAttributes.FileHash, ulong>(node, e, tx, DatomFlags.Added, v),
(e, tx, v) => _registry.Append<TestAttributes.FileName, string>(node, e, tx, DatomFlags.Added, "file " + v),
};

for (ulong e = 0; e < Count; e++)
Expand All @@ -43,7 +43,7 @@ public void GlobalSetup()
}
}

_node = chunk;
_node = node;

}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ namespace NexusMods.EventSourcing.Storage.Algorithms;

public static class BinarySearch
{
public static int SeekEqualOrLess<TChunk, TComparator>(TChunk node, TComparator comparator, int start, int end, in Datom target)
where TChunk : IDataNode
public static int SeekEqualOrLess<TNode, TComparator>(TNode node, TComparator comparator, int start, int end, in Datom target)
where TNode : IDataNode
where TComparator : IDatomComparator
{
while (start < end)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@

namespace NexusMods.EventSourcing.Storage.Algorithms;

public static class ChunkReader
public static class NodeReader
{
public static IDataNode ReadDataChunk(ReadOnlyMemory<byte> data)
public static IDataNode ReadDataNode(ReadOnlyMemory<byte> data)
{
var reader = new BufferReader(data);
var header = reader.ReadFourCC();
Expand Down
16 changes: 8 additions & 8 deletions src/NexusMods.EventSourcing.Storage/Algorithms/SortedMerge.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ public static AppendableNode Merge<TNodeA, TNodeB, TComparator>(TNodeA a, TNodeB
where TNodeB : IDataNode
where TComparator : IDatomComparator
{
var newChunk = new AppendableNode();
var newNode = new AppendableNode();

int i = 0, j = 0;

Expand All @@ -23,37 +23,37 @@ public static AppendableNode Merge<TNodeA, TNodeB, TComparator>(TNodeA a, TNodeB
var cmp = comparator.Compare(aDatom, bDatom);
if (cmp < 0)
{
newChunk.Append(aDatom);
newNode.Append(aDatom);
i++;
}
else if (cmp > 0)
{
newChunk.Append(bDatom);
newNode.Append(bDatom);
j++;
}
else
{
newChunk.Append(aDatom);
newChunk.Append(bDatom);
newNode.Append(aDatom);
newNode.Append(bDatom);
i++;
j++;
}
}

while (i < a.Length)
{
newChunk.Append(a[i]);
newNode.Append(a[i]);
i++;
}


while (j < b.Length)
{
newChunk.Append(b[j]);
newNode.Append(b[j]);
j++;
}

return newChunk;
return newNode;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -78,10 +78,10 @@ public void Initialize(IEnumerable<ReadOnlyMemory<byte>> select)
}
}

public static AppendableBlobColumn UnpackFrom(IBlobColumn indexChunkValues)
public static AppendableBlobColumn UnpackFrom(IBlobColumn indexNodeValues)
{
var newColumn = new AppendableBlobColumn();
foreach (var value in indexChunkValues)
foreach (var value in indexNodeValues)
{
newColumn.Append(value.Span);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ public class UnsignedIntegerColumn<T> : IAppendableColumn<T>, IUnpackedColumn<T>
private uint _length;
private T[] _data;

public UnsignedIntegerColumn(uint initialLength = RawDataChunk.DefaultChunkSize)
public UnsignedIntegerColumn(uint initialLength = RawDataNode.DefaultNodeSize)
{
_length = 0;
_data = GC.AllocateUninitializedArray<T>((int)initialLength);
Expand Down
20 changes: 10 additions & 10 deletions src/NexusMods.EventSourcing.Storage/DatomStore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -71,11 +71,11 @@ private async Task ConsumeTransactions()

_logger.LogDebug("Processing transaction with {DatomCount} datoms", pendingTransaction.Data.Length);
sw.Restart();
Log(pendingTransaction, out var chunk);
Log(pendingTransaction, out var node);

await UpdateInMemoryIndexes(chunk, pendingTransaction.AssignedTxId!.Value);
await UpdateInMemoryIndexes(node, pendingTransaction.AssignedTxId!.Value);

_updatesSubject.OnNext((pendingTransaction.AssignedTxId.Value, chunk));
_updatesSubject.OnNext((pendingTransaction.AssignedTxId.Value, node));
pendingTransaction.CompletionSource.SetResult(pendingTransaction.AssignedTxId.Value);
_logger.LogDebug("Transaction {TxId} processed in {Elapsed}ms, new in-memory size is {Count} datoms", pendingTransaction.AssignedTxId!.Value, sw.ElapsedMilliseconds, _indexes.InMemorySize);
}
Expand Down Expand Up @@ -383,9 +383,9 @@ private IEnumerable<Datom> ReverseLookupForIndex<TAttribute>(TxId txId, EntityId

private void Log(PendingTransaction pendingTransaction, out AppendableNode node)
{
var newChunk = new AppendableNode();
var newNode = new AppendableNode();
foreach (var datom in pendingTransaction.Data)
datom.Append(_registry, newChunk);
datom.Append(_registry, newNode);

var nextTxBlock = _nodeStore.GetNextTx();

Expand Down Expand Up @@ -419,13 +419,13 @@ EntityId MaybeRemap(EntityId id)
return id;
}

newChunk.SetTx(nextTx);
newChunk.RemapEntities(MaybeRemap, _registry);
newNode.SetTx(nextTx);
newNode.RemapEntities(MaybeRemap, _registry);

newChunk.Sort(_comparatorTxLog);
newNode.Sort(_comparatorTxLog);

node = newChunk;
var newTxBlock = _nodeStore.LogTx(newChunk.Pack());
node = newNode;
var newTxBlock = _nodeStore.LogTx(newNode.Pack());
Debug.Assert(newTxBlock.Value == nextTxBlock.Value, "newTxBlock == nextTxBlock");
pendingTransaction.AssignedTxId = nextTx;

Expand Down
2 changes: 1 addition & 1 deletion src/NexusMods.EventSourcing.Storage/FourCC.cs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ public static FourCC From(ReadOnlySpan<byte> s)
return result;
}

#region Chunk Types
#region Node Types

public static readonly FourCC PackedData = From("PDAT");
public static readonly FourCC PackedIndex = From("PIDX");
Expand Down
6 changes: 3 additions & 3 deletions src/NexusMods.EventSourcing.Storage/NodeStore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,8 @@ public IDataNode Flush(IDataNode node)
{
return node switch
{
PackedNode packedChunk => Flush(packedChunk),
PackedIndexNode packedIndexChunk => Flush(packedIndexChunk),
PackedNode packedNode => Flush(packedNode),
PackedIndexNode packedIndexNode => Flush(packedIndexNode),
_ => throw new NotImplementedException("Unknown node type. " + node.GetType().Name)
};
}
Expand All @@ -56,7 +56,7 @@ public IDataNode Flush(IIndexNode node)
{
return node switch
{
PackedIndexNode packedIndexChunk => Flush(packedIndexChunk),
PackedIndexNode packedIndexNode => Flush(packedIndexNode),
_ => throw new NotImplementedException("Unknown node type. " + node.GetType().Name)
};
}
Expand Down
16 changes: 8 additions & 8 deletions src/NexusMods.EventSourcing.Storage/Nodes/AppendableIndexNode.cs
Original file line number Diff line number Diff line change
Expand Up @@ -130,9 +130,9 @@ public override IDataNode Flush(INodeStore store)
for (var i = 0; i < _children.Count; i++)
{
var child = _children[i];
if (child is AppendableNode appendableChunk)
if (child is AppendableNode appendableNode)
{
var packedChild = appendableChunk.Pack();
var packedChild = appendableNode.Pack();
_children[i] = store.Flush(packedChild);
}
}
Expand Down Expand Up @@ -198,17 +198,17 @@ public AppendableIndexNode Ingest(IDataNode node)

var newChildren = new List<IDataNode>(_children.Count);

void MaybeSplit(AppendableNode chunk)
void MaybeSplit(AppendableNode node)
{
if (chunk.Length > Configuration.DataBlockSize * 2)
if (node.Length > Configuration.DataBlockSize * 2)
{
var (a, b) = chunk.Split();
var (a, b) = node.Split();
MaybeSplit(a);
MaybeSplit(b);
}
else
{
newChildren.Add(chunk);
newChildren.Add(node);
}

}
Expand Down Expand Up @@ -246,8 +246,8 @@ void MaybeSplit(AppendableNode chunk)

}

public AppendableNode Merge<TChunk, TEnumerable>(TChunk child, TEnumerable datoms)
where TChunk : IDataNode
public AppendableNode Merge<TNode, TEnumerable>(TNode child, TEnumerable datoms)
where TNode : IDataNode
where TEnumerable : IEnumerable<Datom>
{
return AppendableNode.Initialize(child.Merge(datoms, _comparator));
Expand Down
6 changes: 3 additions & 3 deletions src/NexusMods.EventSourcing.Storage/Nodes/AppendableNode.cs
Original file line number Diff line number Diff line change
Expand Up @@ -224,12 +224,12 @@ public override IEnumerator<Datom> GetEnumerator()
/// </summary>
public static AppendableNode Initialize(IEnumerable<Datom> datoms)
{
var chunk = new AppendableNode();
var node = new AppendableNode();
foreach (var datom in datoms)
{
chunk.Append(datom);
node.Append(datom);
}
return chunk;
return node;
}

public (AppendableNode A, AppendableNode B) Split()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,12 @@ namespace NexusMods.EventSourcing.Storage.Nodes;

public static class DataNodeExtensions
{
public static IEnumerable<Datom> Range<TChunkA>(this TChunkA chunk, int start, int end)
where TChunkA : IDataNode
public static IEnumerable<Datom> Range<TNodeA>(this TNodeA node, int start, int end)
where TNodeA : IDataNode
{
for (var i = start; i < end; i++)
{
yield return chunk[i];
yield return node[i];
}
}

Expand Down
Loading

0 comments on commit bc1b4b5

Please sign in to comment.