Skip to content

Commit

Permalink
feat(operator): add namespaced operators
Browse files Browse the repository at this point in the history
Signed-off-by: Christoph Bühler <[email protected]>
  • Loading branch information
buehler committed Oct 5, 2023
1 parent e878ad8 commit c1345c3
Show file tree
Hide file tree
Showing 5 changed files with 109 additions and 6 deletions.
1 change: 0 additions & 1 deletion examples/Operator/todos.txt
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
- other CLI commands
- build targets
- error handling
- namespaced operator
- web: webhooks
- docs
- try .net 8 AOT?
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
using k8s.LeaderElection;
using k8s.Models;

using KubeOps.Abstractions.Builder;
using KubeOps.KubernetesClient;
using KubeOps.Operator.Queue;

Expand All @@ -20,8 +21,9 @@ public LeaderAwareResourceWatcher(
IServiceProvider provider,
IKubernetesClient<TEntity> client,
TimedEntityQueue<TEntity> queue,
OperatorSettings settings,
LeaderElector elector)
: base(logger, provider, client, queue)
: base(logger, provider, client, queue, settings)
{
_logger = logger;
_elector = elector;
Expand Down
8 changes: 6 additions & 2 deletions src/KubeOps.Operator/Watcher/ResourceWatcher{TEntity}.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
using k8s;
using k8s.Models;

using KubeOps.Abstractions.Builder;
using KubeOps.Abstractions.Controller;
using KubeOps.Abstractions.Finalizer;
using KubeOps.KubernetesClient;
Expand All @@ -24,6 +25,7 @@ internal class ResourceWatcher<TEntity> : IHostedService
private readonly IServiceProvider _provider;
private readonly IKubernetesClient<TEntity> _client;
private readonly TimedEntityQueue<TEntity> _queue;
private readonly OperatorSettings _settings;
private readonly ConcurrentDictionary<string, long> _entityCache = new();
private readonly Lazy<List<FinalizerRegistration>> _finalizers;
private bool _stopped;
Expand All @@ -34,12 +36,14 @@ public ResourceWatcher(
ILogger<ResourceWatcher<TEntity>> logger,
IServiceProvider provider,
IKubernetesClient<TEntity> client,
TimedEntityQueue<TEntity> queue)
TimedEntityQueue<TEntity> queue,
OperatorSettings settings)
{
_logger = logger;
_provider = provider;
_client = client;
_queue = queue;
_settings = settings;
_finalizers = new(() => _provider.GetServices<FinalizerRegistration>().ToList());
}

Expand Down Expand Up @@ -77,7 +81,7 @@ private void WatchResource()
}
}

_watcher = _client.Watch(OnEvent, OnError, OnClosed);
_watcher = _client.Watch(OnEvent, OnError, OnClosed, @namespace: _settings.Namespace);
}

private void StopWatching()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,13 @@ public EventPublisherIntegrationTest(HostBuilder hostBuilder) : base(hostBuilder
[Fact]
public async Task Should_Create_New_Event()
{
const string eventName = "test-entity.default.REASON.message.Normal";
const string eventName = "single-entity.default.REASON.message.Normal";
var encodedEventName =
Convert.ToHexString(
SHA512.HashData(
Encoding.UTF8.GetBytes(eventName)));

await _client.CreateAsync(new V1IntegrationTestEntity("test-entity", "username", "default"));
await _client.CreateAsync(new V1IntegrationTestEntity("single-entity", "username", "default"));
await Mock.WaitForInvocations;

var eventClient = _hostBuilder.Services.GetRequiredService<IKubernetesClient<Corev1Event>>();
Expand Down
98 changes: 98 additions & 0 deletions test/KubeOps.Operator.Test/NamespacedOperator.Integration.Test.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
using FluentAssertions;

using k8s;
using k8s.Models;

using KubeOps.Abstractions.Controller;
using KubeOps.KubernetesClient;
using KubeOps.Operator.Test.TestEntities;
using KubeOps.Transpiler;

using Microsoft.Extensions.DependencyInjection;

namespace KubeOps.Operator.Test;

public class NamespacedOperatorIntegrationTest : IntegrationTestBase, IAsyncLifetime
{
private static readonly InvocationCounter<V1IntegrationTestEntity> Mock = new();
private IKubernetesClient<V1IntegrationTestEntity> _client = null!;
private IKubernetesClient<V1Namespace> _nsClient = null!;

public NamespacedOperatorIntegrationTest(HostBuilder hostBuilder) : base(hostBuilder)
{
Mock.Clear();
}

[Fact]
public async Task Should_Call_Reconcile_On_Entity_In_Namespace()
{
var watcherCounter = new InvocationCounter<V1IntegrationTestEntity> { TargetInvocationCount = 2 };
using var watcher = _client.Watch((_, e) => watcherCounter.Invocation(e));

await _client.CreateAsync(new V1IntegrationTestEntity("test-entity", "username", "foobar"));
await _client.CreateAsync(new V1IntegrationTestEntity("test-entity", "username", "default"));
await Mock.WaitForInvocations;
await watcherCounter.WaitForInvocations;
Mock.Invocations.Count.Should().Be(1);
watcherCounter.Invocations.Count.Should().Be(2);
}

[Fact]
public async Task Should_Not_Call_Reconcile_On_Entity_In_Other_Namespace()
{
var watcherCounter = new InvocationCounter<V1IntegrationTestEntity> { TargetInvocationCount = 1 };
using var watcher = _client.Watch((_, e) => watcherCounter.Invocation(e));

await _client.CreateAsync(new V1IntegrationTestEntity("test-entity2", "username", "default"));
await watcherCounter.WaitForInvocations;
Mock.Invocations.Count.Should().Be(0);
watcherCounter.Invocations.Count.Should().Be(1);
}

public async Task InitializeAsync()
{
var meta = Entities.ToEntityMetadata(typeof(V1IntegrationTestEntity)).Metadata;
_client = new KubernetesClient<V1IntegrationTestEntity>(meta);
_nsClient = new KubernetesClient<V1Namespace>(new(V1Namespace.KubeKind, V1Namespace.KubeApiVersion,
V1Namespace.KubeGroup, V1Namespace.KubePluralName));
await _nsClient.SaveAsync(new V1Namespace(metadata: new(name: "foobar")).Initialize());
await _hostBuilder.ConfigureAndStart(builder => builder.Services
.AddSingleton(Mock)
.AddKubernetesOperator(s => s.Namespace = "foobar")
.AddControllerWithEntity<TestController, V1IntegrationTestEntity>(meta));
}

public async Task DisposeAsync()
{
var entities = await _client.ListAsync("default");
await _nsClient.DeleteAsync("foobar");
while (await _nsClient.GetAsync("foobar") is not null)
{
await Task.Delay(100);
}
await _client.DeleteAsync(entities);
_client.Dispose();
}

private class TestController : IEntityController<V1IntegrationTestEntity>
{
private readonly InvocationCounter<V1IntegrationTestEntity> _svc;

public TestController(InvocationCounter<V1IntegrationTestEntity> svc)
{
_svc = svc;
}

public Task ReconcileAsync(V1IntegrationTestEntity entity)
{
_svc.Invocation(entity);
return Task.CompletedTask;
}

public Task DeletedAsync(V1IntegrationTestEntity entity)
{
_svc.Invocation(entity);
return Task.CompletedTask;
}
}
}

0 comments on commit c1345c3

Please sign in to comment.