diff --git a/src/KubeOps.KubernetesClient/KubernetesClient.cs b/src/KubeOps.KubernetesClient/KubernetesClient.cs
index c170c13b..18ddf946 100644
--- a/src/KubeOps.KubernetesClient/KubernetesClient.cs
+++ b/src/KubeOps.KubernetesClient/KubernetesClient.cs
@@ -428,13 +428,11 @@ public void Dispose()
protected virtual void Dispose(bool disposing)
{
- if (!disposing)
+ if (!disposing || _disposed)
{
return;
}
- ThrowIfDisposed();
-
// The property is intentionally set before the underlying _client is disposed.
// This ensures that even if the disposal of the client is not finished yet, that all calls to the client
// are instantly failing.
diff --git a/src/KubeOps.Operator/LeaderElection/LeaderElectionBackgroundService.cs b/src/KubeOps.Operator/LeaderElection/LeaderElectionBackgroundService.cs
index 66caf6f8..eedbfc05 100644
--- a/src/KubeOps.Operator/LeaderElection/LeaderElectionBackgroundService.cs
+++ b/src/KubeOps.Operator/LeaderElection/LeaderElectionBackgroundService.cs
@@ -8,9 +8,11 @@ namespace KubeOps.Operator.LeaderElection;
/// This background service connects to the API and continuously watches the leader election.
///
/// The elector.
-internal sealed class LeaderElectionBackgroundService(LeaderElector elector) : IHostedService, IDisposable
+internal sealed class LeaderElectionBackgroundService(LeaderElector elector)
+ : IHostedService, IDisposable, IAsyncDisposable
{
private readonly CancellationTokenSource _cts = new();
+ private bool _disposed;
public Task StartAsync(CancellationToken cancellationToken)
{
@@ -29,15 +31,45 @@ public Task StartAsync(CancellationToken cancellationToken)
return Task.CompletedTask;
}
- public void Dispose() => _cts.Dispose();
+ public void Dispose()
+ {
+ _cts.Dispose();
+ elector.Dispose();
+ _disposed = true;
+ }
-#if NET8_0_OR_GREATER
- public Task StopAsync(CancellationToken cancellationToken) => _cts.CancelAsync();
-#else
public Task StopAsync(CancellationToken cancellationToken)
{
+ if (_disposed)
+ {
+ return Task.CompletedTask;
+ }
+
+#if NET8_0_OR_GREATER
+ return _cts.CancelAsync();
+#else
_cts.Cancel();
return Task.CompletedTask;
- }
#endif
+ }
+
+ public async ValueTask DisposeAsync()
+ {
+ await CastAndDispose(_cts);
+ await CastAndDispose(elector);
+
+ _disposed = true;
+
+ static async ValueTask CastAndDispose(IDisposable resource)
+ {
+ if (resource is IAsyncDisposable resourceAsyncDisposable)
+ {
+ await resourceAsyncDisposable.DisposeAsync();
+ }
+ else
+ {
+ resource.Dispose();
+ }
+ }
+ }
}
diff --git a/src/KubeOps.Operator/Queue/EntityRequeueBackgroundService.cs b/src/KubeOps.Operator/Queue/EntityRequeueBackgroundService.cs
index 0eb8d7d2..6b95f568 100644
--- a/src/KubeOps.Operator/Queue/EntityRequeueBackgroundService.cs
+++ b/src/KubeOps.Operator/Queue/EntityRequeueBackgroundService.cs
@@ -14,10 +14,11 @@ internal sealed class EntityRequeueBackgroundService(
IKubernetesClient client,
TimedEntityQueue queue,
IServiceProvider provider,
- ILogger> logger) : IHostedService, IDisposable
+ ILogger> logger) : IHostedService, IDisposable, IAsyncDisposable
where TEntity : IKubernetesObject
{
private readonly CancellationTokenSource _cts = new();
+ private bool _disposed;
public Task StartAsync(CancellationToken cancellationToken)
{
@@ -36,17 +37,50 @@ public Task StartAsync(CancellationToken cancellationToken)
return Task.CompletedTask;
}
-#if NET8_0_OR_GREATER
- public Task StopAsync(CancellationToken cancellationToken) => _cts.CancelAsync();
-#else
public Task StopAsync(CancellationToken cancellationToken)
{
+ if (_disposed)
+ {
+ return Task.CompletedTask;
+ }
+
+#if NET8_0_OR_GREATER
+ return _cts.CancelAsync();
+#else
_cts.Cancel();
return Task.CompletedTask;
- }
#endif
+ }
- public void Dispose() => _cts.Dispose();
+ public void Dispose()
+ {
+ _cts.Dispose();
+ client.Dispose();
+ queue.Dispose();
+
+ _disposed = true;
+ }
+
+ public async ValueTask DisposeAsync()
+ {
+ await CastAndDispose(_cts);
+ await CastAndDispose(client);
+ await CastAndDispose(queue);
+
+ _disposed = true;
+
+ static async ValueTask CastAndDispose(IDisposable resource)
+ {
+ if (resource is IAsyncDisposable resourceAsyncDisposable)
+ {
+ await resourceAsyncDisposable.DisposeAsync();
+ }
+ else
+ {
+ resource.Dispose();
+ }
+ }
+ }
private async Task WatchAsync(CancellationToken cancellationToken)
{
diff --git a/src/KubeOps.Operator/Watcher/LeaderAwareResourceWatcher{TEntity}.cs b/src/KubeOps.Operator/Watcher/LeaderAwareResourceWatcher{TEntity}.cs
index 9369705c..b5643de6 100644
--- a/src/KubeOps.Operator/Watcher/LeaderAwareResourceWatcher{TEntity}.cs
+++ b/src/KubeOps.Operator/Watcher/LeaderAwareResourceWatcher{TEntity}.cs
@@ -21,6 +21,7 @@ internal sealed class LeaderAwareResourceWatcher(
where TEntity : IKubernetesObject
{
private readonly CancellationTokenSource _cts = new();
+ private bool _disposed;
public override Task StartAsync(CancellationToken cancellationToken)
{
@@ -35,14 +36,30 @@ public override Task StartAsync(CancellationToken cancellationToken)
public override Task StopAsync(CancellationToken cancellationToken)
{
logger.LogDebug("Unsubscribe from leadership updates.");
- _cts.Cancel();
- _cts.Dispose();
+ if (_disposed)
+ {
+ return Task.CompletedTask;
+ }
elector.OnStartedLeading -= StartedLeading;
elector.OnStoppedLeading -= StoppedLeading;
return Task.CompletedTask;
}
+ protected override void Dispose(bool disposing)
+ {
+ if (!disposing)
+ {
+ return;
+ }
+
+ _cts.Dispose();
+ elector.Dispose();
+ _disposed = true;
+
+ base.Dispose(disposing);
+ }
+
private void StartedLeading()
{
logger.LogInformation("This instance started leading, starting watcher.");
diff --git a/src/KubeOps.Operator/Watcher/ResourceWatcher{TEntity}.cs b/src/KubeOps.Operator/Watcher/ResourceWatcher{TEntity}.cs
index 5484bba8..1566e83d 100644
--- a/src/KubeOps.Operator/Watcher/ResourceWatcher{TEntity}.cs
+++ b/src/KubeOps.Operator/Watcher/ResourceWatcher{TEntity}.cs
@@ -24,14 +24,20 @@ internal class ResourceWatcher(
TimedEntityQueue requeue,
OperatorSettings settings,
IKubernetesClient client)
- : IHostedService
+ : IHostedService, IAsyncDisposable, IDisposable
where TEntity : IKubernetesObject
{
private readonly ConcurrentDictionary _entityCache = new();
private readonly CancellationTokenSource _cancellationTokenSource = new();
private uint _watcherReconnectRetries;
- private Task _eventWatcher = Task.CompletedTask;
+ private Task? _eventWatcher;
+ private bool _disposed;
+
+ ~ResourceWatcher()
+ {
+ Dispose(false);
+ }
public virtual Task StartAsync(CancellationToken cancellationToken)
{
@@ -46,16 +52,80 @@ public virtual Task StartAsync(CancellationToken cancellationToken)
public virtual async Task StopAsync(CancellationToken cancellationToken)
{
logger.LogInformation("Stopping resource watcher for {ResourceType}.", typeof(TEntity).Name);
+ if (_disposed)
+ {
+ return;
+ }
+
#if NET8_0_OR_GREATER
await _cancellationTokenSource.CancelAsync();
#else
_cancellationTokenSource.Cancel();
#endif
- await _eventWatcher.WaitAsync(cancellationToken);
- _cancellationTokenSource.Dispose();
+ if (_eventWatcher is not null)
+ {
+ await _eventWatcher.WaitAsync(cancellationToken);
+ }
+
logger.LogInformation("Stopped resource watcher for {ResourceType}.", typeof(TEntity).Name);
}
+ public async ValueTask DisposeAsync()
+ {
+ await StopAsync(CancellationToken.None);
+ await DisposeAsyncCore();
+ GC.SuppressFinalize(this);
+ }
+
+ public void Dispose()
+ {
+ Dispose(true);
+ GC.SuppressFinalize(this);
+ }
+
+ protected virtual void Dispose(bool disposing)
+ {
+ if (!disposing)
+ {
+ return;
+ }
+
+ _cancellationTokenSource.Dispose();
+ _eventWatcher?.Dispose();
+ requeue.Dispose();
+ client.Dispose();
+
+ _disposed = true;
+ }
+
+ protected virtual async ValueTask DisposeAsyncCore()
+ {
+ if (_eventWatcher is not null)
+ {
+ await CastAndDispose(_eventWatcher);
+ }
+
+ await CastAndDispose(_cancellationTokenSource);
+ await CastAndDispose(requeue);
+ await CastAndDispose(client);
+
+ _disposed = true;
+
+ return;
+
+ static async ValueTask CastAndDispose(IDisposable resource)
+ {
+ if (resource is IAsyncDisposable resourceAsyncDisposable)
+ {
+ await resourceAsyncDisposable.DisposeAsync();
+ }
+ else
+ {
+ resource.Dispose();
+ }
+ }
+ }
+
private async Task WatchClientEventsAsync(CancellationToken stoppingToken)
{
try
diff --git a/test/KubeOps.Operator.Test/HostedServices/LeaderResourceWatcher.Integration.Test.cs b/test/KubeOps.Operator.Test/HostedServices/LeaderResourceWatcher.Integration.Test.cs
new file mode 100644
index 00000000..d4994428
--- /dev/null
+++ b/test/KubeOps.Operator.Test/HostedServices/LeaderResourceWatcher.Integration.Test.cs
@@ -0,0 +1,25 @@
+using KubeOps.Abstractions.Controller;
+using KubeOps.Operator.Test.TestEntities;
+
+using Microsoft.Extensions.Hosting;
+
+namespace KubeOps.Operator.Test.HostedServices;
+
+public class LeaderAwareHostedServiceDisposeIntegrationTest : HostedServiceDisposeIntegrationTest
+{
+ protected override void ConfigureHost(HostApplicationBuilder builder)
+ {
+ builder.Services
+ .AddKubernetesOperator(op => op.EnableLeaderElection = true)
+ .AddController();
+ }
+
+ private class TestController : IEntityController
+ {
+ public Task ReconcileAsync(V1OperatorIntegrationTestEntity entity, CancellationToken cancellationToken) =>
+ Task.CompletedTask;
+
+ public Task DeletedAsync(V1OperatorIntegrationTestEntity entity, CancellationToken cancellationToken) =>
+ Task.CompletedTask;
+ }
+}
diff --git a/test/KubeOps.Operator.Test/HostedServices/ResourceWatcher.Integration.Test.cs b/test/KubeOps.Operator.Test/HostedServices/ResourceWatcher.Integration.Test.cs
new file mode 100644
index 00000000..a30a15a9
--- /dev/null
+++ b/test/KubeOps.Operator.Test/HostedServices/ResourceWatcher.Integration.Test.cs
@@ -0,0 +1,54 @@
+using KubeOps.Abstractions.Controller;
+using KubeOps.Operator.Test.TestEntities;
+
+using Microsoft.Extensions.DependencyInjection;
+using Microsoft.Extensions.Hosting;
+
+namespace KubeOps.Operator.Test.HostedServices;
+
+public class HostedServiceDisposeIntegrationTest : IntegrationTestBase
+{
+ [Fact]
+ public async Task Should_Allow_DisposeAsync_Before_StopAsync()
+ {
+ var hostedServices = Services.GetServices()
+ .Where(service => service.GetType().Namespace!.StartsWith("KubeOps"));
+
+ // We need to test the inverse order, because the Host is usually disposing the resources in advance of
+ // stopping them.
+ foreach (IHostedService service in hostedServices)
+ {
+ await Assert.IsAssignableFrom(service).DisposeAsync();
+ await service.StopAsync(CancellationToken.None);
+ }
+ }
+
+ [Fact]
+ public async Task Should_Allow_StopAsync_Before_DisposeAsync()
+ {
+ var hostedServices = Services.GetServices()
+ .Where(service => service.GetType().Namespace!.StartsWith("KubeOps"));
+
+ foreach (IHostedService service in hostedServices)
+ {
+ await service.StopAsync(CancellationToken.None);
+ await Assert.IsAssignableFrom(service).DisposeAsync();
+ }
+ }
+
+ protected override void ConfigureHost(HostApplicationBuilder builder)
+ {
+ builder.Services
+ .AddKubernetesOperator()
+ .AddController();
+ }
+
+ private class TestController : IEntityController
+ {
+ public Task ReconcileAsync(V1OperatorIntegrationTestEntity entity, CancellationToken cancellationToken) =>
+ Task.CompletedTask;
+
+ public Task DeletedAsync(V1OperatorIntegrationTestEntity entity, CancellationToken cancellationToken) =>
+ Task.CompletedTask;
+ }
+}