From 02157c3aa69b1790549646f386f61e7b3f81c366 Mon Sep 17 00:00:00 2001 From: Aaron Stannard Date: Mon, 28 Oct 2024 11:08:41 -0500 Subject: [PATCH] fix potential `ArgumentException` during shard rebalancing (#7367) * fix potential `ArgumentException` during shard rebalancing close #7365 * moved back to original discard design --- .../Internal/AbstractLeastShardAllocationStrategy.cs | 8 +++++++- .../Akka.Cluster.Sharding/ShardAllocationStrategy.cs | 4 ++-- 2 files changed, 9 insertions(+), 3 deletions(-) diff --git a/src/contrib/cluster/Akka.Cluster.Sharding/Internal/AbstractLeastShardAllocationStrategy.cs b/src/contrib/cluster/Akka.Cluster.Sharding/Internal/AbstractLeastShardAllocationStrategy.cs index b10a94a8734..ea39eb4f685 100644 --- a/src/contrib/cluster/Akka.Cluster.Sharding/Internal/AbstractLeastShardAllocationStrategy.cs +++ b/src/contrib/cluster/Akka.Cluster.Sharding/Internal/AbstractLeastShardAllocationStrategy.cs @@ -126,7 +126,13 @@ protected bool IsAGoodTimeToRebalance(IEnumerable regionEntries) protected ImmutableList RegionEntriesFor(IImmutableDictionary> currentShardAllocations) { - var addressToMember = ClusterState.Members.ToImmutableDictionary(m => m.Address, m => m); + // switched to using `GroupBy` instead just ToImmutableDictionary due to https://github.com/akkadotnet/akka.net/issues/7365 + // it's very rare, but possible, that there can be two members with the same address in the ClusterState. This can happen + // when a node quickly reboots and re-uses its old address, but the old incarnation hasn't been downed yet. + var addressToMember = ClusterState.Members + .GroupBy(m => m.Address) + // using Last or First here is non-deterministic since the UID that appears in the UniqueAddress sort order is random + .ToImmutableDictionary(g => g.Key, g => g.First()); return currentShardAllocations.Select(i => { diff --git a/src/contrib/cluster/Akka.Cluster.Sharding/ShardAllocationStrategy.cs b/src/contrib/cluster/Akka.Cluster.Sharding/ShardAllocationStrategy.cs index 6b1cc3a6acc..09cc8183f7d 100644 --- a/src/contrib/cluster/Akka.Cluster.Sharding/ShardAllocationStrategy.cs +++ b/src/contrib/cluster/Akka.Cluster.Sharding/ShardAllocationStrategy.cs @@ -169,11 +169,11 @@ public override Task> Rebalance(IImmutableDictionary i, ShardSuitabilityOrdering.Instance).ToImmutableList(); if (IsAGoodTimeToRebalance(sortedRegionEntries)) { - var (_, Shards) = MostSuitableRegion(sortedRegionEntries); + var (_, shards) = MostSuitableRegion(sortedRegionEntries); // even if it is to another new node. var mostShards = sortedRegionEntries.Select(r => r.ShardIds.Where(s => !rebalanceInProgress.Contains(s))).MaxBy(i => i.Count())?.ToArray() ?? Array.Empty(); - var difference = mostShards.Length - Shards.Count; + var difference = mostShards.Length - shards.Count; if (difference >= _rebalanceThreshold) { var n = Math.Min(