From f5240697eff3557e8e284ac4a26f1ae6ccb4b6d7 Mon Sep 17 00:00:00 2001 From: Jasmin Date: Thu, 21 Mar 2024 10:35:34 +0100 Subject: [PATCH] fix(client): don't access disposed resources (#738) This PR is ensuring that the host does not stop with an exception. Those exceptions were occuring because the `CancellationTokenSource` inside the hosted services was already disposed when `StopAsync` got called. The reason for this is that the host invokes `DisposeAsync` **before** `StopAsync`. --- .../KubernetesClient.cs | 4 +- .../LeaderElectionBackgroundService.cs | 44 +++++++++-- .../Queue/EntityRequeueBackgroundService.cs | 46 +++++++++-- .../LeaderAwareResourceWatcher{TEntity}.cs | 21 ++++- .../Watcher/ResourceWatcher{TEntity}.cs | 78 ++++++++++++++++++- .../LeaderResourceWatcher.Integration.Test.cs | 25 ++++++ .../ResourceWatcher.Integration.Test.cs | 54 +++++++++++++ 7 files changed, 251 insertions(+), 21 deletions(-) create mode 100644 test/KubeOps.Operator.Test/HostedServices/LeaderResourceWatcher.Integration.Test.cs create mode 100644 test/KubeOps.Operator.Test/HostedServices/ResourceWatcher.Integration.Test.cs diff --git a/src/KubeOps.KubernetesClient/KubernetesClient.cs b/src/KubeOps.KubernetesClient/KubernetesClient.cs index c170c13b..18ddf946 100644 --- a/src/KubeOps.KubernetesClient/KubernetesClient.cs +++ b/src/KubeOps.KubernetesClient/KubernetesClient.cs @@ -428,13 +428,11 @@ public void Dispose() protected virtual void Dispose(bool disposing) { - if (!disposing) + if (!disposing || _disposed) { return; } - ThrowIfDisposed(); - // The property is intentionally set before the underlying _client is disposed. // This ensures that even if the disposal of the client is not finished yet, that all calls to the client // are instantly failing. diff --git a/src/KubeOps.Operator/LeaderElection/LeaderElectionBackgroundService.cs b/src/KubeOps.Operator/LeaderElection/LeaderElectionBackgroundService.cs index 66caf6f8..eedbfc05 100644 --- a/src/KubeOps.Operator/LeaderElection/LeaderElectionBackgroundService.cs +++ b/src/KubeOps.Operator/LeaderElection/LeaderElectionBackgroundService.cs @@ -8,9 +8,11 @@ namespace KubeOps.Operator.LeaderElection; /// This background service connects to the API and continuously watches the leader election. /// /// The elector. -internal sealed class LeaderElectionBackgroundService(LeaderElector elector) : IHostedService, IDisposable +internal sealed class LeaderElectionBackgroundService(LeaderElector elector) + : IHostedService, IDisposable, IAsyncDisposable { private readonly CancellationTokenSource _cts = new(); + private bool _disposed; public Task StartAsync(CancellationToken cancellationToken) { @@ -29,15 +31,45 @@ public Task StartAsync(CancellationToken cancellationToken) return Task.CompletedTask; } - public void Dispose() => _cts.Dispose(); + public void Dispose() + { + _cts.Dispose(); + elector.Dispose(); + _disposed = true; + } -#if NET8_0_OR_GREATER - public Task StopAsync(CancellationToken cancellationToken) => _cts.CancelAsync(); -#else public Task StopAsync(CancellationToken cancellationToken) { + if (_disposed) + { + return Task.CompletedTask; + } + +#if NET8_0_OR_GREATER + return _cts.CancelAsync(); +#else _cts.Cancel(); return Task.CompletedTask; - } #endif + } + + public async ValueTask DisposeAsync() + { + await CastAndDispose(_cts); + await CastAndDispose(elector); + + _disposed = true; + + static async ValueTask CastAndDispose(IDisposable resource) + { + if (resource is IAsyncDisposable resourceAsyncDisposable) + { + await resourceAsyncDisposable.DisposeAsync(); + } + else + { + resource.Dispose(); + } + } + } } diff --git a/src/KubeOps.Operator/Queue/EntityRequeueBackgroundService.cs b/src/KubeOps.Operator/Queue/EntityRequeueBackgroundService.cs index 0eb8d7d2..6b95f568 100644 --- a/src/KubeOps.Operator/Queue/EntityRequeueBackgroundService.cs +++ b/src/KubeOps.Operator/Queue/EntityRequeueBackgroundService.cs @@ -14,10 +14,11 @@ internal sealed class EntityRequeueBackgroundService( IKubernetesClient client, TimedEntityQueue queue, IServiceProvider provider, - ILogger> logger) : IHostedService, IDisposable + ILogger> logger) : IHostedService, IDisposable, IAsyncDisposable where TEntity : IKubernetesObject { private readonly CancellationTokenSource _cts = new(); + private bool _disposed; public Task StartAsync(CancellationToken cancellationToken) { @@ -36,17 +37,50 @@ public Task StartAsync(CancellationToken cancellationToken) return Task.CompletedTask; } -#if NET8_0_OR_GREATER - public Task StopAsync(CancellationToken cancellationToken) => _cts.CancelAsync(); -#else public Task StopAsync(CancellationToken cancellationToken) { + if (_disposed) + { + return Task.CompletedTask; + } + +#if NET8_0_OR_GREATER + return _cts.CancelAsync(); +#else _cts.Cancel(); return Task.CompletedTask; - } #endif + } - public void Dispose() => _cts.Dispose(); + public void Dispose() + { + _cts.Dispose(); + client.Dispose(); + queue.Dispose(); + + _disposed = true; + } + + public async ValueTask DisposeAsync() + { + await CastAndDispose(_cts); + await CastAndDispose(client); + await CastAndDispose(queue); + + _disposed = true; + + static async ValueTask CastAndDispose(IDisposable resource) + { + if (resource is IAsyncDisposable resourceAsyncDisposable) + { + await resourceAsyncDisposable.DisposeAsync(); + } + else + { + resource.Dispose(); + } + } + } private async Task WatchAsync(CancellationToken cancellationToken) { diff --git a/src/KubeOps.Operator/Watcher/LeaderAwareResourceWatcher{TEntity}.cs b/src/KubeOps.Operator/Watcher/LeaderAwareResourceWatcher{TEntity}.cs index 9369705c..b5643de6 100644 --- a/src/KubeOps.Operator/Watcher/LeaderAwareResourceWatcher{TEntity}.cs +++ b/src/KubeOps.Operator/Watcher/LeaderAwareResourceWatcher{TEntity}.cs @@ -21,6 +21,7 @@ internal sealed class LeaderAwareResourceWatcher( where TEntity : IKubernetesObject { private readonly CancellationTokenSource _cts = new(); + private bool _disposed; public override Task StartAsync(CancellationToken cancellationToken) { @@ -35,14 +36,30 @@ public override Task StartAsync(CancellationToken cancellationToken) public override Task StopAsync(CancellationToken cancellationToken) { logger.LogDebug("Unsubscribe from leadership updates."); - _cts.Cancel(); - _cts.Dispose(); + if (_disposed) + { + return Task.CompletedTask; + } elector.OnStartedLeading -= StartedLeading; elector.OnStoppedLeading -= StoppedLeading; return Task.CompletedTask; } + protected override void Dispose(bool disposing) + { + if (!disposing) + { + return; + } + + _cts.Dispose(); + elector.Dispose(); + _disposed = true; + + base.Dispose(disposing); + } + private void StartedLeading() { logger.LogInformation("This instance started leading, starting watcher."); diff --git a/src/KubeOps.Operator/Watcher/ResourceWatcher{TEntity}.cs b/src/KubeOps.Operator/Watcher/ResourceWatcher{TEntity}.cs index 5484bba8..1566e83d 100644 --- a/src/KubeOps.Operator/Watcher/ResourceWatcher{TEntity}.cs +++ b/src/KubeOps.Operator/Watcher/ResourceWatcher{TEntity}.cs @@ -24,14 +24,20 @@ internal class ResourceWatcher( TimedEntityQueue requeue, OperatorSettings settings, IKubernetesClient client) - : IHostedService + : IHostedService, IAsyncDisposable, IDisposable where TEntity : IKubernetesObject { private readonly ConcurrentDictionary _entityCache = new(); private readonly CancellationTokenSource _cancellationTokenSource = new(); private uint _watcherReconnectRetries; - private Task _eventWatcher = Task.CompletedTask; + private Task? _eventWatcher; + private bool _disposed; + + ~ResourceWatcher() + { + Dispose(false); + } public virtual Task StartAsync(CancellationToken cancellationToken) { @@ -46,16 +52,80 @@ public virtual Task StartAsync(CancellationToken cancellationToken) public virtual async Task StopAsync(CancellationToken cancellationToken) { logger.LogInformation("Stopping resource watcher for {ResourceType}.", typeof(TEntity).Name); + if (_disposed) + { + return; + } + #if NET8_0_OR_GREATER await _cancellationTokenSource.CancelAsync(); #else _cancellationTokenSource.Cancel(); #endif - await _eventWatcher.WaitAsync(cancellationToken); - _cancellationTokenSource.Dispose(); + if (_eventWatcher is not null) + { + await _eventWatcher.WaitAsync(cancellationToken); + } + logger.LogInformation("Stopped resource watcher for {ResourceType}.", typeof(TEntity).Name); } + public async ValueTask DisposeAsync() + { + await StopAsync(CancellationToken.None); + await DisposeAsyncCore(); + GC.SuppressFinalize(this); + } + + public void Dispose() + { + Dispose(true); + GC.SuppressFinalize(this); + } + + protected virtual void Dispose(bool disposing) + { + if (!disposing) + { + return; + } + + _cancellationTokenSource.Dispose(); + _eventWatcher?.Dispose(); + requeue.Dispose(); + client.Dispose(); + + _disposed = true; + } + + protected virtual async ValueTask DisposeAsyncCore() + { + if (_eventWatcher is not null) + { + await CastAndDispose(_eventWatcher); + } + + await CastAndDispose(_cancellationTokenSource); + await CastAndDispose(requeue); + await CastAndDispose(client); + + _disposed = true; + + return; + + static async ValueTask CastAndDispose(IDisposable resource) + { + if (resource is IAsyncDisposable resourceAsyncDisposable) + { + await resourceAsyncDisposable.DisposeAsync(); + } + else + { + resource.Dispose(); + } + } + } + private async Task WatchClientEventsAsync(CancellationToken stoppingToken) { try diff --git a/test/KubeOps.Operator.Test/HostedServices/LeaderResourceWatcher.Integration.Test.cs b/test/KubeOps.Operator.Test/HostedServices/LeaderResourceWatcher.Integration.Test.cs new file mode 100644 index 00000000..d4994428 --- /dev/null +++ b/test/KubeOps.Operator.Test/HostedServices/LeaderResourceWatcher.Integration.Test.cs @@ -0,0 +1,25 @@ +using KubeOps.Abstractions.Controller; +using KubeOps.Operator.Test.TestEntities; + +using Microsoft.Extensions.Hosting; + +namespace KubeOps.Operator.Test.HostedServices; + +public class LeaderAwareHostedServiceDisposeIntegrationTest : HostedServiceDisposeIntegrationTest +{ + protected override void ConfigureHost(HostApplicationBuilder builder) + { + builder.Services + .AddKubernetesOperator(op => op.EnableLeaderElection = true) + .AddController(); + } + + private class TestController : IEntityController + { + public Task ReconcileAsync(V1OperatorIntegrationTestEntity entity, CancellationToken cancellationToken) => + Task.CompletedTask; + + public Task DeletedAsync(V1OperatorIntegrationTestEntity entity, CancellationToken cancellationToken) => + Task.CompletedTask; + } +} diff --git a/test/KubeOps.Operator.Test/HostedServices/ResourceWatcher.Integration.Test.cs b/test/KubeOps.Operator.Test/HostedServices/ResourceWatcher.Integration.Test.cs new file mode 100644 index 00000000..a30a15a9 --- /dev/null +++ b/test/KubeOps.Operator.Test/HostedServices/ResourceWatcher.Integration.Test.cs @@ -0,0 +1,54 @@ +using KubeOps.Abstractions.Controller; +using KubeOps.Operator.Test.TestEntities; + +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Hosting; + +namespace KubeOps.Operator.Test.HostedServices; + +public class HostedServiceDisposeIntegrationTest : IntegrationTestBase +{ + [Fact] + public async Task Should_Allow_DisposeAsync_Before_StopAsync() + { + var hostedServices = Services.GetServices() + .Where(service => service.GetType().Namespace!.StartsWith("KubeOps")); + + // We need to test the inverse order, because the Host is usually disposing the resources in advance of + // stopping them. + foreach (IHostedService service in hostedServices) + { + await Assert.IsAssignableFrom(service).DisposeAsync(); + await service.StopAsync(CancellationToken.None); + } + } + + [Fact] + public async Task Should_Allow_StopAsync_Before_DisposeAsync() + { + var hostedServices = Services.GetServices() + .Where(service => service.GetType().Namespace!.StartsWith("KubeOps")); + + foreach (IHostedService service in hostedServices) + { + await service.StopAsync(CancellationToken.None); + await Assert.IsAssignableFrom(service).DisposeAsync(); + } + } + + protected override void ConfigureHost(HostApplicationBuilder builder) + { + builder.Services + .AddKubernetesOperator() + .AddController(); + } + + private class TestController : IEntityController + { + public Task ReconcileAsync(V1OperatorIntegrationTestEntity entity, CancellationToken cancellationToken) => + Task.CompletedTask; + + public Task DeletedAsync(V1OperatorIntegrationTestEntity entity, CancellationToken cancellationToken) => + Task.CompletedTask; + } +}