Merge pull request #111 from weaveworks/limit-route-calculation-with-ticker

Reduce the rate of route recalculation to lower CPU load
bboreham authored Oct 11, 2019
2 parents 5a78851 + d743780 commit 8889a80
Showing 1 changed file with 43 additions and 24 deletions.
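
The change replaces the previous coalescing mechanism, a non-blocking send on a 1-capacity channel, with a deferred timer: the first recalculation request arms a 100ms timer and sets a pending flag, and any request that arrives while the flag is set is a no-op, so a burst of topology changes costs one route calculation instead of many. A minimal, standalone sketch of that pattern follows; recalcDeferTime mirrors the constant added in this commit, but the deferredRecalc type and everything else here are illustrative, not code from this repository.

package main

import (
	"fmt"
	"sync"
	"time"
)

// recalcDeferTime mirrors the constant introduced in this commit.
const recalcDeferTime = 100 * time.Millisecond

// deferredRecalc demonstrates the coalescing pattern: the first request
// arms a short timer and sets a pending flag; requests made while the
// flag is set do nothing, so a burst collapses into one recalculation.
type deferredRecalc struct {
	sync.Mutex
	timer   *time.Timer
	pending bool
}

func newDeferredRecalc() *deferredRecalc {
	d := &deferredRecalc{timer: time.NewTimer(time.Hour)}
	d.timer.Stop() // start disarmed; only the first request arms it
	return d
}

// request is cheap and non-blocking, so callers can invoke it on every
// topology change without worrying about cost.
func (d *deferredRecalc) request() {
	d.Lock()
	defer d.Unlock()
	if !d.pending {
		d.timer.Reset(recalcDeferTime)
		d.pending = true
	}
}

// run performs the expensive work at most once per armed window.
func (d *deferredRecalc) run(calculate func()) {
	for range d.timer.C {
		d.Lock()
		d.pending = false
		d.Unlock()
		calculate()
	}
}

func main() {
	d := newDeferredRecalc()
	done := make(chan struct{})
	go d.run(func() {
		fmt.Println("recalculated once for the whole burst")
		close(done)
	})
	for i := 0; i < 1000; i++ { // a burst of requests within the window
		d.request()
	}
	<-done
}

Creating the timer with time.NewTimer(time.Hour) and stopping it immediately yields a valid but disarmed timer, so the consuming loop can always receive on its channel; the first request then arms it for real.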
routes.go (67 changes: 43 additions & 24 deletions)
@@ -3,6 +3,7 @@ package mesh
 import (
 	"math"
 	"sync"
+	"time"
 )
 
 type unicastRoutes map[PeerName]PeerName
@@ -11,23 +12,29 @@ type broadcastRoutes map[PeerName][]PeerName
 // routes aggregates unicast and broadcast routes for our peer.
 type routes struct {
 	sync.RWMutex
-	ourself      *localPeer
-	peers        *Peers
-	onChange     []func()
-	unicast      unicastRoutes
-	unicastAll   unicastRoutes // [1]
-	broadcast    broadcastRoutes
-	broadcastAll broadcastRoutes // [1]
-	recalc       chan<- *struct{}
-	wait         chan chan struct{}
-	action       chan<- func()
+	ourself       *localPeer
+	peers         *Peers
+	onChange      []func()
+	unicast       unicastRoutes
+	unicastAll    unicastRoutes // [1]
+	broadcast     broadcastRoutes
+	broadcastAll  broadcastRoutes // [1]
+	recalcTimer   *time.Timer
+	pendingRecalc bool
+	wait          chan chan struct{}
+	action        chan<- func()
 	// [1] based on *all* connections, not just established &
 	// symmetric ones
 }
 
+const (
+	// We defer recalculation requests by up to 100ms, in order to
+	// coalesce multiple recalcs together.
+	recalcDeferTime = 100 * time.Millisecond
+)
+
 // newRoutes returns a usable Routes based on the LocalPeer and existing Peers.
 func newRoutes(ourself *localPeer, peers *Peers) *routes {
-	recalculate := make(chan *struct{}, 1)
 	wait := make(chan chan struct{})
 	action := make(chan func())
 	r := &routes{
@@ -37,11 +44,12 @@ func newRoutes(ourself *localPeer, peers *Peers) *routes {
 		unicastAll:   unicastRoutes{ourself.Name: UnknownPeerName},
 		broadcast:    broadcastRoutes{ourself.Name: []PeerName{}},
 		broadcastAll: broadcastRoutes{ourself.Name: []PeerName{}},
-		recalc:       recalculate,
+		recalcTimer:  time.NewTimer(time.Hour),
 		wait:         wait,
 		action:       action,
 	}
-	go r.run(recalculate, wait, action)
+	r.recalcTimer.Stop()
+	go r.run(wait, action)
 	return r
 }
 
@@ -156,13 +164,18 @@ func (r *routes) randomNeighbours(except PeerName) []PeerName {
 // can effectively be made synchronous with a subsequent call to
 // EnsureRecalculated.
 func (r *routes) recalculate() {
-	// The use of a 1-capacity channel in combination with the
-	// non-blocking send is an optimisation that results in multiple
-	// requests being coalesced.
-	select {
-	case r.recalc <- nil:
-	default:
-	}
+	r.Lock()
+	if !r.pendingRecalc {
+		r.recalcTimer.Reset(recalcDeferTime)
+		r.pendingRecalc = true
+	}
+	r.Unlock()
+}
+
+func (r *routes) clearPendingRecalcFlag() {
+	r.Lock()
+	r.pendingRecalc = false
+	r.Unlock()
 }
 
 // EnsureRecalculated waits for any preceding Recalculate requests to finish.
@@ -178,16 +191,20 @@ func (r *routes) ensureRecalculated() {
 	<-done
 }
 
-func (r *routes) run(recalculate <-chan *struct{}, wait <-chan chan struct{}, action <-chan func()) {
+func (r *routes) run(wait <-chan chan struct{}, action <-chan func()) {
 	for {
 		select {
-		case <-recalculate:
+		case <-r.recalcTimer.C:
+			r.clearPendingRecalcFlag()
 			r.calculate()
 		case done := <-wait:
-			select {
-			case <-recalculate:
+			r.Lock()
+			pending := r.pendingRecalc
+			r.Unlock()
+			if pending {
+				<-r.recalcTimer.C
+				r.clearPendingRecalcFlag()
 				r.calculate()
-			default:
 			}
 			close(done)
 		case f := <-action:
@@ -196,6 +213,8 @@ func (r *routes) run(recalculate <-chan *struct{}, wait <-chan chan struct{}, action <-chan func()) {
 	}
 }
 
+// Calculate unicast and broadcast routes from r.ourself, and reset
+// the broadcast route cache.
 func (r *routes) calculate() {
 	r.peers.RLock()
 	r.ourself.RLock()
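
With recalculation deferred, recalculate() is fire-and-forget; callers that need routes to be current, such as ensureRecalculated(), go through the run loop's wait channel, which flushes any pending timer before releasing the waiter. Below is a simplified, runnable sketch of that interaction; names such as loop and the calculate callback are illustrative stand-ins for the routes struct and its calculate method.

package main

import (
	"fmt"
	"sync"
	"time"
)

// loop is an illustrative stand-in for the routes struct: a timer plus a
// pending flag for deferred recalculation, and a wait channel that lets a
// caller force any pending work to complete (the ensureRecalculated path).
type loop struct {
	sync.Mutex
	timer   *time.Timer
	pending bool
	wait    chan chan struct{}
}

func (l *loop) recalculate() {
	l.Lock()
	if !l.pending {
		l.timer.Reset(100 * time.Millisecond)
		l.pending = true
	}
	l.Unlock()
}

func (l *loop) clearPending() {
	l.Lock()
	l.pending = false
	l.Unlock()
}

func (l *loop) run(calculate func()) {
	for {
		select {
		case <-l.timer.C: // deferred path: the timer expired on its own
			l.clearPending()
			calculate()
		case done := <-l.wait: // synchronous path: flush pending work first
			l.Lock()
			pending := l.pending
			l.Unlock()
			if pending {
				<-l.timer.C // wait for the already-armed timer to fire
				l.clearPending()
				calculate()
			}
			close(done)
		}
	}
}

func (l *loop) ensureRecalculated() {
	done := make(chan struct{})
	l.wait <- done
	<-done
}

func main() {
	l := &loop{timer: time.NewTimer(time.Hour), wait: make(chan chan struct{})}
	l.timer.Stop() // disarmed until the first recalculate call
	go l.run(func() { fmt.Println("routes recalculated") })

	l.recalculate()        // deferred by up to 100ms
	l.ensureRecalculated() // blocks until the deferred recalculation has run
	fmt.Println("routes are now up to date")
}

Blocking on <-l.timer.C inside the wait case cannot hang: pending is only true after the timer has been armed, and the run loop is the timer channel's sole reader, so the tick is guaranteed to arrive.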
