Merge pull request #123694 from mengqiy/automated-cherry-pick-of-#123532-upstream-release-1.28

Automated cherry pick of #123532: Prevent watch cache starvation, by moving its watch to a separate RPC
k8s-ci-robot committed Mar 8, 2024
2 parents a0f9a88 + 4fbf9a2 commit 643ce7f
Showing 9 changed files with 181 additions and 16 deletions.
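Background for the change below: etcd's clientv3 watcher multiplexes all watches opened from contexts that carry identical outgoing gRPC metadata onto a single Watch RPC stream, so tagging the cacher's context with distinct metadata moves its watch (and its progress requests) onto a dedicated stream. A minimal sketch of that idea outside the apiserver code; the endpoint, key prefix, and program structure are illustrative assumptions, not part of this change:

```go
package main

import (
	"context"
	"fmt"
	"time"

	clientv3 "go.etcd.io/etcd/client/v3"
	"google.golang.org/grpc/metadata"
)

func main() {
	cli, err := clientv3.New(clientv3.Config{
		Endpoints:   []string{"localhost:2379"}, // assumed local etcd, for illustration
		DialTimeout: 5 * time.Second,
	})
	if err != nil {
		panic(err)
	}
	defer cli.Close()

	// Contexts with identical outgoing metadata share one gRPC Watch RPC;
	// distinct metadata forces the client to open a separate stream.
	cacheCtx := metadata.NewOutgoingContext(context.Background(),
		metadata.New(map[string]string{"source": "cache"}))

	// Watch opened on the "source: cache" stream (what the cacher does after this change).
	cacheWatch := cli.Watch(cacheCtx, "/registry/pods/", clientv3.WithPrefix(), clientv3.WithProgressNotify())

	// Watch opened on the default stream (e.g. a request that bypasses the cacher).
	directWatch := cli.Watch(context.Background(), "/registry/pods/", clientv3.WithPrefix())

	// Progress is requested per stream, so only watches sharing cacheCtx's metadata
	// receive the resulting progress notification.
	if err := cli.RequestProgress(cacheCtx); err != nil {
		panic(err)
	}

	_, _ = cacheWatch, directWatch
	fmt.Println("cacher watch and direct watch run on separate Watch RPCs")
}
```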
2 changes: 2 additions & 0 deletions pkg/features/kube_features.go
@@ -1205,6 +1205,8 @@ var defaultKubernetesFeatureGates = map[featuregate.Feature]featuregate.FeatureS

genericfeatures.OpenAPIV3: {Default: true, PreRelease: featuregate.GA, LockToDefault: true}, // remove in 1.29

genericfeatures.SeparateCacheWatchRPC: {Default: true, PreRelease: featuregate.Beta},

genericfeatures.ServerSideApply: {Default: true, PreRelease: featuregate.GA, LockToDefault: true}, // remove in 1.29

genericfeatures.ServerSideFieldValidation: {Default: true, PreRelease: featuregate.GA, LockToDefault: true}, // remove in 1.29
9 changes: 9 additions & 0 deletions staging/src/k8s.io/apiserver/pkg/features/kube_features.go
@@ -166,6 +166,13 @@ const (
// Deprecates and removes SelfLink from ObjectMeta and ListMeta.
RemoveSelfLink featuregate.Feature = "RemoveSelfLink"

// owner: @serathius
// beta: v1.30
//
// Allow watch cache to create a watch on a dedicated RPC.
// This prevents watch cache from being starved by other watches.
SeparateCacheWatchRPC featuregate.Feature = "SeparateCacheWatchRPC"

// owner: @apelisse, @lavalamp
// alpha: v1.14
// beta: v1.16
@@ -286,6 +293,8 @@ var defaultKubernetesFeatureGates = map[featuregate.Feature]featuregate.FeatureS

RemoveSelfLink: {Default: true, PreRelease: featuregate.GA, LockToDefault: true},

SeparateCacheWatchRPC: {Default: true, PreRelease: featuregate.Beta},

ServerSideApply: {Default: true, PreRelease: featuregate.GA, LockToDefault: true}, // remove in 1.29

ServerSideFieldValidation: {Default: true, PreRelease: featuregate.GA, LockToDefault: true}, // remove in 1.29
13 changes: 11 additions & 2 deletions staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go
@@ -26,6 +26,7 @@ import (
"time"

"go.opentelemetry.io/otel/attribute"
"google.golang.org/grpc/metadata"

"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
@@ -401,10 +402,18 @@ func NewCacherFromConfig(config Config) (*Cacher, error) {
// so that future reuse does not get a spurious timeout.
<-cacher.timer.C
}
- progressRequester := newConditionalProgressRequester(config.Storage.RequestWatchProgress, config.Clock)
+ var contextMetadata metadata.MD
+ if utilfeature.DefaultFeatureGate.Enabled(features.SeparateCacheWatchRPC) {
+ // Add gRPC context metadata to the watch and progress-notify requests issued by the cacher to:
+ // * prevent starvation of the cacher's watch, by moving it to a separate Watch RPC from watch requests that bypass the cacher;
+ // * ensure that progress notification requests are executed on the same Watch RPC as their watch, which they require in order to work.
+ contextMetadata = metadata.New(map[string]string{"source": "cache"})
+ }
+
+ progressRequester := newConditionalProgressRequester(config.Storage.RequestWatchProgress, config.Clock, contextMetadata)
watchCache := newWatchCache(
config.KeyFunc, cacher.processEvent, config.GetAttrsFunc, config.Versioner, config.Indexers, config.Clock, config.GroupResource, progressRequester)
- listerWatcher := NewListerWatcher(config.Storage, config.ResourcePrefix, config.NewListFunc)
+ listerWatcher := NewListerWatcher(config.Storage, config.ResourcePrefix, config.NewListFunc, contextMetadata)
reflectorName := "storage/cacher.go:" + config.ResourcePrefix

reflector := cache.NewNamedReflector(reflectorName, listerWatcher, obj, watchCache, 0)
@@ -29,6 +29,7 @@ import (
"time"

"github.com/stretchr/testify/require"
"google.golang.org/grpc/metadata"

"k8s.io/apimachinery/pkg/api/apitesting"
apiequality "k8s.io/apimachinery/pkg/api/equality"
@@ -2089,3 +2090,128 @@ func TestDoNotPopExpiredWatchersWhenNoEventsSeen(t *testing.T) {
}},
}, true)
}

func TestWatchStreamSeparation(t *testing.T) {
tcs := []struct {
name string
separateCacheWatchRPC bool
useWatchCacheContextMetadata bool
expectBookmarkOnWatchCache bool
expectBookmarkOnEtcd bool
}{
{
name: "common RPC > both get bookmarks",
separateCacheWatchRPC: false,
expectBookmarkOnEtcd: true,
expectBookmarkOnWatchCache: true,
},
{
name: "common RPC & watch cache context > both get bookmarks",
separateCacheWatchRPC: false,
useWatchCacheContextMetadata: true,
expectBookmarkOnEtcd: true,
expectBookmarkOnWatchCache: true,
},
{
name: "separate RPC > only etcd gets bookmarks",
separateCacheWatchRPC: true,
expectBookmarkOnEtcd: true,
expectBookmarkOnWatchCache: false,
},
{
name: "separate RPC & watch cache context > only watch cache gets bookmarks",
separateCacheWatchRPC: true,
useWatchCacheContextMetadata: true,
expectBookmarkOnEtcd: false,
expectBookmarkOnWatchCache: true,
},
}
for _, tc := range tcs {
t.Run(tc.name, func(t *testing.T) {
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.SeparateCacheWatchRPC, tc.separateCacheWatchRPC)()
_, cacher, _, terminate := testSetupWithEtcdServer(t)
t.Cleanup(terminate)
if err := cacher.ready.wait(context.TODO()); err != nil {
t.Fatalf("unexpected error waiting for the cache to be ready")
}

getCacherRV := func() uint64 {
cacher.watchCache.RLock()
defer cacher.watchCache.RUnlock()
return cacher.watchCache.resourceVersion
}
waitContext, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
waitForEtcdBookmark := watchAndWaitForBookmark(t, waitContext, cacher.storage)

var out example.Pod
err := cacher.Create(context.Background(), "foo", &example.Pod{}, &out, 0)
if err != nil {
t.Fatal(err)
}
versioner := storage.APIObjectVersioner{}
var lastResourceVersion uint64
lastResourceVersion, err = versioner.ObjectResourceVersion(&out)
if err != nil {
t.Fatal(err)
}

var contextMetadata metadata.MD
if tc.useWatchCacheContextMetadata {
contextMetadata = cacher.watchCache.waitingUntilFresh.contextMetadata
}
// Wait before sending watch progress request to avoid https://github.com/etcd-io/etcd/issues/17507
// TODO(https://github.com/etcd-io/etcd/issues/17507): Remove sleep when etcd is upgraded to version with fix.
time.Sleep(time.Second)
err = cacher.storage.RequestWatchProgress(metadata.NewOutgoingContext(context.Background(), contextMetadata))
if err != nil {
t.Fatal(err)
}
// Give time for bookmark to arrive
time.Sleep(time.Second)

etcdWatchResourceVersion := waitForEtcdBookmark()
gotEtcdWatchBookmark := etcdWatchResourceVersion == lastResourceVersion
if gotEtcdWatchBookmark != tc.expectBookmarkOnEtcd {
t.Errorf("Unexpected etcd bookmark check result, rv: %d, got: %v, want: %v", etcdWatchResourceVersion, etcdWatchResourceVersion, tc.expectBookmarkOnEtcd)
}

watchCacheResourceVersion := getCacherRV()
cacherGotBookmark := watchCacheResourceVersion == lastResourceVersion
if cacherGotBookmark != tc.expectBookmarkOnWatchCache {
t.Errorf("Unexpected watch cache bookmark check result, rv: %d, got: %v, want: %v", watchCacheResourceVersion, cacherGotBookmark, tc.expectBookmarkOnWatchCache)
}
})
}
}

func watchAndWaitForBookmark(t *testing.T, ctx context.Context, etcdStorage storage.Interface) func() (resourceVersion uint64) {
opts := storage.ListOptions{ResourceVersion: "", Predicate: storage.Everything, Recursive: true}
opts.Predicate.AllowWatchBookmarks = true
w, err := etcdStorage.Watch(ctx, "/pods/", opts)
if err != nil {
t.Fatal(err)
}

versioner := storage.APIObjectVersioner{}
var rv uint64
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
for event := range w.ResultChan() {
if event.Type == watch.Bookmark {
rv, err = versioner.ObjectResourceVersion(event.Object)
break
}
}
}()
return func() (resourceVersion uint64) {
defer w.Stop()
wg.Wait()
if err != nil {
t.Fatal(err)
}
return rv
}
}
30 changes: 21 additions & 9 deletions staging/src/k8s.io/apiserver/pkg/storage/cacher/lister_watcher.go
@@ -19,6 +19,8 @@ package cacher
import (
"context"

"google.golang.org/grpc/metadata"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/labels"
@@ -30,17 +32,19 @@ import (

// listerWatcher opaques storage.Interface to expose cache.ListerWatcher.
type listerWatcher struct {
- storage storage.Interface
- resourcePrefix string
- newListFunc func() runtime.Object
+ storage storage.Interface
+ resourcePrefix string
+ newListFunc func() runtime.Object
+ contextMetadata metadata.MD
}

// NewListerWatcher returns a storage.Interface backed ListerWatcher.
- func NewListerWatcher(storage storage.Interface, resourcePrefix string, newListFunc func() runtime.Object) cache.ListerWatcher {
+ func NewListerWatcher(storage storage.Interface, resourcePrefix string, newListFunc func() runtime.Object, contextMetadata metadata.MD) cache.ListerWatcher {
return &listerWatcher{
- storage: storage,
- resourcePrefix: resourcePrefix,
- newListFunc: newListFunc,
+ storage: storage,
+ resourcePrefix: resourcePrefix,
+ newListFunc: newListFunc,
+ contextMetadata: contextMetadata,
}
}

@@ -59,7 +63,11 @@ func (lw *listerWatcher) List(options metav1.ListOptions) (runtime.Object, error
Predicate: pred,
Recursive: true,
}
- if err := lw.storage.GetList(context.TODO(), lw.resourcePrefix, storageOpts, list); err != nil {
+ ctx := context.Background()
+ if lw.contextMetadata != nil {
+ ctx = metadata.NewOutgoingContext(ctx, lw.contextMetadata)
+ }
+ if err := lw.storage.GetList(ctx, lw.resourcePrefix, storageOpts, list); err != nil {
return nil, err
}
return list, nil
@@ -73,5 +81,9 @@ func (lw *listerWatcher) Watch(options metav1.ListOptions) (watch.Interface, err
Recursive: true,
ProgressNotify: true,
}
- return lw.storage.Watch(context.TODO(), lw.resourcePrefix, opts)
+ ctx := context.Background()
+ if lw.contextMetadata != nil {
+ ctx = metadata.NewOutgoingContext(ctx, lw.contextMetadata)
+ }
+ return lw.storage.Watch(ctx, lw.resourcePrefix, opts)
}
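
For completeness, a minimal usage sketch of the new four-argument constructor; the wrapper function, package name, and resource prefix below are illustrative assumptions, and passing nil metadata preserves the previous behaviour:

```go
package cacherwiring // hypothetical package, for illustration only

import (
	"google.golang.org/grpc/metadata"

	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/apiserver/pkg/storage"
	"k8s.io/apiserver/pkg/storage/cacher"
	"k8s.io/client-go/tools/cache"
)

// newCacheListerWatcher mirrors the wiring in NewCacherFromConfig: every List and Watch
// issued by the returned ListerWatcher carries "source: cache" metadata on its outgoing
// context, keeping the cacher's traffic on a dedicated Watch RPC.
func newCacheListerWatcher(store storage.Interface, resourcePrefix string, newListFunc func() runtime.Object) cache.ListerWatcher {
	contextMetadata := metadata.New(map[string]string{"source": "cache"})
	return cacher.NewListerWatcher(store, resourcePrefix, newListFunc, contextMetadata)
}
```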
@@ -44,7 +44,7 @@ func TestCacherListerWatcher(t *testing.T) {
}
}

- lw := NewListerWatcher(store, prefix, fn)
+ lw := NewListerWatcher(store, prefix, fn, nil)

obj, err := lw.List(metav1.ListOptions{})
if err != nil {
@@ -80,7 +80,7 @@ func TestCacherListerWatcherPagination(t *testing.T) {
}
}

- lw := NewListerWatcher(store, prefix, fn)
+ lw := NewListerWatcher(store, prefix, fn, nil)

obj1, err := lw.List(metav1.ListOptions{Limit: 2})
if err != nil {
@@ -122,7 +122,7 @@ func newTestWatchCache(capacity int, indexers *cache.Indexers) *testWatchCache {
wc := &testWatchCache{}
wc.bookmarkRevision = make(chan int64, 1)
wc.stopCh = make(chan struct{})
pr := newConditionalProgressRequester(wc.RequestWatchProgress, &immediateTickerFactory{})
pr := newConditionalProgressRequester(wc.RequestWatchProgress, &immediateTickerFactory{}, nil)
go pr.Run(wc.stopCh)
wc.watchCache = newWatchCache(keyFunc, mockHandler, getAttrsFunc, versioner, indexers, testingclock.NewFakeClock(time.Now()), schema.GroupResource{Resource: "pods"}, pr)
// To preserve behavior of tests that assume a given capacity,
@@ -21,6 +21,8 @@ import (
"sync"
"time"

"google.golang.org/grpc/metadata"

utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"

@@ -34,10 +36,11 @@ const (
progressRequestPeriod = 100 * time.Millisecond
)

- func newConditionalProgressRequester(requestWatchProgress WatchProgressRequester, clock TickerFactory) *conditionalProgressRequester {
+ func newConditionalProgressRequester(requestWatchProgress WatchProgressRequester, clock TickerFactory, contextMetadata metadata.MD) *conditionalProgressRequester {
pr := &conditionalProgressRequester{
clock: clock,
requestWatchProgress: requestWatchProgress,
contextMetadata: contextMetadata,
}
pr.cond = sync.NewCond(pr.mux.RLocker())
return pr
@@ -54,6 +57,7 @@ type TickerFactory interface {
type conditionalProgressRequester struct {
clock TickerFactory
requestWatchProgress WatchProgressRequester
contextMetadata metadata.MD

mux sync.RWMutex
cond *sync.Cond
@@ -63,6 +67,9 @@ type conditionalProgressRequester struct {

func (pr *conditionalProgressRequester) Run(stopCh <-chan struct{}) {
ctx := wait.ContextForChannel(stopCh)
if pr.contextMetadata != nil {
ctx = metadata.NewOutgoingContext(ctx, pr.contextMetadata)
}
go func() {
defer utilruntime.HandleCrash()
<-stopCh
@@ -115,7 +115,7 @@ func TestConditionalProgressRequester(t *testing.T) {

func newTestConditionalProgressRequester(clock clock.WithTicker) *testConditionalProgressRequester {
pr := &testConditionalProgressRequester{}
- pr.conditionalProgressRequester = newConditionalProgressRequester(pr.RequestWatchProgress, clock)
+ pr.conditionalProgressRequester = newConditionalProgressRequester(pr.RequestWatchProgress, clock, nil)
return pr
}
