diff --git a/src/KubeOps.Operator/LeaderElection/LeaderElectionBackgroundService.cs b/src/KubeOps.Operator/LeaderElection/LeaderElectionBackgroundService.cs
index d7d6c141..fd9e1b07 100644
--- a/src/KubeOps.Operator/LeaderElection/LeaderElectionBackgroundService.cs
+++ b/src/KubeOps.Operator/LeaderElection/LeaderElectionBackgroundService.cs
@@ -1,18 +1,21 @@
using k8s.LeaderElection;
using Microsoft.Extensions.Hosting;
+using Microsoft.Extensions.Logging;
namespace KubeOps.Operator.LeaderElection;
///
/// This background service connects to the API and continuously watches the leader election.
///
+/// The logger.
/// The elector.
-internal sealed class LeaderElectionBackgroundService(LeaderElector elector)
+internal sealed class LeaderElectionBackgroundService(ILogger logger, LeaderElector elector)
: IHostedService, IDisposable, IAsyncDisposable
{
private readonly CancellationTokenSource _cts = new();
private bool _disposed;
+ private Task? _leadershipTask;
public Task StartAsync(CancellationToken cancellationToken)
{
@@ -26,7 +29,7 @@ public Task StartAsync(CancellationToken cancellationToken)
// Therefore, we use Task.Run() and put the work to queue. The passed cancellation token of the StartAsync
// method is not used, because it would only cancel the scheduling (which we definitely don't want to cancel).
// To make this intention explicit, CancellationToken.None gets passed.
- _ = Task.Run(() => elector.RunUntilLeadershipLostAsync(_cts.Token), CancellationToken.None);
+ _leadershipTask = Task.Run(RunAndTryToHoldLeadershipForeverAsync, CancellationToken.None);
return Task.CompletedTask;
}
@@ -38,19 +41,23 @@ public void Dispose()
_disposed = true;
}
-    public Task StopAsync(CancellationToken cancellationToken)
+    /// <summary>
+    /// Requests the leader election loop to stop and waits for it to wind down.
+    /// </summary>
+    /// <param name="cancellationToken">
+    /// Cancelled by the caller when the stop should no longer be graceful. Once it is
+    /// cancelled this method returns without waiting any further for the election loop.
+    /// </param>
+    /// <returns>A task that completes when the election loop has ended or the wait was abandoned.</returns>
+    public async Task StopAsync(CancellationToken cancellationToken)
     {
         if (_disposed)
         {
-            return Task.CompletedTask;
+            return;
         }
 #if NET8_0_OR_GREATER
-        return _cts.CancelAsync();
+        await _cts.CancelAsync();
 #else
         _cts.Cancel();
-        return Task.CompletedTask;
 #endif
+
+        if (_leadershipTask is null)
+        {
+            // StartAsync has not scheduled the election loop, so there is nothing to wait for.
+            return;
+        }
+
+        // Wait for the loop to finish, but stop waiting as soon as the caller's token is cancelled;
+        // awaiting the loop unconditionally could hold up the shutdown for as long as the loop keeps running.
+        Task finished = await Task.WhenAny(_leadershipTask, Task.Delay(Timeout.Infinite, cancellationToken));
+        if (finished == _leadershipTask)
+        {
+            // Await the completed task so that a fault, if any, still surfaces to the caller.
+            await _leadershipTask;
+        }
     }
public async ValueTask DisposeAsync()
@@ -72,4 +79,23 @@ static async ValueTask CastAndDispose(IDisposable resource)
}
}
}
+
+    /// <summary>
+    /// Runs the leader election over and over until <c>_cts</c> is cancelled.
+    /// Exceptions thrown by the elector are logged and followed by a growing pause before the
+    /// next attempt, so that a persistently failing elector cannot turn this loop into a busy spin.
+    /// </summary>
+    /// <returns>A task that completes once cancellation has been requested.</returns>
+    private async Task RunAndTryToHoldLeadershipForeverAsync()
+    {
+        // Pause applied after a failed attempt; doubled after every consecutive failure up to the maximum.
+        TimeSpan initialRetryDelay = TimeSpan.FromMilliseconds(100);
+        TimeSpan maximumRetryDelay = TimeSpan.FromSeconds(30);
+        TimeSpan retryDelay = initialRetryDelay;
+
+        while (!_cts.IsCancellationRequested)
+        {
+            try
+            {
+                await elector.RunUntilLeadershipLostAsync(_cts.Token);
+
+                // The elector returned without throwing, so the next failure starts from the shortest pause again.
+                retryDelay = initialRetryDelay;
+            }
+            catch (OperationCanceledException) when (_cts.IsCancellationRequested)
+            {
+                // Ignore cancellation exceptions when we've been asked to stop.
+            }
+            catch (Exception exception)
+            {
+                logger.LogError(exception, "Failed to hold leadership.");
+
+                try
+                {
+                    // Passing the token makes a stop request cut the pause short.
+                    await Task.Delay(retryDelay, _cts.Token);
+                }
+                catch (OperationCanceledException)
+                {
+                    // Stop was requested while pausing; the loop condition ends the loop.
+                }
+
+                retryDelay = TimeSpan.FromTicks(Math.Min(retryDelay.Ticks * 2, maximumRetryDelay.Ticks));
+            }
+        }
+    }
}
diff --git a/src/KubeOps.Operator/Watcher/LeaderAwareResourceWatcher{TEntity}.cs b/src/KubeOps.Operator/Watcher/LeaderAwareResourceWatcher{TEntity}.cs
index b5643de6..c3ee6de3 100644
--- a/src/KubeOps.Operator/Watcher/LeaderAwareResourceWatcher{TEntity}.cs
+++ b/src/KubeOps.Operator/Watcher/LeaderAwareResourceWatcher{TEntity}.cs
@@ -1,4 +1,4 @@
-using k8s;
+using k8s;
using k8s.LeaderElection;
using k8s.Models;
@@ -6,6 +6,7 @@
using KubeOps.KubernetesClient;
using KubeOps.Operator.Queue;
+using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
namespace KubeOps.Operator.Watcher;
@@ -16,21 +17,26 @@ internal sealed class LeaderAwareResourceWatcher(
TimedEntityQueue queue,
OperatorSettings settings,
IKubernetesClient client,
+ IHostApplicationLifetime hostApplicationLifetime,
LeaderElector elector)
: ResourceWatcher(logger, provider, queue, settings, client)
where TEntity : IKubernetesObject
{
- private readonly CancellationTokenSource _cts = new();
+ private CancellationTokenSource _cts = new();
private bool _disposed;
-    public override Task StartAsync(CancellationToken cancellationToken)
+    /// <summary>
+    /// Subscribes this watcher to the elector's leadership events and, when this instance
+    /// already is the leader, starts the underlying watcher right away.
+    /// </summary>
+    /// <param name="cancellationToken">Linked with the watcher's own token for the duration of the base start call.</param>
+    /// <returns>A task that completes once the subscriptions are in place and any immediate start has finished.</returns>
+    public override async Task StartAsync(CancellationToken cancellationToken)
     {
         logger.LogDebug("Subscribe for leadership updates.");
         elector.OnStartedLeading += StartedLeading;
         elector.OnStoppedLeading += StoppedLeading;
-        return elector.IsLeader() ? base.StartAsync(_cts.Token) : Task.CompletedTask;
+        // Not the leader at this point: the StartedLeading handler starts the watcher later.
+        if (!elector.IsLeader())
+        {
+            return;
+        }
+
+        using var startTokenSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, _cts.Token);
+        await base.StartAsync(startTokenSource.Token);
     }
public override Task StopAsync(CancellationToken cancellationToken)
@@ -43,7 +49,8 @@ public override Task StopAsync(CancellationToken cancellationToken)
elector.OnStartedLeading -= StartedLeading;
elector.OnStoppedLeading -= StoppedLeading;
- return Task.CompletedTask;
+
+ return elector.IsLeader() ? base.StopAsync(cancellationToken) : Task.CompletedTask;
}
protected override void Dispose(bool disposing)
@@ -63,6 +70,13 @@ protected override void Dispose(bool disposing)
private void StartedLeading()
{
logger.LogInformation("This instance started leading, starting watcher.");
+ // StoppedLeading cancels _cts, so when leadership is gained again a fresh source replaces the cancelled one.
+ if (_cts.IsCancellationRequested)
+ {
+ _cts.Dispose();
+ _cts = new CancellationTokenSource();
+ }
+ // NOTE(review): the task returned by base.StartAsync is neither awaited nor observed here - confirm this is intended.
base.StartAsync(_cts.Token);
}
@@ -71,6 +85,9 @@ private void StoppedLeading()
_cts.Cancel();
logger.LogInformation("This instance stopped leading, stopping watcher.");
- base.StopAsync(_cts.Token).Wait();
+
+ // Stop the base implementation using the 'ApplicationStopped' cancellation token.
+ // The cancellation token should only be marked cancelled when the stop should no longer be graceful.
+ base.StopAsync(hostApplicationLifetime.ApplicationStopped).Wait();
}
}
diff --git a/src/KubeOps.Operator/Watcher/ResourceWatcher{TEntity}.cs b/src/KubeOps.Operator/Watcher/ResourceWatcher{TEntity}.cs
index 483385b2..02fc2991 100644
--- a/src/KubeOps.Operator/Watcher/ResourceWatcher{TEntity}.cs
+++ b/src/KubeOps.Operator/Watcher/ResourceWatcher{TEntity}.cs
@@ -28,8 +28,8 @@ internal class ResourceWatcher(
where TEntity : IKubernetesObject
{
private readonly ConcurrentDictionary _entityCache = new();
- private readonly CancellationTokenSource _cancellationTokenSource = new();
+ private CancellationTokenSource _cancellationTokenSource = new();
private uint _watcherReconnectRetries;
private Task? _eventWatcher;
private bool _disposed;
@@ -40,6 +40,12 @@ public virtual Task StartAsync(CancellationToken cancellationToken)
{
logger.LogInformation("Starting resource watcher for {ResourceType}.", typeof(TEntity).Name);
+ if (_cancellationTokenSource.IsCancellationRequested)
+ {
+ _cancellationTokenSource.Dispose();
+ _cancellationTokenSource = new CancellationTokenSource();
+ }
+
_eventWatcher = WatchClientEventsAsync(_cancellationTokenSource.Token);
logger.LogInformation("Started resource watcher for {ResourceType}.", typeof(TEntity).Name);
diff --git a/test/KubeOps.Operator.Test/KubeOps.Operator.Test.csproj b/test/KubeOps.Operator.Test/KubeOps.Operator.Test.csproj
index 974c6f29..a43371f4 100644
--- a/test/KubeOps.Operator.Test/KubeOps.Operator.Test.csproj
+++ b/test/KubeOps.Operator.Test/KubeOps.Operator.Test.csproj
@@ -7,6 +7,7 @@
+
diff --git a/test/KubeOps.Operator.Test/LeaderElector/LeaderElectionBackgroundService.Test.cs b/test/KubeOps.Operator.Test/LeaderElector/LeaderElectionBackgroundService.Test.cs
new file mode 100644
index 00000000..a6316c70
--- /dev/null
+++ b/test/KubeOps.Operator.Test/LeaderElector/LeaderElectionBackgroundService.Test.cs
@@ -0,0 +1,58 @@
+using FluentAssertions;
+
+using k8s.LeaderElection;
+
+using KubeOps.Operator.LeaderElection;
+
+using Microsoft.Extensions.Logging;
+
+using Moq;
+
+namespace KubeOps.Operator.Test.LeaderElector;
+
+public sealed class LeaderElectionBackgroundServiceTest
+{
+ [Fact] // Checks that the election lock is queried again after its first call has thrown.
+ public async Task Elector_Throws_Should_Retry()
+ {
+ // Arrange: a mocked election lock whose first GetAsync call throws and whose later calls signal the test.
+ var logger = Mock.Of>();
+
+ var electionLock = Mock.Of();
+ // Set by the mocked GetAsync from its second call onwards; the assertion further down waits on it.
+ var electionLockSubsequentCallEvent = new AutoResetEvent(false);
+ bool hasElectionLockThrown = false;
+ Mock.Get(electionLock)
+ .Setup(electionLock => electionLock.GetAsync(It.IsAny()))
+ .Returns(
+ async cancellationToken =>
+ {
+ if (hasElectionLockThrown)
+ {
+ // Signal to the test that a subsequent call has been made.
+ electionLockSubsequentCallEvent.Set();
+
+ // Delay returning for a long time, allowing the test to stop the background service, in turn cancelling the cancellation token.
+ await Task.Delay(TimeSpan.FromSeconds(10), cancellationToken);
+ throw new InvalidOperationException();
+ }
+ // First call: remember that the failure has happened, then throw.
+ hasElectionLockThrown = true;
+ throw new Exception("Unit test exception");
+ });
+ // The elector itself is a real instance; only the lock handed to its configuration is mocked.
+ var leaderElectionConfig = new LeaderElectionConfig(electionLock);
+ var leaderElector = new k8s.LeaderElection.LeaderElector(leaderElectionConfig);
+
+ var leaderElectionBackgroundService = new LeaderElectionBackgroundService(logger, leaderElector);
+
+ // Act / Assert.
+ await leaderElectionBackgroundService.StartAsync(CancellationToken.None);
+
+ // Starting the background service should result in the lock attempt throwing, and then a subsequent attempt being made.
+ // Wait for the subsequent event to be signalled, if we time out the test fails.
+ electionLockSubsequentCallEvent.WaitOne(TimeSpan.FromMilliseconds(500)).Should().BeTrue();
+ // Stop the service so the election loop does not outlive the test.
+ await leaderElectionBackgroundService.StopAsync(CancellationToken.None);
+ }
+}
diff --git a/test/KubeOps.Operator.Test/Watcher/ResourceWatcher{TEntity}.Test.cs b/test/KubeOps.Operator.Test/Watcher/ResourceWatcher{TEntity}.Test.cs
new file mode 100644
index 00000000..b58efa3e
--- /dev/null
+++ b/test/KubeOps.Operator.Test/Watcher/ResourceWatcher{TEntity}.Test.cs
@@ -0,0 +1,53 @@
+using System.Runtime.CompilerServices;
+
+using k8s;
+using k8s.Models;
+
+using KubeOps.Abstractions.Builder;
+using KubeOps.KubernetesClient;
+using KubeOps.Operator.Queue;
+using KubeOps.Operator.Watcher;
+
+using Microsoft.Extensions.Logging;
+
+using Moq;
+
+namespace KubeOps.Operator.Test.Watcher;
+
+public sealed class ResourceWatcherTest
+{
+ [Fact] // Checks that starting, stopping and starting the watcher again leads to two WatchAsync calls.
+ public async Task Restarting_Watcher_Should_Trigger_New_Watch()
+ {
+ // Arrange: a watcher over mocked dependencies, scoped to the "unit-test" namespace.
+ var logger = Mock.Of>>();
+ var serviceProvider = Mock.Of();
+ var timedEntityQueue = new TimedEntityQueue();
+ var operatorSettings = new OperatorSettings() { Namespace = "unit-test" };
+ var kubernetesClient = Mock.Of();
+ // Each mocked watch hands back the stream built by WaitForCancellationAsync, bound to the token passed to WatchAsync.
+ Mock.Get(kubernetesClient)
+ .Setup(client => client.WatchAsync("unit-test", null, null, true, It.IsAny()))
+ .Returns((_, _, _, _, cancellationToken) => WaitForCancellationAsync<(WatchEventType, V1Pod)>(cancellationToken));
+
+ var resourceWatcher = new ResourceWatcher(logger, serviceProvider, timedEntityQueue, operatorSettings, kubernetesClient);
+
+ // Act.
+ // Start and stop the watcher.
+ await resourceWatcher.StartAsync(CancellationToken.None);
+ await resourceWatcher.StopAsync(CancellationToken.None);
+
+ // Restart the watcher.
+ await resourceWatcher.StartAsync(CancellationToken.None);
+
+ // Assert: one WatchAsync call for the first start and one for the restart.
+ Mock.Get(kubernetesClient)
+ .Verify(client => client.WatchAsync("unit-test", null, null, true, It.IsAny()), Times.Exactly(2));
+ }
+ // Helper stream for the mocked watch: it waits on the token with an infinite delay before its only yield.
+ private static async IAsyncEnumerable WaitForCancellationAsync([EnumeratorCancellation] CancellationToken cancellationToken)
+ {
+ await Task.Delay(Timeout.Infinite, cancellationToken);
+ yield return default!; // Makes the method an async iterator; not reached, because the infinite delay only ends by throwing on cancellation.
+ }
+}