forked from compose/transporter
-
Notifications
You must be signed in to change notification settings - Fork 1
/
client.go
384 lines (328 loc) · 11 KB
/
client.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
package mongodb
import (
"context"
"crypto/tls"
"crypto/x509"
"errors"
"fmt"
"io/ioutil"
"strings"
"time"
"github.com/compose/transporter/client"
"github.com/compose/transporter/log"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
"go.mongodb.org/mongo-driver/mongo/readpref"
"go.mongodb.org/mongo-driver/mongo/writeconcern"
"gopkg.in/mgo.v2"
"gopkg.in/mgo.v2/bson"
)
// Defaults used by this package's MongoDB client.
const (
// DefaultURI is the connection string NewClient assigns when no URI option is supplied.
DefaultURI = "mongodb://127.0.0.1:27017/test"
// DefaultSessionTimeout is the default session timeout (10 seconds).
DefaultSessionTimeout = 10 * time.Second
// DefaultMaxWriteBatchSize is the value NewClient assigns to the client's maxWriteBatchSize.
DefaultMaxWriteBatchSize = 1000
)
var (
// DefaultSafety is the default safety mode used for the underlying session.
// These default settings are only good for local use as they make no guarantees for writes.
// NOTE(review): this is an mgo value while the Client in this file is built on the
// official mongo driver; confirm it is still consumed somewhere.
DefaultSafety = mgo.Safe{}
// Compile-time assertions that *Client satisfies client.Client and client.Closer.
_ client.Client = &Client{}
_ client.Closer = &Client{}
)
// OplogAccessError reports that the oplog could not be accessed; reason holds
// the human-readable cause that is appended to the error message.
type OplogAccessError struct {
	reason string
}

// Error implements the error interface, returning the fixed
// "oplog access failed, " prefix followed by the stored reason.
func (oe OplogAccessError) Error() string {
	msg := fmt.Sprintf("oplog access failed, %s", oe.reason)
	return msg
}
// InvalidReadPreferenceError is the error describing a read preference value
// that is not recognised; ReadPreference carries the offending input.
type InvalidReadPreferenceError struct {
	ReadPreference string
}

// Error implements the error interface, returning the fixed
// "Invalid Read Preference, " prefix followed by the rejected value.
func (ire InvalidReadPreferenceError) Error() string {
	msg := fmt.Sprintf("Invalid Read Preference, %s", ire.ReadPreference)
	return msg
}
// extractDBNameFromURI returns the database component of a MongoDB connection
// string of the form scheme://[userinfo@]hosts[/database][?options].
//
// It returns an empty name (and no error) when the URI carries no database
// path, and an error when the string has no "scheme://" prefix.
func extractDBNameFromURI(uri string) (string, error) {
	schemeEnd := strings.Index(uri, "://")
	// The scheme must come first: a "://" that only shows up after a path or
	// query separator is part of a value, not a scheme delimiter.
	if schemeEnd <= 0 || strings.ContainsAny(uri[:schemeEnd], "/?") {
		return "", fmt.Errorf("invalid MongoDB URI: %s", uri)
	}
	rest := uri[schemeEnd+len("://"):]

	// Drop the options first so a '/' inside an option value is never mistaken
	// for the separator between the host list and the database.
	if q := strings.IndexByte(rest, '?'); q >= 0 {
		rest = rest[:q]
	}

	// Whatever follows the first '/' after the host list is the database name.
	// Without that '/', the URI names no database (previously the host list
	// itself was returned here).
	slash := strings.IndexByte(rest, '/')
	if slash < 0 {
		return "", nil
	}
	return rest[slash+1:], nil
}
// ClientOptionFunc is a function that configures a Client.
// NewClient invokes each one with the client under construction and aborts,
// returning the error, as soon as one of them returns a non-nil error.
type ClientOptionFunc func(*Client) error
// Client represents a client to the underlying MongoDB source.
type Client struct {
// uri is the connection string; NewClient defaults it to DefaultURI and WithURI overrides it.
uri string
// tlsConfig, when non-nil, is applied to the driver options in initConnection.
// NewClient leaves it nil.
tlsConfig *tls.Config
// sessionTimeout is a timeout duration for the session.
// NOTE(review): no option in this file assigns it — confirm where it is configured.
sessionTimeout time.Duration
// tail makes initConnection verify that the oplog collection exists (see WithTail).
tail bool
// readPreference is initialised to readpref.Primary() by NewClient.
readPreference *readpref.ReadPref
// maxWriteBatchSize is initialised to DefaultMaxWriteBatchSize by NewClient.
maxWriteBatchSize int
// client is the connected driver client; nil until initConnection succeeds.
client *mongo.Client // This replaces mgoSession
// dbname is the database name extracted from the URI by WithURI; it is handed to every Session.
dbname string
// clientOptions holds the driver options built up by the With* option funcs.
clientOptions *options.ClientOptions
}
// NewClient creates a new client to work with MongoDB.
//
// The caller can configure the new client by passing configuration options
// to the func. Options are applied in the order given; nil options are
// skipped.
//
// Example:
//
//	client, err := NewClient(
//		WithURI("mongodb://localhost:27017"),
//		WithTail(true))
//
// If no URI is configured, DefaultURI is used. The session timeout starts at
// DefaultSessionTimeout, the read preference at primary and the maximum write
// batch size at DefaultMaxWriteBatchSize.
//
// An error is returned as soon as one of the options reports one; no client
// is returned in that case.
func NewClient(options ...ClientOptionFunc) (*Client, error) {
	c := &Client{
		uri:               DefaultURI,
		sessionTimeout:    DefaultSessionTimeout,
		readPreference:    readpref.Primary(),
		maxWriteBatchSize: DefaultMaxWriteBatchSize,
	}

	for _, option := range options {
		// Calling a nil option would panic; treat it as "no option".
		if option == nil {
			continue
		}
		if err := option(c); err != nil {
			return nil, err
		}
	}
	return c, nil
}
// WithURI sets the full connection string of the MongoDB database.
//
// Only an empty string is rejected here; the URI is handed to the driver
// options via ApplyURI without being validated by this option. The database
// name is taken from the URI when it can be extracted and is otherwise left
// empty.
func WithURI(uri string) ClientOptionFunc {
	return func(c *Client) error {
		if len(uri) == 0 {
			return errors.New("URI cannot be empty")
		}

		name, err := extractDBNameFromURI(uri)
		if err != nil {
			// An unparsable URI simply means "no database name".
			name = ""
		}
		c.dbname = name
		c.uri = uri

		// Lazily create the driver options, then record the URI on them.
		if c.clientOptions == nil {
			c.clientOptions = options.Client()
		}
		c.clientOptions.ApplyURI(uri)
		return nil
	}
}
// WithSSL configures the database connection to connect via TLS.
//
// When ssl is false the option does nothing. When ssl is true a TLS config is
// set on the driver options, reusing a TLS config that an earlier option
// already installed so that, for example, custom root CAs are not discarded.
//
// insecureSkipVerify is copied to tls.Config.InsecureSkipVerify; passing true
// disables server certificate verification and should be used with caution.
// RootCAs is deliberately not touched: crypto/tls falls back to the host's
// root CA set when RootCAs is nil, whereas an empty pool would make every
// server certificate fail verification.
func WithSSL(ssl bool, insecureSkipVerify bool) ClientOptionFunc {
	return func(c *Client) error {
		if !ssl {
			return nil
		}

		if c.clientOptions == nil {
			c.clientOptions = options.Client()
		}

		tlsConfig := c.clientOptions.TLSConfig
		if tlsConfig == nil {
			tlsConfig = &tls.Config{}
		}
		tlsConfig.InsecureSkipVerify = insecureSkipVerify

		c.clientOptions.SetTLSConfig(tlsConfig)
		return nil
	}
}
// WithCACerts reads the PEM files named in certs and installs them as the
// root CA pool of the client's TLS configuration, with certificate
// verification enabled. An empty list leaves the client untouched.
//
// It fails when a file cannot be read or contains no usable PEM certificate.
func WithCACerts(certs []string) ClientOptionFunc {
	return func(c *Client) error {
		if len(certs) == 0 {
			return nil
		}

		pool := x509.NewCertPool()
		for i := range certs {
			pemBytes, readErr := ioutil.ReadFile(certs[i])
			if readErr != nil {
				return errors.New("could not read cert file: " + certs[i])
			}
			if !pool.AppendCertsFromPEM(pemBytes) {
				return errors.New("failed to append cert from PEM")
			}
		}

		if c.clientOptions == nil {
			c.clientOptions = options.Client()
		}

		// Build on a TLS config that is already present, if any.
		cfg := c.clientOptions.TLSConfig
		if cfg == nil {
			cfg = new(tls.Config)
		}
		cfg.RootCAs = pool
		cfg.InsecureSkipVerify = false

		c.clientOptions.SetTLSConfig(cfg)
		return nil
	}
}
// WithWriteConcern sets the write concern "w" value on the MongoDB client
// options. Values below 1 are ignored, leaving the write concern unset on
// the options.
func WithWriteConcern(wc int) ClientOptionFunc {
	return func(c *Client) error {
		if wc <= 0 {
			return nil
		}

		if c.clientOptions == nil {
			c.clientOptions = options.Client()
		}
		c.clientOptions.SetWriteConcern(writeconcern.New(writeconcern.W(wc)))
		return nil
	}
}
// WithTail records whether the Client will need access to the oplog
// (Default: false).
func WithTail(tail bool) ClientOptionFunc {
	setTail := func(c *Client) error {
		c.tail = tail
		return nil
	}
	return setTail
}
// WithReadPreference sets the MongoDB read preference based on the provided
// string. Matching is case-insensitive and accepts "primary",
// "primaryPreferred", "secondary", "secondaryPreferred" and "nearest".
//
// Any other value (including the empty string) falls back to primary rather
// than failing. The chosen preference is stored on the Client as well as on
// the driver options so the two never disagree.
func WithReadPreference(readPreference string) ClientOptionFunc {
	return func(c *Client) error {
		var rp *readpref.ReadPref
		switch strings.ToLower(readPreference) {
		case "primary":
			rp = readpref.Primary()
		case "primarypreferred":
			rp = readpref.PrimaryPreferred()
		case "secondary":
			rp = readpref.Secondary()
		case "secondarypreferred":
			rp = readpref.SecondaryPreferred()
		case "nearest":
			rp = readpref.Nearest()
		default:
			// Unrecognised values are tolerated on purpose; returning
			// InvalidReadPreferenceError here would reject them instead.
			rp = readpref.Primary()
		}

		if c.clientOptions == nil {
			c.clientOptions = options.Client()
		}
		c.readPreference = rp
		c.clientOptions.SetReadPreference(rp)
		return nil
	}
}
// Connect returns a Session backed by the client's MongoDB connection,
// establishing (or re-establishing) that connection when necessary.
//
// An existing connection is checked with a short ping and reused when it
// answers. When the ping fails the stale driver client is disconnected and a
// new connection is initialised. Initialisation is bounded by the client's
// session timeout, falling back to DefaultSessionTimeout when none is set.
//
// It returns the error from initConnection when the connection cannot be
// established.
func (c *Client) Connect() (client.Session, error) {
	if c.client != nil {
		pingCtx, cancelPing := context.WithTimeout(context.Background(), 2*time.Second)
		err := c.client.Ping(pingCtx, nil)
		cancelPing()
		if err == nil {
			// Connection is still active.
			return &Session{c.client, c.dbname}, nil
		}
		log.Infoln("Ping to MongoDB failed, attempting to reconnect:", err)

		// Release the stale client's resources before replacing it; simply
		// dropping the reference would leave them open.
		discCtx, cancelDisc := context.WithTimeout(context.Background(), 2*time.Second)
		if derr := c.client.Disconnect(discCtx); derr != nil {
			log.Infoln("Disconnecting stale MongoDB client failed:", derr)
		}
		cancelDisc()
		c.client = nil
	}

	timeout := c.sessionTimeout
	if timeout <= 0 {
		timeout = DefaultSessionTimeout
	}
	ctx, cancel := context.WithTimeout(context.Background(), timeout)
	defer cancel()

	// initConnection pings the server itself before storing the client, so no
	// second ping is needed here.
	if err := c.initConnection(ctx); err != nil {
		return nil, err
	}
	return &Session{c.client, c.dbname}, nil
}
// Close disconnects the underlying MongoDB client, if there is one, waiting
// at most five seconds. A failure to disconnect is logged with its cause.
// The client reference is cleared afterwards so that a later Connect starts
// a fresh connection instead of probing a disconnected client.
func (c *Client) Close() {
	if c.client == nil {
		return
	}

	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	if err := c.client.Disconnect(ctx); err != nil {
		log.Errorln("Error on closing connection:", err)
	}
	c.client = nil
}
// initConnection creates a new driver client, verifies it with a ping (and an
// oplog check when tailing is enabled) and stores it on c.
//
// The driver options are layered, later entries overriding earlier ones:
// the options derived from c.uri, then the options accumulated by the With*
// option funcs (c.clientOptions), then c.tlsConfig when set.
//
// On any failure after the driver client was created, that client is
// disconnected again so its resources are not leaked, and c.client is left
// unchanged.
func (c *Client) initConnection(ctx context.Context) error {
	// NOTE(review): c.uri may embed credentials; consider redacting it before logging.
	log.With("database", c.dbname).Infof("connecting to URI %s", c.uri)

	opts := []*options.ClientOptions{options.Client().ApplyURI(c.uri)}
	if c.clientOptions != nil {
		opts = append(opts, c.clientOptions)
	}
	if c.tlsConfig != nil {
		opts = append(opts, options.Client().SetTLSConfig(c.tlsConfig))
	}

	mongoClient, err := mongo.Connect(ctx, opts...)
	if err != nil {
		return fmt.Errorf("failed to connect to MongoDB: %w", err)
	}

	// release tears down a client that failed verification. It uses its own
	// context because ctx may already have expired by the time it runs.
	release := func() {
		discCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
		defer cancel()
		_ = mongoClient.Disconnect(discCtx) // best effort; the original failure is what gets reported
	}

	if err := mongoClient.Ping(ctx, nil); err != nil {
		release()
		return fmt.Errorf("failed to ping MongoDB: %w", err)
	}

	// Oplog access is only required when the client is going to tail.
	if c.tail {
		if err := c.checkOplogAccess(ctx, mongoClient); err != nil {
			release()
			return err
		}
	}

	c.client = mongoClient
	return nil
}
// checkOplogAccess checks whether the "oplog.rs" collection is visible in the
// "local" database of mongoClient.
//
// It returns nil when the collection is listed, and an OplogAccessError when
// the collections cannot be listed or "oplog.rs" is not among them, so callers
// can recognise oplog problems by type. Only the existence of the collection
// is checked; no read is attempted.
func (c *Client) checkOplogAccess(ctx context.Context, mongoClient *mongo.Client) error {
	names, err := mongoClient.Database("local").ListCollectionNames(ctx, bson.M{})
	if err != nil {
		return OplogAccessError{
			reason: fmt.Sprintf("failed to list collections in 'local' database: %v", err),
		}
	}

	for _, name := range names {
		if name == "oplog.rs" {
			return nil
		}
	}
	return OplogAccessError{reason: "database missing 'oplog.rs' collection"}
}