Skip to content

Commit

Permalink
React to k8s 1.26
Browse files Browse the repository at this point in the history
Signed-off-by: Andy Goldstein <[email protected]>
  • Loading branch information
ncdc committed Apr 21, 2023
1 parent 8f0845b commit 03478ae
Show file tree
Hide file tree
Showing 39 changed files with 639 additions and 285 deletions.
6 changes: 3 additions & 3 deletions cmd/kcp-core/kcpcore.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@ import (
"k8s.io/component-base/cli"
cliflag "k8s.io/component-base/cli/flag"
"k8s.io/component-base/cli/globalflag"
"k8s.io/component-base/config"
"k8s.io/component-base/logs"
logsapiv1 "k8s.io/component-base/logs/api/v1"
_ "k8s.io/component-base/logs/json/register"
"k8s.io/component-base/term"
"k8s.io/component-base/version"
Expand Down Expand Up @@ -79,7 +79,7 @@ func main() {
}

serverOptions := options.NewOptions(rootDir)
serverOptions.Server.GenericControlPlane.Logs.Config.Verbosity = config.VerbosityLevel(2)
serverOptions.Server.GenericControlPlane.Logs.Verbosity = logsapiv1.VerbosityLevel(2)

startCmd := &cobra.Command{
Use: "start",
Expand All @@ -101,7 +101,7 @@ func main() {
},
RunE: func(cmd *cobra.Command, args []string) error {
// run as early as possible to avoid races later when some components (e.g. grpc) start early using klog
if err := serverOptions.Server.GenericControlPlane.Logs.ValidateAndApply(kcpfeatures.DefaultFeatureGate); err != nil {
if err := logsapiv1.ValidateAndApply(serverOptions.Server.GenericControlPlane.Logs, kcpfeatures.DefaultFeatureGate); err != nil {
return err
}

Expand Down
3 changes: 2 additions & 1 deletion cmd/kcp-front-proxy/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
genericapiserver "k8s.io/apiserver/pkg/server"
"k8s.io/component-base/cli"
utilflag "k8s.io/component-base/cli/flag"
logsapiv1 "k8s.io/component-base/logs/api/v1"
_ "k8s.io/component-base/logs/json/register"
"k8s.io/component-base/version"

Expand Down Expand Up @@ -62,7 +63,7 @@ forwards Common Name and Organizations to backend API servers in HTTP headers.
The proxy terminates TLS and communicates with API servers via mTLS. Traffic is
routed based on paths.`,
RunE: func(cmd *cobra.Command, args []string) error {
if err := options.Logs.ValidateAndApply(kcpfeatures.DefaultFeatureGate); err != nil {
if err := logsapiv1.ValidateAndApply(options.Logs, kcpfeatures.DefaultFeatureGate); err != nil {
return err
}
if err := options.Complete(); err != nil {
Expand Down
6 changes: 3 additions & 3 deletions cmd/kcp-front-proxy/options/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@ package options
import (
"github.com/spf13/pflag"

"k8s.io/component-base/config"
"k8s.io/component-base/logs"
logsapiv1 "k8s.io/component-base/logs/api/v1"

proxyoptions "github.com/kcp-dev/kcp/pkg/proxy/options"
)
Expand All @@ -37,13 +37,13 @@ func NewOptions() *Options {
}

// Default to -v=2
o.Logs.Config.Verbosity = config.VerbosityLevel(2)
o.Logs.Verbosity = logsapiv1.VerbosityLevel(2)
return o
}

func (o *Options) AddFlags(fs *pflag.FlagSet) {
o.Proxy.AddFlags(fs)
o.Logs.AddFlags(fs)
logsapiv1.AddFlags(o.Logs, fs)
}

func (o *Options) Complete() error {
Expand Down
6 changes: 3 additions & 3 deletions cmd/kcp/kcp.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@ import (
"k8s.io/component-base/cli"
cliflag "k8s.io/component-base/cli/flag"
"k8s.io/component-base/cli/globalflag"
"k8s.io/component-base/config"
"k8s.io/component-base/logs"
logsapiv1 "k8s.io/component-base/logs/api/v1"
_ "k8s.io/component-base/logs/json/register"
"k8s.io/component-base/term"
"k8s.io/component-base/version"
Expand Down Expand Up @@ -83,7 +83,7 @@ func main() {
}

serverOptions := options.NewOptions(rootDir)
serverOptions.Server.Core.GenericControlPlane.Logs.Config.Verbosity = config.VerbosityLevel(2)
serverOptions.Server.Core.GenericControlPlane.Logs.Verbosity = logsapiv1.VerbosityLevel(2)

startCmd := &cobra.Command{
Use: "start",
Expand All @@ -105,7 +105,7 @@ func main() {
},
RunE: func(cmd *cobra.Command, args []string) error {
// run as early as possible to avoid races later when some components (e.g. grpc) start early using klog
if err := serverOptions.Server.Core.GenericControlPlane.Logs.ValidateAndApply(kcpfeatures.DefaultFeatureGate); err != nil {
if err := logsapiv1.ValidateAndApply(serverOptions.Server.Core.GenericControlPlane.Logs, kcpfeatures.DefaultFeatureGate); err != nil {
return err
}

Expand Down
3 changes: 2 additions & 1 deletion cmd/syncer/cmd/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"k8s.io/apimachinery/pkg/util/sets"
genericapiserver "k8s.io/apiserver/pkg/server"
"k8s.io/client-go/tools/clientcmd"
logsapiv1 "k8s.io/component-base/logs/api/v1"
"k8s.io/component-base/version"
"k8s.io/klog/v2"

Expand All @@ -47,7 +48,7 @@ func NewSyncerCommand() *cobra.Command {
Use: "syncer",
Short: "Synchronizes resources in `kcp` assigned to the clusters",
RunE: func(cmd *cobra.Command, args []string) error {
if err := options.Logs.ValidateAndApply(kcpfeatures.DefaultFeatureGate); err != nil {
if err := logsapiv1.ValidateAndApply(options.Logs, kcpfeatures.DefaultFeatureGate); err != nil {
return err
}
if err := options.Complete(); err != nil {
Expand Down
10 changes: 5 additions & 5 deletions cmd/syncer/options/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ import (

"github.com/spf13/pflag"

"k8s.io/component-base/config"
"k8s.io/component-base/logs"
logsapiv1 "k8s.io/component-base/logs/api/v1"

kcpfeatures "github.com/kcp-dev/kcp/pkg/features"
workloadv1alpha1 "github.com/kcp-dev/kcp/sdk/apis/workload/v1alpha1"
Expand All @@ -51,14 +51,14 @@ type Options struct {

func NewOptions() *Options {
// Default to -v=2
logs := logs.NewOptions()
logs.Config.Verbosity = config.VerbosityLevel(2)
logsOptions := logs.NewOptions()
logsOptions.Verbosity = logsapiv1.VerbosityLevel(2)

return &Options{
QPS: 30,
Burst: 20,
SyncedResourceTypes: []string{},
Logs: logs,
Logs: logsOptions,
APIImportPollInterval: 1 * time.Minute,
DownstreamNamespaceCleanDelay: 30 * time.Second,
}
Expand All @@ -83,7 +83,7 @@ func (options *Options) AddFlags(fs *pflag.FlagSet) {
fs.StringVar(&options.DNSImage, "dns-image", options.DNSImage, "kcp DNS server image.")
fs.DurationVar(&options.DownstreamNamespaceCleanDelay, "downstream-namespace-clean-delay", options.DownstreamNamespaceCleanDelay, "Time to wait before deleting a downstream namespace, defaults to 30s.")

options.Logs.AddFlags(fs)
logsapiv1.AddFlags(options.Logs, fs)
}

func (options *Options) Complete() error {
Expand Down
6 changes: 3 additions & 3 deletions cmd/virtual-workspaces/command/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ import (
genericapiserver "k8s.io/apiserver/pkg/server"
"k8s.io/client-go/pkg/version"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/component-base/config"
logsapiv1 "k8s.io/component-base/logs/api/v1"
"k8s.io/klog/v2"

"github.com/kcp-dev/kcp/cmd/virtual-workspaces/options"
Expand All @@ -52,15 +52,15 @@ func NewCommand(ctx context.Context, errout io.Writer) *cobra.Command {
opts := options.NewOptions()

// Default to -v=2
opts.Logs.Config.Verbosity = config.VerbosityLevel(2)
opts.Logs.Verbosity = logsapiv1.VerbosityLevel(2)

cmd := &cobra.Command{
Use: "workspaces",
Short: "Launch virtual workspace apiservers",
Long: "Start the root virtual workspace apiserver to enable virtual workspace management.",

RunE: func(c *cobra.Command, args []string) error {
if err := opts.Logs.ValidateAndApply(kcpfeatures.DefaultFeatureGate); err != nil {
if err := logsapiv1.ValidateAndApply(opts.Logs, kcpfeatures.DefaultFeatureGate); err != nil {
return err
}
if err := opts.Validate(); err != nil {
Expand Down
7 changes: 4 additions & 3 deletions cmd/virtual-workspaces/options/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
utilerrors "k8s.io/apimachinery/pkg/util/errors"
genericapiserveroptions "k8s.io/apiserver/pkg/server/options"
"k8s.io/component-base/logs"
logsapiv1 "k8s.io/component-base/logs/api/v1"

cacheoptions "github.com/kcp-dev/kcp/pkg/cache/client/options"
corevwoptions "github.com/kcp-dev/kcp/pkg/virtual/options"
Expand All @@ -53,7 +54,7 @@ type Options struct {
Authorization corevwoptions.Authorization
Audit genericapiserveroptions.AuditOptions

Logs logs.Options
Logs *logs.Options

CoreVirtualWorkspaces corevwoptions.Options
TmcVirtualWorkspaces tmcvwoptions.Options
Expand All @@ -73,7 +74,7 @@ func NewOptions() *Options {
Authentication: *genericapiserveroptions.NewDelegatingAuthenticationOptions(),
Authorization: *corevwoptions.NewAuthorization(),
Audit: *genericapiserveroptions.NewAuditOptions(),
Logs: *logs.NewOptions(),
Logs: logs.NewOptions(),

CoreVirtualWorkspaces: *corevwoptions.NewOptions(),
TmcVirtualWorkspaces: *tmcvwoptions.NewOptions(),
Expand All @@ -92,7 +93,7 @@ func (o *Options) AddFlags(flags *pflag.FlagSet) {
o.SecureServing.AddFlags(flags)
o.Authentication.AddFlags(flags)
o.Audit.AddFlags(flags)
o.Logs.AddFlags(flags)
logsapiv1.AddFlags(o.Logs, flags)
o.CoreVirtualWorkspaces.AddFlags(flags)
o.TmcVirtualWorkspaces.AddFlags(flags)

Expand Down
9 changes: 5 additions & 4 deletions pkg/admission/kubequota/kubequota_admission.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ type KubeResourceQuota struct {
lock sync.RWMutex
delegates map[logicalcluster.Name]*stoppableQuotaAdmission

workspaceDeletionMonitorStarter sync.Once
logicalClusterDeletionMonitorStarter sync.Once
}

// ValidateInitialization validates all the expected fields are set.
Expand Down Expand Up @@ -143,8 +143,8 @@ func (k *KubeResourceQuota) Validate(ctx context.Context, a admission.Attributes
}
}

k.workspaceDeletionMonitorStarter.Do(func() {
m := newLogicalClusterDeletionMonitor(k.logicalClusterInformer, k.stopQuotaAdmissionForCluster)
k.logicalClusterDeletionMonitorStarter.Do(func() {
m := NewLogicalClusterDeletionMonitor("kubequota-logicalcluster-deletion-monitor", k.logicalClusterInformer, k.stopQuotaAdmissionForCluster)
go m.Start(k.serverDone)
})

Expand Down Expand Up @@ -191,7 +191,7 @@ func (k *KubeResourceQuota) getOrCreateDelegate(clusterName logicalcluster.Name)
}()

const evaluatorWorkersPerWorkspace = 5
quotaAdmission, err := resourcequota.NewResourceQuota(k.userSuppliedConfiguration, evaluatorWorkersPerWorkspace, ctx.Done())
quotaAdmission, err := resourcequota.NewResourceQuota(k.userSuppliedConfiguration, evaluatorWorkersPerWorkspace)
if err != nil {
cancel()
return nil, err
Expand All @@ -202,6 +202,7 @@ func (k *KubeResourceQuota) getOrCreateDelegate(clusterName logicalcluster.Name)
stop: cancel,
}

delegate.SetDrainedNotification(ctx.Done())
delegate.SetResourceQuotaLister(k.scopingResourceQuotaInformer.Cluster(clusterName).Lister())
delegate.SetExternalKubeClientSet(k.kubeClusterClient.Cluster(clusterName.Path()))
delegate.SetQuotaConfiguration(k.quotaConfiguration)
Expand Down
36 changes: 18 additions & 18 deletions pkg/admission/kubequota/kubequota_clusterworkspace_monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,25 +33,25 @@ import (
corev1alpha1informers "github.com/kcp-dev/kcp/sdk/client/informers/externalversions/core/v1alpha1"
)

const logicalClusterDeletionMonitorControllerName = "kcp-kubequota-logical-cluster-deletion-monitor"

// logicalClusterDeletionMonitor monitors LogicalClusters and terminates QuotaAdmission for a logical cluster
// when its corresponding workspace is deleted.
type logicalClusterDeletionMonitor struct {
// LogicalClusterDeletionMonitor monitors LogicalClusters and invokes stopFunc for each deleted LogicalCluster.
type LogicalClusterDeletionMonitor struct {
name string
queue workqueue.RateLimitingInterface
stopFunc func(name logicalcluster.Name)
}

func newLogicalClusterDeletionMonitor(
workspaceInformer corev1alpha1informers.LogicalClusterClusterInformer,
func NewLogicalClusterDeletionMonitor(
name string,
logicalClusterInformer corev1alpha1informers.LogicalClusterClusterInformer,
stopFunc func(logicalcluster.Name),
) *logicalClusterDeletionMonitor {
m := &logicalClusterDeletionMonitor{
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), logicalClusterDeletionMonitorControllerName),
) *LogicalClusterDeletionMonitor {
m := &LogicalClusterDeletionMonitor{
name: name,
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), name),
stopFunc: stopFunc,
}

workspaceInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
logicalClusterInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
DeleteFunc: func(obj interface{}) {
m.enqueue(obj)
},
Expand All @@ -60,7 +60,7 @@ func newLogicalClusterDeletionMonitor(
return m
}

func (m *logicalClusterDeletionMonitor) enqueue(obj interface{}) {
func (m *LogicalClusterDeletionMonitor) enqueue(obj interface{}) {
key, err := kcpcache.DeletionHandlingMetaClusterNamespaceKeyFunc(obj)
if err != nil {
runtime.HandleError(err)
Expand All @@ -70,11 +70,11 @@ func (m *logicalClusterDeletionMonitor) enqueue(obj interface{}) {
m.queue.Add(key)
}

func (m *logicalClusterDeletionMonitor) Start(stop <-chan struct{}) {
func (m *LogicalClusterDeletionMonitor) Start(stop <-chan struct{}) {
defer runtime.HandleCrash()
defer m.queue.ShutDown()

logger := logging.WithReconciler(klog.Background(), logicalClusterDeletionMonitorControllerName)
logger := logging.WithReconciler(klog.Background(), m.name)
logger.Info("Starting controller")
defer logger.Info("Shutting down controller")

Expand All @@ -83,12 +83,12 @@ func (m *logicalClusterDeletionMonitor) Start(stop <-chan struct{}) {
<-stop
}

func (m *logicalClusterDeletionMonitor) startWorker() {
func (m *LogicalClusterDeletionMonitor) startWorker() {
for m.processNextWorkItem() {
}
}

func (m *logicalClusterDeletionMonitor) processNextWorkItem() bool {
func (m *LogicalClusterDeletionMonitor) processNextWorkItem() bool {
// Wait until there is a new item in the working queue
k, quit := m.queue.Get()
if quit {
Expand All @@ -101,7 +101,7 @@ func (m *logicalClusterDeletionMonitor) processNextWorkItem() bool {
defer m.queue.Done(key)

if err := m.process(key); err != nil {
runtime.HandleError(fmt.Errorf("logicalClusterDeletionMonitor failed to sync %q, err: %w", key, err))
runtime.HandleError(fmt.Errorf("LogicalClusterDeletionMonitor failed to sync %q, err: %w", key, err))

m.queue.AddRateLimited(key)

Expand All @@ -114,7 +114,7 @@ func (m *logicalClusterDeletionMonitor) processNextWorkItem() bool {
return true
}

func (m *logicalClusterDeletionMonitor) process(key string) error {
func (m *LogicalClusterDeletionMonitor) process(key string) error {
clusterName, _, _, err := kcpcache.SplitMetaClusterNamespaceKey(key)
if err != nil {
runtime.HandleError(err)
Expand Down
Loading

0 comments on commit 03478ae

Please sign in to comment.