
Commit

Merge pull request #6 from creativecreature/generics
feat: Generics
viccon authored May 7, 2024
2 parents f62d973 + de67e2a commit bd8a0dc
Showing 29 changed files with 549 additions and 329 deletions.
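The substance of this PR is the same across the diffs below: the cache client gains a type parameter, so the value type is chosen once at construction instead of being threaded through package-level `sturdyc.Get[T]`/`sturdyc.Set` calls. A minimal before/after sketch of the API change, pieced together from the test and benchmark diffs (the module import path is an assumption; the constructor and method calls appear verbatim in the diffs):

```go
package main

import (
	"fmt"
	"time"

	"github.com/creativecreature/sturdyc" // assumed module path
)

func main() {
	capacity, numShards, evictionPercentage := 10_000, 100, 5
	ttl := time.Hour

	// Before this PR, the client was untyped and every call site
	// supplied the type parameter:
	//   client := sturdyc.New(capacity, numShards, ttl, evictionPercentage)
	//   sturdyc.Set(client, "key", "value")
	//   value, ok := sturdyc.Get[string](client, "key")

	// After this PR, the client itself is generic:
	c := sturdyc.New[string](capacity, numShards, ttl, evictionPercentage)
	c.Set("key", "value")
	value, ok := c.Get("key")
	fmt.Println(value, ok)
}
```

The trade-off, visible in the buffer.go and cache.go diffs below, is that the configuration moves into a non-generic `Config` struct that the generic `Client[T]` embeds, so the functional options can keep operating on `*Config` rather than becoming generic themselves.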
.github/workflows/main.yml (2 changes: 1 addition & 1 deletion)
@@ -15,7 +15,7 @@ jobs:
with:
go-version: 1.22.0
- name: Test
-run: go test -race -v ./... -coverprofile ./coverage.txt
+run: go test -timeout 30s -race -v ./... -coverprofile ./coverage.txt
- name: Upload coverage reports to Codecov
uses: codecov/[email protected]
with:
README.md (207 changes: 143 additions & 64 deletions)

Large diffs are not rendered by default.

benchmark_test.go (18 changes: 9 additions & 9 deletions)
@@ -14,17 +14,17 @@ type benchmarkMetric[T any] struct {
evictions int
}

-func (b *benchmarkMetric[T]) recordGet(client *sturdyc.Client, key string) {
+func (b *benchmarkMetric[T]) recordGet(c *sturdyc.Client[T], key string) {
b.getOps++
-_, ok := sturdyc.Get[T](client, key)
+_, ok := c.Get(key)
if ok {
b.hits++
}
}

-func (b *benchmarkMetric[T]) recordSet(client *sturdyc.Client, key string, value T) {
+func (b *benchmarkMetric[T]) recordSet(c *sturdyc.Client[T], key string, value T) {
b.setOps++
-evict := sturdyc.Set(client, key, value)
+evict := c.Set(key, value)
if evict {
b.evictions++
}
@@ -56,15 +56,15 @@ func BenchmarkGetConcurrent(b *testing.B) {
numShards := 100
ttl := time.Hour
evictionPercentage := 5
-client := sturdyc.New(capacity, numShards, ttl, evictionPercentage)
-sturdyc.Set(client, cacheKey, "value")
+c := sturdyc.New[string](capacity, numShards, ttl, evictionPercentage)
+c.Set(cacheKey, "value")

metrics := make(benchmarkMetrics[string], 0)
b.ResetTimer()
b.RunParallel(func(pb *testing.PB) {
var metric benchmarkMetric[string]
for pb.Next() {
-metric.recordGet(client, cacheKey)
+metric.recordGet(c, cacheKey)
}
metrics = append(metrics, metric)
})
@@ -77,7 +77,7 @@ func BenchmarkSetConcurrent(b *testing.B) {
numShards := 10_000
ttl := time.Hour
evictionPercentage := 5
-client := sturdyc.New(capacity, numShards, ttl, evictionPercentage)
+c := sturdyc.New[string](capacity, numShards, ttl, evictionPercentage)

metrics := make(benchmarkMetrics[string], 0)
b.ResetTimer()
@@ -86,7 +86,7 @@
for pb.Next() {
// NOTE: The benchmark includes the time for generating random keys.
key := randKey(16)
-metric.recordSet(client, key, "value")
+metric.recordSet(c, key, "value")
}
metrics = append(metrics, metric)
})
buffer.go (12 changes: 6 additions & 6 deletions)
@@ -3,20 +3,20 @@ package sturdyc
import "time"

// deleteBuffer should be called WITH a lock when a buffer has been processed.
-func deleteBuffer(c *Client, batchIdentifier string) {
+func deleteBuffer[T any](c *Client[T], batchIdentifier string) {
delete(c.bufferPermutationChan, batchIdentifier)
delete(c.bufferPermutationIDs, batchIdentifier)
}

// bufferBatchRefresh will buffer the batch of IDs until the batch size is reached or the buffer duration is exceeded.
-func bufferBatchRefresh[T any](c *Client, ids []string, keyFn KeyFn, fetchFn BatchFetchFn[T]) {
+func bufferBatchRefresh[T any](c *Client[T], ids []string, keyFn KeyFn, fetchFn BatchFetchFn[T]) {
if len(ids) == 0 {
return
}

// If we got a perfect batch size, we can refresh the records immediately.
if len(ids) == c.batchSize {
-refreshBatch(c, ids, keyFn, fetchFn)
+c.refreshBatch(ids, keyFn, fetchFn)
return
}

@@ -30,7 +30,7 @@ func bufferBatchRefresh[T any](c *Client, ids []string, keyFn KeyFn, fetchFn Bat

// These IDs are the size we want, so we'll refresh them immediately.
safeGo(func() {
-refreshBatch(c, idsToRefresh, keyFn, fetchFn)
+c.refreshBatch(idsToRefresh, keyFn, fetchFn)
})

// We'll continue to process the remaining IDs recursively.
@@ -88,7 +88,7 @@ func bufferBatchRefresh[T any](c *Client, ids []string, keyFn KeyFn, fetchFn Bat
c.batchMutex.Unlock()

safeGo(func() {
-refreshBatch(c, permIDs, keyFn, fetchFn)
+c.refreshBatch(permIDs, keyFn, fetchFn)
})
return

@@ -122,7 +122,7 @@ func bufferBatchRefresh[T any](c *Client, ids []string, keyFn KeyFn, fetchFn Bat

// Refresh the first batch of IDs immediately.
safeGo(func() {
-refreshBatch(c, idsToRefresh, keyFn, fetchFn)
+c.refreshBatch(idsToRefresh, keyFn, fetchFn)
})

// If we exceeded the batch size, we'll continue to process the remaining IDs recursively.
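The tests below exercise this buffering through two options, `WithRefreshBuffering` and `WithStampedeProtection`. As a rough sketch of how a caller turns the behavior on (option names and argument order are taken from the test diffs; the concrete values are illustrative, not recommendations):

```go
package main

import (
	"time"

	"github.com/creativecreature/sturdyc" // assumed module path
)

func main() {
	// A queued batch of refreshes is flushed when it reaches batchSize IDs,
	// or when batchBufferTimeout elapses, whichever happens first.
	batchSize := 10
	batchBufferTimeout := time.Second * 30

	c := sturdyc.New[string](10_000, 100, time.Hour, 5,
		// Arguments per the tests: min refresh delay, max refresh delay,
		// retry base interval, and a flag the tests pass as true.
		sturdyc.WithStampedeProtection(time.Millisecond*10, time.Millisecond*30, time.Millisecond*10, true),
		sturdyc.WithRefreshBuffering(batchSize, batchBufferTimeout),
	)
	_ = c
}
```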
buffer_test.go (10 changes: 5 additions & 5 deletions)
@@ -32,7 +32,7 @@ func TestBatchIsRefreshedWhenTheTimeoutExpires(t *testing.T) {
// - The queued refresh will be executed under two conditions:
// 1. The number of scheduled refreshes exceeds the specified 'batchSize'.
// 2. The 'batchBufferTimeout' threshold is exceeded.
-client := sturdyc.New(capacity, numShards, ttl, evictionPercentage,
+client := sturdyc.New[string](capacity, numShards, ttl, evictionPercentage,
sturdyc.WithStampedeProtection(minRefreshDelay, maxRefreshDelay, refreshRetryInterval, true),
sturdyc.WithRefreshBuffering(batchSize, batchBufferTimeout),
sturdyc.WithClock(clock),
@@ -96,7 +96,7 @@ func TestBatchIsRefreshedWhenTheBufferSizeIsReached(t *testing.T) {
// - The queued refresh will be executed under two conditions:
// 1. The number of scheduled refreshes exceeds the specified 'batchSize'.
// 2. The 'batchBufferTimeout' threshold is exceeded.
-client := sturdyc.New(capacity, numShards, ttl, evictionPercentage,
+client := sturdyc.New[string](capacity, numShards, ttl, evictionPercentage,
sturdyc.WithStampedeProtection(minRefreshDelay, maxRefreshDelay, refreshRetryInterval, true),
sturdyc.WithRefreshBuffering(batchSize, batchBufferTimeout),
sturdyc.WithClock(clock),
@@ -188,7 +188,7 @@ func TestBatchIsNotRefreshedByDuplicates(t *testing.T) {
// - The queued refresh will be executed under two conditions:
// 1. The number of scheduled refreshes exceeds the specified 'batchSize'.
// 2. The 'batchBufferTimeout' threshold is exceeded.
-client := sturdyc.New(capacity, numShards, ttl, evictionPercentage,
+client := sturdyc.New[string](capacity, numShards, ttl, evictionPercentage,
sturdyc.WithStampedeProtection(minRefreshDelay, maxRefreshDelay, refreshRetryInterval, true),
sturdyc.WithRefreshBuffering(batchSize, batchBufferTimeout),
sturdyc.WithClock(clock),
@@ -256,7 +256,7 @@ func TestBatchesAreGroupedByPermutations(t *testing.T) {
// - The queued refresh will be executed under two conditions:
// 1. The number of scheduled refreshes exceeds the specified 'batchSize'.
// 2. The 'batchBufferTimeout' threshold is exceeded.
-c := sturdyc.New(capacity, numShards, ttl, evictionPercentage,
+c := sturdyc.New[any](capacity, numShards, ttl, evictionPercentage,
sturdyc.WithStampedeProtection(minRefreshDelay, maxRefreshDelay, refreshRetryInterval, true),
sturdyc.WithRefreshBuffering(batchSize, batchBufferTimeout),
sturdyc.WithClock(clock),
@@ -346,7 +346,7 @@ func TestLargeBatchesAreChunkedCorrectly(t *testing.T) {
// - The queued refresh will be executed under two conditions:
// 1. The number of scheduled refreshes exceeds the specified 'batchSize'.
// 2. The 'batchBufferTimeout' threshold is exceeded.
-client := sturdyc.New(capacity, numShards, ttl, evictionPercentage,
+client := sturdyc.New[string](capacity, numShards, ttl, evictionPercentage,
sturdyc.WithStampedeProtection(minRefreshDelay, maxRefreshDelay, refreshRetryInterval, true),
sturdyc.WithRefreshBuffering(batchSize, batchBufferTimeout),
sturdyc.WithClock(clock),
cache.go (80 changes: 32 additions & 48 deletions)
@@ -31,13 +31,10 @@ type BatchResponse[T any] map[string]T
// operation returns. It is used to create unique cache keys.
type KeyFn func(id string) string

-// Client holds the cache configuration.
-type Client struct {
-ttl time.Duration
-shards []*shard
-nextShard int
-evictionInterval time.Duration
+// Config represents the configuration that can be applied to the cache.
+type Config struct {
clock Clock
+evictionInterval time.Duration
metricsRecorder MetricsRecorder

refreshesEnabled bool
@@ -58,6 +55,14 @@ type Client struct {

useRelativeTimeKeyFormat bool
keyTruncation time.Duration
+getSize func() int
}

+// Client represents a cache client that can be used to store and retrieve values.
+type Client[T any] struct {
+*Config
+shards []*shard[T]
+nextShard int
+}

// New creates a new Client instance with the specified configuration.
@@ -67,50 +72,42 @@ type Client struct {
// `ttl` Sets the time to live for each entry in the cache. Has to be greater than 0.
// `evictionPercentage` Percentage of items to evict when the cache exceeds its capacity.
// `opts` allows for additional configurations to be applied to the cache client.
-func New(capacity, numShards int, ttl time.Duration, evictionPercentage int, opts ...Option) *Client {
-validateArgs(capacity, numShards, ttl, evictionPercentage)
-
-// Create a new client, and apply the options.
-//nolint: exhaustruct // The options are going to set the remaining fields.
-client := &Client{
-ttl: ttl,
+func New[T any](capacity, numShards int, ttl time.Duration, evictionPercentage int, opts ...Option) *Client[T] {
+// Create an empty client and set up the default configuration.
+client := &Client[T]{}
+cfg := &Config{
clock: NewClock(),
-evictionInterval: ttl / time.Duration(numShards),
passthroughPercentage: 100,
+evictionInterval: ttl / time.Duration(numShards),
+getSize: client.Size,
}

+// Apply the options to the configuration.
+client.Config = cfg
for _, opt := range opts {
-opt(client)
+opt(cfg)
}

-// We create the shards after we've applied the options to ensure that the correct values are used.
+validateConfig(capacity, numShards, ttl, evictionPercentage, cfg)
+
+// Next, we'll create the shards. It is important that
+// we do this after we've applied the options.
shardSize := capacity / numShards
-shards := make([]*shard, numShards)
+shards := make([]*shard[T], numShards)
for i := 0; i < numShards; i++ {
-shard := newShard(
-shardSize,
-ttl,
-evictionPercentage,
-client.clock,
-client.metricsRecorder,
-client.refreshesEnabled,
-client.minRefreshTime,
-client.maxRefreshTime,
-client.retryBaseDelay,
-)
-shards[i] = shard
+shards[i] = newShard[T](shardSize, ttl, evictionPercentage, cfg)
}
client.shards = shards
client.nextShard = 0

-// Run evictions in a separate goroutine.
+// Run evictions on the shards in a separate goroutine.
client.startEvictions()

return client
}

// Size returns the number of entries in the cache.
-func (c *Client) Size() int {
+func (c *Client[T]) Size() int {
var sum int
for _, shard := range c.shards {
sum += shard.size()
@@ -119,13 +116,13 @@ func (c *Client) Size() int {
}

// Delete removes a single entry from the cache.
-func (c *Client) Delete(key string) {
+func (c *Client[T]) Delete(key string) {
shard := c.getShard(key)
shard.delete(key)
}

// startEvictions is going to be running in a separate goroutine that we're going to prevent from ever exiting.
-func (c *Client) startEvictions() {
+func (c *Client[T]) startEvictions() {
go func() {
ticker, stop := c.clock.NewTicker(c.evictionInterval)
defer stop()
@@ -140,7 +137,7 @@ func (c *Client) startEvictions() {
}

// getShard returns the shard that should be used for the specified key.
-func (c *Client) getShard(key string) *shard {
+func (c *Client[T]) getShard(key string) *shard[T] {
hash := xxhash.Sum64String(key)
shardIndex := hash % uint64(len(c.shards))
if c.metricsRecorder != nil {
@@ -150,7 +147,7 @@ func (c *Client) getShard(key string) *shard {
}

// reportCacheHits is used to report cache hits and misses to the metrics recorder.
-func (c *Client) reportCacheHits(cacheHit bool) {
+func (c *Client[T]) reportCacheHits(cacheHit bool) {
if c.metricsRecorder == nil {
return
}
@@ -160,16 +157,3 @@ func (c *Client) reportCacheHits(cacheHit bool) {
}
c.metricsRecorder.CacheHit()
}

-// set writes a single value to the cache. Returns true if it triggered an eviction.
-func (c *Client) set(key string, value any, isMissingRecord bool) bool {
-shard := c.getShard(key)
-return shard.set(key, value, isMissingRecord)
-}
-
-// get retrieves a single value from the cache.
-func (c *Client) get(key string) (any, bool) {
-shard := c.getShard(key)
-value, ok, _, _ := shard.get(key)
-return value, ok
-}
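Two small pieces of arithmetic in cache.go are worth making explicit: `getShard` maps a key to a shard with `xxhash.Sum64String(key) % numShards`, and `New` gives each shard `capacity / numShards` slots while ticking evictions every `ttl / numShards`. A standalone sketch of the same mapping (assuming `github.com/cespare/xxhash/v2` is the package behind the `Sum64String` call above):

```go
package main

import (
	"fmt"

	"github.com/cespare/xxhash/v2" // assumed to be the xxhash import used above
)

func main() {
	numShards := 100
	capacity := 10_000

	// Same mapping as Client.getShard: hash the key, then take the
	// remainder modulo the shard count for a stable index in [0, numShards).
	shardIndex := xxhash.Sum64String("user-1234") % uint64(numShards)
	fmt.Println("shard:", shardIndex)

	// Sizing from New: shardSize := capacity / numShards,
	// so here each shard holds 100 entries.
	fmt.Println("entries per shard:", capacity/numShards)
}
```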
cache_test.go (18 changes: 9 additions & 9 deletions)
@@ -39,10 +39,10 @@ func TestShardDistribution(t *testing.T) {
t.Run(tc.name, func(t *testing.T) {
t.Parallel()
recorder := newTestMetricsRecorder(tc.numShards)
-client := sturdyc.New(tc.capacity, tc.numShards, time.Hour, 5, sturdyc.WithMetrics(recorder))
+c := sturdyc.New[string](tc.capacity, tc.numShards, time.Hour, 5, sturdyc.WithMetrics(recorder))
for i := 0; i < tc.capacity; i++ {
key := randKey(tc.keyLength)
-sturdyc.Set(client, key, "value")
+c.Set(key, "value")
}
recorder.validateShardDistribution(t, tc.tolerancePercentage)
})
@@ -58,7 +58,7 @@ func TestTimeBasedEviction(t *testing.T) {
evictionInterval := time.Second
clock := sturdyc.NewTestClock(time.Now())
metricRecorder := newTestMetricsRecorder(numShards)
-client := sturdyc.New(
+c := sturdyc.New[string](
capacity,
numShards,
ttl,
@@ -69,7 +69,7 @@
)

for i := 0; i < capacity; i++ {
-sturdyc.Set(client, randKey(12), "value")
+c.Set(randKey(12), "value")
}

// Expire all entries.
@@ -137,7 +137,7 @@ func TestForcedEvictions(t *testing.T) {
t.Parallel()

recorder := newTestMetricsRecorder(tc.numShards)
-client := sturdyc.New(tc.capacity,
+c := sturdyc.New[string](tc.capacity,
tc.numShards,
time.Hour,
tc.evictionPercentage,
@@ -147,13 +147,13 @@
// Start by filling the sturdyc.
for i := 0; i < tc.capacity; i++ {
key := randKey(12)
-sturdyc.Set(client, key, "value")
+c.Set(key, "value")
}

// Next, we'll write to the cache to force evictions.
for i := 0; i < tc.writes; i++ {
key := randKey(12)
-sturdyc.Set(client, key, "value")
+c.Set(key, "value")
}

if recorder.forcedEvictions < tc.minEvictions || recorder.forcedEvictions > tc.maxEvictions {
@@ -175,7 +175,7 @@
// Setting the eviction percentage to 0 should disable forced evictions.
evictionpercentage := 0
metricRecorder := newTestMetricsRecorder(numShards)
-c := sturdyc.New(
+c := sturdyc.New[string](
capacity,
numShards,
ttl,
@@ -184,7 +184,7 @@
)

for i := 0; i < capacity*10; i++ {
-sturdyc.Set(c, randKey(12), "value")
+c.Set(randKey(12), "value")
}

metricRecorder.Lock()
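The last two tests pin down what `evictionPercentage` means in practice: once the cache is full, each further write force-evicts that percentage of a shard's entries, and a value of 0 disables forced evictions entirely, turning `Set` on a full cache into a no-op. A sketch of the two extremes (values illustrative; semantics per the test names and comments above):

```go
package main

import (
	"time"

	"github.com/creativecreature/sturdyc" // assumed module path
)

func main() {
	// Forced evictions on: a write to a full shard evicts 5% of its entries.
	evicting := sturdyc.New[string](10_000, 100, time.Hour, 5)

	// Forced evictions off: per TestDisablingForcedEvictionMakesSetANoop,
	// a Set on a full cache becomes a no-op.
	frozen := sturdyc.New[string](10_000, 100, time.Hour, 0)

	_, _ = evicting, frozen
}
```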
(The remaining 22 changed files are not rendered.)

