From 84bcc26b118d8109ae020df5990cf283a09ddd8f Mon Sep 17 00:00:00 2001 From: Jason Parraga Date: Thu, 31 Oct 2024 09:27:42 -0700 Subject: [PATCH] Add basic SASL and TLS support for Kafka cloud events (#5814) * Add basic SASL and TLS support for Kafka cloud events Signed-off-by: Jason Parraga * Address comments Signed-off-by: Jason Parraga --------- Signed-off-by: Jason Parraga --- charts/flyte-core/README.md | 17 +++- charts/flyte-core/values.yaml | 39 ++++++++- .../manifests/complete-agent.yaml | 9 +- .../sandbox-bundled/manifests/complete.yaml | 9 +- docker/sandbox-bundled/manifests/dev.yaml | 4 +- flyteadmin/.golangci.yml | 2 + flyteadmin/pkg/async/cloudevent/factory.go | 7 +- .../interfaces/application_configuration.go | 85 +++++++++++++++++++ 8 files changed, 154 insertions(+), 18 deletions(-) diff --git a/charts/flyte-core/README.md b/charts/flyte-core/README.md index ba0d2a792d..6aed892810 100644 --- a/charts/flyte-core/README.md +++ b/charts/flyte-core/README.md @@ -55,10 +55,25 @@ helm install gateway bitnami/contour -n flyte | Key | Type | Default | Description | |-----|------|---------|-------------| -| cloud_events.aws.region | string | `"us-east-2"` | | +| cloud_events.aws | object | `{"region":"us-east-2"}` | Configuration for sending cloud events to AWS SNS | | cloud_events.enable | bool | `false` | | | cloud_events.eventsPublisher.eventTypes[0] | string | `"all"` | | | cloud_events.eventsPublisher.topicName | string | `"arn:aws:sns:us-east-2:123456:123-my-topic"` | | +| cloud_events.gcp | object | `{"region":"us-east1"}` | Configuration for sending cloud events to GCP Pub Sub | +| cloud_events.kafka | object | `{"brokers":["mybroker:443"],"saslConfig":{"enabled":false,"handshake":true,"mechanism":"PLAIN","password":"","passwordPath":"","user":"kafka"},"tlsConfig":{"certPath":"/etc/ssl/certs/kafka-client.crt","enabled":false,"keyPath":"/etc/ssl/certs/kafka-client.key"},"version":"3.7.0"}` | Configuration for sending cloud events to Kafka | +| cloud_events.kafka.brokers | list | `["mybroker:443"]` | The kafka brokers to talk to | +| cloud_events.kafka.saslConfig | object | `{"enabled":false,"handshake":true,"mechanism":"PLAIN","password":"","passwordPath":"","user":"kafka"}` | SASL based authentication | +| cloud_events.kafka.saslConfig.enabled | bool | `false` | Whether to use SASL authentication | +| cloud_events.kafka.saslConfig.handshake | bool | `true` | Whether the send the SASL handsahke first | +| cloud_events.kafka.saslConfig.mechanism | string | `"PLAIN"` | Which SASL mechanism to use. Defaults to PLAIN | +| cloud_events.kafka.saslConfig.password | string | `""` | The password for the kafka user | +| cloud_events.kafka.saslConfig.passwordPath | string | `""` | Optional mount path of file containing the kafka password. | +| cloud_events.kafka.saslConfig.user | string | `"kafka"` | The kafka user | +| cloud_events.kafka.tlsConfig | object | `{"certPath":"/etc/ssl/certs/kafka-client.crt","enabled":false,"keyPath":"/etc/ssl/certs/kafka-client.key"}` | Certificate based authentication | +| cloud_events.kafka.tlsConfig.certPath | string | `"/etc/ssl/certs/kafka-client.crt"` | Path to the client certificate | +| cloud_events.kafka.tlsConfig.enabled | bool | `false` | Whether to use certificate based authentication or TLS | +| cloud_events.kafka.tlsConfig.keyPath | string | `"/etc/ssl/certs/kafka-client.key"` | Path to the client private key | +| cloud_events.kafka.version | string | `"3.7.0"` | The version of Kafka | | cloud_events.type | string | `"aws"` | | | cluster_resource_manager | object | `{"config":{"cluster_resources":{"customData":[{"production":[{"projectQuotaCpu":{"value":"5"}},{"projectQuotaMemory":{"value":"4000Mi"}}]},{"staging":[{"projectQuotaCpu":{"value":"2"}},{"projectQuotaMemory":{"value":"3000Mi"}}]},{"development":[{"projectQuotaCpu":{"value":"4"}},{"projectQuotaMemory":{"value":"3000Mi"}}]}],"refreshInterval":"5m","standaloneDeployment":false,"templatePath":"/etc/flyte/clusterresource/templates"}},"enabled":true,"nodeSelector":{},"podAnnotations":{},"podEnv":{},"podLabels":{},"prometheus":{"enabled":false,"path":"/metrics","port":10254},"resources":{},"service_account_name":"flyteadmin","standaloneDeployment":false,"templates":[{"key":"aa_namespace","value":"apiVersion: v1\nkind: Namespace\nmetadata:\n name: {{ namespace }}\nspec:\n finalizers:\n - kubernetes\n"},{"key":"ab_project_resource_quota","value":"apiVersion: v1\nkind: ResourceQuota\nmetadata:\n name: project-quota\n namespace: {{ namespace }}\nspec:\n hard:\n limits.cpu: {{ projectQuotaCpu }}\n limits.memory: {{ projectQuotaMemory }}\n"}]}` | Configuration for the Cluster resource manager component. This is an optional component, that enables automatic cluster configuration. This is useful to set default quotas, manage namespaces etc that map to a project/domain | | cluster_resource_manager.config | object | `{"cluster_resources":{"customData":[{"production":[{"projectQuotaCpu":{"value":"5"}},{"projectQuotaMemory":{"value":"4000Mi"}}]},{"staging":[{"projectQuotaCpu":{"value":"2"}},{"projectQuotaMemory":{"value":"3000Mi"}}]},{"development":[{"projectQuotaCpu":{"value":"4"}},{"projectQuotaMemory":{"value":"3000Mi"}}]}],"refreshInterval":"5m","standaloneDeployment":false,"templatePath":"/etc/flyte/clusterresource/templates"}}` | Configmap for ClusterResource parameters | diff --git a/charts/flyte-core/values.yaml b/charts/flyte-core/values.yaml index 2eb9ff876a..9faaed731a 100755 --- a/charts/flyte-core/values.yaml +++ b/charts/flyte-core/values.yaml @@ -946,15 +946,48 @@ external_events: # an SNS topic (or gcp equivalent) cloud_events: enable: false - type: aws - aws: - region: us-east-2 eventsPublisher: # Make sure this is not a fifo queue. Admin does not yet support # writing to fifo sns topics. topicName: "arn:aws:sns:us-east-2:123456:123-my-topic" eventTypes: - all # Or workflow, node, task. Or "*" + type: aws + # -- Configuration for sending cloud events to AWS SNS + aws: + region: us-east-2 + # -- Configuration for sending cloud events to GCP Pub Sub + gcp: + region: us-east1 + # -- Configuration for sending cloud events to Kafka + kafka: + # -- The version of Kafka + version: "3.7.0" + # -- The kafka brokers to talk to + brokers: + - mybroker:443 + # -- SASL based authentication + saslConfig: + # -- Whether to use SASL authentication + enabled: false + # -- The kafka user + user: kafka + # -- The password for the kafka user + password: "" + # -- Optional mount path of file containing the kafka password. + passwordPath: "" + # -- Whether the send the SASL handsahke first + handshake: true + # -- Which SASL mechanism to use. Defaults to PLAIN + mechanism: PLAIN + # -- Certificate based authentication + tlsConfig: + # -- Whether to use certificate based authentication or TLS + enabled: false + # -- Path to the client certificate + certPath: /etc/ssl/certs/kafka-client.crt + # -- Path to the client private key + keyPath: /etc/ssl/certs/kafka-client.key # -- Configuration for the Cluster resource manager component. This is an optional component, that enables automatic # cluster configuration. This is useful to set default quotas, manage namespaces etc that map to a project/domain diff --git a/docker/sandbox-bundled/manifests/complete-agent.yaml b/docker/sandbox-bundled/manifests/complete-agent.yaml index a460033647..028f719e71 100644 --- a/docker/sandbox-bundled/manifests/complete-agent.yaml +++ b/docker/sandbox-bundled/manifests/complete-agent.yaml @@ -444,6 +444,9 @@ data: disabled: false seedProjects: - flytesnacks + seedProjectsWithDetails: + - description: Default project setup. + name: flytesnacks dataCatalog: disabled: false propeller: @@ -816,7 +819,7 @@ type: Opaque --- apiVersion: v1 data: - haSharedSecret: M0MwZjRDRlRMRVg5eFlNWA== + haSharedSecret: SlI1TDFkTXBMaThuc0hlSQ== proxyPassword: "" proxyUsername: "" kind: Secret @@ -1247,7 +1250,7 @@ spec: metadata: annotations: checksum/cluster-resource-templates: 6fd9b172465e3089fcc59f738b92b8dc4d8939360c19de8ee65f68b0e7422035 - checksum/configuration: dc6e26fec37cad413a92bf06f2840ea1e497284312275ff06e22b152dee1566b + checksum/configuration: a823eaadac5f3a4358c8acf628ebeb3719f88312af520d2c253de2579dff262d checksum/configuration-secret: 09216ffaa3d29e14f88b1f30af580d02a2a5e014de4d750b7f275cc07ed4e914 labels: app.kubernetes.io/component: flyte-binary @@ -1413,7 +1416,7 @@ spec: metadata: annotations: checksum/config: 8f50e768255a87f078ba8b9879a0c174c3e045ffb46ac8723d2eedbe293c8d81 - checksum/secret: 49b88f7ed6b4bec4cdb0305c1d990514d9b75690607d7ae75d5862da9a3b2a29 + checksum/secret: ffc8aa05a602edd8f9b1d7ef35aa1cc5e383bceb9b91307eef99e86f53e13d4e labels: app: docker-registry release: flyte-sandbox diff --git a/docker/sandbox-bundled/manifests/complete.yaml b/docker/sandbox-bundled/manifests/complete.yaml index 88cd06ac2c..c8b8e1c93a 100644 --- a/docker/sandbox-bundled/manifests/complete.yaml +++ b/docker/sandbox-bundled/manifests/complete.yaml @@ -433,6 +433,9 @@ data: disabled: false seedProjects: - flytesnacks + seedProjectsWithDetails: + - description: Default project setup. + name: flytesnacks dataCatalog: disabled: false propeller: @@ -798,7 +801,7 @@ type: Opaque --- apiVersion: v1 data: - haSharedSecret: ekx6Z2kxS3FBYjV5dExlMw== + haSharedSecret: YjdMdE9yejJzZ2xXSDFBRQ== proxyPassword: "" proxyUsername: "" kind: Secret @@ -1196,7 +1199,7 @@ spec: metadata: annotations: checksum/cluster-resource-templates: 6fd9b172465e3089fcc59f738b92b8dc4d8939360c19de8ee65f68b0e7422035 - checksum/configuration: a6f3ea502338c626b7824453ce7dc8b6fcd441d68865c075e2e74d797bc607fa + checksum/configuration: c2649df6bcb523f120c73b0fdeec5d9516f555eab12e4eae78b04dea2cf2abae checksum/configuration-secret: 09216ffaa3d29e14f88b1f30af580d02a2a5e014de4d750b7f275cc07ed4e914 labels: app.kubernetes.io/component: flyte-binary @@ -1362,7 +1365,7 @@ spec: metadata: annotations: checksum/config: 8f50e768255a87f078ba8b9879a0c174c3e045ffb46ac8723d2eedbe293c8d81 - checksum/secret: 9b64bfe991cd6ce4394fa9c2651b0bbfe4834024ece293b3ac9688111d6fe5d3 + checksum/secret: 956ac1b58c049a630c94605eedaba7ba9de3fc01233701ef403ab4bf24fe2a7a labels: app: docker-registry release: flyte-sandbox diff --git a/docker/sandbox-bundled/manifests/dev.yaml b/docker/sandbox-bundled/manifests/dev.yaml index e524e13ae1..1038da1f64 100644 --- a/docker/sandbox-bundled/manifests/dev.yaml +++ b/docker/sandbox-bundled/manifests/dev.yaml @@ -499,7 +499,7 @@ metadata: --- apiVersion: v1 data: - haSharedSecret: MW90empzaUNBd2FlV09QSw== + haSharedSecret: YUpzb25xNTM1eml3Rmpueg== proxyPassword: "" proxyUsername: "" kind: Secret @@ -934,7 +934,7 @@ spec: metadata: annotations: checksum/config: 8f50e768255a87f078ba8b9879a0c174c3e045ffb46ac8723d2eedbe293c8d81 - checksum/secret: ba78cd87d2f6685980b95bd20913088b3a07fa48e9a414693277e3df134710ad + checksum/secret: 2720f13bd64051a7acb512e59e426b9f6c5f6c3c7d1d9a3a423e2df4cf9bab46 labels: app: docker-registry release: flyte-sandbox diff --git a/flyteadmin/.golangci.yml b/flyteadmin/.golangci.yml index 4dbb031812..cd180b89d1 100644 --- a/flyteadmin/.golangci.yml +++ b/flyteadmin/.golangci.yml @@ -39,3 +39,5 @@ issues: exclude-rules: - path: pkg/workflowengine/impl/prepare_execution.go text: "copies lock" + - path: pkg/runtime/interfaces/application_configuration.go + text: "G402: TLS InsecureSkipVerify may be true." diff --git a/flyteadmin/pkg/async/cloudevent/factory.go b/flyteadmin/pkg/async/cloudevent/factory.go index 65cd48de93..51c38ffea4 100644 --- a/flyteadmin/pkg/async/cloudevent/factory.go +++ b/flyteadmin/pkg/async/cloudevent/factory.go @@ -73,12 +73,7 @@ func NewCloudEventsPublisher(ctx context.Context, db repositoryInterfaces.Reposi case cloudEventImplementations.Kafka: saramaConfig := sarama.NewConfig() - var err error - saramaConfig.Version, err = sarama.ParseKafkaVersion(cloudEventsConfig.KafkaConfig.Version) - if err != nil { - logger.Fatalf(ctx, "failed to parse kafka version, %v", err) - panic(err) - } + cloudEventsConfig.KafkaConfig.UpdateSaramaConfig(ctx, saramaConfig) kafkaSender, err := kafka_sarama.NewSender(cloudEventsConfig.KafkaConfig.Brokers, saramaConfig, cloudEventsConfig.EventsPublisherConfig.TopicName) if err != nil { panic(err) diff --git a/flyteadmin/pkg/runtime/interfaces/application_configuration.go b/flyteadmin/pkg/runtime/interfaces/application_configuration.go index 15ed271412..55791a1538 100644 --- a/flyteadmin/pkg/runtime/interfaces/application_configuration.go +++ b/flyteadmin/pkg/runtime/interfaces/application_configuration.go @@ -1,6 +1,13 @@ package interfaces import ( + "context" + "crypto/tls" + "fmt" + "os" + "strings" + + "github.com/Shopify/sarama" "github.com/golang/protobuf/ptypes/wrappers" "golang.org/x/time/rate" @@ -231,11 +238,89 @@ type GCPConfig struct { ProjectID string `json:"projectId"` } +// This section holds SASL config for Kafka +type SASLConfig struct { + // Whether to use SASL + Enabled bool `json:"enabled"` + // The username + User string `json:"user"` + // The password + Password string `json:"password"` + PasswordPath string `json:"passwordPath"` + Handshake bool `json:"handshake"` + // Which SASL Mechanism to use. Defaults to PLAIN + Mechanism sarama.SASLMechanism `json:"mechanism"` +} + +// This section holds TLS config for Kafka clients +type TLSConfig struct { + // Whether to use TLS + Enabled bool `json:"enabled"` + // Whether to skip certificate verification + InsecureSkipVerify bool `json:"insecureSkipVerify"` + // The location of the client certificate + CertPath string `json:"certPath"` + // The location of the client private key + KeyPath string `json:"keyPath"` +} + +// This section holds configs for Kafka clients type KafkaConfig struct { // The version of Kafka, e.g. 2.1.0, 0.8.2.0 Version string `json:"version"` // kafka broker addresses Brokers []string `json:"brokers"` + // sasl config + SASLConfig SASLConfig `json:"saslConfig"` + // tls config + TLSConfig TLSConfig `json:"tlsConfig"` +} + +func (k KafkaConfig) UpdateSaramaConfig(ctx context.Context, s *sarama.Config) { + var err error + s.Version, err = sarama.ParseKafkaVersion(k.Version) + if err != nil { + panic(err) + } + + if k.SASLConfig.Enabled { + s.Net.SASL.Enable = true + s.Net.SASL.User = k.SASLConfig.User + + if len(k.SASLConfig.PasswordPath) > 0 { + if _, err := os.Stat(k.SASLConfig.PasswordPath); os.IsNotExist(err) { + panic(fmt.Sprintf("missing kafka password at the specified path [%s]", k.SASLConfig.PasswordPath)) + } + passwordVal, err := os.ReadFile(k.SASLConfig.PasswordPath) + if err != nil { + panic(fmt.Sprintf("failed to kafka password from path [%s] with err: %v", k.SASLConfig.PasswordPath, err)) + } + + s.Net.SASL.Password = strings.TrimSpace(string(passwordVal)) + } else { + s.Net.SASL.Password = k.SASLConfig.Password + } + s.Net.SASL.Handshake = k.SASLConfig.Handshake + + if k.SASLConfig.Mechanism == "" { + k.SASLConfig.Mechanism = sarama.SASLTypePlaintext + } + s.Net.SASL.Mechanism = k.SASLConfig.Mechanism + } + + if k.TLSConfig.Enabled { + s.Net.TLS.Enable = true + s.Net.TLS.Config = &tls.Config{ + InsecureSkipVerify: k.TLSConfig.InsecureSkipVerify, + } + if k.TLSConfig.KeyPath != "" && k.TLSConfig.CertPath != "" { + cert, err := tls.LoadX509KeyPair(k.TLSConfig.CertPath, k.TLSConfig.KeyPath) + if err != nil { + panic(err) + } + s.Net.TLS.Config.Certificates = []tls.Certificate{cert} + } + } } // This section holds configuration for the event scheduler used to schedule workflow executions.