From f57951ca5c67e49740bef12cd31087bfeee6ea06 Mon Sep 17 00:00:00 2001 From: sankarm Date: Tue, 16 Apr 2024 15:23:00 +0530 Subject: [PATCH] leader election issue fixed for all the kcp controlelrs Signed-off-by: sankarm --- .../permissionclaim_labeler.go | 5 - .../apis/apibinding/apibinding_controller.go | 15 - .../apis/apiexport/apiexport_controller.go | 8 - .../apiexportendpointslice_controller.go | 12 - .../apiexportendpointslice_indexes.go | 44 - .../apis/crdcleanup/crdcleanup_controller.go | 7 - .../apibindingannotation_controller.go | 8 - .../permissionclaimlabel_controller.go | 8 +- ...ermissionclaimlabel_resource_controller.go | 14 +- .../labelclusterrolebinding_controller.go | 5 - .../labelclusterrole_controller.go | 5 - .../replication/replication_controller.go | 107 +-- .../replication/replication_reconcile.go | 12 +- .../apibinder_initializer_controller.go | 8 - .../tenancy/workspace/workspace_controller.go | 11 - .../tenancy/workspace/workspace_indexes.go | 17 - .../workspacetype/workspacetype_controller.go | 4 - pkg/server/controllers.go | 862 ++++++++++++------ 18 files changed, 600 insertions(+), 552 deletions(-) diff --git a/pkg/permissionclaim/permissionclaim_labeler.go b/pkg/permissionclaim/permissionclaim_labeler.go index 6ae8b927832..c459d596faf 100644 --- a/pkg/permissionclaim/permissionclaim_labeler.go +++ b/pkg/permissionclaim/permissionclaim_labeler.go @@ -23,7 +23,6 @@ import ( "github.com/kcp-dev/logicalcluster/v3" "k8s.io/apimachinery/pkg/runtime/schema" - "k8s.io/client-go/tools/cache" "k8s.io/klog/v2" "github.com/kcp-dev/kcp/pkg/indexers" @@ -46,10 +45,6 @@ func NewLabeler( apiBindingInformer apisv1alpha1informers.APIBindingClusterInformer, apiExportInformer, globalAPIExportInformer apisv1alpha1informers.APIExportClusterInformer, ) *Labeler { - indexers.AddIfNotPresentOrDie(apiExportInformer.Informer().GetIndexer(), cache.Indexers{ - indexers.ByLogicalClusterPathAndName: indexers.IndexByLogicalClusterPathAndName, - }) - return &Labeler{ listAPIBindingsAcceptingClaimedGroupResource: func(clusterName logicalcluster.Name, groupResource schema.GroupResource) ([]*apisv1alpha1.APIBinding, error) { indexKey := indexers.ClusterAndGroupResourceValue(clusterName, groupResource) diff --git a/pkg/reconciler/apis/apibinding/apibinding_controller.go b/pkg/reconciler/apis/apibinding/apibinding_controller.go index a653f9a4798..37fe2c37378 100644 --- a/pkg/reconciler/apis/apibinding/apibinding_controller.go +++ b/pkg/reconciler/apis/apibinding/apibinding_controller.go @@ -164,21 +164,6 @@ func NewController( logger := logging.WithReconciler(klog.Background(), ControllerName) - // APIBinding indexers - indexers.AddIfNotPresentOrDie(apiBindingInformer.Informer().GetIndexer(), cache.Indexers{ - indexers.APIBindingsByAPIExport: indexers.IndexAPIBindingByAPIExport, - }) - - // APIExport indexers - indexers.AddIfNotPresentOrDie(apiExportInformer.Informer().GetIndexer(), cache.Indexers{ - indexers.ByLogicalClusterPathAndName: indexers.IndexByLogicalClusterPathAndName, - indexAPIExportsByAPIResourceSchema: indexAPIExportsByAPIResourceSchemasFunc, - }) - indexers.AddIfNotPresentOrDie(globalAPIExportInformer.Informer().GetIndexer(), cache.Indexers{ - indexers.ByLogicalClusterPathAndName: indexers.IndexByLogicalClusterPathAndName, - indexAPIExportsByAPIResourceSchema: indexAPIExportsByAPIResourceSchemasFunc, - }) - // APIBinding handlers _, _ = apiBindingInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: func(obj interface{}) { c.enqueueAPIBinding(objOrTombstone[*apisv1alpha1.APIBinding](obj), logger, "") }, diff --git a/pkg/reconciler/apis/apiexport/apiexport_controller.go b/pkg/reconciler/apis/apiexport/apiexport_controller.go index df2d7a4e8ac..081f09e3d7f 100644 --- a/pkg/reconciler/apis/apiexport/apiexport_controller.go +++ b/pkg/reconciler/apis/apiexport/apiexport_controller.go @@ -121,14 +121,6 @@ func NewController( commit: committer.NewCommitter[*APIExport, Patcher, *APIExportSpec, *APIExportStatus](kcpClusterClient.ApisV1alpha1().APIExports()), } - indexers.AddIfNotPresentOrDie( - apiExportInformer.Informer().GetIndexer(), - cache.Indexers{ - indexers.APIExportByIdentity: indexers.IndexAPIExportByIdentity, - indexers.APIExportBySecret: indexers.IndexAPIExportBySecret, - }, - ) - _, _ = apiExportInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: func(obj interface{}) { c.enqueueAPIExport(obj.(*apisv1alpha1.APIExport)) diff --git a/pkg/reconciler/apis/apiexportendpointslice/apiexportendpointslice_controller.go b/pkg/reconciler/apis/apiexportendpointslice/apiexportendpointslice_controller.go index b23254eb1a0..bf4e2b580bc 100644 --- a/pkg/reconciler/apis/apiexportendpointslice/apiexportendpointslice_controller.go +++ b/pkg/reconciler/apis/apiexportendpointslice/apiexportendpointslice_controller.go @@ -100,14 +100,6 @@ func NewController( commit: committer.NewCommitter[*APIExportEndpointSlice, Patcher, *APIExportEndpointSliceSpec, *APIExportEndpointSliceStatus](kcpClusterClient.ApisV1alpha1().APIExportEndpointSlices()), } - indexers.AddIfNotPresentOrDie(globalAPIExportClusterInformer.Informer().GetIndexer(), cache.Indexers{ - indexers.ByLogicalClusterPathAndName: indexers.IndexByLogicalClusterPathAndName, - }) - - indexers.AddIfNotPresentOrDie(apiExportEndpointSliceClusterInformer.Informer().GetIndexer(), cache.Indexers{ - indexAPIExportEndpointSliceByAPIExport: indexAPIExportEndpointSliceByAPIExportFunc, - }) - _, _ = apiExportEndpointSliceClusterInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: func(obj interface{}) { c.enqueueAPIExportEndpointSlice(obj) @@ -158,10 +150,6 @@ func NewController( }, ) - indexers.AddIfNotPresentOrDie(apiExportEndpointSliceClusterInformer.Informer().GetIndexer(), cache.Indexers{ - indexAPIExportEndpointSlicesByPartition: indexAPIExportEndpointSlicesByPartitionFunc, - }) - return c, nil } diff --git a/pkg/reconciler/apis/apiexportendpointslice/apiexportendpointslice_indexes.go b/pkg/reconciler/apis/apiexportendpointslice/apiexportendpointslice_indexes.go index b55d565e8d1..472506fb8fe 100644 --- a/pkg/reconciler/apis/apiexportendpointslice/apiexportendpointslice_indexes.go +++ b/pkg/reconciler/apis/apiexportendpointslice/apiexportendpointslice_indexes.go @@ -16,51 +16,7 @@ limitations under the License. package apiexportendpointslice -import ( - "fmt" - - "github.com/kcp-dev/logicalcluster/v3" - - apisv1alpha1 "github.com/kcp-dev/kcp/sdk/apis/apis/v1alpha1" - "github.com/kcp-dev/kcp/sdk/client" -) - const ( indexAPIExportEndpointSliceByAPIExport = "indexAPIExportEndpointSliceByAPIExport" indexAPIExportEndpointSlicesByPartition = "indexAPIExportEndpointSlicesByPartition" ) - -// indexAPIExportEndpointSliceByAPIExportFunc indexes the APIExportEndpointSlice by their APIExport's Reference Path and Name. -func indexAPIExportEndpointSliceByAPIExportFunc(obj interface{}) ([]string, error) { - apiExportEndpointSlice, ok := obj.(*apisv1alpha1.APIExportEndpointSlice) - if !ok { - return []string{}, fmt.Errorf("obj %T is not an APIExportEndpointSlice", obj) - } - - path := logicalcluster.NewPath(apiExportEndpointSlice.Spec.APIExport.Path) - if path.Empty() { - path = logicalcluster.From(apiExportEndpointSlice).Path() - } - return []string{path.Join(apiExportEndpointSlice.Spec.APIExport.Name).String()}, nil -} - -// indexAPIExportEndpointSlicesByPartitionFunc is an index function that maps a Partition to the key for its -// spec.partition. -func indexAPIExportEndpointSlicesByPartitionFunc(obj interface{}) ([]string, error) { - slice, ok := obj.(*apisv1alpha1.APIExportEndpointSlice) - if !ok { - return []string{}, fmt.Errorf("obj is supposed to be an APIExportEndpointSlice, but is %T", obj) - } - - if slice.Spec.Partition != "" { - clusterName := logicalcluster.From(slice).Path() - if !ok { - // this will never happen due to validation - return []string{}, fmt.Errorf("cluster information missing") - } - key := client.ToClusterAwareKey(clusterName, slice.Spec.Partition) - return []string{key}, nil - } - - return []string{}, nil -} diff --git a/pkg/reconciler/apis/crdcleanup/crdcleanup_controller.go b/pkg/reconciler/apis/crdcleanup/crdcleanup_controller.go index 0d4adc85aa2..984f3da8f8a 100644 --- a/pkg/reconciler/apis/crdcleanup/crdcleanup_controller.go +++ b/pkg/reconciler/apis/crdcleanup/crdcleanup_controller.go @@ -71,13 +71,6 @@ func NewController( }, } - indexers.AddIfNotPresentOrDie( - apiBindingInformer.Informer().GetIndexer(), - cache.Indexers{ - indexers.APIBindingByBoundResourceUID: indexers.IndexAPIBindingByBoundResourceUID, - }, - ) - _, _ = crdInformer.Informer().AddEventHandler(cache.FilteringResourceEventHandler{ FilterFunc: func(obj interface{}) bool { crd := obj.(*apiextensionsv1.CustomResourceDefinition) diff --git a/pkg/reconciler/apis/extraannotationsync/apibindingannotation_controller.go b/pkg/reconciler/apis/extraannotationsync/apibindingannotation_controller.go index b8f9db04ba7..d32ae3319a5 100644 --- a/pkg/reconciler/apis/extraannotationsync/apibindingannotation_controller.go +++ b/pkg/reconciler/apis/extraannotationsync/apibindingannotation_controller.go @@ -106,14 +106,6 @@ func NewController( logger := logging.WithReconciler(klog.Background(), ControllerName) - indexers.AddIfNotPresentOrDie(apiExportInformer.Informer().GetIndexer(), cache.Indexers{ - indexers.ByLogicalClusterPathAndName: indexers.IndexByLogicalClusterPathAndName, - }) - - indexers.AddIfNotPresentOrDie(apiBindingInformer.Informer().GetIndexer(), cache.Indexers{ - indexers.APIBindingsByAPIExport: indexers.IndexAPIBindingByAPIExport, - }) - _, _ = apiExportInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: func(obj interface{}) { c.enqueueAPIExport(obj, logger) }, UpdateFunc: func(_, obj interface{}) { c.enqueueAPIExport(obj, logger) }, diff --git a/pkg/reconciler/apis/permissionclaimlabel/permissionclaimlabel_controller.go b/pkg/reconciler/apis/permissionclaimlabel/permissionclaimlabel_controller.go index 7ee0b1c6402..fbd779bac67 100644 --- a/pkg/reconciler/apis/permissionclaimlabel/permissionclaimlabel_controller.go +++ b/pkg/reconciler/apis/permissionclaimlabel/permissionclaimlabel_controller.go @@ -78,10 +78,6 @@ func NewController( commit: committer.NewCommitter[*APIBinding, Patcher, *APIBindingSpec, *APIBindingStatus](kcpClusterClient.ApisV1alpha1().APIBindings()), } - indexers.AddIfNotPresentOrDie(apiExportInformer.Informer().GetIndexer(), cache.Indexers{ - indexers.ByLogicalClusterPathAndName: indexers.IndexByLogicalClusterPathAndName, - }) - _, _ = apiBindingInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: func(obj interface{}) { c.enqueueAPIBinding(obj, logger) }, UpdateFunc: func(_, newObj interface{}) { @@ -125,7 +121,7 @@ func (c *controller) enqueueAPIBinding(obj interface{}, logger logr.Logger) { return } - logging.WithQueueKey(logger, key).V(4).Info("queueing APIBinding") + logging.WithQueueKey(logger, key).V(2).Info("queueing APIBinding") c.queue.Add(key) } @@ -161,7 +157,7 @@ func (c *controller) processNextWorkItem(ctx context.Context) bool { logger := logging.WithQueueKey(klog.FromContext(ctx), key) ctx = klog.NewContext(ctx, logger) - logger.V(4).Info("processing key") + logger.V(1).Info("processing key") // No matter what, tell the queue we're done with this key, to unblock // other workers. diff --git a/pkg/reconciler/apis/permissionclaimlabel/permissionclaimlabel_resource_controller.go b/pkg/reconciler/apis/permissionclaimlabel/permissionclaimlabel_resource_controller.go index 491233efa3b..c17256e6574 100644 --- a/pkg/reconciler/apis/permissionclaimlabel/permissionclaimlabel_resource_controller.go +++ b/pkg/reconciler/apis/permissionclaimlabel/permissionclaimlabel_resource_controller.go @@ -31,11 +31,9 @@ import ( "k8s.io/apimachinery/pkg/runtime/schema" utilruntime "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apimachinery/pkg/util/wait" - "k8s.io/client-go/tools/cache" "k8s.io/client-go/util/workqueue" "k8s.io/klog/v2" - "github.com/kcp-dev/kcp/pkg/indexers" "github.com/kcp-dev/kcp/pkg/informer" "github.com/kcp-dev/kcp/pkg/logging" "github.com/kcp-dev/kcp/pkg/permissionclaim" @@ -55,14 +53,6 @@ func NewResourceController( apiBindingInformer apisv1alpha1informers.APIBindingClusterInformer, apiExportInformer, globalAPIExportInformer apisv1alpha1informers.APIExportClusterInformer, ) (*resourceController, error) { - if err := apiBindingInformer.Informer().GetIndexer().AddIndexers( - cache.Indexers{ - indexers.APIBindingByClusterAndAcceptedClaimedGroupResources: indexers.IndexAPIBindingByClusterAndAcceptedClaimedGroupResources, - }, - ); err != nil { - return nil, err - } - c := &resourceController{ queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), ResourceControllerName), kcpClusterClient: kcpClusterClient, @@ -102,7 +92,7 @@ func (c *resourceController) enqueueForResource(logger logr.Logger, gvr schema.G } queueKey += "::" + key - logging.WithQueueKey(logger, queueKey).V(4).Info("queuing resource") + logging.WithQueueKey(logger, queueKey).V(2).Info("queuing resource") c.queue.Add(queueKey) } @@ -138,7 +128,7 @@ func (c *resourceController) processNextWorkItem(ctx context.Context) bool { logger := logging.WithQueueKey(klog.FromContext(ctx), key) ctx = klog.NewContext(ctx, logger) - logger.V(4).Info("processing key") + logger.V(1).Info("processing key") // No matter what, tell the queue we're done with this key, to unblock // other workers. diff --git a/pkg/reconciler/cache/labelclusterrolebindings/labelclusterrolebinding_controller.go b/pkg/reconciler/cache/labelclusterrolebindings/labelclusterrolebinding_controller.go index 2d8a41486cb..e7b73a41725 100644 --- a/pkg/reconciler/cache/labelclusterrolebindings/labelclusterrolebinding_controller.go +++ b/pkg/reconciler/cache/labelclusterrolebindings/labelclusterrolebinding_controller.go @@ -38,7 +38,6 @@ import ( "k8s.io/client-go/util/workqueue" "k8s.io/klog/v2" - "github.com/kcp-dev/kcp/pkg/indexers" "github.com/kcp-dev/kcp/pkg/logging" "github.com/kcp-dev/kcp/pkg/reconciler/cache/labelclusterroles" "github.com/kcp-dev/kcp/pkg/reconciler/cache/replication" @@ -83,10 +82,6 @@ func NewController( commit: committer.NewStatuslessCommitter[*rbacv1.ClusterRoleBinding, rbacclientv1.ClusterRoleBindingInterface](kubeClusterClient.RbacV1().ClusterRoleBindings(), committer.ShallowCopy[rbacv1.ClusterRoleBinding]), } - indexers.AddIfNotPresentOrDie(clusterRoleBindingInformer.Informer().GetIndexer(), cache.Indexers{ - labelclusterroles.ClusterRoleBindingByClusterRoleName: labelclusterroles.IndexClusterRoleBindingByClusterRoleName, - }) - _, _ = clusterRoleBindingInformer.Informer().AddEventHandler(cache.FilteringResourceEventHandler{ FilterFunc: replication.IsNoSystemClusterName, Handler: cache.ResourceEventHandlerFuncs{ diff --git a/pkg/reconciler/cache/labelclusterroles/labelclusterrole_controller.go b/pkg/reconciler/cache/labelclusterroles/labelclusterrole_controller.go index 77734d1c646..8bb2b3d60cb 100644 --- a/pkg/reconciler/cache/labelclusterroles/labelclusterrole_controller.go +++ b/pkg/reconciler/cache/labelclusterroles/labelclusterrole_controller.go @@ -38,7 +38,6 @@ import ( "k8s.io/client-go/util/workqueue" "k8s.io/klog/v2" - "github.com/kcp-dev/kcp/pkg/indexers" "github.com/kcp-dev/kcp/pkg/logging" "github.com/kcp-dev/kcp/pkg/reconciler/cache/replication" "github.com/kcp-dev/kcp/pkg/reconciler/committer" @@ -82,10 +81,6 @@ func NewController( commit: committer.NewStatuslessCommitter[*rbacv1.ClusterRole, rbacclientv1.ClusterRoleInterface](kubeClusterClient.RbacV1().ClusterRoles(), committer.ShallowCopy[rbacv1.ClusterRole]), } - indexers.AddIfNotPresentOrDie(clusterRoleBindingInformer.Informer().GetIndexer(), cache.Indexers{ - ClusterRoleBindingByClusterRoleName: IndexClusterRoleBindingByClusterRoleName, - }) - _, _ = clusterRoleInformer.Informer().AddEventHandler(cache.FilteringResourceEventHandler{ FilterFunc: replication.IsNoSystemClusterName, Handler: cache.ResourceEventHandlerFuncs{ diff --git a/pkg/reconciler/cache/replication/replication_controller.go b/pkg/reconciler/cache/replication/replication_controller.go index 792c080093e..5009fb75ab3 100644 --- a/pkg/reconciler/cache/replication/replication_controller.go +++ b/pkg/reconciler/cache/replication/replication_controller.go @@ -26,9 +26,6 @@ import ( kcpdynamic "github.com/kcp-dev/client-go/dynamic" kcpkubernetesinformers "github.com/kcp-dev/client-go/informers" - admissionregistrationv1 "k8s.io/api/admissionregistration/v1" - admissionregistrationv1alpha1 "k8s.io/api/admissionregistration/v1alpha1" - rbacv1 "k8s.io/api/rbac/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/util/runtime" @@ -39,12 +36,7 @@ import ( cacheclient "github.com/kcp-dev/kcp/pkg/cache/client" "github.com/kcp-dev/kcp/pkg/cache/client/shard" - "github.com/kcp-dev/kcp/pkg/indexers" "github.com/kcp-dev/kcp/pkg/logging" - apisv1alpha1 "github.com/kcp-dev/kcp/sdk/apis/apis/v1alpha1" - "github.com/kcp-dev/kcp/sdk/apis/core" - corev1alpha1 "github.com/kcp-dev/kcp/sdk/apis/core/v1alpha1" - tenancyv1alpha1 "github.com/kcp-dev/kcp/sdk/apis/tenancy/v1alpha1" kcpinformers "github.com/kcp-dev/kcp/sdk/client/informers/externalversions" ) @@ -53,6 +45,12 @@ const ( ControllerName = "kcp-replication-controller" ) +type ReplicatedGVR struct { + Kind string + Filter func(u *unstructured.Unstructured) bool + Global, Local cache.SharedIndexInformer +} + // NewController returns a new replication controller. // // The replicated object will be placed under the same cluster as the original object. @@ -65,97 +63,20 @@ func NewController( globalKcpInformers kcpinformers.SharedInformerFactory, localKubeInformers kcpkubernetesinformers.SharedInformerFactory, globalKubeInformers kcpkubernetesinformers.SharedInformerFactory, + gvrs map[schema.GroupVersionResource]ReplicatedGVR, ) (*controller, error) { c := &controller{ shardName: shardName, queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), ControllerName), dynamicCacheClient: dynamicCacheClient, - gvrs: map[schema.GroupVersionResource]replicatedGVR{ - apisv1alpha1.SchemeGroupVersion.WithResource("apiexports"): { - kind: "APIExport", - local: localKcpInformers.Apis().V1alpha1().APIExports().Informer(), - global: globalKcpInformers.Apis().V1alpha1().APIExports().Informer(), - }, - apisv1alpha1.SchemeGroupVersion.WithResource("apiresourceschemas"): { - kind: "APIResourceSchema", - local: localKcpInformers.Apis().V1alpha1().APIResourceSchemas().Informer(), - global: globalKcpInformers.Apis().V1alpha1().APIResourceSchemas().Informer(), - }, - apisv1alpha1.SchemeGroupVersion.WithResource("apiconversions"): { - kind: "APIConversion", - local: localKcpInformers.Apis().V1alpha1().APIConversions().Informer(), - global: globalKcpInformers.Apis().V1alpha1().APIConversions().Informer(), - }, - admissionregistrationv1.SchemeGroupVersion.WithResource("mutatingwebhookconfigurations"): { - kind: "MutatingWebhookConfiguration", - local: localKubeInformers.Admissionregistration().V1().MutatingWebhookConfigurations().Informer(), - global: globalKubeInformers.Admissionregistration().V1().MutatingWebhookConfigurations().Informer(), - }, - admissionregistrationv1.SchemeGroupVersion.WithResource("validatingwebhookconfigurations"): { - kind: "ValidatingWebhookConfiguration", - local: localKubeInformers.Admissionregistration().V1().ValidatingWebhookConfigurations().Informer(), - global: globalKubeInformers.Admissionregistration().V1().ValidatingWebhookConfigurations().Informer(), - }, - admissionregistrationv1alpha1.SchemeGroupVersion.WithResource("validatingadmissionpolicies"): { - kind: "ValidatingAdmissionPolicy", - local: localKubeInformers.Admissionregistration().V1alpha1().ValidatingAdmissionPolicies().Informer(), - global: globalKubeInformers.Admissionregistration().V1alpha1().ValidatingAdmissionPolicies().Informer(), - }, - admissionregistrationv1alpha1.SchemeGroupVersion.WithResource("validatingadmissionpolicybindings"): { - kind: "ValidatingAdmissionPolicyBinding", - local: localKubeInformers.Admissionregistration().V1alpha1().ValidatingAdmissionPolicyBindings().Informer(), - global: globalKubeInformers.Admissionregistration().V1alpha1().ValidatingAdmissionPolicyBindings().Informer(), - }, - corev1alpha1.SchemeGroupVersion.WithResource("shards"): { - kind: "Shard", - local: localKcpInformers.Core().V1alpha1().Shards().Informer(), - global: globalKcpInformers.Core().V1alpha1().Shards().Informer(), - }, - corev1alpha1.SchemeGroupVersion.WithResource("logicalclusters"): { - kind: "LogicalCluster", - filter: func(u *unstructured.Unstructured) bool { - return u.GetAnnotations()[core.ReplicateAnnotationKey] != "" - }, - local: localKcpInformers.Core().V1alpha1().LogicalClusters().Informer(), - global: globalKcpInformers.Core().V1alpha1().LogicalClusters().Informer(), - }, - tenancyv1alpha1.SchemeGroupVersion.WithResource("workspacetypes"): { - kind: "WorkspaceType", - local: localKcpInformers.Tenancy().V1alpha1().WorkspaceTypes().Informer(), - global: globalKcpInformers.Tenancy().V1alpha1().WorkspaceTypes().Informer(), - }, - rbacv1.SchemeGroupVersion.WithResource("clusterroles"): { - kind: "ClusterRole", - filter: func(u *unstructured.Unstructured) bool { - return u.GetAnnotations()[core.ReplicateAnnotationKey] != "" - }, - local: localKubeInformers.Rbac().V1().ClusterRoles().Informer(), - global: globalKubeInformers.Rbac().V1().ClusterRoles().Informer(), - }, - rbacv1.SchemeGroupVersion.WithResource("clusterrolebindings"): { - kind: "ClusterRoleBinding", - filter: func(u *unstructured.Unstructured) bool { - return u.GetAnnotations()[core.ReplicateAnnotationKey] != "" - }, - local: localKubeInformers.Rbac().V1().ClusterRoleBindings().Informer(), - global: globalKubeInformers.Rbac().V1().ClusterRoleBindings().Informer(), - }, - }, + gvrs: gvrs, } for gvr, info := range c.gvrs { - indexers.AddIfNotPresentOrDie( - info.global.GetIndexer(), - cache.Indexers{ - ByShardAndLogicalClusterAndNamespaceAndName: IndexByShardAndLogicalClusterAndNamespace, - }, - ) - // shadow gvr to get the right value in the closure gvr := gvr - - _, _ = info.local.AddEventHandler(cache.FilteringResourceEventHandler{ + _, _ = info.Local.AddEventHandler(cache.FilteringResourceEventHandler{ FilterFunc: IsNoSystemClusterName, Handler: cache.ResourceEventHandlerFuncs{ AddFunc: func(obj interface{}) { c.enqueueObject(obj, gvr) }, @@ -164,7 +85,7 @@ func NewController( }, }) - _, _ = info.global.AddEventHandler(cache.FilteringResourceEventHandler{ + _, _ = info.Global.AddEventHandler(cache.FilteringResourceEventHandler{ FilterFunc: IsNoSystemClusterName, // not really needed, but cannot harm Handler: cache.ResourceEventHandlerFuncs{ AddFunc: func(obj interface{}) { c.enqueueCacheObject(obj, gvr) }, @@ -263,11 +184,5 @@ type controller struct { dynamicCacheClient kcpdynamic.ClusterInterface - gvrs map[schema.GroupVersionResource]replicatedGVR -} - -type replicatedGVR struct { - kind string - filter func(u *unstructured.Unstructured) bool - global, local cache.SharedIndexInformer + gvrs map[schema.GroupVersionResource]ReplicatedGVR } diff --git a/pkg/reconciler/cache/replication/replication_reconcile.go b/pkg/reconciler/cache/replication/replication_reconcile.go index 1e3760d77da..8e835ac824a 100644 --- a/pkg/reconciler/cache/replication/replication_reconcile.go +++ b/pkg/reconciler/cache/replication/replication_reconcile.go @@ -49,7 +49,7 @@ func (c *controller) reconcile(ctx context.Context, gvrKey string) error { shardName: c.shardName, getLocalCopy: func(cluster logicalcluster.Name, namespace, name string) (*unstructured.Unstructured, error) { key := kcpcache.ToClusterAwareKey(cluster.String(), namespace, name) - obj, exists, err := info.local.GetIndexer().GetByKey(key) + obj, exists, err := info.Local.GetIndexer().GetByKey(key) if !exists { return nil, apierrors.NewNotFound(gvr.GroupResource(), name) } else if err != nil { @@ -61,19 +61,19 @@ func (c *controller) reconcile(ctx context.Context, gvrKey string) error { return nil, err } - if info.filter != nil && !info.filter(u) { + if info.Filter != nil && !info.Filter(u) { return nil, apierrors.NewNotFound(gvr.GroupResource(), name) } if _, ok := obj.(*unstructured.Unstructured); ok { u = u.DeepCopy() } - u.SetKind(info.kind) + u.SetKind(info.Kind) u.SetAPIVersion(gvr.GroupVersion().String()) return u, nil }, getGlobalCopy: func(cluster logicalcluster.Name, namespace, name string) (*unstructured.Unstructured, error) { - objs, err := info.global.GetIndexer().ByIndex(ByShardAndLogicalClusterAndNamespaceAndName, ShardAndLogicalClusterAndNamespaceKey(c.shardName, cluster, namespace, name)) + objs, err := info.Global.GetIndexer().ByIndex(ByShardAndLogicalClusterAndNamespaceAndName, ShardAndLogicalClusterAndNamespaceKey(c.shardName, cluster, namespace, name)) if err != nil { return nil, err // necessary to avoid non-zero nil interface } @@ -93,7 +93,7 @@ func (c *controller) reconcile(ctx context.Context, gvrKey string) error { u = u.DeepCopy() } - u.SetKind(info.kind) + u.SetKind(info.Kind) u.SetAPIVersion(gvr.GroupVersion().String()) return u, nil }, @@ -197,7 +197,7 @@ func (r *reconciler) reconcile(ctx context.Context, key string) error { return nil } - logger.V(2).WithValues("kind", globalCopy.GetKind(), "namespace", globalCopy.GetNamespace(), "name", globalCopy.GetName()).Info("Updating object in global cache") + logger.V(2).Info("Updating object in global cache") _, err = r.updateObject(ctx, clusterName, globalCopy) // no need for patch because there is only this actor return err } diff --git a/pkg/reconciler/tenancy/initialization/apibinder_initializer_controller.go b/pkg/reconciler/tenancy/initialization/apibinder_initializer_controller.go index 136d585ea87..09713be2ba7 100644 --- a/pkg/reconciler/tenancy/initialization/apibinder_initializer_controller.go +++ b/pkg/reconciler/tenancy/initialization/apibinder_initializer_controller.go @@ -96,14 +96,6 @@ func NewAPIBinder( logger := logging.WithReconciler(klog.Background(), ControllerName) - indexers.AddIfNotPresentOrDie(workspaceTypeInformer.Informer().GetIndexer(), cache.Indexers{ - indexers.ByLogicalClusterPathAndName: indexers.IndexByLogicalClusterPathAndName, - }) - - indexers.AddIfNotPresentOrDie(globalWorkspaceTypeInformer.Informer().GetIndexer(), cache.Indexers{ - indexers.ByLogicalClusterPathAndName: indexers.IndexByLogicalClusterPathAndName, - }) - _, _ = logicalClusterInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: func(obj interface{}) { c.enqueueLogicalCluster(obj, logger) diff --git a/pkg/reconciler/tenancy/workspace/workspace_controller.go b/pkg/reconciler/tenancy/workspace/workspace_controller.go index 2d441ba67ed..8c4e70ab5aa 100644 --- a/pkg/reconciler/tenancy/workspace/workspace_controller.go +++ b/pkg/reconciler/tenancy/workspace/workspace_controller.go @@ -33,7 +33,6 @@ import ( "k8s.io/client-go/util/workqueue" "k8s.io/klog/v2" - "github.com/kcp-dev/kcp/pkg/indexers" "github.com/kcp-dev/kcp/pkg/logging" "github.com/kcp-dev/kcp/pkg/reconciler/committer" tenancyv1alpha1 "github.com/kcp-dev/kcp/sdk/apis/tenancy/v1alpha1" @@ -87,16 +86,6 @@ func NewController( commit: committer.NewCommitter[*tenancyv1alpha1.Workspace, tenancyv1alpha1client.WorkspaceInterface, *tenancyv1alpha1.WorkspaceSpec, *tenancyv1alpha1.WorkspaceStatus](kcpClusterClient.TenancyV1alpha1().Workspaces()), } - indexers.AddIfNotPresentOrDie(workspaceInformer.Informer().GetIndexer(), cache.Indexers{ - unschedulable: indexUnschedulable, - }) - indexers.AddIfNotPresentOrDie(globalShardInformer.Informer().GetIndexer(), cache.Indexers{ - byBase36Sha224Name: indexByBase36Sha224Name, - }) - indexers.AddIfNotPresentOrDie(globalWorkspaceTypeInformer.Informer().GetIndexer(), cache.Indexers{ - indexers.ByLogicalClusterPathAndName: indexers.IndexByLogicalClusterPathAndName, - }) - _, _ = workspaceInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: func(obj interface{}) { c.enqueue(obj) }, UpdateFunc: func(_, obj interface{}) { c.enqueue(obj) }, diff --git a/pkg/reconciler/tenancy/workspace/workspace_indexes.go b/pkg/reconciler/tenancy/workspace/workspace_indexes.go index d4d6089d326..11de65355d0 100644 --- a/pkg/reconciler/tenancy/workspace/workspace_indexes.go +++ b/pkg/reconciler/tenancy/workspace/workspace_indexes.go @@ -21,10 +21,6 @@ import ( "strings" "github.com/martinlindhe/base36" - - corev1alpha1 "github.com/kcp-dev/kcp/sdk/apis/core/v1alpha1" - tenancyv1alpha1 "github.com/kcp-dev/kcp/sdk/apis/tenancy/v1alpha1" - "github.com/kcp-dev/kcp/sdk/apis/third_party/conditions/util/conditions" ) const ( @@ -32,19 +28,6 @@ const ( unschedulable = "unschedulable" ) -func indexUnschedulable(obj interface{}) ([]string, error) { - workspace := obj.(*tenancyv1alpha1.Workspace) - if conditions.IsFalse(workspace, tenancyv1alpha1.WorkspaceScheduled) && conditions.GetReason(workspace, tenancyv1alpha1.WorkspaceScheduled) == tenancyv1alpha1.WorkspaceReasonUnschedulable { - return []string{"true"}, nil - } - return []string{}, nil -} - -func indexByBase36Sha224Name(obj interface{}) ([]string, error) { - s := obj.(*corev1alpha1.Shard) - return []string{ByBase36Sha224NameValue(s.Name)}, nil -} - func ByBase36Sha224NameValue(name string) string { hash := sha256.Sum224([]byte(name)) base36hash := strings.ToLower(base36.EncodeBytes(hash[:])) diff --git a/pkg/reconciler/tenancy/workspacetype/workspacetype_controller.go b/pkg/reconciler/tenancy/workspacetype/workspacetype_controller.go index d4034b700ec..30e86d0b33d 100644 --- a/pkg/reconciler/tenancy/workspacetype/workspacetype_controller.go +++ b/pkg/reconciler/tenancy/workspacetype/workspacetype_controller.go @@ -74,10 +74,6 @@ func NewController( commit: committer.NewCommitter[*WorkspaceType, Patcher, *WorkspaceTypeSpec, *WorkspaceTypeStatus](kcpClusterClient.TenancyV1alpha1().WorkspaceTypes()), } - indexers.AddIfNotPresentOrDie(workspaceTypeInformer.Informer().GetIndexer(), cache.Indexers{ - indexers.ByLogicalClusterPathAndName: indexers.IndexByLogicalClusterPathAndName, - }) - _, _ = workspaceTypeInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: func(obj interface{}) { c.enqueueWorkspaceTypes(obj) diff --git a/pkg/server/controllers.go b/pkg/server/controllers.go index 0995c7f0531..fec48bbb135 100644 --- a/pkg/server/controllers.go +++ b/pkg/server/controllers.go @@ -29,13 +29,19 @@ import ( kcpkubernetesclientset "github.com/kcp-dev/client-go/kubernetes" kcpmetadata "github.com/kcp-dev/client-go/metadata" "github.com/kcp-dev/logicalcluster/v3" + admissionregistrationv1 "k8s.io/api/admissionregistration/v1" + admissionregistrationv1alpha1 "k8s.io/api/admissionregistration/v1alpha1" corev1 "k8s.io/api/core/v1" + rbacv1 "k8s.io/api/rbac/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/discovery" "k8s.io/client-go/rest" + "k8s.io/client-go/tools/cache" certutil "k8s.io/client-go/util/cert" "k8s.io/client-go/util/keyutil" "k8s.io/klog/v2" @@ -48,6 +54,7 @@ import ( configuniversal "github.com/kcp-dev/kcp/config/universal" bootstrappolicy "github.com/kcp-dev/kcp/pkg/authorization/bootstrap" kcpfeatures "github.com/kcp-dev/kcp/pkg/features" + "github.com/kcp-dev/kcp/pkg/indexers" "github.com/kcp-dev/kcp/pkg/informer" "github.com/kcp-dev/kcp/pkg/reconciler/apis/apibinding" "github.com/kcp-dev/kcp/pkg/reconciler/apis/apibindingdeletion" @@ -60,6 +67,7 @@ import ( apisreplicateclusterrole "github.com/kcp-dev/kcp/pkg/reconciler/apis/replicateclusterrole" apisreplicateclusterrolebinding "github.com/kcp-dev/kcp/pkg/reconciler/apis/replicateclusterrolebinding" apisreplicatelogicalcluster "github.com/kcp-dev/kcp/pkg/reconciler/apis/replicatelogicalcluster" + "github.com/kcp-dev/kcp/pkg/reconciler/cache/labelclusterroles" "github.com/kcp-dev/kcp/pkg/reconciler/cache/replication" logicalclusterctrl "github.com/kcp-dev/kcp/pkg/reconciler/core/logicalcluster" "github.com/kcp-dev/kcp/pkg/reconciler/core/logicalclusterdeletion" @@ -79,12 +87,24 @@ import ( "github.com/kcp-dev/kcp/pkg/reconciler/tenancy/workspacetype" "github.com/kcp-dev/kcp/pkg/reconciler/topology/partitionset" initializingworkspacesbuilder "github.com/kcp-dev/kcp/pkg/virtual/initializingworkspaces/builder" + apisv1alpha1 "github.com/kcp-dev/kcp/sdk/apis/apis/v1alpha1" + "github.com/kcp-dev/kcp/sdk/apis/core" corev1alpha1 "github.com/kcp-dev/kcp/sdk/apis/core/v1alpha1" tenancyv1alpha1 "github.com/kcp-dev/kcp/sdk/apis/tenancy/v1alpha1" + "github.com/kcp-dev/kcp/sdk/apis/third_party/conditions/util/conditions" + "github.com/kcp-dev/kcp/sdk/client" kcpclientset "github.com/kcp-dev/kcp/sdk/client/clientset/versioned/cluster" kcpinformers "github.com/kcp-dev/kcp/sdk/client/informers/externalversions" ) +const ( + byBase36Sha224Name = "byBase36Sha224Name" + unschedulable = "unschedulable" + indexAPIExportsByAPIResourceSchema = "apiExportsByAPIResourceSchema" + indexAPIExportEndpointSliceByAPIExport = "indexAPIExportEndpointSliceByAPIExport" + indexAPIExportEndpointSlicesByPartition = "indexAPIExportEndpointSlicesByPartition" +) + type RunFunc func(ctx context.Context) type WaitFunc func(ctx context.Context, s *Server) error @@ -94,6 +114,69 @@ type controllerWrapper struct { Wait WaitFunc } +func indexUnschedulable(obj interface{}) ([]string, error) { + workspace := obj.(*tenancyv1alpha1.Workspace) + if conditions.IsFalse(workspace, tenancyv1alpha1.WorkspaceScheduled) && conditions.GetReason(workspace, tenancyv1alpha1.WorkspaceScheduled) == tenancyv1alpha1.WorkspaceReasonUnschedulable { + return []string{"true"}, nil + } + return []string{}, nil +} + +func indexByBase36Sha224Name(obj interface{}) ([]string, error) { + s := obj.(*corev1alpha1.Shard) + return []string{workspace.ByBase36Sha224NameValue(s.Name)}, nil +} + +// indexAPIExportsByAPIResourceSchemasFunc is an index function that maps an APIExport to its spec.latestResourceSchemas. +func indexAPIExportsByAPIResourceSchemasFunc(obj interface{}) ([]string, error) { + apiExport, ok := obj.(*apisv1alpha1.APIExport) + if !ok { + return []string{}, fmt.Errorf("obj is supposed to be an APIExport, but is %T", obj) + } + + ret := make([]string, len(apiExport.Spec.LatestResourceSchemas)) + for i := range apiExport.Spec.LatestResourceSchemas { + ret[i] = client.ToClusterAwareKey(logicalcluster.From(apiExport).Path(), apiExport.Spec.LatestResourceSchemas[i]) + } + + return ret, nil +} + +// indexAPIExportEndpointSliceByAPIExportFunc indexes the APIExportEndpointSlice by their APIExport's Reference Path and Name. +func indexAPIExportEndpointSliceByAPIExportFunc(obj interface{}) ([]string, error) { + apiExportEndpointSlice, ok := obj.(*apisv1alpha1.APIExportEndpointSlice) + if !ok { + return []string{}, fmt.Errorf("obj %T is not an APIExportEndpointSlice", obj) + } + + path := logicalcluster.NewPath(apiExportEndpointSlice.Spec.APIExport.Path) + if path.Empty() { + path = logicalcluster.From(apiExportEndpointSlice).Path() + } + return []string{path.Join(apiExportEndpointSlice.Spec.APIExport.Name).String()}, nil +} + +// indexAPIExportEndpointSlicesByPartitionFunc is an index function that maps a Partition to the key for its +// spec.partition. +func indexAPIExportEndpointSlicesByPartitionFunc(obj interface{}) ([]string, error) { + slice, ok := obj.(*apisv1alpha1.APIExportEndpointSlice) + if !ok { + return []string{}, fmt.Errorf("obj is supposed to be an APIExportEndpointSlice, but is %T", obj) + } + + if slice.Spec.Partition != "" { + clusterName := logicalcluster.From(slice).Path() + if !ok { + // this will never happen due to validation + return []string{}, fmt.Errorf("cluster information missing") + } + key := client.ToClusterAwareKey(clusterName, slice.Spec.Partition) + return []string{key}, nil + } + + return []string{}, nil +} + func (s *Server) startControllers(ctx context.Context) { for _, controller := range s.controllers { go s.runController(ctx, controller) @@ -139,13 +222,13 @@ func (s *Server) installClusterRoleAggregationController(ctx context.Context, co if err != nil { return err } - c := clusterroleaggregation.NewClusterRoleAggregation( - s.KubeSharedInformerFactory.Rbac().V1().ClusterRoles(), - kubeClient.RbacV1()) return s.registerController(&controllerWrapper{ Name: controllerName, Runner: func(ctx context.Context) { + c := clusterroleaggregation.NewClusterRoleAggregation( + s.KubeSharedInformerFactory.Rbac().V1().ClusterRoles(), + kubeClient.RbacV1()) c.Run(ctx, 5) }, }) @@ -333,15 +416,14 @@ func (s *Server) installTenancyLogicalClusterController(ctx context.Context, con return err } - controller := tenancylogicalcluster.NewController( - kubeClusterClient, - s.KcpSharedInformerFactory.Core().V1alpha1().LogicalClusters(), - s.KubeSharedInformerFactory.Rbac().V1().ClusterRoleBindings(), - ) - return s.registerController(&controllerWrapper{ Name: tenancylogicalcluster.ControllerName, Runner: func(ctx context.Context) { + controller := tenancylogicalcluster.NewController( + kubeClusterClient, + s.KcpSharedInformerFactory.Core().V1alpha1().LogicalClusters(), + s.KubeSharedInformerFactory.Rbac().V1().ClusterRoleBindings(), + ) controller.Start(ctx, 10) }, }) @@ -372,20 +454,19 @@ func (s *Server) installLogicalClusterDeletionController(ctx context.Context, co return err } - logicalClusterDeletionController := logicalclusterdeletion.NewController( - kubeClusterClient, - kcpClusterClient, - logicalClusterAdminConfig, - externalLogicalClusterAdminConfig, - metadataClusterClient, - s.KcpSharedInformerFactory.Core().V1alpha1().LogicalClusters(), - discoverResourcesFn, - s.KcpSharedInformerFactory.Apis().V1alpha1().APIBindings(), - ) - return s.registerController(&controllerWrapper{ Name: logicalclusterdeletion.ControllerName, Runner: func(ctx context.Context) { + logicalClusterDeletionController := logicalclusterdeletion.NewController( + kubeClusterClient, + kcpClusterClient, + logicalClusterAdminConfig, + externalLogicalClusterAdminConfig, + metadataClusterClient, + s.KcpSharedInformerFactory.Core().V1alpha1().LogicalClusters(), + discoverResourcesFn, + s.KcpSharedInformerFactory.Apis().V1alpha1().APIBindings(), + ) logicalClusterDeletionController.Start(ctx, 10) }, }) @@ -410,24 +491,33 @@ func (s *Server) installWorkspaceScheduler(ctx context.Context, config *rest.Con externalLogicalClusterAdminConfig = rest.CopyConfig(externalLogicalClusterAdminConfig) externalLogicalClusterAdminConfig = rest.AddUserAgent(externalLogicalClusterAdminConfig, workspace.ControllerName+"+"+s.Options.Extra.ShardName) - workspaceController, err := workspace.NewController( - s.Options.Extra.ShardName, - kcpClusterClient, - kubeClusterClient, - logicalClusterAdminConfig, - externalLogicalClusterAdminConfig, - s.KcpSharedInformerFactory.Tenancy().V1alpha1().Workspaces(), - s.CacheKcpSharedInformerFactory.Core().V1alpha1().Shards(), - s.CacheKcpSharedInformerFactory.Tenancy().V1alpha1().WorkspaceTypes(), - s.KcpSharedInformerFactory.Core().V1alpha1().LogicalClusters(), - ) - if err != nil { - return err - } + indexers.AddIfNotPresentOrDie(s.KcpSharedInformerFactory.Tenancy().V1alpha1().Workspaces().Informer().GetIndexer(), cache.Indexers{ + unschedulable: indexUnschedulable, + }) + indexers.AddIfNotPresentOrDie(s.CacheKcpSharedInformerFactory.Core().V1alpha1().Shards().Informer().GetIndexer(), cache.Indexers{ + byBase36Sha224Name: indexByBase36Sha224Name, + }) + indexers.AddIfNotPresentOrDie(s.CacheKcpSharedInformerFactory.Tenancy().V1alpha1().WorkspaceTypes().Informer().GetIndexer(), cache.Indexers{ + indexers.ByLogicalClusterPathAndName: indexers.IndexByLogicalClusterPathAndName, + }) if err := s.registerController(&controllerWrapper{ Name: workspace.ControllerName, Runner: func(ctx context.Context) { + workspaceController, err := workspace.NewController( + s.Options.Extra.ShardName, + kcpClusterClient, + kubeClusterClient, + logicalClusterAdminConfig, + externalLogicalClusterAdminConfig, + s.KcpSharedInformerFactory.Tenancy().V1alpha1().Workspaces(), + s.CacheKcpSharedInformerFactory.Core().V1alpha1().Shards(), + s.CacheKcpSharedInformerFactory.Tenancy().V1alpha1().WorkspaceTypes(), + s.KcpSharedInformerFactory.Core().V1alpha1().LogicalClusters(), + ) + if err != nil { + panic(err) + } workspaceController.Start(ctx, 2) }, }); err != nil { @@ -441,25 +531,25 @@ func (s *Server) installWorkspaceScheduler(ctx context.Context, config *rest.Con return err } - var workspaceShardController *shard.Controller - if s.Options.Extra.ShardName == corev1alpha1.RootShard { - workspaceShardController, err = shard.NewController( - kcpClusterClient, - s.KcpSharedInformerFactory.Core().V1alpha1().Shards(), - ) - if err != nil { - return err - } - } - if workspaceShardController != nil { - if err := s.registerController(&controllerWrapper{ - Name: shard.ControllerName, - Runner: func(ctx context.Context) { + if err := s.registerController(&controllerWrapper{ + Name: shard.ControllerName, + Runner: func(ctx context.Context) { + var workspaceShardController *shard.Controller + if s.Options.Extra.ShardName == corev1alpha1.RootShard { + workspaceShardController, err = shard.NewController( + kcpClusterClient, + s.KcpSharedInformerFactory.Core().V1alpha1().Shards(), + ) + if err != nil { + panic(err) + } + } + if workspaceShardController != nil { workspaceShardController.Start(ctx, 2) - }, - }); err != nil { - return err - } + } + }, + }); err != nil { + return err } workspaceTypeConfig := rest.CopyConfig(config) @@ -468,19 +558,20 @@ func (s *Server) installWorkspaceScheduler(ctx context.Context, config *rest.Con if err != nil { return err } - - workspaceTypeController, err := workspacetype.NewController( - kcpClusterClient, - s.KcpSharedInformerFactory.Tenancy().V1alpha1().WorkspaceTypes(), - s.CacheKcpSharedInformerFactory.Core().V1alpha1().Shards(), - ) - if err != nil { - return err - } - + indexers.AddIfNotPresentOrDie(s.KcpSharedInformerFactory.Tenancy().V1alpha1().WorkspaceTypes().Informer().GetIndexer(), cache.Indexers{ + indexers.ByLogicalClusterPathAndName: indexers.IndexByLogicalClusterPathAndName, + }) if err := s.registerController(&controllerWrapper{ Name: workspacetype.ControllerName, Runner: func(ctx context.Context) { + workspaceTypeController, err := workspacetype.NewController( + kcpClusterClient, + s.KcpSharedInformerFactory.Tenancy().V1alpha1().WorkspaceTypes(), + s.CacheKcpSharedInformerFactory.Core().V1alpha1().Shards(), + ) + if err != nil { + panic(err) + } workspaceTypeController.Start(ctx, 2) }, }); err != nil { @@ -502,22 +593,20 @@ func (s *Server) installWorkspaceScheduler(ctx context.Context, config *rest.Con if err != nil { return err } - - universalController, err := bootstrap.NewController( - dynamicClusterClient, - bootstrapKcpClusterClient, - s.KcpSharedInformerFactory.Core().V1alpha1().LogicalClusters(), - tenancyv1alpha1.WorkspaceTypeReference{Path: "root", Name: "universal"}, - configuniversal.Bootstrap, - sets.New[string](s.Options.Extra.BatteriesIncluded...), - ) - if err != nil { - return err - } - return s.registerController(&controllerWrapper{ Name: universalControllerName, Runner: func(ctx context.Context) { + universalController, err := bootstrap.NewController( + dynamicClusterClient, + bootstrapKcpClusterClient, + s.KcpSharedInformerFactory.Core().V1alpha1().LogicalClusters(), + tenancyv1alpha1.WorkspaceTypeReference{Path: "root", Name: "universal"}, + configuniversal.Bootstrap, + sets.New[string](s.Options.Extra.BatteriesIncluded...), + ) + if err != nil { + panic(err) + } universalController.Start(ctx, 2) }, }) @@ -542,19 +631,18 @@ func (s *Server) installWorkspaceMountsScheduler(ctx context.Context, config *re return err } - workspaceMountsController, err := workspacemounts.NewController( - kcpClusterClient, - dynamicClusterClient, - s.KcpSharedInformerFactory.Tenancy().V1alpha1().Workspaces(), - s.DiscoveringDynamicSharedInformerFactory, - ) - if err != nil { - return err - } - return s.registerController(&controllerWrapper{ Name: workspacemounts.ControllerName, Runner: func(ctx context.Context) { + workspaceMountsController, err := workspacemounts.NewController( + kcpClusterClient, + dynamicClusterClient, + s.KcpSharedInformerFactory.Tenancy().V1alpha1().Workspaces(), + s.DiscoveringDynamicSharedInformerFactory, + ) + if err != nil { + panic(err) + } workspaceMountsController.Start(ctx, 2) }, }) @@ -568,18 +656,17 @@ func (s *Server) installLogicalCluster(ctx context.Context, config *rest.Config) return err } - logicalClusterController, err := logicalclusterctrl.NewController( - s.CompletedConfig.ShardExternalURL, - kcpClusterClient, - s.KcpSharedInformerFactory.Core().V1alpha1().LogicalClusters(), - ) - if err != nil { - return err - } - return s.registerController(&controllerWrapper{ Name: logicalclusterctrl.ControllerName, Runner: func(ctx context.Context) { + logicalClusterController, err := logicalclusterctrl.NewController( + s.CompletedConfig.ShardExternalURL, + kcpClusterClient, + s.KcpSharedInformerFactory.Core().V1alpha1().LogicalClusters(), + ) + if err != nil { + panic(err) + } logicalClusterController.Start(ctx, 2) }, }) @@ -600,21 +687,20 @@ func (s *Server) installAPIBindingController(ctx context.Context, config *rest.C return err } - c, err := apibinding.NewController( - crdClusterClient, - kcpClusterClient, - s.KcpSharedInformerFactory.Apis().V1alpha1().APIBindings(), - s.KcpSharedInformerFactory.Apis().V1alpha1().APIExports(), - s.KcpSharedInformerFactory.Apis().V1alpha1().APIResourceSchemas(), - s.KcpSharedInformerFactory.Apis().V1alpha1().APIConversions(), - s.CacheKcpSharedInformerFactory.Apis().V1alpha1().APIExports(), - s.CacheKcpSharedInformerFactory.Apis().V1alpha1().APIResourceSchemas(), - s.CacheKcpSharedInformerFactory.Apis().V1alpha1().APIConversions(), - s.ApiExtensionsSharedInformerFactory.Apiextensions().V1().CustomResourceDefinitions(), - ) - if err != nil { - return err - } + // APIBinding indexers + indexers.AddIfNotPresentOrDie(s.KcpSharedInformerFactory.Apis().V1alpha1().APIBindings().Informer().GetIndexer(), cache.Indexers{ + indexers.APIBindingsByAPIExport: indexers.IndexAPIBindingByAPIExport, + }) + + // APIExport indexers + indexers.AddIfNotPresentOrDie(s.KcpSharedInformerFactory.Apis().V1alpha1().APIExports().Informer().GetIndexer(), cache.Indexers{ + indexers.ByLogicalClusterPathAndName: indexers.IndexByLogicalClusterPathAndName, + indexAPIExportsByAPIResourceSchema: indexAPIExportsByAPIResourceSchemasFunc, + }) + indexers.AddIfNotPresentOrDie(s.CacheKcpSharedInformerFactory.Apis().V1alpha1().APIExports().Informer().GetIndexer(), cache.Indexers{ + indexers.ByLogicalClusterPathAndName: indexers.IndexByLogicalClusterPathAndName, + indexAPIExportsByAPIResourceSchema: indexAPIExportsByAPIResourceSchemasFunc, + }) if err := s.registerController(&controllerWrapper{ Name: apibinding.ControllerName, @@ -633,6 +719,21 @@ func (s *Server) installAPIBindingController(ctx context.Context, config *rest.C }) }, Runner: func(ctx context.Context) { + c, err := apibinding.NewController( + crdClusterClient, + kcpClusterClient, + s.KcpSharedInformerFactory.Apis().V1alpha1().APIBindings(), + s.KcpSharedInformerFactory.Apis().V1alpha1().APIExports(), + s.KcpSharedInformerFactory.Apis().V1alpha1().APIResourceSchemas(), + s.KcpSharedInformerFactory.Apis().V1alpha1().APIConversions(), + s.CacheKcpSharedInformerFactory.Apis().V1alpha1().APIExports(), + s.CacheKcpSharedInformerFactory.Apis().V1alpha1().APIResourceSchemas(), + s.CacheKcpSharedInformerFactory.Apis().V1alpha1().APIConversions(), + s.ApiExtensionsSharedInformerFactory.Apiextensions().V1().CustomResourceDefinitions(), + ) + if err != nil { + panic(err) + } c.Start(ctx, 2) }, }); err != nil { @@ -651,21 +752,24 @@ func (s *Server) installAPIBindingController(ctx context.Context, config *rest.C return err } - permissionClaimLabelController, err := permissionclaimlabel.NewController( - kcpClusterClient, - dynamicClusterClient, - ddsif, - s.KcpSharedInformerFactory.Apis().V1alpha1().APIBindings(), - s.KcpSharedInformerFactory.Apis().V1alpha1().APIExports(), - s.CacheKcpSharedInformerFactory.Apis().V1alpha1().APIExports(), - ) - if err != nil { - return err - } + indexers.AddIfNotPresentOrDie(s.KcpSharedInformerFactory.Apis().V1alpha1().APIExports().Informer().GetIndexer(), cache.Indexers{ + indexers.ByLogicalClusterPathAndName: indexers.IndexByLogicalClusterPathAndName, + }) if err := s.registerController(&controllerWrapper{ Name: permissionclaimlabel.ControllerName, Runner: func(ctx context.Context) { + permissionClaimLabelController, err := permissionclaimlabel.NewController( + kcpClusterClient, + dynamicClusterClient, + ddsif, + s.KcpSharedInformerFactory.Apis().V1alpha1().APIBindings(), + s.KcpSharedInformerFactory.Apis().V1alpha1().APIExports(), + s.CacheKcpSharedInformerFactory.Apis().V1alpha1().APIExports(), + ) + if err != nil { + panic(err) + } permissionClaimLabelController.Start(ctx, 5) }, }); err != nil { @@ -683,21 +787,33 @@ func (s *Server) installAPIBindingController(ctx context.Context, config *rest.C if err != nil { return err } - permissionClaimLabelResourceController, err := permissionclaimlabel.NewResourceController( - kcpClusterClient, - dynamicClusterClient, - ddsif, - s.KcpSharedInformerFactory.Apis().V1alpha1().APIBindings(), - s.KcpSharedInformerFactory.Apis().V1alpha1().APIExports(), - s.CacheKcpSharedInformerFactory.Apis().V1alpha1().APIExports(), - ) - if err != nil { + + if err := s.KcpSharedInformerFactory.Apis().V1alpha1().APIBindings().Informer().GetIndexer().AddIndexers( + cache.Indexers{ + indexers.APIBindingByClusterAndAcceptedClaimedGroupResources: indexers.IndexAPIBindingByClusterAndAcceptedClaimedGroupResources, + }, + ); err != nil { return err } + indexers.AddIfNotPresentOrDie(s.KcpSharedInformerFactory.Apis().V1alpha1().APIExports().Informer().GetIndexer(), cache.Indexers{ + indexers.ByLogicalClusterPathAndName: indexers.IndexByLogicalClusterPathAndName, + }) + if err := s.registerController(&controllerWrapper{ Name: permissionclaimlabel.ResourceControllerName, Runner: func(ctx context.Context) { + permissionClaimLabelResourceController, err := permissionclaimlabel.NewResourceController( + kcpClusterClient, + dynamicClusterClient, + ddsif, + s.KcpSharedInformerFactory.Apis().V1alpha1().APIBindings(), + s.KcpSharedInformerFactory.Apis().V1alpha1().APIExports(), + s.CacheKcpSharedInformerFactory.Apis().V1alpha1().APIExports(), + ) + if err != nil { + panic(err) + } permissionClaimLabelResourceController.Start(ctx, 2) }, }); err != nil { @@ -715,15 +831,15 @@ func (s *Server) installAPIBindingController(ctx context.Context, config *rest.C if err != nil { return err } - apibindingDeletionController := apibindingdeletion.NewController( - metadataClient, - kcpClusterClient, - s.KcpSharedInformerFactory.Apis().V1alpha1().APIBindings(), - ) return s.registerController(&controllerWrapper{ Name: apibindingdeletion.ControllerName, Runner: func(ctx context.Context) { + apibindingDeletionController := apibindingdeletion.NewController( + metadataClient, + kcpClusterClient, + s.KcpSharedInformerFactory.Apis().V1alpha1().APIBindings(), + ) apibindingDeletionController.Start(ctx, 10) }, }) @@ -771,25 +887,36 @@ func (s *Server) installAPIBinderController(ctx context.Context, config *rest.Co resyncPeriod, ) - c, err := initialization.NewAPIBinder( - initializingWorkspacesKcpClusterClient, - initializingWorkspacesKcpInformers.Core().V1alpha1().LogicalClusters(), - s.KcpSharedInformerFactory.Tenancy().V1alpha1().WorkspaceTypes(), - s.CacheKcpSharedInformerFactory.Tenancy().V1alpha1().WorkspaceTypes(), - s.KcpSharedInformerFactory.Apis().V1alpha1().APIBindings(), - s.KcpSharedInformerFactory.Apis().V1alpha1().APIExports(), - s.CacheKcpSharedInformerFactory.Apis().V1alpha1().APIExports(), - ) - if err != nil { - return err - } + indexers.AddIfNotPresentOrDie(s.KcpSharedInformerFactory.Tenancy().V1alpha1().WorkspaceTypes().Informer().GetIndexer(), cache.Indexers{ + indexers.ByLogicalClusterPathAndName: indexers.IndexByLogicalClusterPathAndName, + }) + + indexers.AddIfNotPresentOrDie(s.CacheKcpSharedInformerFactory.Tenancy().V1alpha1().WorkspaceTypes().Informer().GetIndexer(), cache.Indexers{ + indexers.ByLogicalClusterPathAndName: indexers.IndexByLogicalClusterPathAndName, + }) return s.registerController(&controllerWrapper{ Name: initialization.ControllerName, + Wait: func(ctx context.Context, s *Server) error { + return wait.PollUntilContextCancel(ctx, time.Millisecond*100, true, func(ctx context.Context) (bool, error) { + initializingWorkspacesKcpInformers.Start(ctx.Done()) + logicalClusterCacheSynced := initializingWorkspacesKcpInformers.Core().V1alpha1().LogicalClusters().Informer().HasSynced() + return logicalClusterCacheSynced, nil + }) + }, Runner: func(ctx context.Context) { - initializingWorkspacesKcpInformers.Start(ctx.Done()) - initializingWorkspacesKcpInformers.WaitForCacheSync(ctx.Done()) - + c, err := initialization.NewAPIBinder( + initializingWorkspacesKcpClusterClient, + initializingWorkspacesKcpInformers.Core().V1alpha1().LogicalClusters(), + s.KcpSharedInformerFactory.Tenancy().V1alpha1().WorkspaceTypes(), + s.CacheKcpSharedInformerFactory.Tenancy().V1alpha1().WorkspaceTypes(), + s.KcpSharedInformerFactory.Apis().V1alpha1().APIBindings(), + s.KcpSharedInformerFactory.Apis().V1alpha1().APIExports(), + s.CacheKcpSharedInformerFactory.Apis().V1alpha1().APIExports(), + ) + if err != nil { + panic(err) + } c.Start(ctx, 2) }, }) @@ -804,18 +931,24 @@ func (s *Server) installCRDCleanupController(ctx context.Context, config *rest.C return err } - c, err := crdcleanup.NewController( - s.ApiExtensionsSharedInformerFactory.Apiextensions().V1().CustomResourceDefinitions(), - crdClusterClient, - s.KcpSharedInformerFactory.Apis().V1alpha1().APIBindings(), + indexers.AddIfNotPresentOrDie( + s.KcpSharedInformerFactory.Apis().V1alpha1().APIBindings().Informer().GetIndexer(), + cache.Indexers{ + indexers.APIBindingByBoundResourceUID: indexers.IndexAPIBindingByBoundResourceUID, + }, ) - if err != nil { - return err - } return s.registerController(&controllerWrapper{ Name: crdcleanup.ControllerName, Runner: func(ctx context.Context) { + c, err := crdcleanup.NewController( + s.ApiExtensionsSharedInformerFactory.Apiextensions().V1().CustomResourceDefinitions(), + crdClusterClient, + s.KcpSharedInformerFactory.Apis().V1alpha1().APIBindings(), + ) + if err != nil { + panic(err) + } c.Start(ctx, 2) }, }) @@ -833,24 +966,17 @@ func (s *Server) installAPIExportController(ctx context.Context, config *rest.Co return err } - c, err := apiexport.NewController( - kcpClusterClient, - s.KcpSharedInformerFactory.Apis().V1alpha1().APIExports(), - s.CacheKcpSharedInformerFactory.Core().V1alpha1().Shards(), - kubeClusterClient, - s.KubeSharedInformerFactory.Core().V1().Namespaces(), - s.KubeSharedInformerFactory.Core().V1().Secrets(), + indexers.AddIfNotPresentOrDie( + s.KcpSharedInformerFactory.Apis().V1alpha1().APIExports().Informer().GetIndexer(), + cache.Indexers{ + indexers.APIExportByIdentity: indexers.IndexAPIExportByIdentity, + indexers.APIExportBySecret: indexers.IndexAPIExportBySecret, + }, ) - if err != nil { - return err - } return s.registerController(&controllerWrapper{ Name: apiexport.ControllerName, Wait: func(ctx context.Context, s *Server) error { - // do custom wait logic here because APIExports+APIBindings are special as system CRDs, - // and the controllers must run as soon as these two informers are up in order to bootstrap - // the rest of the system. Everything else in the kcp clientset is APIBinding based. return wait.PollUntilContextCancel(ctx, time.Millisecond*100, true, func(ctx context.Context) (bool, error) { crdsSynced := s.ApiExtensionsSharedInformerFactory.Apiextensions().V1().CustomResourceDefinitions().Informer().HasSynced() exportsSynced := s.KcpSharedInformerFactory.Apis().V1alpha1().APIExports().Informer().HasSynced() @@ -858,6 +984,17 @@ func (s *Server) installAPIExportController(ctx context.Context, config *rest.Co }) }, Runner: func(ctx context.Context) { + c, err := apiexport.NewController( + kcpClusterClient, + s.KcpSharedInformerFactory.Apis().V1alpha1().APIExports(), + s.CacheKcpSharedInformerFactory.Core().V1alpha1().Shards(), + kubeClusterClient, + s.KubeSharedInformerFactory.Core().V1().Namespaces(), + s.KubeSharedInformerFactory.Core().V1().Secrets(), + ) + if err != nil { + panic(err) + } c.Start(ctx, 2) }, }) @@ -870,16 +1007,17 @@ func (s *Server) installApisReplicateClusterRoleControllers(ctx context.Context, if err != nil { return err } - - c := apisreplicateclusterrole.NewController( - kubeClusterClient, - s.KubeSharedInformerFactory.Rbac().V1().ClusterRoles(), - s.KubeSharedInformerFactory.Rbac().V1().ClusterRoleBindings(), - ) - + indexers.AddIfNotPresentOrDie(s.KubeSharedInformerFactory.Rbac().V1().ClusterRoleBindings().Informer().GetIndexer(), cache.Indexers{ + labelclusterroles.ClusterRoleBindingByClusterRoleName: labelclusterroles.IndexClusterRoleBindingByClusterRoleName, + }) return s.registerController(&controllerWrapper{ Name: apisreplicateclusterrole.ControllerName, Runner: func(ctx context.Context) { + c := apisreplicateclusterrole.NewController( + kubeClusterClient, + s.KubeSharedInformerFactory.Rbac().V1().ClusterRoles(), + s.KubeSharedInformerFactory.Rbac().V1().ClusterRoleBindings(), + ) c.Start(ctx, 2) }, }) @@ -892,17 +1030,19 @@ func (s *Server) installCoreReplicateClusterRoleControllers(ctx context.Context, if err != nil { return err } - - c := coresreplicateclusterrole.NewController( - kubeClusterClient, - s.KubeSharedInformerFactory.Rbac().V1().ClusterRoles(), - s.KubeSharedInformerFactory.Rbac().V1().ClusterRoleBindings(), - s.KcpSharedInformerFactory.Core().V1alpha1().LogicalClusters(), - ) - + indexers.AddIfNotPresentOrDie(s.KubeSharedInformerFactory.Rbac().V1().ClusterRoleBindings().Informer().GetIndexer(), cache.Indexers{ + labelclusterroles.ClusterRoleBindingByClusterRoleName: labelclusterroles.IndexClusterRoleBindingByClusterRoleName, + }) return s.registerController(&controllerWrapper{ Name: coresreplicateclusterrole.ControllerName, Runner: func(ctx context.Context) { + c := coresreplicateclusterrole.NewController( + kubeClusterClient, + s.KubeSharedInformerFactory.Rbac().V1().ClusterRoles(), + s.KubeSharedInformerFactory.Rbac().V1().ClusterRoleBindings(), + s.KcpSharedInformerFactory.Core().V1alpha1().LogicalClusters(), + ) + c.Start(ctx, 2) }, }) @@ -915,16 +1055,19 @@ func (s *Server) installApisReplicateClusterRoleBindingControllers(ctx context.C if err != nil { return err } - - c := apisreplicateclusterrolebinding.NewController( - kubeClusterClient, - s.KubeSharedInformerFactory.Rbac().V1().ClusterRoleBindings(), - s.KubeSharedInformerFactory.Rbac().V1().ClusterRoles(), - ) + indexers.AddIfNotPresentOrDie(s.KubeSharedInformerFactory.Rbac().V1().ClusterRoleBindings().Informer().GetIndexer(), cache.Indexers{ + labelclusterroles.ClusterRoleBindingByClusterRoleName: labelclusterroles.IndexClusterRoleBindingByClusterRoleName, + }) return s.registerController(&controllerWrapper{ Name: apisreplicateclusterrolebinding.ControllerName, Runner: func(ctx context.Context) { + c := apisreplicateclusterrolebinding.NewController( + kubeClusterClient, + s.KubeSharedInformerFactory.Rbac().V1().ClusterRoleBindings(), + s.KubeSharedInformerFactory.Rbac().V1().ClusterRoles(), + ) + c.Start(ctx, 2) }, }) @@ -938,15 +1081,14 @@ func (s *Server) installApisReplicateLogicalClusterControllers(ctx context.Conte return err } - c := apisreplicatelogicalcluster.NewController( - kcpClusterClient, - s.KcpSharedInformerFactory.Core().V1alpha1().LogicalClusters(), - s.KcpSharedInformerFactory.Apis().V1alpha1().APIExports(), - ) - return s.registerController(&controllerWrapper{ Name: apisreplicatelogicalcluster.ControllerName, Runner: func(ctx context.Context) { + c := apisreplicatelogicalcluster.NewController( + kcpClusterClient, + s.KcpSharedInformerFactory.Core().V1alpha1().LogicalClusters(), + s.KcpSharedInformerFactory.Apis().V1alpha1().APIExports(), + ) c.Start(ctx, 2) }, }) @@ -960,15 +1102,14 @@ func (s *Server) installTenancyReplicateLogicalClusterControllers(ctx context.Co return err } - c := tenancyreplicatelogicalcluster.NewController( - kcpClusterClient, - s.KcpSharedInformerFactory.Core().V1alpha1().LogicalClusters(), - s.KcpSharedInformerFactory.Tenancy().V1alpha1().WorkspaceTypes(), - ) - return s.registerController(&controllerWrapper{ Name: tenancyreplicatelogicalcluster.ControllerName, Runner: func(ctx context.Context) { + c := tenancyreplicatelogicalcluster.NewController( + kcpClusterClient, + s.KcpSharedInformerFactory.Core().V1alpha1().LogicalClusters(), + s.KcpSharedInformerFactory.Tenancy().V1alpha1().WorkspaceTypes(), + ) c.Start(ctx, 2) }, }) @@ -982,16 +1123,15 @@ func (s *Server) installCoreReplicateClusterRoleBindingControllers(ctx context.C return err } - c := corereplicateclusterrolebinding.NewController( - kubeClusterClient, - s.KubeSharedInformerFactory.Rbac().V1().ClusterRoleBindings(), - s.KubeSharedInformerFactory.Rbac().V1().ClusterRoles(), - s.KcpSharedInformerFactory.Core().V1alpha1().LogicalClusters(), - ) - return s.registerController(&controllerWrapper{ Name: corereplicateclusterrolebinding.ControllerName, Runner: func(ctx context.Context) { + c := corereplicateclusterrolebinding.NewController( + kubeClusterClient, + s.KubeSharedInformerFactory.Rbac().V1().ClusterRoleBindings(), + s.KubeSharedInformerFactory.Rbac().V1().ClusterRoles(), + s.KcpSharedInformerFactory.Core().V1alpha1().LogicalClusters(), + ) c.Start(ctx, 2) }, }) @@ -1005,15 +1145,14 @@ func (s *Server) installTenancyReplicateClusterRoleControllers(ctx context.Conte return err } - c := tenancyreplicateclusterrole.NewController( - kubeClusterClient, - s.KubeSharedInformerFactory.Rbac().V1().ClusterRoles(), - s.KubeSharedInformerFactory.Rbac().V1().ClusterRoleBindings(), - ) - return s.registerController(&controllerWrapper{ Name: tenancyreplicateclusterrole.ControllerName, Runner: func(ctx context.Context) { + c := tenancyreplicateclusterrole.NewController( + kubeClusterClient, + s.KubeSharedInformerFactory.Rbac().V1().ClusterRoles(), + s.KubeSharedInformerFactory.Rbac().V1().ClusterRoleBindings(), + ) c.Start(ctx, 2) }, }) @@ -1027,15 +1166,14 @@ func (s *Server) installTenancyReplicateClusterRoleBindingControllers(ctx contex return err } - c := tenancyreplicateclusterrolebinding.NewController( - kubeClusterClient, - s.KubeSharedInformerFactory.Rbac().V1().ClusterRoleBindings(), - s.KubeSharedInformerFactory.Rbac().V1().ClusterRoles(), - ) - return s.registerController(&controllerWrapper{ Name: tenancyreplicateclusterrolebinding.ControllerName, Runner: func(ctx context.Context) { + c := tenancyreplicateclusterrolebinding.NewController( + kubeClusterClient, + s.KubeSharedInformerFactory.Rbac().V1().ClusterRoleBindings(), + s.KubeSharedInformerFactory.Rbac().V1().ClusterRoles(), + ) c.Start(ctx, 2) }, }) @@ -1050,21 +1188,41 @@ func (s *Server) installAPIExportEndpointSliceController(ctx context.Context, co return err } - c, err := apiexportendpointslice.NewController( - s.KcpSharedInformerFactory.Apis().V1alpha1().APIExportEndpointSlices(), - // Shards and APIExports get retrieved from cache server - s.CacheKcpSharedInformerFactory.Core().V1alpha1().Shards(), - s.CacheKcpSharedInformerFactory.Apis().V1alpha1().APIExports(), - s.KcpSharedInformerFactory.Topology().V1alpha1().Partitions(), - kcpClusterClient, - ) - if err != nil { - return err - } + indexers.AddIfNotPresentOrDie(s.CacheKcpSharedInformerFactory.Apis().V1alpha1().APIExports().Informer().GetIndexer(), cache.Indexers{ + indexers.ByLogicalClusterPathAndName: indexers.IndexByLogicalClusterPathAndName, + }) + + indexers.AddIfNotPresentOrDie(s.KcpSharedInformerFactory.Apis().V1alpha1().APIExportEndpointSlices().Informer().GetIndexer(), cache.Indexers{ + indexAPIExportEndpointSliceByAPIExport: indexAPIExportEndpointSliceByAPIExportFunc, + }) + + indexers.AddIfNotPresentOrDie(s.KcpSharedInformerFactory.Apis().V1alpha1().APIExportEndpointSlices().Informer().GetIndexer(), cache.Indexers{ + indexAPIExportEndpointSlicesByPartition: indexAPIExportEndpointSlicesByPartitionFunc, + }) return s.registerController(&controllerWrapper{ Name: apiexportendpointslice.ControllerName, + Wait: func(ctx context.Context, s *Server) error { + return wait.PollUntilContextCancel(ctx, time.Millisecond*100, true, func(ctx context.Context) (bool, error) { + apiexportEndpointSliceCache := s.KcpSharedInformerFactory.Apis().V1alpha1().APIExportEndpointSlices().Informer().HasSynced() + shardCacheSynced := s.CacheKcpSharedInformerFactory.Core().V1alpha1().Shards().Informer().HasSynced() + apiexportendpointcacheSynced := s.CacheKcpSharedInformerFactory.Apis().V1alpha1().APIExports().Informer().HasSynced() + partitionSynced := s.KcpSharedInformerFactory.Topology().V1alpha1().Partitions().Informer().HasSynced() + return apiexportEndpointSliceCache && shardCacheSynced && apiexportendpointcacheSynced && partitionSynced, nil + }) + }, Runner: func(ctx context.Context) { + c, err := apiexportendpointslice.NewController( + s.KcpSharedInformerFactory.Apis().V1alpha1().APIExportEndpointSlices(), + // Shards and APIExports get retrieved from cache server + s.CacheKcpSharedInformerFactory.Core().V1alpha1().Shards(), + s.CacheKcpSharedInformerFactory.Apis().V1alpha1().APIExports(), + s.KcpSharedInformerFactory.Topology().V1alpha1().Partitions(), + kcpClusterClient, + ) + if err != nil { + panic(err) + } c.Start(ctx, 2) }, }) @@ -1079,19 +1237,27 @@ func (s *Server) installPartitionSetController(ctx context.Context, config *rest return err } - c, err := partitionset.NewController( - s.KcpSharedInformerFactory.Topology().V1alpha1().PartitionSets(), - s.KcpSharedInformerFactory.Topology().V1alpha1().Partitions(), - s.CacheKcpSharedInformerFactory.Core().V1alpha1().Shards(), - kcpClusterClient, - ) - if err != nil { - return err - } - return s.registerController(&controllerWrapper{ Name: partitionset.ControllerName, + Wait: func(ctx context.Context, s *Server) error { + return wait.PollUntilContextCancel(ctx, time.Millisecond*100, true, func(ctx context.Context) (bool, error) { + partitionsetSynced := s.KcpSharedInformerFactory.Topology().V1alpha1().PartitionSets().Informer().HasSynced() + partitionSynced := s.KcpSharedInformerFactory.Topology().V1alpha1().Partitions().Informer().HasSynced() + shardSynced := s.CacheKcpSharedInformerFactory.Core().V1alpha1().Shards().Informer().HasSynced() + return partitionsetSynced && partitionSynced && shardSynced, nil + }) + }, + Runner: func(ctx context.Context) { + c, err := partitionset.NewController( + s.KcpSharedInformerFactory.Topology().V1alpha1().PartitionSets(), + s.KcpSharedInformerFactory.Topology().V1alpha1().Partitions(), + s.CacheKcpSharedInformerFactory.Core().V1alpha1().Shards(), + kcpClusterClient, + ) + if err != nil { + panic(err) + } c.Start(ctx, 2) }, }) @@ -1105,17 +1271,24 @@ func (s *Server) installExtraAnnotationSyncController(ctx context.Context, confi return err } - c, err := extraannotationsync.NewController(kcpClusterClient, - s.KcpSharedInformerFactory.Apis().V1alpha1().APIExports(), - s.KcpSharedInformerFactory.Apis().V1alpha1().APIBindings(), - ) - if err != nil { - return err - } + indexers.AddIfNotPresentOrDie(s.KcpSharedInformerFactory.Apis().V1alpha1().APIExports().Informer().GetIndexer(), cache.Indexers{ + indexers.ByLogicalClusterPathAndName: indexers.IndexByLogicalClusterPathAndName, + }) + + indexers.AddIfNotPresentOrDie(s.KcpSharedInformerFactory.Apis().V1alpha1().APIBindings().Informer().GetIndexer(), cache.Indexers{ + indexers.APIBindingsByAPIExport: indexers.IndexAPIBindingByAPIExport, + }) return s.registerController(&controllerWrapper{ Name: extraannotationsync.ControllerName, Runner: func(ctx context.Context) { + c, err := extraannotationsync.NewController(kcpClusterClient, + s.KcpSharedInformerFactory.Apis().V1alpha1().APIExports(), + s.KcpSharedInformerFactory.Apis().V1alpha1().APIBindings(), + ) + if err != nil { + panic(err) + } c.Start(ctx, 2) }, }) @@ -1140,23 +1313,22 @@ func (s *Server) installKubeQuotaController( workersPerLogicalCluster = 1 ) - c, err := kubequota.NewController( - s.KcpSharedInformerFactory.Core().V1alpha1().LogicalClusters(), - kubeClusterClient, - s.KubeSharedInformerFactory, - s.DiscoveringDynamicSharedInformerFactory, - quotaResyncPeriod, - replenishmentPeriod, - workersPerLogicalCluster, - s.syncedCh, - ) - if err != nil { - return err - } - if err := s.registerController(&controllerWrapper{ Name: kubequota.ControllerName, Runner: func(ctx context.Context) { + c, err := kubequota.NewController( + s.KcpSharedInformerFactory.Core().V1alpha1().LogicalClusters(), + kubeClusterClient, + s.KubeSharedInformerFactory, + s.DiscoveringDynamicSharedInformerFactory, + quotaResyncPeriod, + replenishmentPeriod, + workersPerLogicalCluster, + s.syncedCh, + ) + if err != nil { + panic(err) + } c.Start(ctx, 2) }, }); err != nil { @@ -1179,29 +1351,154 @@ func (s *Server) installApiExportIdentityController(ctx context.Context, config if err != nil { return err } - c, err := identitycache.NewApiExportIdentityProviderController(kubeClusterClient, s.CacheKcpSharedInformerFactory.Apis().V1alpha1().APIExports(), s.KubeSharedInformerFactory.Core().V1().ConfigMaps()) - if err != nil { - return err - } return s.registerController(&controllerWrapper{ Name: identitycache.ControllerName, Runner: func(ctx context.Context) { + c, err := identitycache.NewApiExportIdentityProviderController(kubeClusterClient, s.CacheKcpSharedInformerFactory.Apis().V1alpha1().APIExports(), s.KubeSharedInformerFactory.Core().V1().ConfigMaps()) + if err != nil { + panic(err) + } c.Start(ctx, 1) }, }) } func (s *Server) installReplicationController(ctx context.Context, config *rest.Config) error { - // TODO(sttts): set user agent - controller, err := replication.NewController(s.Options.Extra.ShardName, s.CacheDynamicClient, s.KcpSharedInformerFactory, s.CacheKcpSharedInformerFactory, s.KubeSharedInformerFactory, s.CacheKubeSharedInformerFactory) - if err != nil { - return err + localKcpInformers := s.KcpSharedInformerFactory + globalKcpInformers := s.CacheKcpSharedInformerFactory + localKubeInformers := s.KubeSharedInformerFactory + globalKubeInformers := s.CacheKubeSharedInformerFactory + + gvrs := map[schema.GroupVersionResource]replication.ReplicatedGVR{ + apisv1alpha1.SchemeGroupVersion.WithResource("apiexports"): { + Kind: "APIExport", + Local: localKcpInformers.Apis().V1alpha1().APIExports().Informer(), + Global: globalKcpInformers.Apis().V1alpha1().APIExports().Informer(), + }, + apisv1alpha1.SchemeGroupVersion.WithResource("apiresourceschemas"): { + Kind: "APIResourceSchema", + Local: localKcpInformers.Apis().V1alpha1().APIResourceSchemas().Informer(), + Global: globalKcpInformers.Apis().V1alpha1().APIResourceSchemas().Informer(), + }, + apisv1alpha1.SchemeGroupVersion.WithResource("apiconversions"): { + Kind: "APIConversion", + Local: localKcpInformers.Apis().V1alpha1().APIConversions().Informer(), + Global: globalKcpInformers.Apis().V1alpha1().APIConversions().Informer(), + }, + admissionregistrationv1.SchemeGroupVersion.WithResource("mutatingwebhookconfigurations"): { + Kind: "MutatingWebhookConfiguration", + Local: localKubeInformers.Admissionregistration().V1().MutatingWebhookConfigurations().Informer(), + Global: globalKubeInformers.Admissionregistration().V1().MutatingWebhookConfigurations().Informer(), + }, + admissionregistrationv1.SchemeGroupVersion.WithResource("validatingwebhookconfigurations"): { + Kind: "ValidatingWebhookConfiguration", + Local: localKubeInformers.Admissionregistration().V1().ValidatingWebhookConfigurations().Informer(), + Global: globalKubeInformers.Admissionregistration().V1().ValidatingWebhookConfigurations().Informer(), + }, + admissionregistrationv1alpha1.SchemeGroupVersion.WithResource("validatingadmissionpolicies"): { + Kind: "ValidatingAdmissionPolicy", + Local: localKubeInformers.Admissionregistration().V1alpha1().ValidatingAdmissionPolicies().Informer(), + Global: globalKubeInformers.Admissionregistration().V1alpha1().ValidatingAdmissionPolicies().Informer(), + }, + admissionregistrationv1alpha1.SchemeGroupVersion.WithResource("validatingadmissionpolicybindings"): { + Kind: "ValidatingAdmissionPolicyBinding", + Local: localKubeInformers.Admissionregistration().V1alpha1().ValidatingAdmissionPolicyBindings().Informer(), + Global: globalKubeInformers.Admissionregistration().V1alpha1().ValidatingAdmissionPolicyBindings().Informer(), + }, + corev1alpha1.SchemeGroupVersion.WithResource("shards"): { + Kind: "Shard", + Local: localKcpInformers.Core().V1alpha1().Shards().Informer(), + Global: globalKcpInformers.Core().V1alpha1().Shards().Informer(), + }, + corev1alpha1.SchemeGroupVersion.WithResource("logicalclusters"): { + Kind: "LogicalCluster", + Filter: func(u *unstructured.Unstructured) bool { + return u.GetAnnotations()[core.ReplicateAnnotationKey] != "" + }, + Local: localKcpInformers.Core().V1alpha1().LogicalClusters().Informer(), + Global: globalKcpInformers.Core().V1alpha1().LogicalClusters().Informer(), + }, + tenancyv1alpha1.SchemeGroupVersion.WithResource("workspacetypes"): { + Kind: "WorkspaceType", + Local: localKcpInformers.Tenancy().V1alpha1().WorkspaceTypes().Informer(), + Global: globalKcpInformers.Tenancy().V1alpha1().WorkspaceTypes().Informer(), + }, + rbacv1.SchemeGroupVersion.WithResource("clusterroles"): { + Kind: "ClusterRole", + Filter: func(u *unstructured.Unstructured) bool { + return u.GetAnnotations()[core.ReplicateAnnotationKey] != "" + }, + Local: localKubeInformers.Rbac().V1().ClusterRoles().Informer(), + Global: globalKubeInformers.Rbac().V1().ClusterRoles().Informer(), + }, + rbacv1.SchemeGroupVersion.WithResource("clusterrolebindings"): { + Kind: "ClusterRoleBinding", + Filter: func(u *unstructured.Unstructured) bool { + return u.GetAnnotations()[core.ReplicateAnnotationKey] != "" + }, + Local: localKubeInformers.Rbac().V1().ClusterRoleBindings().Informer(), + Global: globalKubeInformers.Rbac().V1().ClusterRoleBindings().Informer(), + }, } + for _, controller := range gvrs { + indexers.AddIfNotPresentOrDie( + controller.Global.GetIndexer(), + cache.Indexers{ + replication.ByShardAndLogicalClusterAndNamespaceAndName: replication.IndexByShardAndLogicalClusterAndNamespace, + }, + ) + } return s.registerController(&controllerWrapper{ Name: replication.ControllerName, + Wait: func(ctx context.Context, s *Server) error { + return wait.PollUntilContextCancel(ctx, time.Millisecond*100, true, func(ctx context.Context) (bool, error) { + localApiExportsSynced := localKcpInformers.Apis().V1alpha1().APIExports().Informer().HasSynced() + globalApiExportsSynced := globalKcpInformers.Apis().V1alpha1().APIExports().Informer().HasSynced() + localAPIResourceSchemasSynced := localKcpInformers.Apis().V1alpha1().APIResourceSchemas().Informer().HasSynced() + globalAPIResourceSchemasSynced := globalKcpInformers.Apis().V1alpha1().APIResourceSchemas().Informer().HasSynced() + localAPIConversionsSynced := localKcpInformers.Apis().V1alpha1().APIConversions().Informer().HasSynced() + globalAPIConversionsSynced := globalKcpInformers.Apis().V1alpha1().APIConversions().Informer().HasSynced() + localMutatingWebhookConfigurationsSynced := localKubeInformers.Admissionregistration().V1().MutatingWebhookConfigurations().Informer().HasSynced() + globalMutatingWebhookConfigurationsSynced := globalKubeInformers.Admissionregistration().V1().MutatingWebhookConfigurations().Informer().HasSynced() + localValidatingWebhookConfigurationsSynced := localKubeInformers.Admissionregistration().V1().ValidatingWebhookConfigurations().Informer().HasSynced() + globalValidatingWebhookConfigurationsSynced := globalKubeInformers.Admissionregistration().V1().ValidatingWebhookConfigurations().Informer().HasSynced() + localValidatingAdmissionPoliciesSynced := localKubeInformers.Admissionregistration().V1alpha1().ValidatingAdmissionPolicies().Informer().HasSynced() + globalValidatingAdmissionPoliciesSynced := globalKubeInformers.Admissionregistration().V1alpha1().ValidatingAdmissionPolicies().Informer().HasSynced() + localValidatingAdmissionPolicyBindingsSynced := localKubeInformers.Admissionregistration().V1alpha1().ValidatingAdmissionPolicyBindings().Informer().HasSynced() + globalValidatingAdmissionPolicyBindingsSynced := globalKubeInformers.Admissionregistration().V1alpha1().ValidatingAdmissionPolicyBindings().Informer().HasSynced() + localShardsSynced := localKcpInformers.Core().V1alpha1().Shards().Informer().HasSynced() + globalShardsSynced := globalKcpInformers.Core().V1alpha1().Shards().Informer().HasSynced() + localLogicalClustersSynced := localKcpInformers.Core().V1alpha1().LogicalClusters().Informer().HasSynced() + globalLogicalClustersSynced := globalKcpInformers.Core().V1alpha1().LogicalClusters().Informer().HasSynced() + localWorkspaceTypesSynced := localKcpInformers.Tenancy().V1alpha1().WorkspaceTypes().Informer().HasSynced() + globalWorkspaceTypesSynced := globalKcpInformers.Tenancy().V1alpha1().WorkspaceTypes().Informer().HasSynced() + localClusterRolesSynced := localKubeInformers.Rbac().V1().ClusterRoles().Informer().HasSynced() + globalClusterRolesSynced := globalKubeInformers.Rbac().V1().ClusterRoles().Informer().HasSynced() + localClusterRoleBindingsSynced := localKubeInformers.Rbac().V1().ClusterRoleBindings().Informer().HasSynced() + globalClusterRoleBindingsSynced := globalKubeInformers.Rbac().V1().ClusterRoleBindings().Informer().HasSynced() + return localApiExportsSynced && globalApiExportsSynced && localAPIResourceSchemasSynced && globalAPIResourceSchemasSynced && localAPIConversionsSynced && + globalAPIConversionsSynced && localMutatingWebhookConfigurationsSynced && globalMutatingWebhookConfigurationsSynced && localValidatingWebhookConfigurationsSynced && + globalValidatingWebhookConfigurationsSynced && localValidatingAdmissionPoliciesSynced && globalValidatingAdmissionPoliciesSynced && localValidatingAdmissionPolicyBindingsSynced && + globalValidatingAdmissionPolicyBindingsSynced && localShardsSynced && globalShardsSynced && localLogicalClustersSynced && globalLogicalClustersSynced && localWorkspaceTypesSynced && + globalWorkspaceTypesSynced && localClusterRolesSynced && globalClusterRolesSynced && localClusterRoleBindingsSynced && globalClusterRoleBindingsSynced, nil + }) + }, Runner: func(ctx context.Context) { + // TODO(sttts): set user agent + controller, err := replication.NewController( + s.Options.Extra.ShardName, + s.CacheDynamicClient, + s.KcpSharedInformerFactory, + s.CacheKcpSharedInformerFactory, + s.KubeSharedInformerFactory, + s.CacheKubeSharedInformerFactory, + gvrs, + ) + if err != nil { + panic(err) + } controller.Start(ctx, 2) }, }) @@ -1226,21 +1523,20 @@ func (s *Server) installGarbageCollectorController(ctx context.Context, config * workersPerLogicalCluster = 1 ) - c, err := garbagecollector.NewController( - s.KcpSharedInformerFactory.Core().V1alpha1().LogicalClusters(), - kubeClusterClient, - metadataClient, - s.DiscoveringDynamicSharedInformerFactory, - workersPerLogicalCluster, - s.syncedCh, - ) - if err != nil { - return err - } - return s.registerController(&controllerWrapper{ Name: garbagecollector.ControllerName, Runner: func(ctx context.Context) { + c, err := garbagecollector.NewController( + s.KcpSharedInformerFactory.Core().V1alpha1().LogicalClusters(), + kubeClusterClient, + metadataClient, + s.DiscoveringDynamicSharedInformerFactory, + workersPerLogicalCluster, + s.syncedCh, + ) + if err != nil { + panic(err) + } c.Start(ctx, 2) }, })