Skip to content

Commit

Permalink
stop chan in port forwarder
Browse files Browse the repository at this point in the history
  • Loading branch information
matmerr committed Oct 14, 2024
1 parent bb55ee7 commit f65ae06
Show file tree
Hide file tree
Showing 5 changed files with 21 additions and 35 deletions.
13 changes: 1 addition & 12 deletions test/e2e/framework/kubernetes/create-kapinger-deployment.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ func (c *CreateKapingerDeployment) GetKapingerDeployment() *appsv1.Deployment {
Containers: []v1.Container{
{
Name: "kapinger",
Image: "acnpublic.azurecr.io/kapinger:20241011.3",
Image: "acnpublic.azurecr.io/kapinger:20241011.4",
Resources: v1.ResourceRequirements{
Requests: v1.ResourceList{
"memory": resource.MustParse("20Mi"),
Expand Down Expand Up @@ -210,17 +210,6 @@ func (c *CreateKapingerDeployment) GetKapingerDeployment() *appsv1.Deployment {
Value: c.BurstVolume,
},
},
LivenessProbe: &v1.Probe{
ProbeHandler: v1.ProbeHandler{
HTTPGet: &v1.HTTPGetAction{
Path: "/",
Port: intstr.FromInt(KapingerHTTPPort),
},
},
TimeoutSeconds: 10, //nolint
PeriodSeconds: 10, //nolint
InitialDelaySeconds: 3, //nolint
},
},
},
},
Expand Down
4 changes: 4 additions & 0 deletions test/e2e/framework/kubernetes/port-forward.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,13 +110,17 @@ func (p *PortForward) Run() error {

log.Printf("port forward validation HTTP request to \"%s\" succeeded, response: %s\n", p.pf.Address(), resp.Status)

log.Printf("starting keepalive for port forward...\n")
go p.pf.KeepAlive(pctx)
return nil
}

if err = defaultRetrier.Do(portForwardCtx, portForwardFn); err != nil {
return fmt.Errorf("could not start port forward within %ds: %w", defaultTimeoutSeconds, err)
}
log.Printf("successfully port forwarded to \"%s\"\n", p.pf.Address())
log.Printf("starting port forward keepalive...\n")
go p.pf.KeepAlive(pctx)
return nil
}

Expand Down
3 changes: 3 additions & 0 deletions test/e2e/framework/kubernetes/portforward.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,9 @@ func (p *PortForwarder) KeepAlive(ctx context.Context) {
case <-ctx.Done():
p.logger.Logf("port forwarder: keep alive cancelled: %v", ctx.Err())
return
case <-p.stopChan:
p.logger.Logf("port forwarder: keep alive stopped via stop channel")
return
case pfErr := <-p.errChan:
// as of client-go v0.26.1, if the connection is successful at first but then fails,
// an error is logged but only a nil error is sent to this channel. this will be fixed
Expand Down
34 changes: 12 additions & 22 deletions test/e2e/framework/kubernetes/pprof.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,11 @@ import (
"net/url"
"os"
"strconv"
"sync"
"time"
)

const (
defaultTimeout = 30 * time.Second
defaultTimeout = 3200 * time.Second
defaultRetinaPort = 10093
defaultSpanTime = 10 * time.Second
)
Expand Down Expand Up @@ -96,22 +95,15 @@ func (p *PullPProf) Run() error {
}
}

var wg sync.WaitGroup

for name, path := range durationProfiles {
wg.Add(1)
go func(name, path string) {
file := folder + name + ".out"
err = p.scraper.GetProfileWithDuration(name, path, file, defaultSpanTime)
if err != nil {
// don't return here because some data is better than no data,
// and other profiles might be functional
log.Printf("error getting %s profile: %v\n", name, err)
}
wg.Done()
}(name, path)
file := folder + name + ".out"
err = p.scraper.GetProfileWithDuration(name, path, file, defaultSpanTime)
if err != nil {
// don't return here because some data is better than no data,
// and other profiles might be functional
log.Printf("error getting %s profile: %v\n", name, err)
}
}
wg.Wait()

log.Printf("-- finished scraping profiles, saved to to %s --\n", folder)
log.Printf("waiting %s seconds for next scrape\n", p.ScrapeIntervalSeconds)
Expand Down Expand Up @@ -166,12 +158,12 @@ func (p *PprofScraper) GetProfileWithDuration(name, path, outfile string, durati
log.Printf("getting %s profile for %d seconds...\n", name, seconds)
profileURL := p.formatURLWithSeconds(seconds)
profileURL.Path += path
return p.scrape(profileURL.String(), defaultTimeout+duration, outfile)
return p.scrape(profileURL.String(), outfile)
}

func (p *PprofScraper) GetProfile(name, path, outfile string) error {
log.Printf("getting %s profile...\n", name)
return p.scrape(p.baseURL.String()+path, defaultTimeout, outfile)
return p.scrape(p.baseURL.String()+path, outfile)
}

func (p *PprofScraper) formatURLWithSeconds(seconds int) url.URL {
Expand All @@ -183,10 +175,8 @@ func (p *PprofScraper) formatURLWithSeconds(seconds int) url.URL {
return queryURL
}

func (p *PprofScraper) scrape(scrapingURL string, timeout time.Duration, outfile string) error {
client := http.Client{
Timeout: timeout,
}
func (p *PprofScraper) scrape(scrapingURL, outfile string) error {
client := http.Client{}

req, err := http.NewRequestWithContext(context.Background(), http.MethodGet, scrapingURL, http.NoBody)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion test/e2e/scenarios/longrunning/scenario.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ func PullPProf(kubeConfigFilePath string) *types.Scenario {
KapingerReplicas: "500",
KubeConfigFilePath: kubeConfigFilePath,
BurstIntervalMs: "10000", // 10 seconds
BurstVolume: "10", // 500 requests every 10 seconds
BurstVolume: "200", // 500 requests every 10 seconds
},
},
{
Expand Down

0 comments on commit f65ae06

Please sign in to comment.