Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add basic SASL and TLS support for Kafka cloud events #5814

Merged
merged 2 commits into from
Oct 31, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 16 additions & 1 deletion charts/flyte-core/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -55,10 +55,25 @@ helm install gateway bitnami/contour -n flyte

| Key | Type | Default | Description |
|-----|------|---------|-------------|
| cloud_events.aws.region | string | `"us-east-2"` | |
| cloud_events.aws | object | `{"region":"us-east-2"}` | Configuration for sending cloud events to AWS SNS |
| cloud_events.enable | bool | `false` | |
| cloud_events.eventsPublisher.eventTypes[0] | string | `"all"` | |
| cloud_events.eventsPublisher.topicName | string | `"arn:aws:sns:us-east-2:123456:123-my-topic"` | |
| cloud_events.gcp | object | `{"region":"us-east1"}` | Configuration for sending cloud events to GCP Pub Sub |
| cloud_events.kafka | object | `{"brokers":["mybroker:443"],"saslConfig":{"enabled":false,"handshake":true,"mechanism":"PLAIN","password":"","passwordPath":"","user":"kafka"},"tlsConfig":{"certPath":"/etc/ssl/certs/kafka-client.crt","enabled":false,"keyPath":"/etc/ssl/certs/kafka-client.key"},"version":"3.7.0"}` | Configuration for sending cloud events to Kafka |
| cloud_events.kafka.brokers | list | `["mybroker:443"]` | The kafka brokers to talk to |
| cloud_events.kafka.saslConfig | object | `{"enabled":false,"handshake":true,"mechanism":"PLAIN","password":"","passwordPath":"","user":"kafka"}` | SASL based authentication |
| cloud_events.kafka.saslConfig.enabled | bool | `false` | Whether to use SASL authentication |
| cloud_events.kafka.saslConfig.handshake | bool | `true` | Whether the send the SASL handsahke first |
| cloud_events.kafka.saslConfig.mechanism | string | `"PLAIN"` | Which SASL mechanism to use. Defaults to PLAIN |
| cloud_events.kafka.saslConfig.password | string | `""` | The password for the kafka user |
| cloud_events.kafka.saslConfig.passwordPath | string | `""` | Optional mount path of file containing the kafka password. |
| cloud_events.kafka.saslConfig.user | string | `"kafka"` | The kafka user |
| cloud_events.kafka.tlsConfig | object | `{"certPath":"/etc/ssl/certs/kafka-client.crt","enabled":false,"keyPath":"/etc/ssl/certs/kafka-client.key"}` | Certificate based authentication |
| cloud_events.kafka.tlsConfig.certPath | string | `"/etc/ssl/certs/kafka-client.crt"` | Path to the client certificate |
| cloud_events.kafka.tlsConfig.enabled | bool | `false` | Whether to use certificate based authentication or TLS |
| cloud_events.kafka.tlsConfig.keyPath | string | `"/etc/ssl/certs/kafka-client.key"` | Path to the client private key |
| cloud_events.kafka.version | string | `"3.7.0"` | The version of Kafka |
| cloud_events.type | string | `"aws"` | |
| cluster_resource_manager | object | `{"config":{"cluster_resources":{"customData":[{"production":[{"projectQuotaCpu":{"value":"5"}},{"projectQuotaMemory":{"value":"4000Mi"}}]},{"staging":[{"projectQuotaCpu":{"value":"2"}},{"projectQuotaMemory":{"value":"3000Mi"}}]},{"development":[{"projectQuotaCpu":{"value":"4"}},{"projectQuotaMemory":{"value":"3000Mi"}}]}],"refreshInterval":"5m","standaloneDeployment":false,"templatePath":"/etc/flyte/clusterresource/templates"}},"enabled":true,"nodeSelector":{},"podAnnotations":{},"podEnv":{},"podLabels":{},"prometheus":{"enabled":false,"path":"/metrics","port":10254},"resources":{},"service_account_name":"flyteadmin","standaloneDeployment":false,"templates":[{"key":"aa_namespace","value":"apiVersion: v1\nkind: Namespace\nmetadata:\n name: {{ namespace }}\nspec:\n finalizers:\n - kubernetes\n"},{"key":"ab_project_resource_quota","value":"apiVersion: v1\nkind: ResourceQuota\nmetadata:\n name: project-quota\n namespace: {{ namespace }}\nspec:\n hard:\n limits.cpu: {{ projectQuotaCpu }}\n limits.memory: {{ projectQuotaMemory }}\n"}]}` | Configuration for the Cluster resource manager component. This is an optional component, that enables automatic cluster configuration. This is useful to set default quotas, manage namespaces etc that map to a project/domain |
| cluster_resource_manager.config | object | `{"cluster_resources":{"customData":[{"production":[{"projectQuotaCpu":{"value":"5"}},{"projectQuotaMemory":{"value":"4000Mi"}}]},{"staging":[{"projectQuotaCpu":{"value":"2"}},{"projectQuotaMemory":{"value":"3000Mi"}}]},{"development":[{"projectQuotaCpu":{"value":"4"}},{"projectQuotaMemory":{"value":"3000Mi"}}]}],"refreshInterval":"5m","standaloneDeployment":false,"templatePath":"/etc/flyte/clusterresource/templates"}}` | Configmap for ClusterResource parameters |
Expand Down
39 changes: 36 additions & 3 deletions charts/flyte-core/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -946,15 +946,48 @@ external_events:
# an SNS topic (or gcp equivalent)
cloud_events:
enable: false
type: aws
aws:
region: us-east-2
eventsPublisher:
# Make sure this is not a fifo queue. Admin does not yet support
# writing to fifo sns topics.
topicName: "arn:aws:sns:us-east-2:123456:123-my-topic"
eventTypes:
- all # Or workflow, node, task. Or "*"
type: aws
# -- Configuration for sending cloud events to AWS SNS
aws:
region: us-east-2
# -- Configuration for sending cloud events to GCP Pub Sub
gcp:
region: us-east1
# -- Configuration for sending cloud events to Kafka
kafka:
# -- The version of Kafka
version: "3.7.0"
# -- The kafka brokers to talk to
brokers:
- mybroker:443
# -- SASL based authentication
saslConfig:
# -- Whether to use SASL authentication
enabled: false
# -- The kafka user
user: kafka
# -- The password for the kafka user
password: ""
# -- Optional mount path of file containing the kafka password.
passwordPath: ""
# -- Whether the send the SASL handsahke first
handshake: true
# -- Which SASL mechanism to use. Defaults to PLAIN
mechanism: PLAIN
# -- Certificate based authentication
tlsConfig:
# -- Whether to use certificate based authentication or TLS
enabled: false
# -- Path to the client certificate
certPath: /etc/ssl/certs/kafka-client.crt
# -- Path to the client private key
keyPath: /etc/ssl/certs/kafka-client.key

# -- Configuration for the Cluster resource manager component. This is an optional component, that enables automatic
# cluster configuration. This is useful to set default quotas, manage namespaces etc that map to a project/domain
Expand Down
9 changes: 6 additions & 3 deletions docker/sandbox-bundled/manifests/complete-agent.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -444,6 +444,9 @@ data:
disabled: false
seedProjects:
- flytesnacks
seedProjectsWithDetails:
- description: Default project setup.
name: flytesnacks
dataCatalog:
disabled: false
propeller:
Expand Down Expand Up @@ -816,7 +819,7 @@ type: Opaque
---
apiVersion: v1
data:
haSharedSecret: M0MwZjRDRlRMRVg5eFlNWA==
haSharedSecret: SlI1TDFkTXBMaThuc0hlSQ==
proxyPassword: ""
proxyUsername: ""
kind: Secret
Expand Down Expand Up @@ -1247,7 +1250,7 @@ spec:
metadata:
annotations:
checksum/cluster-resource-templates: 6fd9b172465e3089fcc59f738b92b8dc4d8939360c19de8ee65f68b0e7422035
checksum/configuration: dc6e26fec37cad413a92bf06f2840ea1e497284312275ff06e22b152dee1566b
checksum/configuration: a823eaadac5f3a4358c8acf628ebeb3719f88312af520d2c253de2579dff262d
checksum/configuration-secret: 09216ffaa3d29e14f88b1f30af580d02a2a5e014de4d750b7f275cc07ed4e914
labels:
app.kubernetes.io/component: flyte-binary
Expand Down Expand Up @@ -1413,7 +1416,7 @@ spec:
metadata:
annotations:
checksum/config: 8f50e768255a87f078ba8b9879a0c174c3e045ffb46ac8723d2eedbe293c8d81
checksum/secret: 49b88f7ed6b4bec4cdb0305c1d990514d9b75690607d7ae75d5862da9a3b2a29
checksum/secret: ffc8aa05a602edd8f9b1d7ef35aa1cc5e383bceb9b91307eef99e86f53e13d4e
labels:
app: docker-registry
release: flyte-sandbox
Expand Down
9 changes: 6 additions & 3 deletions docker/sandbox-bundled/manifests/complete.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -433,6 +433,9 @@ data:
disabled: false
seedProjects:
- flytesnacks
seedProjectsWithDetails:
- description: Default project setup.
name: flytesnacks
dataCatalog:
disabled: false
propeller:
Expand Down Expand Up @@ -798,7 +801,7 @@ type: Opaque
---
apiVersion: v1
data:
haSharedSecret: ekx6Z2kxS3FBYjV5dExlMw==
haSharedSecret: YjdMdE9yejJzZ2xXSDFBRQ==
proxyPassword: ""
proxyUsername: ""
kind: Secret
Expand Down Expand Up @@ -1196,7 +1199,7 @@ spec:
metadata:
annotations:
checksum/cluster-resource-templates: 6fd9b172465e3089fcc59f738b92b8dc4d8939360c19de8ee65f68b0e7422035
checksum/configuration: a6f3ea502338c626b7824453ce7dc8b6fcd441d68865c075e2e74d797bc607fa
checksum/configuration: c2649df6bcb523f120c73b0fdeec5d9516f555eab12e4eae78b04dea2cf2abae
checksum/configuration-secret: 09216ffaa3d29e14f88b1f30af580d02a2a5e014de4d750b7f275cc07ed4e914
labels:
app.kubernetes.io/component: flyte-binary
Expand Down Expand Up @@ -1362,7 +1365,7 @@ spec:
metadata:
annotations:
checksum/config: 8f50e768255a87f078ba8b9879a0c174c3e045ffb46ac8723d2eedbe293c8d81
checksum/secret: 9b64bfe991cd6ce4394fa9c2651b0bbfe4834024ece293b3ac9688111d6fe5d3
checksum/secret: 956ac1b58c049a630c94605eedaba7ba9de3fc01233701ef403ab4bf24fe2a7a
labels:
app: docker-registry
release: flyte-sandbox
Expand Down
4 changes: 2 additions & 2 deletions docker/sandbox-bundled/manifests/dev.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -499,7 +499,7 @@ metadata:
---
apiVersion: v1
data:
haSharedSecret: MW90empzaUNBd2FlV09QSw==
haSharedSecret: YUpzb25xNTM1eml3Rmpueg==
proxyPassword: ""
proxyUsername: ""
kind: Secret
Expand Down Expand Up @@ -934,7 +934,7 @@ spec:
metadata:
annotations:
checksum/config: 8f50e768255a87f078ba8b9879a0c174c3e045ffb46ac8723d2eedbe293c8d81
checksum/secret: ba78cd87d2f6685980b95bd20913088b3a07fa48e9a414693277e3df134710ad
checksum/secret: 2720f13bd64051a7acb512e59e426b9f6c5f6c3c7d1d9a3a423e2df4cf9bab46
labels:
app: docker-registry
release: flyte-sandbox
Expand Down
2 changes: 2 additions & 0 deletions flyteadmin/.golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -39,3 +39,5 @@ issues:
exclude-rules:
- path: pkg/workflowengine/impl/prepare_execution.go
text: "copies lock"
- path: pkg/runtime/interfaces/application_configuration.go
text: "G402: TLS InsecureSkipVerify may be true."
7 changes: 1 addition & 6 deletions flyteadmin/pkg/async/cloudevent/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,12 +73,7 @@ func NewCloudEventsPublisher(ctx context.Context, db repositoryInterfaces.Reposi

case cloudEventImplementations.Kafka:
saramaConfig := sarama.NewConfig()
var err error
saramaConfig.Version, err = sarama.ParseKafkaVersion(cloudEventsConfig.KafkaConfig.Version)
if err != nil {
logger.Fatalf(ctx, "failed to parse kafka version, %v", err)
panic(err)
}
cloudEventsConfig.KafkaConfig.UpdateSaramaConfig(ctx, saramaConfig)
kafkaSender, err := kafka_sarama.NewSender(cloudEventsConfig.KafkaConfig.Brokers, saramaConfig, cloudEventsConfig.EventsPublisherConfig.TopicName)
if err != nil {
panic(err)
Expand Down
85 changes: 85 additions & 0 deletions flyteadmin/pkg/runtime/interfaces/application_configuration.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,13 @@
package interfaces

import (
"context"
"crypto/tls"
"fmt"
"os"
"strings"

"github.com/Shopify/sarama"
"github.com/golang/protobuf/ptypes/wrappers"
"golang.org/x/time/rate"

Expand Down Expand Up @@ -231,11 +238,89 @@
ProjectID string `json:"projectId"`
}

// This section holds SASL config for Kafka
type SASLConfig struct {
// Whether to use SASL
Enabled bool `json:"enabled"`
// The username
User string `json:"user"`
// The password
Password string `json:"password"`
PasswordPath string `json:"passwordPath"`
Handshake bool `json:"handshake"`
// Which SASL Mechanism to use. Defaults to PLAIN
Mechanism sarama.SASLMechanism `json:"mechanism"`
}

// This section holds TLS config for Kafka clients
type TLSConfig struct {
// Whether to use TLS
Enabled bool `json:"enabled"`
// Whether to skip certificate verification
InsecureSkipVerify bool `json:"insecureSkipVerify"`
// The location of the client certificate
CertPath string `json:"certPath"`
// The location of the client private key
KeyPath string `json:"keyPath"`
}

// This section holds configs for Kafka clients
type KafkaConfig struct {
// The version of Kafka, e.g. 2.1.0, 0.8.2.0
Version string `json:"version"`
// kafka broker addresses
Brokers []string `json:"brokers"`
// sasl config
SASLConfig SASLConfig `json:"saslConfig"`
// tls config
TLSConfig TLSConfig `json:"tlsConfig"`
}

func (k KafkaConfig) UpdateSaramaConfig(ctx context.Context, s *sarama.Config) {
var err error
s.Version, err = sarama.ParseKafkaVersion(k.Version)
if err != nil {
panic(err)

Check warning on line 283 in flyteadmin/pkg/runtime/interfaces/application_configuration.go

View check run for this annotation

Codecov / codecov/patch

flyteadmin/pkg/runtime/interfaces/application_configuration.go#L283

Added line #L283 was not covered by tests
}

if k.SASLConfig.Enabled {
s.Net.SASL.Enable = true
s.Net.SASL.User = k.SASLConfig.User

if len(k.SASLConfig.PasswordPath) > 0 {
if _, err := os.Stat(k.SASLConfig.PasswordPath); os.IsNotExist(err) {
panic(fmt.Sprintf("missing kafka password at the specified path [%s]", k.SASLConfig.PasswordPath))

Check warning on line 292 in flyteadmin/pkg/runtime/interfaces/application_configuration.go

View check run for this annotation

Codecov / codecov/patch

flyteadmin/pkg/runtime/interfaces/application_configuration.go#L287-L292

Added lines #L287 - L292 were not covered by tests
}
passwordVal, err := os.ReadFile(k.SASLConfig.PasswordPath)
if err != nil {
panic(fmt.Sprintf("failed to kafka password from path [%s] with err: %v", k.SASLConfig.PasswordPath, err))

Check warning on line 296 in flyteadmin/pkg/runtime/interfaces/application_configuration.go

View check run for this annotation

Codecov / codecov/patch

flyteadmin/pkg/runtime/interfaces/application_configuration.go#L294-L296

Added lines #L294 - L296 were not covered by tests
}

s.Net.SASL.Password = strings.TrimSpace(string(passwordVal))
} else {
s.Net.SASL.Password = k.SASLConfig.Password
}
s.Net.SASL.Handshake = k.SASLConfig.Handshake

if k.SASLConfig.Mechanism == "" {
k.SASLConfig.Mechanism = sarama.SASLTypePlaintext
}
s.Net.SASL.Mechanism = k.SASLConfig.Mechanism

Check warning on line 308 in flyteadmin/pkg/runtime/interfaces/application_configuration.go

View check run for this annotation

Codecov / codecov/patch

flyteadmin/pkg/runtime/interfaces/application_configuration.go#L299-L308

Added lines #L299 - L308 were not covered by tests
}

if k.TLSConfig.Enabled {
s.Net.TLS.Enable = true
s.Net.TLS.Config = &tls.Config{
InsecureSkipVerify: k.TLSConfig.InsecureSkipVerify,
}
if k.TLSConfig.KeyPath != "" && k.TLSConfig.CertPath != "" {
cert, err := tls.LoadX509KeyPair(k.TLSConfig.CertPath, k.TLSConfig.KeyPath)
if err != nil {
panic(err)

Check warning on line 319 in flyteadmin/pkg/runtime/interfaces/application_configuration.go

View check run for this annotation

Codecov / codecov/patch

flyteadmin/pkg/runtime/interfaces/application_configuration.go#L312-L319

Added lines #L312 - L319 were not covered by tests
}
s.Net.TLS.Config.Certificates = []tls.Certificate{cert}

Check warning on line 321 in flyteadmin/pkg/runtime/interfaces/application_configuration.go

View check run for this annotation

Codecov / codecov/patch

flyteadmin/pkg/runtime/interfaces/application_configuration.go#L321

Added line #L321 was not covered by tests
}
}
}

// This section holds configuration for the event scheduler used to schedule workflow executions.
Expand Down
Loading