Skip to content

Commit

Permalink
add StartContext, remove Stop (#147)
Browse files Browse the repository at this point in the history
This change removes the Stop() method from the previous change, and
fixes the Start behavior to be backwards compatible.

The previous change changed the behavior of Start() to be non-blocking
from blocking. This is a change in API that we should revert. Stop is
also removed - even though this too is a breaking change this is not a
1.0 release and the release is short-lived as of right now.

This change introduces a new method, StartContext, which should achieve
the goals from the last change, but retain backwards compatibility.

StartContext is a blocking call like Start is, but allows for a
context to be passed in, thus allowing for cancellation.
  • Loading branch information
tomwans authored Dec 20, 2022
1 parent f033a31 commit 911c76a
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 120 deletions.
81 changes: 18 additions & 63 deletions stats.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package stats

import (
"context"
"strconv"
"sync"
"sync/atomic"
Expand Down Expand Up @@ -40,18 +41,16 @@ type Store interface {
// and flush all the Counters and Gauges registered with it.
Flush()

// Start a timer for periodic stat flushes.
//
// If Start is called multiple times, the previous ticker is
// stopped and a new replacement ticker is started. It is
// equivalent to calling Stop() and then Start() with the new
// ticker.
// Start a timer for periodic stat flushes. This is a blocking
// call and should be called in a goroutine.
Start(*time.Ticker)

// Stop stops the running periodic flush timer.
// StartContext starts a timer for periodic stat flushes. This is
// a blocking call and should be called in a goroutine.
//
// If no periodic flush is currently active, this is a no-op.
Stop()
// If the passed-in context is cancelled, then this call
// exits. Flush will be called on exit.
StartContext(context.Context, *time.Ticker)

// Add a StatGenerator to the Store that programatically generates stats.
AddStatGenerator(StatGenerator)
Expand Down Expand Up @@ -343,68 +342,24 @@ type statStore struct {

mu sync.RWMutex
statGenerators []StatGenerator
stop chan bool
wg *sync.WaitGroup

sink Sink
}

func (s *statStore) Start(ticker *time.Ticker) {
s.mu.Lock()
defer s.mu.Unlock()

// if there is already a stop channel allocated, that means there
// is a ticker running - we will replace the ticker now.
if s.stop != nil {
s.stopLocked()
}

stopChan := make(chan bool, 1)
wg := &sync.WaitGroup{}
wg.Add(1)

s.stop = stopChan
s.wg = wg

go func() {
defer wg.Done()
for {
select {
case <-ticker.C:
s.Flush()
case <-stopChan:
return
}
func (s *statStore) StartContext(ctx context.Context, ticker *time.Ticker) {
for {
select {
case <-ctx.Done():
s.Flush()
return
case <-ticker.C:
s.Flush()
}
}()
}

// stopLocked is the core of the stop implementation, but without a
// lock.
func (s *statStore) stopLocked() {
// if the stop channel is nil, there is no ticker running, so this
// is a no-op.
if s.stop == nil {
return
}

// close to make the flush goroutine stop
close(s.stop)

// wait for the flush goroutine to fully stop
s.wg.Wait()

// nil out the stop channel
s.stop = nil

// nil out the wait group for tidyness
s.wg = nil
}

func (s *statStore) Stop() {
s.mu.Lock()
defer s.mu.Unlock()
s.stopLocked()
func (s *statStore) Start(ticker *time.Ticker) {
s.StartContext(context.Background(), ticker)
}

func (s *statStore) Flush() {
Expand Down
74 changes: 17 additions & 57 deletions stats_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package stats

import (
"context"
crand "crypto/rand"
"encoding/hex"
"fmt"
Expand Down Expand Up @@ -43,70 +44,29 @@ func TestStats(t *testing.T) {
wg.Wait()
}

// TestStatsStartMultipleTimes ensures starting a periodic flush twice
// works as expected.
func TestStatsStartMultipleTimes(t *testing.T) {
// TestStatsStartContext ensures that a cancelled context cancels a
// flushing goroutine.
func TestStatsStartContext(t *testing.T) {
sink := &testStatSink{}
store := NewStore(sink, true)

// we check to make sure the two stop channels are different,
// that's the best we can do.
store.Start(time.NewTicker(1 * time.Minute))
ctx, cancel := context.WithCancel(context.Background())
tick := time.NewTicker(1 * time.Minute)

// grab the stop channel
realStore := store.(*statStore)
stopChan1 := realStore.stop
wg := &sync.WaitGroup{}

store.Start(time.NewTicker(10 * time.Hour))

// grab the new stop channel
stopChan2 := realStore.stop

if stopChan1 == stopChan2 {
t.Error("two stop channels are the same")
}
}

// TestStatsStartStop ensures starting a periodic flush can be
// stopped.
func TestStatsStartStop(t *testing.T) {
sink := &testStatSink{}
store := NewStore(sink, true)

store.Start(time.NewTicker(1 * time.Minute))
store.Stop()

realStore := store.(*statStore)

// we check to make sure the two stop channel is nil'ed out, that
// is the best we can do to avoid flakey time based tests.
if realStore.stop != nil {
t.Errorf("expected stop channel to be nil")
}
}

// TestStatsStartStopMultipleTimes ensures starting a periodic flush
// can be stopped, even if called multiple times.
func TestStatsStartStopMultipleTimes(t *testing.T) {
sink := &testStatSink{}
store := NewStore(sink, true)

// ensure we can call it if no ticker was ever started
store.Stop()

// start one, and stop many times.
store.Start(time.NewTicker(1 * time.Minute))
store.Stop()
store.Stop()
store.Stop()
wg.Add(1)
go func() {
defer wg.Done()
store.StartContext(ctx, tick)
}()

realStore := store.(*statStore)
// now we cancel, and its ok to do this at any point - the
// goroutine above could have started or not started, either case
// is ok.
cancel()

// we check to make sure the two stop channel is nil'ed out, that
// is the best we can do to avoid flakey time based tests.
if realStore.stop != nil {
t.Errorf("expected stop channel to be nil")
}
wg.Wait()
}

// Ensure timers and timespans are working
Expand Down

0 comments on commit 911c76a

Please sign in to comment.