From c1345c3c709193913c7500ef22f6f0eb77e648cf Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Christoph=20B=C3=BChler?= Date: Thu, 5 Oct 2023 16:57:56 +0200 Subject: [PATCH] feat(operator): add namespaced operators MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Christoph Bühler --- examples/Operator/todos.txt | 1 - .../LeaderAwareResourceWatcher{TEntity}.cs | 4 +- .../Watcher/ResourceWatcher{TEntity}.cs | 8 +- .../Events/EventPublisher.Integration.Test.cs | 4 +- .../NamespacedOperator.Integration.Test.cs | 98 +++++++++++++++++++ 5 files changed, 109 insertions(+), 6 deletions(-) create mode 100644 test/KubeOps.Operator.Test/NamespacedOperator.Integration.Test.cs diff --git a/examples/Operator/todos.txt b/examples/Operator/todos.txt index 0311ca38..b2dcd672 100644 --- a/examples/Operator/todos.txt +++ b/examples/Operator/todos.txt @@ -2,7 +2,6 @@ - other CLI commands - build targets - error handling -- namespaced operator - web: webhooks - docs - try .net 8 AOT? diff --git a/src/KubeOps.Operator/Watcher/LeaderAwareResourceWatcher{TEntity}.cs b/src/KubeOps.Operator/Watcher/LeaderAwareResourceWatcher{TEntity}.cs index 9775486d..196d9ebb 100644 --- a/src/KubeOps.Operator/Watcher/LeaderAwareResourceWatcher{TEntity}.cs +++ b/src/KubeOps.Operator/Watcher/LeaderAwareResourceWatcher{TEntity}.cs @@ -2,6 +2,7 @@ using k8s.LeaderElection; using k8s.Models; +using KubeOps.Abstractions.Builder; using KubeOps.KubernetesClient; using KubeOps.Operator.Queue; @@ -20,8 +21,9 @@ public LeaderAwareResourceWatcher( IServiceProvider provider, IKubernetesClient client, TimedEntityQueue queue, + OperatorSettings settings, LeaderElector elector) - : base(logger, provider, client, queue) + : base(logger, provider, client, queue, settings) { _logger = logger; _elector = elector; diff --git a/src/KubeOps.Operator/Watcher/ResourceWatcher{TEntity}.cs b/src/KubeOps.Operator/Watcher/ResourceWatcher{TEntity}.cs index a8ebf640..df9b21ee 100644 --- a/src/KubeOps.Operator/Watcher/ResourceWatcher{TEntity}.cs +++ b/src/KubeOps.Operator/Watcher/ResourceWatcher{TEntity}.cs @@ -5,6 +5,7 @@ using k8s; using k8s.Models; +using KubeOps.Abstractions.Builder; using KubeOps.Abstractions.Controller; using KubeOps.Abstractions.Finalizer; using KubeOps.KubernetesClient; @@ -24,6 +25,7 @@ internal class ResourceWatcher : IHostedService private readonly IServiceProvider _provider; private readonly IKubernetesClient _client; private readonly TimedEntityQueue _queue; + private readonly OperatorSettings _settings; private readonly ConcurrentDictionary _entityCache = new(); private readonly Lazy> _finalizers; private bool _stopped; @@ -34,12 +36,14 @@ public ResourceWatcher( ILogger> logger, IServiceProvider provider, IKubernetesClient client, - TimedEntityQueue queue) + TimedEntityQueue queue, + OperatorSettings settings) { _logger = logger; _provider = provider; _client = client; _queue = queue; + _settings = settings; _finalizers = new(() => _provider.GetServices().ToList()); } @@ -77,7 +81,7 @@ private void WatchResource() } } - _watcher = _client.Watch(OnEvent, OnError, OnClosed); + _watcher = _client.Watch(OnEvent, OnError, OnClosed, @namespace: _settings.Namespace); } private void StopWatching() diff --git a/test/KubeOps.Operator.Test/Events/EventPublisher.Integration.Test.cs b/test/KubeOps.Operator.Test/Events/EventPublisher.Integration.Test.cs index 77925938..e88a713e 100644 --- a/test/KubeOps.Operator.Test/Events/EventPublisher.Integration.Test.cs +++ b/test/KubeOps.Operator.Test/Events/EventPublisher.Integration.Test.cs @@ -29,13 +29,13 @@ public EventPublisherIntegrationTest(HostBuilder hostBuilder) : base(hostBuilder [Fact] public async Task Should_Create_New_Event() { - const string eventName = "test-entity.default.REASON.message.Normal"; + const string eventName = "single-entity.default.REASON.message.Normal"; var encodedEventName = Convert.ToHexString( SHA512.HashData( Encoding.UTF8.GetBytes(eventName))); - await _client.CreateAsync(new V1IntegrationTestEntity("test-entity", "username", "default")); + await _client.CreateAsync(new V1IntegrationTestEntity("single-entity", "username", "default")); await Mock.WaitForInvocations; var eventClient = _hostBuilder.Services.GetRequiredService>(); diff --git a/test/KubeOps.Operator.Test/NamespacedOperator.Integration.Test.cs b/test/KubeOps.Operator.Test/NamespacedOperator.Integration.Test.cs new file mode 100644 index 00000000..6514196e --- /dev/null +++ b/test/KubeOps.Operator.Test/NamespacedOperator.Integration.Test.cs @@ -0,0 +1,98 @@ +using FluentAssertions; + +using k8s; +using k8s.Models; + +using KubeOps.Abstractions.Controller; +using KubeOps.KubernetesClient; +using KubeOps.Operator.Test.TestEntities; +using KubeOps.Transpiler; + +using Microsoft.Extensions.DependencyInjection; + +namespace KubeOps.Operator.Test; + +public class NamespacedOperatorIntegrationTest : IntegrationTestBase, IAsyncLifetime +{ + private static readonly InvocationCounter Mock = new(); + private IKubernetesClient _client = null!; + private IKubernetesClient _nsClient = null!; + + public NamespacedOperatorIntegrationTest(HostBuilder hostBuilder) : base(hostBuilder) + { + Mock.Clear(); + } + + [Fact] + public async Task Should_Call_Reconcile_On_Entity_In_Namespace() + { + var watcherCounter = new InvocationCounter { TargetInvocationCount = 2 }; + using var watcher = _client.Watch((_, e) => watcherCounter.Invocation(e)); + + await _client.CreateAsync(new V1IntegrationTestEntity("test-entity", "username", "foobar")); + await _client.CreateAsync(new V1IntegrationTestEntity("test-entity", "username", "default")); + await Mock.WaitForInvocations; + await watcherCounter.WaitForInvocations; + Mock.Invocations.Count.Should().Be(1); + watcherCounter.Invocations.Count.Should().Be(2); + } + + [Fact] + public async Task Should_Not_Call_Reconcile_On_Entity_In_Other_Namespace() + { + var watcherCounter = new InvocationCounter { TargetInvocationCount = 1 }; + using var watcher = _client.Watch((_, e) => watcherCounter.Invocation(e)); + + await _client.CreateAsync(new V1IntegrationTestEntity("test-entity2", "username", "default")); + await watcherCounter.WaitForInvocations; + Mock.Invocations.Count.Should().Be(0); + watcherCounter.Invocations.Count.Should().Be(1); + } + + public async Task InitializeAsync() + { + var meta = Entities.ToEntityMetadata(typeof(V1IntegrationTestEntity)).Metadata; + _client = new KubernetesClient(meta); + _nsClient = new KubernetesClient(new(V1Namespace.KubeKind, V1Namespace.KubeApiVersion, + V1Namespace.KubeGroup, V1Namespace.KubePluralName)); + await _nsClient.SaveAsync(new V1Namespace(metadata: new(name: "foobar")).Initialize()); + await _hostBuilder.ConfigureAndStart(builder => builder.Services + .AddSingleton(Mock) + .AddKubernetesOperator(s => s.Namespace = "foobar") + .AddControllerWithEntity(meta)); + } + + public async Task DisposeAsync() + { + var entities = await _client.ListAsync("default"); + await _nsClient.DeleteAsync("foobar"); + while (await _nsClient.GetAsync("foobar") is not null) + { + await Task.Delay(100); + } + await _client.DeleteAsync(entities); + _client.Dispose(); + } + + private class TestController : IEntityController + { + private readonly InvocationCounter _svc; + + public TestController(InvocationCounter svc) + { + _svc = svc; + } + + public Task ReconcileAsync(V1IntegrationTestEntity entity) + { + _svc.Invocation(entity); + return Task.CompletedTask; + } + + public Task DeletedAsync(V1IntegrationTestEntity entity) + { + _svc.Invocation(entity); + return Task.CompletedTask; + } + } +}