Skip to content

Commit

Permalink
Functional options (#160)
Browse files Browse the repository at this point in the history
  • Loading branch information
mantzas authored Oct 4, 2023
1 parent 3174d3f commit c51d5f2
Show file tree
Hide file tree
Showing 7 changed files with 167 additions and 315 deletions.
2 changes: 1 addition & 1 deletion docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ networks:
services:
consul:
container_name: harvester_consul_dev
image: consul:1.8.0
image: consul:1.15
networks:
- consul-network
ports:
Expand Down
10 changes: 6 additions & 4 deletions examples/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,10 +67,12 @@ func main() {

redisClient := createRedisClient()

h, err := harvester.New(&cfg).
WithConsulSeed(consulAddress, consulDC, consulToken, 0).WithConsulMonitor(consulAddress, consulDC, consulToken, 0).
WithRedisSeed(redisClient).WithRedisMonitor(redisClient, 200*time.Millisecond).
WithNotification(chNotify).Create()
h, err := harvester.New(&cfg, chNotify,
harvester.WithConsulSeed(consulAddress, consulDC, consulToken, 0),
harvester.WithConsulMonitor(consulAddress, consulDC, consulToken, 0),
harvester.WithRedisSeed(redisClient),
harvester.WithRedisMonitor(redisClient, 200*time.Millisecond),
)
if err != nil {
log.Fatalf("failed to create harvester: %v", err)
}
Expand Down
260 changes: 17 additions & 243 deletions harvester.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,10 @@ package harvester

import (
"context"
"errors"
"log/slog"
"time"

"github.com/beatlabs/harvester/config"
"github.com/beatlabs/harvester/monitor"
"github.com/beatlabs/harvester/monitor/consul"
redismon "github.com/beatlabs/harvester/monitor/redis"
"github.com/beatlabs/harvester/seed"
seedconsul "github.com/beatlabs/harvester/seed/consul"
seedredis "github.com/beatlabs/harvester/seed/redis"
"github.com/go-redis/redis/v8"
)

// Seeder interface for seeding initial values of the configuration.
Expand Down Expand Up @@ -51,255 +43,37 @@ func (h *harvester) Harvest(ctx context.Context) error {
return h.monitor.Monitor(ctx)
}

type consulConfig struct {
addr, dataCenter, token, folderPrefix string
timeout time.Duration
}

// Builder of a harvester instance.
type Builder struct {
cfg interface{}
seedConsulCfg *consulConfig
monitorConsulCfg *consulConfig
err error
chNotify chan<- config.ChangeNotification
monitorRedisClient redis.UniversalClient
seedRedisClient redis.UniversalClient
monitorRedisPollInterval time.Duration
}

// New constructor.
func New(cfg interface{}) *Builder {
return &Builder{cfg: cfg}
}

// WithNotification constructor.
func (b *Builder) WithNotification(chNotify chan<- config.ChangeNotification) *Builder {
if b.err != nil {
return b
}

if chNotify == nil {
b.err = errors.New("notification channel is nil")
return b
}

b.chNotify = chNotify
return b
}

// WithConsulSeed enables support for seeding values with consul.
func (b *Builder) WithConsulSeed(addr, dataCenter, token string, timeout time.Duration) *Builder {
return b.WithConsulSeedWithPrefix(addr, dataCenter, token, "", timeout)
}

// WithConsulSeedWithPrefix enables support for seeding values with consul including a folder prefix.
func (b *Builder) WithConsulSeedWithPrefix(addr, dataCenter, token, folderPrefix string, timeout time.Duration) *Builder {
if b.err != nil {
return b
}

b.seedConsulCfg = &consulConfig{
addr: addr,
dataCenter: dataCenter,
token: token,
folderPrefix: folderPrefix,
timeout: timeout,
}
return b
}

// WithConsulMonitor enables support for monitoring key/prefixes on ConsulLogger. It automatically parses the config
// and monitors every field found tagged with ConsulLogger.
func (b *Builder) WithConsulMonitor(addr, dataCenter, token string, timeout time.Duration) *Builder {
return b.WithConsulFolderPrefixMonitor(addr, dataCenter, token, "", timeout)
}

// WithConsulFolderPrefixMonitor enables support for monitoring key/prefixes on ConsulLogger. It automatically parses the config
// and monitors every field found tagged with ConsulLogger.
func (b *Builder) WithConsulFolderPrefixMonitor(addr, dataCenter, token, folderPrefix string, timeout time.Duration) *Builder {
if b.err != nil {
return b
}

b.monitorConsulCfg = &consulConfig{
addr: addr,
dataCenter: dataCenter,
token: token,
folderPrefix: folderPrefix,
timeout: timeout,
}
return b
}

// WithRedisSeed enables support for seeding values with redis.
func (b *Builder) WithRedisSeed(client redis.UniversalClient) *Builder {
if b.err != nil {
return b
}

if client == nil {
b.err = errors.New("redis seed client is nil")
return b
}
b.seedRedisClient = client
return b
}

// WithRedisMonitor enables support for monitoring keys in Redis. It automatically parses the config
// and monitors every field found tagged with ConsulLogger.
func (b *Builder) WithRedisMonitor(client redis.UniversalClient, pollInterval time.Duration) *Builder {
if b.err != nil {
return b
}

if client == nil {
b.err = errors.New("redis monitor client is nil")
return b
}

if pollInterval <= 0 {
b.err = errors.New("redis monitor poll interval should be a positive number")
return b
}

b.monitorRedisClient = client
b.monitorRedisPollInterval = pollInterval
return b
}

// Create the harvester instance.
func (b *Builder) Create() (Harvester, error) {
if b.err != nil {
return nil, b.err
}

cfg, err := config.New(b.cfg, b.chNotify)
if err != nil {
return nil, err
}

sd, err := b.setupSeeding()
if err != nil {
return nil, err
}

mon, err := b.setupMonitoring(cfg)
if err != nil {
return nil, err
}

return &harvester{seeder: sd, monitor: mon, cfg: cfg}, nil
}

func (b *Builder) setupSeeding() (Seeder, error) {
pp := make([]seed.Param, 0)

consulSeedParam, err := b.setupConsulSeeding()
if err != nil {
return nil, err
}

if consulSeedParam != nil {
pp = append(pp, *consulSeedParam)
}

redisSeedParam, err := b.setupRedisSeeding()
if err != nil {
return nil, err
}

if redisSeedParam != nil {
pp = append(pp, *redisSeedParam)
}

return seed.New(pp...), nil
}

func (b *Builder) setupConsulSeeding() (*seed.Param, error) {
if b.seedConsulCfg == nil {
return nil, nil
}

getter, err := seedconsul.NewWithFolderPrefix(b.seedConsulCfg.addr, b.seedConsulCfg.dataCenter, b.seedConsulCfg.token, b.seedConsulCfg.folderPrefix,
b.seedConsulCfg.timeout)
// New constructor with functional options support.
// Notification channel is optional and can be nil.
func New(cfg interface{}, ch chan<- config.ChangeNotification, oo ...OptionFunc) (Harvester, error) {
hCfg, err := config.New(cfg, ch)
if err != nil {
return nil, err
}

return seed.NewParam(config.SourceConsul, getter)
}

func (b *Builder) setupRedisSeeding() (*seed.Param, error) {
if b.seedRedisClient == nil {
return nil, nil
opt := &options{
cfg: hCfg,
}

getter, err := seedredis.New(b.seedRedisClient)
if err != nil {
return nil, err
for _, option := range oo {
err = option(opt)
if err != nil {
return nil, err
}
}

return seed.NewParam(config.SourceRedis, getter)
}

func (b *Builder) setupMonitoring(cfg *config.Config) (Monitor, error) {
var watchers []monitor.Watcher
sd := seed.New(opt.seedParams...)

consulWatcher, err := b.setupConsulMonitoring(cfg)
if err != nil {
return nil, err
}
var mon *monitor.Monitor

if consulWatcher != nil {
watchers = append(watchers, consulWatcher)
if len(opt.monitorParams) == 0 {
return &harvester{cfg: hCfg, seeder: sd, monitor: nil}, nil
}

redisWatcher, err := b.setupRedisMonitoring(cfg)
mon, err = monitor.New(opt.cfg, opt.monitorParams...)
if err != nil {
return nil, err
}

if redisWatcher != nil {
watchers = append(watchers, redisWatcher)
}

if len(watchers) == 0 {
return nil, nil
}

return monitor.New(cfg, watchers...)
}

func (b *Builder) setupConsulMonitoring(cfg *config.Config) (*consul.Watcher, error) {
if b.monitorConsulCfg == nil {
return nil, nil
}
items := make([]consul.Item, 0)
for _, field := range cfg.Fields {
consulKey, ok := field.Sources()[config.SourceConsul]
if !ok {
continue
}
slog.Debug("monitoring consul", "key", consulKey)
items = append(items, consul.NewKeyItemWithPrefix(consulKey, b.monitorConsulCfg.folderPrefix))
}
return consul.New(b.monitorConsulCfg.addr, b.monitorConsulCfg.dataCenter, b.monitorConsulCfg.token,
b.monitorConsulCfg.timeout, items...)
}

func (b *Builder) setupRedisMonitoring(cfg *config.Config) (*redismon.Watcher, error) {
if b.monitorRedisClient == nil {
return nil, nil
}
items := make([]string, 0)
for _, field := range cfg.Fields {
redisKey, ok := field.Sources()[config.SourceRedis]
if !ok {
continue
}
slog.Debug("monitoring redis", "key", redisKey)
items = append(items, redisKey)
}
return redismon.New(b.monitorRedisClient, b.monitorRedisPollInterval, items)
return &harvester{cfg: hCfg, seeder: sd, monitor: mon}, nil
}
11 changes: 5 additions & 6 deletions harvester_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,12 +75,11 @@ func TestMain(m *testing.M) {

func Test_harvester_Harvest(t *testing.T) {
cfg := testConfigWithSecret{}
h, err := New(&cfg).
WithConsulSeed(addr, "", "", 0).
WithConsulMonitor(addr, "", "", 0).
WithRedisSeed(redisClient).
WithRedisMonitor(redisClient, 10*time.Millisecond).
Create()
h, err := New(&cfg, nil,
WithConsulSeed(addr, "", "", 0),
WithConsulMonitor(addr, "", "", 0),
WithRedisSeed(redisClient),
WithRedisMonitor(redisClient, 10*time.Millisecond))
require.NoError(t, err)

ctx, cnl := context.WithCancel(context.Background())
Expand Down
Loading

0 comments on commit c51d5f2

Please sign in to comment.