Skip to content

Commit

Permalink
Rework schema updating and inserting
Browse files Browse the repository at this point in the history
  • Loading branch information
halgari committed Oct 14, 2024
1 parent cda5d38 commit 7ae9a5a
Show file tree
Hide file tree
Showing 7 changed files with 275 additions and 43 deletions.
38 changes: 37 additions & 1 deletion src/NexusMods.MnemonicDB.Abstractions/AttributeCache.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ public sealed class AttributeCache
private BitArray _isReference;
private BitArray _isIndexed;
private Symbol[] _symbols;
private ValueTag[] _valueTags;
private BitArray _isNoHistory;

public AttributeCache()
Expand All @@ -29,12 +30,14 @@ public AttributeCache()
_isIndexed = new BitArray(maxId);
_isNoHistory = new BitArray(maxId);
_symbols = new Symbol[maxId];
_valueTags = new ValueTag[maxId];

foreach (var kv in AttributeDefinition.HardcodedIds)
{
_attributeIdsBySymbol[kv.Key.Id] = AttributeId.From(kv.Value);
_isIndexed[kv.Value] = kv.Key.IsIndexed;
_symbols[kv.Value] = kv.Key.Id;
_valueTags[kv.Value] = kv.Key.LowLevelType;
}

}
Expand Down Expand Up @@ -104,7 +107,16 @@ public void Reset(IDb db)
newIsCardinalityMany[(int)id] = AttributeDefinition.Cardinality.ReadValue(datom.ValueSpan, datom.Prefix.ValueTag, null!) == Cardinality.Many;
}
_isCardinalityMany = newIsCardinalityMany;


var valueTags = db.Datoms(AttributeDefinition.ValueType);
var newValueTags = new ValueTag[maxIndex];
foreach (var datom in valueTags)
{
var id = datom.E.Value;
var type = AttributeDefinition.ValueType.ReadValue(datom.ValueSpan, datom.Prefix.ValueTag, null!);
newValueTags[id] = type;
}
_valueTags = newValueTags;
}

/// <summary>
Expand Down Expand Up @@ -155,4 +167,28 @@ public Symbol GetSymbol(AttributeId id)
{
return _symbols[id.Value];
}

/// <summary>
/// Returns true if the attribute is defined in the database.
/// </summary>
public bool Has(Symbol attribute)
{
return _attributeIdsBySymbol.ContainsKey(attribute);
}

/// <summary>
/// Try to get the AttributeId for the given attribute name
/// </summary>
public bool TryGetAttributeId(Symbol attribute, out AttributeId id)
{
return _attributeIdsBySymbol.TryGetValue(attribute, out id);
}

/// <summary>
/// Return the value tag type for the given attribute id
/// </summary>
public ValueTag GetValueTag(AttributeId aid)
{
return _valueTags[aid.Value];
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -162,11 +162,11 @@ public static SliceDescriptor Create(AttributeId referenceAttribute, EntityId po
/// Creates a slice descriptor for the given attribute from the current AEVT index
/// reverse lookup.
/// </summary>
public static SliceDescriptor Create(AttributeId referenceAttribute)
public static SliceDescriptor Create(AttributeId referenceAttribute, IndexType indexType = IndexType.AEVTCurrent)
{
return new SliceDescriptor
{
Index = IndexType.AEVTCurrent,
Index = indexType,
From = Datom(EntityId.MinValueNoPartition, referenceAttribute, TxId.MinValue, false),
To = Datom(EntityId.MaxValueNoPartition, referenceAttribute, TxId.MaxValue, false)
};
Expand Down
20 changes: 20 additions & 0 deletions src/NexusMods.MnemonicDB.Abstractions/Serializer.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
using System;
using System.Buffers;
using System.Collections.Generic;
using System.IO.Hashing;
using System.Runtime.CompilerServices;
using System.Runtime.InteropServices;
Expand Down Expand Up @@ -396,5 +397,24 @@ public static void Remap(this ValueTag tag, Span<byte> span, Func<EntityId, Enti
}
#endregion

#region ValueConversion

public static void ConvertValue<TWriter>(this ValueTag srcTag, ReadOnlySpan<byte> srcSpan, ValueTag destTag, TWriter destWriter)
where TWriter : IBufferWriter<byte>
{

switch (srcTag, destTag)
{
case (ValueTag.UInt8, ValueTag.UInt16):
WriteUnmanaged((ushort)MemoryMarshal.Read<byte>(srcSpan), destWriter);
break;

default:
throw new NotSupportedException("Conversion not supported from " + srcTag + " to " + destTag);
}
}

#endregion

}

43 changes: 4 additions & 39 deletions src/NexusMods.MnemonicDB/Connection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -144,43 +144,7 @@ public async Task<ICommitResult> Excise(EntityId[] entityIds)

/// <inheritdoc />
public IObservable<IDb> Revisions => _dbStream;

private void AddMissingAttributes()
{
var declaredAttributes = AttributeResolver.DefinedAttributes;
var existing = AttributeCache.AllAttributeIds.ToHashSet();

if (existing.Count == 0)
throw new AggregateException(
"No attributes found in the database, something went wrong, as it should have been bootstrapped by now");

var missing = declaredAttributes.Where(a => !existing.Contains(a.Id)).ToArray();
if (missing.Length == 0)
{
// No changes to make to the schema, we can return early
return;
}

var attrId = existing.Select(sym => AttributeCache.GetAttributeId(sym)).Max().Value;
using var builder = new IndexSegmentBuilder(AttributeCache);
foreach (var attr in missing.OrderBy(e => e.Id.Id))
{
var id = EntityId.From(++attrId);
builder.Add(id, AttributeDefinition.UniqueId, attr.Id);
builder.Add(id, AttributeDefinition.ValueType, attr.LowLevelType);
if (attr.IsIndexed)
builder.Add(id, AttributeDefinition.Indexed, Null.Instance);
builder.Add(id, AttributeDefinition.Cardinality, attr.Cardinalty);
if (attr.NoHistory)
builder.Add(id, AttributeDefinition.NoHistory, Null.Instance);
if (attr.DeclaredOptional)
builder.Add(id, AttributeDefinition.Optional, Null.Instance);
}

var (_, db) = _store.Transact(new IndexSegmentTransaction(builder.Build()));
AttributeCache.Reset(db);
}


internal async Task<ICommitResult> Transact(IInternalTxFunction fn)
{
StoreResult newTx;
Expand All @@ -203,8 +167,9 @@ private void Bootstrap()
};
AttributeCache.Reset(initialDb);

AddMissingAttributes();

var declaredAttributes = AttributeResolver.DefinedAttributes.OrderBy(a => a.Id.Id).ToArray();
_store.Transact(new SchemaMigration(declaredAttributes));

_dbStreamDisposable = ProcessUpdates(_store.TxLog)
.Subscribe(itm => _dbStream.OnNext(itm));
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
using NexusMods.MnemonicDB.Abstractions;
using NexusMods.MnemonicDB.Abstractions.ElementComparers;
using NexusMods.MnemonicDB.Abstractions.IndexSegments;
using NexusMods.MnemonicDB.Abstractions.Query;
using NexusMods.MnemonicDB.Storage;
using NexusMods.MnemonicDB.Storage.Abstractions;

namespace NexusMods.MnemonicDB.InternalTxFunctions.Migrations;

internal static class AddIndex
{

}
198 changes: 198 additions & 0 deletions src/NexusMods.MnemonicDB/InternalTxFunctions/SchemaMigration.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,198 @@
using System.Threading;
using Microsoft.Extensions.Logging;
using NexusMods.MnemonicDB.Abstractions;
using NexusMods.MnemonicDB.Abstractions.BuiltInEntities;
using NexusMods.MnemonicDB.Abstractions.DatomIterators;
using NexusMods.MnemonicDB.Abstractions.ElementComparers;
using NexusMods.MnemonicDB.Abstractions.IndexSegments;
using NexusMods.MnemonicDB.Abstractions.Query;
using NexusMods.MnemonicDB.Storage;
using NexusMods.MnemonicDB.Storage.Abstractions;

namespace NexusMods.MnemonicDB.InternalTxFunctions;

internal class SchemaMigration : AInternalFn
{
private readonly IAttribute[] _declaredAttributes;
private ulong _tempId = PartitionId.Temp.MakeEntityId(1).Value;

public SchemaMigration(IAttribute[] attributes)
{
_declaredAttributes = attributes;
}


/// <inhertdoc />
public EntityId TempId(PartitionId entityPartition)
{
var tempId = Interlocked.Increment(ref _tempId);
// Add the partition to the id
var actualId = ((ulong)entityPartition << 40) | tempId;
return EntityId.From(actualId);
}

public override void Execute(DatomStore store)
{
var batch = store.Backend.CreateBatch();
var cache = store.AttributeCache;
using var builder = new IndexSegmentBuilder(cache);
var madeChanges = false;
foreach (var attribute in _declaredAttributes)
{
if (!cache.TryGetAttributeId(attribute.Id, out var aid))
{
madeChanges = true;
AddAttribute(attribute, builder);
continue;
}

if (cache.IsIndexed(aid) != attribute.IsIndexed)
{
if (attribute.IsIndexed)
AddIndex(store, aid, batch);
else
RemoveIndex(store, aid, batch);
madeChanges = true;
}

if (cache.GetValueTag(aid) != attribute.LowLevelType)
{
store.Logger.LogInformation("Converting values for attribute {0} from {1} to {2}", attribute.Id, cache.GetValueTag(aid), attribute.ValueType);
ConvertValuesTo(store, attribute.IsIndexed, aid, batch, attribute.LowLevelType);
madeChanges = true;
}
}

if (!madeChanges)
return;

var built = builder.Build();
store.LogDatoms(built);
store.AttributeCache.Reset(new Db(store.CurrentSnapshot, store.AsOfTxId, store.AttributeCache));
}


private void AddAttribute(IAttribute definition, in IndexSegmentBuilder builder)
{
var id = TempId(PartitionId.Attribute);
builder.Add(id, AttributeDefinition.UniqueId, definition.Id);
builder.Add(id, AttributeDefinition.ValueType, definition.LowLevelType);
builder.Add(id, AttributeDefinition.Cardinality, definition.Cardinalty);

if (definition.IsIndexed)
builder.Add(id, AttributeDefinition.Indexed, Null.Instance);

if (definition.DeclaredOptional)
builder.Add(id, AttributeDefinition.Optional, Null.Instance);

if (definition.NoHistory)
builder.Add(id, AttributeDefinition.NoHistory, Null.Instance);
}

/// <summary>
/// Remove add indexed datoms for a specific attribute
/// </summary>
internal static void AddIndex(DatomStore store, AttributeId id, IWriteBatch batch)
{
foreach (var datom in store.CurrentSnapshot.Datoms(SliceDescriptor.Create(id, IndexType.AEVTCurrent)))
{
store.AVETCurrent.Put(batch, datom);
}

foreach (var datom in store.CurrentSnapshot.Datoms(SliceDescriptor.Create(id, IndexType.AVETCurrent)))
{
store.AVETHistory.Put(batch, datom);
}
}

/// <summary>
/// Remove the indexed datoms for a specific attribute
/// </summary>
internal static void RemoveIndex(DatomStore store, AttributeId id, IWriteBatch batch)
{
foreach (var datom in store.CurrentSnapshot.Datoms(SliceDescriptor.Create(id, IndexType.AEVTCurrent)))
{
store.AVETCurrent.Delete(batch, datom);
}

foreach (var datom in store.CurrentSnapshot.Datoms(SliceDescriptor.Create(id, IndexType.AVETCurrent)))
{
store.AVETHistory.Delete(batch, datom);
}
}

internal static void ConvertValuesTo(DatomStore store, bool isIndexed, AttributeId id, IWriteBatch batch, ValueTag newTagType)
{
using var writer = new PooledMemoryBufferWriter();
foreach (var datom in store.CurrentSnapshot.Datoms(SliceDescriptor.Create(id, IndexType.AEVTCurrent)))
{
store.EAVTCurrent.Delete(batch, datom);
store.AEVTCurrent.Delete(batch, datom);
store.TxLogIndex.Delete(batch, datom);

var currentTag = datom.Prefix.ValueTag;

// if it's a reference, delete it from the backref index
if (currentTag == ValueTag.Reference)
store.VAETCurrent.Delete(batch, datom);

// Delete it from the Value index if it's not a reference
if (isIndexed && currentTag != ValueTag.Reference)
store.AVETCurrent.Delete(batch, datom);

// Convert the value to the new type
var newDatom = ConvertValue(datom, writer, newTagType);

// Put the converted datom back into the indexes
store.EAVTCurrent.Put(batch, newDatom);
store.AEVTCurrent.Put(batch, newDatom);
store.TxLogIndex.Put(batch, newDatom);

if (newTagType == ValueTag.Reference)
store.VAETCurrent.Put(batch, newDatom);

if (isIndexed && newTagType != ValueTag.Reference)
store.AVETCurrent.Put(batch, newDatom);
}

foreach (var datom in store.CurrentSnapshot.Datoms(SliceDescriptor.Create(id, IndexType.AEVTHistory)))
{
store.EAVTHistory.Delete(batch, datom);
store.AEVTHistory.Delete(batch, datom);
store.TxLogIndex.Delete(batch, datom);

var currentTag = datom.Prefix.ValueTag;

// if it's a reference, delete it from the backref index
if (currentTag == ValueTag.Reference)
store.VAETHistory.Delete(batch, datom);

// Delete it from the Value index if it's not a reference
if (isIndexed && currentTag != ValueTag.Reference)
store.VAETHistory.Delete(batch, datom);

// Convert the value to the new type
var newDatom = ConvertValue(datom, writer, newTagType);

// Put the converted datom back into the indexes
store.EAVTHistory.Put(batch, newDatom);
store.AEVTHistory.Put(batch, newDatom);
store.TxLogIndex.Put(batch, newDatom);

if (newTagType == ValueTag.Reference)
store.VAETHistory.Put(batch, newDatom);

if (isIndexed && newTagType != ValueTag.Reference)
store.AVETHistory.Put(batch, newDatom);
}
}

private static Datom ConvertValue(Datom datom, PooledMemoryBufferWriter writer, ValueTag newTagType)
{
writer.Reset();
datom.Prefix.ValueTag.ConvertValue(datom.ValueSpan, newTagType, writer);
var prefix = datom.Prefix with { ValueTag = newTagType };
var newDatom = new Datom(prefix, writer.WrittenMemory);
return newDatom;
}
}
2 changes: 1 addition & 1 deletion src/NexusMods.MnemonicDB/Storage/DatomStore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -396,7 +396,7 @@ private void LogTx(IWriteBatch batch)
/// <summary>
/// Log a single datom, this is the inner loop of the transaction processing
/// </summary>
private void LogDatom(in Datom datom, IWriteBatch batch)
internal void LogDatom(in Datom datom, IWriteBatch batch)
{
_writer.Reset();

Expand Down

0 comments on commit 7ae9a5a

Please sign in to comment.