Skip to content

Commit

Permalink
Update to v1.0.16. Fix disposal of migration sessions. (#530)
Browse files Browse the repository at this point in the history
Problem
When the KEYS option is used for slot migration, removing the migration session does not dispose the session object. This leads to an out-of-memory exception when migrating a large number of keys.

Fix

Record the slot of every queued key for a MIGRATE operation that uses the KEYS option. This ensures those slots are correctly reset when the migration session is removed.
Also, when a node is removed by the FORGET command, every outstanding migration session that targets that node is now disposed, not just one of them.
  • Loading branch information
yrajas authored Jul 29, 2024
1 parent dfc8d6d commit a1e6e77
Show file tree
Hide file tree
Showing 6 changed files with 65 additions and 20 deletions.
2 changes: 1 addition & 1 deletion .azure/pipelines/azure-pipelines-external-release.yml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
# 1) update the name: string below (line 6) -- this is the version for the nuget package (e.g. 1.0.0)
# 2) update \libs\host\GarnetServer.cs readonly string version (~line 45) -- NOTE - these two values need to be the same
######################################
name: 1.0.15
name: 1.0.16
trigger:
branches:
include:
Expand Down
40 changes: 37 additions & 3 deletions libs/cluster/Server/Migration/MigrateSessionTaskStore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -129,23 +129,57 @@ public bool TryAddMigrateSession(
return success;
}

/// <summary>
/// Remove the given migration session from every slot it covers and dispose it.
/// </summary>
/// <param name="mSession">Session to remove; the store entries for each of its slots are cleared.</param>
/// <returns>
/// True once the session's slot entries have been cleared and the session disposed;
/// false if the store is already disposed or an exception was caught and logged.
/// </returns>
public bool TryRemove(MigrateSession mSession)
{
    try
    {
        _lock.WriteLock();
        if (_disposed)
        {
            return false;
        }

        // Clear every slot entry held by this session before disposing it.
        var ownedSlots = mSession.GetSlots;
        foreach (var slotIndex in ownedSlots)
        {
            Debug.Assert(sessions[slotIndex] == mSession, "MigrateSession not found in slot");
            sessions[slotIndex] = null;
        }

        mSession.Dispose();
        return true;
    }
    catch (Exception error)
    {
        logger?.LogError(error, "Error at TryRemove");
        return false;
    }
    finally
    {
        // Runs on every exit path, including the early return when the store is disposed.
        _lock.WriteUnlock();
    }
}

public bool TryRemove(string targetNodeId)
{
try
{
_lock.WriteLock();
if (_disposed) return false;
MigrateSession mSession = null;
HashSet<MigrateSession> mSessions = null;
for (var i = 0; i < sessions.Length; i++)
{
var s = sessions[i];
if (s != null && s.GetTargetNodeId.Equals(targetNodeId, StringComparison.Ordinal))
{
sessions[i] = null;
mSession = s;
mSessions ??= [];
_ = mSessions.Add(s);
}
}
mSession?.Dispose();

if (mSessions != null)
{
foreach (var session in mSessions)
session.Dispose();
}

return true;
}
catch (Exception ex)
Expand Down
2 changes: 1 addition & 1 deletion libs/cluster/Server/Migration/MigrationManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ public bool TryAddMigrationTask(
/// <param name="mSession"></param>
/// <returns></returns>
public bool TryRemoveMigrationTask(MigrateSession mSession)
=> migrationTaskStore.TryRemove(mSession.GetTargetNodeId);
=> migrationTaskStore.TryRemove(mSession);

/// <summary>
/// Remove migration task associated with provided target nodeId
Expand Down
2 changes: 2 additions & 0 deletions libs/cluster/Session/MigrateCommand.cs
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,8 @@ private bool TryMIGRATE(int count, byte* ptr)
// Add pointer of current parsed key
if (!keys.TryAdd(new ArgSlice(keyPtr, ksize), KeyMigrationStatus.QUEUED))
logger?.LogWarning($"Failed to add {{key}}", Encoding.ASCII.GetString(keyPtr, ksize));
else
_ = slots.Add(slot);
}
}
else if (option.Equals("SLOTS", StringComparison.OrdinalIgnoreCase))
Expand Down
2 changes: 1 addition & 1 deletion libs/host/GarnetServer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ public class GarnetServer : IDisposable
protected StoreWrapper storeWrapper;

// IMPORTANT: Keep the version in sync with .azure\pipelines\azure-pipelines-external-release.yml line ~6.
readonly string version = "1.0.15";
readonly string version = "1.0.16";

/// <summary>
/// Resp protocol version
Expand Down
37 changes: 23 additions & 14 deletions test/Garnet.test.cluster/ClusterMigrateTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -1572,6 +1572,8 @@ Task<byte[]> WriteWorkload(IPEndPoint endPoint, byte[] key, int keyLen = 16)
}
}

[Test, Order(16)]
[Category("CLUSTER")]
public void ClusterMigrateForgetTest()
{
context.logger.LogDebug($"0. ClusterSimpleMigrateSlotsRanges started");
Expand All @@ -1585,23 +1587,30 @@ public void ClusterMigrateForgetTest()
var sourceNodeId = context.clusterTestUtils.ClusterMyId(sourceNodeIndex, context.logger);
var targetNodeId = context.clusterTestUtils.ClusterMyId(targetNodeIndex, context.logger);

var resp = context.clusterTestUtils.SetSlot(sourceNodeIndex, 0, "MIGRATING", targetNodeId, context.logger);
Assert.AreEqual("OK", resp);

var slotState = context.clusterTestUtils.SlotState(sourceNodeIndex, 0, context.logger);
Assert.AreEqual(3, slotState.Length);
Assert.AreEqual("0", slotState[0]);
Assert.AreEqual(">", slotState[1]);
Assert.AreEqual(targetNodeId, slotState[2]);
var numSlots = 3;
for (var slot = 0; slot < numSlots; slot++)
{
var migresp = context.clusterTestUtils.SetSlot(sourceNodeIndex, slot, "MIGRATING", targetNodeId, context.logger);
Assert.AreEqual("OK", migresp);

var slotState = context.clusterTestUtils.SlotState(sourceNodeIndex, slot, context.logger);
Assert.AreEqual(3, slotState.Length);
Assert.AreEqual(slot.ToString(), slotState[0]);
Assert.AreEqual(">", slotState[1]);
Assert.AreEqual(targetNodeId, slotState[2]);
}

resp = context.clusterTestUtils.ClusterForget(sourceNodeIndex, targetNodeId, 100, context.logger);
var resp = context.clusterTestUtils.ClusterForget(sourceNodeIndex, targetNodeId, 100, context.logger);
Assert.AreEqual("OK", resp);

slotState = context.clusterTestUtils.SlotState(sourceNodeIndex, 0, context.logger);
Assert.AreEqual(3, slotState.Length);
Assert.AreEqual("0", slotState[0]);
Assert.AreEqual("=", slotState[1]);
Assert.AreEqual(sourceNodeId, slotState[2]);
for (var slot = 0; slot < numSlots; slot++)
{
var slotState = context.clusterTestUtils.SlotState(sourceNodeIndex, slot, context.logger);
Assert.AreEqual(3, slotState.Length);
Assert.AreEqual(slot.ToString(), slotState[0]);
Assert.AreEqual("=", slotState[1]);
Assert.AreEqual(sourceNodeId, slotState[2]);
}
}
}
}

0 comments on commit a1e6e77

Please sign in to comment.