From 8b98f4b021885f3ca2c921c575ffc5232f2f8b9f Mon Sep 17 00:00:00 2001 From: treydock Date: Mon, 13 May 2024 09:59:37 -0400 Subject: [PATCH] Add metrics for per-device collection duration, error and timeout indicators (#22) --- collectors/hca.go | 52 ++++++++++++++++++++++++++++++++++--- collectors/hca_test.go | 20 +++++++------- collectors/ibswinfo.go | 35 ++++++++++++++++++++----- collectors/ibswinfo_test.go | 17 +++++++----- collectors/switch.go | 52 ++++++++++++++++++++++++++++++++++--- collectors/switch_test.go | 24 ++++++++--------- 6 files changed, 158 insertions(+), 42 deletions(-) diff --git a/collectors/hca.go b/collectors/hca.go index 03c6673..77b6e4a 100644 --- a/collectors/hca.go +++ b/collectors/hca.go @@ -15,6 +15,7 @@ package collectors import ( "context" + "fmt" "math" "strings" "sync" @@ -36,6 +37,9 @@ type HCACollector struct { devices *[]InfinibandDevice logger log.Logger collector string + Duration *prometheus.Desc + Error *prometheus.Desc + Timeout *prometheus.Desc PortXmitData *prometheus.Desc PortRcvData *prometheus.Desc PortXmitPkts *prometheus.Desc @@ -70,6 +74,15 @@ type HCACollector struct { Info *prometheus.Desc } +type HCAMetrics struct { + duration float64 + timeout float64 + error float64 + rcvErrDuration float64 + rcvErrTimeout float64 + rcvErrError float64 +} + func NewHCACollector(devices *[]InfinibandDevice, runonce bool, logger log.Logger) *HCACollector { labels := []string{"guid", "port"} collector := "hca" @@ -80,6 +93,12 @@ func NewHCACollector(devices *[]InfinibandDevice, runonce bool, logger log.Logge devices: devices, logger: log.With(logger, "collector", collector), collector: collector, + Duration: prometheus.NewDesc(prometheus.BuildFQName(namespace, "hca", "collect_duration_seconds"), + "Duration of collection", []string{"guid", "collector"}, nil), + Error: prometheus.NewDesc(prometheus.BuildFQName(namespace, "hca", "collect_error"), + "Indicates if collect error", []string{"guid", "collector"}, nil), + Timeout: prometheus.NewDesc(prometheus.BuildFQName(namespace, "hca", "collect_timeout"), + "Indicates if collect timeout", []string{"guid", "collector"}, nil), PortXmitData: prometheus.NewDesc(prometheus.BuildFQName(namespace, "hca", "port_transmit_data_bytes_total"), "Infiniband HCA port PortXmitData", labels, nil), PortRcvData: prometheus.NewDesc(prometheus.BuildFQName(namespace, "hca", "port_receive_data_bytes_total"), @@ -148,6 +167,9 @@ func NewHCACollector(devices *[]InfinibandDevice, runonce bool, logger log.Logge } func (h *HCACollector) Describe(ch chan<- *prometheus.Desc) { + ch <- h.Duration + ch <- h.Error + ch <- h.Timeout ch <- h.PortXmitData ch <- h.PortRcvData ch <- h.PortXmitPkts @@ -184,7 +206,7 @@ func (h *HCACollector) Describe(ch chan<- *prometheus.Desc) { func (h *HCACollector) Collect(ch chan<- prometheus.Metric) { collectTime := time.Now() - counters, errors, timeouts := h.collect() + counters, metrics, errors, timeouts := h.collect() for _, c := range counters { if !math.IsNaN(c.PortXmitData) { ch <- prometheus.MustNewConstMetric(h.PortXmitData, prometheus.CounterValue, c.PortXmitData, c.device.GUID, c.PortSelect) @@ -273,14 +295,26 @@ func (h *HCACollector) Collect(ch chan<- prometheus.Metric) { } if *hcaCollectBase { for _, device := range *h.devices { + metric := metrics[device.GUID] ch <- prometheus.MustNewConstMetric(h.Rate, prometheus.GaugeValue, device.Rate, device.GUID) ch <- prometheus.MustNewConstMetric(h.RawRate, prometheus.GaugeValue, device.RawRate, device.GUID) ch <- prometheus.MustNewConstMetric(h.Info, prometheus.GaugeValue, 1, device.GUID, device.Name, device.LID) + ch <- prometheus.MustNewConstMetric(h.Duration, prometheus.GaugeValue, metric.duration, device.GUID, h.collector) + ch <- prometheus.MustNewConstMetric(h.Timeout, prometheus.GaugeValue, metric.timeout, device.GUID, h.collector) + ch <- prometheus.MustNewConstMetric(h.Error, prometheus.GaugeValue, metric.error, device.GUID, h.collector) for port, uplink := range device.Uplinks { ch <- prometheus.MustNewConstMetric(h.Uplink, prometheus.GaugeValue, 1, device.GUID, port, device.Name, uplink.Name, uplink.GUID, uplink.Type, uplink.PortNumber, uplink.LID) } } } + if *hcaCollectRcvErr { + for _, device := range *h.devices { + metric := metrics[device.GUID] + ch <- prometheus.MustNewConstMetric(h.Duration, prometheus.GaugeValue, metric.rcvErrDuration, device.GUID, fmt.Sprintf("%s-rcv-err", h.collector)) + ch <- prometheus.MustNewConstMetric(h.Timeout, prometheus.GaugeValue, metric.rcvErrTimeout, device.GUID, fmt.Sprintf("%s-rcv-err", h.collector)) + ch <- prometheus.MustNewConstMetric(h.Error, prometheus.GaugeValue, metric.rcvErrError, device.GUID, fmt.Sprintf("%s-rcv-err", h.collector)) + } + } ch <- prometheus.MustNewConstMetric(collectErrors, prometheus.GaugeValue, errors, h.collector) ch <- prometheus.MustNewConstMetric(collecTimeouts, prometheus.GaugeValue, timeouts, h.collector) ch <- prometheus.MustNewConstMetric(collectDuration, prometheus.GaugeValue, time.Since(collectTime).Seconds(), h.collector) @@ -289,8 +323,9 @@ func (h *HCACollector) Collect(ch chan<- prometheus.Metric) { } } -func (h *HCACollector) collect() ([]PerfQueryCounters, float64, float64) { +func (h *HCACollector) collect() ([]PerfQueryCounters, map[string]HCAMetrics, float64, float64) { var counters []PerfQueryCounters + metrics := make(map[string]HCAMetrics) var countersLock sync.Mutex var errors, timeouts float64 limit := make(chan int, *maxConcurrent) @@ -307,11 +342,15 @@ func (h *HCACollector) collect() ([]PerfQueryCounters, float64, float64) { defer cancelExtended() ports := getDevicePorts(device.Uplinks) perfqueryPorts := strings.Join(ports, ",") + start := time.Now() extendedOut, err := PerfqueryExec(device.GUID, perfqueryPorts, []string{"-l", "-x"}, ctxExtended) + metric := HCAMetrics{duration: time.Since(start).Seconds()} if err == context.DeadlineExceeded { + metric.timeout = 1 level.Error(h.logger).Log("msg", "Timeout collecting extended perfquery counters", "guid", device.GUID) timeouts++ } else if err != nil { + metric.error = 1 level.Error(h.logger).Log("msg", "Error collecting extended perfquery counters", "guid", device.GUID) errors++ } @@ -330,12 +369,16 @@ func (h *HCACollector) collect() ([]PerfQueryCounters, float64, float64) { for _, deviceCounter := range deviceCounters { ctxRcvErr, cancelRcvErr := context.WithTimeout(context.Background(), *perfqueryTimeout) defer cancelRcvErr() + rcvErrStart := time.Now() rcvErrOut, err := PerfqueryExec(device.GUID, deviceCounter.PortSelect, []string{"-E"}, ctxRcvErr) + metric.rcvErrDuration = time.Since(rcvErrStart).Seconds() if err == context.DeadlineExceeded { + metric.rcvErrTimeout = 1 level.Error(h.logger).Log("msg", "Timeout collecting rcvErr perfquery counters", "guid", device.GUID) timeouts++ continue } else if err != nil { + metric.rcvErrError = 1 level.Error(h.logger).Log("msg", "Error collecting rcvErr perfquery counters", "guid", device.GUID) errors++ continue @@ -347,9 +390,12 @@ func (h *HCACollector) collect() ([]PerfQueryCounters, float64, float64) { countersLock.Unlock() } } + countersLock.Lock() + metrics[device.GUID] = metric + countersLock.Unlock() }(device) } wg.Wait() close(limit) - return counters, errors, timeouts + return counters, metrics, errors, timeouts } diff --git a/collectors/hca_test.go b/collectors/hca_test.go index e190b37..027a835 100644 --- a/collectors/hca_test.go +++ b/collectors/hca_test.go @@ -158,8 +158,8 @@ func TestHCACollector(t *testing.T) { gatherers := setupGatherer(collector) if val, err := testutil.GatherAndCount(gatherers); err != nil { t.Errorf("Unexpected error: %v", err) - } else if val != 55 { - t.Errorf("Unexpected collection count %d, expected 55", val) + } else if val != 61 { + t.Errorf("Unexpected collection count %d, expected 61", val) } if err := testutil.GatherAndCompare(gatherers, strings.NewReader(expected), "infiniband_hca_port_excessive_buffer_overrun_errors_total", "infiniband_hca_port_link_downed_total", @@ -325,8 +325,8 @@ func TestHCACollectorFull(t *testing.T) { gatherers := setupGatherer(collector) if val, err := testutil.GatherAndCount(gatherers); err != nil { t.Errorf("Unexpected error: %v", err) - } else if val != 67 { - t.Errorf("Unexpected collection count %d, expected 67", val) + } else if val != 79 { + t.Errorf("Unexpected collection count %d, expected 79", val) } if err := testutil.GatherAndCompare(gatherers, strings.NewReader(expected), "infiniband_hca_port_excessive_buffer_overrun_errors_total", "infiniband_hca_port_link_downed_total", @@ -366,8 +366,8 @@ func TestHCACollectorError(t *testing.T) { gatherers := setupGatherer(collector) if val, err := testutil.GatherAndCount(gatherers); err != nil { t.Errorf("Unexpected error: %v", err) - } else if val != 11 { - t.Errorf("Unexpected collection count %d, expected 11", val) + } else if val != 17 { + t.Errorf("Unexpected collection count %d, expected 17", val) } if err := testutil.GatherAndCompare(gatherers, strings.NewReader(expected), "infiniband_hca_port_excessive_buffer_overrun_errors_total", "infiniband_hca_port_link_downed_total", @@ -394,8 +394,8 @@ func TestHCACollectorErrorRunonce(t *testing.T) { gatherers := setupGatherer(collector) if val, err := testutil.GatherAndCount(gatherers); err != nil { t.Errorf("Unexpected error: %v", err) - } else if val != 12 { - t.Errorf("Unexpected collection count %d, expected 12", val) + } else if val != 18 { + t.Errorf("Unexpected collection count %d, expected 18", val) } if err := testutil.GatherAndCompare(gatherers, strings.NewReader(expected), "infiniband_hca_port_excessive_buffer_overrun_errors_total", "infiniband_hca_port_link_downed_total", @@ -422,8 +422,8 @@ func TestHCACollectorTimeout(t *testing.T) { gatherers := setupGatherer(collector) if val, err := testutil.GatherAndCount(gatherers); err != nil { t.Errorf("Unexpected error: %v", err) - } else if val != 11 { - t.Errorf("Unexpected collection count %d, expected 11", val) + } else if val != 17 { + t.Errorf("Unexpected collection count %d, expected 17", val) } if err := testutil.GatherAndCompare(gatherers, strings.NewReader(expected), "infiniband_hca_port_excessive_buffer_overrun_errors_total", "infiniband_hca_port_link_downed_total", diff --git a/collectors/ibswinfo.go b/collectors/ibswinfo.go index b1beb79..e73ea22 100644 --- a/collectors/ibswinfo.go +++ b/collectors/ibswinfo.go @@ -42,6 +42,9 @@ type IbswinfoCollector struct { devices *[]InfinibandDevice logger log.Logger collector string + Duration *prometheus.Desc + Error *prometheus.Desc + Timeout *prometheus.Desc HardwareInfo *prometheus.Desc Uptime *prometheus.Desc PowerSupplyStatus *prometheus.Desc @@ -64,6 +67,9 @@ type Ibswinfo struct { Temp float64 FanStatus string Fans []SwitchFan + duration float64 + error float64 + timeout float64 } type SwitchPowerSupply struct { @@ -88,6 +94,12 @@ func NewIbswinfoCollector(devices *[]InfinibandDevice, runonce bool, logger log. devices: devices, logger: log.With(logger, "collector", collector), collector: collector, + Duration: prometheus.NewDesc(prometheus.BuildFQName(namespace, "switch", "collect_duration_seconds"), + "Duration of collection", []string{"guid", "collector"}, nil), + Error: prometheus.NewDesc(prometheus.BuildFQName(namespace, "switch", "collect_error"), + "Indicates if collect error", []string{"guid", "collector"}, nil), + Timeout: prometheus.NewDesc(prometheus.BuildFQName(namespace, "switch", "collect_timeout"), + "Indicates if collect timeout", []string{"guid", "collector"}, nil), HardwareInfo: prometheus.NewDesc(prometheus.BuildFQName(namespace, "switch", "hardware_info"), "Infiniband switch hardware info", []string{"guid", "firmware_version", "psid", "part_number", "serial_number", "switch"}, nil), Uptime: prometheus.NewDesc(prometheus.BuildFQName(namespace, "switch", "uptime_seconds"), @@ -110,6 +122,9 @@ func NewIbswinfoCollector(devices *[]InfinibandDevice, runonce bool, logger log. } func (s *IbswinfoCollector) Describe(ch chan<- *prometheus.Desc) { + ch <- s.Duration + ch <- s.Error + ch <- s.Timeout ch <- s.HardwareInfo ch <- s.Uptime ch <- s.PowerSupplyStatus @@ -128,6 +143,9 @@ func (s *IbswinfoCollector) Collect(ch chan<- prometheus.Metric) { ch <- prometheus.MustNewConstMetric(s.HardwareInfo, prometheus.GaugeValue, 1, swinfo.device.GUID, swinfo.FirmwareVersion, swinfo.PSID, swinfo.PartNumber, swinfo.SerialNumber, swinfo.device.Name) ch <- prometheus.MustNewConstMetric(s.Uptime, prometheus.GaugeValue, swinfo.Uptime, swinfo.device.GUID) + ch <- prometheus.MustNewConstMetric(s.Duration, prometheus.GaugeValue, swinfo.duration, swinfo.device.GUID, s.collector) + ch <- prometheus.MustNewConstMetric(s.Error, prometheus.GaugeValue, swinfo.error, swinfo.device.GUID, s.collector) + ch <- prometheus.MustNewConstMetric(s.Timeout, prometheus.GaugeValue, swinfo.timeout, swinfo.device.GUID, s.collector) for _, psu := range swinfo.PowerSupplies { if psu.Status != "" { ch <- prometheus.MustNewConstMetric(s.PowerSupplyStatus, prometheus.GaugeValue, 1, swinfo.device.GUID, psu.ID, psu.Status) @@ -180,16 +198,20 @@ func (s *IbswinfoCollector) collect() ([]Ibswinfo, float64, float64) { ctxibswinfo, cancelibswinfo := context.WithTimeout(context.Background(), *ibswinfoTimeout) defer cancelibswinfo() level.Debug(s.logger).Log("msg", "Run ibswinfo", "lid", device.LID) + start := time.Now() ibswinfoOut, ibswinfoErr := IbswinfoExec(device.LID, ctxibswinfo) + ibswinfoData := Ibswinfo{duration: time.Since(start).Seconds()} if ibswinfoErr == context.DeadlineExceeded { + ibswinfoData.timeout = 1 level.Error(s.logger).Log("msg", "Timeout collecting ibswinfo data", "guid", device.GUID, "lid", device.LID) timeouts++ } else if ibswinfoErr != nil { + ibswinfoData.error = 1 level.Error(s.logger).Log("msg", "Error collecting ibswinfo data", "err", fmt.Sprintf("%s:%s", ibswinfoErr, ibswinfoOut), "guid", device.GUID, "lid", device.LID) errors++ } if ibswinfoErr == nil { - ibswinfoData, err := parse_ibswinfo(ibswinfoOut, s.logger) + err := parse_ibswinfo(ibswinfoOut, &ibswinfoData, s.logger) if err != nil { level.Error(s.logger).Log("msg", "Error parsing ibswinfo output", "guid", device.GUID, "lid", device.LID) errors++ @@ -235,8 +257,7 @@ func ibswinfo(lid string, ctx context.Context) (string, error) { return stdout.String(), nil } -func parse_ibswinfo(out string, logger log.Logger) (Ibswinfo, error) { - var data Ibswinfo +func parse_ibswinfo(out string, data *Ibswinfo, logger log.Logger) error { data.Temp = math.NaN() lines := strings.Split(out, "\n") psus := make(map[string]SwitchPowerSupply) @@ -315,7 +336,7 @@ func parse_ibswinfo(out string, logger log.Logger) (Ibswinfo, error) { psu.PowerW = powerW } else { level.Error(logger).Log("msg", "Unable to parse power (W)", "err", err, "value", value) - return Ibswinfo{}, err + return err } } if psuID != "" && dividerCount < 4 { @@ -327,7 +348,7 @@ func parse_ibswinfo(out string, logger log.Logger) (Ibswinfo, error) { data.Temp = temp } else { level.Error(logger).Log("msg", "Unable to parse temperature (C)", "err", err, "value", value) - return Ibswinfo{}, err + return err } } if key == "fan status" && dividerCount >= 4 { @@ -352,7 +373,7 @@ func parse_ibswinfo(out string, logger log.Logger) (Ibswinfo, error) { fans = append(fans, fan) } else { level.Error(logger).Log("msg", "Unable to parse fan RPM", "err", err, "value", value) - return Ibswinfo{}, err + return err } } } @@ -362,5 +383,5 @@ func parse_ibswinfo(out string, logger log.Logger) (Ibswinfo, error) { } data.PowerSupplies = powerSupplies data.Fans = fans - return data, nil + return nil } diff --git a/collectors/ibswinfo_test.go b/collectors/ibswinfo_test.go index 0c21d38..cb1eb49 100644 --- a/collectors/ibswinfo_test.go +++ b/collectors/ibswinfo_test.go @@ -34,7 +34,8 @@ func TestParseIBSWInfo(t *testing.T) { if err != nil { t.Fatal("Unable to read fixture") } - data, err := parse_ibswinfo(out, log.NewNopLogger()) + data := Ibswinfo{} + err = parse_ibswinfo(out, &data, log.NewNopLogger()) if err != nil { t.Errorf("Unexpected error: %s", err) } @@ -101,7 +102,8 @@ func TestParseIBSWInfoFailedPSU(t *testing.T) { if err != nil { t.Fatal("Unable to read fixture") } - data, err := parse_ibswinfo(out, log.NewNopLogger()) + data := Ibswinfo{} + err = parse_ibswinfo(out, &data, log.NewNopLogger()) if err != nil { t.Errorf("Unexpected error: %s", err) } @@ -189,7 +191,8 @@ func TestParseIBSWInfoErrors(t *testing.T) { if err != nil { t.Fatalf("Unable to read fixture %s", test) } - _, err = parse_ibswinfo(out, log.NewNopLogger()) + data := Ibswinfo{} + err = parse_ibswinfo(out, &data, log.NewNopLogger()) if err == nil { t.Errorf("Expected an error for test %s(%d)", test, i) } @@ -282,8 +285,8 @@ func TestIbswinfoCollector(t *testing.T) { gatherers := setupGatherer(collector) if val, err := testutil.GatherAndCount(gatherers); err != nil { t.Errorf("Unexpected error: %v", err) - } else if val != 44 { - t.Errorf("Unexpected collection count %d, expected 44", val) + } else if val != 50 { + t.Errorf("Unexpected collection count %d, expected 50", val) } if err := testutil.GatherAndCompare(gatherers, strings.NewReader(expected), "infiniband_switch_power_supply_status_info", "infiniband_switch_power_supply_dc_power_status_info", @@ -314,8 +317,8 @@ func TestIbswinfoCollectorMissingStatus(t *testing.T) { gatherers := setupGatherer(collector) if val, err := testutil.GatherAndCount(gatherers); err != nil { t.Errorf("Unexpected error: %v", err) - } else if val != 37 { - t.Errorf("Unexpected collection count %d, expected 37", val) + } else if val != 43 { + t.Errorf("Unexpected collection count %d, expected 43", val) } } diff --git a/collectors/switch.go b/collectors/switch.go index f7d3723..9259ad0 100644 --- a/collectors/switch.go +++ b/collectors/switch.go @@ -15,6 +15,7 @@ package collectors import ( "context" + "fmt" "math" "strings" "sync" @@ -36,6 +37,9 @@ type SwitchCollector struct { devices *[]InfinibandDevice logger log.Logger collector string + Duration *prometheus.Desc + Error *prometheus.Desc + Timeout *prometheus.Desc PortXmitData *prometheus.Desc PortRcvData *prometheus.Desc PortXmitPkts *prometheus.Desc @@ -70,6 +74,15 @@ type SwitchCollector struct { Info *prometheus.Desc } +type SwitchMetrics struct { + duration float64 + timeout float64 + error float64 + rcvErrDuration float64 + rcvErrTimeout float64 + rcvErrError float64 +} + func NewSwitchCollector(devices *[]InfinibandDevice, runonce bool, logger log.Logger) *SwitchCollector { labels := []string{"guid", "port"} collector := "switch" @@ -80,6 +93,12 @@ func NewSwitchCollector(devices *[]InfinibandDevice, runonce bool, logger log.Lo devices: devices, logger: log.With(logger, "collector", collector), collector: collector, + Duration: prometheus.NewDesc(prometheus.BuildFQName(namespace, "switch", "collect_duration_seconds"), + "Duration of collection", []string{"guid", "collector"}, nil), + Error: prometheus.NewDesc(prometheus.BuildFQName(namespace, "switch", "collect_error"), + "Indicates if collect error", []string{"guid", "collector"}, nil), + Timeout: prometheus.NewDesc(prometheus.BuildFQName(namespace, "switch", "collect_timeout"), + "Indicates if collect timeout", []string{"guid", "collector"}, nil), PortXmitData: prometheus.NewDesc(prometheus.BuildFQName(namespace, "switch", "port_transmit_data_bytes_total"), "Infiniband switch port PortXmitData", labels, nil), PortRcvData: prometheus.NewDesc(prometheus.BuildFQName(namespace, "switch", "port_receive_data_bytes_total"), @@ -148,6 +167,9 @@ func NewSwitchCollector(devices *[]InfinibandDevice, runonce bool, logger log.Lo } func (s *SwitchCollector) Describe(ch chan<- *prometheus.Desc) { + ch <- s.Duration + ch <- s.Error + ch <- s.Timeout ch <- s.PortXmitData ch <- s.PortRcvData ch <- s.PortXmitPkts @@ -184,7 +206,7 @@ func (s *SwitchCollector) Describe(ch chan<- *prometheus.Desc) { func (s *SwitchCollector) Collect(ch chan<- prometheus.Metric) { collectTime := time.Now() - counters, errors, timeouts := s.collect() + counters, metrics, errors, timeouts := s.collect() for _, c := range counters { if !math.IsNaN(c.PortXmitData) { ch <- prometheus.MustNewConstMetric(s.PortXmitData, prometheus.CounterValue, c.PortXmitData, c.device.GUID, c.PortSelect) @@ -273,14 +295,26 @@ func (s *SwitchCollector) Collect(ch chan<- prometheus.Metric) { } if *switchCollectBase { for _, device := range *s.devices { + metric := metrics[device.GUID] ch <- prometheus.MustNewConstMetric(s.Rate, prometheus.GaugeValue, device.Rate, device.GUID) ch <- prometheus.MustNewConstMetric(s.RawRate, prometheus.GaugeValue, device.RawRate, device.GUID) ch <- prometheus.MustNewConstMetric(s.Info, prometheus.GaugeValue, 1, device.GUID, device.Name, device.LID) + ch <- prometheus.MustNewConstMetric(s.Duration, prometheus.GaugeValue, metric.duration, device.GUID, s.collector) + ch <- prometheus.MustNewConstMetric(s.Timeout, prometheus.GaugeValue, metric.timeout, device.GUID, s.collector) + ch <- prometheus.MustNewConstMetric(s.Error, prometheus.GaugeValue, metric.error, device.GUID, s.collector) for port, uplink := range device.Uplinks { ch <- prometheus.MustNewConstMetric(s.Uplink, prometheus.GaugeValue, 1, device.GUID, port, device.Name, uplink.Name, uplink.GUID, uplink.Type, uplink.PortNumber, uplink.LID) } } } + if *switchCollectRcvErr { + for _, device := range *s.devices { + metric := metrics[device.GUID] + ch <- prometheus.MustNewConstMetric(s.Duration, prometheus.GaugeValue, metric.rcvErrDuration, device.GUID, fmt.Sprintf("%s-rcv-err", s.collector)) + ch <- prometheus.MustNewConstMetric(s.Timeout, prometheus.GaugeValue, metric.rcvErrTimeout, device.GUID, fmt.Sprintf("%s-rcv-err", s.collector)) + ch <- prometheus.MustNewConstMetric(s.Error, prometheus.GaugeValue, metric.rcvErrError, device.GUID, fmt.Sprintf("%s-rcv-err", s.collector)) + } + } ch <- prometheus.MustNewConstMetric(collectErrors, prometheus.GaugeValue, errors, s.collector) ch <- prometheus.MustNewConstMetric(collecTimeouts, prometheus.GaugeValue, timeouts, s.collector) ch <- prometheus.MustNewConstMetric(collectDuration, prometheus.GaugeValue, time.Since(collectTime).Seconds(), s.collector) @@ -289,8 +323,9 @@ func (s *SwitchCollector) Collect(ch chan<- prometheus.Metric) { } } -func (s *SwitchCollector) collect() ([]PerfQueryCounters, float64, float64) { +func (s *SwitchCollector) collect() ([]PerfQueryCounters, map[string]SwitchMetrics, float64, float64) { var counters []PerfQueryCounters + metrics := make(map[string]SwitchMetrics) var countersLock sync.Mutex var errors, timeouts float64 limit := make(chan int, *maxConcurrent) @@ -307,11 +342,15 @@ func (s *SwitchCollector) collect() ([]PerfQueryCounters, float64, float64) { defer cancelExtended() ports := getDevicePorts(device.Uplinks) perfqueryPorts := strings.Join(ports, ",") + start := time.Now() extendedOut, err := PerfqueryExec(device.GUID, perfqueryPorts, []string{"-l", "-x"}, ctxExtended) + metric := SwitchMetrics{duration: time.Since(start).Seconds()} if err == context.DeadlineExceeded { + metric.timeout = 1 level.Error(s.logger).Log("msg", "Timeout collecting extended perfquery counters", "guid", device.GUID) timeouts++ } else if err != nil { + metric.error = 1 level.Error(s.logger).Log("msg", "Error collecting extended perfquery counters", "guid", device.GUID) errors++ } @@ -330,12 +369,16 @@ func (s *SwitchCollector) collect() ([]PerfQueryCounters, float64, float64) { for _, deviceCounter := range deviceCounters { ctxRcvErr, cancelRcvErr := context.WithTimeout(context.Background(), *perfqueryTimeout) defer cancelRcvErr() + rcvErrStart := time.Now() rcvErrOut, err := PerfqueryExec(device.GUID, deviceCounter.PortSelect, []string{"-E"}, ctxRcvErr) + metric.rcvErrDuration = time.Since(rcvErrStart).Seconds() if err == context.DeadlineExceeded { + metric.rcvErrTimeout = 1 level.Error(s.logger).Log("msg", "Timeout collecting rcvErr perfquery counters", "guid", device.GUID) timeouts++ continue } else if err != nil { + metric.rcvErrError = 1 level.Error(s.logger).Log("msg", "Error collecting rcvErr perfquery counters", "guid", device.GUID) errors++ continue @@ -347,9 +390,12 @@ func (s *SwitchCollector) collect() ([]PerfQueryCounters, float64, float64) { countersLock.Unlock() } } + countersLock.Lock() + metrics[device.GUID] = metric + countersLock.Unlock() }(device) } wg.Wait() close(limit) - return counters, errors, timeouts + return counters, metrics, errors, timeouts } diff --git a/collectors/switch_test.go b/collectors/switch_test.go index 9f3a8d9..612352c 100644 --- a/collectors/switch_test.go +++ b/collectors/switch_test.go @@ -167,8 +167,8 @@ func TestSwitchCollector(t *testing.T) { gatherers := setupGatherer(collector) if val, err := testutil.GatherAndCount(gatherers); err != nil { t.Errorf("Unexpected error: %v", err) - } else if val != 79 { - t.Errorf("Unexpected collection count %d, expected 79", val) + } else if val != 85 { + t.Errorf("Unexpected collection count %d, expected 85", val) } if err := testutil.GatherAndCompare(gatherers, strings.NewReader(expected), "infiniband_switch_port_excessive_buffer_overrun_errors_total", "infiniband_switch_port_link_downed_total", @@ -364,8 +364,8 @@ func TestSwitchCollectorFull(t *testing.T) { gatherers := setupGatherer(collector) if val, err := testutil.GatherAndCount(gatherers); err != nil { t.Errorf("Unexpected error: %v", err) - } else if val != 97 { - t.Errorf("Unexpected collection count %d, expected 97", val) + } else if val != 109 { + t.Errorf("Unexpected collection count %d, expected 109", val) } if err := testutil.GatherAndCompare(gatherers, strings.NewReader(expected), "infiniband_switch_port_excessive_buffer_overrun_errors_total", "infiniband_switch_port_link_downed_total", @@ -435,8 +435,8 @@ func TestSwitchCollectorNoBase(t *testing.T) { gatherers := setupGatherer(collector) if val, err := testutil.GatherAndCount(gatherers); err != nil { t.Errorf("Unexpected error: %v", err) - } else if val != 21 { - t.Errorf("Unexpected collection count %d, expected 21", val) + } else if val != 27 { + t.Errorf("Unexpected collection count %d, expected 27", val) } if err := testutil.GatherAndCompare(gatherers, strings.NewReader(expected), "infiniband_switch_port_excessive_buffer_overrun_errors_total", "infiniband_switch_port_link_downed_total", @@ -476,8 +476,8 @@ func TestSwitchCollectorError(t *testing.T) { gatherers := setupGatherer(collector) if val, err := testutil.GatherAndCount(gatherers); err != nil { t.Errorf("Unexpected error: %v", err) - } else if val != 13 { - t.Errorf("Unexpected collection count %d, expected 13", val) + } else if val != 19 { + t.Errorf("Unexpected collection count %d, expected 19", val) } if err := testutil.GatherAndCompare(gatherers, strings.NewReader(expected), "infiniband_switch_port_excessive_buffer_overrun_errors_total", "infiniband_switch_port_link_downed_total", @@ -504,8 +504,8 @@ func TestSwitchCollectorErrorRunonce(t *testing.T) { gatherers := setupGatherer(collector) if val, err := testutil.GatherAndCount(gatherers); err != nil { t.Errorf("Unexpected error: %v", err) - } else if val != 14 { - t.Errorf("Unexpected collection count %d, expected 14", val) + } else if val != 20 { + t.Errorf("Unexpected collection count %d, expected 20", val) } if err := testutil.GatherAndCompare(gatherers, strings.NewReader(expected), "infiniband_switch_port_excessive_buffer_overrun_errors_total", "infiniband_switch_port_link_downed_total", @@ -532,8 +532,8 @@ func TestSwitchCollectorTimeout(t *testing.T) { gatherers := setupGatherer(collector) if val, err := testutil.GatherAndCount(gatherers); err != nil { t.Errorf("Unexpected error: %v", err) - } else if val != 13 { - t.Errorf("Unexpected collection count %d, expected 13", val) + } else if val != 19 { + t.Errorf("Unexpected collection count %d, expected 19", val) } if err := testutil.GatherAndCompare(gatherers, strings.NewReader(expected), "infiniband_switch_port_excessive_buffer_overrun_errors_total", "infiniband_switch_port_link_downed_total",