Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add option to disable synchronization on creation for auto refresh cache #5940

Merged
merged 5 commits into from
Jan 23, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 19 additions & 4 deletions flytestdlib/cache/in_memory_auto_refresh.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ func getEvictionFunction(counter prometheus.Counter) func(key interface{}, value
type Options struct {
clock clock.WithTicker
createBatchesCb CreateBatchesFunc
syncOnCreate bool
}

// WithClock configures the clock to use for time related operations. Mainly used for unit testing.
Expand All @@ -67,10 +68,20 @@ func WithCreateBatchesFunc(createBatchesCb CreateBatchesFunc) Option {
}
}

// WithSyncOnCreate configures whether the cache will attempt to sync items upon creation or wait until the next
// sync interval. Disabling this can be useful when the cache is under high load and synchronization both frequently
// and in large batches. Defaults to true.
func WithSyncOnCreate(syncOnCreate bool) Option {
return func(mo *Options) {
mo.syncOnCreate = syncOnCreate
}
}

func defaultOptions() *Options {
opts := &Options{}
WithClock(clock.RealClock{})(opts)
WithCreateBatchesFunc(SingleItemBatches)(opts)
WithSyncOnCreate(true)(opts)
return opts
}

Expand Down Expand Up @@ -102,6 +113,7 @@ type InMemoryAutoRefresh struct {
syncCount atomic.Int32 // internal sync counter for unit testing
enqueueCount atomic.Int32 // internal enqueue counter for unit testing
enqueueLoopRunning atomic.Bool // internal bool to ensure goroutines are running
syncOnCreate bool
}

// NewInMemoryAutoRefresh creates a new InMemoryAutoRefresh
Expand Down Expand Up @@ -145,6 +157,7 @@ func NewInMemoryAutoRefresh(
syncCount: atomic.NewInt32(0),
enqueueCount: atomic.NewInt32(0),
enqueueLoopRunning: atomic.NewBool(false),
syncOnCreate: opts.syncOnCreate,
}

return cache, nil
Expand Down Expand Up @@ -228,10 +241,12 @@ func (w *InMemoryAutoRefresh) GetOrCreate(id ItemID, item Item) (Item, error) {

// It fixes cold start issue in the AutoRefreshCache by adding the item to the workqueue when it is created.
// This way, the item will be processed without waiting for the next sync cycle (30s by default).
batch := make([]ItemWrapper, 0, 1)
batch = append(batch, itemWrapper{id: id, item: item})
w.workqueue.AddRateLimited(&batch)
w.processing.Store(id, w.clock.Now())
if w.syncOnCreate {
batch := make([]ItemWrapper, 0, 1)
batch = append(batch, itemWrapper{id: id, item: item})
w.workqueue.AddRateLimited(&batch)
Comment on lines +244 to +247
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider nil check for workqueue

Consider adding a check for w.workqueue being nil before using it in GetOrCreate. The workqueue field could potentially be nil if initialization failed.

Code suggestion
Check the AI-generated fix before applying
Suggested change
if w.syncOnCreate {
batch := make([]ItemWrapper, 0, 1)
batch = append(batch, itemWrapper{id: id, item: item})
w.workqueue.AddRateLimited(&batch)
if w.syncOnCreate {
if w.workqueue == nil {
return item, fmt.Errorf("workqueue not initialized")
}
batch := make([]ItemWrapper, 0, 1)
batch = append(batch, itemWrapper{id: id, item: item})
w.workqueue.AddRateLimited(&batch)

Code Review Run #8a272a


Is this a valid issue, or was it incorrectly flagged by the Agent?

  • it was incorrectly flagged

w.processing.Store(id, w.clock.Now())
}
return item, nil
}

Expand Down
34 changes: 34 additions & 0 deletions flytestdlib/cache/in_memory_auto_refresh_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,40 @@ func TestCacheFour(t *testing.T) {
assert.NoError(t, err)
}

// Cache should be processing
assert.NotEmpty(t, cache.processing)
Comment on lines +98 to +99
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider adding initial state validation

Consider adding assertions to verify the initial state of cache items before triggering the sync. This would help validate that items are properly initialized with val: 0 before being synced to val: 10.

Code suggestion
Check the AI-generated fix before applying
 @@ -97,6 +97,12 @@
 		}
 		// Cache should be processing
 		assert.NotEmpty(t, cache.processing)
 +		// Verify initial values
 +		for i := 1; i <= 10; i++ {
 +			item, err := cache.Get(fmt.Sprintf("%d", i))
 +			assert.NoError(t, err)
 +			assert.Equal(t, 0, item.(fakeCacheItem).val)
 +		}

Code Review Run #8a272a


Is this a valid issue, or was it incorrectly flagged by the Agent?

  • it was incorrectly flagged


assert.EventuallyWithT(t, func(c *assert.CollectT) {
// trigger periodic sync
fakeClock.Step(testResyncPeriod)

for i := 1; i <= 10; i++ {
item, err := cache.Get(fmt.Sprintf("%d", i))
assert.NoError(c, err)
assert.Equal(c, 10, item.(fakeCacheItem).val)
}
}, 3*time.Second, time.Millisecond)
cancel()
})

t.Run("disable sync on create", func(t *testing.T) {
cache, err := NewInMemoryAutoRefresh("fake1", syncFakeItem, rateLimiter, testResyncPeriod, 10, 10, promutils.NewTestScope(), WithClock(fakeClock), WithSyncOnCreate(false))
assert.NoError(t, err)

ctx, cancel := context.WithCancel(context.Background())
assert.NoError(t, cache.Start(ctx))

// Create ten items in the cache
for i := 1; i <= 10; i++ {
_, err := cache.GetOrCreate(fmt.Sprintf("%d", i), fakeCacheItem{
val: 0,
})
assert.NoError(t, err)
}

// Validate that none are processing since they aren't synced on creation
assert.Empty(t, cache.processing)

assert.EventuallyWithT(t, func(c *assert.CollectT) {
// trigger periodic sync
fakeClock.Step(testResyncPeriod)
Expand Down
Loading