From e0523ca46c744d784ef5bd63313a7ee880eee46b Mon Sep 17 00:00:00 2001 From: James Warner Date: Fri, 28 Jun 2024 20:22:53 +1200 Subject: [PATCH] fix: Leader election failure to restart (#783) This fixes the following behavioral issues noted when testing a leader-aware operator with transient network issues: - In `LeaderElectionBackgroundService`, if `elector.RunUntilLeadershipLostAsync()` throws, the exception is not observed in the library and no further attempts to become the leader occur. The library now logs any unexpected exceptions and tries to become the leader again. - A leader could not stop and then subsequently start being a leader once more due to cancellation token sources not being recreated. The library now disposes and recreates the cancellation token sources as required. - `LeaderAwareResourceWatcher.StoppedLeading` would erroneously pass a cancelled cancellation token to `ResourceWatcher`. The library now passes the `IHostApplicationLifetime.ApplicationStopped` token to the `ResourceWatcher` - we can assume that `ApplicationStopped` is a good indication that the stop should no longer be graceful. --- .../LeaderElectionBackgroundService.cs | 38 ++++++++++-- .../LeaderAwareResourceWatcher{TEntity}.cs | 29 ++++++++-- .../Watcher/ResourceWatcher{TEntity}.cs | 8 ++- .../KubeOps.Operator.Test.csproj | 1 + .../LeaderElectionBackgroundService.Test.cs | 58 +++++++++++++++++++ .../Watcher/ResourceWatcher{TEntity}.Test.cs | 53 +++++++++++++++++ 6 files changed, 174 insertions(+), 13 deletions(-) create mode 100644 test/KubeOps.Operator.Test/LeaderElector/LeaderElectionBackgroundService.Test.cs create mode 100644 test/KubeOps.Operator.Test/Watcher/ResourceWatcher{TEntity}.Test.cs diff --git a/src/KubeOps.Operator/LeaderElection/LeaderElectionBackgroundService.cs b/src/KubeOps.Operator/LeaderElection/LeaderElectionBackgroundService.cs index d7d6c141..fd9e1b07 100644 --- a/src/KubeOps.Operator/LeaderElection/LeaderElectionBackgroundService.cs +++ b/src/KubeOps.Operator/LeaderElection/LeaderElectionBackgroundService.cs @@ -1,18 +1,21 @@ using k8s.LeaderElection; using Microsoft.Extensions.Hosting; +using Microsoft.Extensions.Logging; namespace KubeOps.Operator.LeaderElection; /// /// This background service connects to the API and continuously watches the leader election. /// +/// The logger. /// The elector. -internal sealed class LeaderElectionBackgroundService(LeaderElector elector) +internal sealed class LeaderElectionBackgroundService(ILogger logger, LeaderElector elector) : IHostedService, IDisposable, IAsyncDisposable { private readonly CancellationTokenSource _cts = new(); private bool _disposed; + private Task? _leadershipTask; public Task StartAsync(CancellationToken cancellationToken) { @@ -26,7 +29,7 @@ public Task StartAsync(CancellationToken cancellationToken) // Therefore, we use Task.Run() and put the work to queue. The passed cancellation token of the StartAsync // method is not used, because it would only cancel the scheduling (which we definitely don't want to cancel). // To make this intention explicit, CancellationToken.None gets passed. - _ = Task.Run(() => elector.RunUntilLeadershipLostAsync(_cts.Token), CancellationToken.None); + _leadershipTask = Task.Run(RunAndTryToHoldLeadershipForeverAsync, CancellationToken.None); return Task.CompletedTask; } @@ -38,19 +41,23 @@ public void Dispose() _disposed = true; } - public Task StopAsync(CancellationToken cancellationToken) + public async Task StopAsync(CancellationToken cancellationToken) { if (_disposed) { - return Task.CompletedTask; + return; } #if NET8_0_OR_GREATER - return _cts.CancelAsync(); + await _cts.CancelAsync(); #else _cts.Cancel(); - return Task.CompletedTask; #endif + + if (_leadershipTask is not null) + { + await _leadershipTask; + } } public async ValueTask DisposeAsync() @@ -72,4 +79,23 @@ static async ValueTask CastAndDispose(IDisposable resource) } } } + + private async Task RunAndTryToHoldLeadershipForeverAsync() + { + while (!_cts.IsCancellationRequested) + { + try + { + await elector.RunUntilLeadershipLostAsync(_cts.Token); + } + catch (OperationCanceledException) when (_cts.IsCancellationRequested) + { + // Ignore cancellation exceptions when we've been asked to stop. + } + catch (Exception exception) + { + logger.LogError(exception, "Failed to hold leadership."); + } + } + } } diff --git a/src/KubeOps.Operator/Watcher/LeaderAwareResourceWatcher{TEntity}.cs b/src/KubeOps.Operator/Watcher/LeaderAwareResourceWatcher{TEntity}.cs index b5643de6..c3ee6de3 100644 --- a/src/KubeOps.Operator/Watcher/LeaderAwareResourceWatcher{TEntity}.cs +++ b/src/KubeOps.Operator/Watcher/LeaderAwareResourceWatcher{TEntity}.cs @@ -1,4 +1,4 @@ -using k8s; +using k8s; using k8s.LeaderElection; using k8s.Models; @@ -6,6 +6,7 @@ using KubeOps.KubernetesClient; using KubeOps.Operator.Queue; +using Microsoft.Extensions.Hosting; using Microsoft.Extensions.Logging; namespace KubeOps.Operator.Watcher; @@ -16,21 +17,26 @@ internal sealed class LeaderAwareResourceWatcher( TimedEntityQueue queue, OperatorSettings settings, IKubernetesClient client, + IHostApplicationLifetime hostApplicationLifetime, LeaderElector elector) : ResourceWatcher(logger, provider, queue, settings, client) where TEntity : IKubernetesObject { - private readonly CancellationTokenSource _cts = new(); + private CancellationTokenSource _cts = new(); private bool _disposed; - public override Task StartAsync(CancellationToken cancellationToken) + public override async Task StartAsync(CancellationToken cancellationToken) { logger.LogDebug("Subscribe for leadership updates."); elector.OnStartedLeading += StartedLeading; elector.OnStoppedLeading += StoppedLeading; - return elector.IsLeader() ? base.StartAsync(_cts.Token) : Task.CompletedTask; + if (elector.IsLeader()) + { + using CancellationTokenSource linkedCancellationTokenSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, _cts.Token); + await base.StartAsync(linkedCancellationTokenSource.Token); + } } public override Task StopAsync(CancellationToken cancellationToken) @@ -43,7 +49,8 @@ public override Task StopAsync(CancellationToken cancellationToken) elector.OnStartedLeading -= StartedLeading; elector.OnStoppedLeading -= StoppedLeading; - return Task.CompletedTask; + + return elector.IsLeader() ? base.StopAsync(cancellationToken) : Task.CompletedTask; } protected override void Dispose(bool disposing) @@ -63,6 +70,13 @@ protected override void Dispose(bool disposing) private void StartedLeading() { logger.LogInformation("This instance started leading, starting watcher."); + + if (_cts.IsCancellationRequested) + { + _cts.Dispose(); + _cts = new CancellationTokenSource(); + } + base.StartAsync(_cts.Token); } @@ -71,6 +85,9 @@ private void StoppedLeading() _cts.Cancel(); logger.LogInformation("This instance stopped leading, stopping watcher."); - base.StopAsync(_cts.Token).Wait(); + + // Stop the base implementation using the 'ApplicationStopped' cancellation token. + // The cancellation token should only be marked cancelled when the stop should no longer be graceful. + base.StopAsync(hostApplicationLifetime.ApplicationStopped).Wait(); } } diff --git a/src/KubeOps.Operator/Watcher/ResourceWatcher{TEntity}.cs b/src/KubeOps.Operator/Watcher/ResourceWatcher{TEntity}.cs index 483385b2..02fc2991 100644 --- a/src/KubeOps.Operator/Watcher/ResourceWatcher{TEntity}.cs +++ b/src/KubeOps.Operator/Watcher/ResourceWatcher{TEntity}.cs @@ -28,8 +28,8 @@ internal class ResourceWatcher( where TEntity : IKubernetesObject { private readonly ConcurrentDictionary _entityCache = new(); - private readonly CancellationTokenSource _cancellationTokenSource = new(); + private CancellationTokenSource _cancellationTokenSource = new(); private uint _watcherReconnectRetries; private Task? _eventWatcher; private bool _disposed; @@ -40,6 +40,12 @@ public virtual Task StartAsync(CancellationToken cancellationToken) { logger.LogInformation("Starting resource watcher for {ResourceType}.", typeof(TEntity).Name); + if (_cancellationTokenSource.IsCancellationRequested) + { + _cancellationTokenSource.Dispose(); + _cancellationTokenSource = new CancellationTokenSource(); + } + _eventWatcher = WatchClientEventsAsync(_cancellationTokenSource.Token); logger.LogInformation("Started resource watcher for {ResourceType}.", typeof(TEntity).Name); diff --git a/test/KubeOps.Operator.Test/KubeOps.Operator.Test.csproj b/test/KubeOps.Operator.Test/KubeOps.Operator.Test.csproj index 974c6f29..a43371f4 100644 --- a/test/KubeOps.Operator.Test/KubeOps.Operator.Test.csproj +++ b/test/KubeOps.Operator.Test/KubeOps.Operator.Test.csproj @@ -7,6 +7,7 @@ + diff --git a/test/KubeOps.Operator.Test/LeaderElector/LeaderElectionBackgroundService.Test.cs b/test/KubeOps.Operator.Test/LeaderElector/LeaderElectionBackgroundService.Test.cs new file mode 100644 index 00000000..a6316c70 --- /dev/null +++ b/test/KubeOps.Operator.Test/LeaderElector/LeaderElectionBackgroundService.Test.cs @@ -0,0 +1,58 @@ +using FluentAssertions; + +using k8s.LeaderElection; + +using KubeOps.Operator.LeaderElection; + +using Microsoft.Extensions.Logging; + +using Moq; + +namespace KubeOps.Operator.Test.LeaderElector; + +public sealed class LeaderElectionBackgroundServiceTest +{ + [Fact] + public async Task Elector_Throws_Should_Retry() + { + // Arrange. + var logger = Mock.Of>(); + + var electionLock = Mock.Of(); + + var electionLockSubsequentCallEvent = new AutoResetEvent(false); + bool hasElectionLockThrown = false; + Mock.Get(electionLock) + .Setup(electionLock => electionLock.GetAsync(It.IsAny())) + .Returns( + async cancellationToken => + { + if (hasElectionLockThrown) + { + // Signal to the test that a subsequent call has been made. + electionLockSubsequentCallEvent.Set(); + + // Delay returning for a long time, allowing the test to stop the background service, in turn cancelling the cancellation token. + await Task.Delay(TimeSpan.FromSeconds(10), cancellationToken); + throw new InvalidOperationException(); + } + + hasElectionLockThrown = true; + throw new Exception("Unit test exception"); + }); + + var leaderElectionConfig = new LeaderElectionConfig(electionLock); + var leaderElector = new k8s.LeaderElection.LeaderElector(leaderElectionConfig); + + var leaderElectionBackgroundService = new LeaderElectionBackgroundService(logger, leaderElector); + + // Act / Assert. + await leaderElectionBackgroundService.StartAsync(CancellationToken.None); + + // Starting the background service should result in the lock attempt throwing, and then a subsequent attempt being made. + // Wait for the subsequent event to be signalled, if we time out the test fails. + electionLockSubsequentCallEvent.WaitOne(TimeSpan.FromMilliseconds(500)).Should().BeTrue(); + + await leaderElectionBackgroundService.StopAsync(CancellationToken.None); + } +} diff --git a/test/KubeOps.Operator.Test/Watcher/ResourceWatcher{TEntity}.Test.cs b/test/KubeOps.Operator.Test/Watcher/ResourceWatcher{TEntity}.Test.cs new file mode 100644 index 00000000..b58efa3e --- /dev/null +++ b/test/KubeOps.Operator.Test/Watcher/ResourceWatcher{TEntity}.Test.cs @@ -0,0 +1,53 @@ +using System.Runtime.CompilerServices; + +using k8s; +using k8s.Models; + +using KubeOps.Abstractions.Builder; +using KubeOps.KubernetesClient; +using KubeOps.Operator.Queue; +using KubeOps.Operator.Watcher; + +using Microsoft.Extensions.Logging; + +using Moq; + +namespace KubeOps.Operator.Test.Watcher; + +public sealed class ResourceWatcherTest +{ + [Fact] + public async Task Restarting_Watcher_Should_Trigger_New_Watch() + { + // Arrange. + var logger = Mock.Of>>(); + var serviceProvider = Mock.Of(); + var timedEntityQueue = new TimedEntityQueue(); + var operatorSettings = new OperatorSettings() { Namespace = "unit-test" }; + var kubernetesClient = Mock.Of(); + + Mock.Get(kubernetesClient) + .Setup(client => client.WatchAsync("unit-test", null, null, true, It.IsAny())) + .Returns((_, _, _, _, cancellationToken) => WaitForCancellationAsync<(WatchEventType, V1Pod)>(cancellationToken)); + + var resourceWatcher = new ResourceWatcher(logger, serviceProvider, timedEntityQueue, operatorSettings, kubernetesClient); + + // Act. + // Start and stop the watcher. + await resourceWatcher.StartAsync(CancellationToken.None); + await resourceWatcher.StopAsync(CancellationToken.None); + + // Restart the watcher. + await resourceWatcher.StartAsync(CancellationToken.None); + + // Assert. + Mock.Get(kubernetesClient) + .Verify(client => client.WatchAsync("unit-test", null, null, true, It.IsAny()), Times.Exactly(2)); + } + + private static async IAsyncEnumerable WaitForCancellationAsync([EnumeratorCancellation] CancellationToken cancellationToken) + { + await Task.Delay(Timeout.Infinite, cancellationToken); + yield return default!; + } +}