From b8d78279f3ceddfa2fe9f36541f84772f3f497d6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Christoph=20B=C3=BChler?= Date: Thu, 5 Oct 2023 14:02:06 +0200 Subject: [PATCH] feat(operator): add leader election via KubernetesClient (#627) --- .../Controller/V1SecondEntityController.cs | 30 ++++++++ examples/Operator/Entities/V1SecondEntity.cs | 11 +++ examples/Operator/todos.txt | 2 +- .../Builder/OperatorSettings.cs | 27 ++++--- .../Builder/OperatorBuilder.cs | 55 ++++++++++++-- .../LeaderAwareResourceWatcher{TEntity}.cs | 62 +++++++++++++++ .../Watcher/ResourceWatcher{TEntity}.cs | 14 +++- .../Builder/OperatorBuilder.Test.cs | 26 +++++++ .../Events/EventPublisher.Integration.Test.cs | 2 +- .../IntegrationTestCollection.cs | 6 +- .../LeaderAwareness.Integration.Test.cs | 76 +++++++++++++++++++ 11 files changed, 285 insertions(+), 26 deletions(-) create mode 100644 examples/Operator/Controller/V1SecondEntityController.cs create mode 100644 examples/Operator/Entities/V1SecondEntity.cs create mode 100644 src/KubeOps.Operator/Watcher/LeaderAwareResourceWatcher{TEntity}.cs create mode 100644 test/KubeOps.Operator.Test/LeaderElector/LeaderAwareness.Integration.Test.cs diff --git a/examples/Operator/Controller/V1SecondEntityController.cs b/examples/Operator/Controller/V1SecondEntityController.cs new file mode 100644 index 00000000..afe84391 --- /dev/null +++ b/examples/Operator/Controller/V1SecondEntityController.cs @@ -0,0 +1,30 @@ +using KubeOps.Abstractions.Controller; +using KubeOps.Abstractions.Events; +using KubeOps.Abstractions.Finalizer; +using KubeOps.Abstractions.Queue; +using KubeOps.Abstractions.Rbac; +using KubeOps.KubernetesClient; + +using Microsoft.Extensions.Logging; + +using Operator.Entities; +using Operator.Finalizer; + +namespace Operator.Controller; + +[EntityRbac(typeof(V1SecondEntity), Verbs = RbacVerb.All)] +public class V1SecondEntityController : IEntityController +{ + private readonly ILogger _logger; + + public V1SecondEntityController( + ILogger logger) + { + _logger = logger; + } + + public async Task ReconcileAsync(V1SecondEntity entity) + { + _logger.LogInformation("Reconciling entity {Entity}.", entity); + } +} diff --git a/examples/Operator/Entities/V1SecondEntity.cs b/examples/Operator/Entities/V1SecondEntity.cs new file mode 100644 index 00000000..ee4662f2 --- /dev/null +++ b/examples/Operator/Entities/V1SecondEntity.cs @@ -0,0 +1,11 @@ +using k8s.Models; + +using KubeOps.Abstractions.Entities; + +namespace Operator.Entities; + +[KubernetesEntity(Group = "testing.dev", ApiVersion = "v1", Kind = "SecondEntity")] +public partial class V1SecondEntity : CustomKubernetesEntity +{ + public override string ToString() => $"Second Entity ({Metadata.Name})"; +} diff --git a/examples/Operator/todos.txt b/examples/Operator/todos.txt index 0f2f3cda..a1a8efe5 100644 --- a/examples/Operator/todos.txt +++ b/examples/Operator/todos.txt @@ -1,8 +1,8 @@ todo: -- leadership election - build targets - other CLI commands - error handling +- namespaced operator - web: webhooks - docs - try .net 8 AOT? diff --git a/src/KubeOps.Abstractions/Builder/OperatorSettings.cs b/src/KubeOps.Abstractions/Builder/OperatorSettings.cs index ea8fb153..739d6520 100644 --- a/src/KubeOps.Abstractions/Builder/OperatorSettings.cs +++ b/src/KubeOps.Abstractions/Builder/OperatorSettings.cs @@ -31,27 +31,32 @@ public sealed class OperatorSettings /// /// - /// Defines if the leader elector should run. You may disable this, - /// if you don't intend to run your operator multiple times. + /// Whether the leader elector should run. You should enable + /// this if you plan to run the operator redundantly. /// /// - /// If this is disabled, and an operator runs in multiple instance - /// (in the same namespace) it can lead to a "split brain" problem. + /// If this is disabled and an operator runs in multiple instances + /// (in the same namespace), it can lead to a "split brain" problem. /// /// - /// This could be disabled when developing locally. + /// Defaults to `false`. /// /// - public bool EnableLeaderElection { get; set; } = true; + public bool EnableLeaderElection { get; set; } /// - /// The interval in seconds in which this particular instance of the operator - /// will check for leader election. + /// Defines how long one lease is valid for any leader. + /// Defaults to 15 seconds. /// - public ushort LeaderElectionCheckInterval { get; set; } = 15; + public TimeSpan LeaderElectionLeaseDuration { get; set; } = TimeSpan.FromSeconds(15); /// - /// The duration in seconds in which the leader lease is valid. + /// When the leader elector tries to refresh the leadership lease. /// - public ushort LeaderElectionLeaseDuration { get; set; } = 30; + public TimeSpan LeaderElectionRenewDeadline { get; set; } = TimeSpan.FromSeconds(10); + + /// + /// The wait timeout if the lease cannot be acquired. + /// + public TimeSpan LeaderElectionRetryPeriod { get; set; } = TimeSpan.FromSeconds(2); } diff --git a/src/KubeOps.Operator/Builder/OperatorBuilder.cs b/src/KubeOps.Operator/Builder/OperatorBuilder.cs index 3f3c3b46..76980b2e 100644 --- a/src/KubeOps.Operator/Builder/OperatorBuilder.cs +++ b/src/KubeOps.Operator/Builder/OperatorBuilder.cs @@ -2,6 +2,8 @@ using System.Text; using k8s; +using k8s.LeaderElection; +using k8s.LeaderElection.ResourceLock; using k8s.Models; using KubeOps.Abstractions.Builder; @@ -22,13 +24,13 @@ namespace KubeOps.Operator.Builder; internal class OperatorBuilder : IOperatorBuilder { + private readonly OperatorSettings _settings; + public OperatorBuilder(IServiceCollection services, OperatorSettings settings) { + _settings = settings; Services = services; - Services.AddSingleton(settings); - Services.AddTransient>(_ => new KubernetesClient(new( - Corev1Event.KubeKind, Corev1Event.KubeApiVersion, Plural: Corev1Event.KubePluralName))); - Services.AddTransient(CreateEventPublisher()); + AddOperatorBase(); } public IServiceCollection Services { get; } @@ -45,10 +47,18 @@ public IOperatorBuilder AddController() where TEntity : IKubernetesObject { Services.AddScoped, TImplementation>(); - Services.AddHostedService>(); Services.AddSingleton(new TimedEntityQueue()); Services.AddTransient(CreateEntityRequeue()); + if (_settings.EnableLeaderElection) + { + Services.AddHostedService>(); + } + else + { + Services.AddHostedService>(); + } + return this; } @@ -99,7 +109,7 @@ private static Func> CreateEntityRequeue() where TEntity : IKubernetesObject - => services => (entity, timespan) => + => services => (entity, timeSpan) => { var logger = services.GetService>>(); var queue = services.GetRequiredService>(); @@ -108,9 +118,9 @@ private static Func> CreateEntityRequeu """Requeue entity "{kind}/{name}" in {milliseconds}ms.""", entity.Kind, entity.Name(), - timespan.TotalMilliseconds); + timeSpan.TotalMilliseconds); - queue.Enqueue(entity, timespan); + queue.Enqueue(entity, timeSpan); }; private static Func CreateEventPublisher() @@ -192,4 +202,33 @@ private static Func CreateEventPublisher() entity.Name()); } }; + + private void AddOperatorBase() + { + Services.AddSingleton(_settings); + Services.AddTransient>(_ => new KubernetesClient(new( + Corev1Event.KubeKind, Corev1Event.KubeApiVersion, Plural: Corev1Event.KubePluralName))); + Services.AddTransient(CreateEventPublisher()); + + if (_settings.EnableLeaderElection) + { + using var client = new KubernetesClient(new( + Corev1Event.KubeKind, Corev1Event.KubeApiVersion, Plural: Corev1Event.KubePluralName)); + + var elector = new LeaderElector( + new LeaderElectionConfig( + new LeaseLock( + new Kubernetes(KubernetesClientConfiguration.BuildDefaultConfig()), + client.GetCurrentNamespace(), + $"{_settings.Name}-leader", + Environment.MachineName)) + { + LeaseDuration = _settings.LeaderElectionLeaseDuration, + RenewDeadline = _settings.LeaderElectionRenewDeadline, + RetryPeriod = _settings.LeaderElectionRetryPeriod, + }); + Services.AddSingleton(elector); + elector.RunAsync(); + } + } } diff --git a/src/KubeOps.Operator/Watcher/LeaderAwareResourceWatcher{TEntity}.cs b/src/KubeOps.Operator/Watcher/LeaderAwareResourceWatcher{TEntity}.cs new file mode 100644 index 00000000..9775486d --- /dev/null +++ b/src/KubeOps.Operator/Watcher/LeaderAwareResourceWatcher{TEntity}.cs @@ -0,0 +1,62 @@ +using k8s; +using k8s.LeaderElection; +using k8s.Models; + +using KubeOps.KubernetesClient; +using KubeOps.Operator.Queue; + +using Microsoft.Extensions.Logging; + +namespace KubeOps.Operator.Watcher; + +internal class LeaderAwareResourceWatcher : ResourceWatcher + where TEntity : IKubernetesObject +{ + private readonly ILogger> _logger; + private readonly LeaderElector _elector; + + public LeaderAwareResourceWatcher( + ILogger> logger, + IServiceProvider provider, + IKubernetesClient client, + TimedEntityQueue queue, + LeaderElector elector) + : base(logger, provider, client, queue) + { + _logger = logger; + _elector = elector; + } + + public override Task StartAsync(CancellationToken cancellationToken) + { + _logger.LogDebug("Subscribe for leadership updates."); + _elector.OnStartedLeading += StartedLeading; + _elector.OnStoppedLeading += StoppedLeading; + if (_elector.IsLeader()) + { + StartedLeading(); + } + + return Task.CompletedTask; + } + + public override Task StopAsync(CancellationToken cancellationToken) + { + _logger.LogDebug("Unsubscribe from leadership updates."); + _elector.OnStartedLeading -= StartedLeading; + _elector.OnStoppedLeading -= StoppedLeading; + return Task.CompletedTask; + } + + private void StartedLeading() + { + _logger.LogInformation("This instance started leading, starting watcher."); + base.StartAsync(default); + } + + private void StoppedLeading() + { + _logger.LogInformation("This instance stopped leading, stopping watcher."); + base.StopAsync(default); + } +} diff --git a/src/KubeOps.Operator/Watcher/ResourceWatcher{TEntity}.cs b/src/KubeOps.Operator/Watcher/ResourceWatcher{TEntity}.cs index 8744c3dd..a8ebf640 100644 --- a/src/KubeOps.Operator/Watcher/ResourceWatcher{TEntity}.cs +++ b/src/KubeOps.Operator/Watcher/ResourceWatcher{TEntity}.cs @@ -26,6 +26,7 @@ internal class ResourceWatcher : IHostedService private readonly TimedEntityQueue _queue; private readonly ConcurrentDictionary _entityCache = new(); private readonly Lazy> _finalizers; + private bool _stopped; private Watcher? _watcher; @@ -42,17 +43,19 @@ public ResourceWatcher( _finalizers = new(() => _provider.GetServices().ToList()); } - public Task StartAsync(CancellationToken cancellationToken) + public virtual Task StartAsync(CancellationToken cancellationToken) { _logger.LogInformation("Starting resource watcher for {ResourceType}.", typeof(TEntity).Name); + _stopped = false; _queue.RequeueRequested += OnEntityRequeue; WatchResource(); return Task.CompletedTask; } - public Task StopAsync(CancellationToken cancellationToken) + public virtual Task StopAsync(CancellationToken cancellationToken) { _logger.LogInformation("Stopping resource watcher for {ResourceType}.", typeof(TEntity).Name); + _stopped = true; StopWatching(); _queue.RequeueRequested -= OnEntityRequeue; _queue.Clear(); @@ -84,8 +87,11 @@ private void StopWatching() private void OnClosed() { - _logger.LogDebug("The server closed the connection. Trying to reconnect."); - WatchResource(); + _logger.LogDebug("The server closed the connection."); + if (!_stopped) + { + WatchResource(); + } } private async void OnEntityRequeue(object? sender, (string Name, string? Namespace) queued) diff --git a/test/KubeOps.Operator.Test/Builder/OperatorBuilder.Test.cs b/test/KubeOps.Operator.Test/Builder/OperatorBuilder.Test.cs index db4b59fe..f628f8d5 100644 --- a/test/KubeOps.Operator.Test/Builder/OperatorBuilder.Test.cs +++ b/test/KubeOps.Operator.Test/Builder/OperatorBuilder.Test.cs @@ -1,5 +1,6 @@ using FluentAssertions; +using k8s.LeaderElection; using k8s.Models; using KubeOps.Abstractions.Builder; @@ -85,6 +86,31 @@ public void Should_Add_Finalizer_Resources() s.Lifetime == ServiceLifetime.Transient); } + [Fact] + public void Should_Add_Leader_Elector() + { + var builder = new OperatorBuilder(new ServiceCollection(), new() { EnableLeaderElection = true }); + builder.Services.Should().Contain(s => + s.ServiceType == typeof(k8s.LeaderElection.LeaderElector) && + s.Lifetime == ServiceLifetime.Singleton); + } + + [Fact] + public void Should_Add_LeaderAwareResourceWatcher() + { + var builder = new OperatorBuilder(new ServiceCollection(), new() { EnableLeaderElection = true }); + builder.AddController(); + + builder.Services.Should().Contain(s => + s.ServiceType == typeof(IHostedService) && + s.ImplementationType == typeof(LeaderAwareResourceWatcher) && + s.Lifetime == ServiceLifetime.Singleton); + builder.Services.Should().NotContain(s => + s.ServiceType == typeof(IHostedService) && + s.ImplementationType == typeof(ResourceWatcher) && + s.Lifetime == ServiceLifetime.Singleton); + } + private class TestController : IEntityController { } diff --git a/test/KubeOps.Operator.Test/Events/EventPublisher.Integration.Test.cs b/test/KubeOps.Operator.Test/Events/EventPublisher.Integration.Test.cs index 949d0cba..77925938 100644 --- a/test/KubeOps.Operator.Test/Events/EventPublisher.Integration.Test.cs +++ b/test/KubeOps.Operator.Test/Events/EventPublisher.Integration.Test.cs @@ -104,7 +104,7 @@ public async Task ReconcileAsync(V1IntegrationTestEntity entity) if (_svc.Invocations.Count < _svc.TargetInvocationCount) { - _requeue(entity, TimeSpan.FromMilliseconds(1)); + _requeue(entity, TimeSpan.FromMilliseconds(10)); } } } diff --git a/test/KubeOps.Operator.Test/IntegrationTestCollection.cs b/test/KubeOps.Operator.Test/IntegrationTestCollection.cs index 3cfc6512..e7f0f12d 100644 --- a/test/KubeOps.Operator.Test/IntegrationTestCollection.cs +++ b/test/KubeOps.Operator.Test/IntegrationTestCollection.cs @@ -41,7 +41,11 @@ public async Task ConfigureAndStart(Action configure) } var builder = Host.CreateApplicationBuilder(); - builder.Logging.SetMinimumLevel(LogLevel.Warning); +#if DEBUG + builder.Logging.SetMinimumLevel(LogLevel.Trace); +#else + builder.Logging.SetMinimumLevel(LogLevel.None); +#endif configure(builder); _host = builder.Build(); await _host.StartAsync(); diff --git a/test/KubeOps.Operator.Test/LeaderElector/LeaderAwareness.Integration.Test.cs b/test/KubeOps.Operator.Test/LeaderElector/LeaderAwareness.Integration.Test.cs new file mode 100644 index 00000000..5778e34c --- /dev/null +++ b/test/KubeOps.Operator.Test/LeaderElector/LeaderAwareness.Integration.Test.cs @@ -0,0 +1,76 @@ +using FluentAssertions; + +using k8s.Models; + +using KubeOps.Abstractions.Controller; +using KubeOps.KubernetesClient; +using KubeOps.Operator.Test.TestEntities; +using KubeOps.Transpiler; + +using Microsoft.Extensions.DependencyInjection; + +namespace KubeOps.Operator.Test.LeaderElector; + +public class LeaderAwarenessIntegrationTest : IntegrationTestBase, IAsyncLifetime +{ + private static readonly InvocationCounter Mock = new(); + private IKubernetesClient _client = null!; + + private readonly IKubernetesClient _leaseClient = new KubernetesClient(new(V1Lease.KubeKind, + V1Lease.KubeApiVersion, V1Lease.KubeGroup, V1Lease.KubePluralName)); + + public LeaderAwarenessIntegrationTest(HostBuilder hostBuilder) : base(hostBuilder) + { + Mock.Clear(); + } + + [Fact] + public async Task Should_Create_V1Lease_And_Start_Watcher() + { + await _client.CreateAsync(new V1IntegrationTestEntity("test-entity", "username", "default")); + await Mock.WaitForInvocations; + + var lease = await _leaseClient.GetAsync("kubernetesoperator-leader", "default"); + lease!.Spec.HolderIdentity.Should().Be(Environment.MachineName); + } + + public async Task InitializeAsync() + { + var meta = Entities.ToEntityMetadata(typeof(V1IntegrationTestEntity)).Metadata; + _client = new KubernetesClient(meta); + await _hostBuilder.ConfigureAndStart(builder => builder.Services + .AddSingleton(Mock) + .AddKubernetesOperator(s => s.EnableLeaderElection = true) + .AddControllerWithEntity(meta)); + } + + public async Task DisposeAsync() + { + var entities = await _client.ListAsync("default"); + await _client.DeleteAsync(entities); + await _leaseClient.DeleteAsync(await _leaseClient.ListAsync("default")); + _client.Dispose(); + } + + private class TestController : IEntityController + { + private readonly InvocationCounter _svc; + + public TestController(InvocationCounter svc) + { + _svc = svc; + } + + public Task ReconcileAsync(V1IntegrationTestEntity entity) + { + _svc.Invocation(entity); + return Task.CompletedTask; + } + + public Task DeletedAsync(V1IntegrationTestEntity entity) + { + _svc.Invocation(entity); + return Task.CompletedTask; + } + } +}