-
Notifications
You must be signed in to change notification settings - Fork 8
/
connection.go
89 lines (72 loc) · 2.43 KB
/
connection.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
package amqprpc
import (
"errors"
"fmt"
"maps"
amqp "github.com/rabbitmq/amqp091-go"
)
// ErrUnexpectedConnClosed is returned by ListenAndServe() if the server
// shuts down without calling Stop() and if AMQP does not give an error
// when said shutdown happens.
var ErrUnexpectedConnClosed = errors.New("unexpected connection close without specific error")
// OnStartedFunc can be registered at Server.OnStarted(f) and
// Client.OnStarted(f). This is used when you want to do more setup on the
// connections and/or channels from amqp, for example setting Qos,
// NotifyPublish etc.
type OnStartedFunc func(inputConn, outputConn *amqp.Connection, inputChannel, outputChannel *amqp.Channel)
func monitorAndWait(restartChan, stopChan chan struct{}, amqpErrs ...chan *amqp.Error) (bool, error) {
result := make(chan error, len(amqpErrs))
// Setup monitoring for connections and channels, can be several connections and several channels.
// The first one closed will yield the error.
for _, errCh := range amqpErrs {
go func(c chan *amqp.Error) {
err, ok := <-c
if !ok {
result <- ErrUnexpectedConnClosed
return
}
result <- err
}(errCh)
}
select {
case err := <-result:
return true, err
case <-restartChan:
return true, nil
case <-stopChan:
return false, nil
}
}
func createConnections(url, name string, config amqp.Config) (consumerConn, publisherConn *amqp.Connection, err error) {
if config.Properties == nil {
config.Properties = amqp.Table{}
}
consumerConnConfig := config
publisherConnConfig := config
if _, ok := config.Properties["connection_name"]; !ok {
consumerConnConfig.Properties = maps.Clone(config.Properties)
publisherConnConfig.Properties = maps.Clone(config.Properties)
consumerConnConfig.Properties["connection_name"] = fmt.Sprintf("%s-consumer", name)
publisherConnConfig.Properties["connection_name"] = fmt.Sprintf("%s-publisher", name)
}
consumerConn, err = amqp.DialConfig(url, consumerConnConfig)
if err != nil {
return nil, nil, err
}
publisherConn, err = amqp.DialConfig(url, publisherConnConfig)
if err != nil {
return nil, nil, err
}
return consumerConn, publisherConn, nil
}
func createChannels(inputConn, outputConn *amqp.Connection) (inputCh, outputCh *amqp.Channel, err error) {
inputCh, err = inputConn.Channel()
if err != nil {
return nil, nil, err
}
outputCh, err = outputConn.Channel()
if err != nil {
return nil, nil, err
}
return inputCh, outputCh, nil
}