Skip to content

Commit

Permalink
Merge pull request #72 from Etherna/feature/MODM-167-atomic-upsert
Browse files Browse the repository at this point in the history
implement upsert addToSet repository feature
  • Loading branch information
tmm360 authored Mar 22, 2024
2 parents b471dc3 + 11b552a commit 24c0473
Show file tree
Hide file tree
Showing 3 changed files with 73 additions and 0 deletions.
1 change: 1 addition & 0 deletions MongODM.sln.DotSettings
Original file line number Diff line number Diff line change
Expand Up @@ -12,5 +12,6 @@ distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.</s:String>
<s:Boolean x:Key="/Default/Environment/SettingsMigration/IsMigratorApplied/=JetBrains_002EReSharper_002EFeature_002EServices_002ECodeCleanup_002EFileHeader_002EFileHeaderSettingsMigrate/@EntryIndexedValue">True</s:Boolean>
<s:Boolean x:Key="/Default/UserDictionary/Words/=Etherna/@EntryIndexedValue">True</s:Boolean>
<s:Boolean x:Key="/Default/UserDictionary/Words/=MongODM/@EntryIndexedValue">True</s:Boolean></wpf:ResourceDictionary>
24 changes: 24 additions & 0 deletions src/MongODM.Core/Repositories/IRepository.cs
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,12 @@ Task<IAsyncCursor<TProjection>> FindAsync<TProjection>(
FindOptions<TModel, TProjection>? options = null,
CancellationToken cancellationToken = default);

Task<TModel> FindOneAndUpdateAsync(
FilterDefinition<TModel> filter,
UpdateDefinition<TModel> update,
FindOneAndUpdateOptions<TModel> options,
CancellationToken cancellationToken = default);

Task<TModel> FindOneAsync(
TKey id,
CancellationToken cancellationToken = default);
Expand Down Expand Up @@ -155,5 +161,23 @@ Task ReplaceAsync(
Task<TModel?> TryFindOneAsync(
Expression<Func<TModel, bool>> predicate,
CancellationToken cancellationToken = default);

/// <summary>
/// Find one and modify atomically with an upsert "add to set" operation.
/// Create a new document if doesn't exists, add the element to the set if not present, or do nothing if element is already present
/// </summary>
/// <param name="filter">The document find filter</param>
/// <param name="setField">The set where add the item</param>
/// <param name="itemValue">The item to add</param>
/// <param name="onInsertModel">A new model, in case of insert</param>
/// <param name="cancellationToken">The cancellation token</param>
/// <typeparam name="TItem">Item type</typeparam>
/// <returns>The model as result from find before update</returns>
Task<TModel> UpsertAddToSetAsync<TItem>(
FilterDefinition<TModel> filter,
Expression<Func<TModel, IEnumerable<TItem>>> setField,
TItem itemValue,
TModel onInsertModel,
CancellationToken cancellationToken = default);
}
}
48 changes: 48 additions & 0 deletions src/MongODM.Core/Repositories/Repository.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
// limitations under the License.

using Etherna.MongoDB.Bson;
using Etherna.MongoDB.Bson.IO;
using Etherna.MongoDB.Bson.Serialization;
using Etherna.MongoDB.Driver;
using Etherna.MongoDB.Driver.Linq;
using Etherna.MongODM.Core.Domain.Models;
Expand Down Expand Up @@ -269,6 +271,14 @@ public Task<TModel> FindOneAsync(
CancellationToken cancellationToken = default) =>
FindOneOnDBAsync(predicate, cancellationToken);

public Task<TModel> FindOneAndUpdateAsync(
FilterDefinition<TModel> filter,
UpdateDefinition<TModel> update,
FindOneAndUpdateOptions<TModel> options,
CancellationToken cancellationToken = default) =>
AccessToCollectionAsync(collection =>
collection.FindOneAndUpdateAsync(filter, update, options, cancellationToken));

public string ModelIdToString(object model)
{
ArgumentNullException.ThrowIfNull(model, nameof(model));
Expand Down Expand Up @@ -387,6 +397,44 @@ public virtual Task ReplaceAsync(
}
}

public Task<TModel> UpsertAddToSetAsync<TItem>(
FilterDefinition<TModel> filter,
Expression<Func<TModel, IEnumerable<TItem>>> setField,
TItem itemValue,
TModel onInsertModel,
CancellationToken cancellationToken = default) =>
AccessToCollectionAsync(collection =>
{
var modelMap = DbContext.MapRegistry.GetModelMap(typeof(TModel));
var fieldDefinition = new ExpressionFieldDefinition<TModel>(setField);
var fieldRendered = fieldDefinition.Render((IBsonSerializer<TModel>)modelMap.ActiveSerializer, DbContext.SerializerRegistry);

// Serialize model.
var modelBsonDoc = new BsonDocument();
using (var bsonWriter = new BsonDocumentWriter(modelBsonDoc))
{
var context = BsonSerializationContext.CreateRoot(bsonWriter);
bsonWriter.WriteStartDocument();
bsonWriter.WriteName("model");
modelMap.ActiveSerializer.Serialize(context, onInsertModel);
bsonWriter.WriteEndDocument();
}

// Update "update" definition with OnInsert instructions.
var onInsertUpdate = modelBsonDoc[0].AsBsonDocument.Elements
.Where(element => element.Name != modelMap.ActiveSchema.IdMemberMap!.BsonMemberMap.ElementName &&
element.Name != fieldRendered.FieldName.Split('.').First())
.Select(element => Builders<TModel>.Update.SetOnInsert(element.Name, element.Value));
var upsertUpdate = Builders<TModel>.Update.Combine(onInsertUpdate.Append(
Builders<TModel>.Update.AddToSet(fieldDefinition, itemValue)));

// Exec on db.
return collection.FindOneAndUpdateAsync(filter, upsertUpdate, new FindOneAndUpdateOptions<TModel>()
{
IsUpsert = true,
}, cancellationToken);
});

// Protected virtual methods.
protected virtual Task CreateOnDBAsync(IEnumerable<TModel> models, CancellationToken cancellationToken) =>
AccessToCollectionAsync(collection => collection.InsertManyAsync(models, null, cancellationToken));
Expand Down

0 comments on commit 24c0473

Please sign in to comment.