diff --git a/stats.go b/stats.go index 241f8dc..4143dc6 100644 --- a/stats.go +++ b/stats.go @@ -1,6 +1,7 @@ package stats import ( + "context" "strconv" "sync" "sync/atomic" @@ -40,18 +41,16 @@ type Store interface { // and flush all the Counters and Gauges registered with it. Flush() - // Start a timer for periodic stat flushes. - // - // If Start is called multiple times, the previous ticker is - // stopped and a new replacement ticker is started. It is - // equivalent to calling Stop() and then Start() with the new - // ticker. + // Start a timer for periodic stat flushes. This is a blocking + // call and should be called in a goroutine. Start(*time.Ticker) - // Stop stops the running periodic flush timer. + // StartContext starts a timer for periodic stat flushes. This is + // a blocking call and should be called in a goroutine. // - // If no periodic flush is currently active, this is a no-op. - Stop() + // If the passed-in context is cancelled, then this call + // exits. Flush will be called on exit. + StartContext(context.Context, *time.Ticker) // Add a StatGenerator to the Store that programatically generates stats. AddStatGenerator(StatGenerator) @@ -343,68 +342,24 @@ type statStore struct { mu sync.RWMutex statGenerators []StatGenerator - stop chan bool - wg *sync.WaitGroup sink Sink } -func (s *statStore) Start(ticker *time.Ticker) { - s.mu.Lock() - defer s.mu.Unlock() - - // if there is already a stop channel allocated, that means there - // is a ticker running - we will replace the ticker now. - if s.stop != nil { - s.stopLocked() - } - - stopChan := make(chan bool, 1) - wg := &sync.WaitGroup{} - wg.Add(1) - - s.stop = stopChan - s.wg = wg - - go func() { - defer wg.Done() - for { - select { - case <-ticker.C: - s.Flush() - case <-stopChan: - return - } +func (s *statStore) StartContext(ctx context.Context, ticker *time.Ticker) { + for { + select { + case <-ctx.Done(): + s.Flush() + return + case <-ticker.C: + s.Flush() } - }() -} - -// stopLocked is the core of the stop implementation, but without a -// lock. -func (s *statStore) stopLocked() { - // if the stop channel is nil, there is no ticker running, so this - // is a no-op. - if s.stop == nil { - return } - - // close to make the flush goroutine stop - close(s.stop) - - // wait for the flush goroutine to fully stop - s.wg.Wait() - - // nil out the stop channel - s.stop = nil - - // nil out the wait group for tidyness - s.wg = nil } -func (s *statStore) Stop() { - s.mu.Lock() - defer s.mu.Unlock() - s.stopLocked() +func (s *statStore) Start(ticker *time.Ticker) { + s.StartContext(context.Background(), ticker) } func (s *statStore) Flush() { diff --git a/stats_test.go b/stats_test.go index f691113..58b24fd 100644 --- a/stats_test.go +++ b/stats_test.go @@ -1,6 +1,7 @@ package stats import ( + "context" crand "crypto/rand" "encoding/hex" "fmt" @@ -43,70 +44,29 @@ func TestStats(t *testing.T) { wg.Wait() } -// TestStatsStartMultipleTimes ensures starting a periodic flush twice -// works as expected. -func TestStatsStartMultipleTimes(t *testing.T) { +// TestStatsStartContext ensures that a cancelled context cancels a +// flushing goroutine. +func TestStatsStartContext(t *testing.T) { sink := &testStatSink{} store := NewStore(sink, true) - // we check to make sure the two stop channels are different, - // that's the best we can do. - store.Start(time.NewTicker(1 * time.Minute)) + ctx, cancel := context.WithCancel(context.Background()) + tick := time.NewTicker(1 * time.Minute) - // grab the stop channel - realStore := store.(*statStore) - stopChan1 := realStore.stop + wg := &sync.WaitGroup{} - store.Start(time.NewTicker(10 * time.Hour)) - - // grab the new stop channel - stopChan2 := realStore.stop - - if stopChan1 == stopChan2 { - t.Error("two stop channels are the same") - } -} - -// TestStatsStartStop ensures starting a periodic flush can be -// stopped. -func TestStatsStartStop(t *testing.T) { - sink := &testStatSink{} - store := NewStore(sink, true) - - store.Start(time.NewTicker(1 * time.Minute)) - store.Stop() - - realStore := store.(*statStore) - - // we check to make sure the two stop channel is nil'ed out, that - // is the best we can do to avoid flakey time based tests. - if realStore.stop != nil { - t.Errorf("expected stop channel to be nil") - } -} - -// TestStatsStartStopMultipleTimes ensures starting a periodic flush -// can be stopped, even if called multiple times. -func TestStatsStartStopMultipleTimes(t *testing.T) { - sink := &testStatSink{} - store := NewStore(sink, true) - - // ensure we can call it if no ticker was ever started - store.Stop() - - // start one, and stop many times. - store.Start(time.NewTicker(1 * time.Minute)) - store.Stop() - store.Stop() - store.Stop() + wg.Add(1) + go func() { + defer wg.Done() + store.StartContext(ctx, tick) + }() - realStore := store.(*statStore) + // now we cancel, and its ok to do this at any point - the + // goroutine above could have started or not started, either case + // is ok. + cancel() - // we check to make sure the two stop channel is nil'ed out, that - // is the best we can do to avoid flakey time based tests. - if realStore.stop != nil { - t.Errorf("expected stop channel to be nil") - } + wg.Wait() } // Ensure timers and timespans are working