Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

stats: delete unused Counters and Timers when flushing #160

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 47 additions & 1 deletion stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,14 @@ import (
tagspkg "github.com/lyft/gostats/internal/tags"
)

// Counter and Timer metrics that were zero after unusedMetricPruneCount are
// deleted from the statStore.
//
// TODO: considering a time interval for this instead of basing it off flush
// count. This guards against aggressively short/long flush intervals, which
// is configurable and thus out of our control.
const unusedMetricPruneCount = 4

// A Store holds statistics.
// There are two options when creating a new store:
//
Expand Down Expand Up @@ -238,6 +246,8 @@ func NewDefaultStore() Store {
type counter struct {
currentValue uint64
lastSentValue uint64
// number of times this counter had a value of zero during flush
zeroCount uint32
}

func (c *counter) Add(delta uint64) {
Expand Down Expand Up @@ -302,6 +312,12 @@ type timer struct {
base time.Duration
name string
sink Sink
// active is a boolean used to check if the timer was used between flushes
active uint32
// zeroCount is the number of times the timer was not used between
// flushes - if exceeds unusedMetricPruneCount the timer is deleted
// from the Store.
zeroCount uint32
}

func (t *timer) time(dur time.Duration) {
Expand All @@ -313,6 +329,7 @@ func (t *timer) AddDuration(dur time.Duration) {
}

func (t *timer) AddValue(value float64) {
atomic.StoreUint32(&t.active, 1)
t.sink.FlushTimer(t.name, value)
}

Expand Down Expand Up @@ -369,10 +386,35 @@ func (s *statStore) Flush() {
}
s.mu.RUnlock()

// This is kinda slow and does not write to the sink so run it in a
// separate goroutine.
wg := new(sync.WaitGroup)
wg.Add(1)
go func(timers *sync.Map) {
defer wg.Done()
timers.Range(func(key, v interface{}) bool {
timer := v.(*timer)
switch {
case atomic.SwapUint32(&timer.active, 0) != 0:
atomic.StoreUint32(&timer.zeroCount, 0)
case atomic.AddUint32(&timer.zeroCount, 1) >= unusedMetricPruneCount:
timers.Delete(key)
}
return true
})
}(&s.timers)

s.counters.Range(func(key, v interface{}) bool {
c := v.(*counter)
value := c.latch()
switch {
// do not flush counters that are set to zero
if value := v.(*counter).latch(); value != 0 {
case value != 0:
s.sink.FlushCounter(key.(string), value)
atomic.StoreUint32(&c.zeroCount, 0)
// delete unused counters
case atomic.AddUint32(&c.zeroCount, 1) >= unusedMetricPruneCount:
s.counters.Delete(key)
}
return true
})
Expand All @@ -386,6 +428,10 @@ func (s *statStore) Flush() {
if ok {
flushableSink.Flush()
}

// Wait for the goroutine pruning timers to finish. This prevents an
// explosion of goroutines if someone calls Flush in a hot loop.
wg.Wait()
}

func (s *statStore) AddStatGenerator(statGenerator StatGenerator) {
Expand Down
84 changes: 84 additions & 0 deletions stats_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -379,6 +379,45 @@ func TestPerInstanceStats(t *testing.T) {
})
}

func TestStatsStorePrune(t *testing.T) {
s := NewStore(nullSink{}, false).(*statStore)
tick := time.NewTicker(time.Hour) // don't flush automatically
defer tick.Stop()
go s.Start(tick)

const N = 1024
for i := 0; i < N; i++ {
id := strconv.Itoa(i)
s.NewCounter("counter_" + id)
s.NewTimer("timer_" + id)
}

mlen := func(m *sync.Map) int {
n := 0
m.Range(func(_, _ any) bool {
n++
return true
})
return n
}

for i := 0; i < unusedMetricPruneCount; i++ {
if n := mlen(&s.counters); n != N {
t.Errorf("len(s.counters) == %d; want: %d", n, N)
}
if n := mlen(&s.timers); n != N {
t.Errorf("len(s.timers) == %d; want: %d", n, N)
}
s.Flush()
}
if n := mlen(&s.counters); n != 0 {
t.Errorf("len(s.counters) == %d; want: %d", n, 0)
}
if n := mlen(&s.timers); n != 0 {
t.Errorf("len(s.timers) == %d; want: %d", n, 0)
}
}

func BenchmarkStore_MutexContention(b *testing.B) {
s := NewStore(nullSink{}, false)
t := time.NewTicker(500 * time.Microsecond) // we want flush to contend with accessing metrics
Expand Down Expand Up @@ -493,3 +532,48 @@ func BenchmarkStoreNewPerInstanceCounter(b *testing.B) {
}
})
}

func BenchmarkStoreNewCounterParallel(b *testing.B) {
s := NewStore(nullSink{}, false)
t := time.NewTicker(time.Hour) // don't flush
defer t.Stop()
go s.Start(t)
names := new([2048]string)
for i := 0; i < len(names); i++ {
names[i] = "counter_" + strconv.Itoa(i)
}
b.ResetTimer()
b.RunParallel(func(pb *testing.PB) {
for i := 0; pb.Next(); i++ {
s.NewCounter(names[i%len(names)])
}
})
}

func BenchmarkStoreFlush(b *testing.B) {
s := NewStore(nullSink{}, false)

var counters [2048]*counter
var timers [2048]*timer
for i := 0; i < len(counters); i++ {
id := strconv.Itoa(i)
counters[i] = s.NewCounter("counter_" + id).(*counter)
counters[i].Set(1)
timers[i] = s.NewTimer("timer_" + id).(*timer)
}
b.ResetTimer()

for i, n := 0, 0; i < b.N; i++ {
s.Flush()
// This takes ~2.5 microseconds so no point stopping the timer
if n++; n == unusedMetricPruneCount-1 {
n = 0
for i := 0; i < len(counters); i++ {
counters[i].currentValue = uint64(i)
counters[i].zeroCount = 0
timers[i].active = 1
timers[i].zeroCount = 0
}
}
}
}