Skip to content

Commit

Permalink
fix(client): don't access disposed resources (buehler#738)
Browse files Browse the repository at this point in the history
This PR is ensuring that the host does not stop with an exception. Those
exceptions were occuring because the `CancellationTokenSource` inside
the hosted services was already disposed when `StopAsync` got called.
The reason for this is that the host invokes `DisposeAsync` **before**
`StopAsync`.
  • Loading branch information
nachtjasmin authored Mar 21, 2024
1 parent b94eb81 commit f524069
Show file tree
Hide file tree
Showing 7 changed files with 251 additions and 21 deletions.
4 changes: 1 addition & 3 deletions src/KubeOps.KubernetesClient/KubernetesClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -428,13 +428,11 @@ public void Dispose()

protected virtual void Dispose(bool disposing)
{
if (!disposing)
if (!disposing || _disposed)
{
return;
}

ThrowIfDisposed();

// The property is intentionally set before the underlying _client is disposed.
// This ensures that even if the disposal of the client is not finished yet, that all calls to the client
// are instantly failing.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,11 @@ namespace KubeOps.Operator.LeaderElection;
/// This background service connects to the API and continuously watches the leader election.
/// </summary>
/// <param name="elector">The elector.</param>
internal sealed class LeaderElectionBackgroundService(LeaderElector elector) : IHostedService, IDisposable
internal sealed class LeaderElectionBackgroundService(LeaderElector elector)
: IHostedService, IDisposable, IAsyncDisposable
{
private readonly CancellationTokenSource _cts = new();
private bool _disposed;

public Task StartAsync(CancellationToken cancellationToken)
{
Expand All @@ -29,15 +31,45 @@ public Task StartAsync(CancellationToken cancellationToken)
return Task.CompletedTask;
}

public void Dispose() => _cts.Dispose();
public void Dispose()
{
_cts.Dispose();
elector.Dispose();
_disposed = true;
}

#if NET8_0_OR_GREATER
public Task StopAsync(CancellationToken cancellationToken) => _cts.CancelAsync();
#else
public Task StopAsync(CancellationToken cancellationToken)
{
if (_disposed)
{
return Task.CompletedTask;
}

#if NET8_0_OR_GREATER
return _cts.CancelAsync();
#else
_cts.Cancel();
return Task.CompletedTask;
}
#endif
}

public async ValueTask DisposeAsync()
{
await CastAndDispose(_cts);
await CastAndDispose(elector);

_disposed = true;

static async ValueTask CastAndDispose(IDisposable resource)
{
if (resource is IAsyncDisposable resourceAsyncDisposable)
{
await resourceAsyncDisposable.DisposeAsync();
}
else
{
resource.Dispose();
}
}
}
}
46 changes: 40 additions & 6 deletions src/KubeOps.Operator/Queue/EntityRequeueBackgroundService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,11 @@ internal sealed class EntityRequeueBackgroundService<TEntity>(
IKubernetesClient client,
TimedEntityQueue<TEntity> queue,
IServiceProvider provider,
ILogger<EntityRequeueBackgroundService<TEntity>> logger) : IHostedService, IDisposable
ILogger<EntityRequeueBackgroundService<TEntity>> logger) : IHostedService, IDisposable, IAsyncDisposable
where TEntity : IKubernetesObject<V1ObjectMeta>
{
private readonly CancellationTokenSource _cts = new();
private bool _disposed;

public Task StartAsync(CancellationToken cancellationToken)
{
Expand All @@ -36,17 +37,50 @@ public Task StartAsync(CancellationToken cancellationToken)
return Task.CompletedTask;
}

#if NET8_0_OR_GREATER
public Task StopAsync(CancellationToken cancellationToken) => _cts.CancelAsync();
#else
public Task StopAsync(CancellationToken cancellationToken)
{
if (_disposed)
{
return Task.CompletedTask;
}

#if NET8_0_OR_GREATER
return _cts.CancelAsync();
#else
_cts.Cancel();
return Task.CompletedTask;
}
#endif
}

public void Dispose() => _cts.Dispose();
public void Dispose()
{
_cts.Dispose();
client.Dispose();
queue.Dispose();

_disposed = true;
}

public async ValueTask DisposeAsync()
{
await CastAndDispose(_cts);
await CastAndDispose(client);
await CastAndDispose(queue);

_disposed = true;

static async ValueTask CastAndDispose(IDisposable resource)
{
if (resource is IAsyncDisposable resourceAsyncDisposable)
{
await resourceAsyncDisposable.DisposeAsync();
}
else
{
resource.Dispose();
}
}
}

private async Task WatchAsync(CancellationToken cancellationToken)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ internal sealed class LeaderAwareResourceWatcher<TEntity>(
where TEntity : IKubernetesObject<V1ObjectMeta>
{
private readonly CancellationTokenSource _cts = new();
private bool _disposed;

public override Task StartAsync(CancellationToken cancellationToken)
{
Expand All @@ -35,14 +36,30 @@ public override Task StartAsync(CancellationToken cancellationToken)
public override Task StopAsync(CancellationToken cancellationToken)
{
logger.LogDebug("Unsubscribe from leadership updates.");
_cts.Cancel();
_cts.Dispose();
if (_disposed)
{
return Task.CompletedTask;
}

elector.OnStartedLeading -= StartedLeading;
elector.OnStoppedLeading -= StoppedLeading;
return Task.CompletedTask;
}

protected override void Dispose(bool disposing)
{
if (!disposing)
{
return;
}

_cts.Dispose();
elector.Dispose();
_disposed = true;

base.Dispose(disposing);
}

private void StartedLeading()
{
logger.LogInformation("This instance started leading, starting watcher.");
Expand Down
78 changes: 74 additions & 4 deletions src/KubeOps.Operator/Watcher/ResourceWatcher{TEntity}.cs
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,20 @@ internal class ResourceWatcher<TEntity>(
TimedEntityQueue<TEntity> requeue,
OperatorSettings settings,
IKubernetesClient client)
: IHostedService
: IHostedService, IAsyncDisposable, IDisposable
where TEntity : IKubernetesObject<V1ObjectMeta>
{
private readonly ConcurrentDictionary<string, long> _entityCache = new();
private readonly CancellationTokenSource _cancellationTokenSource = new();

private uint _watcherReconnectRetries;
private Task _eventWatcher = Task.CompletedTask;
private Task? _eventWatcher;
private bool _disposed;

~ResourceWatcher()
{
Dispose(false);
}

public virtual Task StartAsync(CancellationToken cancellationToken)
{
Expand All @@ -46,16 +52,80 @@ public virtual Task StartAsync(CancellationToken cancellationToken)
public virtual async Task StopAsync(CancellationToken cancellationToken)
{
logger.LogInformation("Stopping resource watcher for {ResourceType}.", typeof(TEntity).Name);
if (_disposed)
{
return;
}

#if NET8_0_OR_GREATER
await _cancellationTokenSource.CancelAsync();
#else
_cancellationTokenSource.Cancel();
#endif
await _eventWatcher.WaitAsync(cancellationToken);
_cancellationTokenSource.Dispose();
if (_eventWatcher is not null)
{
await _eventWatcher.WaitAsync(cancellationToken);
}

logger.LogInformation("Stopped resource watcher for {ResourceType}.", typeof(TEntity).Name);
}

public async ValueTask DisposeAsync()
{
await StopAsync(CancellationToken.None);
await DisposeAsyncCore();
GC.SuppressFinalize(this);
}

public void Dispose()
{
Dispose(true);
GC.SuppressFinalize(this);
}

protected virtual void Dispose(bool disposing)
{
if (!disposing)
{
return;
}

_cancellationTokenSource.Dispose();
_eventWatcher?.Dispose();
requeue.Dispose();
client.Dispose();

_disposed = true;
}

protected virtual async ValueTask DisposeAsyncCore()
{
if (_eventWatcher is not null)
{
await CastAndDispose(_eventWatcher);
}

await CastAndDispose(_cancellationTokenSource);
await CastAndDispose(requeue);
await CastAndDispose(client);

_disposed = true;

return;

static async ValueTask CastAndDispose(IDisposable resource)
{
if (resource is IAsyncDisposable resourceAsyncDisposable)
{
await resourceAsyncDisposable.DisposeAsync();
}
else
{
resource.Dispose();
}
}
}

private async Task WatchClientEventsAsync(CancellationToken stoppingToken)
{
try
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
using KubeOps.Abstractions.Controller;
using KubeOps.Operator.Test.TestEntities;

using Microsoft.Extensions.Hosting;

namespace KubeOps.Operator.Test.HostedServices;

public class LeaderAwareHostedServiceDisposeIntegrationTest : HostedServiceDisposeIntegrationTest
{
protected override void ConfigureHost(HostApplicationBuilder builder)
{
builder.Services
.AddKubernetesOperator(op => op.EnableLeaderElection = true)
.AddController<TestController, V1OperatorIntegrationTestEntity>();
}

private class TestController : IEntityController<V1OperatorIntegrationTestEntity>
{
public Task ReconcileAsync(V1OperatorIntegrationTestEntity entity, CancellationToken cancellationToken) =>
Task.CompletedTask;

public Task DeletedAsync(V1OperatorIntegrationTestEntity entity, CancellationToken cancellationToken) =>
Task.CompletedTask;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
using KubeOps.Abstractions.Controller;
using KubeOps.Operator.Test.TestEntities;

using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;

namespace KubeOps.Operator.Test.HostedServices;

public class HostedServiceDisposeIntegrationTest : IntegrationTestBase
{
[Fact]
public async Task Should_Allow_DisposeAsync_Before_StopAsync()
{
var hostedServices = Services.GetServices<IHostedService>()
.Where(service => service.GetType().Namespace!.StartsWith("KubeOps"));

// We need to test the inverse order, because the Host is usually disposing the resources in advance of
// stopping them.
foreach (IHostedService service in hostedServices)
{
await Assert.IsAssignableFrom<IAsyncDisposable>(service).DisposeAsync();
await service.StopAsync(CancellationToken.None);
}
}

[Fact]
public async Task Should_Allow_StopAsync_Before_DisposeAsync()
{
var hostedServices = Services.GetServices<IHostedService>()
.Where(service => service.GetType().Namespace!.StartsWith("KubeOps"));

foreach (IHostedService service in hostedServices)
{
await service.StopAsync(CancellationToken.None);
await Assert.IsAssignableFrom<IAsyncDisposable>(service).DisposeAsync();
}
}

protected override void ConfigureHost(HostApplicationBuilder builder)
{
builder.Services
.AddKubernetesOperator()
.AddController<TestController, V1OperatorIntegrationTestEntity>();
}

private class TestController : IEntityController<V1OperatorIntegrationTestEntity>
{
public Task ReconcileAsync(V1OperatorIntegrationTestEntity entity, CancellationToken cancellationToken) =>
Task.CompletedTask;

public Task DeletedAsync(V1OperatorIntegrationTestEntity entity, CancellationToken cancellationToken) =>
Task.CompletedTask;
}
}

0 comments on commit f524069

Please sign in to comment.