Add metrics for per-device collection duration, error and timeout indicators (#22)
treydock authored May 13, 2024
1 parent 8d9b96f commit 8b98f4b
Showing 6 changed files with 158 additions and 42 deletions.
52 changes: 49 additions & 3 deletions collectors/hca.go
@@ -15,6 +15,7 @@ package collectors

import (
"context"
"fmt"
"math"
"strings"
"sync"
@@ -36,6 +37,9 @@ type HCACollector struct {
devices *[]InfinibandDevice
logger log.Logger
collector string
Duration *prometheus.Desc
Error *prometheus.Desc
Timeout *prometheus.Desc
PortXmitData *prometheus.Desc
PortRcvData *prometheus.Desc
PortXmitPkts *prometheus.Desc
@@ -70,6 +74,15 @@ type HCACollector struct {
Info *prometheus.Desc
}

type HCAMetrics struct {
duration float64
timeout float64
error float64
rcvErrDuration float64
rcvErrTimeout float64
rcvErrError float64
}

func NewHCACollector(devices *[]InfinibandDevice, runonce bool, logger log.Logger) *HCACollector {
labels := []string{"guid", "port"}
collector := "hca"
@@ -80,6 +93,12 @@ func NewHCACollector(devices *[]InfinibandDevice, runonce bool, logger log.Logge
devices: devices,
logger: log.With(logger, "collector", collector),
collector: collector,
Duration: prometheus.NewDesc(prometheus.BuildFQName(namespace, "hca", "collect_duration_seconds"),
"Duration of collection", []string{"guid", "collector"}, nil),
Error: prometheus.NewDesc(prometheus.BuildFQName(namespace, "hca", "collect_error"),
"Indicates if collect error", []string{"guid", "collector"}, nil),
Timeout: prometheus.NewDesc(prometheus.BuildFQName(namespace, "hca", "collect_timeout"),
"Indicates if collect timeout", []string{"guid", "collector"}, nil),
PortXmitData: prometheus.NewDesc(prometheus.BuildFQName(namespace, "hca", "port_transmit_data_bytes_total"),
"Infiniband HCA port PortXmitData", labels, nil),
PortRcvData: prometheus.NewDesc(prometheus.BuildFQName(namespace, "hca", "port_receive_data_bytes_total"),
@@ -148,6 +167,9 @@ func NewHCACollector(devices *[]InfinibandDevice, runonce bool, logger log.Logge
}

func (h *HCACollector) Describe(ch chan<- *prometheus.Desc) {
ch <- h.Duration
ch <- h.Error
ch <- h.Timeout
ch <- h.PortXmitData
ch <- h.PortRcvData
ch <- h.PortXmitPkts
@@ -184,7 +206,7 @@ func (h *HCACollector) Describe(ch chan<- *prometheus.Desc) {

func (h *HCACollector) Collect(ch chan<- prometheus.Metric) {
collectTime := time.Now()
counters, errors, timeouts := h.collect()
counters, metrics, errors, timeouts := h.collect()
for _, c := range counters {
if !math.IsNaN(c.PortXmitData) {
ch <- prometheus.MustNewConstMetric(h.PortXmitData, prometheus.CounterValue, c.PortXmitData, c.device.GUID, c.PortSelect)
@@ -273,14 +295,26 @@ func (h *HCACollector) Collect(ch chan<- prometheus.Metric) {
}
if *hcaCollectBase {
for _, device := range *h.devices {
metric := metrics[device.GUID]
ch <- prometheus.MustNewConstMetric(h.Rate, prometheus.GaugeValue, device.Rate, device.GUID)
ch <- prometheus.MustNewConstMetric(h.RawRate, prometheus.GaugeValue, device.RawRate, device.GUID)
ch <- prometheus.MustNewConstMetric(h.Info, prometheus.GaugeValue, 1, device.GUID, device.Name, device.LID)
ch <- prometheus.MustNewConstMetric(h.Duration, prometheus.GaugeValue, metric.duration, device.GUID, h.collector)
ch <- prometheus.MustNewConstMetric(h.Timeout, prometheus.GaugeValue, metric.timeout, device.GUID, h.collector)
ch <- prometheus.MustNewConstMetric(h.Error, prometheus.GaugeValue, metric.error, device.GUID, h.collector)
for port, uplink := range device.Uplinks {
ch <- prometheus.MustNewConstMetric(h.Uplink, prometheus.GaugeValue, 1, device.GUID, port, device.Name, uplink.Name, uplink.GUID, uplink.Type, uplink.PortNumber, uplink.LID)
}
}
}
if *hcaCollectRcvErr {
for _, device := range *h.devices {
metric := metrics[device.GUID]
ch <- prometheus.MustNewConstMetric(h.Duration, prometheus.GaugeValue, metric.rcvErrDuration, device.GUID, fmt.Sprintf("%s-rcv-err", h.collector))
ch <- prometheus.MustNewConstMetric(h.Timeout, prometheus.GaugeValue, metric.rcvErrTimeout, device.GUID, fmt.Sprintf("%s-rcv-err", h.collector))
ch <- prometheus.MustNewConstMetric(h.Error, prometheus.GaugeValue, metric.rcvErrError, device.GUID, fmt.Sprintf("%s-rcv-err", h.collector))
}
}
ch <- prometheus.MustNewConstMetric(collectErrors, prometheus.GaugeValue, errors, h.collector)
ch <- prometheus.MustNewConstMetric(collecTimeouts, prometheus.GaugeValue, timeouts, h.collector)
ch <- prometheus.MustNewConstMetric(collectDuration, prometheus.GaugeValue, time.Since(collectTime).Seconds(), h.collector)
@@ -289,8 +323,9 @@ func (h *HCACollector) Collect(ch chan<- prometheus.Metric) {
}
}

func (h *HCACollector) collect() ([]PerfQueryCounters, float64, float64) {
func (h *HCACollector) collect() ([]PerfQueryCounters, map[string]HCAMetrics, float64, float64) {
var counters []PerfQueryCounters
metrics := make(map[string]HCAMetrics)
var countersLock sync.Mutex
var errors, timeouts float64
limit := make(chan int, *maxConcurrent)
@@ -307,11 +342,15 @@ func (h *HCACollector) collect() ([]PerfQueryCounters, float64, float64) {
defer cancelExtended()
ports := getDevicePorts(device.Uplinks)
perfqueryPorts := strings.Join(ports, ",")
start := time.Now()
extendedOut, err := PerfqueryExec(device.GUID, perfqueryPorts, []string{"-l", "-x"}, ctxExtended)
metric := HCAMetrics{duration: time.Since(start).Seconds()}
if err == context.DeadlineExceeded {
metric.timeout = 1
level.Error(h.logger).Log("msg", "Timeout collecting extended perfquery counters", "guid", device.GUID)
timeouts++
} else if err != nil {
metric.error = 1
level.Error(h.logger).Log("msg", "Error collecting extended perfquery counters", "guid", device.GUID)
errors++
}
@@ -330,12 +369,16 @@ func (h *HCACollector) collect() ([]PerfQueryCounters, float64, float64) {
for _, deviceCounter := range deviceCounters {
ctxRcvErr, cancelRcvErr := context.WithTimeout(context.Background(), *perfqueryTimeout)
defer cancelRcvErr()
rcvErrStart := time.Now()
rcvErrOut, err := PerfqueryExec(device.GUID, deviceCounter.PortSelect, []string{"-E"}, ctxRcvErr)
metric.rcvErrDuration = time.Since(rcvErrStart).Seconds()
if err == context.DeadlineExceeded {
metric.rcvErrTimeout = 1
level.Error(h.logger).Log("msg", "Timeout collecting rcvErr perfquery counters", "guid", device.GUID)
timeouts++
continue
} else if err != nil {
metric.rcvErrError = 1
level.Error(h.logger).Log("msg", "Error collecting rcvErr perfquery counters", "guid", device.GUID)
errors++
continue
@@ -347,9 +390,12 @@ func (h *HCACollector) collect() ([]PerfQueryCounters, float64, float64) {
countersLock.Unlock()
}
}
countersLock.Lock()
metrics[device.GUID] = metric
countersLock.Unlock()
}(device)
}
wg.Wait()
close(limit)
return counters, errors, timeouts
return counters, metrics, errors, timeouts
}
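For context, here is a minimal sketch of how the new per-device HCA metrics added above might appear on the exporter's /metrics endpoint. It assumes the infiniband namespace already used by the existing test expectations; the GUID and sample values are placeholders, and the collector="hca" and collector="hca-rcv-err" series are only exported when the corresponding base and rcvErr collections are enabled.

# HELP infiniband_hca_collect_duration_seconds Duration of collection
# TYPE infiniband_hca_collect_duration_seconds gauge
infiniband_hca_collect_duration_seconds{collector="hca",guid="0x7cfe900300a5a2a0"} 0.021
infiniband_hca_collect_duration_seconds{collector="hca-rcv-err",guid="0x7cfe900300a5a2a0"} 0.013
# HELP infiniband_hca_collect_error Indicates if collect error
# TYPE infiniband_hca_collect_error gauge
infiniband_hca_collect_error{collector="hca",guid="0x7cfe900300a5a2a0"} 0
# HELP infiniband_hca_collect_timeout Indicates if collect timeout
# TYPE infiniband_hca_collect_timeout gauge
infiniband_hca_collect_timeout{collector="hca",guid="0x7cfe900300a5a2a0"} 0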
20 changes: 10 additions & 10 deletions collectors/hca_test.go
@@ -158,8 +158,8 @@ func TestHCACollector(t *testing.T) {
gatherers := setupGatherer(collector)
if val, err := testutil.GatherAndCount(gatherers); err != nil {
t.Errorf("Unexpected error: %v", err)
} else if val != 55 {
t.Errorf("Unexpected collection count %d, expected 55", val)
} else if val != 61 {
t.Errorf("Unexpected collection count %d, expected 61", val)
}
if err := testutil.GatherAndCompare(gatherers, strings.NewReader(expected),
"infiniband_hca_port_excessive_buffer_overrun_errors_total", "infiniband_hca_port_link_downed_total",
@@ -325,8 +325,8 @@ func TestHCACollectorFull(t *testing.T) {
gatherers := setupGatherer(collector)
if val, err := testutil.GatherAndCount(gatherers); err != nil {
t.Errorf("Unexpected error: %v", err)
} else if val != 67 {
t.Errorf("Unexpected collection count %d, expected 67", val)
} else if val != 79 {
t.Errorf("Unexpected collection count %d, expected 79", val)
}
if err := testutil.GatherAndCompare(gatherers, strings.NewReader(expected),
"infiniband_hca_port_excessive_buffer_overrun_errors_total", "infiniband_hca_port_link_downed_total",
@@ -366,8 +366,8 @@ func TestHCACollectorError(t *testing.T) {
gatherers := setupGatherer(collector)
if val, err := testutil.GatherAndCount(gatherers); err != nil {
t.Errorf("Unexpected error: %v", err)
} else if val != 11 {
t.Errorf("Unexpected collection count %d, expected 11", val)
} else if val != 17 {
t.Errorf("Unexpected collection count %d, expected 17", val)
}
if err := testutil.GatherAndCompare(gatherers, strings.NewReader(expected),
"infiniband_hca_port_excessive_buffer_overrun_errors_total", "infiniband_hca_port_link_downed_total",
@@ -394,8 +394,8 @@ func TestHCACollectorErrorRunonce(t *testing.T) {
gatherers := setupGatherer(collector)
if val, err := testutil.GatherAndCount(gatherers); err != nil {
t.Errorf("Unexpected error: %v", err)
} else if val != 12 {
t.Errorf("Unexpected collection count %d, expected 12", val)
} else if val != 18 {
t.Errorf("Unexpected collection count %d, expected 18", val)
}
if err := testutil.GatherAndCompare(gatherers, strings.NewReader(expected),
"infiniband_hca_port_excessive_buffer_overrun_errors_total", "infiniband_hca_port_link_downed_total",
@@ -422,8 +422,8 @@ func TestHCACollectorTimeout(t *testing.T) {
gatherers := setupGatherer(collector)
if val, err := testutil.GatherAndCount(gatherers); err != nil {
t.Errorf("Unexpected error: %v", err)
} else if val != 11 {
t.Errorf("Unexpected collection count %d, expected 11", val)
} else if val != 17 {
t.Errorf("Unexpected collection count %d, expected 17", val)
}
if err := testutil.GatherAndCompare(gatherers, strings.NewReader(expected),
"infiniband_hca_port_excessive_buffer_overrun_errors_total", "infiniband_hca_port_link_downed_total",
35 changes: 28 additions & 7 deletions collectors/ibswinfo.go
@@ -42,6 +42,9 @@ type IbswinfoCollector struct {
devices *[]InfinibandDevice
logger log.Logger
collector string
Duration *prometheus.Desc
Error *prometheus.Desc
Timeout *prometheus.Desc
HardwareInfo *prometheus.Desc
Uptime *prometheus.Desc
PowerSupplyStatus *prometheus.Desc
@@ -64,6 +67,9 @@ type Ibswinfo struct {
Temp float64
FanStatus string
Fans []SwitchFan
duration float64
error float64
timeout float64
}

type SwitchPowerSupply struct {
Expand All @@ -88,6 +94,12 @@ func NewIbswinfoCollector(devices *[]InfinibandDevice, runonce bool, logger log.
devices: devices,
logger: log.With(logger, "collector", collector),
collector: collector,
Duration: prometheus.NewDesc(prometheus.BuildFQName(namespace, "switch", "collect_duration_seconds"),
"Duration of collection", []string{"guid", "collector"}, nil),
Error: prometheus.NewDesc(prometheus.BuildFQName(namespace, "switch", "collect_error"),
"Indicates if collect error", []string{"guid", "collector"}, nil),
Timeout: prometheus.NewDesc(prometheus.BuildFQName(namespace, "switch", "collect_timeout"),
"Indicates if collect timeout", []string{"guid", "collector"}, nil),
HardwareInfo: prometheus.NewDesc(prometheus.BuildFQName(namespace, "switch", "hardware_info"),
"Infiniband switch hardware info", []string{"guid", "firmware_version", "psid", "part_number", "serial_number", "switch"}, nil),
Uptime: prometheus.NewDesc(prometheus.BuildFQName(namespace, "switch", "uptime_seconds"),
@@ -110,6 +122,9 @@ func NewIbswinfoCollector(devices *[]InfinibandDevice, runonce bool, logger log.
}

func (s *IbswinfoCollector) Describe(ch chan<- *prometheus.Desc) {
ch <- s.Duration
ch <- s.Error
ch <- s.Timeout
ch <- s.HardwareInfo
ch <- s.Uptime
ch <- s.PowerSupplyStatus
@@ -128,6 +143,9 @@ func (s *IbswinfoCollector) Collect(ch chan<- prometheus.Metric) {
ch <- prometheus.MustNewConstMetric(s.HardwareInfo, prometheus.GaugeValue, 1, swinfo.device.GUID,
swinfo.FirmwareVersion, swinfo.PSID, swinfo.PartNumber, swinfo.SerialNumber, swinfo.device.Name)
ch <- prometheus.MustNewConstMetric(s.Uptime, prometheus.GaugeValue, swinfo.Uptime, swinfo.device.GUID)
ch <- prometheus.MustNewConstMetric(s.Duration, prometheus.GaugeValue, swinfo.duration, swinfo.device.GUID, s.collector)
ch <- prometheus.MustNewConstMetric(s.Error, prometheus.GaugeValue, swinfo.error, swinfo.device.GUID, s.collector)
ch <- prometheus.MustNewConstMetric(s.Timeout, prometheus.GaugeValue, swinfo.timeout, swinfo.device.GUID, s.collector)
for _, psu := range swinfo.PowerSupplies {
if psu.Status != "" {
ch <- prometheus.MustNewConstMetric(s.PowerSupplyStatus, prometheus.GaugeValue, 1, swinfo.device.GUID, psu.ID, psu.Status)
@@ -180,16 +198,20 @@ func (s *IbswinfoCollector) collect() ([]Ibswinfo, float64, float64) {
ctxibswinfo, cancelibswinfo := context.WithTimeout(context.Background(), *ibswinfoTimeout)
defer cancelibswinfo()
level.Debug(s.logger).Log("msg", "Run ibswinfo", "lid", device.LID)
start := time.Now()
ibswinfoOut, ibswinfoErr := IbswinfoExec(device.LID, ctxibswinfo)
ibswinfoData := Ibswinfo{duration: time.Since(start).Seconds()}
if ibswinfoErr == context.DeadlineExceeded {
ibswinfoData.timeout = 1
level.Error(s.logger).Log("msg", "Timeout collecting ibswinfo data", "guid", device.GUID, "lid", device.LID)
timeouts++
} else if ibswinfoErr != nil {
ibswinfoData.error = 1
level.Error(s.logger).Log("msg", "Error collecting ibswinfo data", "err", fmt.Sprintf("%s:%s", ibswinfoErr, ibswinfoOut), "guid", device.GUID, "lid", device.LID)
errors++
}
if ibswinfoErr == nil {
ibswinfoData, err := parse_ibswinfo(ibswinfoOut, s.logger)
err := parse_ibswinfo(ibswinfoOut, &ibswinfoData, s.logger)
if err != nil {
level.Error(s.logger).Log("msg", "Error parsing ibswinfo output", "guid", device.GUID, "lid", device.LID)
errors++
Expand Down Expand Up @@ -235,8 +257,7 @@ func ibswinfo(lid string, ctx context.Context) (string, error) {
return stdout.String(), nil
}

func parse_ibswinfo(out string, logger log.Logger) (Ibswinfo, error) {
var data Ibswinfo
func parse_ibswinfo(out string, data *Ibswinfo, logger log.Logger) error {
data.Temp = math.NaN()
lines := strings.Split(out, "\n")
psus := make(map[string]SwitchPowerSupply)
@@ -315,7 +336,7 @@ func parse_ibswinfo(out string, logger log.Logger) (Ibswinfo, error) {
psu.PowerW = powerW
} else {
level.Error(logger).Log("msg", "Unable to parse power (W)", "err", err, "value", value)
return Ibswinfo{}, err
return err
}
}
if psuID != "" && dividerCount < 4 {
Expand All @@ -327,7 +348,7 @@ func parse_ibswinfo(out string, logger log.Logger) (Ibswinfo, error) {
data.Temp = temp
} else {
level.Error(logger).Log("msg", "Unable to parse temperature (C)", "err", err, "value", value)
return Ibswinfo{}, err
return err
}
}
if key == "fan status" && dividerCount >= 4 {
Expand All @@ -352,7 +373,7 @@ func parse_ibswinfo(out string, logger log.Logger) (Ibswinfo, error) {
fans = append(fans, fan)
} else {
level.Error(logger).Log("msg", "Unable to parse fan RPM", "err", err, "value", value)
return Ibswinfo{}, err
return err
}
}
}
Expand All @@ -362,5 +383,5 @@ func parse_ibswinfo(out string, logger log.Logger) (Ibswinfo, error) {
}
data.PowerSupplies = powerSupplies
data.Fans = fans
return data, nil
return nil
}
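Likewise, a rough sketch of the corresponding per-switch metrics emitted by this collector, using the switch subsystem names defined above. The GUID and values are placeholders, and the collector label value (shown here as "ibswinfo") is whatever name NewIbswinfoCollector assigns, which is outside this diff.

infiniband_switch_collect_duration_seconds{collector="ibswinfo",guid="0x506b4b03005c2740"} 1.7
infiniband_switch_collect_error{collector="ibswinfo",guid="0x506b4b03005c2740"} 0
infiniband_switch_collect_timeout{collector="ibswinfo",guid="0x506b4b03005c2740"} 0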
17 changes: 10 additions & 7 deletions collectors/ibswinfo_test.go
@@ -34,7 +34,8 @@ func TestParseIBSWInfo(t *testing.T) {
if err != nil {
t.Fatal("Unable to read fixture")
}
data, err := parse_ibswinfo(out, log.NewNopLogger())
data := Ibswinfo{}
err = parse_ibswinfo(out, &data, log.NewNopLogger())
if err != nil {
t.Errorf("Unexpected error: %s", err)
}
@@ -101,7 +102,8 @@ func TestParseIBSWInfoFailedPSU(t *testing.T) {
if err != nil {
t.Fatal("Unable to read fixture")
}
data, err := parse_ibswinfo(out, log.NewNopLogger())
data := Ibswinfo{}
err = parse_ibswinfo(out, &data, log.NewNopLogger())
if err != nil {
t.Errorf("Unexpected error: %s", err)
}
@@ -189,7 +191,8 @@ func TestParseIBSWInfoErrors(t *testing.T) {
if err != nil {
t.Fatalf("Unable to read fixture %s", test)
}
_, err = parse_ibswinfo(out, log.NewNopLogger())
data := Ibswinfo{}
err = parse_ibswinfo(out, &data, log.NewNopLogger())
if err == nil {
t.Errorf("Expected an error for test %s(%d)", test, i)
}
@@ -282,8 +285,8 @@ func TestIbswinfoCollector(t *testing.T) {
gatherers := setupGatherer(collector)
if val, err := testutil.GatherAndCount(gatherers); err != nil {
t.Errorf("Unexpected error: %v", err)
} else if val != 44 {
t.Errorf("Unexpected collection count %d, expected 44", val)
} else if val != 50 {
t.Errorf("Unexpected collection count %d, expected 50", val)
}
if err := testutil.GatherAndCompare(gatherers, strings.NewReader(expected),
"infiniband_switch_power_supply_status_info", "infiniband_switch_power_supply_dc_power_status_info",
@@ -314,8 +317,8 @@ func TestIbswinfoCollectorMissingStatus(t *testing.T) {
gatherers := setupGatherer(collector)
if val, err := testutil.GatherAndCount(gatherers); err != nil {
t.Errorf("Unexpected error: %v", err)
} else if val != 37 {
t.Errorf("Unexpected collection count %d, expected 37", val)
} else if val != 43 {
t.Errorf("Unexpected collection count %d, expected 43", val)
}
}
