Skip to content

Commit

Permalink
[feature] add outlier ejection (#579)
Browse files Browse the repository at this point in the history
* Add basic outlier ejection

* Add half-open nodes to token result

* Removed map of nodeID to IP address

* Add outlier ejection examples

* Fix go-micro outlier adapter

* Fix kitex outlier adapter

* Fix kratos outlier adapter

* Move outlier ejection examples

* Fix bugs of outlier recycler

* Fix OnCompleted of outlier MetricStatSlot

* Add unit tests for outlier ejection

* Add unit tests for rule_manager.go

* Fix bugs of outlier unit tests

* Fix data race of outlier unit tests

* Update go version

* Fix bugs of micro adapter
  • Loading branch information
wooyang2018 authored Oct 28, 2024
1 parent f13247d commit b8d94d1
Show file tree
Hide file tree
Showing 48 changed files with 8,134 additions and 454 deletions.
16 changes: 15 additions & 1 deletion api/tracer.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,10 @@
package api

import (
"github.com/pkg/errors"

"github.com/alibaba/sentinel-golang/core/base"
"github.com/alibaba/sentinel-golang/logging"
"github.com/pkg/errors"
)

// TraceError records the provided error to the given SentinelEntry.
Expand All @@ -34,3 +35,16 @@ func TraceError(entry *base.SentinelEntry, err error) {

entry.SetError(err)
}

func TraceCallee(entry *base.SentinelEntry, address string) {
defer func() {
if e := recover(); e != nil {
logging.Error(errors.Errorf("%+v", e), "Failed to api.TraceCallee()")
return
}
}()
if entry == nil || address == "" {
return
}
entry.SetPair("address", address)
}
16 changes: 16 additions & 0 deletions core/base/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,22 @@ func (ctx *EntryContext) Rt() uint64 {
return ctx.rt
}

func (ctx *EntryContext) FilterNodes() []string {
return ctx.RuleCheckResult.FilterNodes()
}

func (ctx *EntryContext) HalfOpenNodes() []string {
return ctx.RuleCheckResult.HalfOpenNodes()
}

func (ctx *EntryContext) SetPair(key, val interface{}) {
ctx.Data[key] = val
}

func (ctx *EntryContext) GetPair(key interface{}) interface{} {
return ctx.Data[key]
}

func NewEmptyEntryContext() *EntryContext {
return &EntryContext{}
}
Expand Down
9 changes: 8 additions & 1 deletion core/base/entry.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,9 @@ package base
import (
"sync"

"github.com/alibaba/sentinel-golang/logging"
"github.com/pkg/errors"

"github.com/alibaba/sentinel-golang/logging"
)

type ExitHandler func(entry *SentinelEntry, ctx *EntryContext) error
Expand Down Expand Up @@ -55,6 +56,12 @@ func (e *SentinelEntry) SetError(err error) {
}
}

func (e *SentinelEntry) SetPair(key, val interface{}) {
if e.ctx != nil {
e.ctx.SetPair(key, val)
}
}

func (e *SentinelEntry) Context() *EntryContext {
return e.ctx
}
Expand Down
22 changes: 20 additions & 2 deletions core/base/result.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,8 +84,10 @@ func (s TokenResultStatus) String() string {
type TokenResult struct {
status TokenResultStatus

blockErr *BlockError
nanosToWait time.Duration
blockErr *BlockError
nanosToWait time.Duration
filterNodes []string
halfOpenNodes []string
}

func (r *TokenResult) DeepCopyFrom(newResult *TokenResult) {
Expand Down Expand Up @@ -154,6 +156,22 @@ func (r *TokenResult) NanosToWait() time.Duration {
return r.nanosToWait
}

func (r *TokenResult) FilterNodes() []string {
return r.filterNodes
}

func (r *TokenResult) HalfOpenNodes() []string {
return r.halfOpenNodes
}

func (r *TokenResult) SetFilterNodes(nodes []string) {
r.filterNodes = nodes
}

func (r *TokenResult) SetHalfOpenNodes(nodes []string) {
r.halfOpenNodes = nodes
}

func (r *TokenResult) String() string {
var blockMsg string
if r.blockErr == nil {
Expand Down
21 changes: 11 additions & 10 deletions core/circuitbreaker/rule_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,10 @@ import (
"reflect"
"sync"

"github.com/pkg/errors"

"github.com/alibaba/sentinel-golang/logging"
"github.com/alibaba/sentinel-golang/util"
"github.com/pkg/errors"
)

type CircuitBreakerGenFunc func(r *Rule, reuseStat interface{}) (CircuitBreaker, error)
Expand Down Expand Up @@ -262,7 +263,7 @@ func onRuleUpdate(rawResRulesMap map[string][]*Rule) (err error) {

newBreakers := make(map[string][]CircuitBreaker, len(validResRulesMap))
for res, resRules := range validResRulesMap {
newCbsOfRes := buildResourceCircuitBreaker(res, resRules, breakersClone[res])
newCbsOfRes := BuildResourceCircuitBreaker(res, resRules, breakersClone[res])
if len(newCbsOfRes) > 0 {
newBreakers[res] = newCbsOfRes
}
Expand All @@ -275,7 +276,7 @@ func onRuleUpdate(rawResRulesMap map[string][]*Rule) (err error) {
currentRules = rawResRulesMap

logging.Debug("[CircuitBreaker onRuleUpdate] Time statistics(ns) for updating circuit breaker rule", "timeCost", util.CurrentTimeNano()-start)
logRuleUpdate(validResRulesMap)
LogRuleUpdate(validResRulesMap)
return nil
}

Expand Down Expand Up @@ -305,7 +306,7 @@ func onResourceRuleUpdate(res string, rawResRules []*Rule) (err error) {
oldResCbs = append(oldResCbs, breakers[res]...)
updateMux.RUnlock()

newCbsOfRes := buildResourceCircuitBreaker(res, rawResRules, oldResCbs)
newCbsOfRes := BuildResourceCircuitBreaker(res, rawResRules, oldResCbs)

updateMux.Lock()
if len(newCbsOfRes) == 0 {
Expand Down Expand Up @@ -341,7 +342,7 @@ func rulesFrom(rm map[string][]*Rule) []*Rule {
return rules
}

func logRuleUpdate(m map[string][]*Rule) {
func LogRuleUpdate(m map[string][]*Rule) {
rs := rulesFrom(m)
if len(rs) == 0 {
logging.Info("[CircuitBreakerRuleManager] Circuit breaking rules were cleared")
Expand Down Expand Up @@ -399,12 +400,12 @@ func ClearRulesOfResource(res string) error {
return err
}

// buildResourceCircuitBreaker builds CircuitBreaker slice from rules. the resource of rules must be equals to res
func buildResourceCircuitBreaker(res string, rulesOfRes []*Rule, oldResCbs []CircuitBreaker) []CircuitBreaker {
// BuildResourceCircuitBreaker builds CircuitBreaker slice from rules. the resource of rules must be equals to res
func BuildResourceCircuitBreaker(res string, rulesOfRes []*Rule, oldResCbs []CircuitBreaker) []CircuitBreaker {
newCbsOfRes := make([]CircuitBreaker, 0, len(rulesOfRes))
for _, r := range rulesOfRes {
if res != r.Resource {
logging.Error(errors.Errorf("unmatched resource name expect: %s, actual: %s", res, r.Resource), "Unmatched resource name in circuitBreaker.buildResourceCircuitBreaker()", "rule", r)
logging.Error(errors.Errorf("unmatched resource name expect: %s, actual: %s", res, r.Resource), "Unmatched resource name in circuitBreaker.BuildResourceCircuitBreaker()", "rule", r)
continue
}
equalIdx, reuseStatIdx := calculateReuseIndexFor(r, oldResCbs)
Expand All @@ -421,7 +422,7 @@ func buildResourceCircuitBreaker(res string, rulesOfRes []*Rule, oldResCbs []Cir

generator := cbGenFuncMap[r.Strategy]
if generator == nil {
logging.Warn("[CircuitBreaker buildResourceCircuitBreaker] Ignoring the rule due to unsupported circuit breaking strategy", "rule", r)
logging.Warn("[CircuitBreaker BuildResourceCircuitBreaker] Ignoring the rule due to unsupported circuit breaking strategy", "rule", r)
continue
}

Expand All @@ -433,7 +434,7 @@ func buildResourceCircuitBreaker(res string, rulesOfRes []*Rule, oldResCbs []Cir
cb, e = generator(r, nil)
}
if cb == nil || e != nil {
logging.Warn("[CircuitBreaker buildResourceCircuitBreaker] Ignoring the rule due to bad generated circuit breaker", "rule", r, "err", e.Error())
logging.Warn("[CircuitBreaker BuildResourceCircuitBreaker] Ignoring the rule due to bad generated circuit breaker", "rule", r, "err", e.Error())
continue
}

Expand Down
114 changes: 114 additions & 0 deletions core/outlier/recycler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
// Copyright 1999-2020 Alibaba Group Holding Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package outlier

import (
"errors"
"fmt"
"sync"
"time"

"github.com/alibaba/sentinel-golang/logging"
)

const capacity = 200

var (
// resource name ---> node recycler
recyclers = make(map[string]*Recycler)
recyclerMutex = new(sync.Mutex)
recyclerCh = make(chan task, capacity)
)

type task struct {
nodes []string
resource string
}

func init() {
go func() {
defer func() {
if err := recover(); err != nil {
logging.Error(fmt.Errorf("%+v", err), "Unexpected panic when consuming recyclerCh")
}
}()
for task := range recyclerCh {
recycler := getRecyclerOfResource(task.resource)
recycler.scheduleNodes(task.nodes)
}
}()
}

// Recycler recycles node instance that have been invalidated for a long time
type Recycler struct {
resource string
interval time.Duration
status map[string]bool
mtx sync.Mutex
}

func getRecyclerOfResource(resource string) *Recycler {
recyclerMutex.Lock()
defer recyclerMutex.Unlock()
if _, ok := recyclers[resource]; !ok {
recycler := &Recycler{
resource: resource,
status: make(map[string]bool),
}
rule := getOutlierRuleOfResource(resource)
if rule == nil {
logging.Error(errors.New("nil outlier rule"), "Nil outlier rule in getRecyclerOfResource()")
} else {
if rule.RecycleIntervalS == 0 {
recycler.interval = 10 * time.Minute
} else {
recycler.interval = time.Duration(rule.RecycleIntervalS * 1e9)
}
}
recyclers[resource] = recycler
}
return recyclers[resource]
}

func (r *Recycler) scheduleNodes(nodes []string) {
r.mtx.Lock()
defer r.mtx.Unlock()
for _, node := range nodes {
if _, ok := r.status[node]; !ok {
r.status[node] = false
nodeCopy := node // Copy values to correctly capture the closure for node.
time.AfterFunc(r.interval, func() {
r.recycle(nodeCopy)
})
}
}
}

func (r *Recycler) recover(node string) {
r.mtx.Lock()
defer r.mtx.Unlock()
if _, ok := r.status[node]; ok {
r.status[node] = true
}
}

func (r *Recycler) recycle(node string) {
r.mtx.Lock()
defer r.mtx.Unlock()
if v, ok := r.status[node]; ok && !v {
deleteNodeBreakerOfResource(r.resource, node)
}
delete(r.status, node)
}
Loading

0 comments on commit b8d94d1

Please sign in to comment.