From 3ff3d5384c3b07da875529c15674449077443d6a Mon Sep 17 00:00:00 2001 From: Carsten Zeumer Date: Fri, 3 Jun 2022 09:51:08 +0200 Subject: [PATCH] refactor prometheus metrics --- ossec/agent.go | 67 ++++---- ossec/prometheus.go | 381 +++++++++++++++++++++++++++++++------------- 2 files changed, 299 insertions(+), 149 deletions(-) diff --git a/ossec/agent.go b/ossec/agent.go index b66b86f..7466a57 100644 --- a/ossec/agent.go +++ b/ossec/agent.go @@ -27,7 +27,6 @@ import ( "github.com/autonubil/go-wazuh/sysinfo" "github.com/joncrlsn/dque" "github.com/matishsiao/goInfo" - "github.com/prometheus/client_golang/prometheus" ) // EncryptionMethod supported transport encryption @@ -102,9 +101,6 @@ type Client struct { evtCount uint64 sentCount uint64 receivedCount uint64 - connectionAttempts uint64 - connectionsOpened uint64 - connectionsClose uint64 cOrigSize uint cCompSize uint sessionID int64 @@ -127,7 +123,6 @@ type Client struct { CurrentRemoteFile *RemoteFileInfo un *goInfo.GoInfoObject osInfo *sysinfo.OS - collector *agentCollector } type FileUpdatedEvent struct { @@ -326,14 +321,7 @@ func NewAgent(server string, agentID string, agentName string, agentKey string, return nil, err } - collector := newAgentCollector(a) - err = prometheus.Register(collector) - if err == nil { - a.collector = collector - } else { - a.logger.Debug("registerPrometheus", zap.Any("agentId", a.AgentID), zap.Error(err)) - } - + AgentCollector.Register(a) return a, nil } @@ -405,7 +393,7 @@ func (a *Client) close(sendCloseMsg bool) error { err := a.writeMessage(msg) if err != nil { a.connected = false - a.conn.Close() + AgentCollector.Disconnect(a) return err } } @@ -413,12 +401,13 @@ func (a *Client) close(sendCloseMsg bool) error { a.sentBytes = 0 a.sessionID = 0 a.receivedBytes = 0 - a.connectionsClose = a.connectionsClose + 1 + AgentCollector.Disconnect(a) return a.conn.Close() } a.receivedBytes = 0 a.sentBytes = 0 a.sessionID = 0 + AgentCollector.Disconnect(a) return nil } @@ -426,9 +415,7 @@ func (a *Client) close(sendCloseMsg bool) error { // Any blocked Read or Write operations will be unblocked and return errors. func (a *Client) Close() error { err := a.close(true) - if a.collector != nil { - prometheus.Unregister(a.collector) - } + AgentCollector.Unregister(a) return err } @@ -621,13 +608,15 @@ func (a *Client) writeMessage(msg string) error { // ensure ratelimit is honored prev := time.Now() now := a.rateLimit.Take() - ret, err := a.conn.Write(encryptedMsg) - a.sentBytes += uint64(ret) - a.sentBytesTotal += uint64(ret) + written, err := a.conn.Write(encryptedMsg) + a.sentBytes += uint64(written) + a.sentBytesTotal += uint64(written) + AgentCollector.BytesSent(a, written) - if err != nil { + if err != nil || written == 0 { + AgentCollector.MessageError(a, 1) if a.logger != nil { - a.logger.Warn("writeMessage", zap.Any("agentId", a.AgentID), zap.String("msg", msg), zap.Int("result", ret), zap.Uint64("sentBytes", a.sentBytes), zap.Uint64("sentBytesTotal", a.sentBytesTotal), zap.Duration("rateWait", now.Sub(prev)), zap.Uint("globalCount", a.globalCount), zap.Uint("localCount", a.localCount), zap.Uint64("evtCount", a.evtCount), zap.Uint64("sentCount", a.sentCount), zap.Uint64("receivedCount", a.receivedCount), zap.Error(err)) + a.logger.Warn("writeMessage", zap.Any("agentId", a.AgentID), zap.String("msg", msg), zap.Int("result", written), zap.Uint64("sentBytes", a.sentBytes), zap.Uint64("sentBytesTotal", a.sentBytesTotal), zap.Duration("rateWait", now.Sub(prev)), zap.Uint("globalCount", a.globalCount), zap.Uint("localCount", a.localCount), zap.Uint64("evtCount", a.evtCount), zap.Uint64("sentCount", a.sentCount), zap.Uint64("receivedCount", a.receivedCount), zap.Error(err)) } err2 := a.close(false) if err2 != nil { @@ -636,9 +625,11 @@ func (a *Client) writeMessage(msg string) error { return err } a.sentCount++ - time.Sleep(25 * time.Millisecond) + AgentCollector.MessagesSent(a, 1) + // time.Sleep(25 * time.Millisecond) + if a.logger != nil { - a.logger.Debug("writeMessage", zap.Any("agentId", a.AgentID), zap.String("msg", msg), zap.Int("result", ret), zap.Uint64("sentBytes", a.sentBytes), zap.Uint64("sentBytesTotal", a.sentBytesTotal), zap.Duration("rateWait", now.Sub(prev)), zap.Uint("globalCount", a.globalCount), zap.Uint("localCount", a.localCount), zap.Uint64("evtCount", a.evtCount), zap.Uint64("sentCount", a.sentCount), zap.Uint64("receivedCount", a.receivedCount)) + a.logger.Debug("writeMessage", zap.Any("agentId", a.AgentID), zap.String("msg", msg), zap.Int("result", written), zap.Uint64("sentBytes", a.sentBytes), zap.Uint64("sentBytesTotal", a.sentBytesTotal), zap.Duration("rateWait", now.Sub(prev)), zap.Uint("globalCount", a.globalCount), zap.Uint("localCount", a.localCount), zap.Uint64("evtCount", a.evtCount), zap.Uint64("sentCount", a.sentCount), zap.Uint64("receivedCount", a.receivedCount)) } return nil } @@ -707,6 +698,7 @@ func (a *Client) readServerResponse(timeout time.Duration) error { nRead, err := a.conn.Read(buffer) a.receivedBytes = a.receivedBytes + uint64(nRead) a.receivedBytesTotal = a.receivedBytesTotal + uint64(nRead) + AgentCollector.BytesRead(a, nRead) // a.logger.Info("read", zap.Any("agentId", a.AgentID), zap.Any("deadline", deadline), zap.Int("read", nRead), zap.Int("readSoFar", totallyRead), zap.Error(err)) if nRead == 0 { if oe, ok := err.(*net.OpError); ok && oe.Err != os.ErrDeadlineExceeded { @@ -750,6 +742,7 @@ func (a *Client) readServerResponse(timeout time.Duration) error { var msgSize uint32 if !a.UDP { if len(rawMsg) < 4 { + AgentCollector.MessageError(a, 1) return errors.New("message too short") } msgSize = binary.LittleEndian.Uint32(rawMsg) @@ -760,12 +753,14 @@ func (a *Client) readServerResponse(timeout time.Duration) error { } if int(msgSize) > len(rawMsg) { + AgentCollector.MessageError(a, 1) return errors.New("message exceeds buffer") } a.evtCount++ msg, err := a.decryptMessage(rawMsg[:msgSize], msgSize) // fmt.Printf("%d\t%d\t'%s'\t%s\n", totallyRead, msgSize, msg, err) if err != nil { + AgentCollector.MessageError(a, 1) return err } a.receivedCount++ @@ -774,10 +769,12 @@ func (a *Client) readServerResponse(timeout time.Duration) error { msg = msg[32:] globalCount, err := strconv.Atoi(msg[5:15]) if err != nil { + AgentCollector.MessageError(a, 1) return err } localCount, err := strconv.Atoi(msg[16:20]) if err != nil { + AgentCollector.MessageError(a, 1) return err } msg = msg[21:] @@ -787,7 +784,7 @@ func (a *Client) readServerResponse(timeout time.Duration) error { // normal status, nothing to report } else if globalCountU > a.globalCount || (globalCountU == a.globalCount && localCountU > a.localCount) { a.logger.Debug(fmt.Sprintf("Updated to remote counters %d:%d (%d:%d)", localCountU, globalCountU, a.localCount, a.globalCount), zap.Skip()) - // move one ahaed + // move one ahead localCountU++ } else { a.logger.Info(fmt.Sprintf("Unexpected counter %d:%d (%d:%d)", localCountU, globalCountU, a.localCount, a.globalCount), zap.Skip()) @@ -796,6 +793,7 @@ func (a *Client) readServerResponse(timeout time.Duration) error { a.globalCount = globalCountU a.WriteClientCounter() + AgentCollector.MessagesReceived(a, 1) // rand1 := msg[:5] //fmt.Printf("packet-received: bytes=%d (%s:%d:%d) '%s'\n", nRead, rand1, globalCount, localCount, msg) // empty buffer for next read @@ -810,7 +808,6 @@ func (a *Client) readServerResponse(timeout time.Duration) error { if len(rawMsg) == 0 { break } - } return nil } @@ -820,20 +817,11 @@ func (a *Client) Connect(isStartup bool) error { a.mx.Lock() defer a.mx.Unlock() a.sessionID = time.Now().UnixMicro() - a.connected = false var err error - - a.connectionAttempts = a.connectionAttempts + 1 - // try to re-register agent if the connection has been closed before - if a.collector == nil { - collector := newAgentCollector(a) - err = prometheus.Register(collector) - if err == nil { - a.collector = collector - } - } + AgentCollector.Disconnect(a) + AgentCollector.TryConnect(a) var localAddr net.Addr if a.UDP { @@ -895,7 +883,8 @@ func (a *Client) Connect(isStartup bool) error { return err } } - a.connectionsOpened = a.connectionsOpened + 1 + + AgentCollector.Connect(a) a.connected = true return nil } diff --git a/ossec/prometheus.go b/ossec/prometheus.go index 86dfbf3..ce6ba7e 100644 --- a/ossec/prometheus.go +++ b/ossec/prometheus.go @@ -1,8 +1,6 @@ package ossec import ( - "strconv" - "github.com/prometheus/client_golang/prometheus" ) @@ -11,116 +9,116 @@ import ( //Note you can also include fields of other types if they provide utility //but we just won't be exposing them as metrics. type agentCollector struct { - agent *Client - connectedMetric *prometheus.Desc - eventsTotalMetric *prometheus.Desc - messagesSentTotalMetric *prometheus.Desc - messagesReceivedTotalMetric *prometheus.Desc - - sentBytesTotalMetric *prometheus.Desc - sentBytesSessionTotalMetric *prometheus.Desc + agents []*Client + connectedMetric *prometheus.GaugeVec + eventsTotalMetric *prometheus.GaugeVec + messagesSentTotalMetric *prometheus.GaugeVec + messagesReceivedTotalMetric *prometheus.GaugeVec + messageErrorsTotalMetric *prometheus.GaugeVec - receivedBytesTotalMetric *prometheus.Desc - receivedBytesSessionTotalMetric *prometheus.Desc + bytesSentTotalMetric *prometheus.GaugeVec + bytesReceivedTotalMetric *prometheus.GaugeVec - connectionAttemptsTotalMetric *prometheus.Desc - connectionsOpenedTotalMetric *prometheus.Desc - connectionsClosedTotalMetric *prometheus.Desc + connectionAttemptsTotalMetric *prometheus.GaugeVec + connectionsOpenedTotalMetric *prometheus.GaugeVec + connectionsClosedTotalMetric *prometheus.GaugeVec } -var agentLabels = []string{"agent_id", "agent_name", "agent_ip", "agent_protocol", "agent_encryption", "session_id"} +var agentLabels = []string{"agent_id", "agent_name", "agent_protocol", "agent_encryption"} + +var AgentCollector = newAgentCollector() + +func init() { + prometheus.Register(AgentCollector) +} //You must create a constructor for you collector that //initializes every descriptor and returns a pointer to the collector -func newAgentCollector(agent *Client) *agentCollector { +func newAgentCollector() *agentCollector { return &agentCollector{ - agent: agent, - connectedMetric: prometheus.NewDesc("ossec_agent_connections_total", - "agent connection count (normally 0 or 1)", - agentLabels, nil, - ), - - eventsTotalMetric: prometheus.NewDesc("ossec_agent_events_total", - "total events processes", - agentLabels, nil, - ), - messagesSentTotalMetric: prometheus.NewDesc("ossec_agent_messages_sent_total", - "total messages sent", - agentLabels, nil, - ), - messagesReceivedTotalMetric: prometheus.NewDesc("ossec_agent_messages_received_total", - "total messages received", - agentLabels, nil, - ), - - sentBytesTotalMetric: prometheus.NewDesc("ossec_agent_bytes_sent_total", - "total bytes sent", - agentLabels, nil, - ), - sentBytesSessionTotalMetric: prometheus.NewDesc("ossec_agent_session_bytes_sent_total", - "bytes sent during current session", - agentLabels, nil, - ), - - receivedBytesTotalMetric: prometheus.NewDesc("ossec_agent_bytes_received_total", - "total bytes received", - agentLabels, nil, - ), - receivedBytesSessionTotalMetric: prometheus.NewDesc("ossec_agent_session_bytes_received_total", - "bytes received during current session", - agentLabels, nil, - ), - - connectionAttemptsTotalMetric: prometheus.NewDesc("ossec_agent_connection_attempts_total", - "total connection attempts", - agentLabels, nil, - ), - - connectionsOpenedTotalMetric: prometheus.NewDesc("ossec_agent_connections_openend_total", - "total succesfull connection attempts", - agentLabels, nil, - ), - - connectionsClosedTotalMetric: prometheus.NewDesc("ossec_agent_connections_closed_total", - "total connection activly closed ", - agentLabels, nil, - ), - } -} + agents: make([]*Client, 0), + connectedMetric: prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: "ossec", + Subsystem: "agent", + Name: "connections", + Help: "agent connection count", + }, agentLabels), -//Each and every collector must implement the Describe function. -//It essentially writes all descriptors to the prometheus desc channel. -func (collector *agentCollector) Describe(ch chan<- *prometheus.Desc) { - //Update this section with the each metric you create for a given collector - ch <- collector.connectedMetric - ch <- collector.eventsTotalMetric - ch <- collector.messagesSentTotalMetric - ch <- collector.messagesReceivedTotalMetric + eventsTotalMetric: prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: "ossec", + Subsystem: "agent", + Name: "events_total", + Help: "total events processed", + }, agentLabels), + + messageErrorsTotalMetric: prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: "ossec", + Subsystem: "agent", + Name: "message_errors_total", + Help: "total errors in messages", + }, agentLabels), + + messagesSentTotalMetric: prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: "ossec", + Subsystem: "agent", + Name: "messages_sent_total", + Help: "total messages sent", + }, agentLabels), + + messagesReceivedTotalMetric: prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: "ossec", + Subsystem: "agent", + Name: "messages_received_total", + Help: "total bytes received", + }, agentLabels), + + bytesSentTotalMetric: prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: "ossec", + Subsystem: "agent", + Name: "bytes_sent_total", + Help: "total bytes sent", + }, agentLabels), - ch <- collector.sentBytesTotalMetric - ch <- collector.sentBytesSessionTotalMetric + bytesReceivedTotalMetric: prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: "ossec", + Subsystem: "agent", + Name: "bytes_received_total", + Help: "total bytes received", + }, agentLabels), - ch <- collector.receivedBytesTotalMetric - ch <- collector.receivedBytesSessionTotalMetric + connectionAttemptsTotalMetric: prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: "ossec", + Subsystem: "agent", + Name: "connection_attempts_total", + Help: "total connection attempts", + }, agentLabels), - ch <- collector.connectionAttemptsTotalMetric - ch <- collector.connectionsOpenedTotalMetric - ch <- collector.connectionsClosedTotalMetric + connectionsOpenedTotalMetric: prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: "ossec", + Subsystem: "agent", + Name: "connections_openend_total", + Help: "succesfull connection attempts", + }, agentLabels), + + connectionsClosedTotalMetric: prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: "ossec", + Subsystem: "agent", + Name: "connections_closed_total", + Help: "total connection activly closed", + }, agentLabels), + } } -//Collect implements required collect function for all promehteus collectors -func (collector *agentCollector) Collect(ch chan<- prometheus.Metric) { - //Implement logic here to determine proper metric value to return to prometheus - //for each descriptor or call other functions that do so. - if collector.agent != nil { +func (collector *agentCollector) getAgentLabels(agent *Client) []string { + if agent != nil { var protocol string - if collector.agent.UDP { + if agent.UDP { protocol = "UDP" } else { protocol = "TCP" } var encryption string - switch collector.agent.EncryptionMethod { + switch agent.EncryptionMethod { case EncryptionMethodBlowFish: encryption = "BlowFish" case EncryptionMethodAES: @@ -128,32 +126,195 @@ func (collector *agentCollector) Collect(ch chan<- prometheus.Metric) { default: encryption = "Unknown" } + /* TODO: Implement session stats? var sessionID string - if collector.agent.sessionID == 0 { + if agent.sessionID == 0 { sessionID = "none" } else { - sessionID = strconv.FormatInt(collector.agent.sessionID, 16) + sessionID = strconv.FormatInt(agent.sessionID, 16) } - agentLabeValues := []string{collector.agent.AgentID, collector.agent.AgentName, collector.agent.AgentIP, protocol, encryption, sessionID} + */ + return []string{agent.AgentID, agent.AgentName, protocol, encryption} + } + return nil +} - var connections float64 - if collector.agent.IsConencted() { - connections = 1 - } - ch <- prometheus.MustNewConstMetric(collector.connectedMetric, prometheus.CounterValue, connections, agentLabeValues...) - ch <- prometheus.MustNewConstMetric(collector.eventsTotalMetric, prometheus.CounterValue, float64(collector.agent.evtCount), agentLabeValues...) - ch <- prometheus.MustNewConstMetric(collector.messagesSentTotalMetric, prometheus.CounterValue, float64(collector.agent.sentCount), agentLabeValues...) - ch <- prometheus.MustNewConstMetric(collector.messagesReceivedTotalMetric, prometheus.CounterValue, float64(collector.agent.receivedCount), agentLabeValues...) +func (collector *agentCollector) Register(agent *Client) { + if agent == nil { + return + } + labels := collector.getAgentLabels(agent) + if labels == nil { + return + } + collector.connectedMetric.WithLabelValues(labels...).Set(1) + collector.eventsTotalMetric.WithLabelValues(labels...).Set(0) + collector.messagesSentTotalMetric.WithLabelValues(labels...).Set(0) + collector.messagesReceivedTotalMetric.WithLabelValues(labels...).Set(0) + collector.messageErrorsTotalMetric.WithLabelValues(labels...).Set(0) + collector.bytesSentTotalMetric.WithLabelValues(labels...).Set(0) + collector.bytesReceivedTotalMetric.WithLabelValues(labels...).Set(0) + collector.connectionAttemptsTotalMetric.WithLabelValues(labels...).Set(0) + collector.connectionsOpenedTotalMetric.WithLabelValues(labels...).Set(0) + collector.connectionsClosedTotalMetric.WithLabelValues(labels...).Set(0) +} + +func (collector *agentCollector) TryConnect(agent *Client) { + if agent == nil { + return + } + labels := collector.getAgentLabels(agent) + if labels == nil { + return + } + collector.connectionAttemptsTotalMetric.WithLabelValues(labels...).Add(1) +} + +func (collector *agentCollector) Connect(agent *Client) { + if agent == nil { + return + } + labels := collector.getAgentLabels(agent) + if labels == nil { + return + } + collector.connectedMetric.WithLabelValues(labels...).Set(1) + collector.connectionsOpenedTotalMetric.WithLabelValues(labels...).Add(1) +} + +func (collector *agentCollector) Disconnect(agent *Client) { + if agent == nil { + return + } + labels := collector.getAgentLabels(agent) + if labels == nil { + return + } + collector.connectedMetric.WithLabelValues(labels...).Set(0) +} + +func (collector *agentCollector) Unregister(agent *Client) { + if agent == nil { + return + } + labels := collector.getAgentLabels(agent) + if labels == nil { + return + } + collector.connectedMetric.DeleteLabelValues(labels...) +} - ch <- prometheus.MustNewConstMetric(collector.sentBytesTotalMetric, prometheus.CounterValue, float64(collector.agent.sentBytes), agentLabeValues...) - ch <- prometheus.MustNewConstMetric(collector.sentBytesSessionTotalMetric, prometheus.CounterValue, float64(collector.agent.sentBytesTotal), agentLabeValues...) +func (collector *agentCollector) EventsReceived(agent *Client, count int) { + if count == 0 { + return + } + if agent == nil { + return + } + labels := collector.getAgentLabels(agent) + if labels == nil { + return + } + collector.eventsTotalMetric.WithLabelValues(labels...).Add(float64(count)) +} - ch <- prometheus.MustNewConstMetric(collector.receivedBytesTotalMetric, prometheus.CounterValue, float64(collector.agent.receivedBytes), agentLabeValues...) - ch <- prometheus.MustNewConstMetric(collector.receivedBytesSessionTotalMetric, prometheus.CounterValue, float64(collector.agent.receivedBytesTotal), agentLabeValues...) +func (collector *agentCollector) MessagesReceived(agent *Client, count int) { + if count == 0 { + return + } + if agent == nil { + return + } + labels := collector.getAgentLabels(agent) + if labels == nil { + return + } + collector.messagesReceivedTotalMetric.WithLabelValues(labels...).Add(float64(count)) +} + +func (collector *agentCollector) MessageError(agent *Client, count int) { + if count == 0 { + return + } + if agent == nil { + return + } + labels := collector.getAgentLabels(agent) + if labels == nil { + return + } + collector.messageErrorsTotalMetric.WithLabelValues(labels...).Add(float64(count)) +} + +func (collector *agentCollector) BytesSent(agent *Client, count int) { + if count == 0 { + return + } + if agent == nil { + return + } + labels := collector.getAgentLabels(agent) + if labels == nil { + return + } + collector.bytesSentTotalMetric.WithLabelValues(labels...).Add(float64(count)) +} - ch <- prometheus.MustNewConstMetric(collector.connectionAttemptsTotalMetric, prometheus.CounterValue, float64(collector.agent.connectionAttempts), agentLabeValues...) - ch <- prometheus.MustNewConstMetric(collector.connectionsOpenedTotalMetric, prometheus.CounterValue, float64(collector.agent.connectionsOpened), agentLabeValues...) - ch <- prometheus.MustNewConstMetric(collector.connectionsClosedTotalMetric, prometheus.CounterValue, float64(collector.agent.connectionsOpened), agentLabeValues...) +func (collector *agentCollector) BytesRead(agent *Client, count int) { + if count == 0 { + return + } + if agent == nil { + return + } + labels := collector.getAgentLabels(agent) + if labels == nil { + return + } + collector.bytesReceivedTotalMetric.WithLabelValues(labels...).Add(float64(count)) +} +func (collector *agentCollector) MessagesSent(agent *Client, count int) { + if count == 0 { + return + } + if agent == nil { + return + } + labels := collector.getAgentLabels(agent) + if labels == nil { + return } + collector.messagesSentTotalMetric.WithLabelValues(labels...).Add(float64(count)) +} + +//Each and every collector must implement the Describe function. +//It essentially writes all descriptors to the prometheus desc channel. +func (collector *agentCollector) Describe(ch chan<- *prometheus.Desc) { + //Update this section with the each metric you create for a given collector + collector.connectedMetric.Describe(ch) + collector.eventsTotalMetric.Describe(ch) + collector.messagesSentTotalMetric.Describe(ch) + collector.messagesReceivedTotalMetric.Describe(ch) + collector.messageErrorsTotalMetric.Describe(ch) + collector.bytesSentTotalMetric.Describe(ch) + collector.bytesReceivedTotalMetric.Describe(ch) + collector.connectionAttemptsTotalMetric.Describe(ch) + collector.connectionsOpenedTotalMetric.Describe(ch) + collector.connectionsClosedTotalMetric.Describe(ch) + +} + +//Collect implements required collect function for all promehteus collectors +func (collector *agentCollector) Collect(ch chan<- prometheus.Metric) { + collector.connectedMetric.Collect(ch) + collector.eventsTotalMetric.Collect(ch) + collector.messagesSentTotalMetric.Collect(ch) + collector.messagesReceivedTotalMetric.Collect(ch) + collector.messageErrorsTotalMetric.Collect(ch) + collector.bytesSentTotalMetric.Collect(ch) + collector.bytesReceivedTotalMetric.Collect(ch) + collector.connectionAttemptsTotalMetric.Collect(ch) + collector.connectionsOpenedTotalMetric.Collect(ch) + collector.connectionsClosedTotalMetric.Collect(ch) }