Skip to content

Commit

Permalink
Add TreeCache state for monitoring
Browse files Browse the repository at this point in the history
  • Loading branch information
wayt committed Nov 1, 2023
1 parent b98bde4 commit 1bae4f1
Showing 1 changed file with 43 additions and 0 deletions.
43 changes: 43 additions & 0 deletions tree_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ func NewTreeCache(conn *Conn, path string, options ...TreeCacheOption) *TreeCach
rootNode: newTreeCacheNode("", &Stat{}, nil),
reservoirLimit: defaultReservoirLimit,
batchSize: defaultBatchSize,
state: TreeCacheStateStopped,
}
for _, option := range options {
option(tc)
Expand Down Expand Up @@ -120,6 +121,9 @@ type TreeCacheListener interface {

// OnNodeDataChanged is called when a node's data is changed after last full sync.
OnNodeDataChanged(path string, data []byte, stat *Stat)

// OnStateChanged is called when the tree cache's state changes.
OnStateChanged(state TreeCacheSate)
}

// TreeCacheListenerFuncs is a convenience type that implements TreeCacheListener with function callbacks.
Expand All @@ -133,6 +137,7 @@ type TreeCacheListenerFuncs struct {
OnNodeDeletingFunc func(path string, data []byte, stat *Stat)
OnNodeDeletedFunc func(path string)
OnNodeDataChangedFunc func(path string, data []byte, stat *Stat)
OnStateChangedFunc func(state TreeCacheSate)
}

func (l *TreeCacheListenerFuncs) OnSyncStarted() {
Expand Down Expand Up @@ -183,6 +188,22 @@ func (l *TreeCacheListenerFuncs) OnNodeDataChanged(path string, data []byte, sta
}
}

func (l *TreeCacheListenerFuncs) OnStateChanged(state TreeCacheSate) {
if l.OnStateChangedFunc != nil {
l.OnStateChangedFunc(state)
}
}

type TreeCacheSate int

const (
TreeCacheStateStopped TreeCacheSate = iota // Tree cache is stopped.
TreeCacheStateWaitingForSession // Waiting for session to be established.
TreeCacheStateWaitingForRootPath // Waiting for root path to exist.
TreeCacheStateSyncing // Syncing tree.
TreeCacheStateRunning // Running normally.
)

type TreeCache struct {
conn *Conn
logger Logger
Expand All @@ -198,6 +219,7 @@ type TreeCache struct {
initialSyncResult chan error // Closed when initial sync completes or fails; holds error if failed.
syncMutex sync.Mutex // Protects sync state.
listener TreeCacheListener
state TreeCacheSate // Current state of the tree cache.
}

func (tc *TreeCache) Sync(ctx context.Context) (err error) {
Expand Down Expand Up @@ -240,15 +262,18 @@ func (tc *TreeCache) Sync(ctx context.Context) (err error) {

// Loop until the context is canceled or the connection is closed.
for {
tc.setState(TreeCacheStateStopped)
if ctx.Err() != nil {
return ctx.Err() // Context was canceled - this is fatal.
}

// Wait for connection to establish a session.
tc.setState(TreeCacheStateWaitingForSession)
if err := tc.waitForSession(ctx); err != nil {
return err // Suggests the connection was closed - this is fatal.
}

tc.setState(TreeCacheStateWaitingForRootPath)
// Wait for path to exist.
if found, _, existsCh, err := tc.conn.ExistsWCtx(ctx, tc.rootPath); !found || err != nil {
if err != nil {
Expand Down Expand Up @@ -278,6 +303,7 @@ func (tc *TreeCache) Sync(ctx context.Context) (err error) {
}

func (tc *TreeCache) doSync(ctx context.Context) error {
tc.setState(TreeCacheStateSyncing)
stalled := &atomic.Value{}
stalled.Store(false)
// Start a recursive watch, so we do not miss any changes.
Expand Down Expand Up @@ -352,6 +378,8 @@ func (tc *TreeCache) doSync(ctx context.Context) error {
}
tc.syncMutex.Unlock()

tc.setState(TreeCacheStateRunning)

syncElapsedTime := time.Since(syncStartTime)
tc.logger.Printf("synced tree cache in %s", syncElapsedTime)
if tc.listener != nil {
Expand Down Expand Up @@ -500,6 +528,21 @@ func (tc *TreeCache) delete(path string) {
tc.rootNode.deletePath(path)
}

func (tc *TreeCache) setState(state TreeCacheSate) {
if tc.state == state {
return
}

tc.state = state
if tc.listener != nil {
tc.listener.OnStateChanged(state)
}
}

func (tc *TreeCache) GetState() TreeCacheSate {
return tc.state
}

// WaitForInitialSync will wait for the cache to start and complete an initial sync of the tree.
// This method will return when any of the following conditions are met (whichever occurs first):
// 1. The initial sync completes,
Expand Down

0 comments on commit 1bae4f1

Please sign in to comment.