-
Notifications
You must be signed in to change notification settings - Fork 12
/
integration.go
94 lines (75 loc) · 1.95 KB
/
integration.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
package main
import (
	"crypto/tls"
	"crypto/x509"
	"fmt"
	"io"
	"io/ioutil"

	"github.com/Shopify/sarama"
	log "github.com/Sirupsen/logrus"
	"github.com/segment-integrations/connect-kafka/internal/kafka"
	"github.com/tj/docopt"
)
// KafkaIntegration forwards payloads to a Kafka topic through a synchronous
// sarama producer. Both fields are populated by Init and read by Process.
type KafkaIntegration struct {
	// topic is the destination topic, taken from the --topic option.
	topic string

	// producer is the producer used to send each message.
	producer sarama.SyncProducer
}
// newTLSFromConfig builds the TLS client configuration from the parsed
// command-line options in m.
//
// The "--trusted-cert", "--client-cert" and "--client-cert-key" entries are
// read as file paths. When all three are empty or absent, nil is returned.
// Otherwise all three must be set; the trusted certificate is loaded into the
// root pool and the client certificate/key pair is attached to the config.
//
// Any failure (missing option, unreadable file, invalid key pair, trusted
// cert file without a PEM certificate) terminates the process via log.Fatal.
func (k *KafkaIntegration) newTLSFromConfig(m map[string]interface{}) *tls.Config {
	// The assertion results are deliberately discarded: a missing or
	// non-string option is treated the same as an empty path.
	trustedCertPath, _ := m["--trusted-cert"].(string)
	clientCertPath, _ := m["--client-cert"].(string)
	clientCertKeyPath, _ := m["--client-cert-key"].(string)

	if trustedCertPath == "" && clientCertPath == "" && clientCertKeyPath == "" {
		return nil
	}

	// A partial TLS setup would otherwise fail below with an opaque
	// "open : no such file or directory"; report the real problem instead.
	if trustedCertPath == "" || clientCertPath == "" || clientCertKeyPath == "" {
		log.Fatal("--trusted-cert, --client-cert and --client-cert-key must be provided together")
	}

	trustedCertBytes, err := ioutil.ReadFile(trustedCertPath)
	if err != nil {
		log.Fatalf("reading --trusted-cert: %v", err)
	}

	clientCertBytes, err := ioutil.ReadFile(clientCertPath)
	if err != nil {
		log.Fatalf("reading --client-cert: %v", err)
	}

	clientCertKeyBytes, err := ioutil.ReadFile(clientCertKeyPath)
	if err != nil {
		log.Fatalf("reading --client-cert-key: %v", err)
	}

	cert, err := tls.X509KeyPair(clientCertBytes, clientCertKeyBytes)
	if err != nil {
		log.Fatalf("loading client certificate key pair: %v", err)
	}

	// AppendCertsFromPEM reports whether at least one certificate was parsed;
	// ignoring it would silently produce an empty root pool.
	certPool := x509.NewCertPool()
	if !certPool.AppendCertsFromPEM(trustedCertBytes) {
		log.Fatalf("no PEM certificates found in --trusted-cert file %q", trustedCertPath)
	}

	return &tls.Config{
		Certificates: []tls.Certificate{cert},
		// NOTE(review): with InsecureSkipVerify set, crypto/tls does not
		// verify the broker's certificate chain or host name, so RootCAs is
		// not consulted during the handshake. Kept as-is because enabling
		// verification changes which brokers this can connect to; confirm
		// the intent or add an explicit verification callback.
		InsecureSkipVerify: true,
		RootCAs:            certPool,
	}
}
// Init parses the command-line options with docopt (using the package-level
// usage text and Version, both defined outside this block), creates the Kafka
// producer and stores the producer and destination topic on k.
//
// It returns an error when option parsing fails, when --broker is not a list
// of strings or --topic is not a string, or when the producer cannot be
// created. TLS setup problems are not returned: newTLSFromConfig terminates
// the process on failure.
func (k *KafkaIntegration) Init() error {
	m, err := docopt.Parse(usage, nil, true, Version, false)
	if err != nil {
		return err
	}

	// Use checked assertions so a missing or mistyped option becomes an
	// error instead of a panic, and validate both options before creating
	// the producer so a bad --topic cannot leave a live producer behind.
	brokers, ok := m["--broker"].([]string)
	if !ok {
		return fmt.Errorf("invalid --broker option: expected a list of addresses, got %T", m["--broker"])
	}
	topic, ok := m["--topic"].(string)
	if !ok {
		return fmt.Errorf("invalid --topic option: expected a string, got %T", m["--topic"])
	}

	kafkaConfig := &kafka.Config{BrokerAddresses: brokers}
	kafkaConfig.TLSConfig = k.newTLSFromConfig(m)

	producer, err := kafka.NewProducer(kafkaConfig)
	if err != nil {
		return err
	}

	k.producer = producer
	k.topic = topic
	return nil
}
// Process reads r to the end and publishes the bytes as a single message on
// the configured topic. r is always closed before Process returns.
//
// The returned error is either the read error or the error reported by the
// producer's SendMessage call; nil means the message was sent.
func (k *KafkaIntegration) Process(r io.ReadCloser) error {
	defer r.Close()

	payload, readErr := ioutil.ReadAll(r)
	if readErr != nil {
		return readErr
	}

	msg := &sarama.ProducerMessage{
		Topic: k.topic,
		Value: sarama.ByteEncoder(payload),
	}

	// The partition and offset results are not needed here.
	if _, _, sendErr := k.producer.SendMessage(msg); sendErr != nil {
		return sendErr
	}
	return nil
}