Skip to content

Commit

Permalink
Patch the resource slice status instead of updating (#65)
Browse files Browse the repository at this point in the history
We can avoid the overhead of deep copying the entire resource slice by
using jsonpatch to update only the modified items. This approach also
naturally simplifies the signature of the write buffer since the same
func can be used for checking the current state and building the patch.

The perf overhead of full updates isn't a big deal now, but will be once
we start updating it 2-3x more often (status updates).

---------

Co-authored-by: Jordan Olshevski <[email protected]>
  • Loading branch information
jveski and Jordan Olshevski authored Mar 5, 2024
1 parent 2c467e8 commit 398a5e0
Show file tree
Hide file tree
Showing 4 changed files with 60 additions and 56 deletions.
17 changes: 9 additions & 8 deletions internal/controllers/reconciliation/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,14 +127,15 @@ func (c *Controller) Reconcile(ctx context.Context, req *reconstitution.Request)
}
}

c.resourceClient.PatchStatusAsync(ctx, &req.Manifest,
func(rs *apiv1.ResourceState) bool {
return !((resource.Deleted() && !rs.Deleted) || !rs.Reconciled)
},
func(rs *apiv1.ResourceState) {
rs.Deleted = resource.Deleted()
rs.Reconciled = true
})
c.resourceClient.PatchStatusAsync(ctx, &req.Manifest, func(rs *apiv1.ResourceState) *apiv1.ResourceState {
if rs.Deleted == resource.Deleted() && rs.Reconciled {
return nil
}
return &apiv1.ResourceState{
Deleted: resource.Deleted(),
Reconciled: true,
}
})

if modified {
return ctrl.Result{Requeue: true}, nil
Expand Down
5 changes: 2 additions & 3 deletions internal/reconstitution/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,10 @@ type Reconciler interface {
// Client provides read/write access to a collection of reconstituted resources.
type Client interface {
Get(ctx context.Context, comp *CompositionRef, res *ResourceRef) (*Resource, bool)
PatchStatusAsync(ctx context.Context, req *ManifestRef, checkFn CheckPatchFn, patchFn StatusPatchFn)
PatchStatusAsync(ctx context.Context, req *ManifestRef, patchFn StatusPatchFn)
}

type StatusPatchFn func(*apiv1.ResourceState)
type CheckPatchFn func(*apiv1.ResourceState) bool
type StatusPatchFn func(*apiv1.ResourceState) *apiv1.ResourceState

// ManifestRef references a particular resource manifest within a resource slice.
type ManifestRef struct {
Expand Down
67 changes: 37 additions & 30 deletions internal/reconstitution/writebuffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package reconstitution

import (
"context"
"encoding/json"
"fmt"
"sync"
"time"
Expand All @@ -19,7 +20,6 @@ import (
type asyncStatusUpdate struct {
SlicedResource *ManifestRef
PatchFn StatusPatchFn
CheckFn CheckPatchFn
}

// writeBuffer reduces load on etcd/apiserver by collecting resource slice status
Expand All @@ -46,7 +46,7 @@ func newWriteBuffer(cli client.Client, batchInterval time.Duration, burst int) *
}
}

func (w *writeBuffer) PatchStatusAsync(ctx context.Context, ref *ManifestRef, checkFn CheckPatchFn, patchFn StatusPatchFn) {
func (w *writeBuffer) PatchStatusAsync(ctx context.Context, ref *ManifestRef, patchFn StatusPatchFn) {
w.mut.Lock()
defer w.mut.Unlock()
logger := logr.FromContextOrDiscard(ctx)
Expand All @@ -63,7 +63,6 @@ func (w *writeBuffer) PatchStatusAsync(ctx context.Context, ref *ManifestRef, ch
w.state[key] = append(currentSlice, &asyncStatusUpdate{
SlicedResource: ref,
PatchFn: patchFn,
CheckFn: checkFn,
})
w.queue.Add(key)
}
Expand Down Expand Up @@ -129,44 +128,46 @@ func (w *writeBuffer) updateSlice(ctx context.Context, sliceNSN types.Namespaced
return false
}

// Allocate the status slice if needed
var status *apiv1.ResourceSliceStatus
var dirty bool
if len(slice.Status.Resources) != len(slice.Spec.Resources) {
dirty = true
status = slice.Status.DeepCopy()
status.Resources = make([]apiv1.ResourceState, len(slice.Spec.Resources))
// It's easier to pre-allocate the entire status slice before sending patches
// since the "replace" op requires an existing item.
if len(slice.Status.Resources) == 0 {
copy := slice.DeepCopy()
copy.Status.Resources = make([]apiv1.ResourceState, len(slice.Spec.Resources))
err = w.client.Status().Update(ctx, copy)
if err != nil {
logger.Error(err, "unable to update resource slice")
return false
}
slice = copy
}

// Use the newly allocated slice (if allocated), otherwise use the already allocated one
// Transform the set of patch funcs into a set of jsonpatch objects
unsafeSlice := slice.Status.Resources
if unsafeSlice == nil {
unsafeSlice = status.Resources
}

// The resource slice informer doesn't DeepCopy automatically for perf reasons.
// So we can apply the CheckFn to status pointers but not PatchFn.
// We'll deep copy the status here only when it needs to change.
var patches []*jsonPatch
for _, update := range updates {
unsafeStatusPtr := &unsafeSlice[update.SlicedResource.Index]
if update.CheckFn(unsafeStatusPtr) {
patch := update.PatchFn(unsafeStatusPtr)
if patch == nil {
continue
}

if status == nil {
status = slice.Status.DeepCopy()
}

dirty = true
update.PatchFn(&status.Resources[update.SlicedResource.Index])
patches = append(patches, &jsonPatch{
Op: "replace",
Path: fmt.Sprintf("/status/resources/%d", update.SlicedResource.Index),
Value: patch,
})
}
if !dirty {
return true
if len(patches) == 0 {
return true // nothing to do!
}

copy := &apiv1.ResourceSlice{Status: *status}
slice.ObjectMeta.DeepCopyInto(&copy.ObjectMeta)
err = w.client.Status().Update(ctx, copy)
// Encode/apply the patch(es)
patchJson, err := json.Marshal(&patches)
if err != nil {
logger.Error(err, "unable to encode patch")
return false
}
err = w.client.Status().Patch(ctx, slice, client.RawPatch(types.JSONPatchType, patchJson))
if err != nil {
logger.Error(err, "unable to update resource slice")
return false
Expand All @@ -176,3 +177,9 @@ func (w *writeBuffer) updateSlice(ctx context.Context, sliceNSN types.Namespaced
discoveryCacheChanges.Inc()
return true
}

type jsonPatch struct {
Op string `json:"op"`
Path string `json:"path"`
Value any `json:"value"`
}
27 changes: 12 additions & 15 deletions internal/reconstitution/writebuffer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ func TestWriteBufferBasics(t *testing.T) {
req := &ManifestRef{}
req.Slice.Name = "test-slice-1"
req.Index = 1
w.PatchStatusAsync(ctx, req, checkReconciled(), setReconciled())
w.PatchStatusAsync(ctx, req, setReconciled())

// Slice resource's status should reflect the patch
w.processQueueItem(ctx)
Expand Down Expand Up @@ -69,12 +69,12 @@ func TestWriteBufferBatching(t *testing.T) {
req := &ManifestRef{}
req.Slice.Name = "test-slice-1"
req.Index = 1
w.PatchStatusAsync(ctx, req, checkReconciled(), setReconciled())
w.PatchStatusAsync(ctx, req, setReconciled())

req = &ManifestRef{}
req.Slice.Name = "test-slice-1"
req.Index = 2
w.PatchStatusAsync(ctx, req, checkReconciled(), setReconciled())
w.PatchStatusAsync(ctx, req, setReconciled())

// Slice resource's status should be correct after a single update
w.processQueueItem(ctx)
Expand All @@ -101,7 +101,7 @@ func TestWriteBufferNoUpdates(t *testing.T) {
req := &ManifestRef{}
req.Slice.Name = "test-slice-1"
req.Index = 1
w.PatchStatusAsync(ctx, req, checkReconciled(), setReconciled())
w.PatchStatusAsync(ctx, req, setReconciled())

// Remove the update leaving the queue message in place
w.state = map[types.NamespacedName][]*asyncStatusUpdate{}
Expand All @@ -119,7 +119,7 @@ func TestWriteBufferMissingSlice(t *testing.T) {

req := &ManifestRef{}
req.Slice.Name = "test-slice-1" // this doesn't exist
w.PatchStatusAsync(ctx, req, checkReconciled(), setReconciled())
w.PatchStatusAsync(ctx, req, setReconciled())

// Slice 404 drops the event and does not retry.
// Prevents a deadlock of this queue item.
Expand Down Expand Up @@ -150,7 +150,7 @@ func TestWriteBufferNoChange(t *testing.T) {
req := &ManifestRef{}
req.Slice.Name = "test-slice-1"
req.Index = 1
w.PatchStatusAsync(ctx, req, checkReconciled(), setReconciled())
w.PatchStatusAsync(ctx, req, setReconciled())

w.processQueueItem(ctx)
}
Expand All @@ -174,7 +174,7 @@ func TestWriteBufferUpdateError(t *testing.T) {
req := &ManifestRef{}
req.Slice.Name = "test-slice-1"
req.Index = 1
w.PatchStatusAsync(ctx, req, checkReconciled(), setReconciled())
w.PatchStatusAsync(ctx, req, setReconciled())

// Both the queue item and state have persisted
w.processQueueItem(ctx)
Expand All @@ -184,13 +184,10 @@ func TestWriteBufferUpdateError(t *testing.T) {
}

func setReconciled() StatusPatchFn {
return func(rs *apiv1.ResourceState) {
rs.Reconciled = true
}
}

func checkReconciled() CheckPatchFn {
return func(rs *apiv1.ResourceState) bool {
return rs.Reconciled
return func(rs *apiv1.ResourceState) *apiv1.ResourceState {
if rs != nil && rs.Reconciled {
return nil
}
return &apiv1.ResourceState{Reconciled: true}
}
}

0 comments on commit 398a5e0

Please sign in to comment.