From a67db029b44e08e3ce978b4e57ae7b86f369f1a1 Mon Sep 17 00:00:00 2001 From: Victor Conner Date: Mon, 30 Dec 2024 10:29:09 +0100 Subject: [PATCH] WIP --- buffer_test.go | 18 ++-- cache.go | 23 +++--- distribution_test.go | 3 +- examples/batch/main.go | 4 +- examples/buffering/main.go | 4 +- .../distributed-early-refreshes/client.go | 9 +- examples/distribution/client.go | 9 +- examples/generics/main.go | 4 +- examples/missing/main.go | 4 +- examples/permutations/main.go | 4 +- examples/refreshes/main.go | 4 +- fetch.go | 82 +++++++++++++++---- fetch_test.go | 34 +++++--- metrics.go | 16 ++-- options.go | 11 ++- options_test.go | 22 ++++- passthrough_test.go | 2 +- refresh.go | 2 +- shard.go | 43 ++++++---- sturdyc_test.go | 29 ++++--- 20 files changed, 224 insertions(+), 103 deletions(-) diff --git a/buffer_test.go b/buffer_test.go index 3aa6c4e..071ca7b 100644 --- a/buffer_test.go +++ b/buffer_test.go @@ -20,6 +20,7 @@ func TestBatchIsRefreshedWhenTheTimeoutExpires(t *testing.T) { evictionPercentage := 10 minRefreshDelay := time.Minute * 5 maxRefreshDelay := time.Minute * 10 + synchronousRefreshDelay := time.Minute * 30 refreshRetryInterval := time.Millisecond * 10 batchSize := 10 batchBufferTimeout := time.Minute @@ -34,7 +35,7 @@ func TestBatchIsRefreshedWhenTheTimeoutExpires(t *testing.T) { // 2. The 'batchBufferTimeout' threshold is exceeded. 
client := sturdyc.New[string](capacity, numShards, ttl, evictionPercentage, sturdyc.WithNoContinuousEvictions(), - sturdyc.WithEarlyRefreshes(minRefreshDelay, maxRefreshDelay, refreshRetryInterval), + sturdyc.WithEarlyRefreshes(minRefreshDelay, maxRefreshDelay, synchronousRefreshDelay, refreshRetryInterval), sturdyc.WithMissingRecordStorage(), sturdyc.WithRefreshCoalescing(batchSize, batchBufferTimeout), sturdyc.WithClock(clock), @@ -86,6 +87,7 @@ func TestBatchIsRefreshedWhenTheBufferSizeIsReached(t *testing.T) { ttl := time.Hour minRefreshDelay := time.Minute * 5 maxRefreshDelay := time.Minute * 10 + synchronousRefreshDelay := time.Minute * 30 refreshRetryInterval := time.Millisecond * 10 batchSize := 10 batchBufferTimeout := time.Minute @@ -100,7 +102,7 @@ func TestBatchIsRefreshedWhenTheBufferSizeIsReached(t *testing.T) { // 2. The 'batchBufferTimeout' threshold is exceeded. client := sturdyc.New[string](capacity, numShards, ttl, evictionPercentage, sturdyc.WithNoContinuousEvictions(), - sturdyc.WithEarlyRefreshes(minRefreshDelay, maxRefreshDelay, refreshRetryInterval), + sturdyc.WithEarlyRefreshes(minRefreshDelay, maxRefreshDelay, synchronousRefreshDelay, refreshRetryInterval), sturdyc.WithMissingRecordStorage(), sturdyc.WithRefreshCoalescing(batchSize, batchBufferTimeout), sturdyc.WithClock(clock), @@ -180,6 +182,7 @@ func TestBatchIsNotRefreshedByDuplicates(t *testing.T) { evictionPercentage := 10 minRefreshDelay := time.Minute * 5 maxRefreshDelay := time.Minute * 10 + synchronousRefreshDelay := time.Minute * 30 refreshRetryInterval := time.Millisecond * 10 batchSize := 10 batchBufferTimeout := time.Minute @@ -194,7 +197,7 @@ func TestBatchIsNotRefreshedByDuplicates(t *testing.T) { // 2. The 'batchBufferTimeout' threshold is exceeded. 
client := sturdyc.New[string](capacity, numShards, ttl, evictionPercentage, sturdyc.WithNoContinuousEvictions(), - sturdyc.WithEarlyRefreshes(minRefreshDelay, maxRefreshDelay, refreshRetryInterval), + sturdyc.WithEarlyRefreshes(minRefreshDelay, maxRefreshDelay, synchronousRefreshDelay, refreshRetryInterval), sturdyc.WithMissingRecordStorage(), sturdyc.WithRefreshCoalescing(batchSize, batchBufferTimeout), sturdyc.WithClock(clock), @@ -250,6 +253,7 @@ func TestBatchesAreGroupedByPermutations(t *testing.T) { evictionPercentage := 15 minRefreshDelay := time.Minute * 5 maxRefreshDelay := time.Minute * 10 + synchronousRefreshDelay := time.Minute * 30 refreshRetryInterval := time.Millisecond * 10 batchSize := 5 batchBufferTimeout := time.Minute @@ -264,7 +268,7 @@ func TestBatchesAreGroupedByPermutations(t *testing.T) { // 2. The 'batchBufferTimeout' threshold is exceeded. c := sturdyc.New[any](capacity, numShards, ttl, evictionPercentage, sturdyc.WithNoContinuousEvictions(), - sturdyc.WithEarlyRefreshes(minRefreshDelay, maxRefreshDelay, refreshRetryInterval), + sturdyc.WithEarlyRefreshes(minRefreshDelay, maxRefreshDelay, synchronousRefreshDelay, refreshRetryInterval), sturdyc.WithMissingRecordStorage(), sturdyc.WithRefreshCoalescing(batchSize, batchBufferTimeout), sturdyc.WithClock(clock), @@ -339,6 +343,7 @@ func TestLargeBatchesAreChunkedCorrectly(t *testing.T) { evictionPercentage := 23 minRefreshDelay := time.Minute * 5 maxRefreshDelay := time.Minute * 10 + synchronousRefreshDelay := time.Minute * 30 refreshRetryInterval := time.Millisecond * 10 batchSize := 5 batchBufferTimeout := time.Minute @@ -353,7 +358,7 @@ func TestLargeBatchesAreChunkedCorrectly(t *testing.T) { // 2. The 'batchBufferTimeout' threshold is exceeded. 
client := sturdyc.New[string](capacity, numShards, ttl, evictionPercentage, sturdyc.WithNoContinuousEvictions(), - sturdyc.WithEarlyRefreshes(minRefreshDelay, maxRefreshDelay, refreshRetryInterval), + sturdyc.WithEarlyRefreshes(minRefreshDelay, maxRefreshDelay, synchronousRefreshDelay, refreshRetryInterval), sturdyc.WithMissingRecordStorage(), sturdyc.WithRefreshCoalescing(batchSize, batchBufferTimeout), sturdyc.WithClock(clock), @@ -401,6 +406,7 @@ func TestValuesAreUpdatedCorrectly(t *testing.T) { evictionPercentage := 10 minRefreshDelay := time.Minute * 5 maxRefreshDelay := time.Minute * 10 + synchronousRefreshDelay := time.Minute * 50 refreshRetryInterval := time.Millisecond * 10 batchSize := 10 batchBufferTimeout := time.Minute @@ -415,7 +421,7 @@ func TestValuesAreUpdatedCorrectly(t *testing.T) { // 2. The 'batchBufferTimeout' threshold is exceeded. client := sturdyc.New[any](capacity, numShards, ttl, evictionPercentage, sturdyc.WithNoContinuousEvictions(), - sturdyc.WithEarlyRefreshes(minRefreshDelay, maxRefreshDelay, refreshRetryInterval), + sturdyc.WithEarlyRefreshes(minRefreshDelay, maxRefreshDelay, synchronousRefreshDelay, refreshRetryInterval), sturdyc.WithMissingRecordStorage(), sturdyc.WithRefreshCoalescing(batchSize, batchBufferTimeout), sturdyc.WithClock(clock), diff --git a/cache.go b/cache.go index 239b71c..e63a5e5 100644 --- a/cache.go +++ b/cache.go @@ -29,11 +29,12 @@ type Config struct { metricsRecorder DistributedMetricsRecorder log Logger - refreshInBackground bool - minRefreshTime time.Duration - maxRefreshTime time.Duration - retryBaseDelay time.Duration - storeMissingRecords bool + earlyRefreshes bool + minRefreshTime time.Duration + maxRefreshTime time.Duration + synchronousRefreshTime time.Duration + retryBaseDelay time.Duration + storeMissingRecords bool bufferRefreshes bool batchMutex sync.Mutex @@ -127,11 +128,11 @@ func (c *Client[T]) getShard(key string) *shard[T] { // getWithState retrieves a single value from the cache and 
returns additional // information about the state of the record. The state includes whether the record // exists, if it has been marked as missing, and if it is due for a refresh. -func (c *Client[T]) getWithState(key string) (value T, exists, markedAsMissing, refresh bool) { +func (c *Client[T]) getWithState(key string) (value T, exists, markedAsMissing, backgroundRefresh, synchronousRefresh bool) { shard := c.getShard(key) - val, exists, markedAsMissing, refresh := shard.get(key) - c.reportCacheHits(exists, markedAsMissing, refresh) - return val, exists, markedAsMissing, refresh + val, exists, markedAsMissing, backgroundRefresh, synchronousRefresh := shard.get(key) + c.reportCacheHits(exists, markedAsMissing, backgroundRefresh, synchronousRefresh) + return val, exists, markedAsMissing, backgroundRefresh, synchronousRefresh } // Get retrieves a single value from the cache. @@ -145,8 +146,8 @@ func (c *Client[T]) getWithState(key string) (value T, exists, markedAsMissing, // The value corresponding to the key and a boolean indicating if the value was found. 
func (c *Client[T]) Get(key string) (T, bool) { shard := c.getShard(key) - val, ok, markedAsMissing, refresh := shard.get(key) - c.reportCacheHits(ok, markedAsMissing, refresh) + val, ok, markedAsMissing, backgroundRefresh, synchronousRefresh := shard.get(key) + c.reportCacheHits(ok, markedAsMissing, backgroundRefresh, synchronousRefresh) return val, ok && !markedAsMissing } diff --git a/distribution_test.go b/distribution_test.go index a9fd468..49fe989 100644 --- a/distribution_test.go +++ b/distribution_test.go @@ -779,6 +779,7 @@ func TestPartialResponseForRefreshesDoesNotResultInMissingRecords(t *testing.T) ttl := time.Hour minRefreshDelay := time.Minute * 5 maxRefreshDelay := time.Minute * 10 + synchronousRefreshDelay := time.Minute * 30 refreshRetryInterval := time.Millisecond * 10 batchSize := 10 batchBufferTimeout := time.Minute @@ -788,7 +789,7 @@ func TestPartialResponseForRefreshesDoesNotResultInMissingRecords(t *testing.T) c := sturdyc.New[string](capacity, numShards, ttl, evictionPercentage, sturdyc.WithNoContinuousEvictions(), - sturdyc.WithEarlyRefreshes(minRefreshDelay, maxRefreshDelay, refreshRetryInterval), + sturdyc.WithEarlyRefreshes(minRefreshDelay, maxRefreshDelay, synchronousRefreshDelay, refreshRetryInterval), sturdyc.WithMissingRecordStorage(), sturdyc.WithRefreshCoalescing(batchSize, batchBufferTimeout), sturdyc.WithDistributedStorageEarlyRefreshes(distributedStorage, refreshAfter), diff --git a/examples/batch/main.go b/examples/batch/main.go index 7f68cfe..79ca1c6 100644 --- a/examples/batch/main.go +++ b/examples/batch/main.go @@ -54,12 +54,14 @@ func main() { // used to spread out the refreshes for entries evenly over time. minRefreshDelay := time.Second maxRefreshDelay := time.Second * 2 + // Set a synchronous refresh delay for when we want a refresh to happen synchronously. + synchronousRefreshDelay := time.Second * 30 // The base for exponential backoff when retrying a refresh. 
retryBaseDelay := time.Millisecond * 10 // Create a cache client with the specified configuration. cacheClient := sturdyc.New[string](capacity, numShards, ttl, evictionPercentage, - sturdyc.WithEarlyRefreshes(minRefreshDelay, maxRefreshDelay, retryBaseDelay), + sturdyc.WithEarlyRefreshes(minRefreshDelay, maxRefreshDelay, synchronousRefreshDelay, retryBaseDelay), ) // Create a new API instance with the cache client. diff --git a/examples/buffering/main.go b/examples/buffering/main.go index 98ad1ee..25c2d67 100644 --- a/examples/buffering/main.go +++ b/examples/buffering/main.go @@ -56,6 +56,8 @@ func main() { // used to spread out the refreshes for entries evenly over time. minRefreshDelay := time.Second maxRefreshDelay := time.Second * 2 + // Set a synchronous refresh delay for when we want a refresh to happen synchronously. + synchronousRefreshDelay := time.Second * 30 // The base for exponential backoff when retrying a refresh. retryBaseDelay := time.Millisecond * 10 // Whether to store misses in the sturdyc. This can be useful to @@ -68,7 +70,7 @@ func main() { // Create a new cache client with the specified configuration. cacheClient := sturdyc.New[string](capacity, numShards, ttl, evictionPercentage, - sturdyc.WithEarlyRefreshes(minRefreshDelay, maxRefreshDelay, retryBaseDelay), + sturdyc.WithEarlyRefreshes(minRefreshDelay, maxRefreshDelay, synchronousRefreshDelay, retryBaseDelay), sturdyc.WithRefreshCoalescing(batchSize, batchBufferTimeout), ) diff --git a/examples/distributed-early-refreshes/client.go b/examples/distributed-early-refreshes/client.go index dbda4c3..dadbef8 100644 --- a/examples/distributed-early-refreshes/client.go +++ b/examples/distributed-early-refreshes/client.go @@ -19,9 +19,10 @@ const ( // Configuration for the early in-memory refreshes. 
const ( - minRefreshTime = 2 * time.Second - maxRefreshTime = 4 * time.Second - retryBaseDelay = 5 * time.Second + minRefreshTime = 2 * time.Second + maxRefreshTime = 4 * time.Second + synchronousRefreshTime = 30 * time.Second + retryBaseDelay = 5 * time.Second ) // Configuration for the refresh coalescing. @@ -36,7 +37,7 @@ const refreshAfter = time.Second func newAPIClient(distributedStorage sturdyc.DistributedStorageWithDeletions) *apiClient { return &apiClient{ cache: sturdyc.New[any](capacity, numberOfShards, ttl, percentageOfRecordsToEvictWhenFull, - sturdyc.WithEarlyRefreshes(minRefreshTime, maxRefreshTime, retryBaseDelay), + sturdyc.WithEarlyRefreshes(minRefreshTime, maxRefreshTime, synchronousRefreshTime, retryBaseDelay), sturdyc.WithRefreshCoalescing(idealBufferSize, bufferTimeout), sturdyc.WithDistributedStorageEarlyRefreshes(distributedStorage, refreshAfter), // NOTE: Uncommenting this line will make the cache mark the records as diff --git a/examples/distribution/client.go b/examples/distribution/client.go index 0753941..0c348eb 100644 --- a/examples/distribution/client.go +++ b/examples/distribution/client.go @@ -22,9 +22,10 @@ const ( // Configuration for the early in-memory refreshes. const ( - minRefreshTime = 100 * time.Millisecond - maxRefreshTime = 500 * time.Millisecond - retryBaseDelay = time.Second + minRefreshTime = 100 * time.Millisecond + maxRefreshTime = 500 * time.Millisecond + synchronousRefreshTime = 30 * time.Second + retryBaseDelay = time.Second ) // Configuration for the refresh coalescing. 
@@ -37,7 +38,7 @@ func newAPIClient(distributedStorage sturdyc.DistributedStorage) *apiClient { return &apiClient{ cache: sturdyc.New[any](capacity, numberOfShards, ttl, percentageOfRecordsToEvictWhenFull, sturdyc.WithMissingRecordStorage(), - sturdyc.WithEarlyRefreshes(minRefreshTime, maxRefreshTime, retryBaseDelay), + sturdyc.WithEarlyRefreshes(minRefreshTime, maxRefreshTime, synchronousRefreshTime, retryBaseDelay), sturdyc.WithRefreshCoalescing(idealBufferSize, bufferTimeout), sturdyc.WithDistributedStorage(distributedStorage), ), diff --git a/examples/generics/main.go b/examples/generics/main.go index 6d98565..a683d77 100644 --- a/examples/generics/main.go +++ b/examples/generics/main.go @@ -54,12 +54,14 @@ func main() { // used to spread out the refreshes for entries evenly over time. minRefreshDelay := time.Second maxRefreshDelay := time.Second * 2 + // Set a synchronous refresh delay for when we want a refresh to happen synchronously. + synchronousRefreshDelay := time.Second * 30 // The base for exponential backoff when retrying a refresh. retryBaseDelay := time.Millisecond * 10 // Create a new cache client with the specified configuration. cacheClient := sturdyc.New[any](capacity, numShards, ttl, evictionPercentage, - sturdyc.WithEarlyRefreshes(minRefreshDelay, maxRefreshDelay, retryBaseDelay), + sturdyc.WithEarlyRefreshes(minRefreshDelay, maxRefreshDelay, synchronousRefreshDelay, retryBaseDelay), sturdyc.WithRefreshCoalescing(10, time.Second*15), ) diff --git a/examples/missing/main.go b/examples/missing/main.go index 6347ed8..dcdb042 100644 --- a/examples/missing/main.go +++ b/examples/missing/main.go @@ -51,12 +51,14 @@ func main() { // used to spread out the refreshes for entries evenly over time. minRefreshDelay := time.Millisecond * 10 maxRefreshDelay := time.Millisecond * 30 + // Set a synchronous refresh delay for when we want a refresh to happen synchronously. 
+ synchronousRefreshDelay := time.Second * 30 // The base for exponential backoff when retrying a refresh. retryBaseDelay := time.Millisecond * 10 // Create a cache client with the specified configuration. cacheClient := sturdyc.New[string](capacity, numShards, ttl, evictionPercentage, - sturdyc.WithEarlyRefreshes(minRefreshDelay, maxRefreshDelay, retryBaseDelay), + sturdyc.WithEarlyRefreshes(minRefreshDelay, maxRefreshDelay, synchronousRefreshDelay, retryBaseDelay), sturdyc.WithMissingRecordStorage(), ) diff --git a/examples/permutations/main.go b/examples/permutations/main.go index 42c0153..dae93ea 100644 --- a/examples/permutations/main.go +++ b/examples/permutations/main.go @@ -56,12 +56,14 @@ func main() { // used to spread out the refreshes for entries evenly over time. minRefreshDelay := time.Second maxRefreshDelay := time.Second * 2 + // Set a synchronous refresh delay for when we want a refresh to happen synchronously. + synchronousRefreshDelay := time.Second * 30 // The base for exponential backoff when retrying a refresh. retryBaseDelay := time.Millisecond * 10 // Create a new cache client with the specified configuration. cacheClient := sturdyc.New[string](capacity, numShards, ttl, evictionPercentage, - sturdyc.WithEarlyRefreshes(minRefreshDelay, maxRefreshDelay, retryBaseDelay), + sturdyc.WithEarlyRefreshes(minRefreshDelay, maxRefreshDelay, synchronousRefreshDelay, retryBaseDelay), ) // We will fetch these IDs using various option sets, meaning that the ID alone diff --git a/examples/refreshes/main.go b/examples/refreshes/main.go index ed47b8d..19f7722 100644 --- a/examples/refreshes/main.go +++ b/examples/refreshes/main.go @@ -49,6 +49,8 @@ func main() { // We don't want our outgoing requests graph to look like a comb. minRefreshDelay := time.Millisecond * 10 maxRefreshDelay := time.Millisecond * 30 + // Set a synchronous refresh delay for when we want a refresh to happen synchronously. 
+ synchronousRefreshDelay := time.Second * 30 // The base used for exponential backoff when retrying a refresh. Most of the // time, we perform refreshes well in advance of the records expiry time. // Hence, we can use this to make it easier for a system that is having @@ -60,7 +62,7 @@ func main() { // Create a cache client with the specified configuration. cacheClient := sturdyc.New[string](capacity, numShards, ttl, evictionPercentage, - sturdyc.WithEarlyRefreshes(minRefreshDelay, maxRefreshDelay, retryBaseDelay), + sturdyc.WithEarlyRefreshes(minRefreshDelay, maxRefreshDelay, synchronousRefreshDelay, retryBaseDelay), ) // Create a new API instance with the cache client. diff --git a/fetch.go b/fetch.go index 2ce31ca..f9b9764 100644 --- a/fetch.go +++ b/fetch.go @@ -6,24 +6,30 @@ import ( "maps" ) -func (c *Client[T]) groupIDs(ids []string, keyFn KeyFn) (hits map[string]T, misses, refreshes []string) { +func (c *Client[T]) groupIDs(ids []string, keyFn KeyFn) (hits map[string]T, misses, backgroundRefreshes, synchronousRefreshes []string) { hits = make(map[string]T) misses = make([]string, 0) - refreshes = make([]string, 0) + backgroundRefreshes = make([]string, 0) + synchronousRefreshes = make([]string, 0) for _, id := range ids { key := keyFn(id) - value, exists, markedAsMissing, shouldRefresh := c.getWithState(key) + value, exists, markedAsMissing, backgroundRefresh, synchronousRefresh := c.getWithState(key) + + if synchronousRefresh { + synchronousRefreshes = append(synchronousRefreshes, id) + } // Check if the record should be refreshed in the background. - if shouldRefresh { - refreshes = append(refreshes, id) + if backgroundRefresh && !synchronousRefresh { + backgroundRefreshes = append(backgroundRefreshes, id) } if markedAsMissing { continue } + // If the record should be synchronously refreshed, it's going to be added to both the hits and misses maps. 
if !exists { misses = append(misses, id) continue @@ -31,16 +37,36 @@ func (c *Client[T]) groupIDs(ids []string, keyFn KeyFn) (hits map[string]T, miss hits[id] = value } - return hits, misses, refreshes + return hits, misses, backgroundRefreshes, synchronousRefreshes } func getFetch[V, T any](ctx context.Context, c *Client[T], key string, fetchFn FetchFn[V]) (T, error) { + value, ok, markedAsMissing, backgroundRefresh, synchronousRefresh := c.getWithState(key) wrappedFetch := wrap[T](distributedFetch(c, key, fetchFn)) - // Begin by checking if we have the item in our cache. - value, ok, markedAsMissing, shouldRefresh := c.getWithState(key) + if synchronousRefresh { + res, err := callAndCache(ctx, c, key, wrappedFetch) + // Check if the record has been deleted at the source. If it has, we'll + // delete it from the cache too. NOTE: The callAndCache function converts + // ErrNotFound to ErrMissingRecord. + if ok && !markedAsMissing && errors.Is(err, ErrMissingRecord) { + c.Delete(key) + } + + if errors.Is(err, ErrMissingRecord) { + return res, err + } + + // If the call to synchronously refresh the record failed, + // we'll return the latest value if we have it in the cache. + if err != nil && ok { + return value, nil + } - if shouldRefresh { + return res, err + } + + if backgroundRefresh { c.safeGo(func() { c.refresh(key, wrappedFetch) }) @@ -99,26 +125,50 @@ func GetOrFetch[V, T any](ctx context.Context, c *Client[T], key string, fetchFn func getFetchBatch[V, T any](ctx context.Context, c *Client[T], ids []string, keyFn KeyFn, fetchFn BatchFetchFn[V]) (map[string]T, error) { wrappedFetch := wrapBatch[T](distributedBatchFetch[V, T](c, keyFn, fetchFn)) - cachedRecords, cacheMisses, idsToRefresh := c.groupIDs(ids, keyFn) + cachedRecords, cacheMisses, idsToBackgroundRefresh, idsToSynchronouslyRefresh := c.groupIDs(ids, keyFn) - // If any records need to be refreshed, we'll do so in the background. - if len(idsToRefresh) > 0 { + // Schedule background refreshes. 
+ if len(idsToBackgroundRefresh) > 0 { c.safeGo(func() { if c.bufferRefreshes { - bufferBatchRefresh(c, idsToRefresh, keyFn, wrappedFetch) + bufferBatchRefresh(c, idsToBackgroundRefresh, keyFn, wrappedFetch) return } - c.refreshBatch(idsToRefresh, keyFn, wrappedFetch) + c.refreshBatch(idsToBackgroundRefresh, keyFn, wrappedFetch) }) } // If we were able to retrieve all records from the cache, we can return them straight away. - if len(cacheMisses) == 0 { + if len(cacheMisses) == 0 && len(idsToSynchronouslyRefresh) == 0 { return cachedRecords, nil } - callBatchOpts := callBatchOpts[T, T]{ids: cacheMisses, keyFn: keyFn, fn: wrappedFetch} + // Create a list of the IDs that we're going to fetch from the underlying data source or distributed storage. + cacheMissesAndSyncRefreshes := make([]string, 0, len(cacheMisses)+len(idsToSynchronouslyRefresh)) + cacheMissesAndSyncRefreshes = append(cacheMissesAndSyncRefreshes, cacheMisses...) + cacheMissesAndSyncRefreshes = append(cacheMissesAndSyncRefreshes, idsToSynchronouslyRefresh...) + + callBatchOpts := callBatchOpts[T, T]{ids: cacheMissesAndSyncRefreshes, keyFn: keyFn, fn: wrappedFetch} response, err := callAndCacheBatch(ctx, c, callBatchOpts) + + // If we did a call to synchronously refresh some of the records, and it + // didn't fail, we'll have to check if any of the IDs have been deleted at + // the underlying data source. If they have, we'll have to delete them from + // the cache and remove them from the cachedRecords map so that we don't + // return them. + if err == nil && len(idsToSynchronouslyRefresh) > 0 { + for _, id := range idsToSynchronouslyRefresh { + // If we have it in the cache, but not in the response, it means + // that the ID no longer exists at the underlying data source. 
+ _, okResponse := response[id] + _, okCache := cachedRecords[id] + if okCache && !okResponse { + delete(cachedRecords, id) + c.Delete(keyFn(id)) + } + } + } + if err != nil && !errors.Is(err, ErrOnlyCachedRecords) { if len(cachedRecords) > 0 { return cachedRecords, ErrOnlyCachedRecords diff --git a/fetch_test.go b/fetch_test.go index 32f4a32..9c60126 100644 --- a/fetch_test.go +++ b/fetch_test.go @@ -61,12 +61,13 @@ func TestGetOrFetchStampedeProtection(t *testing.T) { clock := sturdyc.NewTestClock(time.Now()) minRefreshDelay := time.Millisecond * 500 maxRefreshDelay := time.Millisecond * 500 + synchronousRefreshDelay := time.Second refreshRetryInterval := time.Millisecond * 10 // The cache is going to have a 2 second TTL, and the first refresh should happen within a second. c := sturdyc.New[string](capacity, numShards, ttl, evictionPercentage, sturdyc.WithNoContinuousEvictions(), - sturdyc.WithEarlyRefreshes(minRefreshDelay, maxRefreshDelay, refreshRetryInterval), + sturdyc.WithEarlyRefreshes(minRefreshDelay, maxRefreshDelay, synchronousRefreshDelay, refreshRetryInterval), sturdyc.WithMissingRecordStorage(), sturdyc.WithClock(clock), ) @@ -112,12 +113,13 @@ func TestGetOrFetchRefreshRetries(t *testing.T) { evictionPercentage := 10 minRefreshDelay := time.Second maxRefreshDelay := time.Second * 2 + synchronousRefreshDelay := time.Second * 10 retryInterval := time.Millisecond * 10 clock := sturdyc.NewTestClock(time.Now()) c := sturdyc.New[string](capacity, numShards, ttl, evictionPercentage, sturdyc.WithNoContinuousEvictions(), - sturdyc.WithEarlyRefreshes(minRefreshDelay, maxRefreshDelay, retryInterval), + sturdyc.WithEarlyRefreshes(minRefreshDelay, maxRefreshDelay, synchronousRefreshDelay, retryInterval), sturdyc.WithMissingRecordStorage(), sturdyc.WithClock(clock), ) @@ -167,12 +169,13 @@ func TestGetOrFetchMissingRecord(t *testing.T) { evictionPercentage := 20 minRefreshDelay := time.Second maxRefreshDelay := time.Second * 2 + synchronousRefreshDelay := 
time.Second * 10 retryInterval := time.Millisecond * 10 clock := sturdyc.NewTestClock(time.Now()) c := sturdyc.New[string](capacity, numShards, ttl, evictionPercentage, sturdyc.WithNoContinuousEvictions(), sturdyc.WithClock(clock), - sturdyc.WithEarlyRefreshes(minRefreshDelay, maxRefreshDelay, retryInterval), + sturdyc.WithEarlyRefreshes(minRefreshDelay, maxRefreshDelay, synchronousRefreshDelay, retryInterval), sturdyc.WithMissingRecordStorage(), ) @@ -266,13 +269,14 @@ func TestBatchGetOrFetchNilMapMissingRecords(t *testing.T) { numShards := 1 ttl := time.Minute evictionPercentage := 50 - minRefreshDelay := time.Minute - maxRefreshDelay := time.Minute * 2 + minRefreshDelay := time.Second + maxRefreshDelay := time.Second * 2 + synchronousRefreshDelay := time.Second * 10 retryInterval := time.Second clock := sturdyc.NewTestClock(time.Now()) c := sturdyc.New[string](capacity, numShards, ttl, evictionPercentage, sturdyc.WithNoContinuousEvictions(), - sturdyc.WithEarlyRefreshes(minRefreshDelay, maxRefreshDelay, retryInterval), + sturdyc.WithEarlyRefreshes(minRefreshDelay, maxRefreshDelay, synchronousRefreshDelay, retryInterval), sturdyc.WithMissingRecordStorage(), sturdyc.WithClock(clock), ) @@ -316,11 +320,12 @@ func TestGetOrFetchBatchRetries(t *testing.T) { evictionPercentage := 10 minRefreshDelay := time.Hour maxRefreshDelay := time.Hour * 2 + synchronousRefreshDelay := time.Hour * 4 retryInterval := time.Second clock := sturdyc.NewTestClock(time.Now()) c := sturdyc.New[string](capacity, numShards, ttl, evictionPercentage, sturdyc.WithNoContinuousEvictions(), - sturdyc.WithEarlyRefreshes(minRefreshDelay, maxRefreshDelay, retryInterval), + sturdyc.WithEarlyRefreshes(minRefreshDelay, maxRefreshDelay, synchronousRefreshDelay, retryInterval), sturdyc.WithMissingRecordStorage(), sturdyc.WithClock(clock), ) @@ -428,10 +433,11 @@ func TestGetOrFetchBatchStampedeProtection(t *testing.T) { clock := sturdyc.NewTestClock(time.Now()) minRefreshDelay := time.Millisecond * 500 
maxRefreshDelay := time.Millisecond * 1000 + synchronousRefreshDelay := time.Millisecond * 1500 refreshRetryInterval := time.Millisecond * 10 c := sturdyc.New[string](capacity, shards, ttl, evictionPercentage, sturdyc.WithNoContinuousEvictions(), - sturdyc.WithEarlyRefreshes(minRefreshDelay, maxRefreshDelay, refreshRetryInterval), + sturdyc.WithEarlyRefreshes(minRefreshDelay, maxRefreshDelay, synchronousRefreshDelay, refreshRetryInterval), sturdyc.WithMissingRecordStorage(), sturdyc.WithClock(clock), sturdyc.WithMetrics(newTestMetricsRecorder(shards)), @@ -502,11 +508,12 @@ func TestGetOrFetchDeletesRecordsThatHaveBeenRemovedAtTheSource(t *testing.T) { clock := sturdyc.NewTestClock(time.Now()) minRefreshDelay := time.Millisecond * 500 maxRefreshDelay := time.Second + synchronousRefreshDelay := time.Minute refreshRetryInterval := time.Millisecond * 10 c := sturdyc.New[string](capacity, numShards, ttl, evictionPercentage, sturdyc.WithNoContinuousEvictions(), - sturdyc.WithEarlyRefreshes(minRefreshDelay, maxRefreshDelay, refreshRetryInterval), + sturdyc.WithEarlyRefreshes(minRefreshDelay, maxRefreshDelay, synchronousRefreshDelay, refreshRetryInterval), sturdyc.WithClock(clock), ) @@ -548,11 +555,12 @@ func TestGetOrFetchConvertsDeletedRecordsToMissingRecords(t *testing.T) { clock := sturdyc.NewTestClock(time.Now()) minRefreshDelay := time.Millisecond * 500 maxRefreshDelay := time.Second + synchronousRefreshDelay := time.Minute refreshRetryInterval := time.Millisecond * 10 c := sturdyc.New[string](capacity, numShards, ttl, evictionPercentage, sturdyc.WithNoContinuousEvictions(), - sturdyc.WithEarlyRefreshes(minRefreshDelay, maxRefreshDelay, refreshRetryInterval), + sturdyc.WithEarlyRefreshes(minRefreshDelay, maxRefreshDelay, synchronousRefreshDelay, refreshRetryInterval), sturdyc.WithMissingRecordStorage(), sturdyc.WithClock(clock), ) @@ -601,11 +609,12 @@ func TestGetOrFetchBatchDeletesRecordsThatHaveBeenRemovedAtTheSource(t *testing. 
clock := sturdyc.NewTestClock(time.Now()) minRefreshDelay := time.Millisecond * 500 maxRefreshDelay := time.Second + synchronousRefreshDelay := time.Minute refreshRetryInterval := time.Millisecond * 10 c := sturdyc.New[string](capacity, numShards, ttl, evictionPercentage, sturdyc.WithNoContinuousEvictions(), - sturdyc.WithEarlyRefreshes(minRefreshDelay, maxRefreshDelay, refreshRetryInterval), + sturdyc.WithEarlyRefreshes(minRefreshDelay, maxRefreshDelay, synchronousRefreshDelay, refreshRetryInterval), sturdyc.WithClock(clock), ) @@ -648,11 +657,12 @@ func TestGetFetchBatchConvertsDeletedRecordsToMissingRecords(t *testing.T) { clock := sturdyc.NewTestClock(time.Now()) minRefreshDelay := time.Millisecond * 500 maxRefreshDelay := time.Second + synchronousRefreshDelay := time.Minute refreshRetryInterval := time.Millisecond * 10 c := sturdyc.New[string](capacity, numShards, ttl, evictionPercentage, sturdyc.WithNoContinuousEvictions(), - sturdyc.WithEarlyRefreshes(minRefreshDelay, maxRefreshDelay, refreshRetryInterval), + sturdyc.WithEarlyRefreshes(minRefreshDelay, maxRefreshDelay, synchronousRefreshDelay, refreshRetryInterval), sturdyc.WithMissingRecordStorage(), sturdyc.WithClock(clock), ) diff --git a/metrics.go b/metrics.go index 55ea6c5..10dbb14 100644 --- a/metrics.go +++ b/metrics.go @@ -6,7 +6,9 @@ type MetricsRecorder interface { // CacheMiss is called for every key that results in a cache miss. CacheMiss() // Refresh is called when a get operation results in a refresh. - Refresh() + BackgroundRefresh() + // SynchronousRefresh is called when a get operation results in a synchronous refresh. + SynchronousRefresh() // MissingRecord is called every time the cache is asked to // look up a key which has been marked as missing. MissingRecord() @@ -31,7 +33,7 @@ type DistributedMetricsRecorder interface { DistributedCacheMiss() // DistributedRefresh is called when we retrieve a record from // the distributed storage that should be refreshed. 
- DistributedRefresh() + DistributedRefresh() // TODO: Should this be renamed to DistributedBackgroundRefresh? And should we add DistributedSynchronousRefresh? // DistributedMissingRecord is called when we retrieve a record from the // distributed storage that has been marked as a missing record. DistributedMissingRecord() @@ -71,7 +73,7 @@ func (s *shard[T]) reportEntriesEvicted(n int) { } // reportCacheHits is used to report cache hits and misses to the metrics recorder. -func (c *Client[T]) reportCacheHits(cacheHit, missingRecord, refresh bool) { +func (c *Client[T]) reportCacheHits(cacheHit, missingRecord, backgroundRefresh, synchronousRefresh bool) { if c.metricsRecorder == nil { return } @@ -80,8 +82,12 @@ func (c *Client[T]) reportCacheHits(cacheHit, missingRecord, refresh bool) { c.metricsRecorder.MissingRecord() } - if refresh { - c.metricsRecorder.Refresh() + if backgroundRefresh { + c.metricsRecorder.BackgroundRefresh() + } + + if synchronousRefresh { + c.metricsRecorder.SynchronousRefresh() } if !cacheHit { diff --git a/options.go b/options.go index 7322f02..9ddbb48 100644 --- a/options.go +++ b/options.go @@ -56,11 +56,12 @@ func WithMissingRecordStorage() Option { // gets scheduled when the key is requested again after a random time between // minRefreshTime and maxRefreshTime. This is an important distinction because // it means that the cache won't just naively refresh every key it's ever seen. 
-func WithEarlyRefreshes(minRefreshTime, maxRefreshTime, retryBaseDelay time.Duration) Option { +func WithEarlyRefreshes(minRefreshTime, maxRefreshTime, synchronousRefreshTime, retryBaseDelay time.Duration) Option { return func(c *Config) { - c.refreshInBackground = true + c.earlyRefreshes = true c.minRefreshTime = minRefreshTime c.maxRefreshTime = maxRefreshTime + c.synchronousRefreshTime = synchronousRefreshTime c.retryBaseDelay = retryBaseDelay } } @@ -161,7 +162,7 @@ func validateConfig(capacity, numShards int, ttl time.Duration, evictionPercenta panic("evictionPercentage must be between 0 and 100") } - if !cfg.refreshInBackground && cfg.bufferRefreshes { + if !cfg.earlyRefreshes && cfg.bufferRefreshes { panic("refresh buffering requires background refreshes to be enabled") } @@ -181,6 +182,10 @@ func validateConfig(capacity, numShards int, ttl time.Duration, evictionPercenta panic("minRefreshTime must be less than or equal to maxRefreshTime") } + if cfg.maxRefreshTime > cfg.synchronousRefreshTime { panic("maxRefreshTime must be less than or equal to synchronousRefreshTime") } + if cfg.retryBaseDelay < 0 { panic("retryBaseDelay must be greater than or equal to 0") } } diff --git a/options_test.go b/options_test.go index 9a83aed..dd99bc8 100644 --- a/options_test.go +++ b/options_test.go @@ -91,7 +91,7 @@ func TestPanicsIfTheRefreshBufferSizeIsLessThanOne(t *testing.T) { } }() sturdyc.New[string](100, 10, time.Minute, 5, - sturdyc.WithEarlyRefreshes(time.Minute, time.Hour, time.Second), + sturdyc.WithEarlyRefreshes(time.Minute, time.Hour, time.Hour*2, time.Second), sturdyc.WithRefreshCoalescing(0, time.Minute), ) } @@ -106,7 +106,7 @@ func TestPanicsIfTheRefreshBufferTimeoutIsLessThanOne(t *testing.T) { } }() sturdyc.New[string](100, 10, time.Minute, 5, - sturdyc.WithEarlyRefreshes(time.Minute, time.Hour, time.Second), + sturdyc.WithEarlyRefreshes(time.Minute, time.Hour, time.Hour*2, time.Second), sturdyc.WithRefreshCoalescing(10, 0), ) } @@ -135,7 +135,21 
@@ func TestPanicsIfTheMinRefreshTimeIsGreaterThanTheMaxRefreshTime(t *testing.T) { } }() sturdyc.New[string](100, 10, time.Minute, 5, - sturdyc.WithEarlyRefreshes(time.Hour, time.Minute, time.Second), + sturdyc.WithEarlyRefreshes(time.Hour, time.Minute, time.Hour*2, time.Second), + ) +} + +func TestPanicsIfTheBackgroundRefreshTimeIsGreaterThanTheSynchronousRefreshTime(t *testing.T) { + t.Parallel() + + defer func() { + err := recover() + if err == nil { + t.Error("expected a panic when trying to use a greater background refresh time than the synchronous refresh time") + } + }() + sturdyc.New[string](100, 10, time.Minute, 5, + sturdyc.WithEarlyRefreshes(time.Minute, time.Hour*2, time.Hour*1, time.Second), ) } @@ -149,6 +163,6 @@ func TestPanicsIfTheRetryBaseDelayIsLessThanZero(t *testing.T) { } }() sturdyc.New[string](100, 10, time.Minute, 5, - sturdyc.WithEarlyRefreshes(time.Minute, time.Hour, -1), + sturdyc.WithEarlyRefreshes(time.Minute, time.Hour, time.Hour*2, -1), ) } diff --git a/passthrough_test.go b/passthrough_test.go index 20269d7..ea397a1 100644 --- a/passthrough_test.go +++ b/passthrough_test.go @@ -6,8 +6,8 @@ import ( "testing" "time" - "github.com/viccon/sturdyc" "github.com/google/go-cmp/cmp" + "github.com/viccon/sturdyc" ) func TestPassthrough(t *testing.T) { diff --git a/refresh.go b/refresh.go index a4e3eba..7162de9 100644 --- a/refresh.go +++ b/refresh.go @@ -28,7 +28,7 @@ func (c *Client[T]) refreshBatch(ids []string, keyFn KeyFn, fetchFn BatchFetchFn // Check if any of the records have been deleted at the data source. for _, id := range ids { - _, okCache, _, _ := c.getWithState(keyFn(id)) + _, okCache, _, _, _ := c.getWithState(keyFn(id)) _, okResponse := response[id] if okResponse { diff --git a/shard.go b/shard.go index c39c6a5..e9f50c7 100644 --- a/shard.go +++ b/shard.go @@ -8,12 +8,13 @@ import ( // entry represents a single cache entry. 
type entry[T any] struct {
-	key                 string
-	value               T
-	expiresAt           time.Time
-	refreshAt           time.Time
-	numOfRefreshRetries int
-	isMissingRecord     bool
+	key                  string
+	value                T
+	expiresAt            time.Time
+	backgroundRefreshAt  time.Time
+	synchronousRefreshAt time.Time
+	numOfRefreshRetries  int
+	isMissingRecord      bool
 }
 
 // shard is a thread-safe data structure that holds a subset of the cache entries.
@@ -79,7 +80,7 @@ func (s *shard[T]) forceEvict() {
 	s.reportEntriesEvicted(entriesEvicted)
 }
 
-// get retrieves attempts to retrieve a value from the shard.
+// get attempts to retrieve a value from the shard.
 //
 // Parameters:
 //
@@ -91,20 +92,26 @@ func (s *shard[T]) forceEvict() {
 //	exists: A boolean indicating if the value exists in the shard.
 //	markedAsMissing: A boolean indicating if the key has been marked as a missing record.
 //	refresh: A boolean indicating if the value should be refreshed in the background.
-func (s *shard[T]) get(key string) (val T, exists, markedAsMissing, refresh bool) {
+func (s *shard[T]) get(key string) (val T, exists, markedAsMissing, backgroundRefresh, synchronousRefresh bool) {
 	s.RLock()
 	item, ok := s.entries[key]
 	if !ok {
 		s.RUnlock()
-		return val, false, false, false
+		return val, false, false, false, false
 	}
 
 	if s.clock.Now().After(item.expiresAt) {
 		s.RUnlock()
-		return val, false, false, false
+		return val, false, false, false, false
 	}
 
-	shouldRefresh := s.refreshInBackground && s.clock.Now().After(item.refreshAt)
+	// Check if the record should be synchronously refreshed.
+	if s.earlyRefreshes && s.clock.Now().After(item.synchronousRefreshAt) {
+		s.RUnlock()
+		return item.value, true, item.isMissingRecord, false, true
+	}
+
+	shouldRefresh := s.earlyRefreshes && s.clock.Now().After(item.backgroundRefreshAt)
 	if shouldRefresh {
 		// Release the read lock, and switch to a write lock.
 		s.RUnlock()
@@ -113,22 +120,22 @@ func (s *shard[T]) get(key string) (val T, exists, markedAsMissing, refresh bool
 		// However, during the time it takes to switch locks, another goroutine
 		// might have acquired it and moved the refreshAt. Therefore, we'll have to
 		// check if this operation should still be performed.
-		if !s.clock.Now().After(item.refreshAt) {
+		if !s.clock.Now().After(item.backgroundRefreshAt) {
 			s.Unlock()
-			return item.value, true, item.isMissingRecord, false
+			return item.value, true, item.isMissingRecord, false, false
 		}
 
 		// Update the "refreshAt" so no other goroutines attempts to refresh the same entry.
 		nextRefresh := s.retryBaseDelay * (1 << item.numOfRefreshRetries)
-		item.refreshAt = s.clock.Now().Add(nextRefresh)
+		item.backgroundRefreshAt = s.clock.Now().Add(nextRefresh)
 		item.numOfRefreshRetries++
 		s.Unlock()
 
-		return item.value, true, item.isMissingRecord, shouldRefresh
+		return item.value, true, item.isMissingRecord, shouldRefresh, false
 	}
 
 	s.RUnlock()
-	return item.value, true, item.isMissingRecord, false
+	return item.value, true, item.isMissingRecord, false, false
 }
 
 // set writes a key-value pair to the shard and returns a
@@ -158,14 +165,15 @@ func (s *shard[T]) set(key string, value T, isMissingRecord bool) bool {
 		isMissingRecord: isMissingRecord,
 	}
 
-	if s.refreshInBackground {
+	if s.earlyRefreshes {
 		// If there is a difference between the min- and maxRefreshTime we'll use that to
 		// set a random padding so that the refreshes get spread out evenly over time.
var padding time.Duration if s.minRefreshTime != s.maxRefreshTime { padding = time.Duration(rand.Int64N(int64(s.maxRefreshTime - s.minRefreshTime))) } - newEntry.refreshAt = now.Add(s.minRefreshTime + padding) + newEntry.backgroundRefreshAt = now.Add(s.minRefreshTime + padding) + newEntry.synchronousRefreshAt = now.Add(s.synchronousRefreshTime) newEntry.numOfRefreshRetries = 0 } diff --git a/sturdyc_test.go b/sturdyc_test.go index 781f323..f5b4103 100644 --- a/sturdyc_test.go +++ b/sturdyc_test.go @@ -24,15 +24,16 @@ func randKey(n int) string { type TestMetricsRecorder struct { sync.Mutex - cacheHits int - cacheMisses int - refreshes int - missingRecords int - evictions int - forcedEvictions int - evictedEntries int - shards map[int]int - batchSizes []int + cacheHits int + cacheMisses int + backgroundRefreshes int + synchronousRefreshes int + missingRecords int + evictions int + forcedEvictions int + evictedEntries int + shards map[int]int + batchSizes []int } func newTestMetricsRecorder(numShards int) *TestMetricsRecorder { @@ -54,10 +55,16 @@ func (r *TestMetricsRecorder) CacheMiss() { r.cacheMisses++ } -func (r *TestMetricsRecorder) Refresh() { +func (r *TestMetricsRecorder) BackgroundRefresh() { r.Lock() defer r.Unlock() - r.refreshes++ + r.backgroundRefreshes++ +} + +func (r *TestMetricsRecorder) SynchronousRefresh() { + r.Lock() + defer r.Unlock() + r.synchronousRefreshes++ } func (r *TestMetricsRecorder) MissingRecord() {