Skip to content

Commit

Permalink
Add option to disable synchronization on creation for auto refresh ca…
Browse files Browse the repository at this point in the history
…che (#5940)

* Minor auto_refresh cleanup

Signed-off-by: Jason Parraga <[email protected]>

* spelling

Signed-off-by: Jason Parraga <[email protected]>

* lint fix

Signed-off-by: Jason Parraga <[email protected]>

* Add option to refresh cache to disable sync on create

Signed-off-by: Jason Parraga <[email protected]>

---------

Signed-off-by: Jason Parraga <[email protected]>
Signed-off-by: Paul Dittamo <[email protected]>
Co-authored-by: Paul Dittamo <[email protected]>
  • Loading branch information
Sovietaced and pvditt authored Jan 23, 2025
1 parent 34d124d commit 8125ae1
Show file tree
Hide file tree
Showing 2 changed files with 53 additions and 4 deletions.
23 changes: 19 additions & 4 deletions flytestdlib/cache/in_memory_auto_refresh.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ func getEvictionFunction(counter prometheus.Counter) func(key interface{}, value
type Options struct {
clock clock.WithTicker
createBatchesCb CreateBatchesFunc
syncOnCreate bool
}

// WithClock configures the clock to use for time related operations. Mainly used for unit testing.
Expand All @@ -67,10 +68,20 @@ func WithCreateBatchesFunc(createBatchesCb CreateBatchesFunc) Option {
}
}

// WithSyncOnCreate configures whether the cache will attempt to sync items upon creation or wait until the next
// sync interval. Disabling this can be useful when the cache is under high load and synchronization both frequently
// and in large batches. Defaults to true.
func WithSyncOnCreate(syncOnCreate bool) Option {
return func(mo *Options) {
mo.syncOnCreate = syncOnCreate
}
}

func defaultOptions() *Options {
opts := &Options{}
WithClock(clock.RealClock{})(opts)
WithCreateBatchesFunc(SingleItemBatches)(opts)
WithSyncOnCreate(true)(opts)
return opts
}

Expand Down Expand Up @@ -102,6 +113,7 @@ type InMemoryAutoRefresh struct {
syncCount atomic.Int32 // internal sync counter for unit testing
enqueueCount atomic.Int32 // internal enqueue counter for unit testing
enqueueLoopRunning atomic.Bool // internal bool to ensure goroutines are running
syncOnCreate bool
}

// NewInMemoryAutoRefresh creates a new InMemoryAutoRefresh
Expand Down Expand Up @@ -145,6 +157,7 @@ func NewInMemoryAutoRefresh(
syncCount: atomic.NewInt32(0),
enqueueCount: atomic.NewInt32(0),
enqueueLoopRunning: atomic.NewBool(false),
syncOnCreate: opts.syncOnCreate,
}

return cache, nil
Expand Down Expand Up @@ -228,10 +241,12 @@ func (w *InMemoryAutoRefresh) GetOrCreate(id ItemID, item Item) (Item, error) {

// It fixes cold start issue in the AutoRefreshCache by adding the item to the workqueue when it is created.
// This way, the item will be processed without waiting for the next sync cycle (30s by default).
batch := make([]ItemWrapper, 0, 1)
batch = append(batch, itemWrapper{id: id, item: item})
w.workqueue.AddRateLimited(&batch)
w.processing.Store(id, w.clock.Now())
if w.syncOnCreate {
batch := make([]ItemWrapper, 0, 1)
batch = append(batch, itemWrapper{id: id, item: item})
w.workqueue.AddRateLimited(&batch)
w.processing.Store(id, w.clock.Now())
}
return item, nil
}

Expand Down
34 changes: 34 additions & 0 deletions flytestdlib/cache/in_memory_auto_refresh_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,40 @@ func TestCacheFour(t *testing.T) {
assert.NoError(t, err)
}

// Cache should be processing
assert.NotEmpty(t, cache.processing)

assert.EventuallyWithT(t, func(c *assert.CollectT) {
// trigger periodic sync
fakeClock.Step(testResyncPeriod)

for i := 1; i <= 10; i++ {
item, err := cache.Get(fmt.Sprintf("%d", i))
assert.NoError(c, err)
assert.Equal(c, 10, item.(fakeCacheItem).val)
}
}, 3*time.Second, time.Millisecond)
cancel()
})

t.Run("disable sync on create", func(t *testing.T) {
cache, err := NewInMemoryAutoRefresh("fake1", syncFakeItem, rateLimiter, testResyncPeriod, 10, 10, promutils.NewTestScope(), WithClock(fakeClock), WithSyncOnCreate(false))
assert.NoError(t, err)

ctx, cancel := context.WithCancel(context.Background())
assert.NoError(t, cache.Start(ctx))

// Create ten items in the cache
for i := 1; i <= 10; i++ {
_, err := cache.GetOrCreate(fmt.Sprintf("%d", i), fakeCacheItem{
val: 0,
})
assert.NoError(t, err)
}

// Validate that none are processing since they aren't synced on creation
assert.Empty(t, cache.processing)

assert.EventuallyWithT(t, func(c *assert.CollectT) {
// trigger periodic sync
fakeClock.Step(testResyncPeriod)
Expand Down

0 comments on commit 8125ae1

Please sign in to comment.