Skip to content

Commit

Permalink
Restructure mutex
Browse files Browse the repository at this point in the history
  • Loading branch information
krhitesh7 committed Apr 17, 2024
1 parent 981ae8c commit ca40e63
Showing 1 changed file with 7 additions and 5 deletions.
12 changes: 7 additions & 5 deletions pkg/cache/v3/simple.go
Original file line number Diff line number Diff line change
Expand Up @@ -239,16 +239,17 @@ func (cache *snapshotCache) ParseSystemVersionInfo(version string) int64 {
}

func (cache *snapshotCache) BatchUpsertResources(ctx context.Context, typ string, batchResourcesUpserted map[string]map[string]types.Resource) error {
cache.mu.Lock()
defer cache.mu.Unlock()

var wg sync.WaitGroup
for node, resourcesUpserted := range batchResourcesUpserted {
wg.Add(1)

go func(node string, resourcesUpserted map[string]types.Resource) {
defer wg.Done()

cache.mu.Lock()
if snapshot, ok := cache.snapshots[node]; ok {
defer cache.mu.Unlock()

// Add new/updated resources to the Resources map
index := GetResponseType(typ)
currentResources := snapshot.(*Snapshot).Resources[index]
Expand Down Expand Up @@ -278,6 +279,7 @@ func (cache *snapshotCache) BatchUpsertResources(ctx context.Context, typ string
}
}
} else {
cache.mu.Unlock()
resources := make(map[resource.Type][]types.Resource)
resources[typ] = make([]types.Resource, 0)
for _, r := range resourcesUpserted {
Expand All @@ -302,9 +304,8 @@ func (cache *snapshotCache) BatchUpsertResources(ctx context.Context, typ string

func (cache *snapshotCache) UpsertResources(ctx context.Context, node string, typ string, resourcesUpserted map[string]types.Resource) error {

Check failure on line 305 in pkg/cache/v3/simple.go

View workflow job for this annotation

GitHub Actions / lint

File is not `gofumpt`-ed with `-extra` (gofumpt)

Check failure on line 305 in pkg/cache/v3/simple.go

View workflow job for this annotation

GitHub Actions / lint

File is not `gofumpt`-ed with `-extra` (gofumpt)
cache.mu.Lock()
defer cache.mu.Unlock()

if snapshot, ok := cache.snapshots[node]; ok {
defer cache.mu.Unlock()
// Add new/updated resources to the Resources map
index := GetResponseType(typ)
currentResources := snapshot.(*Snapshot).Resources[index]
Expand All @@ -331,6 +332,7 @@ func (cache *snapshotCache) UpsertResources(ctx context.Context, node string, ty
return cache.respondDeltaWatches(ctx, info, snapshot)
}
} else {
cache.mu.Unlock()
resources := make(map[resource.Type][]types.Resource)
resources[typ] = make([]types.Resource, 0)
for _, r := range resourcesUpserted {
Expand Down

0 comments on commit ca40e63

Please sign in to comment.