Skip to content

Commit

Permalink
add queue metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
czeumer committed Jun 3, 2022
1 parent 3ff3d53 commit 3ef756e
Show file tree
Hide file tree
Showing 2 changed files with 85 additions and 31 deletions.
12 changes: 11 additions & 1 deletion ossec/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -915,8 +915,11 @@ func (a *Client) openQueue(ctx context.Context) (chan *QueuePosting, *dque.DQue,
continue
}

if err = q.Enqueue(msg); err != nil {
if err = q.Enqueue(msg); err == nil {
AgentCollector.Enqueue(a)
} else {
a.logger.Error("enqueueItem", zap.Any("agentId", a.AgentID), zap.Any("item", msg), zap.Error(err))
AgentCollector.SetQueueSize(a, q.Size())
}
}
if ctx.Err() != nil {
Expand Down Expand Up @@ -954,6 +957,7 @@ func (a *Client) AgentLoop(ctx context.Context, closeOnError bool) (chan *QueueP
if err != nil {
return nil, out, err
}
AgentCollector.SetQueueSize(a, q.Size())

go func() {
defer func() {
Expand Down Expand Up @@ -993,16 +997,19 @@ func (a *Client) AgentLoop(ctx context.Context, closeOnError bool) (chan *QueueP
for {
item, dqErr := q.Peek()
if dqErr == dque.ErrEmpty {
AgentCollector.SetQueueSize(a, 0)
// a.logger.Debug("dequeue", zap.Any("agentId", a.AgentID), zap.String("problem", "queue empty"))
break
}
if dqErr != nil {
a.logger.Error("dequeue", zap.Any("agentId", a.AgentID), zap.Error(dqErr))
AgentCollector.SetQueueSize(a, q.Size())
break
}

if item == nil {
a.logger.Warn("dequeue", zap.Any("agentId", a.AgentID), zap.String("problem", "nil item"))
AgentCollector.SetQueueSize(a, q.Size())
continue
}

Expand All @@ -1025,6 +1032,7 @@ func (a *Client) AgentLoop(ctx context.Context, closeOnError bool) (chan *QueueP
a.logger.Error("marshall", zap.Any("agentId", a.AgentID), zap.Error(err))
item = nil
q.Dequeue()
AgentCollector.Dequeue(a)
continue
}

Expand All @@ -1051,8 +1059,10 @@ func (a *Client) AgentLoop(ctx context.Context, closeOnError bool) (chan *QueueP
}
// remove last item from queue
_, err = q.Dequeue()
AgentCollector.Dequeue(a)
if err != nil {
a.logger.Error("dequeue", zap.Any("agentId", a.AgentID), zap.Error(err))
AgentCollector.SetQueueSize(a, q.Size())
item = nil
}
// make sure, we take a pause
Expand Down
104 changes: 74 additions & 30 deletions ossec/prometheus.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,17 +11,18 @@ import (
type agentCollector struct {
agents []*Client
connectedMetric *prometheus.GaugeVec
eventsTotalMetric *prometheus.GaugeVec
messagesSentTotalMetric *prometheus.GaugeVec
messagesReceivedTotalMetric *prometheus.GaugeVec
messageErrorsTotalMetric *prometheus.GaugeVec

bytesSentTotalMetric *prometheus.GaugeVec
bytesReceivedTotalMetric *prometheus.GaugeVec

connectionAttemptsTotalMetric *prometheus.GaugeVec
connectionsOpenedTotalMetric *prometheus.GaugeVec
connectionsClosedTotalMetric *prometheus.GaugeVec
queueSizeMetric *prometheus.GaugeVec
eventsTotalMetric *prometheus.CounterVec
messagesSentTotalMetric *prometheus.CounterVec
messagesReceivedTotalMetric *prometheus.CounterVec
messageErrorsTotalMetric *prometheus.CounterVec

bytesSentTotalMetric *prometheus.CounterVec
bytesReceivedTotalMetric *prometheus.CounterVec

connectionAttemptsTotalMetric *prometheus.CounterVec
connectionsOpenedTotalMetric *prometheus.CounterVec
connectionsClosedTotalMetric *prometheus.CounterVec
}

var agentLabels = []string{"agent_id", "agent_name", "agent_protocol", "agent_encryption"}
Expand All @@ -44,63 +45,69 @@ func newAgentCollector() *agentCollector {
Help: "agent connection count",
}, agentLabels),

eventsTotalMetric: prometheus.NewGaugeVec(prometheus.GaugeOpts{
queueSizeMetric: prometheus.NewGaugeVec(prometheus.GaugeOpts{
Namespace: "ossec",
Subsystem: "agent",
Name: "queued_count",
Help: "total messages in queued",
}, agentLabels),
eventsTotalMetric: prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: "ossec",
Subsystem: "agent",
Name: "events_total",
Help: "total events processed",
}, agentLabels),

messageErrorsTotalMetric: prometheus.NewGaugeVec(prometheus.GaugeOpts{
messageErrorsTotalMetric: prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: "ossec",
Subsystem: "agent",
Name: "message_errors_total",
Help: "total errors in messages",
}, agentLabels),

messagesSentTotalMetric: prometheus.NewGaugeVec(prometheus.GaugeOpts{
messagesSentTotalMetric: prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: "ossec",
Subsystem: "agent",
Name: "messages_sent_total",
Help: "total messages sent",
}, agentLabels),

messagesReceivedTotalMetric: prometheus.NewGaugeVec(prometheus.GaugeOpts{
messagesReceivedTotalMetric: prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: "ossec",
Subsystem: "agent",
Name: "messages_received_total",
Help: "total bytes received",
}, agentLabels),

bytesSentTotalMetric: prometheus.NewGaugeVec(prometheus.GaugeOpts{
bytesSentTotalMetric: prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: "ossec",
Subsystem: "agent",
Name: "bytes_sent_total",
Help: "total bytes sent",
}, agentLabels),

bytesReceivedTotalMetric: prometheus.NewGaugeVec(prometheus.GaugeOpts{
bytesReceivedTotalMetric: prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: "ossec",
Subsystem: "agent",
Name: "bytes_received_total",
Help: "total bytes received",
}, agentLabels),

connectionAttemptsTotalMetric: prometheus.NewGaugeVec(prometheus.GaugeOpts{
connectionAttemptsTotalMetric: prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: "ossec",
Subsystem: "agent",
Name: "connection_attempts_total",
Help: "total connection attempts",
}, agentLabels),

connectionsOpenedTotalMetric: prometheus.NewGaugeVec(prometheus.GaugeOpts{
connectionsOpenedTotalMetric: prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: "ossec",
Subsystem: "agent",
Name: "connections_openend_total",
Help: "succesfull connection attempts",
}, agentLabels),

connectionsClosedTotalMetric: prometheus.NewGaugeVec(prometheus.GaugeOpts{
connectionsClosedTotalMetric: prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: "ossec",
Subsystem: "agent",
Name: "connections_closed_total",
Expand Down Expand Up @@ -147,16 +154,18 @@ func (collector *agentCollector) Register(agent *Client) {
if labels == nil {
return
}
collector.connectedMetric.WithLabelValues(labels...).Set(1)
collector.eventsTotalMetric.WithLabelValues(labels...).Set(0)
collector.messagesSentTotalMetric.WithLabelValues(labels...).Set(0)
collector.messagesReceivedTotalMetric.WithLabelValues(labels...).Set(0)
collector.messageErrorsTotalMetric.WithLabelValues(labels...).Set(0)
collector.bytesSentTotalMetric.WithLabelValues(labels...).Set(0)
collector.bytesReceivedTotalMetric.WithLabelValues(labels...).Set(0)
collector.connectionAttemptsTotalMetric.WithLabelValues(labels...).Set(0)
collector.connectionsOpenedTotalMetric.WithLabelValues(labels...).Set(0)
collector.connectionsClosedTotalMetric.WithLabelValues(labels...).Set(0)
collector.connectedMetric.WithLabelValues(labels...).Set(0)
collector.queueSizeMetric.WithLabelValues(labels...).Set(0)

collector.eventsTotalMetric.WithLabelValues(labels...).Add(0)
collector.messagesSentTotalMetric.WithLabelValues(labels...).Add(0)
collector.messagesReceivedTotalMetric.WithLabelValues(labels...).Add(0)
collector.messageErrorsTotalMetric.WithLabelValues(labels...).Add(0)
collector.bytesSentTotalMetric.WithLabelValues(labels...).Add(0)
collector.bytesReceivedTotalMetric.WithLabelValues(labels...).Add(0)
collector.connectionAttemptsTotalMetric.WithLabelValues(labels...).Add(0)
collector.connectionsOpenedTotalMetric.WithLabelValues(labels...).Add(0)
collector.connectionsClosedTotalMetric.WithLabelValues(labels...).Add(0)
}

func (collector *agentCollector) TryConnect(agent *Client) {
Expand Down Expand Up @@ -288,11 +297,45 @@ func (collector *agentCollector) MessagesSent(agent *Client, count int) {
collector.messagesSentTotalMetric.WithLabelValues(labels...).Add(float64(count))
}

func (collector *agentCollector) SetQueueSize(agent *Client, size int) {
if agent == nil {
return
}
labels := collector.getAgentLabels(agent)
if labels == nil {
return
}
collector.queueSizeMetric.WithLabelValues(labels...).Set(float64(size))
}

func (collector *agentCollector) Enqueue(agent *Client) {
if agent == nil {
return
}
labels := collector.getAgentLabels(agent)
if labels == nil {
return
}
collector.queueSizeMetric.WithLabelValues(labels...).Inc()
}

func (collector *agentCollector) Dequeue(agent *Client) {
if agent == nil {
return
}
labels := collector.getAgentLabels(agent)
if labels == nil {
return
}
collector.queueSizeMetric.WithLabelValues(labels...).Dec()
}

//Each and every collector must implement the Describe function.
//It essentially writes all descriptors to the prometheus desc channel.
func (collector *agentCollector) Describe(ch chan<- *prometheus.Desc) {
//Update this section with the each metric you create for a given collector
collector.connectedMetric.Describe(ch)
collector.queueSizeMetric.Describe(ch)
collector.eventsTotalMetric.Describe(ch)
collector.messagesSentTotalMetric.Describe(ch)
collector.messagesReceivedTotalMetric.Describe(ch)
Expand All @@ -308,6 +351,7 @@ func (collector *agentCollector) Describe(ch chan<- *prometheus.Desc) {
//Collect implements required collect function for all promehteus collectors
func (collector *agentCollector) Collect(ch chan<- prometheus.Metric) {
collector.connectedMetric.Collect(ch)
collector.queueSizeMetric.Collect(ch)
collector.eventsTotalMetric.Collect(ch)
collector.messagesSentTotalMetric.Collect(ch)
collector.messagesReceivedTotalMetric.Collect(ch)
Expand Down

0 comments on commit 3ef756e

Please sign in to comment.