Skip to content

Commit

Permalink
Merge pull request #636 from zalando-incubator/nakadi-collector
Browse files Browse the repository at this point in the history
Add support for scaling based on Nakadi
  • Loading branch information
mikkeloscar authored Dec 7, 2023
2 parents 45a09c1 + 43dd2e9 commit 49f9f5f
Show file tree
Hide file tree
Showing 5 changed files with 506 additions and 1 deletion.
89 changes: 89 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -702,6 +702,95 @@ you need to define a `key` or other `tag` with a "star" query syntax like
metric label definitions. If both annotations and corresponding label is
defined, then the annotation takes precedence.


## Nakadi collector

The Nakadi collector allows scaling based on [Nakadi](https://nakadi.io/)
Subscription API stats metrics `consumer_lag_seconds` or `unconsumed_events`.

### Supported metrics

| Metric Type | Description | Type | K8s Versions |
|------------------------|-----------------------------------------------------------------------------|----------|--------------|
| `unconsumed-events` | Scale based on number of unconsumed events for a Nakadi subscription | External | `>=1.24` |
| `consumer-lag-seconds` | Scale based on number of max consumer lag seconds for a Nakadi subscription | External | `>=1.24` |
| ------------ | ------- | -- | -- |
| `unconsumed-events` | Scale based on number of unconsumed events for a Nakadi Subscription | External | `>=1.24` |
| `consumer-lag-seconds` | Scale based on number of max consumer lag seconds for a Nakadi Subscription | External | `>=1.24` |

```yaml
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
name: myapp-hpa
annotations:
# metric-config.<metricType>.<metricName>.<collectorType>/<configKey>
metric-config.external.my-nakadi-consumer.nakadi/interval: "60s" # optional
spec:
scaleTargetRef:
apiVersion: apps/v1
kind: Deployment
name: custom-metrics-consumer
minReplicas: 0
maxReplicas: 8 # should match number of partitions for the event type
metrics:
- type: External
external:
metric:
name: my-nakadi-consumer
selector:
matchLabels:
type: nakadi
subscription-id: "708095f6-cece-4d02-840e-ee488d710b29"
metric-type: "consumer-lag-seconds|unconsumed-events"
target:
# value is compatible with the consumer-lag-seconds metric type.
# It describes the amount of consumer lag in seconds before scaling
# additionally up.
# if an event-type has multiple partitions the value of
# consumer-lag-seconds is the max of all the partitions.
value: "600" # 10m
# averageValue is compatible with unconsumed-events metric type.
# This means for every 30 unconsumed events a pod is added.
# unconsumed-events is the sum of of unconsumed_events over all
# partitions.
averageValue: "30"
type: AverageValue
```

The `subscription-id` is the Subscription ID of the relevant consumer. The
`metric-type` indicates whether to scale on `consumer-lag-seconds` or
`unconsumed-events` as outlined below.

`unconsumed-events` - is the total number of unconsumed events over all
partitions. When using this `metric-type` you should also use the target
`averageValue` which indicates the number of events which can be handled per
pod. To best estimate the number of events per pods, you need to understand the
average time for processing an event as well as the rate of events.

*Example*: You have an event type producing 100 events per second between 00:00
and 08:00. Between 08:01 to 23:59 it produces 400 events per second.
Let's assume that on average a single pod can consume 100 events per second,
then we can define 100 as `averageValue` and the HPA would scale to 1 between
00:00 and 08:00, and scale to 4 between 08:01 and 23:59. If there for some
reason is a short spike of 800 events per second, then it would scale to 8 pods
to process those events until the rate goes down again.

`consumer-lag-seconds` - describes the age of the oldest unconsumed event for
a subscription. If the event type has multiple partitions the lag is defined as
the max age over all partitions. When using this `metric-type` you should use
the target `value` to indicate the max lag (in seconds) before the HPA should
scale.

*Example*: You have a subscription with a defined SLO of "99.99 of events are
consumed within 30 min.". In this case you can define a target `value` of e.g.
20 min. (1200s) (to include a safety buffer) such that the HPA only scales up
from 1 to 2 if the target of 20 min. is breached and it needs to work faster
with more consumers.
For this case you should also account for the average time for processing an
event when defining the target.


## HTTP Collector

The http collector allows collecting metrics from an external endpoint specified in the HPA.
Expand Down
120 changes: 120 additions & 0 deletions pkg/collector/nakadi_collector.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
package collector

import (
"context"
"fmt"
"time"

"github.com/zalando-incubator/kube-metrics-adapter/pkg/nakadi"
autoscalingv2 "k8s.io/api/autoscaling/v2"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/metrics/pkg/apis/external_metrics"
)

const (
// NakadiMetricType defines the metric type for metrics based on Nakadi
// subscriptions.
NakadiMetricType = "nakadi"
nakadiSubscriptionIDKey = "subscription-id"
nakadiMetricTypeKey = "metric-type"
nakadiMetricTypeConsumerLagSeconds = "consumer-lag-seconds"
nakadiMetricTypeUnconsumedEvents = "unconsumed-events"
)

// NakadiCollectorPlugin defines a plugin for creating collectors that can get
// unconsumed events from Nakadi.
type NakadiCollectorPlugin struct {
nakadi nakadi.Nakadi
}

// NewNakadiCollectorPlugin initializes a new NakadiCollectorPlugin.
func NewNakadiCollectorPlugin(nakadi nakadi.Nakadi) (*NakadiCollectorPlugin, error) {
return &NakadiCollectorPlugin{
nakadi: nakadi,
}, nil
}

// NewCollector initializes a new Nakadi collector from the specified HPA.
func (c *NakadiCollectorPlugin) NewCollector(hpa *autoscalingv2.HorizontalPodAutoscaler, config *MetricConfig, interval time.Duration) (Collector, error) {
return NewNakadiCollector(c.nakadi, hpa, config, interval)
}

// NakadiCollector defines a collector that is able to collect metrics from
// Nakadi.
type NakadiCollector struct {
nakadi nakadi.Nakadi
interval time.Duration
subscriptionID string
nakadiMetricType string
metric autoscalingv2.MetricIdentifier
metricType autoscalingv2.MetricSourceType
namespace string
}

// NewNakadiCollector initializes a new NakadiCollector.
func NewNakadiCollector(nakadi nakadi.Nakadi, hpa *autoscalingv2.HorizontalPodAutoscaler, config *MetricConfig, interval time.Duration) (*NakadiCollector, error) {
if config.Metric.Selector == nil {
return nil, fmt.Errorf("selector for nakadi is not specified")
}

subscriptionID, ok := config.Config[nakadiSubscriptionIDKey]
if !ok {
return nil, fmt.Errorf("subscription-id not specified on metric")
}

metricType, ok := config.Config[nakadiMetricTypeKey]
if !ok {
return nil, fmt.Errorf("metric-type not specified on metric")
}

if metricType != nakadiMetricTypeConsumerLagSeconds && metricType != nakadiMetricTypeUnconsumedEvents {
return nil, fmt.Errorf("metric-type must be either '%s' or '%s', was '%s'", nakadiMetricTypeConsumerLagSeconds, nakadiMetricTypeUnconsumedEvents, metricType)
}

return &NakadiCollector{
nakadi: nakadi,
interval: interval,
subscriptionID: subscriptionID,
nakadiMetricType: metricType,
metric: config.Metric,
metricType: config.Type,
namespace: hpa.Namespace,
}, nil
}

// GetMetrics returns a list of collected metrics for the Nakadi subscription ID.
func (c *NakadiCollector) GetMetrics() ([]CollectedMetric, error) {
var value int64
var err error
switch c.nakadiMetricType {
case nakadiMetricTypeConsumerLagSeconds:
value, err = c.nakadi.ConsumerLagSeconds(context.TODO(), c.subscriptionID)
if err != nil {
return nil, err
}
case nakadiMetricTypeUnconsumedEvents:
value, err = c.nakadi.UnconsumedEvents(context.TODO(), c.subscriptionID)
if err != nil {
return nil, err
}
}

metricValue := CollectedMetric{
Namespace: c.namespace,
Type: c.metricType,
External: external_metrics.ExternalMetricValue{
MetricName: c.metric.Name,
MetricLabels: c.metric.Selector.MatchLabels,
Timestamp: metav1.Now(),
Value: *resource.NewQuantity(value, resource.DecimalSI),
},
}

return []CollectedMetric{metricValue}, nil
}

// Interval returns the interval at which the collector should run.
func (c *NakadiCollector) Interval() time.Duration {
return c.interval
}
124 changes: 124 additions & 0 deletions pkg/nakadi/nakadi.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
package nakadi

import (
"context"
"encoding/json"
"errors"
"fmt"
"io"
"net/http"
"net/url"
)

// Nakadi defines an interface for talking to the Nakadi API.
type Nakadi interface {
ConsumerLagSeconds(ctx context.Context, subscriptionID string) (int64, error)
UnconsumedEvents(ctx context.Context, subscriptionID string) (int64, error)
}

// Client defines client for interfacing with the Nakadi API.
type Client struct {
nakadiEndpoint string
http *http.Client
}

// NewNakadiClient initializes a new Nakadi Client.
func NewNakadiClient(nakadiEndpoint string, client *http.Client) *Client {
return &Client{
nakadiEndpoint: nakadiEndpoint,
http: client,
}
}

func (c *Client) ConsumerLagSeconds(ctx context.Context, subscriptionID string) (int64, error) {
stats, err := c.stats(ctx, subscriptionID)
if err != nil {
return 0, err
}

var maxConsumerLagSeconds int64
for _, eventType := range stats {
for _, partition := range eventType.Partitions {
maxConsumerLagSeconds = max(maxConsumerLagSeconds, partition.ConsumerLagSeconds)
}
}

return maxConsumerLagSeconds, nil
}

func (c *Client) UnconsumedEvents(ctx context.Context, subscriptionID string) (int64, error) {
stats, err := c.stats(ctx, subscriptionID)
if err != nil {
return 0, err
}

var unconsumedEvents int64
for _, eventType := range stats {
for _, partition := range eventType.Partitions {
unconsumedEvents += partition.UnconsumedEvents
}
}

return unconsumedEvents, nil
}

type statsResp struct {
Items []statsEventType `json:"items"`
}

type statsEventType struct {
EventType string `json:"event_type"`
Partitions []statsPartition `json:"partitions"`
}

type statsPartition struct {
Partiton string `json:"partition"`
State string `json:"state"`
UnconsumedEvents int64 `json:"unconsumed_events"`
ConsumerLagSeconds int64 `json:"consumer_lag_seconds"`
StreamID string `json:"stream_id"`
AssignmentType string `json:"assignment_type"`
}

// stats returns the Nakadi stats for a given subscription ID.
//
// https://nakadi.io/manual.html#/subscriptions/subscription_id/stats_get
func (c *Client) stats(ctx context.Context, subscriptionID string) ([]statsEventType, error) {
endpoint, err := url.Parse(c.nakadiEndpoint)
if err != nil {
return nil, err
}

endpoint.Path = fmt.Sprintf("/subscriptions/%s/stats", subscriptionID)

q := endpoint.Query()
q.Set("show_time_lag", "true")
endpoint.RawQuery = q.Encode()

resp, err := c.http.Get(endpoint.String())
if err != nil {
return nil, err
}
defer resp.Body.Close()

d, err := io.ReadAll(resp.Body)
if err != nil {
return nil, err
}

if resp.StatusCode != http.StatusOK {
return nil, fmt.Errorf("[nakadi stats] unexpected response code: %d (%s)", resp.StatusCode, string(d))
}

var result statsResp
err = json.Unmarshal(d, &result)
if err != nil {
return nil, err
}

if len(result.Items) == 0 {
return nil, errors.New("expected at least 1 event-type, 0 returned")
}

return result.Items, nil
}
Loading

0 comments on commit 49f9f5f

Please sign in to comment.