Skip to content

Commit

Permalink
Make metrics more fine-grained (#2778)
Browse files Browse the repository at this point in the history
  • Loading branch information
StephenButtolph authored Feb 28, 2024
1 parent ed41bef commit 1cbea42
Showing 1 changed file with 102 additions and 43 deletions.
145 changes: 102 additions & 43 deletions network/p2p/gossip/gossip.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,11 @@ import (
)

const (
// typeLabel is the label key used by the prometheus.Labels sets declared
// in this file; the *Type constants below are the values it takes.
typeLabel = "type"
pushType = "push"
pullType = "pull"
typeLabel = "type"
pushType = "push"
pullType = "pull"
// unsentType and sentType are the label values carried by unsentLabels
// and sentLabels respectively.
unsentType = "unsent"
sentType = "sent"

// defaultGossipableCount is the initial capacity of the gossip batch
// slice allocated in Gossip.
defaultGossipableCount = 64
)
Expand All @@ -45,6 +47,12 @@ var (
pullLabels = prometheus.Labels{
typeLabel: pullType,
}
unsentLabels = prometheus.Labels{
typeLabel: unsentType,
}
sentLabels = prometheus.Labels{
typeLabel: sentType,
}

ErrInvalidDiscardedSize = errors.New("discarded size cannot be negative")
ErrInvalidTargetGossipSize = errors.New("target gossip size cannot be negative")
Expand All @@ -70,11 +78,12 @@ type ValidatorGossiper struct {
// Metrics that are tracked across a gossip protocol. A given protocol should
// only use a single instance of Metrics.
//
// NOTE(review): this text comes from a diff view, so the field list appears
// twice (superseded lines followed by their replacements) — confirm against
// the committed file.
type Metrics struct {
sentCount *prometheus.CounterVec
sentBytes *prometheus.CounterVec
receivedCount *prometheus.CounterVec
receivedBytes *prometheus.CounterVec
tracking prometheus.Gauge
sentCount *prometheus.CounterVec
sentBytes *prometheus.CounterVec
receivedCount *prometheus.CounterVec
// receivedBytes is registered as "gossip_received_bytes".
receivedBytes *prometheus.CounterVec
// tracking is registered as "gossip_tracking" (number of gossipables
// being tracked); updateMetrics sets it separately for the unsent and
// sent label values.
tracking *prometheus.GaugeVec
// trackingLifetimeAverage is registered as
// "gossip_tracking_lifetime_average": the average duration, in
// nanoseconds, that a gossipable has been tracked.
trackingLifetimeAverage prometheus.Gauge
}

// NewMetrics returns a common set of metrics
Expand Down Expand Up @@ -103,10 +112,15 @@ func NewMetrics(
Name: "gossip_received_bytes",
Help: "amount of gossip received (bytes)",
}, metricLabels),
tracking: prometheus.NewGauge(prometheus.GaugeOpts{
tracking: prometheus.NewGaugeVec(prometheus.GaugeOpts{
Namespace: namespace,
Name: "gossip_tracking",
Help: "number of gossipables being tracked",
}, metricLabels),
trackingLifetimeAverage: prometheus.NewGauge(prometheus.GaugeOpts{
Namespace: namespace,
Name: "gossip_tracking_lifetime_average",
Help: "average duration a gossipable has been tracked (ns)",
}),
}
err := utils.Err(
Expand All @@ -115,6 +129,7 @@ func NewMetrics(
metrics.Register(m.receivedCount),
metrics.Register(m.receivedBytes),
metrics.Register(m.tracking),
metrics.Register(m.trackingLifetimeAverage),
)
return m, err
}
Expand Down Expand Up @@ -265,10 +280,10 @@ func NewPushGossiper[T Gossipable](
targetGossipSize: targetGossipSize,
maxRegossipFrequency: maxRegossipFrequency,

tracking: make(map[ids.ID]time.Time),
pending: buffer.NewUnboundedDeque[T](0),
issued: buffer.NewUnboundedDeque[T](0),
discarded: &cache.LRU[ids.ID, struct{}]{Size: discardedSize},
tracking: make(map[ids.ID]*tracking),
toGossip: buffer.NewUnboundedDeque[T](0),
toRegossip: buffer.NewUnboundedDeque[T](0),
discarded: &cache.LRU[ids.ID, struct{}]{Size: discardedSize},
}, nil
}

Expand All @@ -282,17 +297,31 @@ type PushGossiper[T Gossipable] struct {
targetGossipSize int
maxRegossipFrequency time.Duration

lock sync.Mutex
tracking map[ids.ID]time.Time
pending buffer.Deque[T]
issued buffer.Deque[T]
discarded *cache.LRU[ids.ID, struct{}] // discarded attempts to avoid overgossiping transactions that are frequently dropped
lock sync.Mutex
tracking map[ids.ID]*tracking
addedTimeSum float64 // unix nanoseconds
toGossip buffer.Deque[T]
toRegossip buffer.Deque[T]
discarded *cache.LRU[ids.ID, struct{}] // discarded attempts to avoid overgossiping transactions that are frequently dropped
}

// tracking is the per-gossipable bookkeeping stored in PushGossiper.tracking,
// keyed by gossip ID.
type tracking struct {
// addedTime is when Add started tracking the gossipable. It is kept as a
// float64 so it can be added to and subtracted from
// PushGossiper.addedTimeSum directly.
addedTime float64 // unix nanoseconds
// lastGossiped is the time the gossipable was last included in a gossip
// batch. Add leaves it at the zero time for new gossipables and sets it
// to the current time for recently discarded ones.
lastGossiped time.Time
}

// Gossip flushes any queued gossipables.
func (p *PushGossiper[T]) Gossip(ctx context.Context) error {
var (
now = time.Now()
nowUnixNano = float64(now.UnixNano())
)

p.lock.Lock()
defer p.lock.Unlock()
defer func() {
p.updateMetrics(nowUnixNano)
p.lock.Unlock()
}()

if len(p.tracking) == 0 {
return nil
Expand All @@ -301,75 +330,78 @@ func (p *PushGossiper[T]) Gossip(ctx context.Context) error {
var (
sentBytes = 0
gossip = make([][]byte, 0, defaultGossipableCount)
now = time.Now()
)

// Iterate over all pending gossipables (never been sent before).
// Iterate over all unsent gossipables.
for sentBytes < p.targetGossipSize {
gossipable, ok := p.pending.PopLeft()
gossipable, ok := p.toGossip.PopLeft()
if !ok {
break
}

// Ensure item is still in the set before we gossip.
gossipID := gossipable.GossipID()
tracking := p.tracking[gossipID]
if !p.set.Has(gossipID) {
delete(p.tracking, gossipID)
p.addedTimeSum -= tracking.addedTime
continue
}

bytes, err := p.marshaller.MarshalGossip(gossipable)
if err != nil {
delete(p.tracking, gossipID)
p.addedTimeSum -= tracking.addedTime
return err
}

gossip = append(gossip, bytes)
sentBytes += len(bytes)
p.issued.PushRight(gossipable)
p.tracking[gossipID] = now
p.toRegossip.PushRight(gossipable)
tracking.lastGossiped = now
}

maxLastGossipTimeToRegossip := now.Add(-p.maxRegossipFrequency)

// Iterate over all issued gossipables (have been sent before) to fill any
// remaining space in gossip batch.
// Iterate over all previously sent gossipables to fill any remaining space
// in the gossip batch.
for sentBytes < p.targetGossipSize {
gossipable, ok := p.issued.PopLeft()
gossipable, ok := p.toRegossip.PopLeft()
if !ok {
break
}

// Ensure item is still in the set before we gossip.
gossipID := gossipable.GossipID()
tracking := p.tracking[gossipID]
if !p.set.Has(gossipID) {
delete(p.tracking, gossipID)
p.discarded.Put(gossipID, struct{}{}) // only add to discarded if issued once
p.addedTimeSum -= tracking.addedTime
p.discarded.Put(gossipID, struct{}{}) // only add to discarded if previously sent
continue
}

// Ensure we don't attempt to send a gossipable too frequently.
lastGossipTime := p.tracking[gossipID]
if maxLastGossipTimeToRegossip.Before(lastGossipTime) {
if maxLastGossipTimeToRegossip.Before(tracking.lastGossiped) {
// Put the gossipable on the front of the queue to keep items sorted
// by last issuance time.
p.issued.PushLeft(gossipable)
p.toRegossip.PushLeft(gossipable)
break
}

bytes, err := p.marshaller.MarshalGossip(gossipable)
if err != nil {
// Should never happen because we've already issued this once.
// Should never happen because we've already sent this once.
delete(p.tracking, gossipID)
p.addedTimeSum -= tracking.addedTime
return err
}

gossip = append(gossip, bytes)
sentBytes += len(bytes)
p.issued.PushRight(gossipable)
p.tracking[gossipID] = now
p.toRegossip.PushRight(gossipable)
tracking.lastGossiped = now
}
p.metrics.tracking.Set(float64(len(p.tracking)))

// If there is nothing to gossip, we can exit early.
if len(gossip) == 0 {
Expand Down Expand Up @@ -400,26 +432,53 @@ func (p *PushGossiper[T]) Gossip(ctx context.Context) error {
// Add enqueues new gossipables to be pushed. If a gossipable is already tracked,
// it is not added again.
//
// NOTE(review): this text comes from a diff view, so superseded lines (for
// example the bare `defer p.lock.Unlock()`, `now := time.Now()` and the
// pending/issued queue names) appear next to their replacements — confirm
// against the committed file.
func (p *PushGossiper[T]) Add(gossipables ...T) {
// Read the clock once so every gossipable added by this call shares the
// same addedTime.
var (
now = time.Now()
nowUnixNano = float64(now.UnixNano())
)

p.lock.Lock()
defer p.lock.Unlock()
// Refresh the tracking metrics while the lock is still held, then release
// it.
defer func() {
p.updateMetrics(nowUnixNano)
p.lock.Unlock()
}()

// Add new gossipables to the pending queue.
now := time.Now()
// Add new gossipables to be sent.
for _, gossipable := range gossipables {
gossipID := gossipable.GossipID()
// Already tracked: leave the existing entry and queue position alone.
if _, ok := p.tracking[gossipID]; ok {
continue
}

tracking := &tracking{
addedTime: nowUnixNano,
}
if _, ok := p.discarded.Get(gossipID); ok {
// Pretend that recently discarded transactions were just gossiped.
p.tracking[gossipID] = now
p.issued.PushRight(gossipable)
tracking.lastGossiped = now
p.toRegossip.PushRight(gossipable)
} else {
p.tracking[gossipID] = time.Time{}
p.pending.PushRight(gossipable)
p.toGossip.PushRight(gossipable)
}
p.tracking[gossipID] = tracking
// Keep the running sum of addedTime in step with the tracking map;
// updateMetrics derives the average lifetime from it.
p.addedTimeSum += nowUnixNano
}
}

// updateMetrics publishes the sizes of the toGossip and toRegossip queues and
// the average lifetime of the tracked gossipables. nowUnixNano is the current
// time in unix nanoseconds. Both callers in this file (Gossip and Add) invoke
// it from a deferred function before releasing p.lock.
func (p *PushGossiper[_]) updateMetrics(nowUnixNano float64) {
var (
numUnsent = float64(p.toGossip.Len())
numSent = float64(p.toRegossip.Len())
numTracking = numUnsent + numSent
averageLifetime float64
)
// Average lifetime is now minus the mean addedTime; it stays 0 when nothing
// is queued, which also avoids dividing by zero.
if numTracking != 0 {
averageLifetime = nowUnixNano - p.addedTimeSum/numTracking
}
// NOTE(review): the next line looks like the superseded pre-commit statement
// shown by the diff view; tracking is declared as a *prometheus.GaugeVec
// elsewhere in this diff — confirm against the committed file.
p.metrics.tracking.Set(float64(len(p.tracking)))

p.metrics.tracking.With(unsentLabels).Set(numUnsent)
p.metrics.tracking.With(sentLabels).Set(numSent)
p.metrics.trackingLifetimeAverage.Set(averageLifetime)
}

// Every calls [Gossip] every [frequency] amount of time.
Expand Down

0 comments on commit 1cbea42

Please sign in to comment.