Skip to content

Commit

Permalink
Add timeout after first relay success
Browse files Browse the repository at this point in the history
  • Loading branch information
mohamed-essam committed Nov 26, 2024
1 parent db61cf2 commit 18216ba
Show file tree
Hide file tree
Showing 3 changed files with 46 additions and 16 deletions.
14 changes: 8 additions & 6 deletions relay/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,11 +123,12 @@ func (cc *connContainer) close() {
// the client can be reused by calling Connect again. When the client is closed, all connections are closed too.
// While the Connect is in progress, the OpenConn function will block until the connection is established with relay server.
type Client struct {
log *log.Entry
parentCtx context.Context
connectionURL string
authTokenStore *auth.TokenStore
hashedID []byte
log *log.Entry
parentCtx context.Context
connectionURL string
authTokenStore *auth.TokenStore
hashedID []byte
InitialConnectionTime time.Duration

bufPool *sync.Pool

Expand Down Expand Up @@ -264,11 +265,12 @@ func (c *Client) Close() error {
}

func (c *Client) connect() error {
conn, err := ws.Dial(c.connectionURL)
conn, latency, err := ws.Dial(c.connectionURL)
if err != nil {
return err
}
c.relayConn = conn
c.InitialConnectionTime = latency

err = c.handShake()
if err != nil {
Expand Down
19 changes: 13 additions & 6 deletions relay/client/dialer/ws/ws.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,10 @@ import (
"fmt"
"net"
"net/http"
"net/http/httptrace"
"net/url"
"strings"
"time"

log "github.com/sirupsen/logrus"
"nhooyr.io/websocket"
Expand All @@ -15,10 +17,10 @@ import (
nbnet "github.com/netbirdio/netbird/util/net"
)

func Dial(address string) (net.Conn, error) {
func Dial(address string) (net.Conn, time.Duration, error) {
wsURL, err := prepareURL(address)
if err != nil {
return nil, err
return nil, 0, err
}

opts := &websocket.DialOptions{
Expand All @@ -27,21 +29,26 @@ func Dial(address string) (net.Conn, error) {

parsedURL, err := url.Parse(wsURL)
if err != nil {
return nil, err
return nil, 0, err
}
parsedURL.Path = ws.URLPath

wsConn, resp, err := websocket.Dial(context.Background(), parsedURL.String(), opts)
var connStart, firstByte time.Time
ctx := httptrace.WithClientTrace(context.Background(), &httptrace.ClientTrace{
ConnectStart: func(network, addr string) { connStart = time.Now() },
GotFirstResponseByte: func() { firstByte = time.Now() },
})
wsConn, resp, err := websocket.Dial(ctx, parsedURL.String(), opts)
if err != nil {
log.Errorf("failed to dial to Relay server '%s': %s", wsURL, err)
return nil, err
return nil, 0, err
}
if resp.Body != nil {
_ = resp.Body.Close()
}

conn := NewConn(wsConn, address)
return conn, nil
return conn, firstByte.Sub(connStart), nil
}

func prepareURL(address string) (string, error) {
Expand Down
29 changes: 25 additions & 4 deletions relay/client/picker.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@ const (
)

var (
connectionTimeout = 30 * time.Second
connectionTimeout = 30 * time.Second
connectionSortingtimeout = 500 * time.Millisecond
)

type connResult struct {
Expand Down Expand Up @@ -72,22 +73,33 @@ func (sp *ServerPicker) PickServer(parentCtx context.Context) (*Client, error) {
func (sp *ServerPicker) startConnection(ctx context.Context, resultChan chan connResult, url string) {
log.Infof("try to connecting to relay server: %s", url)
relayClient := NewClient(ctx, url, sp.TokenStore, sp.PeerID)
start := time.Now()
err := relayClient.Connect()
resultChan <- connResult{
RelayClient: relayClient,
Url: url,
Err: err,
Latency: time.Since(start),
Latency: relayClient.InitialConnectionTime,
}
}

func (sp *ServerPicker) processConnResults(resultChan chan connResult, successChan chan connResult) {
var hasSuccess bool
var bestLatencyResult connResult
bestLatencyResult.Latency = time.Hour
processingCtx := context.Background()
var processingCtxCancel context.CancelFunc
for numOfResults := 0; numOfResults < cap(resultChan); numOfResults++ {
cr := <-resultChan
var cr connResult
select {
case <-processingCtx.Done():
log.Tracef("terminating Relay server sorting early")
successChan <- bestLatencyResult
close(successChan)
successChan = nil // Prevent any more sending to successChan
// Continue receiving connections to terminate any more
cr = <-resultChan
case cr = <-resultChan:
}
if cr.Err != nil {
log.Tracef("failed to connect to Relay server: %s: %v", cr.Url, cr.Err)
continue
Expand All @@ -108,10 +120,19 @@ func (sp *ServerPicker) processConnResults(resultChan chan connResult, successCh
}
}

// First successful connection, start a timer to return the result
if !hasSuccess {
processingCtx, processingCtxCancel = context.WithTimeout(processingCtx, connectionSortingtimeout)
}
hasSuccess = true
bestLatencyResult = cr
}

processingCtxCancel()
if successChan == nil {
return
}

if bestLatencyResult.RelayClient != nil {
successChan <- bestLatencyResult
}
Expand Down

0 comments on commit 18216ba

Please sign in to comment.