forked from compose/transporter
-
Notifications
You must be signed in to change notification settings - Fork 1
/
rabbitmq.go
73 lines (61 loc) · 1.64 KB
/
rabbitmq.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
package rabbitmq
import (
"sync"
"github.com/compose/transporter/adaptor"
"github.com/compose/transporter/client"
)
// description is the one-line summary of this adaptor; it is the value
// returned by (*rabbitMQ).Description.
const description = "an adaptor that handles publish/subscribe messaging with RabbitMQ"

// sampleConfig is the example configuration text returned by
// (*rabbitMQ).SampleConfig. The commented-out entries are the optional
// settings; their names match the json tags on the rabbitMQ struct.
const sampleConfig = `{
"uri": "${RABBITMQ_URI}",
"routing_key": "",
"key_in_field": false
// "delivery_mode": 1, // non-persistent (1) or persistent (2)
// "api_port": 15672,
// "ssl": false,
// "cacerts": ["/path/to/cert.pem"]
}`
// Compile-time assertion that *rabbitMQ implements adaptor.Adaptor.
var _ adaptor.Adaptor = (*rabbitMQ)(nil)
// rabbitMQ defines all configurable elements for connecting to RabbitMQ and
// sending/receiving JSON. It is the concrete adaptor.Adaptor registered under
// the name "rabbitmq"; the struct tags give the JSON key for each setting.
type rabbitMQ struct {
// BaseConfig is embedded; its URI field is what Client and Reader use.
adaptor.BaseConfig
// RoutingKey is handed to the Writer built by Writer().
RoutingKey string `json:"routing_key"`
// KeyInField is handed to the Writer built by Writer().
// NOTE(review): presumably selects taking the routing key from a message
// field — confirm against the Writer implementation.
KeyInField bool `json:"key_in_field"`
// DeliveryMode is handed to the Writer; the sample config documents the
// values as non-persistent (1) or persistent (2).
DeliveryMode uint8 `json:"delivery_mode"`
// APIPort is handed to the Reader built by Reader(); the sample config
// shows 15672.
APIPort int `json:"api_port"`
// SSL is passed to the client through WithSSL.
SSL bool `json:"ssl"`
// CACerts is passed to the client through WithCACerts; the sample config
// shows certificate file paths.
CACerts []string `json:"cacerts"`
}
// init registers this adaptor with the adaptor package under the name
// "rabbitmq".
func init() {
	// newRabbitMQ returns a fresh adaptor seeded with the package defaults.
	// Fields not assigned here (KeyInField, SSL, CACerts) keep their zero
	// values.
	newRabbitMQ := func() adaptor.Adaptor {
		cfg := new(rabbitMQ)
		cfg.BaseConfig = adaptor.BaseConfig{URI: DefaultURI}
		cfg.RoutingKey = DefaultRoutingKey
		cfg.DeliveryMode = DefaultDeliveryMode
		cfg.APIPort = DefaultAPIPort
		return cfg
	}

	adaptor.Add("rabbitmq", newRabbitMQ)
}
// Client builds the client for this adaptor from the configured URI, SSL flag
// and CA certificate list. The result and error of NewClient are returned
// unchanged.
func (r *rabbitMQ) Client() (client.Client, error) {
	uriOpt := WithURI(r.URI)
	sslOpt := WithSSL(r.SSL)
	caCertsOpt := WithCACerts(r.CACerts)

	return NewClient(uriOpt, sslOpt, caCertsOpt)
}
// Reader returns a Reader built from the configured URI and API port.
// The returned error is always nil.
func (r *rabbitMQ) Reader() (client.Reader, error) {
	// Positional literal: the values must stay in the field order declared
	// on Reader (URI first, then API port).
	rd := &Reader{r.URI, r.APIPort}
	return rd, nil
}
// Writer returns a Writer built from the configured delivery mode, routing key
// and key-in-field flag. The done channel and wait group are not used here,
// and the returned error is always nil.
func (r *rabbitMQ) Writer(done chan struct{}, wg *sync.WaitGroup) (client.Writer, error) {
	// Positional literal: the values must stay in the field order declared
	// on Writer (delivery mode, routing key, key-in-field).
	wr := &Writer{r.DeliveryMode, r.RoutingKey, r.KeyInField}
	return wr, nil
}
// Description returns the package-level description constant, a one-line
// summary of this adaptor.
func (r *rabbitMQ) Description() string {
return description
}
// SampleConfig returns the package-level sampleConfig constant, an example
// configuration listing the required settings plus the optional ones as
// commented-out entries.
func (r *rabbitMQ) SampleConfig() string {
return sampleConfig
}