Skip to content

Commit

Permalink
feat(operator): add leader election via KubernetesClient (#627)
Browse files Browse the repository at this point in the history
  • Loading branch information
buehler authored Oct 5, 2023
1 parent 8e07bc6 commit b8d7827
Show file tree
Hide file tree
Showing 11 changed files with 285 additions and 26 deletions.
30 changes: 30 additions & 0 deletions examples/Operator/Controller/V1SecondEntityController.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
using KubeOps.Abstractions.Controller;
using KubeOps.Abstractions.Events;
using KubeOps.Abstractions.Finalizer;
using KubeOps.Abstractions.Queue;
using KubeOps.Abstractions.Rbac;
using KubeOps.KubernetesClient;

using Microsoft.Extensions.Logging;

using Operator.Entities;
using Operator.Finalizer;

namespace Operator.Controller;

[EntityRbac(typeof(V1SecondEntity), Verbs = RbacVerb.All)]
public class V1SecondEntityController : IEntityController<V1SecondEntity>
{
private readonly ILogger<V1SecondEntityController> _logger;

public V1SecondEntityController(
ILogger<V1SecondEntityController> logger)
{
_logger = logger;
}

public async Task ReconcileAsync(V1SecondEntity entity)
{
_logger.LogInformation("Reconciling entity {Entity}.", entity);
}
}
11 changes: 11 additions & 0 deletions examples/Operator/Entities/V1SecondEntity.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
using k8s.Models;

using KubeOps.Abstractions.Entities;

namespace Operator.Entities;

[KubernetesEntity(Group = "testing.dev", ApiVersion = "v1", Kind = "SecondEntity")]
public partial class V1SecondEntity : CustomKubernetesEntity
{
public override string ToString() => $"Second Entity ({Metadata.Name})";
}
2 changes: 1 addition & 1 deletion examples/Operator/todos.txt
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
todo:
- leadership election
- build targets
- other CLI commands
- error handling
- namespaced operator
- web: webhooks
- docs
- try .net 8 AOT?
27 changes: 16 additions & 11 deletions src/KubeOps.Abstractions/Builder/OperatorSettings.cs
Original file line number Diff line number Diff line change
Expand Up @@ -31,27 +31,32 @@ public sealed class OperatorSettings

/// <summary>
/// <para>
/// Defines if the leader elector should run. You may disable this,
/// if you don't intend to run your operator multiple times.
/// Whether the leader elector should run. You should enable
/// this if you plan to run the operator redundantly.
/// </para>
/// <para>
/// If this is disabled, and an operator runs in multiple instance
/// (in the same namespace) it can lead to a "split brain" problem.
/// If this is disabled and an operator runs in multiple instances
/// (in the same namespace), it can lead to a "split brain" problem.
/// </para>
/// <para>
/// This could be disabled when developing locally.
/// Defaults to `false`.
/// </para>
/// </summary>
public bool EnableLeaderElection { get; set; } = true;
public bool EnableLeaderElection { get; set; }

/// <summary>
/// The interval in seconds in which this particular instance of the operator
/// will check for leader election.
/// Defines how long one lease is valid for any leader.
/// Defaults to 15 seconds.
/// </summary>
public ushort LeaderElectionCheckInterval { get; set; } = 15;
public TimeSpan LeaderElectionLeaseDuration { get; set; } = TimeSpan.FromSeconds(15);

/// <summary>
/// The duration in seconds in which the leader lease is valid.
/// When the leader elector tries to refresh the leadership lease.
/// </summary>
public ushort LeaderElectionLeaseDuration { get; set; } = 30;
public TimeSpan LeaderElectionRenewDeadline { get; set; } = TimeSpan.FromSeconds(10);

/// <summary>
/// The wait timeout if the lease cannot be acquired.
/// </summary>
public TimeSpan LeaderElectionRetryPeriod { get; set; } = TimeSpan.FromSeconds(2);
}
55 changes: 47 additions & 8 deletions src/KubeOps.Operator/Builder/OperatorBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
using System.Text;

using k8s;
using k8s.LeaderElection;
using k8s.LeaderElection.ResourceLock;
using k8s.Models;

using KubeOps.Abstractions.Builder;
Expand All @@ -22,13 +24,13 @@ namespace KubeOps.Operator.Builder;

internal class OperatorBuilder : IOperatorBuilder
{
private readonly OperatorSettings _settings;

public OperatorBuilder(IServiceCollection services, OperatorSettings settings)
{
_settings = settings;
Services = services;
Services.AddSingleton(settings);
Services.AddTransient<IKubernetesClient<Corev1Event>>(_ => new KubernetesClient<Corev1Event>(new(
Corev1Event.KubeKind, Corev1Event.KubeApiVersion, Plural: Corev1Event.KubePluralName)));
Services.AddTransient(CreateEventPublisher());
AddOperatorBase();
}

public IServiceCollection Services { get; }
Expand All @@ -45,10 +47,18 @@ public IOperatorBuilder AddController<TImplementation, TEntity>()
where TEntity : IKubernetesObject<V1ObjectMeta>
{
Services.AddScoped<IEntityController<TEntity>, TImplementation>();
Services.AddHostedService<ResourceWatcher<TEntity>>();
Services.AddSingleton(new TimedEntityQueue<TEntity>());
Services.AddTransient(CreateEntityRequeue<TEntity>());

if (_settings.EnableLeaderElection)
{
Services.AddHostedService<LeaderAwareResourceWatcher<TEntity>>();
}
else
{
Services.AddHostedService<ResourceWatcher<TEntity>>();
}

return this;
}

Expand Down Expand Up @@ -99,7 +109,7 @@ private static Func<IServiceProvider, EntityFinalizerAttacher<TImplementation, T

private static Func<IServiceProvider, EntityRequeue<TEntity>> CreateEntityRequeue<TEntity>()
where TEntity : IKubernetesObject<V1ObjectMeta>
=> services => (entity, timespan) =>
=> services => (entity, timeSpan) =>
{
var logger = services.GetService<ILogger<EntityRequeue<TEntity>>>();
var queue = services.GetRequiredService<TimedEntityQueue<TEntity>>();
Expand All @@ -108,9 +118,9 @@ private static Func<IServiceProvider, EntityRequeue<TEntity>> CreateEntityRequeu
"""Requeue entity "{kind}/{name}" in {milliseconds}ms.""",
entity.Kind,
entity.Name(),
timespan.TotalMilliseconds);
timeSpan.TotalMilliseconds);

queue.Enqueue(entity, timespan);
queue.Enqueue(entity, timeSpan);
};

private static Func<IServiceProvider, EventPublisher> CreateEventPublisher()
Expand Down Expand Up @@ -192,4 +202,33 @@ private static Func<IServiceProvider, EventPublisher> CreateEventPublisher()
entity.Name());
}
};

private void AddOperatorBase()
{
Services.AddSingleton(_settings);
Services.AddTransient<IKubernetesClient<Corev1Event>>(_ => new KubernetesClient<Corev1Event>(new(
Corev1Event.KubeKind, Corev1Event.KubeApiVersion, Plural: Corev1Event.KubePluralName)));
Services.AddTransient(CreateEventPublisher());

if (_settings.EnableLeaderElection)
{
using var client = new KubernetesClient<Corev1Event>(new(
Corev1Event.KubeKind, Corev1Event.KubeApiVersion, Plural: Corev1Event.KubePluralName));

var elector = new LeaderElector(
new LeaderElectionConfig(
new LeaseLock(
new Kubernetes(KubernetesClientConfiguration.BuildDefaultConfig()),
client.GetCurrentNamespace(),
$"{_settings.Name}-leader",
Environment.MachineName))
{
LeaseDuration = _settings.LeaderElectionLeaseDuration,
RenewDeadline = _settings.LeaderElectionRenewDeadline,
RetryPeriod = _settings.LeaderElectionRetryPeriod,
});
Services.AddSingleton(elector);
elector.RunAsync();
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
using k8s;
using k8s.LeaderElection;
using k8s.Models;

using KubeOps.KubernetesClient;
using KubeOps.Operator.Queue;

using Microsoft.Extensions.Logging;

namespace KubeOps.Operator.Watcher;

internal class LeaderAwareResourceWatcher<TEntity> : ResourceWatcher<TEntity>
where TEntity : IKubernetesObject<V1ObjectMeta>
{
private readonly ILogger<LeaderAwareResourceWatcher<TEntity>> _logger;
private readonly LeaderElector _elector;

public LeaderAwareResourceWatcher(
ILogger<LeaderAwareResourceWatcher<TEntity>> logger,
IServiceProvider provider,
IKubernetesClient<TEntity> client,
TimedEntityQueue<TEntity> queue,
LeaderElector elector)
: base(logger, provider, client, queue)
{
_logger = logger;
_elector = elector;
}

public override Task StartAsync(CancellationToken cancellationToken)
{
_logger.LogDebug("Subscribe for leadership updates.");
_elector.OnStartedLeading += StartedLeading;
_elector.OnStoppedLeading += StoppedLeading;
if (_elector.IsLeader())
{
StartedLeading();
}

return Task.CompletedTask;
}

public override Task StopAsync(CancellationToken cancellationToken)
{
_logger.LogDebug("Unsubscribe from leadership updates.");
_elector.OnStartedLeading -= StartedLeading;
_elector.OnStoppedLeading -= StoppedLeading;
return Task.CompletedTask;
}

private void StartedLeading()
{
_logger.LogInformation("This instance started leading, starting watcher.");
base.StartAsync(default);
}

private void StoppedLeading()
{
_logger.LogInformation("This instance stopped leading, stopping watcher.");
base.StopAsync(default);
}
}
14 changes: 10 additions & 4 deletions src/KubeOps.Operator/Watcher/ResourceWatcher{TEntity}.cs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ internal class ResourceWatcher<TEntity> : IHostedService
private readonly TimedEntityQueue<TEntity> _queue;
private readonly ConcurrentDictionary<string, long> _entityCache = new();
private readonly Lazy<List<FinalizerRegistration>> _finalizers;
private bool _stopped;

private Watcher<TEntity>? _watcher;

Expand All @@ -42,17 +43,19 @@ public ResourceWatcher(
_finalizers = new(() => _provider.GetServices<FinalizerRegistration>().ToList());
}

public Task StartAsync(CancellationToken cancellationToken)
public virtual Task StartAsync(CancellationToken cancellationToken)
{
_logger.LogInformation("Starting resource watcher for {ResourceType}.", typeof(TEntity).Name);
_stopped = false;
_queue.RequeueRequested += OnEntityRequeue;
WatchResource();
return Task.CompletedTask;
}

public Task StopAsync(CancellationToken cancellationToken)
public virtual Task StopAsync(CancellationToken cancellationToken)
{
_logger.LogInformation("Stopping resource watcher for {ResourceType}.", typeof(TEntity).Name);
_stopped = true;
StopWatching();
_queue.RequeueRequested -= OnEntityRequeue;
_queue.Clear();
Expand Down Expand Up @@ -84,8 +87,11 @@ private void StopWatching()

private void OnClosed()
{
_logger.LogDebug("The server closed the connection. Trying to reconnect.");
WatchResource();
_logger.LogDebug("The server closed the connection.");
if (!_stopped)
{
WatchResource();
}
}

private async void OnEntityRequeue(object? sender, (string Name, string? Namespace) queued)
Expand Down
26 changes: 26 additions & 0 deletions test/KubeOps.Operator.Test/Builder/OperatorBuilder.Test.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
using FluentAssertions;

using k8s.LeaderElection;
using k8s.Models;

using KubeOps.Abstractions.Builder;
Expand Down Expand Up @@ -85,6 +86,31 @@ public void Should_Add_Finalizer_Resources()
s.Lifetime == ServiceLifetime.Transient);
}

[Fact]
public void Should_Add_Leader_Elector()
{
var builder = new OperatorBuilder(new ServiceCollection(), new() { EnableLeaderElection = true });
builder.Services.Should().Contain(s =>
s.ServiceType == typeof(k8s.LeaderElection.LeaderElector) &&
s.Lifetime == ServiceLifetime.Singleton);
}

[Fact]
public void Should_Add_LeaderAwareResourceWatcher()
{
var builder = new OperatorBuilder(new ServiceCollection(), new() { EnableLeaderElection = true });
builder.AddController<TestController, V1IntegrationTestEntity>();

builder.Services.Should().Contain(s =>
s.ServiceType == typeof(IHostedService) &&
s.ImplementationType == typeof(LeaderAwareResourceWatcher<V1IntegrationTestEntity>) &&
s.Lifetime == ServiceLifetime.Singleton);
builder.Services.Should().NotContain(s =>
s.ServiceType == typeof(IHostedService) &&
s.ImplementationType == typeof(ResourceWatcher<V1IntegrationTestEntity>) &&
s.Lifetime == ServiceLifetime.Singleton);
}

private class TestController : IEntityController<V1IntegrationTestEntity>
{
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ public async Task ReconcileAsync(V1IntegrationTestEntity entity)

if (_svc.Invocations.Count < _svc.TargetInvocationCount)
{
_requeue(entity, TimeSpan.FromMilliseconds(1));
_requeue(entity, TimeSpan.FromMilliseconds(10));
}
}
}
Expand Down
6 changes: 5 additions & 1 deletion test/KubeOps.Operator.Test/IntegrationTestCollection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,11 @@ public async Task ConfigureAndStart(Action<HostApplicationBuilder> configure)
}

var builder = Host.CreateApplicationBuilder();
builder.Logging.SetMinimumLevel(LogLevel.Warning);
#if DEBUG
builder.Logging.SetMinimumLevel(LogLevel.Trace);
#else
builder.Logging.SetMinimumLevel(LogLevel.None);
#endif
configure(builder);
_host = builder.Build();
await _host.StartAsync();
Expand Down
Loading

0 comments on commit b8d7827

Please sign in to comment.