Skip to content

Commit

Permalink
leader election issue fixed for all the kcp controlelrs
Browse files Browse the repository at this point in the history
Signed-off-by: sankarm <[email protected]>
  • Loading branch information
msankara committed Apr 16, 2024
1 parent f79c2d5 commit f57951c
Show file tree
Hide file tree
Showing 18 changed files with 600 additions and 552 deletions.
5 changes: 0 additions & 5 deletions pkg/permissionclaim/permissionclaim_labeler.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (
"github.com/kcp-dev/logicalcluster/v3"

"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/tools/cache"
"k8s.io/klog/v2"

"github.com/kcp-dev/kcp/pkg/indexers"
Expand All @@ -46,10 +45,6 @@ func NewLabeler(
apiBindingInformer apisv1alpha1informers.APIBindingClusterInformer,
apiExportInformer, globalAPIExportInformer apisv1alpha1informers.APIExportClusterInformer,
) *Labeler {
indexers.AddIfNotPresentOrDie(apiExportInformer.Informer().GetIndexer(), cache.Indexers{
indexers.ByLogicalClusterPathAndName: indexers.IndexByLogicalClusterPathAndName,
})

return &Labeler{
listAPIBindingsAcceptingClaimedGroupResource: func(clusterName logicalcluster.Name, groupResource schema.GroupResource) ([]*apisv1alpha1.APIBinding, error) {
indexKey := indexers.ClusterAndGroupResourceValue(clusterName, groupResource)
Expand Down
15 changes: 0 additions & 15 deletions pkg/reconciler/apis/apibinding/apibinding_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,21 +164,6 @@ func NewController(

logger := logging.WithReconciler(klog.Background(), ControllerName)

// APIBinding indexers
indexers.AddIfNotPresentOrDie(apiBindingInformer.Informer().GetIndexer(), cache.Indexers{
indexers.APIBindingsByAPIExport: indexers.IndexAPIBindingByAPIExport,
})

// APIExport indexers
indexers.AddIfNotPresentOrDie(apiExportInformer.Informer().GetIndexer(), cache.Indexers{
indexers.ByLogicalClusterPathAndName: indexers.IndexByLogicalClusterPathAndName,
indexAPIExportsByAPIResourceSchema: indexAPIExportsByAPIResourceSchemasFunc,
})
indexers.AddIfNotPresentOrDie(globalAPIExportInformer.Informer().GetIndexer(), cache.Indexers{
indexers.ByLogicalClusterPathAndName: indexers.IndexByLogicalClusterPathAndName,
indexAPIExportsByAPIResourceSchema: indexAPIExportsByAPIResourceSchemasFunc,
})

// APIBinding handlers
_, _ = apiBindingInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) { c.enqueueAPIBinding(objOrTombstone[*apisv1alpha1.APIBinding](obj), logger, "") },
Expand Down
8 changes: 0 additions & 8 deletions pkg/reconciler/apis/apiexport/apiexport_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,14 +121,6 @@ func NewController(
commit: committer.NewCommitter[*APIExport, Patcher, *APIExportSpec, *APIExportStatus](kcpClusterClient.ApisV1alpha1().APIExports()),
}

indexers.AddIfNotPresentOrDie(
apiExportInformer.Informer().GetIndexer(),
cache.Indexers{
indexers.APIExportByIdentity: indexers.IndexAPIExportByIdentity,
indexers.APIExportBySecret: indexers.IndexAPIExportBySecret,
},
)

_, _ = apiExportInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
c.enqueueAPIExport(obj.(*apisv1alpha1.APIExport))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,14 +100,6 @@ func NewController(
commit: committer.NewCommitter[*APIExportEndpointSlice, Patcher, *APIExportEndpointSliceSpec, *APIExportEndpointSliceStatus](kcpClusterClient.ApisV1alpha1().APIExportEndpointSlices()),
}

indexers.AddIfNotPresentOrDie(globalAPIExportClusterInformer.Informer().GetIndexer(), cache.Indexers{
indexers.ByLogicalClusterPathAndName: indexers.IndexByLogicalClusterPathAndName,
})

indexers.AddIfNotPresentOrDie(apiExportEndpointSliceClusterInformer.Informer().GetIndexer(), cache.Indexers{
indexAPIExportEndpointSliceByAPIExport: indexAPIExportEndpointSliceByAPIExportFunc,
})

_, _ = apiExportEndpointSliceClusterInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
c.enqueueAPIExportEndpointSlice(obj)
Expand Down Expand Up @@ -158,10 +150,6 @@ func NewController(
},
)

indexers.AddIfNotPresentOrDie(apiExportEndpointSliceClusterInformer.Informer().GetIndexer(), cache.Indexers{
indexAPIExportEndpointSlicesByPartition: indexAPIExportEndpointSlicesByPartitionFunc,
})

return c, nil
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,51 +16,7 @@ limitations under the License.

package apiexportendpointslice

import (
"fmt"

"github.com/kcp-dev/logicalcluster/v3"

apisv1alpha1 "github.com/kcp-dev/kcp/sdk/apis/apis/v1alpha1"
"github.com/kcp-dev/kcp/sdk/client"
)

const (
indexAPIExportEndpointSliceByAPIExport = "indexAPIExportEndpointSliceByAPIExport"
indexAPIExportEndpointSlicesByPartition = "indexAPIExportEndpointSlicesByPartition"
)

// indexAPIExportEndpointSliceByAPIExportFunc indexes the APIExportEndpointSlice by their APIExport's Reference Path and Name.
func indexAPIExportEndpointSliceByAPIExportFunc(obj interface{}) ([]string, error) {
apiExportEndpointSlice, ok := obj.(*apisv1alpha1.APIExportEndpointSlice)
if !ok {
return []string{}, fmt.Errorf("obj %T is not an APIExportEndpointSlice", obj)
}

path := logicalcluster.NewPath(apiExportEndpointSlice.Spec.APIExport.Path)
if path.Empty() {
path = logicalcluster.From(apiExportEndpointSlice).Path()
}
return []string{path.Join(apiExportEndpointSlice.Spec.APIExport.Name).String()}, nil
}

// indexAPIExportEndpointSlicesByPartitionFunc is an index function that maps a Partition to the key for its
// spec.partition.
func indexAPIExportEndpointSlicesByPartitionFunc(obj interface{}) ([]string, error) {
slice, ok := obj.(*apisv1alpha1.APIExportEndpointSlice)
if !ok {
return []string{}, fmt.Errorf("obj is supposed to be an APIExportEndpointSlice, but is %T", obj)
}

if slice.Spec.Partition != "" {
clusterName := logicalcluster.From(slice).Path()
if !ok {
// this will never happen due to validation
return []string{}, fmt.Errorf("cluster information missing")
}
key := client.ToClusterAwareKey(clusterName, slice.Spec.Partition)
return []string{key}, nil
}

return []string{}, nil
}
7 changes: 0 additions & 7 deletions pkg/reconciler/apis/crdcleanup/crdcleanup_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,13 +71,6 @@ func NewController(
},
}

indexers.AddIfNotPresentOrDie(
apiBindingInformer.Informer().GetIndexer(),
cache.Indexers{
indexers.APIBindingByBoundResourceUID: indexers.IndexAPIBindingByBoundResourceUID,
},
)

_, _ = crdInformer.Informer().AddEventHandler(cache.FilteringResourceEventHandler{
FilterFunc: func(obj interface{}) bool {
crd := obj.(*apiextensionsv1.CustomResourceDefinition)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,14 +106,6 @@ func NewController(

logger := logging.WithReconciler(klog.Background(), ControllerName)

indexers.AddIfNotPresentOrDie(apiExportInformer.Informer().GetIndexer(), cache.Indexers{
indexers.ByLogicalClusterPathAndName: indexers.IndexByLogicalClusterPathAndName,
})

indexers.AddIfNotPresentOrDie(apiBindingInformer.Informer().GetIndexer(), cache.Indexers{
indexers.APIBindingsByAPIExport: indexers.IndexAPIBindingByAPIExport,
})

_, _ = apiExportInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) { c.enqueueAPIExport(obj, logger) },
UpdateFunc: func(_, obj interface{}) { c.enqueueAPIExport(obj, logger) },
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,10 +78,6 @@ func NewController(
commit: committer.NewCommitter[*APIBinding, Patcher, *APIBindingSpec, *APIBindingStatus](kcpClusterClient.ApisV1alpha1().APIBindings()),
}

indexers.AddIfNotPresentOrDie(apiExportInformer.Informer().GetIndexer(), cache.Indexers{
indexers.ByLogicalClusterPathAndName: indexers.IndexByLogicalClusterPathAndName,
})

_, _ = apiBindingInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) { c.enqueueAPIBinding(obj, logger) },
UpdateFunc: func(_, newObj interface{}) {
Expand Down Expand Up @@ -125,7 +121,7 @@ func (c *controller) enqueueAPIBinding(obj interface{}, logger logr.Logger) {
return
}

logging.WithQueueKey(logger, key).V(4).Info("queueing APIBinding")
logging.WithQueueKey(logger, key).V(2).Info("queueing APIBinding")
c.queue.Add(key)
}

Expand Down Expand Up @@ -161,7 +157,7 @@ func (c *controller) processNextWorkItem(ctx context.Context) bool {

logger := logging.WithQueueKey(klog.FromContext(ctx), key)
ctx = klog.NewContext(ctx, logger)
logger.V(4).Info("processing key")
logger.V(1).Info("processing key")

// No matter what, tell the queue we're done with this key, to unblock
// other workers.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,9 @@ import (
"k8s.io/apimachinery/pkg/runtime/schema"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"
"k8s.io/klog/v2"

"github.com/kcp-dev/kcp/pkg/indexers"
"github.com/kcp-dev/kcp/pkg/informer"
"github.com/kcp-dev/kcp/pkg/logging"
"github.com/kcp-dev/kcp/pkg/permissionclaim"
Expand All @@ -55,14 +53,6 @@ func NewResourceController(
apiBindingInformer apisv1alpha1informers.APIBindingClusterInformer,
apiExportInformer, globalAPIExportInformer apisv1alpha1informers.APIExportClusterInformer,
) (*resourceController, error) {
if err := apiBindingInformer.Informer().GetIndexer().AddIndexers(
cache.Indexers{
indexers.APIBindingByClusterAndAcceptedClaimedGroupResources: indexers.IndexAPIBindingByClusterAndAcceptedClaimedGroupResources,
},
); err != nil {
return nil, err
}

c := &resourceController{
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), ResourceControllerName),
kcpClusterClient: kcpClusterClient,
Expand Down Expand Up @@ -102,7 +92,7 @@ func (c *resourceController) enqueueForResource(logger logr.Logger, gvr schema.G
}

queueKey += "::" + key
logging.WithQueueKey(logger, queueKey).V(4).Info("queuing resource")
logging.WithQueueKey(logger, queueKey).V(2).Info("queuing resource")
c.queue.Add(queueKey)
}

Expand Down Expand Up @@ -138,7 +128,7 @@ func (c *resourceController) processNextWorkItem(ctx context.Context) bool {

logger := logging.WithQueueKey(klog.FromContext(ctx), key)
ctx = klog.NewContext(ctx, logger)
logger.V(4).Info("processing key")
logger.V(1).Info("processing key")

// No matter what, tell the queue we're done with this key, to unblock
// other workers.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ import (
"k8s.io/client-go/util/workqueue"
"k8s.io/klog/v2"

"github.com/kcp-dev/kcp/pkg/indexers"
"github.com/kcp-dev/kcp/pkg/logging"
"github.com/kcp-dev/kcp/pkg/reconciler/cache/labelclusterroles"
"github.com/kcp-dev/kcp/pkg/reconciler/cache/replication"
Expand Down Expand Up @@ -83,10 +82,6 @@ func NewController(
commit: committer.NewStatuslessCommitter[*rbacv1.ClusterRoleBinding, rbacclientv1.ClusterRoleBindingInterface](kubeClusterClient.RbacV1().ClusterRoleBindings(), committer.ShallowCopy[rbacv1.ClusterRoleBinding]),
}

indexers.AddIfNotPresentOrDie(clusterRoleBindingInformer.Informer().GetIndexer(), cache.Indexers{
labelclusterroles.ClusterRoleBindingByClusterRoleName: labelclusterroles.IndexClusterRoleBindingByClusterRoleName,
})

_, _ = clusterRoleBindingInformer.Informer().AddEventHandler(cache.FilteringResourceEventHandler{
FilterFunc: replication.IsNoSystemClusterName,
Handler: cache.ResourceEventHandlerFuncs{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ import (
"k8s.io/client-go/util/workqueue"
"k8s.io/klog/v2"

"github.com/kcp-dev/kcp/pkg/indexers"
"github.com/kcp-dev/kcp/pkg/logging"
"github.com/kcp-dev/kcp/pkg/reconciler/cache/replication"
"github.com/kcp-dev/kcp/pkg/reconciler/committer"
Expand Down Expand Up @@ -82,10 +81,6 @@ func NewController(
commit: committer.NewStatuslessCommitter[*rbacv1.ClusterRole, rbacclientv1.ClusterRoleInterface](kubeClusterClient.RbacV1().ClusterRoles(), committer.ShallowCopy[rbacv1.ClusterRole]),
}

indexers.AddIfNotPresentOrDie(clusterRoleBindingInformer.Informer().GetIndexer(), cache.Indexers{
ClusterRoleBindingByClusterRoleName: IndexClusterRoleBindingByClusterRoleName,
})

_, _ = clusterRoleInformer.Informer().AddEventHandler(cache.FilteringResourceEventHandler{
FilterFunc: replication.IsNoSystemClusterName,
Handler: cache.ResourceEventHandlerFuncs{
Expand Down
Loading

0 comments on commit f57951c

Please sign in to comment.