
Commit

get and publish metrics
Signed-off-by: Alex Castilio dos Santos <[email protected]>
alexcastilio committed Nov 15, 2024
1 parent 8a62ddb commit 7135f75
Showing 6 changed files with 230 additions and 30 deletions.
5 changes: 2 additions & 3 deletions go.mod
@@ -15,9 +15,7 @@ require (
 	sigs.k8s.io/cloud-provider-azure/pkg/azclient/configloader v0.0.20
 )
 
-retract (
-	v0.10.0 // published accidentally
-)
+retract v0.10.0 // published accidentally
 
 require (
 	cel.dev/expr v0.15.0 // indirect
@@ -333,6 +331,7 @@ require (
 	k8s.io/apiextensions-apiserver v0.30.3
 	k8s.io/cli-runtime v0.30.3
 	k8s.io/kubectl v0.30.3
+	k8s.io/metrics v0.30.3
 	sigs.k8s.io/controller-runtime v0.18.5
 )
2 changes: 2 additions & 0 deletions go.sum
@@ -1200,6 +1200,8 @@ k8s.io/kube-openapi v0.0.0-20240228011516-70dd3763d340 h1:BZqlfIlq5YbRMFko6/PM7F
 k8s.io/kube-openapi v0.0.0-20240228011516-70dd3763d340/go.mod h1:yD4MZYeKMBwQKVht279WycxKyM84kkAx2DPrTXaeb98=
 k8s.io/kubectl v0.30.3 h1:YIBBvMdTW0xcDpmrOBzcpUVsn+zOgjMYIu7kAq+yqiI=
 k8s.io/kubectl v0.30.3/go.mod h1:IcR0I9RN2+zzTRUa1BzZCm4oM0NLOawE6RzlDvd1Fpo=
+k8s.io/metrics v0.30.3 h1:gKCpte5zykrOmQhZ8qmsxyJslMdiLN+sqbBfIWNpbGM=
+k8s.io/metrics v0.30.3/go.mod h1:W06L2nXRhOwPkFYDJYWdEIS3u6JcJy3ebIPYbndRs6A=
 k8s.io/utils v0.0.0-20240102154912-e7106e64919e h1:eQ/4ljkx21sObifjzXwlPKpdGLrCfRziVtos3ofG/sQ=
 k8s.io/utils v0.0.0-20240102154912-e7106e64919e/go.mod h1:OLgZIPagt7ERELqWJFomSt595RzquPNLL48iOWgYOg0=
 oras.land/oras-go v1.2.5 h1:XpYuAwAb0DfQsunIyMfeET92emK8km3W4yEzZvUbsTo=
181 changes: 181 additions & 0 deletions test/e2e/framework/scaletest/get-publish-restarts.go
@@ -0,0 +1,181 @@
package scaletest

import (
	"context"
	"fmt"
	"log"
	"sync"
	"time"

	"github.com/microsoft/retina/pkg/telemetry"
	"github.com/microsoft/retina/test/e2e/common"
	"github.com/pkg/errors"
	"k8s.io/apimachinery/pkg/api/resource"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/labels"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/clientcmd"
	metrics "k8s.io/metrics/pkg/client/clientset/versioned"
)

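// GetAndPublishMetrics is a scale-test step that periodically collects
// resource usage and restart counts for the labeled pods (and their nodes)
// and publishes the results to Application Insights.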
type GetAndPublishMetrics struct {
	KubeConfigFilePath          string
	AppInsightsKey              string
	AdditionalTelemetryProperty map[string]string
	Labels                      map[string]string
	stop                        chan struct{}
	wg                          sync.WaitGroup
	telemetryClient             *telemetry.TelemetryClient
}

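// Run initializes App Insights telemetry and starts a background goroutine
// that collects and publishes metrics every five minutes until Stop is called.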
func (g *GetAndPublishMetrics) Run() error {
	telemetry.InitAppInsights(g.AppInsightsKey, g.AdditionalTelemetryProperty["retinaVersion"])

	telemetryClient, err := telemetry.NewAppInsightsTelemetryClient("retina-scale-test", g.AdditionalTelemetryProperty)
	if err != nil {
		return errors.Wrap(err, "error creating telemetry client")
	}

	g.telemetryClient = telemetryClient

	g.stop = make(chan struct{})
	g.wg.Add(1)

	go func() {
		defer g.wg.Done()

		t := time.NewTicker(5 * time.Minute)
		defer t.Stop()

		for {
			select {
			case <-t.C:
				if err := g.getAndPublishMetrics(); err != nil {
					log.Fatalf("error getting and publishing metrics: %v", err)
				}
			case <-g.stop:
				return
			}
		}
	}()

	return nil
}

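// Stop signals the collector goroutine to exit, waits for it to finish, and
// then shuts down the telemetry pipeline so buffered events are flushed.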
func (g *GetAndPublishMetrics) Stop() error {
	close(g.stop)
	g.wg.Wait()
	telemetry.ShutdownAppInsights()
	return nil
}

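// Prevalidate checks that the step has the configuration it depends on at runtime.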
func (g *GetAndPublishMetrics) Prevalidate() error {
	if g.AppInsightsKey == "" {
		return fmt.Errorf("AppInsightsKey is required")
	}
	if _, ok := g.AdditionalTelemetryProperty["retinaVersion"]; !ok {
		return fmt.Errorf("retinaVersion is required in AdditionalTelemetryProperty")
	}
	return nil
}

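// getAndPublishMetrics builds the Kubernetes and metrics clients, takes one
// snapshot of pod and node metrics, and sends one telemetry event per pod.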
func (g *GetAndPublishMetrics) getAndPublishMetrics() error {
	config, err := clientcmd.BuildConfigFromFlags("", g.KubeConfigFilePath)
	if err != nil {
		return fmt.Errorf("error building kubeconfig: %w", err)
	}

	clientset, err := kubernetes.NewForConfig(config)
	if err != nil {
		return fmt.Errorf("error creating Kubernetes client: %w", err)
	}

	mc, err := metrics.NewForConfig(config)
	if err != nil {
		return fmt.Errorf("error creating metrics client: %w", err)
	}

	ctx, cancel := context.WithTimeout(context.Background(), defaultTimeoutSeconds*time.Second)
	defer cancel()

	allMetrics, err := g.getMetrics(ctx, clientset, mc)
	if err != nil {
		return fmt.Errorf("error getting metrics: %w", err)
	}

	// Publish one event per pod.
	for _, m := range allMetrics {
		g.telemetryClient.TrackEvent("scale-test", m)
	}

	return nil
}

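// metric is one flat key/value telemetry payload, as consumed by TrackEvent.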
type metric map[string]string

func (g *GetAndPublishMetrics) getMetrics(ctx context.Context, k8sClient *kubernetes.Clientset, metricsClient *metrics.Clientset) ([]metric, error) {
	labelSelector := labels.Set(g.Labels).String()

	pods, err := k8sClient.CoreV1().Pods(common.KubeSystemNamespace).List(ctx, metav1.ListOptions{LabelSelector: labelSelector})
	if err != nil {
		return nil, errors.Wrap(err, "error listing pods")
	}

	nodesMetricsInt := metricsClient.MetricsV1beta1().NodeMetricses()
	podMetricsInt := metricsClient.MetricsV1beta1().PodMetricses(common.KubeSystemNamespace)

	var allPodsHealth []metric

	timestamp := time.Now().UTC().Format(time.RFC3339)

	for _, pod := range pods.Items {
		podHealth := make(metric)

		podMetrics, err := podMetricsInt.Get(ctx, pod.Name, metav1.GetOptions{})
		if err != nil {
			return nil, errors.Wrap(err, "error getting pod metrics")
		}

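		// Aggregate CPU and memory usage across all containers in the pod.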
		podMem := resource.MustParse("0")
		podCPU := resource.MustParse("0")
		for _, cm := range podMetrics.Containers {
			podMem.Add(cm.Usage["memory"])
			podCPU.Add(cm.Usage["cpu"])
		}

		nodeMetrics, err := nodesMetricsInt.Get(ctx, pod.Spec.NodeName, metav1.GetOptions{})
		if err != nil {
			return nil, errors.Wrap(err, "error getting node metrics")
		}

		nodeMem := nodeMetrics.Usage["memory"]
		nodeCPU := nodeMetrics.Usage["cpu"]

		restarts := 0
		for _, containerStatus := range pod.Status.ContainerStatuses {
			restarts += int(containerStatus.RestartCount)
		}

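		// Memory is reported in MiB (bytes / 2^20 = 1048576), CPU in millicores.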
		podHealth["timestamp"] = timestamp
		podHealth["pod"] = pod.Name
		podHealth["podCpuInMilliCore"] = fmt.Sprintf("%d", podCPU.MilliValue())
		podHealth["podMemoryInMB"] = fmt.Sprintf("%d", podMem.Value()/1048576)
		podHealth["podRestarts"] = fmt.Sprintf("%d", restarts)
		podHealth["node"] = pod.Spec.NodeName
		podHealth["nodeCpuInMilliCore"] = fmt.Sprintf("%d", nodeCPU.MilliValue())
		podHealth["nodeMemoryInMB"] = fmt.Sprintf("%d", nodeMem.Value()/1048576)

		allPodsHealth = append(allPodsHealth, podHealth)

	}

	return allPodsHealth, nil
}
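For reference, each TrackEvent call above publishes one flat string map per pod every five minutes. A single event looks roughly like this (keys as built by getMetrics; the pod and node names and all values are illustrative, not from a real run):

	metric{
		"timestamp":          "2024-11-15T12:00:00Z",
		"pod":                "retina-agent-x7k2p",
		"podCpuInMilliCore":  "15",
		"podMemoryInMB":      "120",
		"podRestarts":        "0",
		"node":               "aks-nodepool1-12345678-vmss000000",
		"nodeCpuInMilliCore": "850",
		"nodeMemoryInMB":     "3500",
	}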
2 changes: 2 additions & 0 deletions test/e2e/framework/scaletest/options.go
@@ -35,4 +35,6 @@ type Options struct {
 	DeleteNetworkPoliciesTimes int
 	numKwokPods                int
 	numRealPods                int
+	LabelsToGetMetrics          map[string]string
+	AdditionalTelemetryProperty map[string]string
 }
21 changes: 13 additions & 8 deletions test/e2e/jobs/scale.go
@@ -1,6 +1,7 @@
 package retina
 
 import (
+	"os"
 	"time"
 
 	"github.com/microsoft/retina/test/e2e/framework/kubernetes"
@@ -37,6 +38,8 @@ func DefaultScaleTestOptions() scaletest.Options {
 		DeleteNetworkPolicies:         false,
 		DeleteNetworkPoliciesInterval: 60 * time.Second,
 		DeleteNetworkPoliciesTimes:    1,
+		LabelsToGetMetrics:            map[string]string{},
+		AdditionalTelemetryProperty:   map[string]string{},
 	}
 }

@@ -60,6 +63,12 @@ func ScaleTest(opt *scaletest.Options) *types.Job {
 
 	job.AddStep(&kubernetes.CreateNamespace{}, nil)
 
+	job.AddStep(&scaletest.GetAndPublishMetrics{
+		Labels:                      opt.LabelsToGetMetrics,
+		AppInsightsKey:              os.Getenv("APP_INSIGHTS_KEY"),
+		AdditionalTelemetryProperty: opt.AdditionalTelemetryProperty,
+	}, &types.StepOptions{RunInBackgroundWithID: "get-metrics"})
+
 	job.AddStep(&scaletest.CreateResources{
 		NumKwokDeployments: opt.NumKwokDeployments,
 		NumKwokReplicas:    opt.NumKwokReplicas,
@@ -95,15 +104,11 @@ func ScaleTest(opt *scaletest.Options) *types.Job {
 		NumSharedLabelsPerPod: opt.NumSharedLabelsPerPod,
 	}, nil)
 
-	// job.AddStep(&kubernetes.DeleteNamespace{}, nil)
-
-	// TODO: Add steps to get the state of the cluster
-
-	// job.AddStep(&kubernetes.GetDeployment{})
-
-	// job.AddStep(&kubernetes.GetDaemonSet{})
+	job.AddStep(&types.Stop{
+		BackgroundID: "get-metrics",
+	}, nil)
 
-	// job.AddStep(&kubernetes.DescribePods{})
+	job.AddStep(&kubernetes.DeleteNamespace{}, nil)
 
 	return job
 }
49 changes: 30 additions & 19 deletions test/e2e/scale_test.go
@@ -14,6 +14,7 @@ import (
 	"time"
 
 	"github.com/microsoft/retina/test/e2e/common"
+	"github.com/microsoft/retina/test/e2e/framework/generic"
 	"github.com/microsoft/retina/test/e2e/framework/helpers"
 	"github.com/microsoft/retina/test/e2e/framework/types"
 	jobs "github.com/microsoft/retina/test/e2e/jobs"
@@ -69,25 +70,7 @@ func TestE2ERetina_Scale(t *testing.T) {
 	chartPath := filepath.Join(rootDir, "deploy", "legacy", "manifests", "controller", "helm", "retina")
 	kubeConfigFilePath := filepath.Join(rootDir, "test", "e2e", "test.pem")
 
-	// CreateTestInfra
-	createTestInfra := types.NewRunner(t, jobs.CreateTestInfra(subID, rg, clusterName, location, kubeConfigFilePath, *createInfra))
-	createTestInfra.Run(ctx)
-
-	t.Cleanup(func() {
-		if *deleteInfra {
-			_ = jobs.DeleteTestInfra(subID, rg, clusterName, location).Run()
-		}
-	})
-
-	// Install Retina
-	installRetina := types.NewRunner(t, jobs.InstallRetina(kubeConfigFilePath, chartPath))
-	installRetina.Run(ctx)
-
-	t.Cleanup(func() {
-		_ = jobs.UninstallRetina(kubeConfigFilePath, chartPath).Run()
-	})
-
-	// Scale test
+	// Scale test parameters
 	opt := jobs.DefaultScaleTestOptions()
 	opt.KubeconfigPath = kubeConfigFilePath

@@ -114,6 +97,34 @@ func TestE2ERetina_Scale(t *testing.T) {
 		require.NoError(t, err)
 	}
 
+	RetinaVersion := os.Getenv(generic.DefaultTagEnv)
+	require.NotEmpty(t, RetinaVersion)
+	opt.AdditionalTelemetryProperty["retinaVersion"] = RetinaVersion
+	opt.AdditionalTelemetryProperty["clusterName"] = clusterName
+
+	// AppInsightsKey is required for telemetry
+	require.NotEmpty(t, os.Getenv("APP_INSIGHTS_KEY"))
+
+	opt.LabelsToGetMetrics = map[string]string{"k8s-app": "retina"}
+
+	// CreateTestInfra
+	createTestInfra := types.NewRunner(t, jobs.CreateTestInfra(subID, rg, clusterName, location, kubeConfigFilePath, *createInfra))
+	createTestInfra.Run(ctx)
+
+	t.Cleanup(func() {
+		if *deleteInfra {
+			_ = jobs.DeleteTestInfra(subID, rg, clusterName, location).Run()
+		}
+	})
+
+	// Install Retina
+	installRetina := types.NewRunner(t, jobs.InstallRetina(kubeConfigFilePath, chartPath))
+	installRetina.Run(ctx)
+
+	t.Cleanup(func() {
+		_ = jobs.UninstallRetina(kubeConfigFilePath, chartPath).Run()
+	})
+
 	scale := types.NewRunner(t, jobs.ScaleTest(&opt))
 	scale.Run(ctx)
 }
