From 4687d88b2b1d730a2c1c50f22b1c9d209258354b Mon Sep 17 00:00:00 2001 From: Jean-Philippe Evrard Date: Mon, 11 Nov 2024 19:35:35 +0100 Subject: [PATCH] Extract operations into internal pkg This reduces the global variables, and regroups all the operations in a single place. This will allow further refactor to represent all the k8s operations kured needs on a single node. Signed-off-by: Jean-Philippe Evrard --- cmd/kured/main.go | 214 +------------------ internal/k8soperations/annotations.go | 86 ++++++++ internal/k8soperations/drain.go | 124 +++++++++++ internal/k8soperations/logging.go | 19 ++ internal/{taints => k8soperations}/taints.go | 6 +- 5 files changed, 241 insertions(+), 208 deletions(-) create mode 100644 internal/k8soperations/annotations.go create mode 100644 internal/k8soperations/drain.go create mode 100644 internal/k8soperations/logging.go rename internal/{taints => k8soperations}/taints.go (96%) diff --git a/cmd/kured/main.go b/cmd/kured/main.go index 4846c6f76..db3237f5d 100644 --- a/cmd/kured/main.go +++ b/cmd/kured/main.go @@ -2,11 +2,10 @@ package main import ( "context" - "encoding/json" "fmt" "github.com/kubereboot/kured/internal/daemonsetlock" + "github.com/kubereboot/kured/internal/k8soperations" "github.com/kubereboot/kured/internal/notifications" - "github.com/kubereboot/kured/internal/taints" "github.com/kubereboot/kured/internal/timewindow" "github.com/kubereboot/kured/pkg/blockers" "github.com/kubereboot/kured/pkg/checkers" @@ -17,10 +16,8 @@ import ( flag "github.com/spf13/pflag" v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/kubernetes" "k8s.io/client-go/rest" - kubectldrain "k8s.io/kubectl/pkg/drain" "log" "log/slog" "net/http" @@ -92,8 +89,7 @@ var ( const ( // KuredNodeLockAnnotation is the canonical string value for the kured node-lock annotation - KuredNodeLockAnnotation string = "kured.dev/kured-node-lock" - KuredNodeWasUnschedulableBeforeDrainAnnotation string = "kured.dev/node-unschedulable-before-drain" + KuredNodeLockAnnotation string = "kured.dev/kured-node-lock" // KuredRebootInProgressAnnotation is the canonical string value for the kured reboot-in-progress annotation KuredRebootInProgressAnnotation string = "kured.dev/kured-reboot-in-progress" // KuredMostRecentRebootNeededAnnotation is the canonical string value for the kured most-recent-reboot-needed annotation @@ -339,202 +335,9 @@ func LoadFromEnv() { } -type slogWriter struct { - stream string - message string -} - -func (sw slogWriter) Write(p []byte) (n int, err error) { - output := string(p) - switch sw.stream { - case "stdout": - slog.Info(sw.message, "node", nodeID, "stdout", output) - case "stderr": - slog.Info(sw.message, "node", nodeID, "stderr", output) - } - return len(p), nil -} - -func drain(client *kubernetes.Clientset, node *v1.Node, notifier notifications.Notifier) error { - nodename := node.GetName() - - if preRebootNodeLabels != nil { - err := updateNodeLabels(client, node, preRebootNodeLabels) - if err != nil { - return fmt.Errorf("stopping drain due to problem with node labels %v", err) - } - } - - if drainDelay > 0 { - slog.Debug("Delaying drain", "delay", drainDelay, "node", nodename) - time.Sleep(drainDelay) - } - - slog.Info("Starting drain", "node", nodename) - - notifier.Send(fmt.Sprintf(messageTemplateDrain, nodename), "Starting drain") - - kubectlStdOutLogger := &slogWriter{message: "draining: results", stream: "stdout"} - kubectlStdErrLogger := &slogWriter{message: "draining: results", 
stream: "stderr"} - - drainer := &kubectldrain.Helper{ - Client: client, - Ctx: context.Background(), - GracePeriodSeconds: drainGracePeriod, - PodSelector: drainPodSelector, - SkipWaitForDeleteTimeoutSeconds: skipWaitForDeleteTimeoutSeconds, - Force: true, - DeleteEmptyDirData: true, - IgnoreAllDaemonSets: true, - ErrOut: kubectlStdErrLogger, - Out: kubectlStdOutLogger, - Timeout: drainTimeout, - } - - // Add previous state of the node Spec.Unschedulable into an annotation - // If an annotation was present, it means that either the cordon or drain failed, - // hence it does not need to reapply: It might override what the user has set - // (for example if the cordon succeeded but the drain failed) - if _, ok := node.Annotations[KuredNodeWasUnschedulableBeforeDrainAnnotation]; !ok { - // Store State of the node before cordon changes it - annotations := map[string]string{KuredNodeWasUnschedulableBeforeDrainAnnotation: strconv.FormatBool(node.Spec.Unschedulable)} - // & annotate this node with a timestamp so that other node maintenance tools know how long it's been since this node has been marked for reboot - err := addNodeAnnotations(client, nodeID, annotations) - if err != nil { - return fmt.Errorf("error saving state of the node %s, %v", nodename, err) - } - } - - if err := kubectldrain.RunCordonOrUncordon(drainer, node, true); err != nil { - return fmt.Errorf("error cordonning node %s, %v", nodename, err) - } - - if err := kubectldrain.RunNodeDrain(drainer, nodename); err != nil { - return fmt.Errorf("error draining node %s: %v", nodename, err) - } - return nil -} - -func uncordon(client *kubernetes.Clientset, node *v1.Node, notifier notifications.Notifier) error { - // Revert cordon spec change with the help of node annotation - annotationContent, ok := node.Annotations[KuredNodeWasUnschedulableBeforeDrainAnnotation] - if !ok { - // If no node annotations, uncordon will not act. - // Do not uncordon if you do not know previous state, it could bring nodes under maintenance online! 
- return nil - } - - wasUnschedulable, err := strconv.ParseBool(annotationContent) - if err != nil { - return fmt.Errorf("annotation was edited and cannot be converted back to bool %v, cannot uncordon (unrecoverable)", err) - } - - if wasUnschedulable { - // Just delete the annotation, keep Cordonned - err := deleteNodeAnnotation(client, nodeID, KuredNodeWasUnschedulableBeforeDrainAnnotation) - if err != nil { - return fmt.Errorf("error removing the WasUnschedulable annotation, keeping the node stuck in cordonned state forever %v", err) - } - return nil - } - - nodeName := node.GetName() - kubectlStdOutLogger := &slogWriter{message: "uncordon: results", stream: "stdout"} - kubectlStdErrLogger := &slogWriter{message: "uncordon: results", stream: "stderr"} - - drainer := &kubectldrain.Helper{ - Client: client, - ErrOut: kubectlStdErrLogger, - Out: kubectlStdOutLogger, - Ctx: context.Background(), - } - if err := kubectldrain.RunCordonOrUncordon(drainer, node, false); err != nil { - return fmt.Errorf("error uncordonning node %s: %v", nodeName, err) - } else if postRebootNodeLabels != nil { - err := updateNodeLabels(client, node, postRebootNodeLabels) - return fmt.Errorf("error updating node (%s) labels, needs manual intervention %v", nodeName, err) - } - - err = deleteNodeAnnotation(client, nodeID, KuredNodeWasUnschedulableBeforeDrainAnnotation) - if err != nil { - return fmt.Errorf("error removing the WasUnschedulable annotation, keeping the node stuck in current state forever %v", err) - } - notifier.Send(fmt.Sprintf(messageTemplateUncordon, nodeID), "Node uncordonned successfully") - return nil -} - -func addNodeAnnotations(client *kubernetes.Clientset, nodeID string, annotations map[string]string) error { - node, err := client.CoreV1().Nodes().Get(context.TODO(), nodeID, metav1.GetOptions{}) - if err != nil { - return fmt.Errorf("error retrieving node object via k8s API: %v", err) - } - for k, v := range annotations { - node.Annotations[k] = v - slog.Debug(fmt.Sprintf("adding node annotation: %s=%s", k, v), "node", node.GetName()) - } - - bytes, err := json.Marshal(node) - if err != nil { - return fmt.Errorf("error marshalling node object into JSON: %v", err) - } - - _, err = client.CoreV1().Nodes().Patch(context.TODO(), node.GetName(), types.StrategicMergePatchType, bytes, metav1.PatchOptions{}) - if err != nil { - var annotationsErr string - for k, v := range annotations { - annotationsErr += fmt.Sprintf("%s=%s ", k, v) - } - return fmt.Errorf("error adding node annotations %s via k8s API: %v", annotationsErr, err) - } - return nil -} - -func deleteNodeAnnotation(client *kubernetes.Clientset, nodeID, key string) error { - // JSON Patch takes as path input a JSON Pointer, defined in RFC6901 - // So we replace all instances of "/" with "~1" as per: - // https://tools.ietf.org/html/rfc6901#section-3 - patch := []byte(fmt.Sprintf("[{\"op\":\"remove\",\"path\":\"/metadata/annotations/%s\"}]", strings.ReplaceAll(key, "/", "~1"))) - _, err := client.CoreV1().Nodes().Patch(context.TODO(), nodeID, types.JSONPatchType, patch, metav1.PatchOptions{}) - if err != nil { - return fmt.Errorf("error deleting node annotation %s via k8s API: %v", key, err) - } - return nil -} - -func updateNodeLabels(client *kubernetes.Clientset, node *v1.Node, labels []string) error { - labelsMap := make(map[string]string) - for _, label := range labels { - k := strings.Split(label, "=")[0] - v := strings.Split(label, "=")[1] - labelsMap[k] = v - slog.Debug(fmt.Sprintf("Updating node %s label: %s=%s", node.GetName(), k, 
v), "node", node.GetName()) - } - - bytes, err := json.Marshal(map[string]interface{}{ - "metadata": map[string]interface{}{ - "labels": labelsMap, - }, - }) - if err != nil { - return fmt.Errorf("error marshalling node object into JSON: %v", err) - } - - _, err = client.CoreV1().Nodes().Patch(context.TODO(), node.GetName(), types.StrategicMergePatchType, bytes, metav1.PatchOptions{}) - if err != nil { - var labelsErr string - for _, label := range labels { - k := strings.Split(label, "=")[0] - v := strings.Split(label, "=")[1] - labelsErr += fmt.Sprintf("%s=%s ", k, v) - } - return fmt.Errorf("error updating node labels %s via k8s API: %v", labelsErr, err) - } - return nil -} - func rebootAsRequired(nodeID string, rebooter reboot.Rebooter, checker checkers.Checker, blockCheckers []blockers.RebootBlocker, window *timewindow.TimeWindow, lock daemonsetlock.Lock, client *kubernetes.Clientset, period time.Duration, notifier notifications.Notifier) { - preferNoScheduleTaint := taints.New(client, nodeID, preferNoScheduleTaintName, v1.TaintEffectPreferNoSchedule) + preferNoScheduleTaint := k8soperations.NewTaint(client, nodeID, preferNoScheduleTaintName, v1.TaintEffectPreferNoSchedule) // No reason to delay the first ticks. // On top of it, we used to leak a goroutine, which was never garbage collected. @@ -559,7 +362,7 @@ func rebootAsRequired(nodeID string, rebooter reboot.Rebooter, checker checkers. continue } - err = uncordon(client, node, notifier) + err = k8soperations.Uncordon(client, node, notifier, postRebootNodeLabels, messageTemplateUncordon) if err != nil { // Might be a transient API issue or a real problem. Inform the admin slog.Info("unable to uncordon needs investigation", "node", nodeID, "error", err) @@ -579,7 +382,7 @@ func rebootAsRequired(nodeID string, rebooter reboot.Rebooter, checker checkers. if _, ok := node.Annotations[KuredRebootInProgressAnnotation]; ok { // Who reads this? I hope nobody bothers outside real debug cases slog.Debug(fmt.Sprintf("Deleting node %s annotation %s", nodeID, KuredRebootInProgressAnnotation), "node", nodeID) - err := deleteNodeAnnotation(client, nodeID, KuredRebootInProgressAnnotation) + err := k8soperations.DeleteNodeAnnotation(client, nodeID, KuredRebootInProgressAnnotation) if err != nil { continue } @@ -627,7 +430,7 @@ func rebootAsRequired(nodeID string, rebooter reboot.Rebooter, checker checkers. annotations := map[string]string{KuredRebootInProgressAnnotation: timeNowString} // & annotate this node with a timestamp so that other node maintenance tools know how long it's been since this node has been marked for reboot annotations[KuredMostRecentRebootNeededAnnotation] = timeNowString - err := addNodeAnnotations(client, nodeID, annotations) + err := k8soperations.AddNodeAnnotations(client, nodeID, annotations) if err != nil { continue } @@ -664,7 +467,8 @@ func rebootAsRequired(nodeID string, rebooter reboot.Rebooter, checker checkers. // } //} - err = drain(client, node, notifier) + err = k8soperations.Drain(client, node, preRebootNodeLabels, drainTimeout, drainGracePeriod, skipWaitForDeleteTimeoutSeconds, drainPodSelector, drainDelay, messageTemplateDrain, notifier) + if err != nil { if !forceReboot { slog.Debug(fmt.Sprintf("Unable to cordon or drain %s: %v, will force-reboot by releasing lock and uncordon until next success", node.GetName(), err), "node", nodeID, "error", err) @@ -677,7 +481,7 @@ func rebootAsRequired(nodeID string, rebooter reboot.Rebooter, checker checkers. // If shown, it is helping understand the uncordonning. 
If the admin seems the node as cordonned // with this, it needs to take action (for example if the node was previously cordonned!) slog.Info("Performing a best-effort uncordon after failed cordon and drain", "node", nodeID) - err := uncordon(client, node, notifier) + err := k8soperations.Uncordon(client, node, notifier, postRebootNodeLabels, messageTemplateUncordon) if err != nil { slog.Info("Uncordon failed", "error", err) } diff --git a/internal/k8soperations/annotations.go b/internal/k8soperations/annotations.go new file mode 100644 index 000000000..492a901ff --- /dev/null +++ b/internal/k8soperations/annotations.go @@ -0,0 +1,86 @@ +package k8soperations + +import ( + "context" + "encoding/json" + "fmt" + "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" + "k8s.io/client-go/kubernetes" + "log/slog" + "strings" +) + +const ( + KuredNodeWasUnschedulableBeforeDrainAnnotation string = "kured.dev/node-unschedulable-before-drain" +) + +func AddNodeAnnotations(client *kubernetes.Clientset, nodeID string, annotations map[string]string) error { + node, err := client.CoreV1().Nodes().Get(context.TODO(), nodeID, metav1.GetOptions{}) + if err != nil { + return fmt.Errorf("error retrieving node object via k8s API: %v", err) + } + for k, v := range annotations { + node.Annotations[k] = v + slog.Debug(fmt.Sprintf("adding node annotation: %s=%s", k, v), "node", node.GetName()) + } + + bytes, err := json.Marshal(node) + if err != nil { + return fmt.Errorf("error marshalling node object into JSON: %v", err) + } + + _, err = client.CoreV1().Nodes().Patch(context.TODO(), node.GetName(), types.StrategicMergePatchType, bytes, metav1.PatchOptions{}) + if err != nil { + var annotationsErr string + for k, v := range annotations { + annotationsErr += fmt.Sprintf("%s=%s ", k, v) + } + return fmt.Errorf("error adding node annotations %s via k8s API: %v", annotationsErr, err) + } + return nil +} + +func DeleteNodeAnnotation(client *kubernetes.Clientset, nodeID, key string) error { + // JSON Patch takes as path input a JSON Pointer, defined in RFC6901 + // So we replace all instances of "/" with "~1" as per: + // https://tools.ietf.org/html/rfc6901#section-3 + patch := []byte(fmt.Sprintf("[{\"op\":\"remove\",\"path\":\"/metadata/annotations/%s\"}]", strings.ReplaceAll(key, "/", "~1"))) + _, err := client.CoreV1().Nodes().Patch(context.TODO(), nodeID, types.JSONPatchType, patch, metav1.PatchOptions{}) + if err != nil { + return fmt.Errorf("error deleting node annotation %s via k8s API: %v", key, err) + } + return nil +} + +func updateNodeLabels(client *kubernetes.Clientset, node *v1.Node, labels []string) error { + labelsMap := make(map[string]string) + for _, label := range labels { + k := strings.Split(label, "=")[0] + v := strings.Split(label, "=")[1] + labelsMap[k] = v + slog.Debug(fmt.Sprintf("Updating node %s label: %s=%s", node.GetName(), k, v), "node", node.GetName()) + } + + bytes, err := json.Marshal(map[string]interface{}{ + "metadata": map[string]interface{}{ + "labels": labelsMap, + }, + }) + if err != nil { + return fmt.Errorf("error marshalling node object into JSON: %v", err) + } + + _, err = client.CoreV1().Nodes().Patch(context.TODO(), node.GetName(), types.StrategicMergePatchType, bytes, metav1.PatchOptions{}) + if err != nil { + var labelsErr string + for _, label := range labels { + k := strings.Split(label, "=")[0] + v := strings.Split(label, "=")[1] + labelsErr += fmt.Sprintf("%s=%s ", k, v) + } + return fmt.Errorf("error updating node 
labels %s via k8s API: %v", labelsErr, err) + } + return nil +} diff --git a/internal/k8soperations/drain.go b/internal/k8soperations/drain.go new file mode 100644 index 000000000..506629f4a --- /dev/null +++ b/internal/k8soperations/drain.go @@ -0,0 +1,124 @@ +package k8soperations + +import ( + "context" + "fmt" + "github.com/kubereboot/kured/internal/notifications" + "k8s.io/api/core/v1" + "k8s.io/client-go/kubernetes" + kubeDrain "k8s.io/kubectl/pkg/drain" + "log/slog" + "strconv" + "time" +) + +// Drain drains the node in a kured fashion, respecting delays, notifications, and applying labels/annotations. +func Drain(client *kubernetes.Clientset, node *v1.Node, preRebootNodeLabels []string, drainTimeout time.Duration, drainGracePeriod int, skipWaitForDeleteTimeoutSeconds int, drainPodSelector string, drainDelay time.Duration, messageTemplateDrain string, notifier notifications.Notifier) error { + nodeName := node.GetName() + + if preRebootNodeLabels != nil { + err := updateNodeLabels(client, node, preRebootNodeLabels) + if err != nil { + return fmt.Errorf("stopping drain due to problem with node labels %v", err) + } + } + + if drainDelay > 0 { + slog.Debug("Delaying drain", "delay", drainDelay, "node", nodeName) + time.Sleep(drainDelay) + } + + slog.Info("Starting drain", "node", nodeName) + + notifier.Send(fmt.Sprintf(messageTemplateDrain, nodeName), "Starting drain") + + kubectlStdOutLogger := &slogWriter{message: "draining: results", stream: "stdout"} + kubectlStdErrLogger := &slogWriter{message: "draining: results", stream: "stderr"} + + drainer := &kubeDrain.Helper{ + Client: client, + Ctx: context.Background(), + GracePeriodSeconds: drainGracePeriod, + PodSelector: drainPodSelector, + SkipWaitForDeleteTimeoutSeconds: skipWaitForDeleteTimeoutSeconds, + Force: true, + DeleteEmptyDirData: true, + IgnoreAllDaemonSets: true, + ErrOut: kubectlStdErrLogger, + Out: kubectlStdOutLogger, + Timeout: drainTimeout, + } + + // Add previous state of the node Spec.Unschedulable into an annotation + // If an annotation was present, it means that either the cordon or drain failed, + // hence it does not need to reapply: It might override what the user has set + // (for example if the cordon succeeded but the drain failed) + if _, ok := node.Annotations[KuredNodeWasUnschedulableBeforeDrainAnnotation]; !ok { + // Store State of the node before cordon changes it + annotations := map[string]string{KuredNodeWasUnschedulableBeforeDrainAnnotation: strconv.FormatBool(node.Spec.Unschedulable)} + // & annotate this node with a timestamp so that other node maintenance tools know how long it's been since this node has been marked for reboot + err := AddNodeAnnotations(client, nodeName, annotations) + if err != nil { + return fmt.Errorf("error saving state of the node %s, %v", nodeName, err) + } + } + + if err := kubeDrain.RunCordonOrUncordon(drainer, node, true); err != nil { + return fmt.Errorf("error cordonning node %s, %v", nodeName, err) + } + + if err := kubeDrain.RunNodeDrain(drainer, nodeName); err != nil { + return fmt.Errorf("error draining node %s: %v", nodeName, err) + } + return nil +} + +// Uncordon changes the `spec.Unschedulable` field of a node, applying kured labels and annotations. 
+// Is a noop on missing annotation or on a node in maintenance before kured action +func Uncordon(client *kubernetes.Clientset, node *v1.Node, notifier notifications.Notifier, postRebootNodeLabels []string, messageTemplateUncordon string) error { + nodeName := node.GetName() + // Revert cordon spec change with the help of node annotation + annotationContent, ok := node.Annotations[KuredNodeWasUnschedulableBeforeDrainAnnotation] + if !ok { + // If no node annotations, uncordon will not act. + // Do not uncordon if you do not know previous state, it could bring nodes under maintenance online! + return nil + } + + wasUnschedulable, err := strconv.ParseBool(annotationContent) + if err != nil { + return fmt.Errorf("annotation was edited and cannot be converted back to bool %v, cannot uncordon (unrecoverable)", err) + } + + if wasUnschedulable { + // Just delete the annotation, keep Cordonned + err := DeleteNodeAnnotation(client, nodeName, KuredNodeWasUnschedulableBeforeDrainAnnotation) + if err != nil { + return fmt.Errorf("error removing the WasUnschedulable annotation, keeping the node stuck in cordonned state forever %v", err) + } + return nil + } + + kubectlStdOutLogger := &slogWriter{message: "uncordon: results", stream: "stdout"} + kubectlStdErrLogger := &slogWriter{message: "uncordon: results", stream: "stderr"} + + drainer := &kubeDrain.Helper{ + Client: client, + ErrOut: kubectlStdErrLogger, + Out: kubectlStdOutLogger, + Ctx: context.Background(), + } + if err := kubeDrain.RunCordonOrUncordon(drainer, node, false); err != nil { + return fmt.Errorf("error uncordonning node %s: %v", nodeName, err) + } else if postRebootNodeLabels != nil { + err := updateNodeLabels(client, node, postRebootNodeLabels) + return fmt.Errorf("error updating node (%s) labels, needs manual intervention %v", nodeName, err) + } + + err = DeleteNodeAnnotation(client, nodeName, KuredNodeWasUnschedulableBeforeDrainAnnotation) + if err != nil { + return fmt.Errorf("error removing the WasUnschedulable annotation, keeping the node stuck in current state forever %v", err) + } + notifier.Send(fmt.Sprintf(messageTemplateUncordon, nodeName), "Node uncordonned successfully") + return nil +} diff --git a/internal/k8soperations/logging.go b/internal/k8soperations/logging.go new file mode 100644 index 000000000..15cf4fb9f --- /dev/null +++ b/internal/k8soperations/logging.go @@ -0,0 +1,19 @@ +package k8soperations + +import "log/slog" + +type slogWriter struct { + stream string + message string +} + +func (sw slogWriter) Write(p []byte) (n int, err error) { + output := string(p) + switch sw.stream { + case "stdout": + slog.Info(sw.message, "stdout", output) + case "stderr": + slog.Info(sw.message, "stderr", output) + } + return len(p), nil +} diff --git a/internal/taints/taints.go b/internal/k8soperations/taints.go similarity index 96% rename from internal/taints/taints.go rename to internal/k8soperations/taints.go index 1d92cc56f..2bed538f7 100644 --- a/internal/taints/taints.go +++ b/internal/k8soperations/taints.go @@ -1,4 +1,4 @@ -package taints +package k8soperations import ( "context" @@ -21,8 +21,8 @@ type Taint struct { exists bool } -// New provides a new taint. -func New(client *kubernetes.Clientset, nodeID, taintName string, effect v1.TaintEffect) *Taint { +// NewTaint provides a new taint. +func NewTaint(client *kubernetes.Clientset, nodeID, taintName string, effect v1.TaintEffect) *Taint { exists, _, _ := taintExists(client, nodeID, taintName) return &Taint{
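
Usage sketch for the extracted package: the snippet below shows how a caller can drive internal/k8soperations after this change, passing each tunable explicitly instead of reading the package-level globals that previously lived in cmd/kured/main.go. Only the k8soperations function signatures are taken from the diff above; the maintainNode wrapper, the message templates, and the way client, node, and notifier are wired up are illustrative assumptions, and the code is assumed to sit inside the kured module (the internal packages are not importable from outside it).

package k8sopsexample

import (
	"time"

	"github.com/kubereboot/kured/internal/k8soperations"
	"github.com/kubereboot/kured/internal/notifications"
	v1 "k8s.io/api/core/v1"
	"k8s.io/client-go/kubernetes"
)

// maintainNode is a hypothetical helper showing the call order kured follows:
// drain (cordon + evict, saving the pre-drain schedulable state in an
// annotation), reboot, then uncordon (restore state, apply post-reboot labels).
func maintainNode(
	client *kubernetes.Clientset,
	node *v1.Node,
	notifier notifications.Notifier,
	preRebootNodeLabels, postRebootNodeLabels []string,
	drainTimeout, drainDelay time.Duration,
	drainGracePeriod, skipWaitForDeleteTimeoutSeconds int,
	drainPodSelector string,
) error {
	// Cordon the node and evict its pods; Drain also records whether the
	// node was already unschedulable so Uncordon can restore that state.
	if err := k8soperations.Drain(
		client, node, preRebootNodeLabels,
		drainTimeout, drainGracePeriod, skipWaitForDeleteTimeoutSeconds,
		drainPodSelector, drainDelay,
		"Draining node %s", // assumed message template
		notifier,
	); err != nil {
		return err
	}

	// ... the real daemon triggers the reboot between these two calls ...

	// Revert the cordon (a no-op if the node was already cordoned before
	// kured acted) and apply any post-reboot labels.
	return k8soperations.Uncordon(client, node, notifier, postRebootNodeLabels,
		"Node %s uncordoned") // assumed message template
}

The point of the refactor is visible in the parameter lists: everything Drain and Uncordon need now arrives as arguments, which is what lets the operations live outside cmd/kured/main.go and be exercised independently of its globals.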