Skip to content

Commit

Permalink
Add basic SASL and TLS support for Kafka cloud events
Browse files Browse the repository at this point in the history
  • Loading branch information
Sovietaced committed Oct 5, 2024
1 parent 9abfbda commit 6c3cfe6
Show file tree
Hide file tree
Showing 10 changed files with 78 additions and 13 deletions.
1 change: 1 addition & 0 deletions charts/flyte-core/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ helm install gateway bitnami/contour -n flyte
| cloud_events.enable | bool | `false` | |
| cloud_events.eventsPublisher.eventTypes[0] | string | `"all"` | |
| cloud_events.eventsPublisher.topicName | string | `"arn:aws:sns:us-east-2:123456:123-my-topic"` | |
| cloud_events.secretName | string | `""` | The name of the secret to use to alternatively load in cloud events configuration via a secret. Useful when the configuration contains secrets. |
| cloud_events.type | string | `"aws"` | |
| cluster_resource_manager | object | `{"config":{"cluster_resources":{"customData":[{"production":[{"projectQuotaCpu":{"value":"5"}},{"projectQuotaMemory":{"value":"4000Mi"}}]},{"staging":[{"projectQuotaCpu":{"value":"2"}},{"projectQuotaMemory":{"value":"3000Mi"}}]},{"development":[{"projectQuotaCpu":{"value":"4"}},{"projectQuotaMemory":{"value":"3000Mi"}}]}],"refreshInterval":"5m","standaloneDeployment":false,"templatePath":"/etc/flyte/clusterresource/templates"}},"enabled":true,"nodeSelector":{},"podAnnotations":{},"podEnv":{},"podLabels":{},"prometheus":{"enabled":false,"path":"/metrics","port":10254},"resources":{},"service_account_name":"flyteadmin","standaloneDeployment":false,"templates":[{"key":"aa_namespace","value":"apiVersion: v1\nkind: Namespace\nmetadata:\n name: {{ namespace }}\nspec:\n finalizers:\n - kubernetes\n"},{"key":"ab_project_resource_quota","value":"apiVersion: v1\nkind: ResourceQuota\nmetadata:\n name: project-quota\n namespace: {{ namespace }}\nspec:\n hard:\n limits.cpu: {{ projectQuotaCpu }}\n limits.memory: {{ projectQuotaMemory }}\n"}]}` | Configuration for the Cluster resource manager component. This is an optional component, that enables automatic cluster configuration. This is useful to set default quotas, manage namespaces etc that map to a project/domain |
| cluster_resource_manager.config | object | `{"cluster_resources":{"customData":[{"production":[{"projectQuotaCpu":{"value":"5"}},{"projectQuotaMemory":{"value":"4000Mi"}}]},{"staging":[{"projectQuotaCpu":{"value":"2"}},{"projectQuotaMemory":{"value":"3000Mi"}}]},{"development":[{"projectQuotaCpu":{"value":"4"}},{"projectQuotaMemory":{"value":"3000Mi"}}]}],"refreshInterval":"5m","standaloneDeployment":false,"templatePath":"/etc/flyte/clusterresource/templates"}}` | Configmap for ClusterResource parameters |
Expand Down
2 changes: 1 addition & 1 deletion charts/flyte-core/templates/admin/configmap.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ data:
externalEvents: {{ tpl (toYaml .) $ | nindent 6 }}
{{- end }}
{{- end }}
{{- if .Values.cloud_events.enable }}
{{- if and .Values.cloud_events.enable (not .Values.cloud_events.secretName) }}
{{- with .Values.cloud_events }}
cloud_events.yaml: |
cloudEvents: {{ tpl (toYaml .) $ | nindent 6 }}
Expand Down
4 changes: 4 additions & 0 deletions charts/flyte-core/templates/admin/deployment.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,10 @@ spec:
name: flyte-admin-base-config
- configMap:
name: flyte-admin-clusters-config
{{- if .Values.cloud_events.secretName }}
- secret:
name: {{ .Values.cloud_events.secretName }}
{{- end }}
name: clusters-config-volume
{{- if .Values.cluster_resource_manager.enabled }}
- configMap:
Expand Down
3 changes: 3 additions & 0 deletions charts/flyte-core/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -943,6 +943,9 @@ external_events:
# Cloud events are used to send events (unprocessed, as Admin see them) in cloud event format to
# an SNS topic (or gcp equivalent)
cloud_events:
# -- The name of the secret to use to alternatively load in cloud events configuration via a secret. Useful when the
# configuration contains secrets.
secretName: ""
enable: false
type: aws
aws:
Expand Down
4 changes: 2 additions & 2 deletions docker/sandbox-bundled/manifests/complete-agent.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -816,7 +816,7 @@ type: Opaque
---
apiVersion: v1
data:
haSharedSecret: cmRzbzQ4N3RQaWhuMk00OA==
haSharedSecret: ak5wVTFQVjRHMm5ZanVNUQ==
proxyPassword: ""
proxyUsername: ""
kind: Secret
Expand Down Expand Up @@ -1413,7 +1413,7 @@ spec:
metadata:
annotations:
checksum/config: 8f50e768255a87f078ba8b9879a0c174c3e045ffb46ac8723d2eedbe293c8d81
checksum/secret: 51528951e92c2bf712bbde990941593aae1fcf72144a1fe944c312ddad86e161
checksum/secret: db4b259a37cc362add2a4fd4c52954eabc67e69e2c399a292605415c70da4a2b
labels:
app: docker-registry
release: flyte-sandbox
Expand Down
4 changes: 2 additions & 2 deletions docker/sandbox-bundled/manifests/complete.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -798,7 +798,7 @@ type: Opaque
---
apiVersion: v1
data:
haSharedSecret: T1I2Q2tTcmREVG15MldGUQ==
haSharedSecret: RGZkSmNtV3k4dDZYd0pHVw==
proxyPassword: ""
proxyUsername: ""
kind: Secret
Expand Down Expand Up @@ -1362,7 +1362,7 @@ spec:
metadata:
annotations:
checksum/config: 8f50e768255a87f078ba8b9879a0c174c3e045ffb46ac8723d2eedbe293c8d81
checksum/secret: d723e395edc0fd2f221b9088efffe0d1f4dfabdef9892065fdabe12233362cf5
checksum/secret: d0a1f670be47a94b928141eae8a50733a775d074aee3a78db555b0728c90718e
labels:
app: docker-registry
release: flyte-sandbox
Expand Down
4 changes: 2 additions & 2 deletions docker/sandbox-bundled/manifests/dev.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -499,7 +499,7 @@ metadata:
---
apiVersion: v1
data:
haSharedSecret: ZnltNHNiZ01NRFNkb1RlMA==
haSharedSecret: amliZ0l4QXczU0ZjUUloWQ==
proxyPassword: ""
proxyUsername: ""
kind: Secret
Expand Down Expand Up @@ -934,7 +934,7 @@ spec:
metadata:
annotations:
checksum/config: 8f50e768255a87f078ba8b9879a0c174c3e045ffb46ac8723d2eedbe293c8d81
checksum/secret: eeab364c20a0e8ad5a1526ccd7ddbd1d5a442087e7267c4d761279102b81be21
checksum/secret: 4c93d218f3a1654f7eb3a238a4b1f57fbfda80cd8e5c4aaf5a9286a93f4a94f2
labels:
app: docker-registry
release: flyte-sandbox
Expand Down
2 changes: 2 additions & 0 deletions flyteadmin/.golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -39,3 +39,5 @@ issues:
exclude-rules:
- path: pkg/workflowengine/impl/prepare_execution.go
text: "copies lock"
- path: pkg/runtime/interfaces/application_configuration.go
text: "G402: TLS InsecureSkipVerify may be true."
7 changes: 1 addition & 6 deletions flyteadmin/pkg/async/cloudevent/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,12 +73,7 @@ func NewCloudEventsPublisher(ctx context.Context, db repositoryInterfaces.Reposi

case cloudEventImplementations.Kafka:
saramaConfig := sarama.NewConfig()
var err error
saramaConfig.Version, err = sarama.ParseKafkaVersion(cloudEventsConfig.KafkaConfig.Version)
if err != nil {
logger.Fatalf(ctx, "failed to parse kafka version, %v", err)
panic(err)
}
cloudEventsConfig.KafkaConfig.UpdateSaramaConfig(ctx, saramaConfig)
kafkaSender, err := kafka_sarama.NewSender(cloudEventsConfig.KafkaConfig.Brokers, saramaConfig, cloudEventsConfig.EventsPublisherConfig.TopicName)
if err != nil {
panic(err)
Expand Down
60 changes: 60 additions & 0 deletions flyteadmin/pkg/runtime/interfaces/application_configuration.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,18 @@
package interfaces

import (
"context"
"crypto/tls"

"github.com/Shopify/sarama"
"github.com/golang/protobuf/ptypes/wrappers"
"golang.org/x/time/rate"

"github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/admin"
"github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/core"
"github.com/flyteorg/flyte/flytestdlib/config"
"github.com/flyteorg/flyte/flytestdlib/database"
"github.com/flyteorg/flyte/flytestdlib/logger"
)

// DbConfig is used to for initiating the database connection with the store that holds registered
Expand Down Expand Up @@ -231,11 +236,66 @@ type GCPConfig struct {
ProjectID string `json:"projectId"`
}

type SASLConfig struct {
Enabled bool `json:"enabled"`
User string `json:"user"`
Password string `json:"password"`
Handshake bool `json:"handshake"`
Mechanism sarama.SASLMechanism `json:"mechanism"`
}

type TLSConfig struct {
Enabled bool `json:"enabled"`
InsecureSkipVerify bool `json:"insecureSkipVerify"`
CertPath string `json:"certPath"`
KeyPath string `json:"keyPath"`
}

type KafkaConfig struct {
// The version of Kafka, e.g. 2.1.0, 0.8.2.0
Version string `json:"version"`
// kafka broker addresses
Brokers []string `json:"brokers"`
// sasl config
SASLConfig SASLConfig `json:"sasl_config"`
// tls config
TLSConfig TLSConfig `json:"tls_config"`
}

func (k KafkaConfig) UpdateSaramaConfig(ctx context.Context, s *sarama.Config) {
var err error
s.Version, err = sarama.ParseKafkaVersion(k.Version)
if err != nil {
logger.Fatalf(ctx, "failed to parse kafka version, %v", err)
panic(err)

Check warning on line 270 in flyteadmin/pkg/runtime/interfaces/application_configuration.go

View check run for this annotation

Codecov / codecov/patch

flyteadmin/pkg/runtime/interfaces/application_configuration.go#L269-L270

Added lines #L269 - L270 were not covered by tests
}

if k.SASLConfig.Enabled {
s.Net.SASL.Enable = true
s.Net.SASL.User = k.SASLConfig.User
s.Net.SASL.Password = k.SASLConfig.Password
s.Net.SASL.Handshake = k.SASLConfig.Handshake

Check warning on line 277 in flyteadmin/pkg/runtime/interfaces/application_configuration.go

View check run for this annotation

Codecov / codecov/patch

flyteadmin/pkg/runtime/interfaces/application_configuration.go#L274-L277

Added lines #L274 - L277 were not covered by tests

if k.SASLConfig.Mechanism == "" {
k.SASLConfig.Mechanism = sarama.SASLTypePlaintext

Check warning on line 280 in flyteadmin/pkg/runtime/interfaces/application_configuration.go

View check run for this annotation

Codecov / codecov/patch

flyteadmin/pkg/runtime/interfaces/application_configuration.go#L279-L280

Added lines #L279 - L280 were not covered by tests
}
s.Net.SASL.Mechanism = k.SASLConfig.Mechanism

Check warning on line 282 in flyteadmin/pkg/runtime/interfaces/application_configuration.go

View check run for this annotation

Codecov / codecov/patch

flyteadmin/pkg/runtime/interfaces/application_configuration.go#L282

Added line #L282 was not covered by tests
}

if k.TLSConfig.Enabled {
s.Net.TLS.Enable = true
s.Net.TLS.Config = &tls.Config{
InsecureSkipVerify: k.TLSConfig.InsecureSkipVerify,

Check warning on line 288 in flyteadmin/pkg/runtime/interfaces/application_configuration.go

View check run for this annotation

Codecov / codecov/patch

flyteadmin/pkg/runtime/interfaces/application_configuration.go#L286-L288

Added lines #L286 - L288 were not covered by tests
}
if k.TLSConfig.KeyPath != "" && k.TLSConfig.CertPath != "" {
cert, err := tls.LoadX509KeyPair(k.TLSConfig.CertPath, k.TLSConfig.KeyPath)
if err != nil {
logger.Fatalf(ctx, "failed to load kafka client keypair: %v", err)
panic(err)

Check warning on line 294 in flyteadmin/pkg/runtime/interfaces/application_configuration.go

View check run for this annotation

Codecov / codecov/patch

flyteadmin/pkg/runtime/interfaces/application_configuration.go#L290-L294

Added lines #L290 - L294 were not covered by tests
}
s.Net.TLS.Config.Certificates = []tls.Certificate{cert}

Check warning on line 296 in flyteadmin/pkg/runtime/interfaces/application_configuration.go

View check run for this annotation

Codecov / codecov/patch

flyteadmin/pkg/runtime/interfaces/application_configuration.go#L296

Added line #L296 was not covered by tests
}
}
}

// This section holds configuration for the event scheduler used to schedule workflow executions.
Expand Down

0 comments on commit 6c3cfe6

Please sign in to comment.