Skip to content

Commit

Permalink
move away from unstructured check fetching
Browse files Browse the repository at this point in the history
  • Loading branch information
Eric Greer committed Jan 3, 2024
1 parent 750648a commit 32ff0df
Show file tree
Hide file tree
Showing 2 changed files with 83 additions and 74 deletions.
153 changes: 81 additions & 72 deletions cmd/kuberhealthy/kuberhealthy.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import (
"encoding/json"
"errors"
"fmt"
"io/ioutil"
"io"
"net"
"net/http"
"reflect"
Expand All @@ -18,9 +18,6 @@ import (
log "github.com/sirupsen/logrus"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/kubernetes"

Expand Down Expand Up @@ -312,7 +309,7 @@ func (k *Kuberhealthy) reapKHStateResources(ctx context.Context, namespace strin
return fmt.Errorf("khState reaper: error listing khStates for reaping: %w", err)
}

khChecks, err := listUnstructuredKHChecks(ctx, k.TargetNamespace)
khChecks, err := k.listKHChecks(k.TargetNamespace)
if err != nil {
return fmt.Errorf("khState reaper: error listing unstructured khChecks: %w", err)
}
Expand All @@ -329,13 +326,12 @@ func (k *Kuberhealthy) reapKHStateResources(ctx context.Context, namespace strin
log.Debugln("khState reaper: analyzing khState", khState.GetName(), "in", khState.GetName())
var foundKHCheck bool
for _, kc := range khChecks.Items {
khCheck, err := convertUnstructuredKhCheck(kc)
if err != nil {
log.Errorln("Error converting unstructured object to khcheck:", err)
continue
}
log.Debugln("khState reaper:", khCheck.GetName(), "==", khState.GetName(), "&&", khCheck.GetNamespace(), "==", khState.GetNamespace())
if khCheck.GetName() == khState.GetName() && khCheck.GetNamespace() == khState.GetNamespace() {
log.Debugln("khState reaper:", kc.GetName(), "==", khState.GetName(), "&&", kc.GetNamespace(), "==", khState.GetNamespace())
if kc.GetName() == khState.GetName() && kc.GetNamespace() == khState.GetNamespace() {
log.Infoln("khState reaper:", khState.GetName(), "in", khState.GetNamespace(), "is still valid")
foundKHCheck = true
break
Expand Down Expand Up @@ -433,6 +429,26 @@ func (k *Kuberhealthy) monitorKHJobs(ctx context.Context) {
}
}

// listKHChecks lists all kuberhealthy checks in the specified namespace
func (k *Kuberhealthy) listKHChecks(namespace string) (khcheckv1.KuberhealthyCheckList, error) {
return khCheckClient.KuberhealthyChecks(namespace).List(metav1.ListOptions{})
}

// getKHCheck gets the specified khcheck in the specified namespace
func (k *Kuberhealthy) getKHCheck(namespace string, checkName string) (khcheckv1.KuberhealthyCheck, error) {
return khCheckClient.KuberhealthyChecks(namespace).Get(checkName, metav1.GetOptions{})
}

// listKHStates lists all kuberhealthy states in the specified namespace
func (k *Kuberhealthy) listKHStates(namespace string) (khstatev1.KuberhealthyStateList, error) {
return khStateClient.KuberhealthyStates(namespace).List(metav1.ListOptions{})
}

// getKHState gets the specified khstate in the specified namespace
func (k *Kuberhealthy) getKHState(namespace string, checkName string) (khstatev1.KuberhealthyState, error) {
return khStateClient.KuberhealthyStates(namespace).Get(checkName, metav1.GetOptions{})
}

// watchForKHCheckChanges watches for changes to khcheck objects and returns them through the specified channel
func (k *Kuberhealthy) watchForKHCheckChanges(ctx context.Context, c chan struct{}) {

Expand Down Expand Up @@ -528,7 +544,7 @@ func (k *Kuberhealthy) monitorExternalChecks(ctx context.Context, notify chan st
<-c
log.Debugln("Change notification received. Scanning for external check changes...")

khChecks, err := listUnstructuredKHChecks(ctx, k.TargetNamespace)
khChecks, err := k.listKHChecks(k.TargetNamespace)
if err != nil {
log.Errorln("error listing unstructured khChecks: %w", err)
continue
Expand All @@ -542,13 +558,8 @@ func (k *Kuberhealthy) monitorExternalChecks(ctx context.Context, notify chan st
var existsInItems bool // indicates the item exists in the item listing

for _, kc := range khChecks.Items {
khCheck, err := convertUnstructuredKhCheck(kc)
if err != nil {
log.Errorln("Error converting unstructured object to khcheck:", err)
continue
}

itemMapName := khCheck.Namespace + "/" + khCheck.Name
itemMapName := kc.Namespace + "/" + kc.Name
if itemMapName == mapName {
existsInItems = true
break
Expand All @@ -562,21 +573,20 @@ func (k *Kuberhealthy) monitorExternalChecks(ctx context.Context, notify chan st
}

for _, kc := range khChecks.Items {
i, err := convertUnstructuredKhCheck(kc)
if err != nil {
log.Errorln("Error converting unstructured object to khcheck:", err)
continue
}

mapName := i.Namespace + "/" + i.Name
mapName := kc.Namespace + "/" + kc.Name

log.Debugln("Scanning khcheck CRD", mapName, "for changes since last seen...")

if len(i.Namespace) < 1 {
if len(kc.Namespace) < 1 {
log.Warning("Got khcheck update from object with no namespace...")
continue
}
if len(i.Name) < 1 {
if len(kc.Name) < 1 {
log.Warning("Got khcheck update from object with no name...")
continue
}
Expand All @@ -586,42 +596,42 @@ func (k *Kuberhealthy) monitorExternalChecks(ctx context.Context, notify chan st
_, exists := knownSettings[mapName]
if !exists {
log.Debugln("First time seeing khcheck of name", mapName)
knownSettings[mapName] = i.Spec
knownSettings[mapName] = kc.Spec
foundChange = true
}

// check if run interval has changed
if knownSettings[mapName].RunInterval != i.Spec.RunInterval {
if knownSettings[mapName].RunInterval != kc.Spec.RunInterval {
log.Debugln("The khcheck run interval for", mapName, "has changed.")
foundChange = true
}

// check if run timeout has changed
if knownSettings[mapName].Timeout != i.Spec.Timeout {
if knownSettings[mapName].Timeout != kc.Spec.Timeout {
log.Debugln("The khcheck timeout for", mapName, "has changed.")
foundChange = true
}

// check if extraLabels has changed
if !foundChange && !reflect.DeepEqual(knownSettings[mapName].ExtraLabels, i.Spec.ExtraLabels) {
if !foundChange && !reflect.DeepEqual(knownSettings[mapName].ExtraLabels, kc.Spec.ExtraLabels) {
log.Debugln("The khcheck extra labels for", mapName, "has changed.")
foundChange = true
}

// check if extraAnnotations has changed
if !foundChange && !reflect.DeepEqual(knownSettings[mapName].ExtraAnnotations, i.Spec.ExtraAnnotations) {
if !foundChange && !reflect.DeepEqual(knownSettings[mapName].ExtraAnnotations, kc.Spec.ExtraAnnotations) {
log.Debugln("The khcheck extra annotations for", mapName, "has changed.")
foundChange = true
}

// check if CheckConfig has changed (PodSpec)
if !foundChange && !reflect.DeepEqual(knownSettings[mapName].PodSpec, i.Spec.PodSpec) {
if !foundChange && !reflect.DeepEqual(knownSettings[mapName].PodSpec, kc.Spec.PodSpec) {
log.Debugln("The khcheck for", mapName, "has changed.")
foundChange = true
}

// finally, update known settings before continuing to the next interval
knownSettings[mapName] = i.Spec
knownSettings[mapName] = kc.Spec
}

// if a change was detected, we signal the notify channel
Expand All @@ -638,7 +648,7 @@ func (k *Kuberhealthy) addExternalChecks(ctx context.Context) error {

log.Debugln("Fetching khcheck configurations...")

khChecks, err := listUnstructuredKHChecks(ctx, k.TargetNamespace)
khChecks, err := k.listKHChecks(k.TargetNamespace)
if err != nil {
return err
}
Expand All @@ -647,21 +657,20 @@ func (k *Kuberhealthy) addExternalChecks(ctx context.Context) error {

// iterate on each check CRD resource and add it as a check
for _, kc := range khChecks.Items {
r, err := convertUnstructuredKhCheck(kc)
if err != nil {
log.Errorln("Error converting unstructured object to khcheck:", err)
continue
}
log.Debugln("Loading check CRD:", r.Name)
log.Debugln("Loading check CRD:", kc.Name)

log.Debugf("External check custom resource loaded: %v", r)
log.Debugf("External check custom resource loaded: %v", kc)

// create a new kubernetes client for this external checker
log.Infoln("Enabling external check:", r.Name)
c := external.New(kubernetesClient, &r, khCheckClient, khStateClient, cfg.ExternalCheckReportingURL)
log.Infoln("Enabling external check:", kc.Name)
c := external.New(kubernetesClient, &kc, khCheckClient, khStateClient, cfg.ExternalCheckReportingURL)

// parse the run interval string from the custom resource and setup the run interval
c.RunInterval, err = time.ParseDuration(r.Spec.RunInterval)
c.RunInterval, err = time.ParseDuration(kc.Spec.RunInterval)
if err != nil {
log.Errorln("Error parsing duration for check", c.CheckName, "in namespace", c.Namespace, err)
log.Errorln("Defaulting check to a runtime of ten minutes.")
Expand All @@ -672,8 +681,8 @@ func (k *Kuberhealthy) addExternalChecks(ctx context.Context) error {

// parse the user specified timeout if present
c.RunTimeout = DefaultTimeout
if len(r.Spec.Timeout) > 0 {
c.RunTimeout, err = time.ParseDuration(r.Spec.Timeout)
if len(kc.Spec.Timeout) > 0 {
c.RunTimeout, err = time.ParseDuration(kc.Spec.Timeout)
if err != nil {
log.Errorln("Error parsing timeout for check", c.CheckName, "in namespace", c.Namespace, err)
log.Errorln("Defaulting check to a timeout of", DefaultTimeout)
Expand All @@ -685,11 +694,11 @@ func (k *Kuberhealthy) addExternalChecks(ctx context.Context) error {
// add on extra annotations and labels
if c.ExtraAnnotations != nil {
log.Debugln("External check setting extra annotations:", c.ExtraAnnotations)
c.ExtraAnnotations = r.Spec.ExtraAnnotations
c.ExtraAnnotations = kc.Spec.ExtraAnnotations
}
if c.ExtraLabels != nil {
log.Debugln("External check setting extra labels:", c.ExtraLabels)
c.ExtraLabels = r.Spec.ExtraLabels
c.ExtraLabels = kc.Spec.ExtraLabels
}
log.Debugln("External check labels and annotations:", c.ExtraLabels, c.ExtraAnnotations)

Expand Down Expand Up @@ -1411,7 +1420,7 @@ func (k *Kuberhealthy) externalCheckReportHandler(w http.ResponseWriter, r *http
requestID = requestID + " (" + podReport.Namespace + "/" + podReport.Name + ")"

// ensure the client is sending a valid payload in the request body
b, err := ioutil.ReadAll(r.Body)
b, err := io.ReadAll(r.Body)
if err != nil {
w.WriteHeader(http.StatusBadRequest)
k.externalCheckReportHandlerLog(requestID, "Failed to read request body:", err.Error(), r.RemoteAddr)
Expand Down Expand Up @@ -1513,7 +1522,7 @@ func (k *Kuberhealthy) healthCheckHandler(w http.ResponseWriter, r *http.Request
log.Infoln("Client connected to status page from", r.RemoteAddr, r.UserAgent())

// If a request body was supplied, throw an error to ensure that checks don't report into the wrong url
body, err := ioutil.ReadAll(r.Body)
body, err := io.ReadAll(r.Body)
if len(body) > 0 {
log.Warningln("Unexpected body from status page request. Verify check is reporting to the right status url", r.RemoteAddr, r.RequestURI)
w.WriteHeader(http.StatusBadRequest)
Expand Down Expand Up @@ -1701,45 +1710,45 @@ func (k *Kuberhealthy) configureInfluxForwarding() {
k.MetricForwarder = metricClient
}

func listUnstructuredKHChecks(ctx context.Context, namespace string) (*unstructured.UnstructuredList, error) {
// func listUnstructuredKHChecks(ctx context.Context, namespace string) (*unstructured.UnstructuredList, error) {

khCheckGroupVersionResource := schema.GroupVersionResource{
Version: checkCRDVersion,
Resource: checkCRDResource,
Group: checkCRDGroup,
}
// khCheckGroupVersionResource := schema.GroupVersionResource{
// Version: checkCRDVersion,
// Resource: checkCRDResource,
// Group: checkCRDGroup,
// }

unstructuredList, err := dynamicClient.Resource(khCheckGroupVersionResource).Namespace(namespace).List(ctx, metav1.ListOptions{})
if err != nil {
return unstructuredList, err
}
// unstructuredList, err := dynamicClient.Resource(khCheckGroupVersionResource).Namespace(namespace).List(ctx, metav1.ListOptions{})
// if err != nil {
// return unstructuredList, err
// }

return unstructuredList, err
}
// return unstructuredList, err
// }

func convertUnstructuredKhCheck(unstructured unstructured.Unstructured) (khcheckv1.KuberhealthyCheck, error) {
un := unstructured.UnstructuredContent()
var khCheck khcheckv1.KuberhealthyCheck
err := runtime.DefaultUnstructuredConverter.FromUnstructured(un, &khCheck)
if err != nil {
return khCheck, fmt.Errorf("error converting unstructured object to khcheck: %w", err)
}
// func convertUnstructuredKhCheck(unstructured unstructured.Unstructured) (khcheckv1.KuberhealthyCheck, error) {
// un := unstructured.UnstructuredContent()
// var khCheck khcheckv1.KuberhealthyCheck
// err := runtime.DefaultUnstructuredConverter.FromUnstructured(un, &khCheck)
// if err != nil {
// return khCheck, fmt.Errorf("error converting unstructured object to khcheck: %w", err)
// }

return khCheck, err
}
// return khCheck, err
// }

func watchUnstructuredKHChecks(ctx context.Context, namespace string) (watch.Interface, error) {
// func watchUnstructuredKHChecks(ctx context.Context, namespace string) (watch.Interface, error) {

khCheckGroupVersionResource := schema.GroupVersionResource{
Version: checkCRDVersion,
Resource: checkCRDResource,
Group: checkCRDGroup,
}
// khCheckGroupVersionResource := schema.GroupVersionResource{
// Version: checkCRDVersion,
// Resource: checkCRDResource,
// Group: checkCRDGroup,
// }

watcher, err := dynamicClient.Resource(khCheckGroupVersionResource).Namespace(namespace).Watch(ctx, metav1.ListOptions{})
if err != nil {
return watcher, err
}
// watcher, err := dynamicClient.Resource(khCheckGroupVersionResource).Namespace(namespace).Watch(ctx, metav1.ListOptions{})
// if err != nil {
// return watcher, err
// }

return watcher, err
}
// return watcher, err
// }
4 changes: 2 additions & 2 deletions cmd/kuberhealthy/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@ func getEnvVar(v string) (string, error) {
return envVar, err
}

// getAllLogLevel fetches a string list of possible log levels that can be set
func getAllLogLevel() string {
// getAllLogLevels fetches a string list of possible log levels that can be set
func getAllLogLevels() string {
var levelStrings []string
for _, level := range log.AllLevels {
levelStrings = append(levelStrings, level.String())
Expand Down

0 comments on commit 32ff0df

Please sign in to comment.