-
Notifications
You must be signed in to change notification settings - Fork 112
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge pull request #122 from zalando-incubator/http-collector
Add basic HTTP collector
- Loading branch information
Showing
16 changed files
with
928 additions
and
357 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,103 @@ | ||
package collector | ||
|
||
import ( | ||
"fmt" | ||
"net/url" | ||
"time" | ||
|
||
"github.com/zalando-incubator/kube-metrics-adapter/pkg/collector/httpmetrics" | ||
|
||
"github.com/oliveagle/jsonpath" | ||
"k8s.io/api/autoscaling/v2beta2" | ||
"k8s.io/apimachinery/pkg/api/resource" | ||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" | ||
"k8s.io/metrics/pkg/apis/external_metrics" | ||
) | ||
|
||
const ( | ||
HTTPMetricName = "http" | ||
HTTPEndpointAnnotationKey = "endpoint" | ||
HTTPJsonPathAnnotationKey = "json-key" | ||
identifierLabel = "identifier" | ||
) | ||
|
||
type HTTPCollectorPlugin struct{} | ||
|
||
func NewHTTPCollectorPlugin() (*HTTPCollectorPlugin, error) { | ||
return &HTTPCollectorPlugin{}, nil | ||
} | ||
|
||
func (p *HTTPCollectorPlugin) NewCollector(_ *v2beta2.HorizontalPodAutoscaler, config *MetricConfig, interval time.Duration) (Collector, error) { | ||
collector := &HTTPCollector{} | ||
var ( | ||
value string | ||
ok bool | ||
) | ||
if value, ok = config.Config[HTTPJsonPathAnnotationKey]; !ok { | ||
return nil, fmt.Errorf("config value %s not found", HTTPJsonPathAnnotationKey) | ||
} | ||
jsonPath, err := jsonpath.Compile(value) | ||
if err != nil { | ||
return nil, fmt.Errorf("failed to parse json path: %v", err) | ||
} | ||
collector.jsonPath = jsonPath | ||
if value, ok = config.Config[HTTPEndpointAnnotationKey]; !ok { | ||
return nil, fmt.Errorf("config value %s not found", HTTPEndpointAnnotationKey) | ||
} | ||
collector.endpoint, err = url.Parse(value) | ||
if err != nil { | ||
return nil, err | ||
} | ||
collector.interval = interval | ||
collector.metricType = config.Type | ||
if config.Metric.Selector == nil || config.Metric.Selector.MatchLabels == nil { | ||
return nil, fmt.Errorf("no label selector specified for metric: %s", config.Metric.Name) | ||
} | ||
if _, ok := config.Metric.Selector.MatchLabels[identifierLabel]; !ok { | ||
return nil, fmt.Errorf("%s is not specified as a label for metric %s", identifierLabel, config.Metric.Name) | ||
} | ||
collector.metric = config.Metric | ||
var aggFunc httpmetrics.AggregatorFunc | ||
|
||
if val, ok := config.Config["aggregator"]; ok { | ||
aggFunc, err = httpmetrics.ParseAggregator(val) | ||
if err != nil { | ||
return nil, err | ||
} | ||
} | ||
collector.metricsGetter = httpmetrics.NewJSONPathMetricsGetter(httpmetrics.DefaultMetricsHTTPClient(), aggFunc, jsonPath) | ||
return collector, nil | ||
} | ||
|
||
type HTTPCollector struct { | ||
endpoint *url.URL | ||
jsonPath *jsonpath.Compiled | ||
interval time.Duration | ||
metricType v2beta2.MetricSourceType | ||
metricsGetter *httpmetrics.JSONPathMetricsGetter | ||
metric v2beta2.MetricIdentifier | ||
} | ||
|
||
func (c *HTTPCollector) GetMetrics() ([]CollectedMetric, error) { | ||
metric, err := c.metricsGetter.GetMetric(*c.endpoint) | ||
if err != nil { | ||
return nil, err | ||
} | ||
|
||
value := CollectedMetric{ | ||
Type: c.metricType, | ||
External: external_metrics.ExternalMetricValue{ | ||
MetricName: c.metric.Name, | ||
MetricLabels: c.metric.Selector.MatchLabels, | ||
Timestamp: metav1.Time{ | ||
Time: time.Now(), | ||
}, | ||
Value: *resource.NewMilliQuantity(int64(metric*1000), resource.DecimalSI), | ||
}, | ||
} | ||
return []CollectedMetric{value}, nil | ||
} | ||
|
||
func (c *HTTPCollector) Interval() time.Duration { | ||
return c.interval | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,94 @@ | ||
package collector | ||
|
||
import ( | ||
"encoding/json" | ||
"net/http" | ||
"net/http/httptest" | ||
"testing" | ||
|
||
"github.com/stretchr/testify/require" | ||
"k8s.io/api/autoscaling/v2beta2" | ||
v1 "k8s.io/apimachinery/pkg/apis/meta/v1" | ||
) | ||
|
||
type testExternalMetricsHandler struct { | ||
values []int64 | ||
test *testing.T | ||
} | ||
|
||
func (t testExternalMetricsHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { | ||
response, err := json.Marshal(testMetricResponse{t.values}) | ||
require.NoError(t.test, err) | ||
w.Header().Set("Content-Type", "application/json") | ||
_, err = w.Write(response) | ||
require.NoError(t.test, err) | ||
} | ||
|
||
func makeHTTPTestServer(t *testing.T, values []int64) string { | ||
server := httptest.NewServer(&testExternalMetricsHandler{values: values, test: t}) | ||
return server.URL | ||
} | ||
|
||
func TestHTTPCollector(t *testing.T) { | ||
for _, tc := range []struct { | ||
name string | ||
values []int64 | ||
output int | ||
aggregator string | ||
}{ | ||
{ | ||
name: "basic", | ||
values: []int64{3}, | ||
output: 3, | ||
aggregator: "sum", | ||
}, | ||
{ | ||
name: "sum", | ||
values: []int64{3, 5, 6}, | ||
aggregator: "sum", | ||
output: 14, | ||
}, | ||
{ | ||
name: "average", | ||
values: []int64{3, 5, 6}, | ||
aggregator: "sum", | ||
output: 14, | ||
}, | ||
} { | ||
t.Run(tc.name, func(t *testing.T) { | ||
testServer := makeHTTPTestServer(t, tc.values) | ||
plugin, err := NewHTTPCollectorPlugin() | ||
require.NoError(t, err) | ||
testConfig := makeTestHTTPCollectorConfig(testServer, tc.aggregator) | ||
collector, err := plugin.NewCollector(nil, testConfig, testInterval) | ||
require.NoError(t, err) | ||
metrics, err := collector.GetMetrics() | ||
require.NoError(t, err) | ||
require.NotNil(t, metrics) | ||
require.Len(t, metrics, 1) | ||
require.EqualValues(t, metrics[0].External.Value.Value(), tc.output) | ||
}) | ||
} | ||
} | ||
|
||
func makeTestHTTPCollectorConfig(endpoint, aggregator string) *MetricConfig { | ||
config := &MetricConfig{ | ||
MetricTypeName: MetricTypeName{ | ||
Type: v2beta2.ExternalMetricSourceType, | ||
Metric: v2beta2.MetricIdentifier{ | ||
Name: "test-metric", | ||
Selector: &v1.LabelSelector{ | ||
MatchLabels: map[string]string{identifierLabel: "test-metric"}, | ||
}, | ||
}, | ||
}, | ||
Config: map[string]string{ | ||
HTTPJsonPathAnnotationKey: "$.values", | ||
HTTPEndpointAnnotationKey: endpoint, | ||
}, | ||
} | ||
if aggregator != "" { | ||
config.Config["aggregator"] = aggregator | ||
} | ||
return config | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,65 @@ | ||
package httpmetrics | ||
|
||
import ( | ||
"fmt" | ||
"math" | ||
) | ||
|
||
type AggregatorFunc func(...float64) float64 | ||
|
||
// Average implements the average mathematical function over a slice of float64 | ||
func Average(values ...float64) float64 { | ||
sum := Sum(values...) | ||
return sum / float64(len(values)) | ||
} | ||
|
||
// Minimum implements the absolute minimum mathematical function over a slice of float64 | ||
func Minimum(values ...float64) float64 { | ||
// initialized with positive infinity, all finite numbers are smaller than it | ||
curMin := math.Inf(1) | ||
for _, v := range values { | ||
if v < curMin { | ||
curMin = v | ||
} | ||
} | ||
return curMin | ||
} | ||
|
||
// Maximum implements the absolute maximum mathematical function over a slice of float64 | ||
func Maximum(values ...float64) float64 { | ||
// initialized with negative infinity, all finite numbers are bigger than it | ||
curMax := math.Inf(-1) | ||
for _, v := range values { | ||
if v > curMax { | ||
curMax = v | ||
} | ||
} | ||
return curMax | ||
} | ||
|
||
// Sum implements the summation mathematical function over a slice of float64 | ||
func Sum(values ...float64) float64 { | ||
res := 0.0 | ||
|
||
for _, v := range values { | ||
res += v | ||
} | ||
|
||
return res | ||
} | ||
|
||
// reduce will reduce a slice of numbers given a aggregator function's name. If it's empty or not recognized, an error is returned. | ||
func ParseAggregator(aggregator string) (AggregatorFunc, error) { | ||
switch aggregator { | ||
case "avg": | ||
return Average, nil | ||
case "min": | ||
return Minimum, nil | ||
case "max": | ||
return Maximum, nil | ||
case "sum": | ||
return Sum, nil | ||
default: | ||
return nil, fmt.Errorf("aggregator function: %s is unknown", aggregator) | ||
} | ||
} |
Oops, something went wrong.