diff --git a/docs/QueryDesign.md b/docs/QueryDesign.md new file mode 100644 index 00000000..aa0a9786 --- /dev/null +++ b/docs/QueryDesign.md @@ -0,0 +1,51 @@ +--- +hide: + - toc +--- + +## Query Design + +The architecture of MnemonicDB is fairly simple: tuples are stored in indexes sorted in various ways, the logging function +(in `IDatomStore`) publishes a list of new databases and from those the updates in each transaction can be determined. This +simple format allows for a wide variety of queries to be performed on the data and optimally a lot of performance can be gained +in many parts of query. + +### Goals +A few goals of what is desired in the query design: + +* **Performance**: The queries should not require O(n*m) operations as much as possible, while parts may be implemented +simply and have higher complexity, options should be left in the design for optimization. +* **Larger than memory**: The query results are expected to fit in memory, but the source datasets may not. This means that +as much as possible only the minimal working set should be loaded into memory. +* **System Sympathy**: The queries should be designed to work well the rest of the database, indexes store data pre-sorted, +queries should be designed to take advantage of this. +* **Live Queries**: C#'s DynamicData is a fantastic library for UI programming and is close (but not quite) to something +that can be used by MnemonicDB. The queries should be designed to work well with this library, but also provide some sort +of delta-update systems such as `IObservable>` or similar. This allows for small transactions to not require +the entire query to be re-run. A delta update system fits very well with MnemonicDB's transactional publish queue, as each +transaction can result in a delta of datoms added (or removed) from the database. + + +### Concepts + +* **IConnection**: The connection is the primary interface for talking to the database, it can be "dereferenced" by calling +`conn.Db` to get a immutable database. This interfaces also provides an `IObservable` for subscribing to updates to the +database. It also provides a `IObservable` for subscribing to updates along with the portion of the `TxLog` +added in each transaction. +* **IDb**: The database is a readonly view of all the data in the database as of a specific point in time. +* **SliceDescriptor**: A description of a slice of an index in the database. It defines how a database value should be +interpreted and which index to use. This can be thought of as a "view" on the database, consisting of a tuple of `[index, from, to]`. +SliceDescriptors are immutable, and are compared by value. This means that they can be used as keys in dictionaries and sets, and makes +them useful for caching. +* **ObservableSlice**: A slice that can be subscribed to for updates. This is a `IObservable>` and is constructed +by combining a `SliceDescriptor` and a `IConnection`. +* **IndexSlice**: A loaded chunk from the database, made by combining a `SliceDescriptor` and a `IDb`. + +### Query Design + +Based on these simple primitives, a wide variety of queries can be constructed, two IndexSlices can be joined to filter each other, +the datoms in the index can be grouped, sorted, and filtered in various ways. The `ObservableSlice` can be used to create live queries +of the database. + +#### Example Queries + diff --git a/src/NexusMods.MnemonicDB.Abstractions/Attribute.cs b/src/NexusMods.MnemonicDB.Abstractions/Attribute.cs index d14dacbf..3a4c22fc 100644 --- a/src/NexusMods.MnemonicDB.Abstractions/Attribute.cs +++ b/src/NexusMods.MnemonicDB.Abstractions/Attribute.cs @@ -476,6 +476,24 @@ public override string ToString() { return $"({(IsRetract ? "-" : "+")}, {E.Value:x}, {A.Id.Name}, {V}, {T.Value:x})"; } + + /// + public bool EqualsByValue(IReadDatom other) + { + if (other is not ReadDatom o) + return false; + return A == o.A && E == o.E && V!.Equals(o.V); + } + + /// + public int HashCodeByValue() + { + return HashCode.Combine(A, E, V); + } + + } + + } diff --git a/src/NexusMods.MnemonicDB.Abstractions/AttributeId.cs b/src/NexusMods.MnemonicDB.Abstractions/AttributeId.cs index 9dcae6d4..248e4eee 100644 --- a/src/NexusMods.MnemonicDB.Abstractions/AttributeId.cs +++ b/src/NexusMods.MnemonicDB.Abstractions/AttributeId.cs @@ -32,4 +32,12 @@ public override string ToString() { return "AId:" + Value.ToString("X"); } + + /// + /// Returns the next AttributeId after this one. + /// + public AttributeId Next() + { + return new((ushort)(Value + 1)); + } } diff --git a/src/NexusMods.MnemonicDB.Abstractions/Datom.cs b/src/NexusMods.MnemonicDB.Abstractions/Datom.cs index a60ec76d..c45140f3 100644 --- a/src/NexusMods.MnemonicDB.Abstractions/Datom.cs +++ b/src/NexusMods.MnemonicDB.Abstractions/Datom.cs @@ -57,6 +57,11 @@ public TValue Resolve(Attribute attribute) /// public TxId T => Prefix.T; + /// + /// True if the datom is a retract + /// + public bool IsRetract => Prefix.IsRetract; + /// /// Copies the data of this datom onto the heap so it's detached from the current iteration. /// @@ -77,4 +82,31 @@ public override string ToString() { return Resolved.ToString()!; } + + /// + /// Returns -1 if this datom is less than the other, 0 if they are equal, and 1 if this datom is greater than the other. + /// in relation to the given index type. + /// + public int Compare(Datom other, IndexType indexType) + { + switch (indexType) + { + case IndexType.TxLog: + return DatomComparators.TxLogComparator.Compare(RawSpan, other.RawSpan); + case IndexType.EAVTCurrent: + case IndexType.EAVTHistory: + return DatomComparators.EAVTComparator.Compare(RawSpan, other.RawSpan); + case IndexType.AEVTCurrent: + case IndexType.AEVTHistory: + return DatomComparators.AEVTComparator.Compare(RawSpan, other.RawSpan); + case IndexType.AVETCurrent: + case IndexType.AVETHistory: + return DatomComparators.AVETComparator.Compare(RawSpan, other.RawSpan); + case IndexType.VAETCurrent: + case IndexType.VAETHistory: + return DatomComparators.VAETComparator.Compare(RawSpan, other.RawSpan); + default: + throw new ArgumentOutOfRangeException(nameof(indexType), indexType, "Unknown index type"); + } + } } diff --git a/src/NexusMods.MnemonicDB.Abstractions/DatomComparators/ADatomComparator.cs b/src/NexusMods.MnemonicDB.Abstractions/DatomComparators/ADatomComparator.cs index 1e3723bb..bf6a307a 100644 --- a/src/NexusMods.MnemonicDB.Abstractions/DatomComparators/ADatomComparator.cs +++ b/src/NexusMods.MnemonicDB.Abstractions/DatomComparators/ADatomComparator.cs @@ -1,4 +1,5 @@ using System; +using NexusMods.MnemonicDB.Abstractions.DatomIterators; using NexusMods.MnemonicDB.Abstractions.ElementComparers; using NexusMods.MnemonicDB.Abstractions.Internals; @@ -31,6 +32,24 @@ public static int Compare(byte* aPtr, int aLen, byte* bPtr, int bLen) return TE.Compare(aPtr, aLen, bPtr, bLen); } + /// + /// Compare two datom spans + /// + public static int Compare(ReadOnlySpan a, ReadOnlySpan b) + { + fixed(byte* aPtr = a) + fixed(byte* bPtr = b) + return Compare(aPtr, a.Length, bPtr, b.Length); + } + + /// + /// Compare two datoms + /// + public static int Compare(in Datom a, in Datom b) + { + return Compare(a.RawSpan, b.RawSpan); + } + /// public int CompareInstance(byte* aPtr, int aLen, byte* bPtr, int bLen) { diff --git a/src/NexusMods.MnemonicDB.Abstractions/DatomComparators/AEVComparator.cs b/src/NexusMods.MnemonicDB.Abstractions/DatomComparators/AEVComparator.cs new file mode 100644 index 00000000..bae0bea8 --- /dev/null +++ b/src/NexusMods.MnemonicDB.Abstractions/DatomComparators/AEVComparator.cs @@ -0,0 +1,8 @@ +using NexusMods.MnemonicDB.Abstractions.ElementComparers; + +namespace NexusMods.MnemonicDB.Abstractions.DatomComparators; + +/// +/// AEV Comparator. +/// +public class AEVComparator : APartialDatomComparator; diff --git a/src/NexusMods.MnemonicDB.Abstractions/DatomComparators/APartialDatomComparator.cs b/src/NexusMods.MnemonicDB.Abstractions/DatomComparators/APartialDatomComparator.cs new file mode 100644 index 00000000..3fd5d68c --- /dev/null +++ b/src/NexusMods.MnemonicDB.Abstractions/DatomComparators/APartialDatomComparator.cs @@ -0,0 +1,50 @@ +using System; +using NexusMods.MnemonicDB.Abstractions.DatomIterators; +using NexusMods.MnemonicDB.Abstractions.ElementComparers; + +namespace NexusMods.MnemonicDB.Abstractions.DatomComparators; + +/// +/// A comparator that only considers the EAV portion of the datom, useful for in-memory sets that +/// are not concerned with time, and only contain asserts +/// +public abstract unsafe class APartialDatomComparator : IDatomComparator + where TA : IElementComparer + where TB : IElementComparer + where TC : IElementComparer +{ + public static int Compare(byte* aPtr, int aLen, byte* bPtr, int bLen) + { + var cmp = TA.Compare(aPtr, aLen, bPtr, bLen); + if (cmp != 0) return cmp; + + cmp = TB.Compare(aPtr, aLen, bPtr, bLen); + if (cmp != 0) return cmp; + + return TC.Compare(aPtr, aLen, bPtr, bLen); + } + + /// + /// Compare two datom spans + /// + public static int Compare(ReadOnlySpan a, ReadOnlySpan b) + { + fixed(byte* aPtr = a) + fixed(byte* bPtr = b) + return Compare(aPtr, a.Length, bPtr, b.Length); + } + + /// + /// Compare two datoms + /// + public static int Compare(in Datom a, in Datom b) + { + return Compare(a.RawSpan, b.RawSpan); + } + + /// + public int CompareInstance(byte* aPtr, int aLen, byte* bPtr, int bLen) + { + return Compare(aPtr, aLen, bPtr, bLen); + } +} diff --git a/src/NexusMods.MnemonicDB.Abstractions/DatomComparators/AVEComparator.cs b/src/NexusMods.MnemonicDB.Abstractions/DatomComparators/AVEComparator.cs new file mode 100644 index 00000000..e5b3dd3a --- /dev/null +++ b/src/NexusMods.MnemonicDB.Abstractions/DatomComparators/AVEComparator.cs @@ -0,0 +1,8 @@ +using NexusMods.MnemonicDB.Abstractions.ElementComparers; + +namespace NexusMods.MnemonicDB.Abstractions.DatomComparators; + +/// +/// AVE Comparator. +/// +public class AVEComparator : APartialDatomComparator; diff --git a/src/NexusMods.MnemonicDB.Abstractions/DatomComparators/EAVComparator.cs b/src/NexusMods.MnemonicDB.Abstractions/DatomComparators/EAVComparator.cs new file mode 100644 index 00000000..43d0963c --- /dev/null +++ b/src/NexusMods.MnemonicDB.Abstractions/DatomComparators/EAVComparator.cs @@ -0,0 +1,8 @@ +using NexusMods.MnemonicDB.Abstractions.ElementComparers; + +namespace NexusMods.MnemonicDB.Abstractions.DatomComparators; + +/// +/// EAV Comparator. +/// +public class EAVComparator : APartialDatomComparator; diff --git a/src/NexusMods.MnemonicDB.Abstractions/DatomComparators/IDatomComparator.cs b/src/NexusMods.MnemonicDB.Abstractions/DatomComparators/IDatomComparator.cs index ac4df257..1560d20c 100644 --- a/src/NexusMods.MnemonicDB.Abstractions/DatomComparators/IDatomComparator.cs +++ b/src/NexusMods.MnemonicDB.Abstractions/DatomComparators/IDatomComparator.cs @@ -23,7 +23,7 @@ public unsafe interface IDatomComparator public int CompareInstance(ReadOnlySpan a, ReadOnlySpan b) { fixed(byte* aPtr = a) - fixed(byte* bPtr = b) - return CompareInstance(aPtr, a.Length, bPtr, b.Length); + fixed(byte* bPtr = b) + return CompareInstance(aPtr, a.Length, bPtr, b.Length); } } diff --git a/src/NexusMods.MnemonicDB.Abstractions/DatomComparators/VAEComparator.cs b/src/NexusMods.MnemonicDB.Abstractions/DatomComparators/VAEComparator.cs new file mode 100644 index 00000000..c7a102ad --- /dev/null +++ b/src/NexusMods.MnemonicDB.Abstractions/DatomComparators/VAEComparator.cs @@ -0,0 +1,8 @@ +using NexusMods.MnemonicDB.Abstractions.ElementComparers; + +namespace NexusMods.MnemonicDB.Abstractions.DatomComparators; + +/// +/// VAE Comparator. +/// +public class VAEComparator : APartialDatomComparator; diff --git a/src/NexusMods.MnemonicDB.Abstractions/DynamicCache.cs b/src/NexusMods.MnemonicDB.Abstractions/DynamicCache.cs new file mode 100644 index 00000000..9d96af5d --- /dev/null +++ b/src/NexusMods.MnemonicDB.Abstractions/DynamicCache.cs @@ -0,0 +1,17 @@ +using System; +using System.Collections.Concurrent; +using System.Linq; +using System.Reactive.Linq; +using System.Reactive.Subjects; +using DynamicData; +using NexusMods.MnemonicDB.Abstractions.Models; + +namespace NexusMods.MnemonicDB.Abstractions; + +public class DynamicCache : IDynamicCache +{ + public IObservable.ReadDatom, EntityId>> Query(Attribute attr, THighLevel value) + { + throw new NotImplementedException(); + } +} diff --git a/src/NexusMods.MnemonicDB.Abstractions/IAnalytics.cs b/src/NexusMods.MnemonicDB.Abstractions/IAnalytics.cs new file mode 100644 index 00000000..e7740515 --- /dev/null +++ b/src/NexusMods.MnemonicDB.Abstractions/IAnalytics.cs @@ -0,0 +1,15 @@ +using System.Collections.Frozen; + +namespace NexusMods.MnemonicDB.Abstractions; + +/// +/// Database analytics, attached to each IDb instance but often calculated on-the fly +/// and cached. +/// +public interface IAnalytics +{ + /// + /// All the entities referenced in the most recent transaction of the database. + /// + public FrozenSet LatestTxIds { get; } +} diff --git a/src/NexusMods.MnemonicDB.Abstractions/IConnection.cs b/src/NexusMods.MnemonicDB.Abstractions/IConnection.cs index 9e3e404f..12abef7c 100644 --- a/src/NexusMods.MnemonicDB.Abstractions/IConnection.cs +++ b/src/NexusMods.MnemonicDB.Abstractions/IConnection.cs @@ -1,7 +1,25 @@ using System; +using NexusMods.MnemonicDB.Abstractions.IndexSegments; +using NexusMods.MnemonicDB.Abstractions.Internals; namespace NexusMods.MnemonicDB.Abstractions; +/// +/// A database revision, which includes a datom and the datoms added to it. +/// +public struct Revision +{ + /// + /// The database for the most recent transaction + /// + public IDb Database; + + /// + /// The datoms that were added in the most recent transaction + /// + public IndexSegment AddedDatoms; +} + /// /// Represents a connection to a database. /// @@ -12,6 +30,11 @@ public interface IConnection /// public IDb Db { get; } + /// + /// The attribute registry for this connection + /// + public IAttributeRegistry Registry { get; } + /// /// Gets the most recent transaction id. /// @@ -20,7 +43,7 @@ public interface IConnection /// /// A sequential stream of database revisions. /// - public IObservable Revisions { get; } + public IObservable Revisions { get; } /// /// A service provider that entities can use to resolve their values diff --git a/src/NexusMods.MnemonicDB.Abstractions/IDatomStore.cs b/src/NexusMods.MnemonicDB.Abstractions/IDatomStore.cs index 968d90f3..945e66bb 100644 --- a/src/NexusMods.MnemonicDB.Abstractions/IDatomStore.cs +++ b/src/NexusMods.MnemonicDB.Abstractions/IDatomStore.cs @@ -14,7 +14,8 @@ namespace NexusMods.MnemonicDB.Abstractions; public interface IDatomStore : IDisposable { /// - /// An observable of the transaction log, for getting the latest changes to the store. + /// An observable of the transaction log, for getting the latest changes to the store. This observable + /// will always start with the most recent value, so there is no reason to use `StartWith` or `Replay` on it. /// public IObservable<(TxId TxId, ISnapshot Snapshot)> TxLog { get; } diff --git a/src/NexusMods.MnemonicDB.Abstractions/IDb.cs b/src/NexusMods.MnemonicDB.Abstractions/IDb.cs index cfca13a6..d789fb8d 100644 --- a/src/NexusMods.MnemonicDB.Abstractions/IDb.cs +++ b/src/NexusMods.MnemonicDB.Abstractions/IDb.cs @@ -5,6 +5,7 @@ using NexusMods.MnemonicDB.Abstractions.IndexSegments; using NexusMods.MnemonicDB.Abstractions.Internals; using NexusMods.MnemonicDB.Abstractions.Models; +using NexusMods.MnemonicDB.Abstractions.Query; namespace NexusMods.MnemonicDB.Abstractions; @@ -33,6 +34,11 @@ public interface IDb : IEquatable /// IAttributeRegistry Registry { get; } + /// + /// Analytics for the database. + /// + IAnalytics Analytics { get; } + /// /// Gets a read model for the given entity id. /// @@ -55,8 +61,16 @@ public TModel Get(EntityId id) public Entities GetReverse(EntityId id, Attribute attribute) where TModel : IHasEntityIdAndDb; + /// + /// Get all the datoms for the given entity id. + /// public IEnumerable Datoms(EntityId id); + /// + /// Get all the datoms for the given slice descriptor. + /// + public IndexSegment Datoms(SliceDescriptor sliceDescriptor); + /// /// Gets the datoms for the given transaction id. /// diff --git a/src/NexusMods.MnemonicDB.Abstractions/IDynamicCache.cs b/src/NexusMods.MnemonicDB.Abstractions/IDynamicCache.cs new file mode 100644 index 00000000..ddd0bcbd --- /dev/null +++ b/src/NexusMods.MnemonicDB.Abstractions/IDynamicCache.cs @@ -0,0 +1,18 @@ +using System; +using DynamicData; +using NexusMods.MnemonicDB.Abstractions.Models; + +namespace NexusMods.MnemonicDB.Abstractions; + +/// +/// A dynamic cache of entities in the connection filtered by various criteria. +/// +public interface IDynamicCache +{ + /// + /// Get all the entities of a given type that are in the cache. When an entity becomes + /// invalid it will be removed from the cache. + /// + public IObservable.ReadDatom, EntityId>> + Query(Attribute attr, THighLevel value); +} diff --git a/src/NexusMods.MnemonicDB.Abstractions/IReadDatom.cs b/src/NexusMods.MnemonicDB.Abstractions/IReadDatom.cs index 49c70756..60eb1493 100644 --- a/src/NexusMods.MnemonicDB.Abstractions/IReadDatom.cs +++ b/src/NexusMods.MnemonicDB.Abstractions/IReadDatom.cs @@ -41,4 +41,53 @@ public interface IReadDatom /// Adds the datom to the transaction as a retraction. /// void Retract(ITransaction tx); + + /// + /// Returns true if the datom is equal when comparing the entity, attribute and value. + /// + bool EqualsByValue(IReadDatom other); + + + /// + /// Hashcode of just the entity, attribute and value. + /// + public int HashCodeByValue(); +} + +/// +/// A wrapper around a datom that compares only on the EAV values +/// +public readonly struct ReadDatomKey : IEquatable +{ + private readonly IReadDatom _datom; + + /// + /// Initializes a new instance of the struct. + /// + public ReadDatomKey(IReadDatom datom) + { + _datom = datom; + } + + /// + public override bool Equals(object? obj) + { + if (obj is not ReadDatomKey key) + return false; + + return _datom.EqualsByValue(key._datom); + } + + /// + public override int GetHashCode() + { + return _datom.HashCodeByValue(); + } + + /// + public bool Equals(ReadDatomKey other) + { + return _datom.EqualsByValue(other._datom); + } + } diff --git a/src/NexusMods.MnemonicDB.Abstractions/ISnapshot.cs b/src/NexusMods.MnemonicDB.Abstractions/ISnapshot.cs index c23bd155..45ad584e 100644 --- a/src/NexusMods.MnemonicDB.Abstractions/ISnapshot.cs +++ b/src/NexusMods.MnemonicDB.Abstractions/ISnapshot.cs @@ -2,7 +2,9 @@ using System.Collections.Generic; using System.Runtime.InteropServices; using NexusMods.MnemonicDB.Abstractions.DatomIterators; +using NexusMods.MnemonicDB.Abstractions.IndexSegments; using NexusMods.MnemonicDB.Abstractions.Internals; +using NexusMods.MnemonicDB.Abstractions.Query; using Reloaded.Memory.Extensions; namespace NexusMods.MnemonicDB.Abstractions; @@ -16,65 +18,13 @@ namespace NexusMods.MnemonicDB.Abstractions; public interface ISnapshot { /// - /// Get an enumerable of all the datoms between the given keys, if the keys are in reverse order, - /// the datoms will be returned in reverse order. + /// Get the data specified by the given descriptor as a single segment. /// - IEnumerable Datoms(IndexType type, ReadOnlySpan a, ReadOnlySpan b); + IndexSegment Datoms(SliceDescriptor descriptor); /// - /// Get an enumerable of all the datoms between the given key and the end of the index - /// + /// Get the data specified by the given descriptor chunked into segments of datoms of the given size. /// - IEnumerable Datoms(IndexType type, ReadOnlySpan a) - { - Span b = stackalloc byte[KeyPrefix.Size + sizeof(ulong)]; - b.Fill(0xFF); - return Datoms(type, a, b); - } + IEnumerable DatomsChunked(SliceDescriptor descriptor, int chunkSize); - /// - /// Get an enumerable of all the datoms between the given key and the end of the index - /// - /// - IEnumerable Datoms(IndexType type, KeyPrefix a) - { - Span b = stackalloc byte[KeyPrefix.Size + sizeof(ulong)]; - b.Fill(0xFF); - return Datoms(type, MemoryMarshal.CreateSpan(ref a, 1).CastFast(), b); - } - - /// - /// Get an enumerable of all the datoms between the given keys. - /// - IEnumerable Datoms(IndexType type, KeyPrefix a, KeyPrefix b) - { - return Datoms(type, MemoryMarshal.CreateSpan(ref a, 1).CastFast(), - MemoryMarshal.CreateSpan(ref b, 1).CastFast()); - } - - /// - /// Get an enumerable of all the datoms in the given index. - /// - IEnumerable Datoms(IndexType type) - { - if (type == IndexType.VAETCurrent || type == IndexType.VAETHistory) - { - unsafe - { - // We need to pad the key in case this is used in a VAET index that sorts by value first, - // which would always be a EntityId (ulong) - Span a = stackalloc byte[KeyPrefix.Size + sizeof(ulong)]; - a.Clear(); - MemoryMarshal.Write(a, KeyPrefix.Min); - - Span b = stackalloc byte[KeyPrefix.Size + sizeof(ulong)]; - b.Fill(0xFF); - MemoryMarshal.Write(b, KeyPrefix.Max); - - return Datoms(type, a, b); - } - } - - return Datoms(type, KeyPrefix.Min, KeyPrefix.Max); - } } diff --git a/src/NexusMods.MnemonicDB.Abstractions/IndexSegments/IndexSegment.cs b/src/NexusMods.MnemonicDB.Abstractions/IndexSegments/IndexSegment.cs index 522c84a9..e4064b98 100644 --- a/src/NexusMods.MnemonicDB.Abstractions/IndexSegments/IndexSegment.cs +++ b/src/NexusMods.MnemonicDB.Abstractions/IndexSegments/IndexSegment.cs @@ -1,6 +1,7 @@ using System; using System.Collections; using System.Collections.Generic; +using DynamicData; using NexusMods.MnemonicDB.Abstractions.DatomIterators; using NexusMods.MnemonicDB.Abstractions.Internals; using NexusMods.Paths; @@ -87,4 +88,14 @@ IEnumerator IEnumerable.GetEnumerator() { return GetEnumerator(); } + + /// + /// Create a new index segment from the given datoms + /// + public static IndexSegment From(IAttributeRegistry registry, IReadOnlyCollection datoms) + { + using var builder = new IndexSegmentBuilder(registry, datoms.Count); + builder.Add(datoms); + return builder.Build(); + } } diff --git a/src/NexusMods.MnemonicDB.Abstractions/IndexSegments/IndexSegmentBuilder.cs b/src/NexusMods.MnemonicDB.Abstractions/IndexSegments/IndexSegmentBuilder.cs index dfae8916..9164a073 100644 --- a/src/NexusMods.MnemonicDB.Abstractions/IndexSegments/IndexSegmentBuilder.cs +++ b/src/NexusMods.MnemonicDB.Abstractions/IndexSegments/IndexSegmentBuilder.cs @@ -26,6 +26,20 @@ public IndexSegmentBuilder(IAttributeRegistry registry, int capacity = 1024) _data = new PooledMemoryBufferWriter(capacity); } + /// + /// The number of datoms in the segment + /// + public int Count => _offsets.Count; + + /// + /// Resets the builder so it can be reused + /// + public void Reset() + { + _offsets.Clear(); + _data.Reset(); + } + /// /// Add a datom to the segment /// @@ -65,6 +79,15 @@ public readonly void Add(EntityId entityId, Attribute + /// Append + /// + public readonly void Add(ReadOnlySpan rawData) + { + _offsets.Add(_data.Length); + _data.Write(rawData); + } + /// /// Construct the index segment /// diff --git a/src/NexusMods.MnemonicDB.Abstractions/KeyBuilder.cs b/src/NexusMods.MnemonicDB.Abstractions/KeyBuilder.cs new file mode 100644 index 00000000..40688e27 --- /dev/null +++ b/src/NexusMods.MnemonicDB.Abstractions/KeyBuilder.cs @@ -0,0 +1,54 @@ +using System; +using NexusMods.MnemonicDB.Abstractions.ElementComparers; +using NexusMods.MnemonicDB.Abstractions.Internals; + +namespace NexusMods.MnemonicDB.Abstractions; + +/// +/// Assists in building keys for the datastore, packing multiple values into a single memory buffer. +/// +public class KeyBuilder +{ + private readonly RegistryId _registryId; + + /// + /// Primary constructor, requires a registry id for resolving attribute ids. + /// + public KeyBuilder(RegistryId registryId) + { + _registryId = registryId; + } + + /// + /// Write a lower bound key for the given entity id, with other values set to their minimum. + /// + /// + /// + public Memory From(EntityId e) + { + var writer = new PooledMemoryBufferWriter(32); + var prefix = new KeyPrefix().Set(e, AttributeId.Min, TxId.MinValue, false); + writer.WriteMarshal(prefix); + writer.WriteMarshal((byte)ValueTags.Null); + var output = GC.AllocateUninitializedArray(writer.Length); + writer.WrittenMemory.Span.CopyTo(output); + return output; + } + + /// + /// Write an upper bound key for the given entity id, with other values set to their maximum. + /// + /// + /// + public Memory To(EntityId e) + { + var writer = new PooledMemoryBufferWriter(32); + var prefix = new KeyPrefix().Set(e, AttributeId.Max, TxId.MaxValue, false); + writer.WriteMarshal(prefix); + writer.WriteMarshal((byte)ValueTags.Null); + var output = GC.AllocateUninitializedArray(writer.Length); + writer.WrittenMemory.Span.CopyTo(output); + return output; + } + +} diff --git a/src/NexusMods.MnemonicDB.Abstractions/Models/IReadOnlyModel.cs b/src/NexusMods.MnemonicDB.Abstractions/Models/IReadOnlyModel.cs index 9aa20b9f..ea7720d6 100644 --- a/src/NexusMods.MnemonicDB.Abstractions/Models/IReadOnlyModel.cs +++ b/src/NexusMods.MnemonicDB.Abstractions/Models/IReadOnlyModel.cs @@ -1,4 +1,6 @@ -using System.Collections.Generic; +using System; +using System.Collections.Generic; +using System.Reactive.Linq; namespace NexusMods.MnemonicDB.Abstractions.Models; @@ -7,6 +9,7 @@ namespace NexusMods.MnemonicDB.Abstractions.Models; /// public interface IReadOnlyModel : IHasEntityIdAndDb, IReadOnlyCollection { + /// /// Returns true if all the attributes in the model exist in the attached /// database diff --git a/src/NexusMods.MnemonicDB.Abstractions/Models/ModelDefinition.cs b/src/NexusMods.MnemonicDB.Abstractions/Models/ModelDefinition.cs deleted file mode 100644 index 0a4b6687..00000000 --- a/src/NexusMods.MnemonicDB.Abstractions/Models/ModelDefinition.cs +++ /dev/null @@ -1,69 +0,0 @@ -namespace NexusMods.MnemonicDB.Abstractions.Models; - -public class ModelDefinition -{ - /// - /// Creates a new ModelDefinition, with the given name. - /// - public static ModelDefinition New(string name) - { - return new ModelDefinition(); - } - - /// - /// Specify that this model includes another model. - /// - public ModelDefinition FooFar() - { - return this; - } - - /// - /// Defines a new attribute on the model of the given attribute type with the given parameters - /// - public ModelDefinition Attribute(string name, bool isIndexed = false, bool noHistory = false) - where TType : IAttribute - { - return this; - } - - /// - /// Define a reference to another model, via an attribute of the given name. - /// - public ModelDefinition Reference(string name) - { - return this; - } - - /// - /// Define a multi-cardinality reference to another model, via an attribute of the given name. - /// - public ModelDefinition References(string name) - { - return this; - } - - /// - /// Define an attribute that is a marker; it doesn't have a value, its existance determines if the value - /// is true or false. - /// - public ModelDefinition MarkedBy(string name) - { - return this; - } - - /// - /// Defines a reference in another model that points to this class. These references will be exposed - /// in the `name` property of this model. - /// - public ModelDefinition BackRef(string otherAttribute, string thisAttribute) - { - return this; - } - - public ModelDefinition Build() - { - return this; - } - -} diff --git a/src/NexusMods.MnemonicDB.Abstractions/PooledMemoryBufferWriter.cs b/src/NexusMods.MnemonicDB.Abstractions/PooledMemoryBufferWriter.cs index 4acc2b74..163aec99 100644 --- a/src/NexusMods.MnemonicDB.Abstractions/PooledMemoryBufferWriter.cs +++ b/src/NexusMods.MnemonicDB.Abstractions/PooledMemoryBufferWriter.cs @@ -91,7 +91,7 @@ public void Reset() } /// - /// Resets the buffer writer, allowing it to be reused. + /// Writes the given span to the buffer, and advances the length. /// public void Write(ReadOnlySpan span) { diff --git a/src/NexusMods.MnemonicDB.Abstractions/Query/ObservableDatoms.cs b/src/NexusMods.MnemonicDB.Abstractions/Query/ObservableDatoms.cs new file mode 100644 index 00000000..732160dc --- /dev/null +++ b/src/NexusMods.MnemonicDB.Abstractions/Query/ObservableDatoms.cs @@ -0,0 +1,117 @@ +using System; +using System.Collections.Generic; +using System.Reactive.Linq; +using DynamicData; +using NexusMods.MnemonicDB.Abstractions.DatomComparators; +using NexusMods.MnemonicDB.Abstractions.DatomIterators; +using NexusMods.MnemonicDB.Abstractions.IndexSegments; + +namespace NexusMods.MnemonicDB.Abstractions.Query; + +public static class ObservableDatoms +{ + + /// + /// Observe a slice of the database, as datoms are added or removed from the database, the observer will be updated + /// with the changeset of datoms that have been added or removed. + /// + public static IObservable> ObserveDatoms(this IConnection conn, SliceDescriptor descriptor) + { + var comparator = PartialComparator(descriptor.Index); + var equality = (IEqualityComparer)comparator; + var set = new SortedSet(comparator); + + return conn.Revisions.Select((rev, idx) => + { + if (idx == 0) + return Setup(set, rev.Database, descriptor); + return Diff(set, rev.AddedDatoms, descriptor, equality); + }); + } + + /// + /// Observe all datoms for a given entity id + /// + public static IObservable> ObserveDatoms(this IConnection conn, EntityId id) + { + return conn.ObserveDatoms(SliceDescriptor.Create(id, conn.Registry)); + } + + private static IChangeSet Diff(SortedSet set, IndexSegment updates, SliceDescriptor descriptor, IEqualityComparer comparer) + { + List>? changes = null; + + foreach (var datom in updates) + { + if (!descriptor.Includes(datom)) + continue; + if (datom.IsRetract) + { + var idx = set.IndexOf(datom, comparer); + if (idx >= 0) + { + set.Remove(datom); + changes ??= []; + changes.Add(new Change(ListChangeReason.Remove, datom, idx)); + } + else + { + throw new InvalidOperationException("Retract without assert in set"); + } + } + else + { + set.Add(datom); + var idx = set.IndexOf(datom); + changes ??= []; + changes.Add(new Change(ListChangeReason.Add, datom, idx)); + } + } + if (changes == null) + return ChangeSet.Empty; + return new ChangeSet(changes); + } + + private static ChangeSet Setup(SortedSet set, IDb db, SliceDescriptor descriptor) + { + var datoms = db.Datoms(descriptor); + set.UnionWith(datoms); + return new ChangeSet([new Change(ListChangeReason.AddRange, datoms)]); + } + + private struct Comparer : IComparer, IEqualityComparer + where TInner : IDatomComparator + { + public unsafe int Compare(Datom a, Datom b) + { + var aSpan = a.RawSpan; + var bSpan = b.RawSpan; + fixed(byte* aPtr = aSpan) + fixed(byte* bPtr = bSpan) + return TInner.Compare(aPtr, aSpan.Length, bPtr, bSpan.Length); + } + + public bool Equals(Datom x, Datom y) + { + return Compare(x, y) == 0; + } + + public int GetHashCode(Datom obj) + { + throw new NotSupportedException(); + } + } + + private static IComparer PartialComparator(IndexType type) => type switch + { + IndexType.EAVTCurrent => new Comparer(), + IndexType.EAVTHistory => new Comparer(), + IndexType.AVETCurrent => new Comparer(), + IndexType.AVETHistory => new Comparer(), + IndexType.AEVTCurrent => new Comparer(), + IndexType.AEVTHistory => new Comparer(), + IndexType.VAETCurrent => new Comparer(), + IndexType.VAETHistory => new Comparer(), + _ => throw new ArgumentOutOfRangeException() + }; +} diff --git a/src/NexusMods.MnemonicDB.Abstractions/Query/SliceDescriptor.cs b/src/NexusMods.MnemonicDB.Abstractions/Query/SliceDescriptor.cs new file mode 100644 index 00000000..23d15c89 --- /dev/null +++ b/src/NexusMods.MnemonicDB.Abstractions/Query/SliceDescriptor.cs @@ -0,0 +1,293 @@ +using System; +using System.Runtime.InteropServices; +using NexusMods.MnemonicDB.Abstractions.Attributes; +using NexusMods.MnemonicDB.Abstractions.DatomIterators; +using NexusMods.MnemonicDB.Abstractions.ElementComparers; +using NexusMods.MnemonicDB.Abstractions.Internals; +using Reloaded.Memory.Extensions; + +namespace NexusMods.MnemonicDB.Abstractions.Query; + +/// +/// A slice descriptor for querying datoms, it doesn't contain any data, but can be combined +/// with other objects like databases or indexes to query for datoms. +/// +public readonly struct SliceDescriptor +{ + /// + /// The index to query, the `From` and `To` should be within the same index. + /// + public required IndexType Index { get; init; } + + /// + /// The lower bound of the slice, inclusive. + /// + public required Datom From { get; init; } + + /// + /// The upper bound of the slice, exclusive. + /// + public required Datom To { get; init; } + + /// + /// True if the slice is in reverse order, false otherwise. Reverse order means that a DB query + /// with this slice will sort the results in descending order. + /// + public bool IsReverse => From.Compare(To, Index) > 0; + + /// + /// Returns true if the datom is within the slice, false otherwise. + /// + public bool Includes(in Datom datom) + { + return Index switch + { + IndexType.TxLog => DatomComparators.TxLogComparator.Compare(From, datom) <= 0 && + DatomComparators.TxLogComparator.Compare(datom, To) < 0, + IndexType.EAVTCurrent or IndexType.EAVTHistory => + DatomComparators.EAVTComparator.Compare(From, datom) <= 0 && + DatomComparators.EAVTComparator.Compare(datom, To) < 0, + IndexType.AEVTCurrent or IndexType.AEVTHistory => + DatomComparators.AEVTComparator.Compare(From, datom) <= 0 && + DatomComparators.AEVTComparator.Compare(datom, To) < 0, + IndexType.VAETCurrent or IndexType.VAETHistory => + DatomComparators.VAETComparator.Compare(From, datom) <= 0 && + DatomComparators.VAETComparator.Compare(datom, To) < 0, + _ => throw new ArgumentOutOfRangeException(nameof(Index), Index, "Unknown index type") + }; + } + + /// + /// Creates a slice descriptor from the to and from datoms + /// + public static SliceDescriptor Create(IndexType index, Datom from, Datom to) => new() { Index = index, From = from, To = to }; + + /// + /// Creates a slice descriptor for the given entity in the current EAVT index + /// + public static SliceDescriptor Create(EntityId e, IAttributeRegistry registry) + { + return new SliceDescriptor + { + Index = IndexType.EAVTCurrent, + From = Datom(e, AttributeId.Min, TxId.MinValue, false, registry), + To = Datom(e, AttributeId.Max, TxId.MaxValue, false, registry) + }; + } + + /// + /// Creates a slice descriptor for the given transaction in the TxLog index + /// + public static SliceDescriptor Create(TxId tx, IAttributeRegistry registry) + { + return new SliceDescriptor + { + Index = IndexType.TxLog, + From = Datom(EntityId.MinValueNoPartition, AttributeId.Min, tx, false, registry), + To = Datom(EntityId.MaxValueNoPartition, AttributeId.Max, tx, false, registry) + }; + } + + /// + /// Creates a slice descriptor for the given attribute in the current AVET index + /// + public static SliceDescriptor Create(Attribute attr, THighLevel value, IAttributeRegistry registry) + { + return new SliceDescriptor + { + Index = IndexType.AVETCurrent, + From = Datom(EntityId.MinValueNoPartition, attr, value, TxId.MinValue, false, registry), + To = Datom(EntityId.MaxValueNoPartition, attr, value, TxId.MaxValue, false, registry) + }; + } + + /// + /// Creates a slice descriptor for the given reference attribute and entity that is being pointed to, this is a + /// reverse lookup. + /// + public static SliceDescriptor Create(AttributeId referenceAttribute, EntityId pointingTo, IAttributeRegistry dbRegistry) + { + return new SliceDescriptor + { + Index = IndexType.VAETCurrent, + From = Datom(EntityId.MinValueNoPartition, referenceAttribute, pointingTo, TxId.MinValue, false, dbRegistry), + To = Datom(EntityId.MaxValueNoPartition, referenceAttribute, pointingTo, TxId.MaxValue, false, dbRegistry) + }; + } + + /// + /// Creates a slice descriptor for the given attribute from the current AEVT index + /// reverse lookup. + /// + public static SliceDescriptor Create(AttributeId referenceAttribute, IAttributeRegistry dbRegistry) + { + return new SliceDescriptor + { + Index = IndexType.AEVTCurrent, + From = Datom(EntityId.MinValueNoPartition, referenceAttribute, TxId.MinValue, false, dbRegistry), + To = Datom(EntityId.MaxValueNoPartition, referenceAttribute, TxId.MaxValue, false, dbRegistry) + }; + } + + + /// + /// Creates a slice descriptor for the given attribute and entity from the EAVT index + /// + public static SliceDescriptor Create(EntityId e, AttributeId a, IAttributeRegistry dbRegistry) + { + return new SliceDescriptor + { + Index = IndexType.EAVTCurrent, + From = Datom(e, a, TxId.MinValue, false, dbRegistry), + To = Datom(e, AttributeId.From((ushort)(a.Value + 1)), TxId.MaxValue, false, dbRegistry) + }; + } + + /// + /// Creates a slice descriptor that points only to the specific attribute + /// + public static SliceDescriptor Create(IndexType index, ReadOnlySpan span, IAttributeRegistry registry) + { + var array = span.ToArray(); + return new SliceDescriptor + { + Index = index, + From = new Datom(array, registry), + To = new Datom(array, registry) + }; + } + + /// + /// Creates a slice descriptor for the given exactly from the given index + /// + public static SliceDescriptor Exact(IndexType index, ReadOnlySpan span, IAttributeRegistry registry) + { + var from = span.ToArray(); + var to = span.ToArray(); + var prefix = MemoryMarshal.Read(to.AsSpan()); + prefix.Set(prefix.E, prefix.A, TxId.From(prefix.T.Value + 1), prefix.IsRetract); + return new SliceDescriptor + { + Index = index, + From = new Datom(from, registry), + To = new Datom(to, registry) + }; + } + + + /// + /// Creates a slice descriptor for the given attribute in the current AEVT index + /// + public static SliceDescriptor Create(IAttribute attr, IAttributeRegistry registry) + { + var attrId = attr.GetDbId(registry.Id); + return new SliceDescriptor + { + Index = IndexType.AEVTCurrent, + From = Datom(EntityId.MinValueNoPartition, attrId, TxId.MinValue, false, registry), + To = Datom(EntityId.MaxValueNoPartition, attrId, TxId.MaxValue, false, registry) + }; + } + + + /// + /// Creates a slice descriptor for datoms that reference the given entity via the VAET index + /// + public static SliceDescriptor CreateReferenceTo(EntityId pointingTo, IAttributeRegistry dbRegistry) + { + return new SliceDescriptor + { + Index = IndexType.VAETCurrent, + From = Datom(EntityId.MinValueNoPartition, AttributeId.Min, pointingTo, TxId.MinValue, false, dbRegistry), + To = Datom(EntityId.MaxValueNoPartition, AttributeId.Max, pointingTo, TxId.MaxValue, false, dbRegistry) + }; + } + + + /// + /// Creates a slice descriptor for the entire index + /// + public static SliceDescriptor Create(IndexType index, IAttributeRegistry registry) + { + if (index is IndexType.VAETCurrent or IndexType.VAETHistory) + { + // VAET has a special case where we need to include the reference type and an actual reference + // in the slice + var from = GC.AllocateUninitializedArray(KeyPrefix.Size + 9); + from.AsSpan().Clear(); + from[KeyPrefix.Size + 1] = (byte)ValueTags.Reference; + var to = GC.AllocateUninitializedArray(KeyPrefix.Size + 9); + to.AsSpan().Fill(byte.MaxValue); + from[KeyPrefix.Size + 1] = (byte)ValueTags.Reference; + return new SliceDescriptor + { + Index = index, + From = new Datom(from, registry), + To = new Datom(to, registry) + }; + } + else + { + var from = GC.AllocateUninitializedArray(KeyPrefix.Size); + from.AsSpan().Clear(); + var to = GC.AllocateUninitializedArray(KeyPrefix.Size); + to.AsSpan().Fill(byte.MaxValue); + return new SliceDescriptor + { + Index = index, + From = new Datom(from, registry), + To = new Datom(to, registry) + }; + } + + } + + /// + /// Creates a datom with no value from the given parts + /// + public static Datom Datom(EntityId e, AttributeId a, TxId id, bool isRetract, IAttributeRegistry registry) + { + var data = GC.AllocateUninitializedArray(KeyPrefix.Size); + var prefix = new KeyPrefix().Set(e, a, id, isRetract); + MemoryMarshal.Write(data, prefix); + return new Datom(data, registry); + } + + /// + /// Creates a with a value from the given attribute and value + /// + public static Datom Datom(EntityId e, Attribute a, THighLevel value, TxId tx, bool isRetract, IAttributeRegistry registry) + { + using var pooled = new PooledMemoryBufferWriter(); + a.Write(e, registry.Id, value, tx, isRetract, pooled); + return new Datom(pooled.WrittenMemory.ToArray(), registry); + } + + /// + /// Creates a slice descriptor for the given entity range, for the current EAVT index + /// + public static SliceDescriptor Create(EntityId from, EntityId to, IAttributeRegistry dbRegistry) + { + return new SliceDescriptor + { + Index = IndexType.EAVTCurrent, + From = Datom(from, AttributeId.Min, TxId.MinValue, false, dbRegistry), + To = Datom(to, AttributeId.Max, TxId.MaxValue, false, dbRegistry) + }; + } + + /// + /// Creates a datom with no value from the given parts + /// + public static Datom Datom(EntityId e, AttributeId a, EntityId value, TxId id, bool isRetract, IAttributeRegistry registry) + { + var data = new Memory(GC.AllocateUninitializedArray(KeyPrefix.Size + 1 + sizeof(ulong))); + var span = data.Span; + var prefix = new KeyPrefix().Set(e, a, id, isRetract); + MemoryMarshal.Write(span, prefix); + span[KeyPrefix.Size] = (byte)ValueTags.Reference; + MemoryMarshal.Write(span.SliceFast(KeyPrefix.Size + 1), value); + return new Datom(data, registry); + } + +} diff --git a/src/NexusMods.MnemonicDB.SourceGenerator/Template.weave b/src/NexusMods.MnemonicDB.SourceGenerator/Template.weave index 95ebbebb..b2de915e 100644 --- a/src/NexusMods.MnemonicDB.SourceGenerator/Template.weave +++ b/src/NexusMods.MnemonicDB.SourceGenerator/Template.weave @@ -132,10 +132,14 @@ public partial class {{= model.Name}} { tx.Add(Id, {{= attr.FieldName}}, new __COMPARERS__.Null(), false); } {{else}} - {{if attr.IsOptional}} + {{if attr.IsOptional && !attr.IsReference}} if ({{= attr.ContextualName}} is not null) { tx.Add(Id, {{= attr.FieldName}}, {{= attr.ContextualName}}!, false); } + {{elif attr.IsOptional && attr.IsReference}} + if ({{= attr.ContextualName}}.Value != 0) { + tx.Add(Id, {{= attr.FieldName}}, {{= attr.ContextualName}}.Value, false); + } {{else}} tx.Add(Id, {{= attr.FieldName}}, {{= attr.ContextualName}}, false); @@ -274,6 +278,9 @@ public partial class {{= model.Name}} { public bool IsValid() { + // This is true when the struct is a default value. + if (Db == null) return false; + {{each attr in model.Attributes}} {{if !attr.IsMarker && !attr.IsOptional}} if (!Contains({{= attr.FieldName}})) return false; @@ -354,14 +361,12 @@ public readonly partial struct {{= model.Name}}Id : IEquatable<{{= model.Name}}I public bool Equals({{= model.Name}}Id other) { - if (other == null) return false; return Value.Value == other.Value.Value; } public bool Equals(__ABSTRACTIONS__.EntityId other) { - if (other == null) return false; return Value.Value == other.Value; } @@ -373,6 +378,16 @@ public readonly partial struct {{= model.Name}}Id : IEquatable<{{= model.Name}}I public static bool operator ==({{= model.Name}}Id left, {{= model.Name}}Id right) => left.Equals(right); public static bool operator !=({{= model.Name}}Id left, {{= model.Name}}Id right) => !left.Equals(right); + + public override bool Equals(object? obj) + { + return obj is {{= model.Name}}Id id && Equals(id); + } + + public override int GetHashCode() + { + return Value.GetHashCode(); + } } @@ -385,7 +400,7 @@ public static class {{= model.Name}}Extensions { } {{each include in model.Includes}} - public static bool TryGetAs{{= model.Name}}(this {{= include.ToDisplayString()}}.ReadOnly model, [NotNullWhen(true)] out {{= model.Name}}.ReadOnly? result) { + public static bool TryGetAs{{= model.Name}}(this {{= include.ToDisplayString()}}.ReadOnly model, [NotNullWhen(true)] out {{= model.Name}}.ReadOnly result) { var casted = new {{= model.Name}}.ReadOnly(model.Db, model.Id); if (casted.IsValid()) { result = casted; diff --git a/src/NexusMods.MnemonicDB.Storage/DatomStore.cs b/src/NexusMods.MnemonicDB.Storage/DatomStore.cs index cc5948b7..56369d66 100644 --- a/src/NexusMods.MnemonicDB.Storage/DatomStore.cs +++ b/src/NexusMods.MnemonicDB.Storage/DatomStore.cs @@ -10,9 +10,11 @@ using Microsoft.Extensions.Hosting; using Microsoft.Extensions.Logging; using NexusMods.MnemonicDB.Abstractions; +using NexusMods.MnemonicDB.Abstractions.DatomIterators; using NexusMods.MnemonicDB.Abstractions.ElementComparers; using NexusMods.MnemonicDB.Abstractions.IndexSegments; using NexusMods.MnemonicDB.Abstractions.Internals; +using NexusMods.MnemonicDB.Abstractions.Query; using NexusMods.MnemonicDB.Abstractions.TxFunctions; using NexusMods.MnemonicDB.Storage.Abstractions; using NexusMods.MnemonicDB.Storage.DatomStorageStructures; @@ -36,7 +38,7 @@ public class DatomStore : IDatomStore, IHostedService private readonly DatomStoreSettings _settings; private readonly Channel _txChannel; private readonly IIndex _txLog; - private readonly Subject<(TxId TxId, ISnapshot snapshot)> _updatesSubject; + private BehaviorSubject<(TxId TxId, ISnapshot snapshot)>? _updatesSubject; private readonly IIndex _vaetCurrent; private readonly IIndex _vaetHistory; private readonly PooledMemoryBufferWriter _writer; @@ -101,9 +103,6 @@ public DatomStore(ILogger logger, AttributeRegistry registry, DatomS _avetCurrent = _backend.GetIndex(IndexType.AVETCurrent); _avetHistory = _backend.GetIndex(IndexType.AVETHistory); - - _updatesSubject = new Subject<(TxId TxId, ISnapshot Snapshot)>(); - registry.Populate(BuiltInAttributes.Initial); _txChannel = Channel.CreateUnbounded(); @@ -145,7 +144,16 @@ public async Task Sync() return await Transact(new IndexSegment()); } - public IObservable<(TxId TxId, ISnapshot Snapshot)> TxLog => _updatesSubject; + /// + public IObservable<(TxId TxId, ISnapshot Snapshot)> TxLog + { + get + { + if (_updatesSubject == null) + throw new InvalidOperationException("The store is not yet started"); + return _updatesSubject; + } + } /// public async Task RegisterAttributes(IEnumerable newAttrs) @@ -174,7 +182,7 @@ public ISnapshot GetSnapshot() /// public void Dispose() { - _updatesSubject.Dispose(); + _updatesSubject?.Dispose(); _writer.Dispose(); _retractWriter.Dispose(); } @@ -203,7 +211,7 @@ private async Task ConsumeTransactions() Log(pendingTransaction, out var result); - _updatesSubject.OnNext((result.AssignedTxId, result.Snapshot)); + _updatesSubject?.OnNext((result.AssignedTxId, result.Snapshot)); pendingTransaction.CompletionSource.TrySetResult(result); } catch (Exception ex) @@ -227,7 +235,7 @@ private async Task Bootstrap() try { var snapshot = _backend.GetSnapshot(); - var lastTx = TxId.From(_nextIdCache.LastEntityInPartition(snapshot, PartitionId.Transactions).Value); + var lastTx = TxId.From(_nextIdCache.LastEntityInPartition(snapshot, PartitionId.Transactions, _registry).Value); if (lastTx.Value == TxId.MinValue) { @@ -255,6 +263,8 @@ private async Task Bootstrap() _logger.LogError(ex, "Failed to bootstrap the datom store"); throw; } + + _updatesSubject = new BehaviorSubject<(TxId TxId, ISnapshot snapshot)>((_asOfTx, _currentSnapshot)); _txTask = Task.Run(ConsumeTransactions); } @@ -275,7 +285,7 @@ private EntityId MaybeRemap(ISnapshot snapshot, EntityId id, Dictionary> 40 & 0xFF)); - var assignedId = _nextIdCache.NextId(snapshot, partitionId); + var assignedId = _nextIdCache.NextId(snapshot, partitionId, _registry); remaps.Add(id, assignedId); return assignedId; } @@ -291,7 +301,7 @@ private EntityId MaybeRemap(ISnapshot snapshot, EntityId id, Dictionary(); var remapFn = (Func)(id => MaybeRemap(currentSnapshot, id, remaps, thisTx)); @@ -423,7 +433,18 @@ private void ProcessRetract(IWriteBatch batch, IAttribute attribute, ReadOnlySpa prevKey.Set(e, a, TxId.MinValue, false); MemoryMarshal.Write(_prevWriter.GetWrittenSpanWritable(), prevKey); - var prevDatom = iterator.Datoms(IndexType.EAVTCurrent, _prevWriter.GetWrittenSpan()) + var low = new Datom(_prevWriter.GetWrittenSpan().ToArray(), Registry); + prevKey.Set(e, a, TxId.MaxValue, false); + MemoryMarshal.Write(_prevWriter.GetWrittenSpanWritable(), prevKey); + var high = new Datom(_prevWriter.GetWrittenSpan().ToArray(), Registry); + var sliceDescriptor = new SliceDescriptor + { + Index = IndexType.EAVTCurrent, + From = low, + To = high + }; + + var prevDatom = iterator.Datoms(sliceDescriptor) .Select(d => d.Clone()) .FirstOrDefault(); @@ -501,8 +522,8 @@ private unsafe PrevState GetPreviousState(bool isRemapped, IAttribute attribute, if (attribute.Cardinalty == Cardinality.Many) { - var found = snapshot.Datoms(IndexType.EAVTCurrent, span) - .Select(d => d.Clone()) + var sliceDescriptor = SliceDescriptor.Exact(IndexType.EAVTCurrent, span, _registry); + var found = snapshot.Datoms(sliceDescriptor) .FirstOrDefault(); if (!found.Valid) return PrevState.NotExists; if (found.E != keyPrefix.E || found.A != keyPrefix.A) @@ -519,11 +540,10 @@ private unsafe PrevState GetPreviousState(bool isRemapped, IAttribute attribute, } else { - KeyPrefix start = default; - start.Set(keyPrefix.E, keyPrefix.A, TxId.MinValue, false); - var datom = snapshot.Datoms(IndexType.EAVTCurrent, start) - .Select(d => d.Clone()) + var descriptor = SliceDescriptor.Create(keyPrefix.E, keyPrefix.A, _registry); + + var datom = snapshot.Datoms(descriptor) .FirstOrDefault(); if (!datom.Valid) return PrevState.NotExists; diff --git a/src/NexusMods.MnemonicDB.Storage/InMemoryBackend/Snapshot.cs b/src/NexusMods.MnemonicDB.Storage/InMemoryBackend/Snapshot.cs index 83737621..65ff8fd2 100644 --- a/src/NexusMods.MnemonicDB.Storage/InMemoryBackend/Snapshot.cs +++ b/src/NexusMods.MnemonicDB.Storage/InMemoryBackend/Snapshot.cs @@ -1,10 +1,10 @@ -using System; -using System.Collections.Generic; +using System.Collections.Generic; using System.Collections.Immutable; using System.Linq; +using Microsoft.Win32; using NexusMods.MnemonicDB.Abstractions; -using NexusMods.MnemonicDB.Abstractions.DatomComparators; -using NexusMods.MnemonicDB.Abstractions.DatomIterators; +using NexusMods.MnemonicDB.Abstractions.IndexSegments; +using NexusMods.MnemonicDB.Abstractions.Query; namespace NexusMods.MnemonicDB.Storage.InMemoryBackend; @@ -21,10 +21,15 @@ public Snapshot(ImmutableSortedSet[] indexes, AttributeRegistry registry public void Dispose() { } - public IEnumerable Datoms(IndexType type, ReadOnlySpan a, ReadOnlySpan b) + /// + public IndexSegment Datoms(SliceDescriptor descriptor) { - var idxLower = _indexes[(int)type].IndexOf(a.ToArray()); - var idxUpper = _indexes[(int)type].IndexOf(b.ToArray()); + var thisIndex = _indexes[(int)descriptor.Index]; + if (thisIndex.Count == 0) + return new IndexSegment(); + + var idxLower = thisIndex.IndexOf(descriptor.From.RawSpan.ToArray()); + var idxUpper = thisIndex.IndexOf(descriptor.To.RawSpan.ToArray()); if (idxLower < 0) idxLower = ~idxLower; @@ -43,25 +48,77 @@ public IEnumerable Datoms(IndexType type, ReadOnlySpan a, ReadOnlyS reverse = true; } - return DatomsInner(type, reverse, lower, upper); + using var segmentBuilder = new IndexSegmentBuilder(_registry); + if (!reverse) + { + for (var i = lower; i <= upper; i++) + { + if (i >= thisIndex.Count) + break; + segmentBuilder.Add(thisIndex.ElementAt(i)); + } + } + else + { + for (var i = upper; i >= lower; i--) + { + segmentBuilder.Add(thisIndex.ElementAt(i)); + } + } + return segmentBuilder.Build(); } - private IEnumerable DatomsInner(IndexType type, bool reverse, int lower, int upper) + /// + public IEnumerable DatomsChunked(SliceDescriptor descriptor, int chunkSize) { + var idxLower = _indexes[(int)descriptor.Index].IndexOf(descriptor.From.RawSpan.ToArray()); + var idxUpper = _indexes[(int)descriptor.Index].IndexOf(descriptor.To.RawSpan.ToArray()); + + if (idxLower < 0) + idxLower = ~idxLower; + + if (idxUpper < 0) + idxUpper = ~idxUpper; + + var lower = idxLower; + var upper = idxUpper; + var reverse = false; + + if (idxLower > idxUpper) + { + lower = idxUpper; + upper = idxLower; + reverse = true; + } + + using var segmentBuilder = new IndexSegmentBuilder(_registry); + var index = _indexes[(int)descriptor.Index]; + if (!reverse) { for (var i = lower; i < upper; i++) { - yield return new Datom(_indexes[(int)type].ElementAt(i), _registry); + segmentBuilder.Add(index.ElementAt(i)); + if (segmentBuilder.Count == chunkSize) + { + yield return segmentBuilder.Build(); + segmentBuilder.Reset(); + } } } else { for (var i = upper; i > lower; i--) { - yield return new Datom(_indexes[(int)type].ElementAt(i), _registry); + segmentBuilder.Add(index.ElementAt(i)); + if (segmentBuilder.Count == chunkSize) + { + yield return segmentBuilder.Build(); + segmentBuilder.Reset(); + } } } + yield return segmentBuilder.Build(); } } diff --git a/src/NexusMods.MnemonicDB.Storage/NextIdCache.cs b/src/NexusMods.MnemonicDB.Storage/NextIdCache.cs index 6450b976..0dd42ca1 100644 --- a/src/NexusMods.MnemonicDB.Storage/NextIdCache.cs +++ b/src/NexusMods.MnemonicDB.Storage/NextIdCache.cs @@ -2,6 +2,7 @@ using System.Runtime.CompilerServices; using NexusMods.MnemonicDB.Abstractions; using NexusMods.MnemonicDB.Abstractions.Internals; +using NexusMods.MnemonicDB.Abstractions.Query; namespace NexusMods.MnemonicDB.Storage; @@ -21,12 +22,12 @@ public struct NextIdCache /// /// Gets the next id for the given partition /// - public EntityId NextId(ISnapshot snapshot, PartitionId partitionId) + public EntityId NextId(ISnapshot snapshot, PartitionId partitionId, IAttributeRegistry registry) { var partition = partitionId.Value; if (this[partition] == 0) { - var lastEnt = LastEntityInPartition(snapshot, partitionId); + var lastEnt = LastEntityInPartition(snapshot, partitionId, registry); this[partition] = lastEnt.Value; } @@ -37,7 +38,7 @@ public EntityId NextId(ISnapshot snapshot, PartitionId partitionId) /// /// Gets the last recorded entity in the partition in the snapshot /// - public EntityId LastEntityInPartition(ISnapshot snapshot, PartitionId partitionId) + public EntityId LastEntityInPartition(ISnapshot snapshot, PartitionId partitionId, IAttributeRegistry registry) { var partition = partitionId.Value; if (this[partition] != 0) @@ -45,10 +46,10 @@ public EntityId LastEntityInPartition(ISnapshot snapshot, PartitionId partitionI return partitionId.MakeEntityId(this[partition]); } - var startPrefix = new KeyPrefix().Set(partitionId.MakeEntityId(ulong.MaxValue), AttributeId.Min, TxId.MinValue, false); - var endPrefix = new KeyPrefix().Set(partitionId.MakeEntityId(0), AttributeId.Max, TxId.MaxValue, false); + var descriptor = SliceDescriptor.Create(partitionId.MakeEntityId(ulong.MaxValue), partitionId.MakeEntityId(0), registry); - var lastEnt = snapshot.Datoms(IndexType.EAVTCurrent, startPrefix, endPrefix) + var lastEnt = snapshot.DatomsChunked(descriptor, 1) + .SelectMany(c => c) .Select(d => d.E) .FirstOrDefault(partitionId.MakeEntityId(0)); diff --git a/src/NexusMods.MnemonicDB.Storage/RocksDbBackend/Backend.cs b/src/NexusMods.MnemonicDB.Storage/RocksDbBackend/Backend.cs index c19c39a1..5f91a6d9 100644 --- a/src/NexusMods.MnemonicDB.Storage/RocksDbBackend/Backend.cs +++ b/src/NexusMods.MnemonicDB.Storage/RocksDbBackend/Backend.cs @@ -1,10 +1,6 @@ -using System; -using System.Collections.Generic; +using System.Collections.Generic; using NexusMods.MnemonicDB.Abstractions; using NexusMods.MnemonicDB.Abstractions.DatomComparators; -using NexusMods.MnemonicDB.Abstractions.DatomIterators; -using NexusMods.MnemonicDB.Abstractions.ElementComparers; -using NexusMods.MnemonicDB.Abstractions.Internals; using NexusMods.MnemonicDB.Storage.Abstractions; using NexusMods.Paths; using RocksDbSharp; @@ -16,19 +12,19 @@ public class Backend(AttributeRegistry registry) : IStoreBackend { private readonly ColumnFamilies _columnFamilies = new(); private readonly Dictionary _indexes = new(); - private readonly Dictionary _stores = new(); - private RocksDb? _db = null!; + internal readonly Dictionary Stores = new(); + internal RocksDb? Db = null!; public IWriteBatch CreateBatch() { - return new Batch(_db!); + return new Batch(Db!); } public void DeclareIndex(IndexType name) where TComparator : IDatomComparator { var indexStore = new IndexStore(name.ToString(), name); - _stores.Add(name, indexStore); + Stores.Add(name, indexStore); var index = new Index(indexStore); _indexes.Add(name, index); @@ -51,79 +47,19 @@ public void Init(AbsolutePath location) .SetCreateMissingColumnFamilies() .SetCompression(Compression.Lz4); - foreach (var (name, store) in _stores) + foreach (var (name, store) in Stores) { var index = _indexes[name]; store.SetupColumnFamily((IIndex)index, _columnFamilies); } - _db = RocksDb.Open(options, location.ToString(), _columnFamilies); + Db = RocksDb.Open(options, location.ToString(), _columnFamilies); - foreach (var (name, store) in _stores) store.PostOpenSetup(_db); + foreach (var (name, store) in Stores) store.PostOpenSetup(Db); } public void Dispose() { - _db?.Dispose(); - } - - private class Snapshot(Backend backend, AttributeRegistry registry) : ISnapshot - { - private readonly RocksDbSharp.Snapshot _snapshot = backend._db!.CreateSnapshot(); - - public IEnumerable Datoms(IndexType type, ReadOnlySpan a, ReadOnlySpan b) - { - var comparator = type.GetComparator(); - var reverse = false; - - var lower = a; - var upper = b; - if (comparator.CompareInstance(a, b) > 0) - { - reverse = true; - lower = b; - upper = a; - } - - var options = new ReadOptions() - .SetSnapshot(_snapshot) - .SetIterateLowerBound(lower.ToArray()) - .SetIterateUpperBound(upper.ToArray()); - - return DatomsInner(type, options, reverse); - } - - private IEnumerable DatomsInner(IndexType type, ReadOptions options, bool reverse) - { - using var iterator = backend._db!.NewIterator(backend._stores[type].Handle, options); - if (reverse) - iterator.SeekToLast(); - else - iterator.SeekToFirst(); - - using var writer = new PooledMemoryBufferWriter(128); - - while (iterator.Valid()) - { - writer.Reset(); - writer.Write(iterator.GetKeySpan()); - - if (writer.Length >= KeyPrefix.Size + 1) - { - var tag = (ValueTags)writer.GetWrittenSpan()[KeyPrefix.Size]; - if (tag == ValueTags.HashedBlob) - { - writer.Write(iterator.GetValueSpan()); - } - } - - yield return new Datom(writer.WrittenMemory, registry); - - if (reverse) - iterator.Prev(); - else - iterator.Next(); - } - } + Db?.Dispose(); } } diff --git a/src/NexusMods.MnemonicDB.Storage/RocksDbBackend/Snapshot.cs b/src/NexusMods.MnemonicDB.Storage/RocksDbBackend/Snapshot.cs new file mode 100644 index 00000000..fc74e8ae --- /dev/null +++ b/src/NexusMods.MnemonicDB.Storage/RocksDbBackend/Snapshot.cs @@ -0,0 +1,110 @@ +using System.Collections.Generic; +using NexusMods.MnemonicDB.Abstractions; +using NexusMods.MnemonicDB.Abstractions.ElementComparers; +using NexusMods.MnemonicDB.Abstractions.IndexSegments; +using NexusMods.MnemonicDB.Abstractions.Internals; +using NexusMods.MnemonicDB.Abstractions.Query; +using RocksDbSharp; + +namespace NexusMods.MnemonicDB.Storage.RocksDbBackend; + +internal class Snapshot(Backend backend, AttributeRegistry registry) : ISnapshot +{ + private readonly RocksDbSharp.Snapshot _snapshot = backend.Db!.CreateSnapshot(); + + public IndexSegment Datoms(SliceDescriptor descriptor) + { + var reverse = descriptor.IsReverse; + var from = reverse ? descriptor.To : descriptor.From; + var to = reverse ? descriptor.From : descriptor.To; + + var options = new ReadOptions() + .SetSnapshot(_snapshot) + .SetIterateLowerBound(from.RawSpan.ToArray()) + .SetIterateUpperBound(to.RawSpan.ToArray()); + + using var builder = new IndexSegmentBuilder(registry); + + using var iterator = backend.Db!.NewIterator(backend.Stores[descriptor.Index].Handle, options); + if (reverse) + iterator.SeekToLast(); + else + iterator.SeekToFirst(); + + using var writer = new PooledMemoryBufferWriter(128); + + while (iterator.Valid()) + { + writer.Reset(); + writer.Write(iterator.GetKeySpan()); + + if (writer.Length >= KeyPrefix.Size + 1) + { + var tag = (ValueTags)writer.GetWrittenSpan()[KeyPrefix.Size]; + if (tag == ValueTags.HashedBlob) + { + writer.Write(iterator.GetValueSpan()); + } + } + + builder.Add(writer.WrittenMemory.Span); + + if (reverse) + iterator.Prev(); + else + iterator.Next(); + } + return builder.Build(); + } + + public IEnumerable DatomsChunked(SliceDescriptor descriptor, int chunkSize) + { + var reverse = descriptor.IsReverse; + var from = reverse ? descriptor.To : descriptor.From; + var to = reverse ? descriptor.From : descriptor.To; + + var options = new ReadOptions() + .SetSnapshot(_snapshot) + .SetIterateLowerBound(from.RawSpan.ToArray()) + .SetIterateUpperBound(to.RawSpan.ToArray()); + + using var builder = new IndexSegmentBuilder(registry); + + using var iterator = backend.Db!.NewIterator(backend.Stores[descriptor.Index].Handle, options); + if (reverse) + iterator.SeekToLast(); + else + iterator.SeekToFirst(); + + using var writer = new PooledMemoryBufferWriter(128); + + while (iterator.Valid()) + { + writer.Reset(); + writer.Write(iterator.GetKeySpan()); + + if (writer.Length >= KeyPrefix.Size + 1) + { + var tag = (ValueTags)writer.GetWrittenSpan()[KeyPrefix.Size]; + if (tag == ValueTags.HashedBlob) + { + writer.Write(iterator.GetValueSpan()); + } + } + + builder.Add(writer.WrittenMemory.Span); + + if (builder.Count == chunkSize) + { + yield return builder.Build(); + builder.Reset(); + } + + if (reverse) + iterator.Prev(); + else + iterator.Next(); + } + yield return builder.Build(); + } +} diff --git a/src/NexusMods.MnemonicDB/Analytics.cs b/src/NexusMods.MnemonicDB/Analytics.cs new file mode 100644 index 00000000..5f0984ac --- /dev/null +++ b/src/NexusMods.MnemonicDB/Analytics.cs @@ -0,0 +1,43 @@ +using System; +using System.Collections.Frozen; +using System.Collections.Generic; +using NexusMods.MnemonicDB.Abstractions; +using NexusMods.MnemonicDB.Abstractions.Attributes; + +namespace NexusMods.MnemonicDB; + +internal class Analytics : IAnalytics +{ + private readonly IDb _db; + private readonly Lazy> _latestTxIds; + + internal Analytics(IDb db) + { + _db = db; + _latestTxIds = new Lazy>(CalculateLatestTxIds); + } + + private FrozenSet CalculateLatestTxIds() + { + var tx = _db.BasisTxId; + var latestDatoms = _db.Datoms(tx); + + var ids = new HashSet(); + foreach (var datom in latestDatoms) + { + ids.Add(datom.E); + if (datom is ReferenceAttribute.ReadDatom referenceDatom) + { + ids.Add(referenceDatom.V); + } + else if (datom is ReferencesAttribute.ReadDatom referencesDatom) + { + ids.Add(referencesDatom.V); + } + } + return ids.ToFrozenSet(); + } + + + public FrozenSet LatestTxIds => _latestTxIds.Value; +} diff --git a/src/NexusMods.MnemonicDB/AsOfSnapshot.cs b/src/NexusMods.MnemonicDB/AsOfSnapshot.cs index ebdf505b..187bc6c1 100644 --- a/src/NexusMods.MnemonicDB/AsOfSnapshot.cs +++ b/src/NexusMods.MnemonicDB/AsOfSnapshot.cs @@ -4,7 +4,9 @@ using System.Runtime.InteropServices; using NexusMods.MnemonicDB.Abstractions; using NexusMods.MnemonicDB.Abstractions.DatomIterators; +using NexusMods.MnemonicDB.Abstractions.IndexSegments; using NexusMods.MnemonicDB.Abstractions.Internals; +using NexusMods.MnemonicDB.Abstractions.Query; using NexusMods.MnemonicDB.Storage; using Reloaded.Memory.Extensions; @@ -17,20 +19,56 @@ namespace NexusMods.MnemonicDB; /// internal class AsOfSnapshot(ISnapshot inner, TxId asOfTxId, AttributeRegistry registry) : ISnapshot { - /// - public IEnumerable Datoms(IndexType type, ReadOnlySpan a, ReadOnlySpan b) + public IndexSegment Datoms(SliceDescriptor descriptor) { - var current = inner.Datoms(type.CurrentVariant(), a, b); - var history = inner.Datoms(type.HistoryVariant(), a, b); - var comparatorFn = type.GetComparator(); + // TODO: stop using IEnumerable and use IndexSegment directly + var current = inner.Datoms(descriptor with {Index = descriptor.Index.CurrentVariant()}); + var history = inner.Datoms(descriptor with {Index = descriptor.Index.HistoryVariant()}); + var comparatorFn = descriptor.Index.GetComparator(); + + using var builder = new IndexSegmentBuilder(registry); + var merged = current.Merge(history, (dCurrent, dHistory) => comparatorFn.CompareInstance(dCurrent.RawSpan, dHistory.RawSpan)); var filtered = merged.Where(d => d.T <= asOfTxId); var withoutRetracts = ApplyRetracts(filtered); - return withoutRetracts; + + foreach (var datom in withoutRetracts) + { + builder.Add(datom.RawSpan); + } + + return builder.Build(); } + public IEnumerable DatomsChunked(SliceDescriptor descriptor, int chunkSize) + { + // TODO: stop using IEnumerable and use IndexSegment directly + var current = inner.DatomsChunked(descriptor with {Index = descriptor.Index.CurrentVariant()}, chunkSize).SelectMany(c => c); + var history = inner.DatomsChunked(descriptor with {Index = descriptor.Index.HistoryVariant()}, chunkSize).SelectMany(c => c); + var comparatorFn = descriptor.Index.GetComparator(); + + using var builder = new IndexSegmentBuilder(registry); + + var merged = current.Merge(history, + (dCurrent, dHistory) => comparatorFn.CompareInstance(dCurrent.RawSpan, dHistory.RawSpan)); + var filtered = merged.Where(d => d.T <= asOfTxId); + + var withoutRetracts = ApplyRetracts(filtered); + + foreach (var datom in withoutRetracts) + { + builder.Add(datom.RawSpan); + if (builder.Count % chunkSize == 0) + { + yield return builder.Build(); + builder.Reset(); + } + } + + yield return builder.Build(); + } /// diff --git a/src/NexusMods.MnemonicDB/Connection.cs b/src/NexusMods.MnemonicDB/Connection.cs index d8d6b2ef..cde61349 100644 --- a/src/NexusMods.MnemonicDB/Connection.cs +++ b/src/NexusMods.MnemonicDB/Connection.cs @@ -2,6 +2,7 @@ using System.Collections.Generic; using System.Linq; using System.Reactive.Linq; +using System.Reactive.Subjects; using System.Threading; using System.Threading.Tasks; using Microsoft.Extensions.Hosting; @@ -10,6 +11,7 @@ using NexusMods.MnemonicDB.Abstractions.ElementComparers; using NexusMods.MnemonicDB.Abstractions.IndexSegments; using NexusMods.MnemonicDB.Abstractions.Internals; +using NexusMods.MnemonicDB.Abstractions.Query; using NexusMods.MnemonicDB.Abstractions.TxFunctions; using NexusMods.MnemonicDB.Storage; @@ -21,11 +23,13 @@ namespace NexusMods.MnemonicDB; public class Connection : IConnection, IHostedService { private readonly IDatomStore _store; - private IDb? _db; private readonly IEnumerable _declaredAttributes; private readonly ILogger _logger; private Task? _bootstrapTask; + private BehaviorSubject _dbStream; + private IDisposable? _dbStreamDisposable; + /// /// Main connection class, co-ordinates writes and immutable reads /// @@ -35,7 +39,7 @@ public Connection(ILogger logger, IDatomStore store, IServiceProvide _logger = logger; _declaredAttributes = declaredAttributes; _store = store; - + _dbStream = new BehaviorSubject(default!); } /// @@ -46,12 +50,17 @@ public IDb Db { get { - if (_db == null) + var val = _dbStream; + // ReSharper disable once ConditionIsAlwaysTrueOrFalseAccordingToNullableAPIContract + if (val == null) ThrowNullDb(); - return _db!; + return val!.Value.Database; } } + /// + public IAttributeRegistry Registry => _store.Registry; + private static void ThrowNullDb() { throw new InvalidOperationException("Connection not started, did you forget to start the hosted service?"); @@ -75,9 +84,15 @@ public ITransaction BeginTransaction() } /// - public IObservable Revisions => _store.TxLog - .Select(log => new Db(log.Snapshot, this, log.TxId, (AttributeRegistry)_store.Registry)); - + public IObservable Revisions + { + get + { + if (_dbStream == default!) + ThrowNullDb(); + return _dbStream!; + } + } private async Task AddMissingAttributes(IEnumerable declaredAttributes) { @@ -112,8 +127,10 @@ private async Task AddMissingAttributes(IEnumerable dec private IEnumerable ExistingAttributes() { var snapshot = _store.GetSnapshot(); - var start = BuiltInAttributes.UniqueIdEntityId; - var attrIds = snapshot.Datoms(IndexType.AEVTCurrent, start, AttributeId.From((ushort)(start.Value + 1))) + var sliceDescriptor = + SliceDescriptor.Create(BuiltInAttributes.UniqueId, _store.Registry); + + var attrIds = snapshot.Datoms(sliceDescriptor) .Select(d => d.E); foreach (var attrId in attrIds) @@ -121,10 +138,8 @@ private IEnumerable ExistingAttributes() var serializerId = ValueTags.Null; var uniqueId = Symbol.Unknown; - var from = new KeyPrefix().Set(attrId, AttributeId.Min, TxId.MinValue, false); - var to = new KeyPrefix().Set(attrId, AttributeId.Max, TxId.MaxValue, false); - - foreach (var rawDatom in snapshot.Datoms(IndexType.EAVTCurrent, from, to)) + var entityDescriptor = SliceDescriptor.Create(EntityId.From(attrId.Value), _store.Registry); + foreach (var rawDatom in snapshot.Datoms(entityDescriptor)) { var datom = rawDatom.Resolved; @@ -169,11 +184,19 @@ private async Task Bootstrap() try { var storeResult = await AddMissingAttributes(_declaredAttributes); - _db = new Db(storeResult.Snapshot, this, storeResult.AssignedTxId, (AttributeRegistry)_store.Registry); - _store.TxLog.Subscribe(log => - { - _db = new Db(log.Snapshot, this, log.TxId, (AttributeRegistry)_store.Registry); - }); + + _dbStreamDisposable = _store.TxLog + .Select(log => + { + var db = new Db(log.Snapshot, this, log.TxId, (AttributeRegistry)_store.Registry); + var addedItems = db.Datoms(SliceDescriptor.Create(db.BasisTxId, _store.Registry)); + return new Revision + { + Database = db, + AddedDatoms = addedItems + }; + }) + .Subscribe(_dbStream); } catch (Exception ex) { @@ -184,7 +207,7 @@ private async Task Bootstrap() /// public Task StopAsync(CancellationToken cancellationToken) { - // Nothing to do + _dbStreamDisposable?.Dispose(); return Task.CompletedTask; } } diff --git a/src/NexusMods.MnemonicDB/Db.cs b/src/NexusMods.MnemonicDB/Db.cs index c989e50e..fefc1fb0 100644 --- a/src/NexusMods.MnemonicDB/Db.cs +++ b/src/NexusMods.MnemonicDB/Db.cs @@ -3,6 +3,7 @@ using System.Diagnostics; using System.Linq; using System.Runtime.InteropServices; +using Microsoft.Extensions.DependencyInjection; using NexusMods.MnemonicDB.Abstractions; using NexusMods.MnemonicDB.Abstractions.Attributes; using NexusMods.MnemonicDB.Abstractions.DatomIterators; @@ -10,6 +11,7 @@ using NexusMods.MnemonicDB.Abstractions.IndexSegments; using NexusMods.MnemonicDB.Abstractions.Internals; using NexusMods.MnemonicDB.Abstractions.Models; +using NexusMods.MnemonicDB.Abstractions.Query; using NexusMods.MnemonicDB.Storage; using Reloaded.Memory.Extensions; @@ -24,6 +26,7 @@ internal class Db : IDb private readonly IndexSegmentCache<(EntityId, AttributeId)> _reverseCache; private readonly IndexSegmentCache _referencesCache; private readonly RegistryId _registryId; + private readonly Lazy _analytics; public ISnapshot Snapshot { get; } public IAttributeRegistry Registry => _registry; @@ -37,36 +40,25 @@ public Db(ISnapshot snapshot, Connection connection, TxId txId, AttributeRegistr _entityCache = new IndexSegmentCache(EntityDatoms, registry); _reverseCache = new IndexSegmentCache<(EntityId, AttributeId)>(ReverseDatoms, registry); _referencesCache = new IndexSegmentCache(ReferenceDatoms, registry); + _analytics = new Lazy(() => new Analytics(this)); Snapshot = snapshot; BasisTxId = txId; } private static IEnumerable EntityDatoms(IDb db, EntityId id) { - return db.Snapshot.Datoms(IndexType.EAVTCurrent, id, EntityId.From(id.Value + 1)); + return db.Snapshot.Datoms(SliceDescriptor.Create(id, db.Registry)); } private static IEnumerable ReverseDatoms(IDb db, (EntityId, AttributeId) key) { - var (id, attrId) = key; - - Span startKey = stackalloc byte[KeyPrefix.Size + sizeof(ulong) + 1]; - Span endKey = stackalloc byte[KeyPrefix.Size + sizeof(ulong) + 1]; - MemoryMarshal.Write(startKey, new KeyPrefix().Set(EntityId.MinValueNoPartition, attrId, TxId.MinValue, false)); - MemoryMarshal.Write(endKey, new KeyPrefix().Set(EntityId.MaxValueNoPartition, attrId, TxId.MaxValue, false)); - - startKey[KeyPrefix.Size] = (byte)ValueTags.Reference; - endKey[KeyPrefix.Size] = (byte)ValueTags.Reference; - - MemoryMarshal.Write(startKey.SliceFast(KeyPrefix.Size + 1), id); - MemoryMarshal.Write(endKey.SliceFast(KeyPrefix.Size + 1), id.Value); - - - return db.Snapshot.Datoms(IndexType.VAETCurrent, startKey, endKey); + return db.Snapshot.Datoms(SliceDescriptor.Create(key.Item2, key.Item1, db.Registry)); } public TxId BasisTxId { get; } + public IAnalytics Analytics => _analytics.Value; + public IConnection Connection => _connection; public IEnumerable Get(IEnumerable ids) @@ -89,10 +81,8 @@ public IndexSegment Get(EntityId entityId) public IEnumerable Find(IAttribute attribute) { var attrId = attribute.GetDbId(_registry.Id); - var a = new KeyPrefix().Set(EntityId.MinValueNoPartition, attrId, TxId.MinValue, false); - var b = new KeyPrefix().Set(EntityId.MaxValueNoPartition, attrId, TxId.MaxValue, false); return Snapshot - .Datoms(IndexType.AEVTCurrent, a, b) + .Datoms(SliceDescriptor.Create(attrId, _registry)) .Select(d => d.E); } @@ -104,17 +94,7 @@ public EntityIds GetBackRefs(ReferenceAttribute attribute, EntityId id) private static IEnumerable ReferenceDatoms(IDb db, EntityId eid) { - Span startKey = stackalloc byte[KeyPrefix.Size + sizeof(ulong) + 1]; - Span endKey = stackalloc byte[KeyPrefix.Size + sizeof(ulong) + 1]; - MemoryMarshal.Write(startKey, new KeyPrefix().Set(EntityId.MinValueNoPartition, AttributeId.Min, TxId.MinValue, false)); - MemoryMarshal.Write(endKey, new KeyPrefix().Set(EntityId.MaxValueNoPartition, AttributeId.Max, TxId.MaxValue, false)); - - startKey[KeyPrefix.Size] = (byte)ValueTags.Reference; - endKey[KeyPrefix.Size] = (byte)ValueTags.Reference; - - MemoryMarshal.Write(startKey.SliceFast(KeyPrefix.Size + 1), eid); - MemoryMarshal.Write(endKey.SliceFast(KeyPrefix.Size + 1), eid); - return db.Snapshot.Datoms(IndexType.VAETCurrent, startKey, endKey); + return db.Snapshot.Datoms(SliceDescriptor.CreateReferenceTo(eid, db.Registry)); } public IndexSegment ReferencesTo(EntityId id) @@ -150,14 +130,8 @@ public IEnumerable FindIndexedDatoms(Attribute(EntityId id) @@ -180,9 +154,14 @@ public IEnumerable Datoms(EntityId entityId) .Select(d => d.Resolved); } + public IndexSegment Datoms(SliceDescriptor sliceDescriptor) + { + return Snapshot.Datoms(sliceDescriptor); + } + public IEnumerable Datoms(TxId txId) { - return Snapshot.Datoms(IndexType.TxLog, txId, TxId.From(txId.Value + 1)) + return Snapshot.Datoms(SliceDescriptor.Create(txId, _registry)) .Select(d => d.Resolved); } diff --git a/tests/NexusMods.MnemonicDB.Storage.Tests/ABackendTest.cs b/tests/NexusMods.MnemonicDB.Storage.Tests/ABackendTest.cs index c2253360..d7ccb7f3 100644 --- a/tests/NexusMods.MnemonicDB.Storage.Tests/ABackendTest.cs +++ b/tests/NexusMods.MnemonicDB.Storage.Tests/ABackendTest.cs @@ -5,6 +5,7 @@ using NexusMods.MnemonicDB.Abstractions.DatomComparators; using NexusMods.MnemonicDB.Abstractions.DatomIterators; using NexusMods.MnemonicDB.Abstractions.IndexSegments; +using NexusMods.MnemonicDB.Abstractions.Query; using NexusMods.MnemonicDB.Storage.Tests.TestAttributes; using NexusMods.MnemonicDB.TestModel; using NexusMods.Paths; @@ -33,7 +34,7 @@ public async Task InsertedDatomsShowUpInTheIndex(IndexType type) { var tx = await GenerateData(); var datoms = tx.Snapshot - .Datoms(type) + .Datoms(SliceDescriptor.Create(type, Registry)) .Select(d => d.Resolved) .ToArray(); @@ -98,7 +99,7 @@ public async Task CanStoreDataInBlobs(IndexType type) } var datoms = DatomStore.GetSnapshot() - .Datoms(type) + .Datoms(SliceDescriptor.Create(type, Registry)) .Select(d => d.Resolved) .ToArray(); @@ -120,8 +121,8 @@ await Verify(datoms.ToTable(Registry)) public async Task HistoricalQueriesReturnAllDataSorted(IndexType type) { var tx = await GenerateData(); - var current = tx.Snapshot.Datoms(type.CurrentVariant()); - var history = tx.Snapshot.Datoms(type.HistoryVariant()); + var current = tx.Snapshot.Datoms(SliceDescriptor.Create(type.CurrentVariant(), Registry)); + var history = tx.Snapshot.Datoms(SliceDescriptor.Create(type.HistoryVariant(), Registry)); var comparer = type.GetComparator(); var merged = current .Merge(history, CompareDatoms(comparer)) @@ -244,7 +245,7 @@ public async Task RetractedValuesAreSupported(IndexType type) var datoms = tx2.Snapshot - .Datoms(type) + .Datoms(SliceDescriptor.Create(type, Registry)) .Select(d => d.Resolved) .ToArray(); await Verify(datoms.ToTable(Registry)) diff --git a/tests/NexusMods.MnemonicDB.TestModel/Mod.cs b/tests/NexusMods.MnemonicDB.TestModel/Mod.cs index ccf892ea..41db6523 100644 --- a/tests/NexusMods.MnemonicDB.TestModel/Mod.cs +++ b/tests/NexusMods.MnemonicDB.TestModel/Mod.cs @@ -13,4 +13,5 @@ public partial class Mod : IModelDefinition public static readonly BackReferenceAttribute Files = new(File.Mod); public static readonly MarkerAttribute Marked = new(Namespace, nameof(Marked)) { IsIndexed = true }; public static readonly StringAttribute Description = new(Namespace, nameof(Description)) { IsOptional = true }; + public static readonly ReferenceAttribute LoadAfter = new(Namespace, nameof(LoadAfter)) { IsOptional = true }; } diff --git a/tests/NexusMods.MnemonicDB.Tests/ComplexModelTests.cs b/tests/NexusMods.MnemonicDB.Tests/ComplexModelTests.cs index ffa87924..c9bacbb8 100644 --- a/tests/NexusMods.MnemonicDB.Tests/ComplexModelTests.cs +++ b/tests/NexusMods.MnemonicDB.Tests/ComplexModelTests.cs @@ -190,9 +190,9 @@ public async Task CanRestartStorage(int modCount, int filesPerMod, int extraFile totalSize += mod.Files.Sum(f => f.Size); if (mod.Id == firstMod.Id) - mod.Files.Count().Should().Be(filesPerMod + extraFiles, "first mod should have the extra files"); + mod.Files.Count.Should().Be(filesPerMod + extraFiles, "first mod should have the extra files"); else - mod.Files.Count().Should().Be(filesPerMod, "every mod should have the same amount of files"); + mod.Files.Count.Should().Be(filesPerMod, "every mod should have the same amount of files"); } using var tx2 = Connection.BeginTransaction(); diff --git a/tests/NexusMods.MnemonicDB.Tests/DbTests.CanObserveIndexChanges.verified.txt b/tests/NexusMods.MnemonicDB.Tests/DbTests.CanObserveIndexChanges.verified.txt new file mode 100644 index 00000000..9bc140b7 --- /dev/null +++ b/tests/NexusMods.MnemonicDB.Tests/DbTests.CanObserveIndexChanges.verified.txt @@ -0,0 +1,23 @@ +[ + [ + Mod1 - Updated, + Mod2 - Updated, + Mod3 - Updated + ], + [ + Test Mod 1, + Mod2 - Updated, + Mod3 - Updated + ], + [ + Test Mod 1, + Mod2 - Updated, + Mod3 - Updated, + Test Mod 2 + ], + [ + Mod2 - Updated, + Mod3 - Updated, + Test Mod 2 + ] +] \ No newline at end of file diff --git a/tests/NexusMods.MnemonicDB.Tests/DbTests.cs b/tests/NexusMods.MnemonicDB.Tests/DbTests.cs index 687bdc72..812670de 100644 --- a/tests/NexusMods.MnemonicDB.Tests/DbTests.cs +++ b/tests/NexusMods.MnemonicDB.Tests/DbTests.cs @@ -1,6 +1,9 @@ -using NexusMods.MnemonicDB.Abstractions; +using System.Reactive.Linq; +using DynamicData; +using NexusMods.MnemonicDB.Abstractions; using NexusMods.Hashing.xxHash64; using NexusMods.MnemonicDB.Abstractions.Models; +using NexusMods.MnemonicDB.Abstractions.Query; using NexusMods.MnemonicDB.Abstractions.TxFunctions; using NexusMods.MnemonicDB.TestModel; using NexusMods.Paths; @@ -171,369 +174,447 @@ public async Task ReadModelsCanHaveExtraAttributes() readModel.TryGetAsArchiveFile(out var castedDown).Should().BeTrue(); (castedDown is ArchiveFile.ReadOnly).Should().BeTrue(); + + var badCast = new File.ReadOnly(result.Db, EntityId.From(1)); + badCast.IsValid().Should().BeFalse("bad cast should not validate"); + badCast.TryGetAsArchiveFile(out var archiveFileBad).Should().BeFalse("bad cast should not be able to cast down"); + archiveFileBad.IsValid().Should().BeFalse("bad cast should not validate as archive file"); + castedDown.Should().BeEquivalentTo(archiveReadModel, "casted down model should be the same as the original model"); } - [Fact] - public async Task CanGetCommitUpdates() - { - List updates = new(); + [Fact] + public async Task CanGetCommitUpdates() + { + List updates = new(); - var tx = Connection.BeginTransaction(); - var file = new File.New(tx) - { - Path = "C:\\test.txt", - Hash = Hash.From(0xDEADBEEF), - Size = Size.From(1), - ModId = EntityId.From(1) - }; - var result = await tx.Commit(); + var tx = Connection.BeginTransaction(); + var file = new File.New(tx) + { + Path = "C:\\test.txt", + Hash = Hash.From(0xDEADBEEF), + Size = Size.From(1), + ModId = EntityId.From(1) + }; + var result = await tx.Commit(); - var realId = result[file.Id]; + var realId = result[file.Id]; - Connection.Revisions.Subscribe(update => - { - var datoms = update.Datoms(update.BasisTxId).ToArray(); - // Only Txes we care about - if (datoms.Any(d => d.E == realId)) - updates.Add(datoms); - }); + Connection.Revisions.Subscribe(update => + { + // Only Txes we care about + if (update.AddedDatoms.Any(d => d.E == realId)) + updates.Add(update.AddedDatoms.Select(d => d.Resolved).ToArray()); + }); - for (var idx = 0; idx < 4; idx++) - { - tx = Connection.BeginTransaction(); - tx.Add(realId, File.Hash, Hash.From(0xDEADBEEF + (ulong)idx + 0xEE)); - result = await tx.Commit(); + for (var idx = 0; idx < 4; idx++) + { + tx = Connection.BeginTransaction(); + tx.Add(realId, File.Hash, Hash.From(0xDEADBEEF + (ulong)idx + 0xEE)); + result = await tx.Commit(); - await Task.Delay(100); + await Task.Delay(100); - updates.Should().HaveCount(idx + 1); - var updateDatom = updates[idx]; + // +2 because we always get one update for the initial state and one for the new state + updates.Should().HaveCount(idx + 2); + var updateDatom = updates[idx + 1]; - await VerifyTable(updateDatom) - .UseTextForParameters("update_datom_" + idx); - } + await VerifyTable(updateDatom) + .UseTextForParameters("update_datom_" + idx); } + } + + [Fact] + public async Task CanGetChildEntities() + { + var tx = Connection.BeginTransaction(); - [Fact] - public async Task CanGetChildEntities() + var loadout = new Loadout.New(tx) { - var tx = Connection.BeginTransaction(); + Name = "Test Loadout" + }; - var loadout = new Loadout.New(tx) - { - Name = "Test Loadout" - }; + _ = new Mod.New(tx) + { + Name = "Test Mod 1", + Source = new Uri("http://mod1.com"), + LoadoutId = loadout + }; - _ = new Mod.New(tx) - { - Name = "Test Mod 1", - Source = new Uri("http://mod1.com"), - LoadoutId = loadout - }; + _ = new Mod.New(tx) + { + Name = "Test Mod 2", + Source = new Uri("http://mod2.com"), + LoadoutId = loadout + }; - _ = new Mod.New(tx) - { - Name = "Test Mod 2", - Source = new Uri("http://mod2.com"), - LoadoutId = loadout - }; + var result = await tx.Commit(); - var result = await tx.Commit(); + var newDb = Connection.Db; - var newDb = Connection.Db; + var loadoutWritten = loadout.Remap(result); - var loadoutWritten = loadout.Remap(result); + loadoutWritten.Mods.Count.Should().Be(2); + loadoutWritten.Mods.Select(m => m.Name).Should().BeEquivalentTo(["Test Mod 1", "Test Mod 2"]); - loadoutWritten.Mods.Count.Should().Be(2); - loadoutWritten.Mods.Select(m => m.Name).Should().BeEquivalentTo(["Test Mod 1", "Test Mod 2"]); + var firstMod = loadoutWritten.Mods.First(); + firstMod.Loadout.Id.InPartition(PartitionId.Entity).Should().BeTrue("LoadoutId should in the entity partition"); + firstMod.LoadoutId.Should().BeEquivalentTo(loadoutWritten.LoadoutId); + firstMod.Db.Should().Be(newDb); + loadout.Name.Should().Be("Test Loadout"); + firstMod.Loadout.Name.Should().Be("Test Loadout"); + } - var firstMod = loadoutWritten.Mods.First(); - firstMod.Loadout.Id.InPartition(PartitionId.Entity).Should().BeTrue("LoadoutId should in the entity partition"); - firstMod.LoadoutId.Should().BeEquivalentTo(loadoutWritten.LoadoutId); - firstMod.Db.Should().Be(newDb); - loadout.Name.Should().Be("Test Loadout"); - firstMod.Loadout.Name.Should().Be("Test Loadout"); - } + [Fact] + public async Task CanFindEntitiesByAttribute() + { + await InsertExampleData(); - [Fact] - public async Task CanFindEntitiesByAttribute() - { - await InsertExampleData(); + var db = Connection.Db; - var db = Connection.Db; + var ids = from mod in Mod.All(db) + from modOther in Mod.FindByName(db, mod.Name) + select (mod.Name, mod.Id.ToString(), modOther.Name); - var ids = from mod in Mod.All(db) - from modOther in Mod.FindByName(db, mod.Name) - select (mod.Name, mod.Id.ToString(), modOther.Name); + await Verify(ids); + } - await Verify(ids); - } + [Fact] + public async Task CanGetDatomsFromEntity() + { + var loadout = await InsertExampleData(); + var mod = loadout.Mods.First(); - [Fact] - public async Task CanGetDatomsFromEntity() - { - var loadout = await InsertExampleData(); - var mod = loadout.Mods.First(); + mod.Contains(Mod.Name).Should().BeTrue(); + mod.Contains(Mod.Source).Should().BeTrue(); + mod.Contains(Loadout.Name).Should().BeFalse(); - mod.Contains(Mod.Name).Should().BeTrue(); - mod.Contains(Mod.Source).Should().BeTrue(); - mod.Contains(Loadout.Name).Should().BeFalse(); + mod.ToString().Should().Be("Mod"); - mod.ToString().Should().Be("Mod"); + await VerifyTable(mod); + } - await VerifyTable(mod); - } + [Fact] + public async Task CanPutEntitiesInDifferentPartitions() + { - [Fact] - public async Task CanPutEntitiesInDifferentPartitions() + using var tx = Connection.BeginTransaction(); + var file1 = new File.New(tx) { + Path = "C:\\test1.txt", + Hash = Hash.From(0xDEADBEEF), + Size = Size.From(1), + ModId = EntityId.From(1) + }; - using var tx = Connection.BeginTransaction(); - var file1 = new File.New(tx) - { - Path = "C:\\test1.txt", - Hash = Hash.From(0xDEADBEEF), - Size = Size.From(1), - ModId = EntityId.From(1) - }; - - var file2 = new File.New(tx, PartitionId.From(10)) - { - Path = "C:\\test2.txt", - Hash = Hash.From(0xDEADBEEF), - Size = Size.From(1), - ModId = EntityId.From(1) - }; + var file2 = new File.New(tx, PartitionId.From(10)) + { + Path = "C:\\test2.txt", + Hash = Hash.From(0xDEADBEEF), + Size = Size.From(1), + ModId = EntityId.From(1) + }; - var file3 = new File.New(tx, PartitionId.From(200)) - { - Path = "C:\\test3.txt", - Hash = Hash.From(0xDEADBEEF), - Size = Size.From(1), - ModId = EntityId.From(1) - }; + var file3 = new File.New(tx, PartitionId.From(200)) + { + Path = "C:\\test3.txt", + Hash = Hash.From(0xDEADBEEF), + Size = Size.From(1), + ModId = EntityId.From(1) + }; - // TempIds store the desired partition in the third highest byte - (file1.Id.Value >> 40 & 0xFF).Should().Be(PartitionId.Entity.Value); - (file2.Id.Value >> 40 & 0xFF).Should().Be(10); - (file3.Id.Value >> 40 & 0xFF).Should().Be(200); + // TempIds store the desired partition in the third highest byte + (file1.Id.Value >> 40 & 0xFF).Should().Be(PartitionId.Entity.Value); + (file2.Id.Value >> 40 & 0xFF).Should().Be(10); + (file3.Id.Value >> 40 & 0xFF).Should().Be(200); - var result = await tx.Commit(); - var file1RO = file1.Remap(result); - var file2RO = file2.Remap(result); - var file3RO = file3.Remap(result); + var result = await tx.Commit(); + var file1RO = file1.Remap(result); + var file2RO = file2.Remap(result); + var file3RO = file3.Remap(result); - var allDatoms = file1RO.Concat(file2RO).Concat(file3RO); + var allDatoms = file1RO.Concat(file2RO).Concat(file3RO); - await VerifyTable(allDatoms); - } + await VerifyTable(allDatoms); + } - [Fact] - public async Task CanLoadEntitiesWithoutSubclass() - { - var loadout = await InsertExampleData(); + [Fact] + public async Task CanLoadEntitiesWithoutSubclass() + { + var loadout = await InsertExampleData(); - var entityLoadout = new ReadOnlyModel(Connection.Db, loadout.Id); + var entityLoadout = new ReadOnlyModel(Connection.Db, loadout.Id); - entityLoadout - .Should().BeEquivalentTo(loadout); - } + entityLoadout + .Should().BeEquivalentTo(loadout); + } - [Fact] - public async Task CanCreateTempEntities() + [Fact] + public async Task CanCreateTempEntities() + { + var loadoutOther = new TempEntity { - var loadoutOther = new TempEntity - { - { Loadout.Name, "Loadout Other" } - }; + { Loadout.Name, "Loadout Other" } + }; - var loadout = new TempEntity - { - { Loadout.Name, "Test Loadout" }, - { Mod.LoadoutId, loadoutOther}, - }; + var loadout = new TempEntity + { + { Loadout.Name, "Test Loadout" }, + { Mod.LoadoutId, loadoutOther}, + }; - using var tx = Connection.BeginTransaction(); - loadout.AddTo(tx); - var result = await tx.Commit(); + using var tx = Connection.BeginTransaction(); + loadout.AddTo(tx); + var result = await tx.Commit(); - var loaded = Loadout.Load(result.Db, result[loadout.Id!.Value]); - loaded.Name.Should().Be("Test Loadout"); + var loaded = Loadout.Load(result.Db, result[loadout.Id!.Value]); + loaded.Name.Should().Be("Test Loadout"); - loadout.GetFirst(Loadout.Name).Should().Be("Test Loadout"); + loadout.GetFirst(Loadout.Name).Should().Be("Test Loadout"); - Mod.LoadoutId.Get(loaded).Should().Be(result[loadoutOther.Id!.Value], "Sub entity should be added to the transaction"); - } + Mod.LoadoutId.Get(loaded).Should().Be(result[loadoutOther.Id!.Value], "Sub entity should be added to the transaction"); + } - [Fact] - public async Task CanWorkWithMarkerAttributes() + [Fact] + public async Task CanWorkWithMarkerAttributes() + { + var mod = new TempEntity { - var mod = new TempEntity - { - { Mod.Name, "Test Mod" }, - Mod.Marked, - }; + { Mod.Name, "Test Mod" }, + Mod.Marked, + }; - using var tx = Connection.BeginTransaction(); - mod.AddTo(tx); - var result = await tx.Commit(); + using var tx = Connection.BeginTransaction(); + mod.AddTo(tx); + var result = await tx.Commit(); - var reloaded = Mod.Load(result.Db, result[mod.Id!.Value]); - reloaded.IsMarked.Should().BeTrue(); + var reloaded = Mod.Load(result.Db, result[mod.Id!.Value]); + reloaded.IsMarked.Should().BeTrue(); - } + } - [Fact] - public async Task CanExecuteTxFunctions() + [Fact] + public async Task CanExecuteTxFunctions() + { + EntityId id; + // Create a loadout with inital state + using var tx = Connection.BeginTransaction(); + var loadout = new Loadout.New(tx) { - EntityId id; - // Create a loadout with inital state - using var tx = Connection.BeginTransaction(); - var loadout = new Loadout.New(tx) - { - Name = "Test Loadout: 1" - }; - var result = await tx.Commit(); - id = result[loadout.Id]; + Name = "Test Loadout: 1" + }; + var result = await tx.Commit(); + id = result[loadout.Id]; - // Update it 1000 times in "parallel". The actual function is executed serially, but we queue up the updates - // in parallel. If this was executed in parallel, we'd see a result other than 1001 at the end due to race conditions - List tasks = []; + // Update it 1000 times in "parallel". The actual function is executed serially, but we queue up the updates + // in parallel. If this was executed in parallel, we'd see a result other than 1001 at the end due to race conditions + List tasks = []; + { + for (var i = 0; i < 1000; i++) { - for (var i = 0; i < 1000; i++) + tasks.Add(Task.Run(async () => { - tasks.Add(Task.Run(async () => - { - using var txInner = Connection.BeginTransaction(); - // Send the function for the update, not update itself - txInner.Add(id, 1, AddToName); - await txInner.Commit(); - })); - } + using var txInner = Connection.BeginTransaction(); + // Send the function for the update, not update itself + txInner.Add(id, 1, AddToName); + await txInner.Commit(); + })); } + } - await Task.WhenAll(tasks); + await Task.WhenAll(tasks); - var db = Connection.Db; - var loadoutRO = Loadout.Load(db, id); - loadoutRO.Name.Should().Be("Test Loadout: 1001"); + var db = Connection.Db; + var loadoutRO = Loadout.Load(db, id); + loadoutRO.Name.Should().Be("Test Loadout: 1001"); - return; + return; - // Actual work is done here, we load the entity and update it this is executed serially - // by the transaction executor - void AddToName(ITransaction tx, IDb db, EntityId eid, int amount) - { - var loadout = Loadout.Load(db, eid); - var oldAmount = int.Parse(loadout.Name.Split(":")[1].Trim()); - tx.Add(loadout.Id, Loadout.Name, $"Test Loadout: {(oldAmount + amount)}"); - } + // Actual work is done here, we load the entity and update it this is executed serially + // by the transaction executor + void AddToName(ITransaction tx, IDb db, EntityId eid, int amount) + { + var loadout = Loadout.Load(db, eid); + var oldAmount = int.Parse(loadout.Name.Split(":")[1].Trim()); + tx.Add(loadout.Id, Loadout.Name, $"Test Loadout: {(oldAmount + amount)}"); } + } - [Fact] - public async Task NonRecursiveDeleteDeletesOnlyOneEntity() - { - var loadout = await InsertExampleData(); - var firstDb = Connection.Db; + [Fact] + public async Task NonRecursiveDeleteDeletesOnlyOneEntity() + { + var loadout = await InsertExampleData(); + var firstDb = Connection.Db; - var firstMod = loadout.Mods.First(); - var firstFiles = firstMod.Files.ToArray(); + var firstMod = loadout.Mods.First(); + var firstFiles = firstMod.Files.ToArray(); - loadout.Mods.Count.Should().Be(3); + loadout.Mods.Count.Should().Be(3); - using var tx = Connection.BeginTransaction(); - tx.Delete(firstMod.Id, false); - var result = await tx.Commit(); + using var tx = Connection.BeginTransaction(); + tx.Delete(firstMod.Id, false); + var result = await tx.Commit(); - loadout = loadout.Rebase(result.Db); + loadout = loadout.Rebase(result.Db); - loadout.Mods.Count.Should().Be(2); + loadout.Mods.Count.Should().Be(2); - var modRefreshed = Mod.Load(result.Db, firstMod.ModId); - modRefreshed.IsValid().Should().BeFalse("Mod should be deleted"); + var modRefreshed = Mod.Load(result.Db, firstMod.ModId); + modRefreshed.IsValid().Should().BeFalse("Mod should be deleted"); - Mod.TryGet(result.Db, firstMod.ModId, out _).Should().BeFalse("Mod should be deleted"); - Mod.TryGet(firstDb, firstMod.ModId, out _).Should().BeTrue("The history of the mod still exists"); + Mod.TryGet(result.Db, firstMod.ModId, out _).Should().BeFalse("Mod should be deleted"); + Mod.TryGet(firstDb, firstMod.ModId, out _).Should().BeTrue("The history of the mod still exists"); - foreach (var file in firstFiles) - { - var reloaded = File.Load(result.Db, result[file.Id]); - reloaded.IsValid().Should().BeTrue("File should still exist, the delete wasn't recursive"); - } + foreach (var file in firstFiles) + { + var reloaded = File.Load(result.Db, result[file.Id]); + reloaded.IsValid().Should().BeTrue("File should still exist, the delete wasn't recursive"); } + } - [Fact] - public async Task RecursiveDeleteDeletesModsAsWellButNotCollections() - { - var loadout = await InsertExampleData(); - var firstDb = Connection.Db; - var firstMod = loadout.Mods.First(); + [Fact] + public async Task RecursiveDeleteDeletesModsAsWellButNotCollections() + { + var loadout = await InsertExampleData(); + var firstDb = Connection.Db; + var firstMod = loadout.Mods.First(); - using var extraTx = Connection.BeginTransaction(); - var collection = new Collection.New(extraTx) - { - Name = "Test Collection", - ModIds = [firstMod], - LoadoutId = loadout - }; - var result = await extraTx.Commit(); + using var extraTx = Connection.BeginTransaction(); + var collection = new Collection.New(extraTx) + { + Name = "Test Collection", + ModIds = [firstMod], + LoadoutId = loadout + }; + var result = await extraTx.Commit(); - loadout = loadout.Rebase(result.Db); + loadout = loadout.Rebase(result.Db); - var firstFiles = firstMod.Files.ToArray(); + var firstFiles = firstMod.Files.ToArray(); - loadout.Mods.Count.Should().Be(3); - loadout.Collections.Count.Should().Be(1); + loadout.Mods.Count.Should().Be(3); + loadout.Collections.Count.Should().Be(1); - using var tx = Connection.BeginTransaction(); - tx.Delete(firstMod.Id, true); - result = await tx.Commit(); + using var tx = Connection.BeginTransaction(); + tx.Delete(firstMod.Id, true); + result = await tx.Commit(); - loadout = loadout.Rebase(result.Db); + loadout = loadout.Rebase(result.Db); - loadout.Mods.Count.Should().Be(2); - loadout.Collections.Count.Should().Be(1); + loadout.Mods.Count.Should().Be(2); + loadout.Collections.Count.Should().Be(1); - var modRefreshed = Mod.Load(result.Db, firstMod.ModId); - modRefreshed.IsValid().Should().BeFalse("Mod should be deleted"); + var modRefreshed = Mod.Load(result.Db, firstMod.ModId); + modRefreshed.IsValid().Should().BeFalse("Mod should be deleted"); - Mod.TryGet(result.Db, firstMod.ModId, out _).Should().BeFalse("Mod should be deleted"); - Mod.TryGet(firstDb, firstMod.ModId, out _).Should().BeTrue("The history of the mod still exists"); + Mod.TryGet(result.Db, firstMod.ModId, out _).Should().BeFalse("Mod should be deleted"); + Mod.TryGet(firstDb, firstMod.ModId, out _).Should().BeTrue("The history of the mod still exists"); - foreach (var file in firstFiles) - { - var reloaded = File.Load(result.Db, result[file.Id]); - reloaded.IsValid().Should().BeFalse("File should be deleted, the delete was recursive"); - } + foreach (var file in firstFiles) + { + var reloaded = File.Load(result.Db, result[file.Id]); + reloaded.IsValid().Should().BeFalse("File should be deleted, the delete was recursive"); } + } + + [Fact] + public async Task CanReadAndWriteOptionalAttributes() + { + var loadout = await InsertExampleData(); - [Fact] - public async Task CanReadAndWriteOptionalAttributes() + var firstMod = loadout.Mods.First(); + + firstMod.Contains(Mod.Description).Should().BeFalse(); + + + using var tx = Connection.BeginTransaction(); + var mod = new Mod.New(tx) { - var loadout = await InsertExampleData(); + LoadoutId = loadout, + Name = "Test Mod", + Source = new Uri("http://test.com"), + Description = "Test Description" + }; + var result = await tx.Commit(); - var firstMod = loadout.Mods.First(); + var remapped = mod.Remap(result); + remapped.Description.Should().Be("Test Description"); + } - firstMod.Contains(Mod.Description).Should().BeFalse(); + [Fact] + public async Task CanGetModelRevisions() + { + var loadout = await InsertExampleData(); + var loadoutNames = new List(); - using var tx = Connection.BeginTransaction(); - var mod = new Mod.New(tx) - { - LoadoutId = loadout, - Name = "Test Mod", - Source = new Uri("http://test.com"), - Description = "Test Description" - }; - var result = await tx.Commit(); - var remapped = mod.Remap(result); - remapped.Description.Should().Be("Test Description"); - } + // TODO: re-enable this once we decide on how to handle revisions + /* + using var subscription = loadout.Revisions() + .Select(l => l.Name) + .Finally(() => loadoutNames.Add("DONE")) + .Subscribe(l => loadoutNames.Add(l)); + + + loadoutNames.Count.Should().Be(1, "Only the current revision should be loaded"); + + using var tx1 = Connection.BeginTransaction(); + tx1.Add(loadout.Id, Loadout.Name, "Update 1"); + var result = await tx1.Commit(); + + using var tx2 = Connection.BeginTransaction(); + tx2.Add(loadout.Id, Loadout.Name, "Update 2"); + var result2 = await tx2.Commit(); + + using var tx3 = Connection.BeginTransaction(); + tx3.Delete(loadout.Id, true); + var result3 = await tx3.Commit(); + + loadoutNames.Count.Should().Be(4, "All revisions should be loaded"); + + loadoutNames.Should().BeEquivalentTo(["Test Loadout", "Update 1", "Update 2", "DONE"]); + */ + + + } + + [Fact] + public async Task CanObserveIndexChanges() + { + var loadout = await InsertExampleData(); + + List changes = new(); + + // Define the slice to observe + var slice = SliceDescriptor.Create(Mod.Name, Connection.Db.Registry); + // Setup the subscription + using var _ = ObservableDatoms.ObserveDatoms(Connection, slice) + // Snapshot the values each time + .QueryWhenChanged(datoms => datoms.Select(d => d.Resolved.ObjectValue.ToString()!).ToArray()) + // Add the changes to the list + .Subscribe(x => changes.Add(x)); + + // Rename a mod + using var tx = Connection.BeginTransaction(); + tx.Add(loadout.Mods.First().Id, Mod.Name, "Test Mod 1"); + await tx.Commit(); + + // Add a new mod + using var tx2 = Connection.BeginTransaction(); + tx2.Add(tx2.TempId(), Mod.Name, "Test Mod 2"); + await tx2.Commit(); + + // Delete the first mod + using var tx3 = Connection.BeginTransaction(); + tx3.Retract(loadout.Mods.First().Id, Mod.Name, "Test Mod 1"); + await tx3.Commit(); + + await Verify(changes); + } }