Skip to content

Commit

Permalink
Ensure that pod observer never acts on stale watch events (#1328)
Browse files Browse the repository at this point in the history
  • Loading branch information
n-g authored Sep 17, 2024
1 parent d166a01 commit 155fe4b
Showing 1 changed file with 49 additions and 26 deletions.
75 changes: 49 additions & 26 deletions internal/runtime/kubernetes/kubeobserver/podobserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,25 @@ func (p *PodObserver) start(ctx context.Context) {

// Return true for a retry.
func (p *PodObserver) runWatcher(ctx context.Context, debug io.Writer) (bool, error) {
w, err := p.client.CoreV1().Pods(p.namespace).Watch(ctx, metav1.ListOptions{LabelSelector: kubeobj.SerializeSelector(p.labels)})
// We do a full list first to ensure we do not read stale events.
list, err := p.client.CoreV1().Pods(p.namespace).List(ctx, metav1.ListOptions{LabelSelector: kubeobj.SerializeSelector(p.labels)})
if err != nil {
return true, err
}

for _, pod := range list.Items {
ev := watch.Modified
if pod.DeletionTimestamp != nil {
ev = watch.Deleted
}

p.handle(ev, &pod, debug)
}

w, err := p.client.CoreV1().Pods(p.namespace).Watch(ctx, metav1.ListOptions{
LabelSelector: kubeobj.SerializeSelector(p.labels),
ResourceVersion: list.ResourceVersion,
})
if err != nil {
return true, err
}
Expand Down Expand Up @@ -125,33 +143,38 @@ func (p *PodObserver) runWatcher(ctx context.Context, debug io.Writer) (bool, er

fmt.Fprintf(debug, "event type %s: name %s phase %s\n", ev.Type, pod.Name, pod.Status.Phase)

p.mu.Lock()
existingIndex := slices.IndexFunc(p.runningPods, func(existing v1.Pod) bool {
return existing.UID == pod.UID
})

var modified bool
if ev.Type == watch.Deleted || pod.Status.Phase != v1.PodRunning {
if existingIndex >= 0 {
p.runningPods = slices.Delete(p.runningPods, existingIndex, existingIndex+1)
modified = true
fmt.Fprintf(debug, "remove pod-uid %s\n", pod.UID)
}
} else if (ev.Type == watch.Added || ev.Type == watch.Modified) && pod.Status.Phase == v1.PodRunning {
if existingIndex < 0 {
p.runningPods = append(p.runningPods, *pod)
modified = true
fmt.Fprintf(debug, "add pod-uid %s\n", pod.UID)
}
}
p.handle(ev.Type, pod, debug)
}
}
}

if modified {
p.revision++
p.cond.Broadcast()
p.broadcast()
}
p.mu.Unlock()
func (p *PodObserver) handle(ev watch.EventType, pod *v1.Pod, debug io.Writer) {
p.mu.Lock()
defer p.mu.Unlock()

existingIndex := slices.IndexFunc(p.runningPods, func(existing v1.Pod) bool {
return existing.UID == pod.UID
})

var modified bool
if ev == watch.Deleted || pod.Status.Phase != v1.PodRunning {
if existingIndex >= 0 {
p.runningPods = slices.Delete(p.runningPods, existingIndex, existingIndex+1)
modified = true
fmt.Fprintf(debug, "remove pod-uid %s (%s)\n", pod.UID, pod.Name)
}
} else if (ev == watch.Added || ev == watch.Modified) && pod.Status.Phase == v1.PodRunning {
if existingIndex < 0 {
p.runningPods = append(p.runningPods, *pod)
modified = true
fmt.Fprintf(debug, "add pod-uid %s (%s)\n", pod.UID, pod.Name)
}
}

if modified {
p.revision++
p.cond.Broadcast()
p.broadcast()
}
}

Expand Down

0 comments on commit 155fe4b

Please sign in to comment.