From 3ef756e9433d841911ef71875f7358932a9403af Mon Sep 17 00:00:00 2001 From: Carsten Zeumer Date: Fri, 3 Jun 2022 10:16:55 +0200 Subject: [PATCH] add queue metrics --- ossec/agent.go | 12 ++++- ossec/prometheus.go | 104 +++++++++++++++++++++++++++++++------------- 2 files changed, 85 insertions(+), 31 deletions(-) diff --git a/ossec/agent.go b/ossec/agent.go index 7466a57..c5bcd52 100644 --- a/ossec/agent.go +++ b/ossec/agent.go @@ -915,8 +915,11 @@ func (a *Client) openQueue(ctx context.Context) (chan *QueuePosting, *dque.DQue, continue } - if err = q.Enqueue(msg); err != nil { + if err = q.Enqueue(msg); err == nil { + AgentCollector.Enqueue(a) + } else { a.logger.Error("enqueueItem", zap.Any("agentId", a.AgentID), zap.Any("item", msg), zap.Error(err)) + AgentCollector.SetQueueSize(a, q.Size()) } } if ctx.Err() != nil { @@ -954,6 +957,7 @@ func (a *Client) AgentLoop(ctx context.Context, closeOnError bool) (chan *QueueP if err != nil { return nil, out, err } + AgentCollector.SetQueueSize(a, q.Size()) go func() { defer func() { @@ -993,16 +997,19 @@ func (a *Client) AgentLoop(ctx context.Context, closeOnError bool) (chan *QueueP for { item, dqErr := q.Peek() if dqErr == dque.ErrEmpty { + AgentCollector.SetQueueSize(a, 0) // a.logger.Debug("dequeue", zap.Any("agentId", a.AgentID), zap.String("problem", "queue empty")) break } if dqErr != nil { a.logger.Error("dequeue", zap.Any("agentId", a.AgentID), zap.Error(dqErr)) + AgentCollector.SetQueueSize(a, q.Size()) break } if item == nil { a.logger.Warn("dequeue", zap.Any("agentId", a.AgentID), zap.String("problem", "nil item")) + AgentCollector.SetQueueSize(a, q.Size()) continue } @@ -1025,6 +1032,7 @@ func (a *Client) AgentLoop(ctx context.Context, closeOnError bool) (chan *QueueP a.logger.Error("marshall", zap.Any("agentId", a.AgentID), zap.Error(err)) item = nil q.Dequeue() + AgentCollector.Dequeue(a) continue } @@ -1051,8 +1059,10 @@ func (a *Client) AgentLoop(ctx context.Context, closeOnError bool) (chan *QueueP } // remove last item from queue _, err = q.Dequeue() + AgentCollector.Dequeue(a) if err != nil { a.logger.Error("dequeue", zap.Any("agentId", a.AgentID), zap.Error(err)) + AgentCollector.SetQueueSize(a, q.Size()) item = nil } // make sure, we take a pause diff --git a/ossec/prometheus.go b/ossec/prometheus.go index ce6ba7e..317d347 100644 --- a/ossec/prometheus.go +++ b/ossec/prometheus.go @@ -11,17 +11,18 @@ import ( type agentCollector struct { agents []*Client connectedMetric *prometheus.GaugeVec - eventsTotalMetric *prometheus.GaugeVec - messagesSentTotalMetric *prometheus.GaugeVec - messagesReceivedTotalMetric *prometheus.GaugeVec - messageErrorsTotalMetric *prometheus.GaugeVec - - bytesSentTotalMetric *prometheus.GaugeVec - bytesReceivedTotalMetric *prometheus.GaugeVec - - connectionAttemptsTotalMetric *prometheus.GaugeVec - connectionsOpenedTotalMetric *prometheus.GaugeVec - connectionsClosedTotalMetric *prometheus.GaugeVec + queueSizeMetric *prometheus.GaugeVec + eventsTotalMetric *prometheus.CounterVec + messagesSentTotalMetric *prometheus.CounterVec + messagesReceivedTotalMetric *prometheus.CounterVec + messageErrorsTotalMetric *prometheus.CounterVec + + bytesSentTotalMetric *prometheus.CounterVec + bytesReceivedTotalMetric *prometheus.CounterVec + + connectionAttemptsTotalMetric *prometheus.CounterVec + connectionsOpenedTotalMetric *prometheus.CounterVec + connectionsClosedTotalMetric *prometheus.CounterVec } var agentLabels = []string{"agent_id", "agent_name", "agent_protocol", "agent_encryption"} @@ -44,63 +45,69 @@ func newAgentCollector() *agentCollector { Help: "agent connection count", }, agentLabels), - eventsTotalMetric: prometheus.NewGaugeVec(prometheus.GaugeOpts{ + queueSizeMetric: prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: "ossec", + Subsystem: "agent", + Name: "queued_count", + Help: "total messages in queued", + }, agentLabels), + eventsTotalMetric: prometheus.NewCounterVec(prometheus.CounterOpts{ Namespace: "ossec", Subsystem: "agent", Name: "events_total", Help: "total events processed", }, agentLabels), - messageErrorsTotalMetric: prometheus.NewGaugeVec(prometheus.GaugeOpts{ + messageErrorsTotalMetric: prometheus.NewCounterVec(prometheus.CounterOpts{ Namespace: "ossec", Subsystem: "agent", Name: "message_errors_total", Help: "total errors in messages", }, agentLabels), - messagesSentTotalMetric: prometheus.NewGaugeVec(prometheus.GaugeOpts{ + messagesSentTotalMetric: prometheus.NewCounterVec(prometheus.CounterOpts{ Namespace: "ossec", Subsystem: "agent", Name: "messages_sent_total", Help: "total messages sent", }, agentLabels), - messagesReceivedTotalMetric: prometheus.NewGaugeVec(prometheus.GaugeOpts{ + messagesReceivedTotalMetric: prometheus.NewCounterVec(prometheus.CounterOpts{ Namespace: "ossec", Subsystem: "agent", Name: "messages_received_total", Help: "total bytes received", }, agentLabels), - bytesSentTotalMetric: prometheus.NewGaugeVec(prometheus.GaugeOpts{ + bytesSentTotalMetric: prometheus.NewCounterVec(prometheus.CounterOpts{ Namespace: "ossec", Subsystem: "agent", Name: "bytes_sent_total", Help: "total bytes sent", }, agentLabels), - bytesReceivedTotalMetric: prometheus.NewGaugeVec(prometheus.GaugeOpts{ + bytesReceivedTotalMetric: prometheus.NewCounterVec(prometheus.CounterOpts{ Namespace: "ossec", Subsystem: "agent", Name: "bytes_received_total", Help: "total bytes received", }, agentLabels), - connectionAttemptsTotalMetric: prometheus.NewGaugeVec(prometheus.GaugeOpts{ + connectionAttemptsTotalMetric: prometheus.NewCounterVec(prometheus.CounterOpts{ Namespace: "ossec", Subsystem: "agent", Name: "connection_attempts_total", Help: "total connection attempts", }, agentLabels), - connectionsOpenedTotalMetric: prometheus.NewGaugeVec(prometheus.GaugeOpts{ + connectionsOpenedTotalMetric: prometheus.NewCounterVec(prometheus.CounterOpts{ Namespace: "ossec", Subsystem: "agent", Name: "connections_openend_total", Help: "succesfull connection attempts", }, agentLabels), - connectionsClosedTotalMetric: prometheus.NewGaugeVec(prometheus.GaugeOpts{ + connectionsClosedTotalMetric: prometheus.NewCounterVec(prometheus.CounterOpts{ Namespace: "ossec", Subsystem: "agent", Name: "connections_closed_total", @@ -147,16 +154,18 @@ func (collector *agentCollector) Register(agent *Client) { if labels == nil { return } - collector.connectedMetric.WithLabelValues(labels...).Set(1) - collector.eventsTotalMetric.WithLabelValues(labels...).Set(0) - collector.messagesSentTotalMetric.WithLabelValues(labels...).Set(0) - collector.messagesReceivedTotalMetric.WithLabelValues(labels...).Set(0) - collector.messageErrorsTotalMetric.WithLabelValues(labels...).Set(0) - collector.bytesSentTotalMetric.WithLabelValues(labels...).Set(0) - collector.bytesReceivedTotalMetric.WithLabelValues(labels...).Set(0) - collector.connectionAttemptsTotalMetric.WithLabelValues(labels...).Set(0) - collector.connectionsOpenedTotalMetric.WithLabelValues(labels...).Set(0) - collector.connectionsClosedTotalMetric.WithLabelValues(labels...).Set(0) + collector.connectedMetric.WithLabelValues(labels...).Set(0) + collector.queueSizeMetric.WithLabelValues(labels...).Set(0) + + collector.eventsTotalMetric.WithLabelValues(labels...).Add(0) + collector.messagesSentTotalMetric.WithLabelValues(labels...).Add(0) + collector.messagesReceivedTotalMetric.WithLabelValues(labels...).Add(0) + collector.messageErrorsTotalMetric.WithLabelValues(labels...).Add(0) + collector.bytesSentTotalMetric.WithLabelValues(labels...).Add(0) + collector.bytesReceivedTotalMetric.WithLabelValues(labels...).Add(0) + collector.connectionAttemptsTotalMetric.WithLabelValues(labels...).Add(0) + collector.connectionsOpenedTotalMetric.WithLabelValues(labels...).Add(0) + collector.connectionsClosedTotalMetric.WithLabelValues(labels...).Add(0) } func (collector *agentCollector) TryConnect(agent *Client) { @@ -288,11 +297,45 @@ func (collector *agentCollector) MessagesSent(agent *Client, count int) { collector.messagesSentTotalMetric.WithLabelValues(labels...).Add(float64(count)) } +func (collector *agentCollector) SetQueueSize(agent *Client, size int) { + if agent == nil { + return + } + labels := collector.getAgentLabels(agent) + if labels == nil { + return + } + collector.queueSizeMetric.WithLabelValues(labels...).Set(float64(size)) +} + +func (collector *agentCollector) Enqueue(agent *Client) { + if agent == nil { + return + } + labels := collector.getAgentLabels(agent) + if labels == nil { + return + } + collector.queueSizeMetric.WithLabelValues(labels...).Inc() +} + +func (collector *agentCollector) Dequeue(agent *Client) { + if agent == nil { + return + } + labels := collector.getAgentLabels(agent) + if labels == nil { + return + } + collector.queueSizeMetric.WithLabelValues(labels...).Dec() +} + //Each and every collector must implement the Describe function. //It essentially writes all descriptors to the prometheus desc channel. func (collector *agentCollector) Describe(ch chan<- *prometheus.Desc) { //Update this section with the each metric you create for a given collector collector.connectedMetric.Describe(ch) + collector.queueSizeMetric.Describe(ch) collector.eventsTotalMetric.Describe(ch) collector.messagesSentTotalMetric.Describe(ch) collector.messagesReceivedTotalMetric.Describe(ch) @@ -308,6 +351,7 @@ func (collector *agentCollector) Describe(ch chan<- *prometheus.Desc) { //Collect implements required collect function for all promehteus collectors func (collector *agentCollector) Collect(ch chan<- prometheus.Metric) { collector.connectedMetric.Collect(ch) + collector.queueSizeMetric.Collect(ch) collector.eventsTotalMetric.Collect(ch) collector.messagesSentTotalMetric.Collect(ch) collector.messagesReceivedTotalMetric.Collect(ch)