Skip to content

Commit

Permalink
Add basic SASL and TLS support for Kafka cloud events (#5814)
Browse files Browse the repository at this point in the history
* Add basic SASL and TLS support for Kafka cloud events

Signed-off-by: Jason Parraga <[email protected]>

* Address comments

Signed-off-by: Jason Parraga <[email protected]>

---------

Signed-off-by: Jason Parraga <[email protected]>
  • Loading branch information
Sovietaced authored Oct 31, 2024
1 parent dcdd472 commit 84bcc26
Show file tree
Hide file tree
Showing 8 changed files with 154 additions and 18 deletions.
17 changes: 16 additions & 1 deletion charts/flyte-core/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -55,10 +55,25 @@ helm install gateway bitnami/contour -n flyte

| Key | Type | Default | Description |
|-----|------|---------|-------------|
| cloud_events.aws.region | string | `"us-east-2"` | |
| cloud_events.aws | object | `{"region":"us-east-2"}` | Configuration for sending cloud events to AWS SNS |
| cloud_events.enable | bool | `false` | |
| cloud_events.eventsPublisher.eventTypes[0] | string | `"all"` | |
| cloud_events.eventsPublisher.topicName | string | `"arn:aws:sns:us-east-2:123456:123-my-topic"` | |
| cloud_events.gcp | object | `{"region":"us-east1"}` | Configuration for sending cloud events to GCP Pub Sub |
| cloud_events.kafka | object | `{"brokers":["mybroker:443"],"saslConfig":{"enabled":false,"handshake":true,"mechanism":"PLAIN","password":"","passwordPath":"","user":"kafka"},"tlsConfig":{"certPath":"/etc/ssl/certs/kafka-client.crt","enabled":false,"keyPath":"/etc/ssl/certs/kafka-client.key"},"version":"3.7.0"}` | Configuration for sending cloud events to Kafka |
| cloud_events.kafka.brokers | list | `["mybroker:443"]` | The kafka brokers to talk to |
| cloud_events.kafka.saslConfig | object | `{"enabled":false,"handshake":true,"mechanism":"PLAIN","password":"","passwordPath":"","user":"kafka"}` | SASL based authentication |
| cloud_events.kafka.saslConfig.enabled | bool | `false` | Whether to use SASL authentication |
| cloud_events.kafka.saslConfig.handshake | bool | `true` | Whether the send the SASL handsahke first |
| cloud_events.kafka.saslConfig.mechanism | string | `"PLAIN"` | Which SASL mechanism to use. Defaults to PLAIN |
| cloud_events.kafka.saslConfig.password | string | `""` | The password for the kafka user |
| cloud_events.kafka.saslConfig.passwordPath | string | `""` | Optional mount path of file containing the kafka password. |
| cloud_events.kafka.saslConfig.user | string | `"kafka"` | The kafka user |
| cloud_events.kafka.tlsConfig | object | `{"certPath":"/etc/ssl/certs/kafka-client.crt","enabled":false,"keyPath":"/etc/ssl/certs/kafka-client.key"}` | Certificate based authentication |
| cloud_events.kafka.tlsConfig.certPath | string | `"/etc/ssl/certs/kafka-client.crt"` | Path to the client certificate |
| cloud_events.kafka.tlsConfig.enabled | bool | `false` | Whether to use certificate based authentication or TLS |
| cloud_events.kafka.tlsConfig.keyPath | string | `"/etc/ssl/certs/kafka-client.key"` | Path to the client private key |
| cloud_events.kafka.version | string | `"3.7.0"` | The version of Kafka |
| cloud_events.type | string | `"aws"` | |
| cluster_resource_manager | object | `{"config":{"cluster_resources":{"customData":[{"production":[{"projectQuotaCpu":{"value":"5"}},{"projectQuotaMemory":{"value":"4000Mi"}}]},{"staging":[{"projectQuotaCpu":{"value":"2"}},{"projectQuotaMemory":{"value":"3000Mi"}}]},{"development":[{"projectQuotaCpu":{"value":"4"}},{"projectQuotaMemory":{"value":"3000Mi"}}]}],"refreshInterval":"5m","standaloneDeployment":false,"templatePath":"/etc/flyte/clusterresource/templates"}},"enabled":true,"nodeSelector":{},"podAnnotations":{},"podEnv":{},"podLabels":{},"prometheus":{"enabled":false,"path":"/metrics","port":10254},"resources":{},"service_account_name":"flyteadmin","standaloneDeployment":false,"templates":[{"key":"aa_namespace","value":"apiVersion: v1\nkind: Namespace\nmetadata:\n name: {{ namespace }}\nspec:\n finalizers:\n - kubernetes\n"},{"key":"ab_project_resource_quota","value":"apiVersion: v1\nkind: ResourceQuota\nmetadata:\n name: project-quota\n namespace: {{ namespace }}\nspec:\n hard:\n limits.cpu: {{ projectQuotaCpu }}\n limits.memory: {{ projectQuotaMemory }}\n"}]}` | Configuration for the Cluster resource manager component. This is an optional component, that enables automatic cluster configuration. This is useful to set default quotas, manage namespaces etc that map to a project/domain |
| cluster_resource_manager.config | object | `{"cluster_resources":{"customData":[{"production":[{"projectQuotaCpu":{"value":"5"}},{"projectQuotaMemory":{"value":"4000Mi"}}]},{"staging":[{"projectQuotaCpu":{"value":"2"}},{"projectQuotaMemory":{"value":"3000Mi"}}]},{"development":[{"projectQuotaCpu":{"value":"4"}},{"projectQuotaMemory":{"value":"3000Mi"}}]}],"refreshInterval":"5m","standaloneDeployment":false,"templatePath":"/etc/flyte/clusterresource/templates"}}` | Configmap for ClusterResource parameters |
Expand Down
39 changes: 36 additions & 3 deletions charts/flyte-core/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -946,15 +946,48 @@ external_events:
# an SNS topic (or gcp equivalent)
cloud_events:
enable: false
type: aws
aws:
region: us-east-2
eventsPublisher:
# Make sure this is not a fifo queue. Admin does not yet support
# writing to fifo sns topics.
topicName: "arn:aws:sns:us-east-2:123456:123-my-topic"
eventTypes:
- all # Or workflow, node, task. Or "*"
type: aws
# -- Configuration for sending cloud events to AWS SNS
aws:
region: us-east-2
# -- Configuration for sending cloud events to GCP Pub Sub
gcp:
region: us-east1
# -- Configuration for sending cloud events to Kafka
kafka:
# -- The version of Kafka
version: "3.7.0"
# -- The kafka brokers to talk to
brokers:
- mybroker:443
# -- SASL based authentication
saslConfig:
# -- Whether to use SASL authentication
enabled: false
# -- The kafka user
user: kafka
# -- The password for the kafka user
password: ""
# -- Optional mount path of file containing the kafka password.
passwordPath: ""
# -- Whether the send the SASL handsahke first
handshake: true
# -- Which SASL mechanism to use. Defaults to PLAIN
mechanism: PLAIN
# -- Certificate based authentication
tlsConfig:
# -- Whether to use certificate based authentication or TLS
enabled: false
# -- Path to the client certificate
certPath: /etc/ssl/certs/kafka-client.crt
# -- Path to the client private key
keyPath: /etc/ssl/certs/kafka-client.key

# -- Configuration for the Cluster resource manager component. This is an optional component, that enables automatic
# cluster configuration. This is useful to set default quotas, manage namespaces etc that map to a project/domain
Expand Down
9 changes: 6 additions & 3 deletions docker/sandbox-bundled/manifests/complete-agent.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -444,6 +444,9 @@ data:
disabled: false
seedProjects:
- flytesnacks
seedProjectsWithDetails:
- description: Default project setup.
name: flytesnacks
dataCatalog:
disabled: false
propeller:
Expand Down Expand Up @@ -816,7 +819,7 @@ type: Opaque
---
apiVersion: v1
data:
haSharedSecret: M0MwZjRDRlRMRVg5eFlNWA==
haSharedSecret: SlI1TDFkTXBMaThuc0hlSQ==
proxyPassword: ""
proxyUsername: ""
kind: Secret
Expand Down Expand Up @@ -1247,7 +1250,7 @@ spec:
metadata:
annotations:
checksum/cluster-resource-templates: 6fd9b172465e3089fcc59f738b92b8dc4d8939360c19de8ee65f68b0e7422035
checksum/configuration: dc6e26fec37cad413a92bf06f2840ea1e497284312275ff06e22b152dee1566b
checksum/configuration: a823eaadac5f3a4358c8acf628ebeb3719f88312af520d2c253de2579dff262d
checksum/configuration-secret: 09216ffaa3d29e14f88b1f30af580d02a2a5e014de4d750b7f275cc07ed4e914
labels:
app.kubernetes.io/component: flyte-binary
Expand Down Expand Up @@ -1413,7 +1416,7 @@ spec:
metadata:
annotations:
checksum/config: 8f50e768255a87f078ba8b9879a0c174c3e045ffb46ac8723d2eedbe293c8d81
checksum/secret: 49b88f7ed6b4bec4cdb0305c1d990514d9b75690607d7ae75d5862da9a3b2a29
checksum/secret: ffc8aa05a602edd8f9b1d7ef35aa1cc5e383bceb9b91307eef99e86f53e13d4e
labels:
app: docker-registry
release: flyte-sandbox
Expand Down
9 changes: 6 additions & 3 deletions docker/sandbox-bundled/manifests/complete.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -433,6 +433,9 @@ data:
disabled: false
seedProjects:
- flytesnacks
seedProjectsWithDetails:
- description: Default project setup.
name: flytesnacks
dataCatalog:
disabled: false
propeller:
Expand Down Expand Up @@ -798,7 +801,7 @@ type: Opaque
---
apiVersion: v1
data:
haSharedSecret: ekx6Z2kxS3FBYjV5dExlMw==
haSharedSecret: YjdMdE9yejJzZ2xXSDFBRQ==
proxyPassword: ""
proxyUsername: ""
kind: Secret
Expand Down Expand Up @@ -1196,7 +1199,7 @@ spec:
metadata:
annotations:
checksum/cluster-resource-templates: 6fd9b172465e3089fcc59f738b92b8dc4d8939360c19de8ee65f68b0e7422035
checksum/configuration: a6f3ea502338c626b7824453ce7dc8b6fcd441d68865c075e2e74d797bc607fa
checksum/configuration: c2649df6bcb523f120c73b0fdeec5d9516f555eab12e4eae78b04dea2cf2abae
checksum/configuration-secret: 09216ffaa3d29e14f88b1f30af580d02a2a5e014de4d750b7f275cc07ed4e914
labels:
app.kubernetes.io/component: flyte-binary
Expand Down Expand Up @@ -1362,7 +1365,7 @@ spec:
metadata:
annotations:
checksum/config: 8f50e768255a87f078ba8b9879a0c174c3e045ffb46ac8723d2eedbe293c8d81
checksum/secret: 9b64bfe991cd6ce4394fa9c2651b0bbfe4834024ece293b3ac9688111d6fe5d3
checksum/secret: 956ac1b58c049a630c94605eedaba7ba9de3fc01233701ef403ab4bf24fe2a7a
labels:
app: docker-registry
release: flyte-sandbox
Expand Down
4 changes: 2 additions & 2 deletions docker/sandbox-bundled/manifests/dev.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -499,7 +499,7 @@ metadata:
---
apiVersion: v1
data:
haSharedSecret: MW90empzaUNBd2FlV09QSw==
haSharedSecret: YUpzb25xNTM1eml3Rmpueg==
proxyPassword: ""
proxyUsername: ""
kind: Secret
Expand Down Expand Up @@ -934,7 +934,7 @@ spec:
metadata:
annotations:
checksum/config: 8f50e768255a87f078ba8b9879a0c174c3e045ffb46ac8723d2eedbe293c8d81
checksum/secret: ba78cd87d2f6685980b95bd20913088b3a07fa48e9a414693277e3df134710ad
checksum/secret: 2720f13bd64051a7acb512e59e426b9f6c5f6c3c7d1d9a3a423e2df4cf9bab46
labels:
app: docker-registry
release: flyte-sandbox
Expand Down
2 changes: 2 additions & 0 deletions flyteadmin/.golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -39,3 +39,5 @@ issues:
exclude-rules:
- path: pkg/workflowengine/impl/prepare_execution.go
text: "copies lock"
- path: pkg/runtime/interfaces/application_configuration.go
text: "G402: TLS InsecureSkipVerify may be true."
7 changes: 1 addition & 6 deletions flyteadmin/pkg/async/cloudevent/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,12 +73,7 @@ func NewCloudEventsPublisher(ctx context.Context, db repositoryInterfaces.Reposi

case cloudEventImplementations.Kafka:
saramaConfig := sarama.NewConfig()
var err error
saramaConfig.Version, err = sarama.ParseKafkaVersion(cloudEventsConfig.KafkaConfig.Version)
if err != nil {
logger.Fatalf(ctx, "failed to parse kafka version, %v", err)
panic(err)
}
cloudEventsConfig.KafkaConfig.UpdateSaramaConfig(ctx, saramaConfig)
kafkaSender, err := kafka_sarama.NewSender(cloudEventsConfig.KafkaConfig.Brokers, saramaConfig, cloudEventsConfig.EventsPublisherConfig.TopicName)
if err != nil {
panic(err)
Expand Down
85 changes: 85 additions & 0 deletions flyteadmin/pkg/runtime/interfaces/application_configuration.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,13 @@
package interfaces

import (
"context"
"crypto/tls"
"fmt"
"os"
"strings"

"github.com/Shopify/sarama"
"github.com/golang/protobuf/ptypes/wrappers"
"golang.org/x/time/rate"

Expand Down Expand Up @@ -231,11 +238,89 @@ type GCPConfig struct {
ProjectID string `json:"projectId"`
}

// This section holds SASL config for Kafka
type SASLConfig struct {
// Whether to use SASL
Enabled bool `json:"enabled"`
// The username
User string `json:"user"`
// The password
Password string `json:"password"`
PasswordPath string `json:"passwordPath"`
Handshake bool `json:"handshake"`
// Which SASL Mechanism to use. Defaults to PLAIN
Mechanism sarama.SASLMechanism `json:"mechanism"`
}

// This section holds TLS config for Kafka clients
type TLSConfig struct {
// Whether to use TLS
Enabled bool `json:"enabled"`
// Whether to skip certificate verification
InsecureSkipVerify bool `json:"insecureSkipVerify"`
// The location of the client certificate
CertPath string `json:"certPath"`
// The location of the client private key
KeyPath string `json:"keyPath"`
}

// This section holds configs for Kafka clients
type KafkaConfig struct {
// The version of Kafka, e.g. 2.1.0, 0.8.2.0
Version string `json:"version"`
// kafka broker addresses
Brokers []string `json:"brokers"`
// sasl config
SASLConfig SASLConfig `json:"saslConfig"`
// tls config
TLSConfig TLSConfig `json:"tlsConfig"`
}

func (k KafkaConfig) UpdateSaramaConfig(ctx context.Context, s *sarama.Config) {
var err error
s.Version, err = sarama.ParseKafkaVersion(k.Version)
if err != nil {
panic(err)
}

if k.SASLConfig.Enabled {
s.Net.SASL.Enable = true
s.Net.SASL.User = k.SASLConfig.User

if len(k.SASLConfig.PasswordPath) > 0 {
if _, err := os.Stat(k.SASLConfig.PasswordPath); os.IsNotExist(err) {
panic(fmt.Sprintf("missing kafka password at the specified path [%s]", k.SASLConfig.PasswordPath))
}
passwordVal, err := os.ReadFile(k.SASLConfig.PasswordPath)
if err != nil {
panic(fmt.Sprintf("failed to kafka password from path [%s] with err: %v", k.SASLConfig.PasswordPath, err))
}

s.Net.SASL.Password = strings.TrimSpace(string(passwordVal))
} else {
s.Net.SASL.Password = k.SASLConfig.Password
}
s.Net.SASL.Handshake = k.SASLConfig.Handshake

if k.SASLConfig.Mechanism == "" {
k.SASLConfig.Mechanism = sarama.SASLTypePlaintext
}
s.Net.SASL.Mechanism = k.SASLConfig.Mechanism
}

if k.TLSConfig.Enabled {
s.Net.TLS.Enable = true
s.Net.TLS.Config = &tls.Config{
InsecureSkipVerify: k.TLSConfig.InsecureSkipVerify,
}
if k.TLSConfig.KeyPath != "" && k.TLSConfig.CertPath != "" {
cert, err := tls.LoadX509KeyPair(k.TLSConfig.CertPath, k.TLSConfig.KeyPath)
if err != nil {
panic(err)
}
s.Net.TLS.Config.Certificates = []tls.Certificate{cert}
}
}
}

// This section holds configuration for the event scheduler used to schedule workflow executions.
Expand Down

0 comments on commit 84bcc26

Please sign in to comment.