Skip to content

Commit

Permalink
Merge pull request #192 from zalando-incubator/extend-jsonpath-support
Browse files Browse the repository at this point in the history
Support bracket notation in jsonPath
  • Loading branch information
mikkeloscar authored Sep 30, 2020
2 parents 262a35c + cf36963 commit deec472
Show file tree
Hide file tree
Showing 8 changed files with 105 additions and 147 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ example the following JSON data would be expected:
```

The json-path query support depends on the
[github.com/oliveagle/jsonpath](https://github.com/oliveagle/jsonpath) library.
[github.com/spyzhov/ajson](https://github.com/spyzhov/ajson) library.
See the README for possible queries. It's expected that the metric you query
returns something that can be turned into a `float64`.

Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,12 @@ require (
github.com/kubernetes-incubator/custom-metrics-apiserver v0.0.0-20200618121405-54026617ec44
github.com/lib/pq v1.2.0 // indirect
github.com/mattn/go-colorable v0.1.4 // indirect
github.com/oliveagle/jsonpath v0.0.0-20180606110733-2e52cf6e6852
github.com/onsi/gomega v1.8.1 // indirect
github.com/prometheus/client_golang v1.7.1
github.com/prometheus/common v0.10.0
github.com/sirupsen/logrus v1.6.0
github.com/spf13/cobra v0.0.7
github.com/spyzhov/ajson v0.4.2
github.com/stretchr/testify v1.6.1
github.com/zalando-incubator/cluster-lifecycle-manager v0.0.0-20180921141935-824b77fb1f84
golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -302,8 +302,6 @@ github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRW
github.com/mxk/go-flowrate v0.0.0-20140419014527-cca7078d478f/go.mod h1:ZdcZmHo+o7JKHSa8/e818NopupXU1YMK5fe1lsApnBw=
github.com/oklog/ulid v1.3.1/go.mod h1:CirwcVhetQ6Lv90oh/F+FBtV6XMibvdAFo93nm5qn4U=
github.com/olekukonko/tablewriter v0.0.0-20170122224234-a0225b3f23b5/go.mod h1:vsDQFd/mU46D+Z4whnwzcISnGGzXWMclvtLoiIKAKIo=
github.com/oliveagle/jsonpath v0.0.0-20180606110733-2e52cf6e6852 h1:Yl0tPBa8QPjGmesFh1D0rDy+q1Twx6FyU7VWHi8wZbI=
github.com/oliveagle/jsonpath v0.0.0-20180606110733-2e52cf6e6852/go.mod h1:eqOVx5Vwu4gd2mmMZvVZsgIqNSaW3xxRThUJ0k/TPk4=
github.com/onsi/ginkgo v0.0.0-20170829012221-11459a886d9c/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
github.com/onsi/ginkgo v1.11.0 h1:JAKSXpt1YjtLA7YpPiqO9ss6sNXEsPfSGdwN0UHqzrw=
Expand Down Expand Up @@ -381,6 +379,8 @@ github.com/spf13/pflag v1.0.3/go.mod h1:DYY7MBk1bdzusC3SYhjObp+wFpr4gzcvqqNjLnIn
github.com/spf13/pflag v1.0.5 h1:iy+VFUOCP1a+8yFto/drg2CJ5u0yRoB7fZw3DKv/JXA=
github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg=
github.com/spf13/viper v1.4.0/go.mod h1:PTJ7Z/lr49W6bUbkmS1V3by4uWynFiR9p7+dSq/yZzE=
github.com/spyzhov/ajson v0.4.2 h1:JMByd/jZApPKDvNsmO90X2WWGbmT2ahDFp73QhZbg3s=
github.com/spyzhov/ajson v0.4.2/go.mod h1:63V+CGM6f1Bu/p4nLIN8885ojBdt88TbLoSFzyqMuVA=
github.com/src-d/gcfg v1.4.0/go.mod h1:p/UMsR43ujA89BJY9duynAwIpvqEujIH/jFlfL7jWoI=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
Expand Down
16 changes: 8 additions & 8 deletions pkg/collector/http_collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (

"github.com/zalando-incubator/kube-metrics-adapter/pkg/collector/httpmetrics"

"github.com/oliveagle/jsonpath"
"k8s.io/api/autoscaling/v2beta2"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand Down Expand Up @@ -36,14 +35,12 @@ func (p *HTTPCollectorPlugin) NewCollector(_ *v2beta2.HorizontalPodAutoscaler, c
if value, ok = config.Config[HTTPJsonPathAnnotationKey]; !ok {
return nil, fmt.Errorf("config value %s not found", HTTPJsonPathAnnotationKey)
}
jsonPath, err := jsonpath.Compile(value)
if err != nil {
return nil, fmt.Errorf("failed to parse json path: %v", err)
}
collector.jsonPath = jsonPath
jsonPath := value

if value, ok = config.Config[HTTPEndpointAnnotationKey]; !ok {
return nil, fmt.Errorf("config value %s not found", HTTPEndpointAnnotationKey)
}
var err error
collector.endpoint, err = url.Parse(value)
if err != nil {
return nil, err
Expand All @@ -65,13 +62,16 @@ func (p *HTTPCollectorPlugin) NewCollector(_ *v2beta2.HorizontalPodAutoscaler, c
return nil, err
}
}
collector.metricsGetter = httpmetrics.NewJSONPathMetricsGetter(httpmetrics.DefaultMetricsHTTPClient(), aggFunc, jsonPath)
jsonPathGetter, err := httpmetrics.NewJSONPathMetricsGetter(httpmetrics.DefaultMetricsHTTPClient(), aggFunc, jsonPath)
if err != nil {
return nil, err
}
collector.metricsGetter = jsonPathGetter
return collector, nil
}

type HTTPCollector struct {
endpoint *url.URL
jsonPath *jsonpath.Compiled
interval time.Duration
metricType v2beta2.MetricSourceType
metricsGetter *httpmetrics.JSONPathMetricsGetter
Expand Down
74 changes: 33 additions & 41 deletions pkg/collector/httpmetrics/json_path.go
Original file line number Diff line number Diff line change
@@ -1,29 +1,33 @@
package httpmetrics

import (
"encoding/json"
"fmt"
"io/ioutil"
"net"
"net/http"
"net/url"
"time"

"github.com/oliveagle/jsonpath"
"github.com/spyzhov/ajson"
)

// JSONPathMetricsGetter is a metrics getter which looks up pod metrics by
// querying the pods metrics endpoint and lookup the metric value as defined by
// the json path query.
type JSONPathMetricsGetter struct {
jsonPath *jsonpath.Compiled
jsonPath string
aggregator AggregatorFunc
client *http.Client
}

// NewJSONPathMetricsGetter initializes a new JSONPathMetricsGetter.
func NewJSONPathMetricsGetter(httpClient *http.Client, aggregatorFunc AggregatorFunc, compiledPath *jsonpath.Compiled) *JSONPathMetricsGetter {
return &JSONPathMetricsGetter{client: httpClient, aggregator: aggregatorFunc, jsonPath: compiledPath}
func NewJSONPathMetricsGetter(httpClient *http.Client, aggregatorFunc AggregatorFunc, jsonPath string) (*JSONPathMetricsGetter, error) {
// check that jsonPath parses
_, err := ajson.ParseJSONPath(jsonPath)
if err != nil {
return nil, err
}
return &JSONPathMetricsGetter{client: httpClient, aggregator: aggregatorFunc, jsonPath: jsonPath}, nil
}

var DefaultRequestTimeout = 15 * time.Second
Expand Down Expand Up @@ -58,57 +62,45 @@ func (g *JSONPathMetricsGetter) GetMetric(metricsURL url.URL) (float64, error) {
}

// parse data
var jsonData interface{}
err = json.Unmarshal(data, &jsonData)
root, err := ajson.Unmarshal(data)
if err != nil {
return 0, err
}

res, err := g.jsonPath.Lookup(jsonData)
nodes, err := root.JSONPath(g.jsonPath)
if err != nil {
return 0, err
}

switch res := res.(type) {
case int:
return float64(res), nil
case float32:
return float64(res), nil
case float64:
return res, nil
case []interface{}:
if len(nodes) > 1 {
return 0, fmt.Errorf("unexpected json: expected single numeric or array value")
}

node := nodes[0]
if node.IsArray() {
if g.aggregator == nil {
return 0, fmt.Errorf("no aggregator function has been specified")
}
s, err := castSlice(res)
if err != nil {
return 0, err
values := make([]float64, 0, len(nodes))
items, _ := node.GetArray()
for _, item := range items {
value, err := item.GetNumeric()
if err != nil {
return 0, fmt.Errorf("did not find numeric type: %w", err)
}
values = append(values, value)
}
return g.aggregator(s...), nil
default:
return 0, fmt.Errorf("unsupported type %T", res)
return g.aggregator(values...), nil
} else if node.IsNumeric() {
res, _ := node.GetNumeric()
return res, nil
}
}

// castSlice takes a slice of interface and returns a slice of float64 if all
// values in slice were castable, else returns an error
func castSlice(in []interface{}) ([]float64, error) {
var out []float64

for _, v := range in {
switch v := v.(type) {
case int:
out = append(out, float64(v))
case float32:
out = append(out, float64(v))
case float64:
out = append(out, v)
default:
return nil, fmt.Errorf("slice was returned by JSONPath, but value inside is unsupported: %T", v)
}
value, err := node.Value()
if err != nil {
return 0, fmt.Errorf("failed to check value of jsonPath result: %w", err)
}

return out, nil
return 0, fmt.Errorf("unsupported type %T", value)
}

func (g *JSONPathMetricsGetter) fetchMetrics(metricsURL url.URL) ([]byte, error) {
Expand Down
82 changes: 29 additions & 53 deletions pkg/collector/httpmetrics/json_path_test.go
Original file line number Diff line number Diff line change
@@ -1,89 +1,65 @@
package httpmetrics

import (
"encoding/json"
"fmt"
"net/http"
"net/http/httptest"
"net/url"
"testing"

"github.com/oliveagle/jsonpath"
"github.com/stretchr/testify/require"
)

func TestCastSlice(t *testing.T) {
res1, err1 := castSlice([]interface{}{1, 2, 3})
require.NoError(t, err1)
require.Equal(t, []float64{1.0, 2.0, 3.0}, res1)

res2, err2 := castSlice([]interface{}{float32(1.0), float32(2.0), float32(3.0)})
require.NoError(t, err2)
require.Equal(t, []float64{1.0, 2.0, 3.0}, res2)

res3, err3 := castSlice([]interface{}{float64(1.0), float64(2.0), float64(3.0)})
require.NoError(t, err3)
require.Equal(t, []float64{1.0, 2.0, 3.0}, res3)

res4, err4 := castSlice([]interface{}{1, 2, "some string"})
require.Errorf(t, err4, "slice was returned by JSONPath, but value inside is unsupported: %T", "string")
require.Equal(t, []float64(nil), res4)
}

type testValueResponse struct {
Value int64 `json:"value"`
}

type testValueArrayResponse struct {
Value []int64 `json:"value"`
}

func makeTestHTTPServer(t *testing.T, values ...int64) *httptest.Server {
func makeTestHTTPServer(t *testing.T, response []byte) *httptest.Server {
h := func(w http.ResponseWriter, r *http.Request) {
require.Equal(t, r.URL.Path, "/metrics")
w.Header().Set("Content-Type", "application/json")
var (
response []byte
err error
)
if len(values) == 1 {
response, err = json.Marshal(testValueResponse{Value: values[0]})
require.NoError(t, err)
} else {
response, err = json.Marshal(testValueArrayResponse{Value: values})
require.NoError(t, err)
}
_, err = w.Write(response)
_, err := w.Write(response)
require.NoError(t, err)
}
return httptest.NewServer(http.HandlerFunc(h))
}

func TestJSONPathMetricsGetter(t *testing.T) {
for _, tc := range []struct {
name string
input []int64
output float64
aggregator AggregatorFunc
name string
jsonResponse []byte
jsonPath string
result float64
aggregator AggregatorFunc
}{
{
name: "basic average",
input: []int64{3, 4, 5},
output: 4,
aggregator: Average,
name: "basic single value",
jsonResponse: []byte(`{"value":3}`),
jsonPath: "$.value",
result: 3,
aggregator: Average,
},
{
name: "basic average",
jsonResponse: []byte(`{"value":[3,4,5]}`),
jsonPath: "$.value",
result: 4,
aggregator: Average,
},
{
name: "dotted key",
jsonResponse: []byte(`{"metric.value":5}`),
jsonPath: "$['metric.value']",
result: 5,
aggregator: Average,
},
} {
t.Run(tc.name, func(t *testing.T) {
server := makeTestHTTPServer(t, tc.input...)
server := makeTestHTTPServer(t, tc.jsonResponse)
defer server.Close()
path, err := jsonpath.Compile("$.value")
getter, err := NewJSONPathMetricsGetter(DefaultMetricsHTTPClient(), tc.aggregator, tc.jsonPath)
require.NoError(t, err)
getter := NewJSONPathMetricsGetter(DefaultMetricsHTTPClient(), tc.aggregator, path)
url, err := url.Parse(fmt.Sprintf("%s/metrics", server.URL))
require.NoError(t, err)
metric, err := getter.GetMetric(*url)
require.NoError(t, err)
require.Equal(t, tc.output, metric)
require.Equal(t, tc.result, metric)
})
}
}
16 changes: 7 additions & 9 deletions pkg/collector/httpmetrics/pod_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"strconv"
"time"

"github.com/oliveagle/jsonpath"
v1 "k8s.io/api/core/v1"
)

Expand All @@ -33,18 +32,13 @@ func (g PodMetricsJSONPathGetter) GetMetric(pod *v1.Pod) (float64, error) {
func NewPodMetricsJSONPathGetter(config map[string]string) (*PodMetricsJSONPathGetter, error) {
getter := PodMetricsJSONPathGetter{}
var (
jsonPath *jsonpath.Compiled
jsonPath string
aggregator AggregatorFunc
err error
)

if v, ok := config["json-key"]; ok {
path, err := jsonpath.Compile(v)
if err != nil {
return nil, fmt.Errorf("failed to parse json path definition: %v", err)
}

jsonPath = path
jsonPath = v
}

if v, ok := config["scheme"]; ok {
Expand Down Expand Up @@ -99,7 +93,11 @@ func NewPodMetricsJSONPathGetter(config map[string]string) (*PodMetricsJSONPathG
connectTimeout = d
}

getter.metricGetter = NewJSONPathMetricsGetter(CustomMetricsHTTPClient(requestTimeout, connectTimeout), aggregator, jsonPath)
jsonPathGetter, err := NewJSONPathMetricsGetter(CustomMetricsHTTPClient(requestTimeout, connectTimeout), aggregator, jsonPath)
if err != nil {
return nil, err
}
getter.metricGetter = jsonPathGetter
return &getter, nil
}

Expand Down
Loading

0 comments on commit deec472

Please sign in to comment.