Skip to content

Commit

Permalink
Get rid on phout aggragator multi pool logic. Fix #63.
Browse files Browse the repository at this point in the history
  • Loading branch information
skipor committed Aug 23, 2017
1 parent a9dbe50 commit 52baaeb
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 53 deletions.
72 changes: 20 additions & 52 deletions core/aggregate/netsample/phout.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,29 +6,42 @@ import (
"io"
"os"
"strconv"
"sync"
"time"

"github.com/pkg/errors"
"github.com/spf13/afero"
)

func GetPhout(fs afero.Fs, conf PhoutConfig) (Aggregator, error) {
return defaultPhoutAggregator.get(fs, conf)
}

type PhoutConfig struct {
Destination string
}

func NewPhout(fs afero.Fs, conf PhoutConfig) (a Aggregator, err error) {
filename := conf.Destination
var file afero.File = os.Stdout
if filename != "" {
file, err = fs.OpenFile(filename, os.O_APPEND|os.O_WRONLY|os.O_CREATE|os.O_SYNC, 0666)
}
if err != nil {
err = errors.Wrap(err, "phout output file open failed")
return
}
a = &phoutAggregator{
sink: make(chan *Sample, 32*1024),
writer: bufio.NewWriterSize(file, 512*1024),
buf: make([]byte, 0, 1024),
file: file,
}
return
}

type phoutAggregator struct {
sink chan *Sample
writer *bufio.Writer
buf []byte
file io.Closer
}

var _ Aggregator = (*phoutAggregator)(nil)

func (a *phoutAggregator) Report(s *Sample) { a.sink <- s }

func (a *phoutAggregator) Run(ctx context.Context) error {
Expand Down Expand Up @@ -78,51 +91,6 @@ func (a *phoutAggregator) handle(s *Sample) error {
return err
}

var defaultPhoutAggregator = newPhoutResultListeners()

type phoutAggregators struct {
sync.Mutex
aggregators map[string]Aggregator
}

func newPhout(fs afero.Fs, conf PhoutConfig) (a *phoutAggregator, err error) {
filename := conf.Destination
var file afero.File = os.Stdout
if filename != "" {
file, err = fs.OpenFile(filename, os.O_APPEND|os.O_WRONLY|os.O_CREATE|os.O_SYNC, 0666)
}
if err != nil {
return
}
a = &phoutAggregator{
sink: make(chan *Sample, 32*1024),
writer: bufio.NewWriterSize(file, 512*1024),
buf: make([]byte, 0, 1024),
file: file,
}
return
}

func newPhoutResultListeners() *phoutAggregators {
return &phoutAggregators{aggregators: make(map[string]Aggregator)}
}

func (l *phoutAggregators) get(fs afero.Fs, conf PhoutConfig) (Aggregator, error) {
dest := conf.Destination
l.Lock()
defer l.Unlock()
rl, ok := l.aggregators[dest]
if !ok {
rl, err := newPhout(fs, conf)
if err != nil {
return nil, err
}
l.aggregators[dest] = rl
return rl, nil
}
return rl, nil
}

const phoutDelimiter = '\t'

func appendPhout(s *Sample, dst []byte) []byte {
Expand Down
2 changes: 1 addition & 1 deletion core/import/import.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import (

func Import(fs afero.Fs) {
register.Aggregator("phout", func(conf netsample.PhoutConfig) (core.Aggregator, error) {
a, err := netsample.GetPhout(fs, conf)
a, err := netsample.NewPhout(fs, conf)
return netsample.WrapAggregator(a), err
})

Expand Down

0 comments on commit 52baaeb

Please sign in to comment.