add global connection pool for mysql #6327

Open · wants to merge 12 commits into base: main
11 changes: 7 additions & 4 deletions CHANGELOG.md
@@ -58,7 +58,7 @@ To learn more about active deprecations, we recommend checking [GitHub Discussio

### New

- TODO ([#XXX](https://github.com/kedacore/keda/issues/XXX))
- **General**: Introduce new NSQ scaler ([#3281](https://github.com/kedacore/keda/issues/3281))

#### Experimental

@@ -72,23 +72,26 @@ Here is an overview of all new **experimental** features:

### Fixes

- TODO ([#XXX](https://github.com/kedacore/keda/issues/XXX))
- **General**: Centralize and improve automaxprocs configuration with proper structured logging ([#5970](https://github.com/kedacore/keda/issues/5970))
- **General**: Paused ScaledObject count is reported correctly after operator restart ([#6321](https://github.com/kedacore/keda/issues/6321))
- **General**: ScaledJobs ready status set to true when a problem is recovered ([#6329](https://github.com/kedacore/keda/pull/6329))

### Deprecations

You can find all deprecations in [this overview](https://github.com/kedacore/keda/issues?q=is%3Aissue+is%3Aopen+sort%3Aupdated-desc+label%3Abreaking-change) and [join the discussion here](https://github.com/kedacore/keda/discussions/categories/deprecations).

New deprecation(s):

- TODO ([#XXX](https://github.com/kedacore/keda/issues/XXX))
- **NATS Streaming scaler**: Deprecate NATS Streaming Server (aka Stan) ([#6362](https://github.com/kedacore/keda/issues/6362))

### Breaking Changes

- TODO ([#XXX](https://github.com/kedacore/keda/issues/XXX))

### Other

- TODO ([#XXX](https://github.com/kedacore/keda/issues/XXX))
- **General**: Bump newrelic-client-go deps to 2.51.2 (latest) ([#6325](https://github.com/kedacore/keda/pull/6325))


## v2.16.0

7 changes: 6 additions & 1 deletion cmd/adapter/main.go
@@ -26,7 +26,6 @@ import (
grpcprom "github.com/grpc-ecosystem/go-grpc-middleware/providers/prometheus"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/collectors"
_ "go.uber.org/automaxprocs"
appsv1 "k8s.io/api/apps/v1"
apimetrics "k8s.io/apiserver/pkg/endpoints/metrics"
"k8s.io/client-go/kubernetes/scheme"
@@ -257,6 +256,12 @@ func main() {
return
}

err = kedautil.ConfigureMaxProcs(logger)
if err != nil {
logger.Error(err, "failed to set max procs")
return
}

kedaProvider, err := cmd.makeProvider(ctx)
if err != nil {
logger.Error(err, "making provider")
8 changes: 7 additions & 1 deletion cmd/operator/main.go
@@ -22,7 +22,6 @@ import (
"time"

"github.com/spf13/pflag"
_ "go.uber.org/automaxprocs"
apimachineryruntime "k8s.io/apimachinery/pkg/runtime"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
kubeinformers "k8s.io/client-go/informers"
@@ -115,6 +114,13 @@ func main() {

ctrl.SetLogger(zap.New(zap.UseFlagOptions(&opts)))
ctx := ctrl.SetupSignalHandler()

err := kedautil.ConfigureMaxProcs(setupLog)
if err != nil {
setupLog.Error(err, "failed to set max procs")
os.Exit(1)
}

namespaces, err := kedautil.GetWatchNamespaces()
if err != nil {
setupLog.Error(err, "failed to get watch namespace")
7 changes: 6 additions & 1 deletion cmd/webhooks/main.go
@@ -22,7 +22,6 @@ import (
"os"

"github.com/spf13/pflag"
_ "go.uber.org/automaxprocs"
apimachineryruntime "k8s.io/apimachinery/pkg/runtime"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
clientgoscheme "k8s.io/client-go/kubernetes/scheme"
@@ -80,6 +79,12 @@ func main() {

ctrl.SetLogger(zap.New(zap.UseFlagOptions(&opts)))

err := kedautil.ConfigureMaxProcs(setupLog)
if err != nil {
setupLog.Error(err, "failed to set max procs")
os.Exit(1)
}

ctx := ctrl.SetupSignalHandler()

cfg := ctrl.GetConfigOrDie()
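For context on the automaxprocs change repeated across all three entrypoints above: the blank import `_ "go.uber.org/automaxprocs"` sets `GOMAXPROCS` as an import side effect and prints through the standard library logger, so failures were silent and the output unstructured. The explicit `kedautil.ConfigureMaxProcs(logger)` call surfaces errors and routes the library's output through each component's structured logger. Below is a minimal sketch of what such a helper could look like, assuming it simply wraps `maxprocs.Set` with a `logr`-backed print function; the actual KEDA helper may differ:

```go
package util

import (
	"fmt"

	"github.com/go-logr/logr"
	"go.uber.org/automaxprocs/maxprocs"
)

// ConfigureMaxProcs aligns GOMAXPROCS with the container CPU quota and
// forwards the library's printf-style output to the structured logger.
func ConfigureMaxProcs(logger logr.Logger) error {
	_, err := maxprocs.Set(maxprocs.Logger(func(format string, args ...interface{}) {
		logger.Info(fmt.Sprintf(format, args...))
	}))
	return err
}
```

Passing the `maxprocs.Logger` option is what turns the previously unstructured init-time output into proper structured log lines, which is the stated goal of [#5970](https://github.com/kedacore/keda/issues/5970).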
4 changes: 2 additions & 2 deletions controllers/keda/scaledobject_controller.go
@@ -209,6 +209,8 @@ func (r *ScaledObjectReconciler) Reconcile(ctx context.Context, req ctrl.Request
conditions.SetFallbackCondition(metav1.ConditionFalse, "NoFallbackFound", "No fallbacks are active on this scaled object")
}

metricscollector.RecordScaledObjectPaused(scaledObject.Namespace, scaledObject.Name, conditions.GetPausedCondition().Status == metav1.ConditionTrue)

if err := kedastatus.SetStatusConditions(ctx, r.Client, reqLogger, scaledObject, &conditions); err != nil {
r.EventEmitter.Emit(scaledObject, req.NamespacedName.Namespace, corev1.EventTypeWarning, eventingv1alpha1.ScaledObjectFailedType, eventreason.ScaledObjectUpdateFailed, err.Error())
return ctrl.Result{}, err
@@ -246,12 +248,10 @@ func (r *ScaledObjectReconciler) reconcileScaledObject(ctx context.Context, logg
return msg, err
}
conditions.SetPausedCondition(metav1.ConditionTrue, kedav1alpha1.ScaledObjectConditionPausedReason, msg)
metricscollector.RecordScaledObjectPaused(scaledObject.Namespace, scaledObject.Name, true)
return msg, nil
}
} else if conditions.GetPausedCondition().Status == metav1.ConditionTrue {
conditions.SetPausedCondition(metav1.ConditionFalse, "ScaledObjectUnpaused", "pause annotation removed for ScaledObject")
metricscollector.RecordScaledObjectPaused(scaledObject.Namespace, scaledObject.Name, false)
}

// Check scale target Name is specified
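Note on the move above: calling `metricscollector.RecordScaledObjectPaused` unconditionally in `Reconcile`, instead of only on pause/unpause transitions inside `reconcileScaledObject`, means the paused metric is re-emitted on every reconcile loop and is therefore repopulated after an operator restart, which is the fix tracked in [#6321](https://github.com/kedacore/keda/issues/6321). A plausible sketch of such a recorder follows, assuming a Prometheus gauge keyed by namespace and name; the metric and label names here are illustrative, not necessarily KEDA's exact ones:

```go
package metricscollector

import "github.com/prometheus/client_golang/prometheus"

// scaledObjectPaused is assumed to be registered with the collector's
// registry elsewhere; 1 means paused, 0 means active.
var scaledObjectPaused = prometheus.NewGaugeVec(
	prometheus.GaugeOpts{
		Namespace: "keda",
		Subsystem: "scaled_object",
		Name:      "paused",
		Help:      "Indicates whether a ScaledObject is paused (1) or active (0).",
	},
	[]string{"namespace", "scaledObject"},
)

// RecordScaledObjectPaused sets the paused gauge for a ScaledObject.
// Because the caller now invokes this on every reconcile, the gauge is
// restored after an operator restart rather than only on transitions.
func RecordScaledObjectPaused(namespace, scaledObjectName string, paused bool) {
	value := 0.0
	if paused {
		value = 1.0
	}
	scaledObjectPaused.WithLabelValues(namespace, scaledObjectName).Set(value)
}
```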
2 changes: 1 addition & 1 deletion go.mod
@@ -66,7 +66,7 @@ require (
github.com/microsoft/ApplicationInsights-Go v0.4.4
github.com/microsoft/azure-devops-go-api/azuredevops v1.0.0-b5
github.com/mitchellh/hashstructure v1.1.0
github.com/newrelic/newrelic-client-go v1.1.0
github.com/newrelic/newrelic-client-go/v2 v2.51.2
github.com/onsi/ginkgo/v2 v2.21.0
github.com/onsi/gomega v1.35.1
github.com/open-policy-agent/cert-controller v0.12.0
2 changes: 2 additions & 0 deletions go.sum
@@ -2173,6 +2173,8 @@ github.com/neelance/astrewrite v0.0.0-20160511093645-99348263ae86/go.mod h1:kHJE
github.com/neelance/sourcemap v0.0.0-20200213170602-2833bce08e4c/go.mod h1:Qr6/a/Q4r9LP1IltGz7tA7iOK1WonHEYhu1HRBA7ZiM=
github.com/newrelic/newrelic-client-go v1.1.0 h1:aflNjzQ21c+2GwBVh+UbAf9lznkRfCcVABoc5UM4IXw=
github.com/newrelic/newrelic-client-go v1.1.0/go.mod h1:RYMXt7hgYw7nzuXIGd2BH0F1AivgWw7WrBhNBQZEB4k=
github.com/newrelic/newrelic-client-go/v2 v2.51.2 h1:Xf+M0NuZuIuxqG48zYoqyIdQL514j2J1c+kNVYajcYI=
github.com/newrelic/newrelic-client-go/v2 v2.51.2/go.mod h1:+RRjI3nDGWT3kLm9Oi3QxpBm70uu8q1upEHBVWCZFpo=
github.com/nxadm/tail v1.4.8 h1:nPr65rt6Y5JFSKQO7qToXr7pePgD6Gwiw05lkbyAQTE=
github.com/nxadm/tail v1.4.8/go.mod h1:+ncqLTQzXmGhMZNUePPaPqPvBxHAIsmXswZKocGu+AU=
github.com/oapi-codegen/runtime v1.1.1 h1:EXLHh0DXIJnWhdRPN2w4MXAzFyE4CskzhNLUmtpMYro=
69 changes: 18 additions & 51 deletions pkg/scalers/aws_dynamodb_streams_scaler.go
@@ -3,7 +3,6 @@ package scalers
import (
"context"
"fmt"
"strconv"

"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/service/dynamodb"
@@ -31,11 +30,11 @@ type awsDynamoDBStreamsScaler struct {
}

type awsDynamoDBStreamsMetadata struct {
targetShardCount int64
activationTargetShardCount int64
tableName string
awsRegion string
awsEndpoint string
TargetShardCount int64 `keda:"name=shardCount, order=triggerMetadata, default=2"`
ActivationTargetShardCount int64 `keda:"name=activationShardCount, order=triggerMetadata, default=0"`
TableName string `keda:"name=tableName, order=triggerMetadata"`
AwsRegion string `keda:"name=awsRegion, order=triggerMetadata"`
AwsEndpoint string `keda:"name=awsEndpoint, order=triggerMetadata, optional"`
awsAuthorization awsutils.AuthorizationMetadata
triggerIndex int
}
@@ -49,7 +48,7 @@ func NewAwsDynamoDBStreamsScaler(ctx context.Context, config *scalersconfig.Scal

logger := InitializeLogger(config, "aws_dynamodb_streams_scaler")

meta, err := parseAwsDynamoDBStreamsMetadata(config, logger)
meta, err := parseAwsDynamoDBStreamsMetadata(config)
if err != nil {
return nil, fmt.Errorf("error parsing dynamodb stream metadata: %w", err)
}
@@ -58,7 +57,7 @@
if err != nil {
return nil, fmt.Errorf("error when creating dynamodbstream client: %w", err)
}
streamArn, err := getDynamoDBStreamsArn(ctx, dbClient, &meta.tableName)
streamArn, err := getDynamoDBStreamsArn(ctx, dbClient, &meta.TableName)
if err != nil {
return nil, fmt.Errorf("error dynamodb stream arn: %w", err)
}
@@ -74,43 +73,11 @@
}, nil
}

func parseAwsDynamoDBStreamsMetadata(config *scalersconfig.ScalerConfig, logger logr.Logger) (*awsDynamoDBStreamsMetadata, error) {
func parseAwsDynamoDBStreamsMetadata(config *scalersconfig.ScalerConfig) (*awsDynamoDBStreamsMetadata, error) {
meta := awsDynamoDBStreamsMetadata{}
meta.targetShardCount = defaultTargetDBStreamsShardCount

if val, ok := config.TriggerMetadata["awsRegion"]; ok && val != "" {
meta.awsRegion = val
} else {
return nil, fmt.Errorf("no awsRegion given")
}

if val, ok := config.TriggerMetadata["awsEndpoint"]; ok {
meta.awsEndpoint = val
}

if val, ok := config.TriggerMetadata["tableName"]; ok && val != "" {
meta.tableName = val
} else {
return nil, fmt.Errorf("no tableName given")
}

if val, ok := config.TriggerMetadata["shardCount"]; ok && val != "" {
shardCount, err := strconv.ParseInt(val, 10, 64)
if err != nil {
meta.targetShardCount = defaultTargetDBStreamsShardCount
logger.Error(err, "error parsing dyanmodb stream metadata shardCount, using default %n", defaultTargetDBStreamsShardCount)
} else {
meta.targetShardCount = shardCount
}
}
if val, ok := config.TriggerMetadata["activationShardCount"]; ok && val != "" {
shardCount, err := strconv.ParseInt(val, 10, 64)
if err != nil {
meta.activationTargetShardCount = defaultActivationTargetDBStreamsShardCount
logger.Error(err, "error parsing dyanmodb stream metadata activationTargetShardCount, using default %n", defaultActivationTargetDBStreamsShardCount)
} else {
meta.activationTargetShardCount = shardCount
}
if err := config.TypedConfig(&meta); err != nil {
return nil, fmt.Errorf("error parsing dynamodb stream metadata: %w", err)
}

auth, err := awsutils.GetAwsAuthorization(config.TriggerUniqueKey, config.PodIdentity, config.TriggerMetadata, config.AuthParams, config.ResolvedEnv)
@@ -125,18 +92,18 @@ func parseAwsDynamoDBStreamsMetadata(config *scalersconfig.ScalerConfig, logger
}

func createClientsForDynamoDBStreamsScaler(ctx context.Context, metadata *awsDynamoDBStreamsMetadata) (*dynamodb.Client, *dynamodbstreams.Client, error) {
cfg, err := awsutils.GetAwsConfig(ctx, metadata.awsRegion, metadata.awsAuthorization)
cfg, err := awsutils.GetAwsConfig(ctx, metadata.AwsRegion, metadata.awsAuthorization)
if err != nil {
return nil, nil, err
}
dbClient := dynamodb.NewFromConfig(*cfg, func(options *dynamodb.Options) {
if metadata.awsEndpoint != "" {
options.BaseEndpoint = aws.String(metadata.awsEndpoint)
if metadata.AwsEndpoint != "" {
options.BaseEndpoint = aws.String(metadata.AwsEndpoint)
}
})
dbStreamClient := dynamodbstreams.NewFromConfig(*cfg, func(options *dynamodbstreams.Options) {
if metadata.awsEndpoint != "" {
options.BaseEndpoint = aws.String(metadata.awsEndpoint)
if metadata.AwsEndpoint != "" {
options.BaseEndpoint = aws.String(metadata.AwsEndpoint)
}
})

@@ -176,9 +143,9 @@ func (s *awsDynamoDBStreamsScaler) Close(_ context.Context) error {
func (s *awsDynamoDBStreamsScaler) GetMetricSpecForScaling(_ context.Context) []v2.MetricSpec {
externalMetric := &v2.ExternalMetricSource{
Metric: v2.MetricIdentifier{
Name: GenerateMetricNameWithIndex(s.metadata.triggerIndex, kedautil.NormalizeString(fmt.Sprintf("aws-dynamodb-streams-%s", s.metadata.tableName))),
Name: GenerateMetricNameWithIndex(s.metadata.triggerIndex, kedautil.NormalizeString(fmt.Sprintf("aws-dynamodb-streams-%s", s.metadata.TableName))),
},
Target: GetMetricTarget(s.metricType, s.metadata.targetShardCount),
Target: GetMetricTarget(s.metricType, s.metadata.TargetShardCount),
}
metricSpec := v2.MetricSpec{External: externalMetric, Type: externalMetricType}
return []v2.MetricSpec{metricSpec}
@@ -195,7 +162,7 @@ func (s *awsDynamoDBStreamsScaler) GetMetricsAndActivity(ctx context.Context, me

metric := GenerateMetricInMili(metricName, float64(shardCount))

return []external_metrics.ExternalMetricValue{metric}, shardCount > s.metadata.activationTargetShardCount, nil
return []external_metrics.ExternalMetricValue{metric}, shardCount > s.metadata.ActivationTargetShardCount, nil
}

// GetDynamoDBStreamShardCount Get DynamoDB Stream Shard Count
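The scaler rewrite above replaces roughly forty lines of hand-rolled `TriggerMetadata` lookups with declarative `keda:"…"` struct tags resolved by `config.TypedConfig`, which also removes the need for the logger parameter and the `strconv` import. Conceptually, such a parser reflects over the target struct, reads each tag's `name`, `default`, and `optional` attributes, and fills the field from the metadata map. The simplified, self-contained sketch below shows the idea for the string and int64 fields used here; `typedConfig` is a hypothetical stand-in, not KEDA's actual implementation, and it ignores the `order` attribute:

```go
package main

import (
	"fmt"
	"reflect"
	"strconv"
	"strings"
)

type dynamoDBStreamsMetadata struct {
	TableName  string `keda:"name=tableName, order=triggerMetadata"`
	ShardCount int64  `keda:"name=shardCount, order=triggerMetadata, default=2"`
}

// typedConfig fills exported struct fields from metadata, driven by a
// simplified reading of the `keda:"name=..., default=..., optional"` tags.
func typedConfig(target interface{}, metadata map[string]string) error {
	v := reflect.ValueOf(target).Elem()
	t := v.Type()
	for i := 0; i < t.NumField(); i++ {
		tag, ok := t.Field(i).Tag.Lookup("keda")
		if !ok || !v.Field(i).CanSet() {
			continue
		}
		var name, def string
		optional := false
		for _, part := range strings.Split(tag, ",") {
			part = strings.TrimSpace(part)
			switch {
			case strings.HasPrefix(part, "name="):
				name = strings.TrimPrefix(part, "name=")
			case strings.HasPrefix(part, "default="):
				def = strings.TrimPrefix(part, "default=")
				optional = true // a default implies the parameter is optional
			case part == "optional":
				optional = true
			}
		}
		raw, found := metadata[name]
		if !found || raw == "" {
			if !optional {
				return fmt.Errorf("missing required parameter %q", name)
			}
			raw = def
		}
		if raw == "" {
			continue // optional with no default: keep the zero value
		}
		switch v.Field(i).Kind() {
		case reflect.String:
			v.Field(i).SetString(raw)
		case reflect.Int64:
			n, err := strconv.ParseInt(raw, 10, 64)
			if err != nil {
				return fmt.Errorf("invalid value for %q: %w", name, err)
			}
			v.Field(i).SetInt(n)
		}
	}
	return nil
}

func main() {
	var m dynamoDBStreamsMetadata
	if err := typedConfig(&m, map[string]string{"tableName": "orders"}); err != nil {
		panic(err)
	}
	fmt.Printf("%+v\n", m) // prints {TableName:orders ShardCount:2}
}
```

Centralizing validation this way also changes the failure mode: an unparsable `shardCount` is now a hard parse error instead of being silently replaced by the default, as the deleted `strconv.ParseInt` branches used to do.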