Skip to content

Commit

Permalink
0.9.86
Browse files Browse the repository at this point in the history
  • Loading branch information
halgari committed Oct 1, 2024
1 parent 0256fe2 commit f9f9280
Show file tree
Hide file tree
Showing 6 changed files with 167 additions and 56 deletions.
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
## Changelog

### 0.9.86 - 01/10/2024
* Swapped out `R3`'s behavior subject for a custom implementation that is lock-free
* Reworked how updates are propagated through the system. Thanks to the nature of TxIds we can detect gaps in the sequence of updates
and fill in missing results with `AsOf` queries. This allows us to remove the locks used during subscription and greatly reduce the
possibility of deadlocks.

### 0.9.84 - 20/09/2024
* Fixed a bug with Tuple3 values that had a reference in the first position.
* Added a user accessible remap function for values
Expand Down
4 changes: 1 addition & 3 deletions src/NexusMods.MnemonicDB.Abstractions/IDatomStore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,7 @@
using System.Collections.Generic;
using System.Threading.Tasks;
using NexusMods.MnemonicDB.Abstractions.IndexSegments;
using NexusMods.MnemonicDB.Abstractions.Internals;
using NexusMods.MnemonicDB.Abstractions.TxFunctions;
using R3;

namespace NexusMods.MnemonicDB.Abstractions;

Expand All @@ -17,7 +15,7 @@ public interface IDatomStore : IDisposable
/// An observable of the transaction log, for getting the latest changes to the store. This observable
/// will always start with the most recent value, so there is no reason to use `StartWith` or `Replay` on it.
/// </summary>
public Observable<IDb> TxLog { get; }
public IObservable<IDb> TxLog { get; }

/// <summary>
/// Gets the latest transaction id found in the log.
Expand Down
66 changes: 23 additions & 43 deletions src/NexusMods.MnemonicDB/Connection.cs
Original file line number Diff line number Diff line change
@@ -1,20 +1,15 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Subjects;
using System.Reactive.Linq;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
using NexusMods.MnemonicDB.Abstractions;
using NexusMods.MnemonicDB.Abstractions.BuiltInEntities;
using NexusMods.MnemonicDB.Abstractions.ElementComparers;
using NexusMods.MnemonicDB.Abstractions.IndexSegments;
using NexusMods.MnemonicDB.Abstractions.Internals;
using NexusMods.MnemonicDB.Abstractions.Query;
using NexusMods.MnemonicDB.Abstractions.TxFunctions;
using NexusMods.MnemonicDB.Storage;
using R3;
using Observable = System.Reactive.Linq.Observable;
using ObservableExtensions = R3.ObservableExtensions;

namespace NexusMods.MnemonicDB;
Expand All @@ -27,7 +22,7 @@ public class Connection : IConnection
private readonly IDatomStore _store;
private readonly ILogger<Connection> _logger;

private R3.BehaviorSubject<IDb> _dbStream;
private DbStream _dbStream;
private IDisposable? _dbStreamDisposable;
private readonly IAnalyzer[] _analyzers;

Expand All @@ -41,45 +36,38 @@ public Connection(ILogger<Connection> logger, IDatomStore store, IServiceProvide
AttributeResolver = new AttributeResolver(provider, AttributeCache);
_logger = logger;
_store = store;
_dbStream = new R3.BehaviorSubject<IDb>(default!);
_dbStream = new DbStream();
_analyzers = analyzers.ToArray();
Bootstrap();
}

/// <summary>
/// Scrubs the transaction stream so that we only ever move forward and never repeat transactions
/// </summary>
private R3.Observable<Db> ProcessUpdate(R3.Observable<IDb> dbStream)
private IObservable<IDb> ProcessUpdates(IObservable<IDb> dbStream)
{
IDb? prev = null;

return R3.Observable.Create((Observer<Db> observer) =>
return dbStream.Select(idb =>
{
return dbStream.Subscribe(nextItem =>
{

if (prev != null && prev.BasisTxId >= nextItem.BasisTxId)
return;

var db = (Db)nextItem;
db.Connection = this;
var db = (Db)idb;
db.Connection = this;

foreach (var analyzer in _analyzers)
foreach (var analyzer in _analyzers)
{
try
{
try
{
var result = analyzer.Analyze(prev, nextItem);
db.AnalyzerData.Add(analyzer.GetType(), result);
}
catch (Exception ex)
{
_logger.LogError(ex, "Failed to analyze with {Analyzer}", analyzer.GetType().Name);
}
var result = analyzer.Analyze(prev, idb);
db.AnalyzerData.Add(analyzer.GetType(), result);
}

observer.OnNext((Db)nextItem);
prev = nextItem;
}, observer.OnCompleted);
catch (Exception ex)
{
_logger.LogError(ex, "Failed to analyze with {Analyzer}", analyzer.GetType().Name);
}
}
prev = idb;

return idb;
});
}

Expand All @@ -95,7 +83,7 @@ public IDb Db
// ReSharper disable once ConditionIsAlwaysTrueOrFalseAccordingToNullableAPIContract
if (val == null)
ThrowNullDb();
return val!.Value;
return _dbStream.Current;
}
}

Expand Down Expand Up @@ -134,15 +122,7 @@ public ITransaction BeginTransaction()
public IAnalyzer[] Analyzers => _analyzers;

/// <inheritdoc />
public IObservable<IDb> Revisions
{
get
{
if (_dbStream == default!)
ThrowNullDb();
return ObservableExtensions.AsSystemObservable(_dbStream!);
}
}
public IObservable<IDb> Revisions => _dbStream;

private void AddMissingAttributes()
{
Expand Down Expand Up @@ -204,7 +184,7 @@ private void Bootstrap()

AddMissingAttributes();

_dbStreamDisposable = ProcessUpdate(_store.TxLog)
_dbStreamDisposable = ProcessUpdates(_store.TxLog)
.Subscribe(itm => _dbStream.OnNext(itm));
}
catch (Exception ex)
Expand Down
130 changes: 130 additions & 0 deletions src/NexusMods.MnemonicDB/DbStream.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
using System;
using System.Collections.Immutable;
using System.Reactive;
using System.Threading;
using NexusMods.MnemonicDB.Abstractions;

namespace NexusMods.MnemonicDB;

/// <summary>
/// Represents an observable stream of database changes, a IObservable of IDb changes that never goes backwards or repeats transactions
/// </summary>
public class DbStream : IObservable<IDb>, IDisposable
{
private ImmutableHashSet<IObserver<IDb>> _observers = ImmutableHashSet<IObserver<IDb>>.Empty;
private IDb? _db;

/// <summary>
/// Returns the current database value
/// </summary>
public IDb Current => _db ?? throw new InvalidOperationException("No current transaction");

/// <summary>
/// Enqueues the next transaction in the stream
/// </summary>
public void OnNext(IDb db)
{
_db = db;
foreach (var observer in _observers)
observer.OnNext(db);
}

/// <summary>
/// Subscribes to the stream
/// </summary>
/// <param name="observer"></param>
/// <returns></returns>
public IDisposable Subscribe(IObserver<IDb> observer)
{
ImmutableHashSet<IObserver<IDb>> newObservers, oldObservers;
var forwardOnlyObserver = new ForwardOnlyObserver(observer);
do
{
oldObservers = _observers;
newObservers = oldObservers.Add(forwardOnlyObserver);
} while (!ReferenceEquals(Interlocked.CompareExchange(ref _observers, newObservers, oldObservers), oldObservers));

// Prime the observer with the current transaction
if (_db is not null)
forwardOnlyObserver.OnNext(_db);

return new Subscription(this, forwardOnlyObserver);
}

/// <summary>
/// Helper class to manage the subscription
/// </summary>
private class Subscription(DbStream stream, ForwardOnlyObserver observer) : IDisposable
{
public void Dispose()
{
ImmutableHashSet<IObserver<IDb>> newObservers, oldObservers;
do
{
oldObservers = stream._observers;
newObservers = oldObservers.Remove(observer);
} while (!ReferenceEquals(Interlocked.CompareExchange(ref stream._observers, newObservers, oldObservers), oldObservers));

observer.OnCompleted();
}
}

/// <summary>
/// An observer that ensures that we only ever move forward in the transaction stream
/// </summary>
private class ForwardOnlyObserver(IObserver<IDb> next) : IObserver<IDb>
{
private IDb? _prev;

public void OnCompleted()
{
next.OnCompleted();
}

public void OnError(Exception error)
{
next.OnError(error);
}

public void OnNext(IDb value)
{
// Common case, this is the next transaction
if (_prev is not null && _prev.BasisTxId.Value + 1 == value.BasisTxId.Value)
{
_prev = value;
next.OnNext(_prev);
return;
}

// First transaction
if (_prev is null)
{
_prev = value;
next.OnNext(value);
}

// Odd, but somehow we got a transaction behind the previous one
if (_prev.BasisTxId.Value >= value.BasisTxId.Value)
return;

// Otherwise we somehow missed a transaction, so we need to replay the transaction stream
for (var txId = _prev.BasisTxId.Value + 1; txId < value.BasisTxId.Value; txId++)
{
next.OnNext(value.Connection.AsOf(TxId.From(txId)));
}
_prev = value;
next.OnNext(value);
}
}


/// <summary>
/// Disposes of the stream
/// </summary>
public void Dispose()
{
foreach (var observer in _observers)
observer.OnCompleted();
}

}
1 change: 0 additions & 1 deletion src/NexusMods.MnemonicDB/NexusMods.MnemonicDB.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
<PackageReference Include="DynamicData" Version="8.4.1"/>
<PackageReference Include="Microsoft.Extensions.Logging.Abstractions" Version="8.0.1"/>
<PackageReference Include="Microsoft.Extensions.ObjectPool" Version="8.0.6" />
<PackageReference Include="R3" Version="1.2.8" />
<PackageReference Include="Reloaded.Memory" Version="9.4.1"/>
<PackageReference Include="RocksDB" Version="8.11.3.46984" />
<PackageReference Include="System.Reactive" Version="6.0.1" />
Expand Down
16 changes: 7 additions & 9 deletions src/NexusMods.MnemonicDB/Storage/DatomStore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
using NexusMods.MnemonicDB.Abstractions.TxFunctions;
using NexusMods.MnemonicDB.Storage.Abstractions;
using NexusMods.MnemonicDB.Storage.DatomStorageStructures;
using R3;
using Reloaded.Memory.Extensions;

namespace NexusMods.MnemonicDB.Storage;
Expand All @@ -40,7 +39,7 @@ public class DatomStore : IDatomStore

private readonly BlockingCollection<PendingTransaction> _pendingTransactions;
private readonly IIndex _txLog;
private BehaviorSubject<IDb>? _updatesSubject;
private DbStream _dbStream;
private readonly IIndex _vaetCurrent;
private readonly IIndex _vaetHistory;
private readonly PooledMemoryBufferWriter _writer;
Expand Down Expand Up @@ -82,6 +81,7 @@ public class DatomStore : IDatomStore
public DatomStore(ILogger<DatomStore> logger, DatomStoreSettings settings, IStoreBackend backend)
{
_remapFunc = Remap;
_dbStream = new DbStream();
_attributeCache = backend.AttributeCache;
_pendingTransactions = new BlockingCollection<PendingTransaction>(new ConcurrentQueue<PendingTransaction>());

Expand Down Expand Up @@ -168,13 +168,11 @@ public DatomStore(ILogger<DatomStore> logger, DatomStoreSettings settings, IStor
}

/// <inheritdoc />
public Observable<IDb> TxLog
public IObservable<IDb> TxLog
{
get
{
if (_updatesSubject == null)
throw new InvalidOperationException("The store is not yet started");
return _updatesSubject;
return _dbStream;
}
}

Expand Down Expand Up @@ -206,7 +204,7 @@ public void Dispose()
_pendingTransactions.CompleteAdding();
_shutdownToken.Cancel();
_loggerThread?.Join();
_updatesSubject?.Dispose();
_dbStream.Dispose();
_writer.Dispose();
_retractWriter.Dispose();
}
Expand Down Expand Up @@ -268,7 +266,7 @@ private void ConsumeTransactions()
private void FinishTransaction(StoreResult result, PendingTransaction pendingTransaction)
{
_currentDb = ((Db)_currentDb!).WithNext(result, result.AssignedTxId);
_updatesSubject?.OnNext(_currentDb!);
_dbStream.OnNext(_currentDb);
pendingTransaction.Complete(result, _currentDb);
}

Expand Down Expand Up @@ -313,7 +311,7 @@ private void Bootstrap()
}

_currentDb = new Db(_currentSnapshot, _asOfTx, _attributeCache);
_updatesSubject = new BehaviorSubject<IDb>(_currentDb);
_dbStream.OnNext(_currentDb);
_loggerThread = new Thread(ConsumeTransactions)
{
IsBackground = true,
Expand Down

0 comments on commit f9f9280

Please sign in to comment.