diff --git a/cmd/kcp-core/kcpcore.go b/cmd/kcp-core/kcpcore.go index 2ccc3b75f7d..c6ace6c3d1d 100644 --- a/cmd/kcp-core/kcpcore.go +++ b/cmd/kcp-core/kcpcore.go @@ -31,8 +31,8 @@ import ( "k8s.io/component-base/cli" cliflag "k8s.io/component-base/cli/flag" "k8s.io/component-base/cli/globalflag" - "k8s.io/component-base/config" "k8s.io/component-base/logs" + logsapiv1 "k8s.io/component-base/logs/api/v1" _ "k8s.io/component-base/logs/json/register" "k8s.io/component-base/term" "k8s.io/component-base/version" @@ -79,7 +79,7 @@ func main() { } serverOptions := options.NewOptions(rootDir) - serverOptions.Server.GenericControlPlane.Logs.Config.Verbosity = config.VerbosityLevel(2) + serverOptions.Server.GenericControlPlane.Logs.Verbosity = logsapiv1.VerbosityLevel(2) startCmd := &cobra.Command{ Use: "start", @@ -101,7 +101,7 @@ func main() { }, RunE: func(cmd *cobra.Command, args []string) error { // run as early as possible to avoid races later when some components (e.g. grpc) start early using klog - if err := serverOptions.Server.GenericControlPlane.Logs.ValidateAndApply(kcpfeatures.DefaultFeatureGate); err != nil { + if err := logsapiv1.ValidateAndApply(serverOptions.Server.GenericControlPlane.Logs, kcpfeatures.DefaultFeatureGate); err != nil { return err } diff --git a/cmd/kcp-front-proxy/main.go b/cmd/kcp-front-proxy/main.go index bbf33abdb9a..48dd1678051 100644 --- a/cmd/kcp-front-proxy/main.go +++ b/cmd/kcp-front-proxy/main.go @@ -31,6 +31,7 @@ import ( genericapiserver "k8s.io/apiserver/pkg/server" "k8s.io/component-base/cli" utilflag "k8s.io/component-base/cli/flag" + logsapiv1 "k8s.io/component-base/logs/api/v1" _ "k8s.io/component-base/logs/json/register" "k8s.io/component-base/version" @@ -62,7 +63,7 @@ forwards Common Name and Organizations to backend API servers in HTTP headers. The proxy terminates TLS and communicates with API servers via mTLS. Traffic is routed based on paths.`, RunE: func(cmd *cobra.Command, args []string) error { - if err := options.Logs.ValidateAndApply(kcpfeatures.DefaultFeatureGate); err != nil { + if err := logsapiv1.ValidateAndApply(options.Logs, kcpfeatures.DefaultFeatureGate); err != nil { return err } if err := options.Complete(); err != nil { diff --git a/cmd/kcp-front-proxy/options/options.go b/cmd/kcp-front-proxy/options/options.go index 9449f516cdd..2c9124727ce 100644 --- a/cmd/kcp-front-proxy/options/options.go +++ b/cmd/kcp-front-proxy/options/options.go @@ -19,8 +19,8 @@ package options import ( "github.com/spf13/pflag" - "k8s.io/component-base/config" "k8s.io/component-base/logs" + logsapiv1 "k8s.io/component-base/logs/api/v1" proxyoptions "github.com/kcp-dev/kcp/pkg/proxy/options" ) @@ -37,13 +37,13 @@ func NewOptions() *Options { } // Default to -v=2 - o.Logs.Config.Verbosity = config.VerbosityLevel(2) + o.Logs.Verbosity = logsapiv1.VerbosityLevel(2) return o } func (o *Options) AddFlags(fs *pflag.FlagSet) { o.Proxy.AddFlags(fs) - o.Logs.AddFlags(fs) + logsapiv1.AddFlags(o.Logs, fs) } func (o *Options) Complete() error { diff --git a/cmd/kcp/kcp.go b/cmd/kcp/kcp.go index 92d62b2be71..b15b652aef3 100644 --- a/cmd/kcp/kcp.go +++ b/cmd/kcp/kcp.go @@ -31,8 +31,8 @@ import ( "k8s.io/component-base/cli" cliflag "k8s.io/component-base/cli/flag" "k8s.io/component-base/cli/globalflag" - "k8s.io/component-base/config" "k8s.io/component-base/logs" + logsapiv1 "k8s.io/component-base/logs/api/v1" _ "k8s.io/component-base/logs/json/register" "k8s.io/component-base/term" "k8s.io/component-base/version" @@ -83,7 +83,7 @@ func main() { } serverOptions := options.NewOptions(rootDir) - serverOptions.Server.Core.GenericControlPlane.Logs.Config.Verbosity = config.VerbosityLevel(2) + serverOptions.Server.Core.GenericControlPlane.Logs.Verbosity = logsapiv1.VerbosityLevel(2) startCmd := &cobra.Command{ Use: "start", @@ -105,7 +105,7 @@ func main() { }, RunE: func(cmd *cobra.Command, args []string) error { // run as early as possible to avoid races later when some components (e.g. grpc) start early using klog - if err := serverOptions.Server.Core.GenericControlPlane.Logs.ValidateAndApply(kcpfeatures.DefaultFeatureGate); err != nil { + if err := logsapiv1.ValidateAndApply(serverOptions.Server.Core.GenericControlPlane.Logs, kcpfeatures.DefaultFeatureGate); err != nil { return err } diff --git a/cmd/syncer/cmd/syncer.go b/cmd/syncer/cmd/syncer.go index 50ae9b36d8d..976a7a2de7b 100644 --- a/cmd/syncer/cmd/syncer.go +++ b/cmd/syncer/cmd/syncer.go @@ -29,6 +29,7 @@ import ( "k8s.io/apimachinery/pkg/util/sets" genericapiserver "k8s.io/apiserver/pkg/server" "k8s.io/client-go/tools/clientcmd" + logsapiv1 "k8s.io/component-base/logs/api/v1" "k8s.io/component-base/version" "k8s.io/klog/v2" @@ -47,7 +48,7 @@ func NewSyncerCommand() *cobra.Command { Use: "syncer", Short: "Synchronizes resources in `kcp` assigned to the clusters", RunE: func(cmd *cobra.Command, args []string) error { - if err := options.Logs.ValidateAndApply(kcpfeatures.DefaultFeatureGate); err != nil { + if err := logsapiv1.ValidateAndApply(options.Logs, kcpfeatures.DefaultFeatureGate); err != nil { return err } if err := options.Complete(); err != nil { diff --git a/cmd/syncer/options/options.go b/cmd/syncer/options/options.go index 97e45824b73..870cb982bf0 100644 --- a/cmd/syncer/options/options.go +++ b/cmd/syncer/options/options.go @@ -24,8 +24,8 @@ import ( "github.com/spf13/pflag" - "k8s.io/component-base/config" "k8s.io/component-base/logs" + logsapiv1 "k8s.io/component-base/logs/api/v1" kcpfeatures "github.com/kcp-dev/kcp/pkg/features" workloadv1alpha1 "github.com/kcp-dev/kcp/sdk/apis/workload/v1alpha1" @@ -51,14 +51,14 @@ type Options struct { func NewOptions() *Options { // Default to -v=2 - logs := logs.NewOptions() - logs.Config.Verbosity = config.VerbosityLevel(2) + logsOptions := logs.NewOptions() + logsOptions.Verbosity = logsapiv1.VerbosityLevel(2) return &Options{ QPS: 30, Burst: 20, SyncedResourceTypes: []string{}, - Logs: logs, + Logs: logsOptions, APIImportPollInterval: 1 * time.Minute, DownstreamNamespaceCleanDelay: 30 * time.Second, } @@ -83,7 +83,7 @@ func (options *Options) AddFlags(fs *pflag.FlagSet) { fs.StringVar(&options.DNSImage, "dns-image", options.DNSImage, "kcp DNS server image.") fs.DurationVar(&options.DownstreamNamespaceCleanDelay, "downstream-namespace-clean-delay", options.DownstreamNamespaceCleanDelay, "Time to wait before deleting a downstream namespace, defaults to 30s.") - options.Logs.AddFlags(fs) + logsapiv1.AddFlags(options.Logs, fs) } func (options *Options) Complete() error { diff --git a/cmd/virtual-workspaces/command/cmd.go b/cmd/virtual-workspaces/command/cmd.go index 7a3c61387ac..40c4e042878 100644 --- a/cmd/virtual-workspaces/command/cmd.go +++ b/cmd/virtual-workspaces/command/cmd.go @@ -36,7 +36,7 @@ import ( genericapiserver "k8s.io/apiserver/pkg/server" "k8s.io/client-go/pkg/version" "k8s.io/client-go/tools/clientcmd" - "k8s.io/component-base/config" + logsapiv1 "k8s.io/component-base/logs/api/v1" "k8s.io/klog/v2" "github.com/kcp-dev/kcp/cmd/virtual-workspaces/options" @@ -52,7 +52,7 @@ func NewCommand(ctx context.Context, errout io.Writer) *cobra.Command { opts := options.NewOptions() // Default to -v=2 - opts.Logs.Config.Verbosity = config.VerbosityLevel(2) + opts.Logs.Verbosity = logsapiv1.VerbosityLevel(2) cmd := &cobra.Command{ Use: "workspaces", @@ -60,7 +60,7 @@ func NewCommand(ctx context.Context, errout io.Writer) *cobra.Command { Long: "Start the root virtual workspace apiserver to enable virtual workspace management.", RunE: func(c *cobra.Command, args []string) error { - if err := opts.Logs.ValidateAndApply(kcpfeatures.DefaultFeatureGate); err != nil { + if err := logsapiv1.ValidateAndApply(opts.Logs, kcpfeatures.DefaultFeatureGate); err != nil { return err } if err := opts.Validate(); err != nil { diff --git a/cmd/virtual-workspaces/options/options.go b/cmd/virtual-workspaces/options/options.go index 00e2997f263..bbf5fe2ca10 100644 --- a/cmd/virtual-workspaces/options/options.go +++ b/cmd/virtual-workspaces/options/options.go @@ -28,6 +28,7 @@ import ( utilerrors "k8s.io/apimachinery/pkg/util/errors" genericapiserveroptions "k8s.io/apiserver/pkg/server/options" "k8s.io/component-base/logs" + logsapiv1 "k8s.io/component-base/logs/api/v1" cacheoptions "github.com/kcp-dev/kcp/pkg/cache/client/options" corevwoptions "github.com/kcp-dev/kcp/pkg/virtual/options" @@ -53,7 +54,7 @@ type Options struct { Authorization corevwoptions.Authorization Audit genericapiserveroptions.AuditOptions - Logs logs.Options + Logs *logs.Options CoreVirtualWorkspaces corevwoptions.Options TmcVirtualWorkspaces tmcvwoptions.Options @@ -73,7 +74,7 @@ func NewOptions() *Options { Authentication: *genericapiserveroptions.NewDelegatingAuthenticationOptions(), Authorization: *corevwoptions.NewAuthorization(), Audit: *genericapiserveroptions.NewAuditOptions(), - Logs: *logs.NewOptions(), + Logs: logs.NewOptions(), CoreVirtualWorkspaces: *corevwoptions.NewOptions(), TmcVirtualWorkspaces: *tmcvwoptions.NewOptions(), @@ -92,7 +93,7 @@ func (o *Options) AddFlags(flags *pflag.FlagSet) { o.SecureServing.AddFlags(flags) o.Authentication.AddFlags(flags) o.Audit.AddFlags(flags) - o.Logs.AddFlags(flags) + logsapiv1.AddFlags(o.Logs, flags) o.CoreVirtualWorkspaces.AddFlags(flags) o.TmcVirtualWorkspaces.AddFlags(flags) diff --git a/pkg/admission/kubequota/kubequota_admission.go b/pkg/admission/kubequota/kubequota_admission.go index 82da3053b57..a8c2613bbbd 100644 --- a/pkg/admission/kubequota/kubequota_admission.go +++ b/pkg/admission/kubequota/kubequota_admission.go @@ -98,7 +98,7 @@ type KubeResourceQuota struct { lock sync.RWMutex delegates map[logicalcluster.Name]*stoppableQuotaAdmission - workspaceDeletionMonitorStarter sync.Once + logicalClusterDeletionMonitorStarter sync.Once } // ValidateInitialization validates all the expected fields are set. @@ -143,8 +143,8 @@ func (k *KubeResourceQuota) Validate(ctx context.Context, a admission.Attributes } } - k.workspaceDeletionMonitorStarter.Do(func() { - m := newLogicalClusterDeletionMonitor(k.logicalClusterInformer, k.stopQuotaAdmissionForCluster) + k.logicalClusterDeletionMonitorStarter.Do(func() { + m := NewLogicalClusterDeletionMonitor("kubequota-logicalcluster-deletion-monitor", k.logicalClusterInformer, k.stopQuotaAdmissionForCluster) go m.Start(k.serverDone) }) @@ -191,7 +191,7 @@ func (k *KubeResourceQuota) getOrCreateDelegate(clusterName logicalcluster.Name) }() const evaluatorWorkersPerWorkspace = 5 - quotaAdmission, err := resourcequota.NewResourceQuota(k.userSuppliedConfiguration, evaluatorWorkersPerWorkspace, ctx.Done()) + quotaAdmission, err := resourcequota.NewResourceQuota(k.userSuppliedConfiguration, evaluatorWorkersPerWorkspace) if err != nil { cancel() return nil, err @@ -202,6 +202,7 @@ func (k *KubeResourceQuota) getOrCreateDelegate(clusterName logicalcluster.Name) stop: cancel, } + delegate.SetDrainedNotification(ctx.Done()) delegate.SetResourceQuotaLister(k.scopingResourceQuotaInformer.Cluster(clusterName).Lister()) delegate.SetExternalKubeClientSet(k.kubeClusterClient.Cluster(clusterName.Path())) delegate.SetQuotaConfiguration(k.quotaConfiguration) diff --git a/pkg/admission/kubequota/kubequota_clusterworkspace_monitor.go b/pkg/admission/kubequota/kubequota_clusterworkspace_monitor.go index c035d6c6d58..7e67768800e 100644 --- a/pkg/admission/kubequota/kubequota_clusterworkspace_monitor.go +++ b/pkg/admission/kubequota/kubequota_clusterworkspace_monitor.go @@ -33,25 +33,25 @@ import ( corev1alpha1informers "github.com/kcp-dev/kcp/sdk/client/informers/externalversions/core/v1alpha1" ) -const logicalClusterDeletionMonitorControllerName = "kcp-kubequota-logical-cluster-deletion-monitor" - -// logicalClusterDeletionMonitor monitors LogicalClusters and terminates QuotaAdmission for a logical cluster -// when its corresponding workspace is deleted. -type logicalClusterDeletionMonitor struct { +// LogicalClusterDeletionMonitor monitors LogicalClusters and invokes stopFunc for each deleted LogicalCluster. +type LogicalClusterDeletionMonitor struct { + name string queue workqueue.RateLimitingInterface stopFunc func(name logicalcluster.Name) } -func newLogicalClusterDeletionMonitor( - workspaceInformer corev1alpha1informers.LogicalClusterClusterInformer, +func NewLogicalClusterDeletionMonitor( + name string, + logicalClusterInformer corev1alpha1informers.LogicalClusterClusterInformer, stopFunc func(logicalcluster.Name), -) *logicalClusterDeletionMonitor { - m := &logicalClusterDeletionMonitor{ - queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), logicalClusterDeletionMonitorControllerName), +) *LogicalClusterDeletionMonitor { + m := &LogicalClusterDeletionMonitor{ + name: name, + queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), name), stopFunc: stopFunc, } - workspaceInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ + logicalClusterInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ DeleteFunc: func(obj interface{}) { m.enqueue(obj) }, @@ -60,7 +60,7 @@ func newLogicalClusterDeletionMonitor( return m } -func (m *logicalClusterDeletionMonitor) enqueue(obj interface{}) { +func (m *LogicalClusterDeletionMonitor) enqueue(obj interface{}) { key, err := kcpcache.DeletionHandlingMetaClusterNamespaceKeyFunc(obj) if err != nil { runtime.HandleError(err) @@ -70,11 +70,11 @@ func (m *logicalClusterDeletionMonitor) enqueue(obj interface{}) { m.queue.Add(key) } -func (m *logicalClusterDeletionMonitor) Start(stop <-chan struct{}) { +func (m *LogicalClusterDeletionMonitor) Start(stop <-chan struct{}) { defer runtime.HandleCrash() defer m.queue.ShutDown() - logger := logging.WithReconciler(klog.Background(), logicalClusterDeletionMonitorControllerName) + logger := logging.WithReconciler(klog.Background(), m.name) logger.Info("Starting controller") defer logger.Info("Shutting down controller") @@ -83,12 +83,12 @@ func (m *logicalClusterDeletionMonitor) Start(stop <-chan struct{}) { <-stop } -func (m *logicalClusterDeletionMonitor) startWorker() { +func (m *LogicalClusterDeletionMonitor) startWorker() { for m.processNextWorkItem() { } } -func (m *logicalClusterDeletionMonitor) processNextWorkItem() bool { +func (m *LogicalClusterDeletionMonitor) processNextWorkItem() bool { // Wait until there is a new item in the working queue k, quit := m.queue.Get() if quit { @@ -101,7 +101,7 @@ func (m *logicalClusterDeletionMonitor) processNextWorkItem() bool { defer m.queue.Done(key) if err := m.process(key); err != nil { - runtime.HandleError(fmt.Errorf("logicalClusterDeletionMonitor failed to sync %q, err: %w", key, err)) + runtime.HandleError(fmt.Errorf("LogicalClusterDeletionMonitor failed to sync %q, err: %w", key, err)) m.queue.AddRateLimited(key) @@ -114,7 +114,7 @@ func (m *logicalClusterDeletionMonitor) processNextWorkItem() bool { return true } -func (m *logicalClusterDeletionMonitor) process(key string) error { +func (m *LogicalClusterDeletionMonitor) process(key string) error { clusterName, _, _, err := kcpcache.SplitMetaClusterNamespaceKey(key) if err != nil { runtime.HandleError(err) diff --git a/pkg/admission/mutatingwebhook/plugin.go b/pkg/admission/mutatingwebhook/plugin.go index aca889b0645..2639d3fb8d1 100644 --- a/pkg/admission/mutatingwebhook/plugin.go +++ b/pkg/admission/mutatingwebhook/plugin.go @@ -17,25 +17,27 @@ limitations under the License. package mutatingwebhook import ( + "bytes" "context" + "errors" + "fmt" "io" kcpkubernetesinformers "github.com/kcp-dev/client-go/informers" + kcpkubernetesclientset "github.com/kcp-dev/client-go/kubernetes" "github.com/kcp-dev/logicalcluster/v3" - admissionv1 "k8s.io/api/admission/v1" - admissionv1beta1 "k8s.io/api/admission/v1beta1" + "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apiserver/pkg/admission" "k8s.io/apiserver/pkg/admission/configuration" - "k8s.io/apiserver/pkg/admission/plugin/webhook/config" "k8s.io/apiserver/pkg/admission/plugin/webhook/generic" "k8s.io/apiserver/pkg/admission/plugin/webhook/mutating" - webhookutil "k8s.io/apiserver/pkg/util/webhook" - "k8s.io/client-go/informers" + genericapirequest "k8s.io/apiserver/pkg/endpoints/request" kcpinitializers "github.com/kcp-dev/kcp/pkg/admission/initializers" - "github.com/kcp-dev/kcp/pkg/admission/webhook" + apisv1alpha1 "github.com/kcp-dev/kcp/sdk/apis/apis/v1alpha1" + kcpinformers "github.com/kcp-dev/kcp/sdk/client/informers/externalversions" ) const ( @@ -43,88 +45,129 @@ const ( ) type Plugin struct { - // Using validating plugin, for the dispatcher to use. - // This plugins admit function will never be called. - mutating.Plugin - *webhook.WebhookDispatcher + *admission.Handler + config []byte + + // Injected/set via initializers + kubeClusterClient kcpkubernetesclientset.ClusterInterface + kubeSharedInformerFactory kcpkubernetesinformers.SharedInformerFactory + + getAPIBindings func(clusterName logicalcluster.Name) ([]*apisv1alpha1.APIBinding, error) } var ( _ = admission.MutationInterface(&Plugin{}) _ = admission.InitializationValidator(&Plugin{}) - _ = kcpinitializers.WantsKcpInformers(&Plugin{}) + _ = kcpinitializers.WantsKubeClusterClient(&Plugin{}) _ = kcpinitializers.WantsKubeInformers(&Plugin{}) + _ = kcpinitializers.WantsKcpInformers(&Plugin{}) ) -func NewMutatingAdmissionWebhook(configfile io.Reader) (*Plugin, error) { +func NewMutatingAdmissionWebhook(configFile io.Reader) (*Plugin, error) { p := &Plugin{ - Plugin: mutating.Plugin{Webhook: &generic.Webhook{}}, - WebhookDispatcher: webhook.NewWebhookDispatcher(), + Handler: admission.NewHandler(admission.Connect, admission.Create, admission.Delete, admission.Update), + } + if configFile != nil { + config, err := io.ReadAll(configFile) + if err != nil { + return nil, err + } + p.config = config } - p.WebhookDispatcher.Handler = admission.NewHandler(admission.Connect, admission.Create, admission.Delete, admission.Update) - dispatcherFactory := mutating.NewMutatingDispatcher(&p.Plugin) + return p, nil +} - // Making our own dispatcher so that we can control the webhook accessors. - kubeconfigFile, err := config.LoadConfig(configfile) +func Register(plugins *admission.Plugins) { + plugins.Register(PluginName, func(configFile io.Reader) (admission.Interface, error) { + return NewMutatingAdmissionWebhook(configFile) + }) +} + +func (p *Plugin) Admit(ctx context.Context, attr admission.Attributes, o admission.ObjectInterfaces) error { + cluster, err := genericapirequest.ValidClusterFrom(ctx) if err != nil { - return nil, err + return err + } + clusterName := cluster.Name + + var config io.Reader + if len(p.config) > 0 { + config = bytes.NewReader(p.config) } - cm, err := webhookutil.NewClientManager( - []schema.GroupVersion{ - admissionv1beta1.SchemeGroupVersion, - admissionv1.SchemeGroupVersion, - }, - admissionv1beta1.AddToScheme, - admissionv1.AddToScheme, - ) + + hookSource, err := p.getHookSource(clusterName, attr.GetResource().GroupResource()) if err != nil { - return nil, err + return err } - authInfoResolver, err := webhookutil.NewDefaultAuthenticationInfoResolver(kubeconfigFile) + + plugin, err := mutating.NewMutatingWebhook(config) if err != nil { - return nil, err + return fmt.Errorf("error creating mutating admission webhook: %w", err) } - // Set defaults which may be overridden later. - cm.SetAuthenticationInfoResolver(authInfoResolver) - cm.SetServiceResolver(webhookutil.NewDefaultServiceResolver()) - p.WebhookDispatcher.SetDispatcher(dispatcherFactory(&cm)) - // Need to do this, to make sure that the underlying objects for the call to ShouldCallHook have the right values - p.Plugin.Webhook, err = generic.NewWebhook(p.Handler, configfile, configuration.NewMutatingWebhookConfigurationManager, dispatcherFactory) + plugin.SetExternalKubeClientSet(p.kubeClusterClient.Cluster(clusterName.Path())) + plugin.SetNamespaceInformer(p.kubeSharedInformerFactory.Core().V1().Namespaces().Cluster(clusterName)) + plugin.SetHookSource(hookSource) + plugin.SetReadyFuncFromKCP(p.kubeSharedInformerFactory.Core().V1().Namespaces().Cluster(clusterName)) + + if err := plugin.ValidateInitialization(); err != nil { + return fmt.Errorf("error validating MutatingWebhook initialization: %w", err) + } + + return plugin.Admit(ctx, attr, o) +} + +func (p *Plugin) getHookSource(clusterName logicalcluster.Name, groupResource schema.GroupResource) (generic.Source, error) { + clusterNameForGroupResource, err := p.getSourceClusterForGroupResource(clusterName, groupResource) if err != nil { return nil, err } - // Override the ready func + return configuration.NewMutatingWebhookConfigurationManagerForInformer( + p.kubeSharedInformerFactory.Admissionregistration().V1().MutatingWebhookConfigurations().Cluster(clusterNameForGroupResource), + ), nil +} - p.SetReadyFunc(func() bool { - if p.WebhookDispatcher.HasSynced() && p.Plugin.WaitForReady() { - return true +func (p *Plugin) getSourceClusterForGroupResource(clusterName logicalcluster.Name, groupResource schema.GroupResource) (logicalcluster.Name, error) { + objs, err := p.getAPIBindings(clusterName) + if err != nil { + return "", err + } + + for _, apiBinding := range objs { + for _, br := range apiBinding.Status.BoundResources { + if br.Group == groupResource.Group && br.Resource == groupResource.Resource { + // GroupResource comes from an APIBinding/APIExport + return logicalcluster.Name(apiBinding.Status.APIExportClusterName), nil + } } - return false - }) - return p, nil -} + } -func Register(plugins *admission.Plugins) { - plugins.Register(PluginName, func(configFile io.Reader) (admission.Interface, error) { - return NewMutatingAdmissionWebhook(configFile) - }) + // GroupResource is local to this cluster + return clusterName, nil } -func (p *Plugin) Admit(ctx context.Context, attr admission.Attributes, o admission.ObjectInterfaces) error { - return p.WebhookDispatcher.Dispatch(ctx, attr, o) +func (p *Plugin) ValidateInitialization() error { + if p.kubeClusterClient == nil { + return errors.New("missing kubeClusterClient") + } + if p.kubeSharedInformerFactory == nil { + return errors.New("missing kubeSharedInformerFactory") + } + return nil } -// SetExternalKubeInformerFactory implements the WantsExternalKubeInformerFactory interface. -func (p *Plugin) SetExternalKubeInformerFactory(f informers.SharedInformerFactory) { - p.Plugin.SetExternalKubeInformerFactory(f) // for namespaces +func (p *Plugin) SetKubeClusterClient(client kcpkubernetesclientset.ClusterInterface) { + p.kubeClusterClient = client } func (p *Plugin) SetKubeInformers(local, global kcpkubernetesinformers.SharedInformerFactory) { - p.WebhookDispatcher.SetHookSource(func(cluster logicalcluster.Name) generic.Source { - informer := global.Admissionregistration().V1().MutatingWebhookConfigurations().Cluster(cluster) - return configuration.NewMutatingWebhookConfigurationManagerForInformer(informer) - }, global.Admissionregistration().V1().MutatingWebhookConfigurations().Informer().HasSynced) + p.kubeSharedInformerFactory = local +} + +func (p *Plugin) SetKcpInformers(local, global kcpinformers.SharedInformerFactory) { + p.getAPIBindings = func(clusterName logicalcluster.Name) ([]*apisv1alpha1.APIBinding, error) { + return local.Apis().V1alpha1().APIBindings().Lister().Cluster(clusterName).List(labels.Everything()) + } } diff --git a/pkg/admission/plugins.go b/pkg/admission/plugins.go index 518763f6944..18ef37002cb 100644 --- a/pkg/admission/plugins.go +++ b/pkg/admission/plugins.go @@ -21,6 +21,7 @@ import ( "k8s.io/apiserver/pkg/admission" "k8s.io/apiserver/pkg/admission/plugin/namespace/lifecycle" "k8s.io/apiserver/pkg/admission/plugin/resourcequota" + "k8s.io/apiserver/pkg/admission/plugin/validatingadmissionpolicy" mutatingwebhook "k8s.io/apiserver/pkg/admission/plugin/webhook/mutating" validatingwebhook "k8s.io/apiserver/pkg/admission/plugin/webhook/validating" kubeapiserveroptions "k8s.io/kubernetes/pkg/kubeapiserver/options" @@ -59,6 +60,7 @@ import ( "github.com/kcp-dev/kcp/pkg/admission/reservedmetadata" "github.com/kcp-dev/kcp/pkg/admission/reservednames" "github.com/kcp-dev/kcp/pkg/admission/shard" + kcpvalidatingadmissionpolicy "github.com/kcp-dev/kcp/pkg/admission/validatingadmissionpolicy" kcpvalidatingwebhook "github.com/kcp-dev/kcp/pkg/admission/validatingwebhook" "github.com/kcp-dev/kcp/pkg/admission/workspace" "github.com/kcp-dev/kcp/pkg/admission/workspacetype" @@ -80,8 +82,9 @@ var AllOrderedPlugins = beforeWebhooks(kubeapiserveroptions.AllOrderedPlugins, apibinding.PluginName, apibindingfinalizer.PluginName, apiexportendpointslice.PluginName, - kcpvalidatingwebhook.PluginName, kcpmutatingwebhook.PluginName, + kcpvalidatingadmissionpolicy.PluginName, + kcpvalidatingwebhook.PluginName, kcplimitranger.PluginName, reservedcrdannotations.PluginName, reservedcrdgroups.PluginName, @@ -121,8 +124,9 @@ func RegisterAllKcpAdmissionPlugins(plugins *admission.Plugins) { apibindingfinalizer.Register(plugins) apiexportendpointslice.Register(plugins) workspacenamespacelifecycle.Register(plugins) - kcpvalidatingwebhook.Register(plugins) kcpmutatingwebhook.Register(plugins) + kcpvalidatingadmissionpolicy.Register(plugins) + kcpvalidatingwebhook.Register(plugins) kcplimitranger.Register(plugins) reservedcrdannotations.Register(plugins) reservedcrdgroups.Register(plugins) @@ -154,8 +158,9 @@ var defaultOnPluginsInKcp = sets.NewString( apibinding.PluginName, apibindingfinalizer.PluginName, apiexportendpointslice.PluginName, - kcpvalidatingwebhook.PluginName, kcpmutatingwebhook.PluginName, + kcpvalidatingadmissionpolicy.PluginName, + kcpvalidatingwebhook.PluginName, reservedcrdannotations.PluginName, reservedcrdgroups.PluginName, reservednames.PluginName, @@ -177,6 +182,7 @@ var defaultOnKubePluginsInKube = sets.NewString( defaulttolerationseconds.PluginName, // DefaultTolerationSeconds mutatingwebhook.PluginName, // MutatingAdmissionWebhook validatingwebhook.PluginName, // ValidatingAdmissionWebhook + validatingadmissionpolicy.PluginName, // ValidatingAdmissionPolicy resourcequota.PluginName, // ResourceQuota storageobjectinuseprotection.PluginName, // StorageObjectInUseProtection podpriority.PluginName, // PodPriority diff --git a/pkg/admission/validatingadmissionpolicy/validating_admission_policy.go b/pkg/admission/validatingadmissionpolicy/validating_admission_policy.go new file mode 100644 index 00000000000..579ed17eb00 --- /dev/null +++ b/pkg/admission/validatingadmissionpolicy/validating_admission_policy.go @@ -0,0 +1,204 @@ +/* +Copyright 2023 The KCP Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package validatingadmissionpolicy + +import ( + "context" + "io" + "sync" + + kcpdynamic "github.com/kcp-dev/client-go/dynamic" + kcpkubernetesinformers "github.com/kcp-dev/client-go/informers" + kcpkubernetesclientset "github.com/kcp-dev/client-go/kubernetes" + "github.com/kcp-dev/logicalcluster/v3" + + "k8s.io/apiserver/pkg/admission" + "k8s.io/apiserver/pkg/admission/plugin/validatingadmissionpolicy" + "k8s.io/apiserver/pkg/dynamichack" + genericapirequest "k8s.io/apiserver/pkg/endpoints/request" + "k8s.io/client-go/discovery/cached/memory" + "k8s.io/client-go/dynamic" + "k8s.io/client-go/restmapper" + "k8s.io/component-base/featuregate" + "k8s.io/klog/v2" + + "github.com/kcp-dev/kcp/pkg/admission/initializers" + "github.com/kcp-dev/kcp/pkg/admission/kubequota" + kcpinformers "github.com/kcp-dev/kcp/sdk/client/informers/externalversions" + corev1alpha1informers "github.com/kcp-dev/kcp/sdk/client/informers/externalversions/core/v1alpha1" +) + +const PluginName = "KCPValidatingAdmissionPolicy" + +func Register(plugins *admission.Plugins) { + plugins.Register(PluginName, + func(config io.Reader) (admission.Interface, error) { + return NewKubeValidatingAdmissionPolicy(), nil + }, + ) +} + +func NewKubeValidatingAdmissionPolicy() *KubeValidatingAdmissionPolicy { + return &KubeValidatingAdmissionPolicy{} +} + +type KubeValidatingAdmissionPolicy struct { + *admission.Handler + + // Injected/set via initializers + logicalClusterInformer corev1alpha1informers.LogicalClusterClusterInformer + kubeClusterClient kcpkubernetesclientset.ClusterInterface + dynamicClusterClient kcpdynamic.ClusterInterface + kubeSharedInformerFactory kcpkubernetesinformers.SharedInformerFactory + serverDone <-chan struct{} + featureGates featuregate.FeatureGate + + lock sync.RWMutex + delegates map[logicalcluster.Name]*stoppableValidatingAdmissionPolicy + + logicalClusterDeletionMonitorStarter sync.Once +} + +var _ admission.ValidationInterface = &KubeValidatingAdmissionPolicy{} +var _ = initializers.WantsKcpInformers(&KubeValidatingAdmissionPolicy{}) +var _ = initializers.WantsKubeClusterClient(&KubeValidatingAdmissionPolicy{}) +var _ = initializers.WantsServerShutdownChannel(&KubeValidatingAdmissionPolicy{}) + +func (k *KubeValidatingAdmissionPolicy) SetKubeClusterClient(kubeClusterClient kcpkubernetesclientset.ClusterInterface) { + k.kubeClusterClient = kubeClusterClient +} + +func (k *KubeValidatingAdmissionPolicy) SetKcpInformers(local, global kcpinformers.SharedInformerFactory) { + k.logicalClusterInformer = local.Core().V1alpha1().LogicalClusters() +} + +func (k *KubeValidatingAdmissionPolicy) SetKubeInformers(local, global kcpkubernetesinformers.SharedInformerFactory) { +} + +func (k *KubeValidatingAdmissionPolicy) SetServerShutdownChannel(ch <-chan struct{}) { + k.serverDone = ch +} + +func (k *KubeValidatingAdmissionPolicy) SetDynamicClient(c dynamic.Interface) { + k.dynamicClusterClient = dynamichack.Unwrap(c) +} + +func (k *KubeValidatingAdmissionPolicy) Validate(ctx context.Context, a admission.Attributes, o admission.ObjectInterfaces) error { + k.logicalClusterDeletionMonitorStarter.Do(func() { + m := kubequota.NewLogicalClusterDeletionMonitor("kubequota-logicalcluster-deletion-monitor", k.logicalClusterInformer, k.logicalClusterDeleted) + go m.Start(k.serverDone) + }) + + cluster, err := genericapirequest.ValidClusterFrom(ctx) + if err != nil { + return err + } + + delegate, err := k.getOrCreateDelegate(cluster.Name) + if err != nil { + return err + } + + return delegate.Validate(ctx, a, o) +} + +// getOrCreateDelegate creates an actual plugin for clusterName. +func (k *KubeValidatingAdmissionPolicy) getOrCreateDelegate(clusterName logicalcluster.Name) (*stoppableValidatingAdmissionPolicy, error) { + k.lock.RLock() + delegate := k.delegates[clusterName] + k.lock.RUnlock() + + if delegate != nil { + return delegate, nil + } + + k.lock.Lock() + defer k.lock.Unlock() + + delegate = k.delegates[clusterName] + if delegate != nil { + return delegate, nil + } + + // Set up a context that is cancelable and that is bounded by k.serverDone + ctx, cancel := context.WithCancel(context.Background()) + go func() { + // Wait for either the context or the server to be done. If it's the server, cancel the context. + select { + case <-ctx.Done(): + case <-k.serverDone: + cancel() + } + }() + + plugin, err := validatingadmissionpolicy.NewPlugin() + if err != nil { + return nil, err + } + + delegate = &stoppableValidatingAdmissionPolicy{ + CELAdmissionPlugin: plugin, + stop: cancel, + } + + plugin.SetNamespaceInformer(k.kubeSharedInformerFactory.Core().V1().Namespaces().Cluster(clusterName)) + plugin.SetValidatingAdmissionPoliciesInformer(k.kubeSharedInformerFactory.Admissionregistration().V1alpha1().ValidatingAdmissionPolicies().Cluster(clusterName)) + plugin.SetValidatingAdmissionPolicyBindingsInformer(k.kubeSharedInformerFactory.Admissionregistration().V1alpha1().ValidatingAdmissionPolicyBindings().Cluster(clusterName)) + plugin.SetExternalKubeClientSet(k.kubeClusterClient.Cluster(clusterName.Path())) + + // TODO(ncdc): this is super inefficient to do per workspace + discoveryClient := memory.NewMemCacheClient(k.kubeClusterClient.Cluster(clusterName.Path()).Discovery()) + restMapper := restmapper.NewDeferredDiscoveryRESTMapper(discoveryClient) + plugin.SetRESTMapper(restMapper) + + plugin.SetDynamicClient(k.dynamicClusterClient.Cluster(clusterName.Path())) + plugin.SetDrainedNotification(ctx.Done()) + plugin.InspectFeatureGates(k.featureGates) + + if err := plugin.ValidateInitialization(); err != nil { + cancel() + return nil, err + } + + k.delegates[clusterName] = delegate + + return delegate, nil +} + +func (k *KubeValidatingAdmissionPolicy) logicalClusterDeleted(clusterName logicalcluster.Name) { + k.lock.Lock() + defer k.lock.Unlock() + + delegate := k.delegates[clusterName] + + logger := klog.Background().WithValues("clusterName", clusterName) + + if delegate == nil { + logger.V(3).Info("received event to stop validating admission policy for logical cluster, but it wasn't in the map") + return + } + + logger.V(2).Info("stopping validating admission policy for logical cluster") + + delete(k.delegates, clusterName) + delegate.stop() +} + +type stoppableValidatingAdmissionPolicy struct { + *validatingadmissionpolicy.CELAdmissionPlugin + stop func() +} diff --git a/pkg/admission/validatingwebhook/plugin.go b/pkg/admission/validatingwebhook/plugin.go index 74a7929c64f..53fc9ccee84 100644 --- a/pkg/admission/validatingwebhook/plugin.go +++ b/pkg/admission/validatingwebhook/plugin.go @@ -17,25 +17,27 @@ limitations under the License. package validatingwebhook import ( + "bytes" "context" + "errors" + "fmt" "io" kcpkubernetesinformers "github.com/kcp-dev/client-go/informers" + kcpkubernetesclientset "github.com/kcp-dev/client-go/kubernetes" "github.com/kcp-dev/logicalcluster/v3" - admissionv1 "k8s.io/api/admission/v1" - admissionv1beta1 "k8s.io/api/admission/v1beta1" + "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apiserver/pkg/admission" "k8s.io/apiserver/pkg/admission/configuration" - "k8s.io/apiserver/pkg/admission/plugin/webhook/config" "k8s.io/apiserver/pkg/admission/plugin/webhook/generic" "k8s.io/apiserver/pkg/admission/plugin/webhook/validating" - webhookutil "k8s.io/apiserver/pkg/util/webhook" - kubernetesinformers "k8s.io/client-go/informers" + genericapirequest "k8s.io/apiserver/pkg/endpoints/request" kcpinitializers "github.com/kcp-dev/kcp/pkg/admission/initializers" - "github.com/kcp-dev/kcp/pkg/admission/webhook" + apisv1alpha1 "github.com/kcp-dev/kcp/sdk/apis/apis/v1alpha1" + kcpinformers "github.com/kcp-dev/kcp/sdk/client/informers/externalversions" ) const ( @@ -43,88 +45,129 @@ const ( ) type Plugin struct { - // Using validating plugin, for the dispatcher to use. - // This plugins admit function will never be called. - validating.Plugin - *webhook.WebhookDispatcher + *admission.Handler + config []byte + + // Injected/set via initializers + kubeClusterClient kcpkubernetesclientset.ClusterInterface + kubeSharedInformerFactory kcpkubernetesinformers.SharedInformerFactory + + getAPIBindings func(clusterName logicalcluster.Name) ([]*apisv1alpha1.APIBinding, error) } var ( _ = admission.ValidationInterface(&Plugin{}) _ = admission.InitializationValidator(&Plugin{}) - _ = kcpinitializers.WantsKcpInformers(&Plugin{}) + _ = kcpinitializers.WantsKubeClusterClient(&Plugin{}) _ = kcpinitializers.WantsKubeInformers(&Plugin{}) + _ = kcpinitializers.WantsKcpInformers(&Plugin{}) ) -func NewValidatingAdmissionWebhook(configfile io.Reader) (*Plugin, error) { +func NewValidatingAdmissionWebhook(configFile io.Reader) (*Plugin, error) { p := &Plugin{ - Plugin: validating.Plugin{Webhook: &generic.Webhook{}}, - WebhookDispatcher: webhook.NewWebhookDispatcher(), + Handler: admission.NewHandler(admission.Connect, admission.Create, admission.Delete, admission.Update), + } + if configFile != nil { + config, err := io.ReadAll(configFile) + if err != nil { + return nil, err + } + p.config = config } - p.WebhookDispatcher.Handler = admission.NewHandler(admission.Connect, admission.Create, admission.Delete, admission.Update) - dispatcherFactory := validating.NewValidatingDispatcher(&p.Plugin) + return p, nil +} - // Making our own dispatcher so that we can control the webhook accessors. - kubeconfigFile, err := config.LoadConfig(configfile) +func Register(plugins *admission.Plugins) { + plugins.Register(PluginName, func(configFile io.Reader) (admission.Interface, error) { + return NewValidatingAdmissionWebhook(configFile) + }) +} + +func (p *Plugin) Validate(ctx context.Context, attr admission.Attributes, o admission.ObjectInterfaces) error { + cluster, err := genericapirequest.ValidClusterFrom(ctx) if err != nil { - return nil, err + return err + } + clusterName := cluster.Name + + var config io.Reader + if len(p.config) > 0 { + config = bytes.NewReader(p.config) } - cm, err := webhookutil.NewClientManager( - []schema.GroupVersion{ - admissionv1beta1.SchemeGroupVersion, - admissionv1.SchemeGroupVersion, - }, - admissionv1beta1.AddToScheme, - admissionv1.AddToScheme, - ) + + hookSource, err := p.getHookSource(clusterName, attr.GetResource().GroupResource()) if err != nil { - return nil, err + return err } - authInfoResolver, err := webhookutil.NewDefaultAuthenticationInfoResolver(kubeconfigFile) + + plugin, err := validating.NewValidatingAdmissionWebhook(config) if err != nil { - return nil, err + return fmt.Errorf("error creating validating admission webhook: %w", err) } - // Set defaults which may be overridden later. - cm.SetAuthenticationInfoResolver(authInfoResolver) - cm.SetServiceResolver(webhookutil.NewDefaultServiceResolver()) - p.WebhookDispatcher.SetDispatcher(dispatcherFactory(&cm)) - // Need to do this, to make sure that the underlying objects for the call to ShouldCallHook have the right values - p.Plugin.Webhook, err = generic.NewWebhook(p.Handler, configfile, configuration.NewValidatingWebhookConfigurationManager, dispatcherFactory) + plugin.SetExternalKubeClientSet(p.kubeClusterClient.Cluster(clusterName.Path())) + plugin.SetNamespaceInformer(p.kubeSharedInformerFactory.Core().V1().Namespaces().Cluster(clusterName)) + plugin.SetHookSource(hookSource) + plugin.SetReadyFuncFromKCP(p.kubeSharedInformerFactory.Core().V1().Namespaces().Cluster(clusterName)) + + if err := plugin.ValidateInitialization(); err != nil { + return fmt.Errorf("error validating ValidatingAdmissionWebhook initialization: %w", err) + } + + return plugin.Validate(ctx, attr, o) +} + +func (p *Plugin) getHookSource(clusterName logicalcluster.Name, groupResource schema.GroupResource) (generic.Source, error) { + clusterNameForGroupResource, err := p.getSourceClusterForGroupResource(clusterName, groupResource) if err != nil { return nil, err } - // Override the ready func + return configuration.NewValidatingWebhookConfigurationManagerForInformer( + p.kubeSharedInformerFactory.Admissionregistration().V1().ValidatingWebhookConfigurations().Cluster(clusterNameForGroupResource), + ), nil +} - p.SetReadyFunc(func() bool { - if p.WebhookDispatcher.HasSynced() && p.Plugin.WaitForReady() { - return true +func (p *Plugin) getSourceClusterForGroupResource(clusterName logicalcluster.Name, groupResource schema.GroupResource) (logicalcluster.Name, error) { + objs, err := p.getAPIBindings(clusterName) + if err != nil { + return "", err + } + + for _, apiBinding := range objs { + for _, br := range apiBinding.Status.BoundResources { + if br.Group == groupResource.Group && br.Resource == groupResource.Resource { + // GroupResource comes from an APIBinding/APIExport + return logicalcluster.Name(apiBinding.Status.APIExportClusterName), nil + } } - return false - }) - return p, nil -} + } -func Register(plugins *admission.Plugins) { - plugins.Register(PluginName, func(configFile io.Reader) (admission.Interface, error) { - return NewValidatingAdmissionWebhook(configFile) - }) + // GroupResource is local to this cluster + return clusterName, nil } -func (p *Plugin) Validate(ctx context.Context, attr admission.Attributes, o admission.ObjectInterfaces) error { - return p.WebhookDispatcher.Dispatch(ctx, attr, o) +func (p *Plugin) ValidateInitialization() error { + if p.kubeClusterClient == nil { + return errors.New("missing kubeClusterClient") + } + if p.kubeSharedInformerFactory == nil { + return errors.New("missing kubeSharedInformerFactory") + } + return nil } -// SetExternalKubeInformerFactory implements the WantsExternalKubeInformerFactory interface. -func (p *Plugin) SetExternalKubeInformerFactory(f kubernetesinformers.SharedInformerFactory) { - p.Plugin.SetExternalKubeInformerFactory(f) // for namespaces +func (p *Plugin) SetKubeClusterClient(client kcpkubernetesclientset.ClusterInterface) { + p.kubeClusterClient = client } func (p *Plugin) SetKubeInformers(local, global kcpkubernetesinformers.SharedInformerFactory) { - p.WebhookDispatcher.SetHookSource(func(cluster logicalcluster.Name) generic.Source { - informer := global.Admissionregistration().V1().ValidatingWebhookConfigurations().Cluster(cluster) - return configuration.NewValidatingWebhookConfigurationManagerForInformer(informer) - }, global.Admissionregistration().V1().ValidatingWebhookConfigurations().Informer().HasSynced) + p.kubeSharedInformerFactory = local +} + +func (p *Plugin) SetKcpInformers(local, global kcpinformers.SharedInformerFactory) { + p.getAPIBindings = func(clusterName logicalcluster.Name) ([]*apisv1alpha1.APIBinding, error) { + return local.Apis().V1alpha1().APIBindings().Lister().Cluster(clusterName).List(labels.Everything()) + } } diff --git a/pkg/admission/webhook/generic_webhook.go b/pkg/admission/webhook/generic_webhook.go index b62dcd4fb08..9d79f6f7e20 100644 --- a/pkg/admission/webhook/generic_webhook.go +++ b/pkg/admission/webhook/generic_webhook.go @@ -27,7 +27,7 @@ import ( "k8s.io/apiserver/pkg/admission" "k8s.io/apiserver/pkg/admission/plugin/webhook" "k8s.io/apiserver/pkg/admission/plugin/webhook/generic" - "k8s.io/apiserver/pkg/admission/plugin/webhook/rules" + "k8s.io/apiserver/pkg/admission/plugin/webhook/predicates/rules" genericapirequest "k8s.io/apiserver/pkg/endpoints/request" "k8s.io/client-go/tools/cache" "k8s.io/klog/v2" @@ -109,7 +109,7 @@ func (p *WebhookDispatcher) SetDispatcher(dispatch generic.Dispatcher) { func (p *WebhookDispatcher) Dispatch(ctx context.Context, attr admission.Attributes, o admission.ObjectInterfaces) error { // If the object is a Webhook configuration, do not call webhooks // This is because we need some way to recover if a webhook is preventing a cluster resources from being updated - if rules.IsWebhookConfigurationResource(attr) { + if rules.IsExemptAdmissionConfigurationResource(attr) { return nil } lcluster, err := genericapirequest.ClusterNameFrom(ctx) diff --git a/pkg/authorization/decorator_test.go b/pkg/authorization/decorator_test.go index af2ca49dc2f..21c658cba9c 100644 --- a/pkg/authorization/decorator_test.go +++ b/pkg/authorization/decorator_test.go @@ -217,7 +217,11 @@ func TestDecorator(t *testing.T) { }, } { t.Run(name, func(t *testing.T) { - ctx := audit.WithAuditContext(context.Background(), newAuditContext(auditapis.LevelMetadata)) + ctx := audit.WithAuditContext(context.Background()) + auditCtx := audit.AuditContextFrom(ctx) + auditCtx.Event = &auditapis.Event{ + Level: auditapis.LevelMetadata, + } attr := authorizer.AttributesRecord{} dec, reason, _ := tc.authz.Authorize(ctx, attr) if dec != tc.wantDecision { @@ -233,11 +237,3 @@ func TestDecorator(t *testing.T) { }) } } - -func newAuditContext(l auditapis.Level) *audit.AuditContext { - return &audit.AuditContext{ - Event: &auditapis.Event{ - Level: l, - }, - } -} diff --git a/pkg/cache/server/config.go b/pkg/cache/server/config.go index 807191073d2..5b613eedd82 100644 --- a/pkg/cache/server/config.go +++ b/pkg/cache/server/config.go @@ -33,7 +33,6 @@ import ( "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apiserver/pkg/features" genericapiserver "k8s.io/apiserver/pkg/server" - genericoptions "k8s.io/apiserver/pkg/server/options" utilfeature "k8s.io/apiserver/pkg/util/feature" "k8s.io/client-go/rest" @@ -110,6 +109,9 @@ func NewConfig(opts *cacheserveroptions.CompletedOptions, optionalLocalShardRest if err := opts.ServerRunOptions.ApplyTo(&serverConfig.Config); err != nil { return nil, err } + if err := opts.Etcd.Complete(serverConfig.Config.StorageObjectCountTracker, serverConfig.Config.DrainedNotify(), serverConfig.Config.AddPostStartHook); err != nil { + return nil, err + } if err := opts.Etcd.ApplyTo(&serverConfig.Config); err != nil { return nil, err } @@ -150,7 +152,10 @@ func NewConfig(opts *cacheserveroptions.CompletedOptions, optionalLocalShardRest opts.Etcd.StorageConfig.Codec = apiextensionsapiserver.Codecs.LegacyCodec(apiextensionsv1beta1.SchemeGroupVersion, apiextensionsv1.SchemeGroupVersion) // prefer the more compact serialization (v1beta1) for storage until http://issue.k8s.io/82292 is resolved for objects whose v1 serialization is too big but whose v1beta1 serialization can be stored opts.Etcd.StorageConfig.EncodeVersioner = runtime.NewMultiGroupVersioner(apiextensionsv1beta1.SchemeGroupVersion, schema.GroupKind{Group: apiextensionsv1beta1.GroupName}) - serverConfig.RESTOptionsGetter = &genericoptions.SimpleRestOptionsFactory{Options: *opts.Etcd} + opts.Etcd.SkipHealthEndpoints = true // avoid double wiring of health checks + if err := opts.Etcd.ApplyTo(&serverConfig.Config); err != nil { + return nil, err + } // an ordered list of HTTP round trippers that add // shard and cluster awareness to all clients that use @@ -198,10 +203,15 @@ func NewConfig(opts *cacheserveroptions.CompletedOptions, optionalLocalShardRest resyncPeriod, ) + crdRESTOptionsGetter, err := apiextensionsoptions.NewCRDRESTOptionsGetter(*opts.Etcd) + if err != nil { + return nil, err + } + c.ApiExtensions = &apiextensionsapiserver.Config{ GenericConfig: serverConfig, ExtraConfig: apiextensionsapiserver.ExtraConfig{ - CRDRESTOptionsGetter: apiextensionsoptions.NewCRDRESTOptionsGetter(*opts.Etcd), + CRDRESTOptionsGetter: crdRESTOptionsGetter, MasterCount: 1, Client: c.ApiExtensionsClusterClient, Informers: c.ApiExtensionsSharedInformerFactory, diff --git a/pkg/conversion/conversion_rules.go b/pkg/conversion/conversion_rules.go index 1ed1624726d..8f3d30ee041 100644 --- a/pkg/conversion/conversion_rules.go +++ b/pkg/conversion/conversion_rules.go @@ -22,14 +22,13 @@ import ( "time" "github.com/google/cel-go/cel" - "github.com/google/cel-go/checker/decls" - expr "google.golang.org/genproto/googleapis/api/expr/v1alpha1" structuralschema "k8s.io/apiextensions-apiserver/pkg/apiserver/schema" - "k8s.io/apiextensions-apiserver/pkg/apiserver/schema/cel/library" - "k8s.io/apiextensions-apiserver/third_party/forked/celopenapi/model" + "k8s.io/apiextensions-apiserver/pkg/apiserver/schema/cel/model" kerrors "k8s.io/apimachinery/pkg/util/errors" "k8s.io/apimachinery/pkg/util/validation/field" + apiservercel "k8s.io/apiserver/pkg/cel" + "k8s.io/apiserver/pkg/cel/library" apisv1alpha1 "github.com/kcp-dev/kcp/sdk/apis/apis/v1alpha1" ) @@ -158,12 +157,12 @@ func createCELEnv(structuralSchema *structuralschema.Structural) (*cel.Env, erro return nil, fmt.Errorf("error creating CEL environment: %w", err) } - registry := model.NewRegistry(env) + registry := apiservercel.NewRegistry(env) // inline local copy of upstream's generateUniqueSelfTypeName() scopedTypeName := fmt.Sprintf("selfType%d", time.Now().Nanosecond()) - - ruleTypes, err := model.NewRuleTypes(scopedTypeName, structuralSchema, true, registry) + declType := model.SchemaDeclType(structuralSchema, true) + ruleTypes, err := apiservercel.NewRuleTypes(scopedTypeName, declType, registry) if err != nil { return nil, fmt.Errorf("error creating rule types: %w", err) } @@ -178,17 +177,18 @@ func createCELEnv(structuralSchema *structuralschema.Structural) (*cel.Env, erro root, ok := ruleTypes.FindDeclType(scopedTypeName) if !ok { - rootDecl := model.SchemaDeclType(structuralSchema, true) - if rootDecl == nil { - return nil, fmt.Errorf("unable to find CEL decl type for %s", structuralSchema.Type) + if declType == nil { + return nil, fmt.Errorf("rule declared on schema that does not support validation rules type: '%s' x-kubernetes-preserve-unknown-fields: '%t'", structuralSchema.Type, structuralSchema.XPreserveUnknownFields) } - root = rootDecl.MaybeAssignTypeName(scopedTypeName) + root = declType.MaybeAssignTypeName(scopedTypeName) } - var propDecls []*expr.Decl - propDecls = append(propDecls, decls.NewVar("self", root.ExprType())) + var propDecls []cel.EnvOption + propDecls = append(propDecls, cel.Variable("self", root.CelType())) - opts = append(opts, cel.Declarations(propDecls...), cel.HomogeneousAggregateLiterals()) + opts = append(opts, propDecls...) + opts = append(opts, cel.HomogeneousAggregateLiterals()) + opts = append(opts, cel.EagerlyValidateDeclarations(true), cel.DefaultUTCTimeZone(true)) opts = append(opts, library.ExtensionLibs...) return env.Extend(opts...) } diff --git a/pkg/features/kcp_features.go b/pkg/features/kcp_features.go index 89cbf9aae8d..b9d762967bd 100644 --- a/pkg/features/kcp_features.go +++ b/pkg/features/kcp_features.go @@ -27,7 +27,7 @@ import ( genericfeatures "k8s.io/apiserver/pkg/features" utilfeature "k8s.io/apiserver/pkg/util/feature" "k8s.io/component-base/featuregate" - "k8s.io/component-base/logs" + logsapi "k8s.io/component-base/logs/api/v1" ) const ( @@ -104,10 +104,14 @@ var defaultGenericControlPlaneFeatureGates = map[featuregate.Feature]featuregate genericfeatures.AdvancedAuditing: {Default: true, PreRelease: featuregate.GA}, genericfeatures.APIResponseCompression: {Default: true, PreRelease: featuregate.Beta}, genericfeatures.APIListChunking: {Default: true, PreRelease: featuregate.Beta}, - genericfeatures.DryRun: {Default: true, PreRelease: featuregate.GA}, - genericfeatures.ServerSideApply: {Default: true, PreRelease: featuregate.GA}, genericfeatures.APIPriorityAndFairness: {Default: true, PreRelease: featuregate.Beta}, - genericfeatures.CustomResourceValidationExpressions: {Default: false, PreRelease: featuregate.Alpha}, - - logs.ContextualLogging: {Default: true, PreRelease: featuregate.Alpha}, + genericfeatures.CustomResourceValidationExpressions: {Default: true, PreRelease: featuregate.Beta}, + genericfeatures.DryRun: {Default: true, PreRelease: featuregate.GA, LockToDefault: true}, // remove in 1.28 + genericfeatures.OpenAPIEnums: {Default: true, PreRelease: featuregate.Beta}, + genericfeatures.OpenAPIV3: {Default: true, PreRelease: featuregate.Beta}, + genericfeatures.ServerSideApply: {Default: true, PreRelease: featuregate.GA, LockToDefault: true}, // remove in 1.29 + genericfeatures.ServerSideFieldValidation: {Default: true, PreRelease: featuregate.Beta}, + genericfeatures.ValidatingAdmissionPolicy: {Default: false, PreRelease: featuregate.Alpha}, + + logsapi.ContextualLogging: {Default: true, PreRelease: featuregate.Alpha}, } diff --git a/pkg/informer/informer.go b/pkg/informer/informer.go index 11f088ab01a..79b3c44a828 100644 --- a/pkg/informer/informer.go +++ b/pkg/informer/informer.go @@ -834,23 +834,25 @@ type crdGVRSource struct { // Hard-code built in types that support list+watch. var builtInInformableTypes map[schema.GroupVersionResource]GVRPartialMetadata = map[schema.GroupVersionResource]GVRPartialMetadata{ - gvrFor("", "v1", "configmaps"): withGVRPartialMetadata(apiextensionsv1.NamespaceScoped, "ConfigMap", "configmap"), - gvrFor("", "v1", "events"): withGVRPartialMetadata(apiextensionsv1.NamespaceScoped, "Event", "event"), - gvrFor("", "v1", "limitranges"): withGVRPartialMetadata(apiextensionsv1.NamespaceScoped, "LimitRange", "limitrange"), - gvrFor("", "v1", "namespaces"): withGVRPartialMetadata(apiextensionsv1.ClusterScoped, "Namespace", "namespace"), - gvrFor("", "v1", "resourcequotas"): withGVRPartialMetadata(apiextensionsv1.NamespaceScoped, "ResourceQuota", "resourcequota"), - gvrFor("", "v1", "secrets"): withGVRPartialMetadata(apiextensionsv1.NamespaceScoped, "Secret", "secret"), - gvrFor("", "v1", "serviceaccounts"): withGVRPartialMetadata(apiextensionsv1.NamespaceScoped, "ServiceAccount", "serviceaccount"), - gvrFor("certificates.k8s.io", "v1", "certificatesigningrequests"): withGVRPartialMetadata(apiextensionsv1.ClusterScoped, "CertificateSigningRequest", "certificatesigningrequest"), - gvrFor("coordination.k8s.io", "v1", "leases"): withGVRPartialMetadata(apiextensionsv1.NamespaceScoped, "Lease", "lease"), - gvrFor("rbac.authorization.k8s.io", "v1", "clusterroles"): withGVRPartialMetadata(apiextensionsv1.ClusterScoped, "ClusterRole", "clusterrole"), - gvrFor("rbac.authorization.k8s.io", "v1", "clusterrolebindings"): withGVRPartialMetadata(apiextensionsv1.ClusterScoped, "ClusterRoleBinding", "clusterrolebinding"), - gvrFor("rbac.authorization.k8s.io", "v1", "roles"): withGVRPartialMetadata(apiextensionsv1.NamespaceScoped, "Role", "role"), - gvrFor("rbac.authorization.k8s.io", "v1", "rolebindings"): withGVRPartialMetadata(apiextensionsv1.NamespaceScoped, "RoleBinding", "rolebinding"), - gvrFor("events.k8s.io", "v1", "events"): withGVRPartialMetadata(apiextensionsv1.NamespaceScoped, "Event", "event"), - gvrFor("admissionregistration.k8s.io", "v1", "mutatingwebhookconfigurations"): withGVRPartialMetadata(apiextensionsv1.ClusterScoped, "MutatingWebhookConfiguration", "mutatingwebhookconfiguration"), - gvrFor("admissionregistration.k8s.io", "v1", "validatingwebhookconfigurations"): withGVRPartialMetadata(apiextensionsv1.ClusterScoped, "ValidatingWebhookConfiguration", "validatingwebhookconfiguration"), - gvrFor("apiextensions.k8s.io", "v1", "customresourcedefinitions"): withGVRPartialMetadata(apiextensionsv1.ClusterScoped, "CustomResourceDefinition", "customresourcedefinition"), + gvrFor("", "v1", "configmaps"): withGVRPartialMetadata(apiextensionsv1.NamespaceScoped, "ConfigMap", "configmap"), + gvrFor("", "v1", "events"): withGVRPartialMetadata(apiextensionsv1.NamespaceScoped, "Event", "event"), + gvrFor("", "v1", "limitranges"): withGVRPartialMetadata(apiextensionsv1.NamespaceScoped, "LimitRange", "limitrange"), + gvrFor("", "v1", "namespaces"): withGVRPartialMetadata(apiextensionsv1.ClusterScoped, "Namespace", "namespace"), + gvrFor("", "v1", "resourcequotas"): withGVRPartialMetadata(apiextensionsv1.NamespaceScoped, "ResourceQuota", "resourcequota"), + gvrFor("", "v1", "secrets"): withGVRPartialMetadata(apiextensionsv1.NamespaceScoped, "Secret", "secret"), + gvrFor("", "v1", "serviceaccounts"): withGVRPartialMetadata(apiextensionsv1.NamespaceScoped, "ServiceAccount", "serviceaccount"), + gvrFor("certificates.k8s.io", "v1", "certificatesigningrequests"): withGVRPartialMetadata(apiextensionsv1.ClusterScoped, "CertificateSigningRequest", "certificatesigningrequest"), + gvrFor("coordination.k8s.io", "v1", "leases"): withGVRPartialMetadata(apiextensionsv1.NamespaceScoped, "Lease", "lease"), + gvrFor("rbac.authorization.k8s.io", "v1", "clusterroles"): withGVRPartialMetadata(apiextensionsv1.ClusterScoped, "ClusterRole", "clusterrole"), + gvrFor("rbac.authorization.k8s.io", "v1", "clusterrolebindings"): withGVRPartialMetadata(apiextensionsv1.ClusterScoped, "ClusterRoleBinding", "clusterrolebinding"), + gvrFor("rbac.authorization.k8s.io", "v1", "roles"): withGVRPartialMetadata(apiextensionsv1.NamespaceScoped, "Role", "role"), + gvrFor("rbac.authorization.k8s.io", "v1", "rolebindings"): withGVRPartialMetadata(apiextensionsv1.NamespaceScoped, "RoleBinding", "rolebinding"), + gvrFor("events.k8s.io", "v1", "events"): withGVRPartialMetadata(apiextensionsv1.NamespaceScoped, "Event", "event"), + gvrFor("admissionregistration.k8s.io", "v1", "mutatingwebhookconfigurations"): withGVRPartialMetadata(apiextensionsv1.ClusterScoped, "MutatingWebhookConfiguration", "mutatingwebhookconfiguration"), + gvrFor("admissionregistration.k8s.io", "v1", "validatingwebhookconfigurations"): withGVRPartialMetadata(apiextensionsv1.ClusterScoped, "ValidatingWebhookConfiguration", "validatingwebhookconfiguration"), + gvrFor("admissionregistration.k8s.io", "v1alpha1", "validatingadmissionpolicies"): withGVRPartialMetadata(apiextensionsv1.ClusterScoped, "ValidatingAdmissionPolicy", "validatingadmissionpolicy"), + gvrFor("admissionregistration.k8s.io", "v1alpha1", "validatingadmissionpolicybindings"): withGVRPartialMetadata(apiextensionsv1.ClusterScoped, "ValidatingAdmissionPolicyBinding", "validatingadmissionpolicybinding"), + gvrFor("apiextensions.k8s.io", "v1", "customresourcedefinitions"): withGVRPartialMetadata(apiextensionsv1.ClusterScoped, "CustomResourceDefinition", "customresourcedefinition"), } func (s *crdGVRSource) GVRs() map[schema.GroupVersionResource]GVRPartialMetadata { diff --git a/pkg/informer/informer_test.go b/pkg/informer/informer_test.go index 84c8d91b95f..f2c002c031d 100644 --- a/pkg/informer/informer_test.go +++ b/pkg/informer/informer_test.go @@ -57,6 +57,7 @@ func TestBuiltInInformableTypes(t *testing.T) { {Version: "v1", Kind: "RangeAllocation"}: {}, {Version: "v1", Kind: "SerializedReference"}: {}, {Version: "v1", Kind: "Status"}: {}, + {Group: "authentication.k8s.io", Version: "v1alpha1", Kind: "SelfSubjectReview"}: {}, {Group: "authentication.k8s.io", Version: "v1", Kind: "TokenRequest"}: {}, {Group: "authentication.k8s.io", Version: "v1", Kind: "TokenReview"}: {}, {Group: "authorization.k8s.io", Version: "v1", Kind: "LocalSubjectAccessReview"}: {}, @@ -107,6 +108,12 @@ func TestBuiltInInformableTypes(t *testing.T) { } resourceName := strings.ToLower(gvk.Kind) + "s" + + // Handle e.g. ValidatingAdmissionPolicy -> validatingadmissionpolicies + if strings.HasSuffix(gvk.Kind, "y") { + resourceName = resourceName[:len(resourceName)-2] + "ies" + } + gvr := gvk.GroupVersion().WithResource(resourceName) builtInGVRs[gvr] = struct{}{} diff --git a/pkg/reconciler/garbagecollector/garbagecollector_controller.go b/pkg/reconciler/garbagecollector/garbagecollector_controller.go index 995e53aeec0..66b3972b089 100644 --- a/pkg/reconciler/garbagecollector/garbagecollector_controller.go +++ b/pkg/reconciler/garbagecollector/garbagecollector_controller.go @@ -33,7 +33,6 @@ import ( "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/tools/cache" "k8s.io/client-go/util/workqueue" - "k8s.io/component-base/metrics/prometheus/ratelimiter" "k8s.io/klog/v2" "k8s.io/kubernetes/pkg/controller/garbagecollector" @@ -243,25 +242,18 @@ func (c *Controller) startGarbageCollectorForLogicalCluster(ctx context.Context, kubeClient := c.kubeClusterClient.Cluster(clusterName.Path()) - garbageCollector, err := garbagecollector.NewClusterAwareGarbageCollector( + garbageCollector, err := garbagecollector.NewGarbageCollector( kubeClient, c.metadataClient.Cluster(clusterName.Path()), c.dynamicDiscoverySharedInformerFactory.RESTMapper(), c.ignoredResources, c.dynamicDiscoverySharedInformerFactory.Cluster(clusterName), c.informersStarted, - clusterName, ) if err != nil { return fmt.Errorf("failed to create the garbage collector: %w", err) } - if kubeClient.CoreV1().RESTClient().GetRateLimiter() != nil { - if err := ratelimiter.RegisterMetricAndTrackRateLimiterUsage(clusterName.String()+"-garbage_collector_controller", kubeClient.CoreV1().RESTClient().GetRateLimiter()); err != nil { - return err - } - } - // Here we diverge from what upstream does. Upstream starts a goroutine that retrieves discovery every 30 seconds, // starting/stopping dynamic informers as needed based on the updated discovery data. We know that kcp contains // the combination of built-in types plus CRDs. We use that information to drive what garbage collector evaluates. diff --git a/pkg/reconciler/kubequota/kubequota_controller.go b/pkg/reconciler/kubequota/kubequota_controller.go index 2b4aa2caafc..8037d55c5c9 100644 --- a/pkg/reconciler/kubequota/kubequota_controller.go +++ b/pkg/reconciler/kubequota/kubequota_controller.go @@ -34,7 +34,6 @@ import ( "k8s.io/apiserver/pkg/quota/v1/generic" "k8s.io/client-go/tools/cache" "k8s.io/client-go/util/workqueue" - "k8s.io/component-base/metrics/prometheus/ratelimiter" "k8s.io/klog/v2" "k8s.io/kubernetes/pkg/controller" "k8s.io/kubernetes/pkg/controller/resourcequota" @@ -269,15 +268,9 @@ func (c *Controller) startQuotaForLogicalCluster(ctx context.Context, clusterNam IgnoredResourcesFunc: quotaConfiguration.IgnoredResources, InformersStarted: c.informersStarted, Registry: generic.NewRegistry(quotaConfiguration.Evaluators()), - ClusterName: clusterName, - } - if resourceQuotaControllerClient.CoreV1().RESTClient().GetRateLimiter() != nil { - if err := ratelimiter.RegisterMetricAndTrackRateLimiterUsage(clusterName.String()+"-resource_quota_controller", resourceQuotaControllerClient.CoreV1().RESTClient().GetRateLimiter()); err != nil { - return err - } } - resourceQuotaController, err := resourcequota.NewController(resourceQuotaControllerOptions) + resourceQuotaController, err := resourcequota.NewController(ctx, resourceQuotaControllerOptions) if err != nil { return err } diff --git a/pkg/server/config.go b/pkg/server/config.go index 9995296812b..bd316259bb1 100644 --- a/pkg/server/config.go +++ b/pkg/server/config.go @@ -454,7 +454,7 @@ func NewConfig(opts kcpserveroptions.CompletedOptions) (*Config, error) { apiHandler = mux apiHandler = kcpfilters.WithAuditEventClusterAnnotation(apiHandler) - apiHandler = WithAuditAnnotation(apiHandler) // Must run before any audit annotation is made + apiHandler = filters.WithAuditInit(apiHandler) // Must run before any audit annotation is made apiHandler = WithLocalProxy(apiHandler, opts.Extra.ShardName, opts.Extra.ShardBaseURL, c.KcpSharedInformerFactory.Tenancy().V1alpha1().Workspaces(), c.KcpSharedInformerFactory.Core().V1alpha1().LogicalClusters()) apiHandler = WithInClusterServiceAccountRequestRewrite(apiHandler) apiHandler = kcpfilters.WithAcceptHeader(apiHandler) diff --git a/pkg/server/handler.go b/pkg/server/handler.go index 1e721bf52eb..e4f6b902a19 100644 --- a/pkg/server/handler.go +++ b/pkg/server/handler.go @@ -27,7 +27,7 @@ import ( "sort" "strings" - "github.com/emicklei/go-restful" + "github.com/emicklei/go-restful/v3" jwt2 "gopkg.in/square/go-jose.v2/jwt" apiextensionsapiserver "k8s.io/apiextensions-apiserver/pkg/apiserver" @@ -41,7 +41,6 @@ import ( "k8s.io/apimachinery/pkg/runtime/serializer" utilnet "k8s.io/apimachinery/pkg/util/net" "k8s.io/apimachinery/pkg/util/sets" - kaudit "k8s.io/apiserver/pkg/audit" apiserverdiscovery "k8s.io/apiserver/pkg/endpoints/discovery" "k8s.io/apiserver/pkg/endpoints/handlers/responsewriters" "k8s.io/apiserver/pkg/endpoints/request" @@ -91,16 +90,6 @@ func UserAgentFrom(ctx context.Context) string { return "" } -// WithAuditAnnotation initializes audit annotations in the context. Without -// initialization kaudit.AddAuditAnnotation isn't preserved. -func WithAuditAnnotation(handler http.Handler) http.HandlerFunc { - return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { - handler.ServeHTTP(w, req.WithContext( - kaudit.WithAuditAnnotations(req.Context()), - )) - }) -} - // WithVirtualWorkspacesProxy proxies internal requests to virtual workspaces (i.e., requests that did // not go through the front proxy) to the external virtual workspaces server. Proxying is required to avoid // certificate verification errors because these requests typically come from the kcp loopback client, and it is diff --git a/pkg/server/home_workspaces.go b/pkg/server/home_workspaces.go index afccb7e7b64..cd643ab8f48 100644 --- a/pkg/server/home_workspaces.go +++ b/pkg/server/home_workspaces.go @@ -316,7 +316,7 @@ func (h *homeWorkspaceHandler) ServeHTTP(rw http.ResponseWriter, req *http.Reque Initializers: logicalCluster.Status.Initializers, }, } - responsewriters.WriteObjectNegotiated(homeWorkspaceCodecs, negotiation.DefaultEndpointRestrictions, tenancyv1alpha1.SchemeGroupVersion, rw, req, http.StatusOK, homeWorkspace) + responsewriters.WriteObjectNegotiated(homeWorkspaceCodecs, negotiation.DefaultEndpointRestrictions, tenancyv1alpha1.SchemeGroupVersion, rw, req, http.StatusOK, homeWorkspace, false) } func (h *homeWorkspaceHandler) getWorkspaceType(path logicalcluster.Path, name string) (*tenancyv1alpha1.WorkspaceType, error) { diff --git a/pkg/server/options/flags.go b/pkg/server/options/flags.go index 83ec142ac6e..53635b264fd 100644 --- a/pkg/server/options/flags.go +++ b/pkg/server/options/flags.go @@ -78,7 +78,6 @@ var ( "token-auth-file", // If set, the file that will be used to secure the secure port of the API server via token authentication. // Kubernetes ServiceAccount Authentication flags - "service-account-api-audiences", // Identifiers of the API. The service account token authenticator will validate that tokens used against the API are bound to at least one of these audiences. "service-account-extend-token-expiration", // Turns on projected service account expiration extension during token generation, which helps safe transition from legacy token to bound service account token feature. If this flag is enabled, admission injected tokens would be extended up to 1 year to prevent unexpected failure during transition, ignoring value of service-account-max-token-expiration. "service-account-issuer", // Identifier of the service account token issuer. The issuer will assert this identifier in "iss" claim of issued tokens. This value is a string or URI. If this option is not a valid URI per the OpenID Discovery 1.0 spec, the ServiceAccountIssuerDiscovery feature will remain disabled, even if the feature gate is set to true. It is highly recommended that this value comply with the OpenID spec: https://openid.net/specs/openid-connect-discovery-1_0.html. In practice, this means that service-account-issuer must be an https URL. It is also highly recommended that this URL be capable of serving OpenID discovery documents at {service-account-issuer}/.well-known/openid-configuration. When this flag is specified multiple times, the first is used to generate tokens and all are used to determine which issuers are accepted. "service-account-jwks-uri", // Overrides the URI for the JSON Web Key Set in the discovery doc served at /.well-known/openid-configuration. This flag is useful if the discovery docand key set are served to relying parties from a URL other than the API server's external (as auto-detected or overridden with external-hostname). @@ -118,6 +117,7 @@ var ( "external-hostname", // The hostname to use when generating externalized URLs for this master (e.g. Swagger API Docs or OpenID Discovery). // etcd flags + "encryption-provider-config-automatic-reload", // Determines if the file set by --encryption-provider-config should be automatically reloaded if the disk contents change. Setting this to true disables the ability to uniquely identify distinct KMS plugins via the API server healthz endpoints. "etcd-cafile", // SSL Certificate Authority file used to secure etcd communication. "etcd-certfile", // SSL certification file used to secure etcd communication. "etcd-compaction-interval", // The interval of compaction requests. If 0, the compaction request from apiserver is disabled. @@ -126,6 +126,7 @@ var ( "etcd-healthcheck-timeout", // The timeout to use when checking etcd health. "etcd-keyfile", // SSL key file used to secure etcd communication. "etcd-prefix", // The prefix to prepend to all resource paths in etcd. + "etcd-readycheck-timeout", // The timeout to use when checking etcd readiness "etcd-servers", // List of etcd servers to connect with (scheme://ip:port), comma separated. "etcd-servers-overrides", // Per-resource etcd servers overrides, comma separated. The individual override format: group/resource#servers, where servers are URLs, semicolon separated. Note that this applies only to resources compiled into this server binary. "lease-reuse-duration-seconds", // The time in seconds that each lease is reused. A lower value could avoid large number of objects reusing the same lease. Notice that a too small value may cause performance problems at storage layer. @@ -144,13 +145,11 @@ var ( "show-hidden-metrics-for-version", // The previous version for which you want to show hidden metrics. Only the previous minor version is meaningful, other values will not be allowed. The format is ., e.g.: '1.16'. The purpose of this format is make sure you have the opportunity to notice if the next release hides additional metrics, rather than being surprised when they are permanently removed in the release after that. // misc flags - "enable-logs-handler", // If true, install a /logs handler for the apiserver logs. - "event-ttl", // Amount of time to retain events. - "identity-lease-duration-seconds", // The duration of kube-apiserver lease in seconds, must be a positive number. (In use when the APIServerIdentity feature gate is enabled.) - "identity-lease-renew-interval-seconds", // The interval of kube-apiserver renewing its lease in seconds, must be a positive number. (In use when the APIServerIdentity feature gate is enabled.) - "max-connection-bytes-per-sec", // If non-zero, throttle each user connection to this number of bytes/sec. Currently only applies to long-running requests. - "proxy-client-cert-file", // Client certificate used to prove the identity of the aggregator or kube-apiserver when it must call out during a request. This includes proxying requests to a user api-server and calling out to webhook admission plugins. It is expected that this cert includes a signature from the CA in the --requestheader-client-ca-file flag. That CA is published in the 'extension-apiserver-authentication' configmap in the kube-system namespace. Components receiving calls from kube-aggregator should use that CA to perform their half of the mutual TLS verification. - "proxy-client-key-file", // Private key for the client certificate used to prove the identity of the aggregator or kube-apiserver when it must call out during a request. This includes proxying requests to a user api-server and calling out to webhook admission plugins. + "enable-logs-handler", // If true, install a /logs handler for the apiserver logs. + "event-ttl", // Amount of time to retain events. + "max-connection-bytes-per-sec", // If non-zero, throttle each user connection to this number of bytes/sec. Currently only applies to long-running requests. + "proxy-client-cert-file", // Client certificate used to prove the identity of the aggregator or kube-apiserver when it must call out during a request. This includes proxying requests to a user api-server and calling out to webhook admission plugins. It is expected that this cert includes a signature from the CA in the --requestheader-client-ca-file flag. That CA is published in the 'extension-apiserver-authentication' configmap in the kube-system namespace. Components receiving calls from kube-aggregator should use that CA to perform their half of the mutual TLS verification. + "proxy-client-key-file", // Private key for the client certificate used to prove the identity of the aggregator or kube-apiserver when it must call out during a request. This includes proxying requests to a user api-server and calling out to webhook admission plugins. ) disallowedFlags = sets.NewString( diff --git a/pkg/virtual/framework/dynamic/apiserver/apiserver.go b/pkg/virtual/framework/dynamic/apiserver/apiserver.go index 16e32207dae..06a7860663c 100644 --- a/pkg/virtual/framework/dynamic/apiserver/apiserver.go +++ b/pkg/virtual/framework/dynamic/apiserver/apiserver.go @@ -21,7 +21,7 @@ import ( "net/http" "time" - "github.com/emicklei/go-restful" + "github.com/emicklei/go-restful/v3" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" diff --git a/pkg/virtual/framework/dynamic/apiserver/discovery.go b/pkg/virtual/framework/dynamic/apiserver/discovery.go index 97dcf16ffcf..9dd4585a18f 100644 --- a/pkg/virtual/framework/dynamic/apiserver/discovery.go +++ b/pkg/virtual/framework/dynamic/apiserver/discovery.go @@ -313,7 +313,7 @@ func (r *rootDiscoveryHandler) ServeHTTP(w http.ResponseWriter, req *http.Reques } groupList = append(groupList, g) } - responsewriters.WriteObjectNegotiated(aggregator.DiscoveryCodecs, negotiation.DefaultEndpointRestrictions, schema.GroupVersion{}, w, req, http.StatusOK, &metav1.APIGroupList{Groups: groupList}) + responsewriters.WriteObjectNegotiated(aggregator.DiscoveryCodecs, negotiation.DefaultEndpointRestrictions, schema.GroupVersion{}, w, req, http.StatusOK, &metav1.APIGroupList{Groups: groupList}, false) } // splitPath returns the segments for a URL path. diff --git a/pkg/virtual/framework/forwardingregistry/rest_test.go b/pkg/virtual/framework/forwardingregistry/rest_test.go index d41b96e9327..a0692b557f7 100644 --- a/pkg/virtual/framework/forwardingregistry/rest_test.go +++ b/pkg/virtual/framework/forwardingregistry/rest_test.go @@ -19,6 +19,7 @@ package forwardingregistry_test import ( "context" "fmt" + "reflect" "testing" "time" @@ -473,7 +474,7 @@ func TestPatch(t *testing.T) { ctx = request.WithCluster(ctx, request.Cluster{Name: "test"}) patcher := func(ctx context.Context, newObj, oldObj runtime.Object) (runtime.Object, error) { - if oldObj == nil { + if reflect.DeepEqual(&unstructured.Unstructured{}, oldObj.(*unstructured.Unstructured)) { return nil, errors.NewNotFound(schema.ParseGroupResource("noxus.mygroup.example.com"), "foo") } updated := oldObj.DeepCopyObject().(*unstructured.Unstructured) diff --git a/pkg/virtual/framework/forwardingregistry/store.go b/pkg/virtual/framework/forwardingregistry/store.go index e49041fd9ad..86b603a4762 100644 --- a/pkg/virtual/framework/forwardingregistry/store.go +++ b/pkg/virtual/framework/forwardingregistry/store.go @@ -183,6 +183,7 @@ func DefaultDynamicDelegatedStoreFuncs( requestInfo, _ := genericapirequest.RequestInfoFrom(ctx) doUpdate := func() (*unstructured.Unstructured, error) { + needToCreate := false oldObj, err := s.Get(ctx, name, &metav1.GetOptions{}) if err != nil { // Continue on 404 when forceAllowCreate is enabled, @@ -193,7 +194,12 @@ func DefaultDynamicDelegatedStoreFuncs( !(requestInfo != nil && requestInfo.Verb == "patch") { return nil, err } - oldObj = nil + + // This needs to be the zero value of the object and not nil. This matches the normal rest/storage + // flows. Additionally, if oldObj were nil, the call the objInfo.UpdatedObject below would return an + // error because one of the transformers wouldn't be able to extract metadata for oldObj. + oldObj = &unstructured.Unstructured{} + needToCreate = true } // The following call returns a 404 error for non server-side apply @@ -212,7 +218,7 @@ func DefaultDynamicDelegatedStoreFuncs( return nil, fmt.Errorf("not an Unstructured: %T", obj) } - if oldObj == nil { + if needToCreate { // The object does not currently exist. // We switch to calling a create operation on the forwarding registry. // This enables support for server-side apply requests, to create non-existent objects. diff --git a/pkg/virtual/framework/transforming/client.go b/pkg/virtual/framework/transforming/client.go index 1e4d8cf3a7d..20929ed5d36 100644 --- a/pkg/virtual/framework/transforming/client.go +++ b/pkg/virtual/framework/transforming/client.go @@ -332,6 +332,37 @@ func (tc *transformingResourceClient) Update(ctx context.Context, obj *unstructu return result, err } +func (tc *transformingResourceClient) Apply(ctx context.Context, name string, obj *unstructured.Unstructured, options metav1.ApplyOptions, subresources ...string) (*unstructured.Unstructured, error) { + var err error + logger := getLogger(ctx).WithValues("subresources", subresources).WithValues("action", "apply") + if obj != nil { + logger = logging.WithObject(logger, obj) + } + beforeLogger := logger.WithValues("moment", before) + beforeLogger.Info(startingMessage) + obj, err = tc.transformer.BeforeWrite(tc.delegate, ctx, tc.resource, obj, subresources...) + if err != nil { + beforeLogger.Error(err, errorMessage) + return nil, err + } + result, err := tc.delegate.Apply(ctx, name, obj, options, subresources...) + if err != nil { + return result, err + } + afterLogger := logger.WithValues("moment", after) + afterLogger.Info(startingMessage) + result, err = tc.transformer.AfterRead(tc.delegate, ctx, tc.resource, result, nil, subresources...) + if err != nil { + afterLogger.Error(err, errorMessage) + return result, err + } + return result, err +} + +func (tc *transformingResourceClient) ApplyStatus(ctx context.Context, name string, obj *unstructured.Unstructured, options metav1.ApplyOptions) (*unstructured.Unstructured, error) { + return tc.Apply(ctx, name, obj, options, "status") +} + // Get implements dynamic.ResourceInterface. // It delegates the Get call to the underlying kubernetes client, // and transforms back the result of the Get call by calling the transformer AfterRead method. diff --git a/pkg/virtual/framework/transforming/transformer_test.go b/pkg/virtual/framework/transforming/transformer_test.go index 240f20a2314..99760136f15 100644 --- a/pkg/virtual/framework/transforming/transformer_test.go +++ b/pkg/virtual/framework/transforming/transformer_test.go @@ -121,6 +121,16 @@ type namespaceableResourceClient struct { resourceClient } +func (c *namespaceableResourceClient) Apply(ctx context.Context, name string, obj *unstructured.Unstructured, options metav1.ApplyOptions, subresources ...string) (*unstructured.Unstructured, error) { + return c.resourceClient.Apply(ctx, name, obj, options, subresources...) +} + +func (c *namespaceableResourceClient) ApplyStatus(ctx context.Context, name string, obj *unstructured.Unstructured, options metav1.ApplyOptions) (*unstructured.Unstructured, error) { + return c.resourceClient.ApplyStatus(ctx, name, obj, options) +} + +var _ dynamic.NamespaceableResourceInterface = (*namespaceableResourceClient)(nil) + func (c *namespaceableResourceClient) ClusterName() string { return c.lcluster.String() } func (c *namespaceableResourceClient) Namespace(namespace string) dynamic.ResourceInterface { @@ -143,6 +153,8 @@ type resourceClient struct { lclusterRecorder func(lcluster string) } +var _ dynamic.ResourceInterface = (*resourceClient)(nil) + func (c *resourceClient) ClusterName() string { return c.lcluster.String() } func (c *resourceClient) RawDelete(ctx context.Context, name string, opts metav1.DeleteOptions, subresources ...string) ([]byte, int, error) { @@ -199,6 +211,14 @@ func (c *resourceClient) Patch(ctx context.Context, name string, pt types.PatchT c.lclusterRecorder(c.lcluster.String()) return c.resourceInterface.Patch(ctx, name, pt, data, options, subresources...) } +func (c *resourceClient) Apply(ctx context.Context, name string, obj *unstructured.Unstructured, options metav1.ApplyOptions, subresources ...string) (*unstructured.Unstructured, error) { + c.lclusterRecorder(c.lcluster.String()) + return c.resourceInterface.Apply(ctx, name, obj, options, subresources...) +} +func (c *resourceClient) ApplyStatus(ctx context.Context, name string, obj *unstructured.Unstructured, options metav1.ApplyOptions) (*unstructured.Unstructured, error) { + c.lclusterRecorder(c.lcluster.String()) + return c.resourceInterface.ApplyStatus(ctx, name, obj, options) +} type transformerCall struct { Moment string @@ -604,7 +624,7 @@ func TestResourceTransformer(t *testing.T) { Object: map[string]interface{}{ "apiVersion": "group/version", "kind": "ResourceList", - "metadata": map[string]interface{}{"resourceVersion": ""}, + "metadata": map[string]interface{}{"resourceVersion": "", "continue": ""}, }, Items: []unstructured.Unstructured{ *resource("group/version", "Resource", "aThing").cluster("cluster1").field("after", "added aThing")(), @@ -657,7 +677,7 @@ func TestResourceTransformer(t *testing.T) { Object: map[string]interface{}{ "apiVersion": "group/version", "kind": "ResourceList", - "metadata": map[string]interface{}{"resourceVersion": ""}, + "metadata": map[string]interface{}{"resourceVersion": "", "continue": ""}, }, Items: []unstructured.Unstructured{ *resource("group/version", "Resource", "aThingMore").cluster("cluster2").field("after", "added aThingMore")(), @@ -707,7 +727,7 @@ func TestResourceTransformer(t *testing.T) { Object: map[string]interface{}{ "apiVersion": "group/version", "kind": "ResourceList", - "metadata": map[string]interface{}{"resourceVersion": ""}, + "metadata": map[string]interface{}{"resourceVersion": "", "continue": ""}, }, Items: []unstructured.Unstructured{ *resource("group/version", "Resource", "aThing").field("after", "added aThing")(), @@ -758,7 +778,7 @@ func TestResourceTransformer(t *testing.T) { Object: map[string]interface{}{ "apiVersion": "group/version", "kind": "ResourceList", - "metadata": map[string]interface{}{"resourceVersion": ""}, + "metadata": map[string]interface{}{"resourceVersion": "", "continue": ""}, }, Items: []unstructured.Unstructured{ *resource("group/version", "Resource", "aThing").field("after", "added aThing")(), @@ -811,7 +831,7 @@ func TestResourceTransformer(t *testing.T) { Object: map[string]interface{}{ "apiVersion": "group/version", "kind": "ResourceList", - "metadata": map[string]interface{}{"resourceVersion": ""}, + "metadata": map[string]interface{}{"resourceVersion": "", "continue": ""}, }, Items: []unstructured.Unstructured{ *resource("group/version", "Resource", "aThing").ns("aNamespace").field("after", "added aThing")(), diff --git a/test/e2e/cache/cache_server_test.go b/test/e2e/cache/cache_server_test.go index 4607b180578..03a11b213b5 100644 --- a/test/e2e/cache/cache_server_test.go +++ b/test/e2e/cache/cache_server_test.go @@ -43,8 +43,6 @@ import ( // testSchemaIsNotEnforced checks if an object of any schema can be stored as "apis.kcp.io.v1alpha1.apiexports". func testSchemaIsNotEnforced(ctx context.Context, t *testing.T, cacheClientRT *rest.Config, cluster logicalcluster.Path, gvr schema.GroupVersionResource) { - t.Helper() - cacheDynamicClient, err := kcpdynamic.NewForConfig(cacheClientRT) require.NoError(t, err) type planet struct { @@ -66,6 +64,7 @@ func testSchemaIsNotEnforced(ctx context.Context, t *testing.T, cacheClientRT *r earth.CreationTimestamp = cachedEarth.CreationTimestamp earth.ResourceVersion = cachedEarth.ResourceVersion earth.Annotations = cachedEarth.Annotations + earth.ManagedFields = cachedEarth.ManagedFields if !cmp.Equal(cachedEarth, &earth) { t.Fatalf("received object from the cache server differs from the expected one:\n%s", cmp.Diff(cachedEarth, &earth)) } @@ -107,8 +106,6 @@ func testSchemaIsNotEnforced(ctx context.Context, t *testing.T, cacheClientRT *r // testShardNamesAssigned checks if a shard name is provided in the "kcp.io/shard" annotation and // if a cluster name is stored at "kcp.io/cluster" annotation. func testShardClusterNamesAssigned(ctx context.Context, t *testing.T, cacheClientRT *rest.Config, cluster logicalcluster.Path, gvr schema.GroupVersionResource) { - t.Helper() - cacheDynamicClient, err := kcpdynamic.NewForConfig(cacheClientRT) require.NoError(t, err) initialComicDB := newFakeAPIExport("comicdb") @@ -141,8 +138,6 @@ func testShardClusterNamesAssigned(ctx context.Context, t *testing.T, cacheClien // testUIDGenerationCreationTime checks if overwriting UID, Generation, CreationTime when the shard annotation is set works. func testUIDGenerationCreationTime(ctx context.Context, t *testing.T, cacheClientRT *rest.Config, cluster logicalcluster.Path, gvr schema.GroupVersionResource) { - t.Helper() - cacheDynamicClient, err := kcpdynamic.NewForConfig(cacheClientRT) require.NoError(t, err) initialMangoDB := newFakeAPIExport("mangodb") @@ -157,6 +152,7 @@ func testUIDGenerationCreationTime(ctx context.Context, t *testing.T, cacheClien require.NoError(t, json.Unmarshal(cachedMangoDBJson, cachedMangoDB)) mangoDB.ResourceVersion = cachedMangoDB.ResourceVersion + mangoDB.ManagedFields = cachedMangoDB.ManagedFields mangoDB.Annotations["kcp.io/cluster"] = cluster.String() if !cmp.Equal(cachedMangoDB, &mangoDB) { t.Fatalf("received object from the cache server differs from the expected one:\n%s", cmp.Diff(cachedMangoDB, &mangoDB)) @@ -179,8 +175,6 @@ func testUIDGenerationCreationTime(ctx context.Context, t *testing.T, cacheClien // testUIDGenerationCreationTimeNegative checks if UID, Generation, CreationTime are set when the shard annotation is NOT set. func testUIDGenerationCreationTimeNegative(ctx context.Context, t *testing.T, cacheClientRT *rest.Config, cluster logicalcluster.Path, gvr schema.GroupVersionResource) { - t.Helper() - cacheDynamicClient, err := kcpdynamic.NewForConfig(cacheClientRT) require.NoError(t, err) initialMangoDB := newFakeAPIExport("mangodbnegative") @@ -209,6 +203,7 @@ func testUIDGenerationCreationTimeNegative(ctx context.Context, t *testing.T, ca mangoDB.Generation = cachedMangoDB.Generation mangoDB.ResourceVersion = cachedMangoDB.ResourceVersion mangoDB.CreationTimestamp = cachedMangoDB.CreationTimestamp + mangoDB.ManagedFields = cachedMangoDB.ManagedFields mangoDB.Annotations["kcp.io/cluster"] = cluster.String() mangoDB.Annotations["kcp.io/shard"] = "amber" if !cmp.Equal(cachedMangoDB, &mangoDB) { @@ -230,8 +225,6 @@ func testUIDGenerationCreationTimeNegative(ctx context.Context, t *testing.T, ca // testGenerationOnSpecChanges checks if Generation is not increased when the spec is changed. func testGenerationOnSpecChanges(ctx context.Context, t *testing.T, cacheClientRT *rest.Config, cluster logicalcluster.Path, gvr schema.GroupVersionResource) { - t.Helper() - cacheDynamicClient, err := kcpdynamic.NewForConfig(cacheClientRT) require.NoError(t, err) initialCinnamonDB := newFakeAPIExport("cinnamondb") @@ -276,8 +269,6 @@ func testGenerationOnSpecChanges(ctx context.Context, t *testing.T, cacheClientR // testDeletionWithFinalizers checks if deleting an object with finalizers immediately removes it. func testDeletionWithFinalizers(ctx context.Context, t *testing.T, cacheClientRT *rest.Config, cluster logicalcluster.Path, gvr schema.GroupVersionResource) { - t.Helper() - cacheDynamicClient, err := kcpdynamic.NewForConfig(cacheClientRT) require.NoError(t, err) initialGhostDB := newFakeAPIExport("ghostdb") @@ -302,8 +293,6 @@ func testDeletionWithFinalizers(ctx context.Context, t *testing.T, cacheClientRT // testSpecStatusSimultaneously checks if updating spec and status at the same time works. func testSpecStatusSimultaneously(ctx context.Context, t *testing.T, cacheClientRT *rest.Config, cluster logicalcluster.Path, gvr schema.GroupVersionResource) { - t.Helper() - cacheDynamicClient, err := kcpdynamic.NewForConfig(cacheClientRT) require.NoError(t, err) initialCucumberDB := newFakeAPIExport("cucumberdb") diff --git a/test/e2e/conformance/webhook_test.go b/test/e2e/conformance/webhook_test.go index 3772037d27e..351afa849d2 100644 --- a/test/e2e/conformance/webhook_test.go +++ b/test/e2e/conformance/webhook_test.go @@ -129,7 +129,7 @@ func TestMutatingWebhookInWorkspace(t *testing.T) { }}, } _, err = kubeClusterClient.Cluster(workspaces[0]).AdmissionregistrationV1().MutatingWebhookConfigurations().Create(ctx, webhook, metav1.CreateOptions{}) - require.NoError(t, err, "failed to add validating webhook configurations") + require.NoError(t, err, "failed to add mutating webhook configurations") cowboy := v1alpha1.Cowboy{ ObjectMeta: metav1.ObjectMeta{ diff --git a/tmc/cmd/deployment-coordinator/cmd/deployment_coordinator.go b/tmc/cmd/deployment-coordinator/cmd/deployment_coordinator.go index bf3436845d6..9f5d8f3c6b1 100644 --- a/tmc/cmd/deployment-coordinator/cmd/deployment_coordinator.go +++ b/tmc/cmd/deployment-coordinator/cmd/deployment_coordinator.go @@ -27,12 +27,13 @@ import ( genericapiserver "k8s.io/apiserver/pkg/server" "k8s.io/client-go/rest" "k8s.io/client-go/tools/clientcmd" - api "k8s.io/client-go/tools/clientcmd/api" + "k8s.io/client-go/tools/clientcmd/api" + logsapiv1 "k8s.io/component-base/logs/api/v1" "k8s.io/component-base/version" kcpfeatures "github.com/kcp-dev/kcp/pkg/features" "github.com/kcp-dev/kcp/pkg/reconciler/coordination/deployment" - options "github.com/kcp-dev/kcp/tmc/cmd/deployment-coordinator/options" + "github.com/kcp-dev/kcp/tmc/cmd/deployment-coordinator/options" ) const numThreads = 2 @@ -45,7 +46,7 @@ func NewDeploymentCoordinatorCommand() *cobra.Command { Use: "deployment-coordinator", Short: "Coordination controller for deployments. Spreads replicas across locations", RunE: func(cmd *cobra.Command, args []string) error { - if err := options.Logs.ValidateAndApply(kcpfeatures.DefaultFeatureGate); err != nil { + if err := logsapiv1.ValidateAndApply(options.Logs, kcpfeatures.DefaultFeatureGate); err != nil { return err } if err := options.Complete(); err != nil { diff --git a/tmc/cmd/deployment-coordinator/options/options.go b/tmc/cmd/deployment-coordinator/options/options.go index 7e62ad96e43..84c5fabba23 100644 --- a/tmc/cmd/deployment-coordinator/options/options.go +++ b/tmc/cmd/deployment-coordinator/options/options.go @@ -19,8 +19,8 @@ package options import ( "github.com/spf13/pflag" - "k8s.io/component-base/config" "k8s.io/component-base/logs" + logsapiv1 "k8s.io/component-base/logs/api/v1" ) type Options struct { @@ -32,11 +32,11 @@ type Options struct { func NewOptions() *Options { // Default to -v=2 - logs := logs.NewOptions() - logs.Config.Verbosity = config.VerbosityLevel(2) + logsOptions := logs.NewOptions() + logsOptions.Verbosity = logsapiv1.VerbosityLevel(2) return &Options{ - Logs: logs, + Logs: logsOptions, } } @@ -44,7 +44,7 @@ func (options *Options) AddFlags(fs *pflag.FlagSet) { fs.StringVar(&options.Kubeconfig, "kubeconfig", options.Kubeconfig, "Kubeconfig file.") fs.StringVar(&options.Context, "context", options.Context, "Context to use in the Kubeconfig file, instead of the current context.") fs.StringVar(&options.Server, "server", options.Server, "APIServer URL to use in the Kubeconfig file, instead of the one in the current context.") - options.Logs.AddFlags(fs) + logsapiv1.AddFlags(options.Logs, fs) } func (options *Options) Complete() error { diff --git a/tmc/pkg/virtual/syncer/transformations/transformer_test.go b/tmc/pkg/virtual/syncer/transformations/transformer_test.go index 0b2e68a0964..7d33d895865 100644 --- a/tmc/pkg/virtual/syncer/transformations/transformer_test.go +++ b/tmc/pkg/virtual/syncer/transformations/transformer_test.go @@ -197,6 +197,14 @@ func (c *resourceClient) Patch(ctx context.Context, name string, pt types.PatchT c.lclusterRecorder(c.lcluster.String()) return c.resourceInterface.Patch(ctx, name, pt, data, options, subresources...) } +func (c *resourceClient) Apply(ctx context.Context, name string, obj *unstructured.Unstructured, options metav1.ApplyOptions, subresources ...string) (*unstructured.Unstructured, error) { + c.lclusterRecorder(c.lcluster.String()) + return c.resourceInterface.Apply(ctx, name, obj, options, subresources...) +} +func (c *resourceClient) ApplyStatus(ctx context.Context, name string, obj *unstructured.Unstructured, options metav1.ApplyOptions) (*unstructured.Unstructured, error) { + c.lclusterRecorder(c.lcluster.String()) + return c.resourceInterface.ApplyStatus(ctx, name, obj, options) +} func gvr(group, version, resource string) schema.GroupVersionResource { return schema.GroupVersionResource{ diff --git a/tmc/pkg/virtual/syncer/upsyncer/storage_wrapper.go b/tmc/pkg/virtual/syncer/upsyncer/storage_wrapper.go index 50fcde36169..bf2c7b93981 100644 --- a/tmc/pkg/virtual/syncer/upsyncer/storage_wrapper.go +++ b/tmc/pkg/virtual/syncer/upsyncer/storage_wrapper.go @@ -22,6 +22,7 @@ import ( apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" @@ -48,7 +49,12 @@ func WithStaticLabelSelectorAndInWriteCallsCheck(labelSelector labels.Requiremen delegateUpdater := storage.UpdaterFunc storage.UpdaterFunc = func(ctx context.Context, name string, objInfo rest.UpdatedObjectInfo, createValidation rest.ValidateObjectFunc, updateValidation rest.ValidateObjectUpdateFunc, forceAllowCreate bool, options *metav1.UpdateOptions) (runtime.Object, bool, error) { - obj, err := objInfo.UpdatedObject(ctx, nil) + // Note, we have to pass in a non-nil value for oldObj. Ideally it would be the zero value of the + // appropriate type (e.g a built-in type such as corev1.Namespace, or Unstructured for a custom resource). + // Unfortunately we don't know what the appropriate type is here, so we're using Unstructured. The + // transformers called by UpdatedObject should only be acting on things that satisfy the ObjectMeta + // interface, so this should be ok. + obj, err := objInfo.UpdatedObject(ctx, &unstructured.Unstructured{}) if apierrors.IsNotFound(err) { return delegateUpdater.Update(ctx, name, objInfo, createValidation, updateValidation, forceAllowCreate, options) }