Skip to content

Commit

Permalink
FFM-9485 Improve SSE Reconnection Logic (#133)
Browse files Browse the repository at this point in the history
  • Loading branch information
erdirowlands authored Oct 24, 2023
1 parent d3d1765 commit a551d0c
Show file tree
Hide file tree
Showing 7 changed files with 178 additions and 98 deletions.
183 changes: 112 additions & 71 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,11 @@ import (
"errors"
"fmt"
"github.com/harness/ff-golang-server-sdk/sdk_codes"
"golang.org/x/sync/errgroup"
"log"
"math/rand"
"net/http"
"strconv"
"strings"
"sync"
"sync/atomic"
Expand Down Expand Up @@ -53,6 +55,7 @@ type CfClient struct {
token string
streamConnected bool
streamConnectedLock sync.RWMutex
streamDisconnected chan struct{}
authenticated chan struct{}
postEvalChan chan evaluation.PostEvalData
initializedBool bool
Expand All @@ -78,16 +81,17 @@ func NewCfClient(sdkKey string, options ...ConfigOption) (*CfClient, error) {
analyticsService := analyticsservice.NewAnalyticsService(time.Minute, config.Logger)

client := &CfClient{
sdkKey: sdkKey,
config: config,
authenticated: make(chan struct{}),
analyticsService: analyticsService,
clusterIdentifier: "1",
postEvalChan: make(chan evaluation.PostEvalData),
stop: make(chan struct{}),
stopped: newAtomicBool(false),
initialized: make(chan struct{}),
initializedErr: make(chan error),
sdkKey: sdkKey,
config: config,
authenticated: make(chan struct{}),
analyticsService: analyticsService,
clusterIdentifier: "1",
postEvalChan: make(chan evaluation.PostEvalData),
stop: make(chan struct{}),
stopped: newAtomicBool(false),
initialized: make(chan struct{}),
initializedErr: make(chan error),
streamDisconnected: make(chan struct{}),
}

if sdkKey == "" {
Expand Down Expand Up @@ -154,6 +158,9 @@ func (c *CfClient) start() {
}()
go c.setAnalyticsServiceClient(ctx)
go c.pullCronJob(ctx)
if c.config.enableStream {
go c.stream(ctx)
}
}

// PostEvaluateProcessor push the data to the analytics service
Expand Down Expand Up @@ -186,47 +193,52 @@ func (c *CfClient) IsInitialized() (bool, error) {
return false, InitializeTimeoutError{}
}

func (c *CfClient) retrieve(ctx context.Context) bool {
ok := true
var wg sync.WaitGroup
wg.Add(2)
go func() {
defer wg.Done()
rCtx, cancel := context.WithTimeout(ctx, time.Minute)
defer cancel()
func (c *CfClient) retrieve(ctx context.Context) {
var g errgroup.Group

rCtx, cancel := context.WithTimeout(ctx, time.Minute)
defer cancel()

// First goroutine for retrieving flags.
g.Go(func() error {
err := c.retrieveFlags(rCtx)
if err != nil {
ok = false
c.config.Logger.Errorf("error while retrieving flags: %v", err.Error())
c.config.Logger.Errorf("error while retrieving flags: %v", err)
return err
}
}()
return nil
})

go func() {
defer wg.Done()
rCtx, cancel := context.WithTimeout(ctx, time.Minute)
defer cancel()
// Second goroutine for retrieving segments.
g.Go(func() error {
err := c.retrieveSegments(rCtx)
if err != nil {
ok = false
c.config.Logger.Errorf("error while retrieving segments: %v", err.Error())
c.config.Logger.Errorf("error while retrieving segments: %v", err)
return err
}
}()
wg.Wait()
if ok {
c.config.Logger.Info("Data poll finished successfully")
return nil
})

err := g.Wait()

if err != nil {
// We just log the error and continue. In the case of initialization, this means we mark the client as initialized
// if we can't poll for initial state, and default evaluations are likely to be returned.
c.config.Logger.Errorf("Data poll finished with errors: %s", err)
} else {
c.config.Logger.Error("Data poll finished with errors")
c.config.Logger.Info("Data poll finished successfully")
}

if ok {
// This flag is used by `IsInitialized` so set to true.
c.initializedBoolLock.Lock()
c.initializedBool = true
c.initializedBoolLock.Unlock()
c.initializedBoolLock.Lock()
defer c.initializedBoolLock.Unlock()

// This function is used to mark the client as "initialized" once flags and segments have been loaded,
// but it's also used for the polling thread, so we check if the client is already initialized before
// marking it as such.
if !c.initializedBool {
c.initializedBool = true
close(c.initialized)
}
return ok
}

func (c *CfClient) streamConnect(ctx context.Context) {
Expand All @@ -247,28 +259,12 @@ func (c *CfClient) streamConnect(ctx context.Context) {
// Use the SDKs http client
sseClient.Connection = c.config.httpClient

streamErr := func() {
c.config.Logger.Warnf("%s Stream disconnected. Swapping to polling mode", sdk_codes.StreamDisconnected)
c.mux.RLock()
defer c.mux.RUnlock()
c.streamConnected = false

// If an eventStreamListener has been passed to the Proxy lets notify it of the disconnected
// to let it know something is up with the stream it has been listening to
if c.config.eventStreamListener != nil {
c.config.eventStreamListener.Pub(context.Background(), stream.Event{
APIKey: c.sdkKey,
Environment: c.environmentID,
Err: stream.ErrStreamDisconnect,
})
}
}
conn := stream.NewSSEClient(c.sdkKey, c.token, sseClient, c.repository, c.api, c.config.Logger, streamErr,
c.config.eventStreamListener, c.config.proxyMode)
conn := stream.NewSSEClient(c.sdkKey, c.token, sseClient, c.repository, c.api, c.config.Logger,
c.config.eventStreamListener, c.config.proxyMode, c.streamDisconnected)

// Connect kicks off a goroutine that attempts to establish a stream connection
// while this is happening we set streamConnected to true - if any errors happen
// in this process streamConnected will be set back to false by the streamErr function
// in this process streamConnected will be set back to false by the streamDisconnected function
conn.Connect(ctx, c.environmentID, c.sdkKey)
c.streamConnected = true
}
Expand Down Expand Up @@ -303,7 +299,14 @@ func (c *CfClient) initAuthentication(ctx context.Context) error {
jitter := time.Duration(rand.Float64() * float64(currentDelay))
delayWithJitter := currentDelay + jitter

c.config.Logger.Errorf("%s Authentication failed with error: '%s'. Retrying in %v.", sdk_codes.AuthAttempt, err, delayWithJitter)
maxAttemptLog := ""
if c.config.maxAuthRetries == -1 {
maxAttemptLog = "∞"
} else {
maxAttemptLog = strconv.Itoa(c.config.maxAuthRetries)
}

c.config.Logger.Errorf("%s Authentication attempt %d of %s failed with error: '%s'. Retrying in %v.", sdk_codes.AuthAttempt, attempts, maxAttemptLog, err, delayWithJitter)
c.config.sleeper.Sleep(delayWithJitter)

currentDelay *= time.Duration(factor)
Expand All @@ -321,7 +324,7 @@ func (c *CfClient) authenticate(ctx context.Context) error {
defer c.mux.RUnlock()

// dont check err just retry
httpClient, err := rest.NewClientWithResponses(c.config.url, rest.WithHTTPClient(c.config.httpClient))
httpClient, err := rest.NewClientWithResponses(c.config.url, rest.WithHTTPClient(c.config.authHttpClient))
if err != nil {
return err
}
Expand Down Expand Up @@ -420,25 +423,64 @@ func (c *CfClient) makeTicker(interval uint) *time.Ticker {
return time.NewTicker(time.Second * time.Duration(interval))
}

func (c *CfClient) stream(ctx context.Context) {
// wait until initialized with initial state
<-c.initialized
c.config.Logger.Infof("%s Polling Stopped", sdk_codes.PollStop)
c.config.Logger.Info("Attempting to start stream")
c.streamConnect(ctx)

const maxBackoffDuration = 2 * time.Minute
backoffDuration := 2 * time.Second
for {
select {
case <-ctx.Done():
c.config.Logger.Infof("%s Stream stopped", sdk_codes.StreamStop)
return
case <-c.streamDisconnected:
c.config.Logger.Warnf("%s Stream disconnected. Swapping to polling mode", sdk_codes.StreamDisconnected)
c.mux.RLock()
c.streamConnected = false
c.mux.RUnlock()

// If an eventStreamListener has been passed to the Proxy lets notify it of the disconnected
// to let it know something is up with the stream it has been listening to
if c.config.eventStreamListener != nil {
c.config.eventStreamListener.Pub(context.Background(), stream.Event{
APIKey: c.sdkKey,
Environment: c.environmentID,
Err: stream.ErrStreamDisconnect,
})
}

time.Sleep(backoffDuration)

c.config.Logger.Info("Attempting to restart stream")
c.streamConnect(ctx)

if backoffDuration > maxBackoffDuration {
backoffDuration = maxBackoffDuration
return
}

backoffDuration *= 2

}
}
}

func (c *CfClient) pullCronJob(ctx context.Context) {
poll := func() {
c.mux.RLock()
c.config.Logger.Infof("%s Polling started, interval: %v", sdk_codes.PollStart, c.config.pullInterval)
defer c.mux.RUnlock()
if !c.streamConnected {
ok := c.retrieve(ctx)
// we should only try and start the stream after the poll succeeded to make sure we get the latest changes
if ok && c.config.enableStream {
c.config.Logger.Infof("%s Polling Stopped", sdk_codes.PollStop)
// here stream is enabled but not connected, so we attempt to reconnect
c.config.Logger.Info("Attempting to start stream")
c.streamConnect(ctx)
}
c.retrieve(ctx)
}
c.mux.RUnlock()
}
// wait until authenticated
<-c.authenticated

c.config.Logger.Infof("%s Polling started, interval: %v seconds", sdk_codes.PollStart, c.config.pullInterval)
// pull initial data
poll()

Expand All @@ -449,7 +491,6 @@ func (c *CfClient) pullCronJob(ctx context.Context) {
case <-ctx.Done():
pullingTicker.Stop()
c.config.Logger.Infof("%s Polling stopped", sdk_codes.PollStop)
c.config.Logger.Infof("%s Stream stopped", sdk_codes.StreamStop)
return
case <-pullingTicker.C:
poll()
Expand All @@ -471,7 +512,7 @@ func (c *CfClient) retrieveFlags(ctx context.Context) error {
}

if flags.JSON200 == nil {
return nil
return fmt.Errorf("%w: `%v`", FetchFlagsError, flags.HTTPResponse.Status)
}

c.repository.SetFlags(true, c.environmentID, *flags.JSON200...)
Expand Down
2 changes: 1 addition & 1 deletion client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -747,7 +747,7 @@ var FeatureConfigsResponse = func(req *http.Request) (*http.Response, error) {
}

func TestCfClient_Close(t *testing.T) {
registerResponders(AuthResponse(200, ValidAuthToken), TargetSegmentsResponse, TargetSegmentsResponse)
registerResponders(AuthResponse(200, ValidAuthToken), TargetSegmentsResponse, FeatureConfigsResponse)
client, err := newClient(&http.Client{}, ValidSDKKey, WithWaitForInitialized(true))
if err != nil {
t.Error(err)
Expand Down
45 changes: 36 additions & 9 deletions client/config.go
Original file line number Diff line number Diff line change
@@ -1,16 +1,17 @@
package client

import (
"fmt"
"github.com/harness/ff-golang-server-sdk/cache"
"github.com/harness/ff-golang-server-sdk/evaluation"
"github.com/harness/ff-golang-server-sdk/logger"
"github.com/harness/ff-golang-server-sdk/storage"
"github.com/harness/ff-golang-server-sdk/stream"
"github.com/harness/ff-golang-server-sdk/types"
"github.com/hashicorp/go-retryablehttp"
"net/http"
"os"

"github.com/harness/ff-golang-server-sdk/cache"
"github.com/harness/ff-golang-server-sdk/logger"
"github.com/harness/ff-golang-server-sdk/storage"
"github.com/hashicorp/go-retryablehttp"
"time"
)

type config struct {
Expand All @@ -21,6 +22,7 @@ type config struct {
Store storage.Storage
Logger logger.Logger
httpClient *http.Client
authHttpClient *http.Client
enableStream bool
enableStore bool
target evaluation.Target
Expand All @@ -39,9 +41,33 @@ func newDefaultConfig(log logger.Logger) *config {
defaultStore = storage.NewFileStore("defaultProject", storage.GetHarnessDir(log), log)
}

retryClient := retryablehttp.NewClient()
retryClient.RetryMax = 10
retryClient.Logger = logger.NewRetryableLogger(log)
const requestTimeout = time.Second * 30

// Authentication uses a default http client + timeout as we have our own custom retry logic for authentication.
authHttpClient := &http.Client{}
authHttpClient.Timeout = requestTimeout

// Remaining requests use a go-retryablehttp client to handle retries.
requestHttpClient := retryablehttp.NewClient()
requestHttpClient.Logger = logger.NewRetryableLogger(log)
requestHttpClient.RetryMax = 10

// Assign a custom ErrorHandler. By default, the go-retryablehttp library doesn't return the final
// network error from the server but instead reports that it has exhausted all retry attempts.
requestHttpClient.ErrorHandler = func(resp *http.Response, err error, numTries int) (*http.Response, error) {
message := ""
if resp != nil {
message = fmt.Sprintf("Error after '%d' connection attempts: '%s'", numTries, resp.Status)
}

// In practice, the error is usually nil and the response is used, but include this for any
// edge cases.
if err != nil {
message = fmt.Sprintf("Error after %d connection attempts: %v\n", numTries, err)
}

return resp, fmt.Errorf(message)
}

return &config{
url: "https://config.ff.harness.io/api/1.0",
Expand All @@ -50,7 +76,8 @@ func newDefaultConfig(log logger.Logger) *config {
Cache: defaultCache,
Store: defaultStore,
Logger: log,
httpClient: retryClient.StandardClient(),
authHttpClient: authHttpClient,
httpClient: requestHttpClient.StandardClient(),
enableStream: true,
enableStore: true,
enableAnalytics: true,
Expand Down
1 change: 1 addition & 0 deletions client/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
var (
EmptySDKKeyError = errors.New("default variation was returned")
DefaultVariationReturnedError = errors.New("default variation was returned")
FetchFlagsError = errors.New("fetching flags failed")
)

type NonRetryableAuthError struct {
Expand Down
3 changes: 2 additions & 1 deletion client/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,9 +83,10 @@ func WithStoreEnabled(val bool) ConfigOption {
}
}

// WithHTTPClient set http client for use in interactions with ff server
// WithHTTPClient set auth and http client for use in interactions with ff server
func WithHTTPClient(client *http.Client) ConfigOption {
return func(config *config) {
config.authHttpClient = client
config.httpClient = client
}
}
Expand Down
Loading

0 comments on commit a551d0c

Please sign in to comment.