Skip to content

Commit

Permalink
refactor prometheus metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
czeumer committed Jun 3, 2022
1 parent 2a0b7e9 commit 3ff3d53
Show file tree
Hide file tree
Showing 2 changed files with 299 additions and 149 deletions.
67 changes: 28 additions & 39 deletions ossec/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ import (
"github.com/autonubil/go-wazuh/sysinfo"
"github.com/joncrlsn/dque"
"github.com/matishsiao/goInfo"
"github.com/prometheus/client_golang/prometheus"
)

// EncryptionMethod is a supported transport encryption.
Expand Down Expand Up @@ -102,9 +101,6 @@ type Client struct {
evtCount uint64
sentCount uint64
receivedCount uint64
connectionAttempts uint64
connectionsOpened uint64
connectionsClose uint64
cOrigSize uint
cCompSize uint
sessionID int64
Expand All @@ -127,7 +123,6 @@ type Client struct {
CurrentRemoteFile *RemoteFileInfo
un *goInfo.GoInfoObject
osInfo *sysinfo.OS
collector *agentCollector
}

type FileUpdatedEvent struct {
Expand Down Expand Up @@ -326,14 +321,7 @@ func NewAgent(server string, agentID string, agentName string, agentKey string,
return nil, err
}

collector := newAgentCollector(a)
err = prometheus.Register(collector)
if err == nil {
a.collector = collector
} else {
a.logger.Debug("registerPrometheus", zap.Any("agentId", a.AgentID), zap.Error(err))
}

AgentCollector.Register(a)
return a, nil
}

Expand Down Expand Up @@ -405,30 +393,29 @@ func (a *Client) close(sendCloseMsg bool) error {
err := a.writeMessage(msg)
if err != nil {
a.connected = false
a.conn.Close()
AgentCollector.Disconnect(a)
return err
}
}
a.connected = false
a.sentBytes = 0
a.sessionID = 0
a.receivedBytes = 0
a.connectionsClose = a.connectionsClose + 1
AgentCollector.Disconnect(a)
return a.conn.Close()
}
a.receivedBytes = 0
a.sentBytes = 0
a.sessionID = 0
AgentCollector.Disconnect(a)
return nil
}

// Close closes the connection.
// Any blocked Read or Write operations will be unblocked and return errors.
func (a *Client) Close() error {
err := a.close(true)
if a.collector != nil {
prometheus.Unregister(a.collector)
}
AgentCollector.Unregister(a)
return err
}

Expand Down Expand Up @@ -621,13 +608,15 @@ func (a *Client) writeMessage(msg string) error {
// ensure the rate limit is honored
prev := time.Now()
now := a.rateLimit.Take()
ret, err := a.conn.Write(encryptedMsg)
a.sentBytes += uint64(ret)
a.sentBytesTotal += uint64(ret)
written, err := a.conn.Write(encryptedMsg)
a.sentBytes += uint64(written)
a.sentBytesTotal += uint64(written)
AgentCollector.BytesSent(a, written)

if err != nil {
if err != nil || written == 0 {
AgentCollector.MessageError(a, 1)
if a.logger != nil {
a.logger.Warn("writeMessage", zap.Any("agentId", a.AgentID), zap.String("msg", msg), zap.Int("result", ret), zap.Uint64("sentBytes", a.sentBytes), zap.Uint64("sentBytesTotal", a.sentBytesTotal), zap.Duration("rateWait", now.Sub(prev)), zap.Uint("globalCount", a.globalCount), zap.Uint("localCount", a.localCount), zap.Uint64("evtCount", a.evtCount), zap.Uint64("sentCount", a.sentCount), zap.Uint64("receivedCount", a.receivedCount), zap.Error(err))
a.logger.Warn("writeMessage", zap.Any("agentId", a.AgentID), zap.String("msg", msg), zap.Int("result", written), zap.Uint64("sentBytes", a.sentBytes), zap.Uint64("sentBytesTotal", a.sentBytesTotal), zap.Duration("rateWait", now.Sub(prev)), zap.Uint("globalCount", a.globalCount), zap.Uint("localCount", a.localCount), zap.Uint64("evtCount", a.evtCount), zap.Uint64("sentCount", a.sentCount), zap.Uint64("receivedCount", a.receivedCount), zap.Error(err))
}
err2 := a.close(false)
if err2 != nil {
Expand All @@ -636,9 +625,11 @@ func (a *Client) writeMessage(msg string) error {
return err
}
a.sentCount++
time.Sleep(25 * time.Millisecond)
AgentCollector.MessagesSent(a, 1)
// time.Sleep(25 * time.Millisecond)

if a.logger != nil {
a.logger.Debug("writeMessage", zap.Any("agentId", a.AgentID), zap.String("msg", msg), zap.Int("result", ret), zap.Uint64("sentBytes", a.sentBytes), zap.Uint64("sentBytesTotal", a.sentBytesTotal), zap.Duration("rateWait", now.Sub(prev)), zap.Uint("globalCount", a.globalCount), zap.Uint("localCount", a.localCount), zap.Uint64("evtCount", a.evtCount), zap.Uint64("sentCount", a.sentCount), zap.Uint64("receivedCount", a.receivedCount))
a.logger.Debug("writeMessage", zap.Any("agentId", a.AgentID), zap.String("msg", msg), zap.Int("result", written), zap.Uint64("sentBytes", a.sentBytes), zap.Uint64("sentBytesTotal", a.sentBytesTotal), zap.Duration("rateWait", now.Sub(prev)), zap.Uint("globalCount", a.globalCount), zap.Uint("localCount", a.localCount), zap.Uint64("evtCount", a.evtCount), zap.Uint64("sentCount", a.sentCount), zap.Uint64("receivedCount", a.receivedCount))
}
return nil
}
Expand Down Expand Up @@ -707,6 +698,7 @@ func (a *Client) readServerResponse(timeout time.Duration) error {
nRead, err := a.conn.Read(buffer)
a.receivedBytes = a.receivedBytes + uint64(nRead)
a.receivedBytesTotal = a.receivedBytesTotal + uint64(nRead)
AgentCollector.BytesRead(a, nRead)
// a.logger.Info("read", zap.Any("agentId", a.AgentID), zap.Any("deadline", deadline), zap.Int("read", nRead), zap.Int("readSoFar", totallyRead), zap.Error(err))
if nRead == 0 {
if oe, ok := err.(*net.OpError); ok && oe.Err != os.ErrDeadlineExceeded {
Expand Down Expand Up @@ -750,6 +742,7 @@ func (a *Client) readServerResponse(timeout time.Duration) error {
var msgSize uint32
if !a.UDP {
if len(rawMsg) < 4 {
AgentCollector.MessageError(a, 1)
return errors.New("message too short")
}
msgSize = binary.LittleEndian.Uint32(rawMsg)
Expand All @@ -760,12 +753,14 @@ func (a *Client) readServerResponse(timeout time.Duration) error {
}

if int(msgSize) > len(rawMsg) {
AgentCollector.MessageError(a, 1)
return errors.New("message exceeds buffer")
}
a.evtCount++
msg, err := a.decryptMessage(rawMsg[:msgSize], msgSize)
// fmt.Printf("%d\t%d\t'%s'\t%s\n", totallyRead, msgSize, msg, err)
if err != nil {
AgentCollector.MessageError(a, 1)
return err
}
a.receivedCount++
Expand All @@ -774,10 +769,12 @@ func (a *Client) readServerResponse(timeout time.Duration) error {
msg = msg[32:]
globalCount, err := strconv.Atoi(msg[5:15])
if err != nil {
AgentCollector.MessageError(a, 1)
return err
}
localCount, err := strconv.Atoi(msg[16:20])
if err != nil {
AgentCollector.MessageError(a, 1)
return err
}
msg = msg[21:]
Expand All @@ -787,7 +784,7 @@ func (a *Client) readServerResponse(timeout time.Duration) error {
// normal status, nothing to report
} else if globalCountU > a.globalCount || (globalCountU == a.globalCount && localCountU > a.localCount) {
a.logger.Debug(fmt.Sprintf("Updated to remote counters %d:%d (%d:%d)", localCountU, globalCountU, a.localCount, a.globalCount), zap.Skip())
// move one ahaed
// move one ahead
localCountU++
} else {
a.logger.Info(fmt.Sprintf("Unexpected counter %d:%d (%d:%d)", localCountU, globalCountU, a.localCount, a.globalCount), zap.Skip())
Expand All @@ -796,6 +793,7 @@ func (a *Client) readServerResponse(timeout time.Duration) error {
a.globalCount = globalCountU

a.WriteClientCounter()
AgentCollector.MessagesReceived(a, 1)
// rand1 := msg[:5]
//fmt.Printf("packet-received: bytes=%d (%s:%d:%d) '%s'\n", nRead, rand1, globalCount, localCount, msg)
// empty buffer for next read
Expand All @@ -810,7 +808,6 @@ func (a *Client) readServerResponse(timeout time.Duration) error {
if len(rawMsg) == 0 {
break
}

}
return nil
}
Expand All @@ -820,20 +817,11 @@ func (a *Client) Connect(isStartup bool) error {
a.mx.Lock()
defer a.mx.Unlock()
a.sessionID = time.Now().UnixMicro()

a.connected = false
var err error

a.connectionAttempts = a.connectionAttempts + 1

// try to re-register agent if the connection has been closed before
if a.collector == nil {
collector := newAgentCollector(a)
err = prometheus.Register(collector)
if err == nil {
a.collector = collector
}
}
AgentCollector.Disconnect(a)
AgentCollector.TryConnect(a)

var localAddr net.Addr
if a.UDP {
Expand Down Expand Up @@ -895,7 +883,8 @@ func (a *Client) Connect(isStartup bool) error {
return err
}
}
a.connectionsOpened = a.connectionsOpened + 1

AgentCollector.Connect(a)
a.connected = true
return nil
}
Expand Down
Loading

0 comments on commit 3ff3d53

Please sign in to comment.