forked from CN-TU/go-ipfix
-
Notifications
You must be signed in to change notification settings - Fork 0
/
message.go
148 lines (140 loc) · 4.2 KB
/
message.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
package ipfix
import (
"encoding/binary"
"errors"
"io"
"time"
)
// MessageStream represents an ipfix message stream.
//
// It accumulates template and data records into a scratch buffer, one message
// at a time, and writes finished messages to the configured writer.
type MessageStream struct {
// w is the destination that finished messages are written to (see Flush).
w io.Writer
// buffer holds the message currently under construction.
buffer scratchBuffer
// length aliases bytes 2-4 of the current message header; Flush stores the
// buffer length there.
length []byte
// time aliases bytes 4-8 of the current message header; Flush stores the
// export time in seconds there.
time []byte
// NOTE(review): record is not referenced in the visible part of this file.
record record
// sequence is written into bytes 8-12 of each message header and is
// incremented by sendRecord for every appended record whose id() is >= 256.
sequence uint32
// observationID is written into bytes 12-16 of each message header.
observationID uint32
// templates holds the templates registered through AddTemplate; the entry
// at index i has template id i+256.
templates []*template
// currentSet is the set that records are appended to; it is created over buffer.
currentSet set
// currentDataRecord is a reusable record buffer that SendData fills before sending.
currentDataRecord recordBuffer
// mtu is the size used for the message and record buffers; it is reported in
// RecordTooBigError.
mtu int
// dirty is true once a message header has been started and stays true until
// Flush has successfully finalized the buffer.
dirty bool
}
// MakeMessageStream creates a message stream that writes its messages to w.
//
// mtu bounds the size of the internal message buffer. A value of 0 selects
// 65535; any other value below 28 is rejected with an error. observationID is
// stored and written into the header of every message.
func MakeMessageStream(w io.Writer, mtu uint16, observationID uint32) (ret *MessageStream, err error) {
	switch {
	case mtu == 0:
		// Zero means "no explicit limit": fall back to the largest uint16 value.
		mtu = 65535
	case mtu < 28:
		return nil, errors.New("mtu must be at least 28")
	}

	size := int(mtu)
	scratch := makeBasicBuffer(size)
	stream := &MessageStream{
		w:                 w,
		buffer:            scratch,
		observationID:     observationID,
		currentSet:        makeSet(scratch),
		currentDataRecord: makeRecordBuffer(size),
		mtu:               size,
	}
	return stream, nil
}
// startMessage reserves the 16-byte message header in the buffer and fills in
// the fields that are already known: the version bytes 0x00 0x0a, the current
// sequence number and the observation id. The length and time fields are only
// remembered as slices so that Flush can fill them in later. On success the
// stream is marked dirty. An error from the buffer is returned unchanged.
func (m *MessageStream) startMessage() error {
	const headerLen = 16

	hdr, err := m.buffer.append(headerLen)
	if err != nil {
		return err
	}
	// Single up-front bounds check for all the fixed-offset accesses below.
	_ = hdr[headerLen-1]

	binary.BigEndian.PutUint16(hdr[0:2], 0x000a)
	binary.BigEndian.PutUint32(hdr[8:12], m.sequence)
	binary.BigEndian.PutUint32(hdr[12:16], m.observationID)

	// Filled in by Flush once the message is complete.
	m.length = hdr[2:4]
	m.time = hdr[4:8]
	m.dirty = true
	return nil
}
// sendRecord appends rec to the current set, starting a new message first if
// none is in progress.
//
// now is only used when the current message has to be flushed to make room;
// it is passed to Flush unchanged.
//
// If the set reports that the buffer is full, the current message is flushed,
// a new one is started and the append is retried. If the buffer holds nothing
// but the 16-byte message header at that point, the record can never fit and a
// RecordTooBigError is returned. If the set reports a record type mismatch,
// the current set is finalized and the append is retried.
//
// After a successful append the sequence number is incremented for records
// whose id() is >= 256.
//
// Errors from startMessage and Flush are returned to the caller instead of
// being discarded, so a failing writer cannot silently lose a message or keep
// the retry loop spinning.
func (m *MessageStream) sendRecord(rec record, now interface{}) (err error) {
	if !m.dirty {
		if err = m.startMessage(); err != nil {
			return err
		}
	}

	for {
		err = m.currentSet.appendRecord(rec)
		if err == nil {
			if rec.id() >= 256 {
				m.sequence++
			}
			return nil
		}

		ipfixerr, ok := err.(ipfixError)
		if !ok {
			return err
		}

		switch {
		case ipfixerr.bufferFull():
			// Only the message header is present: flushing would not free
			// any space, so the record itself is too large for the mtu.
			if m.buffer.length() == 16 {
				return RecordTooBigError{16 + rec.length(), m.mtu}
			}
			if err = m.Flush(now); err != nil {
				return err
			}
			if err = m.startMessage(); err != nil {
				return err
			}
		case ipfixerr.recordTypeMismatch():
			// Close the current set so the retry goes into a new one.
			m.currentSet.finalize()
		default:
			return err
		}
	}
}
// AddTemplate registers the given InformationElements as a new template and
// sends the template record. now must be the current or export time, either as
// a time.Time value or as one of the provided ipfix time types. The returned
// template id can be used with SendData. The template is only remembered when
// sending succeeded; otherwise the error is returned alongside the id that
// would have been assigned.
func (m *MessageStream) AddTemplate(now interface{}, elements ...InformationElement) (id int, err error) {
	// Template ids are handed out consecutively, starting at 256.
	id = 256 + len(m.templates)
	// NOTE(review): the id is narrowed to int16 here; confirm how ids above
	// 32767 are expected to behave.
	tmpl := &template{int16(id), elements}
	err = m.sendRecord(*tmpl, now)
	if err != nil {
		return id, err
	}
	m.templates = append(m.templates, tmpl)
	return id, nil
}
// SendData sends one data record with the given values for the given template
// id (as returned by AddTemplate). now must be the current or export time,
// either as a time.Time value or as one of the provided ipfix time types.
// The values are assigned to the template's InformationElements; an error from
// that assignment is returned unchanged. An id that does not refer to a
// registered template yields an UnknownTemplateError.
func (m *MessageStream) SendData(now interface{}, template int, data ...interface{}) (err error) {
	// Template ids start at 256; translate to an index into m.templates.
	idx := template - 256
	if idx < 0 || idx >= len(m.templates) || m.templates[idx] == nil {
		return UnknownTemplateError(template)
	}

	rec := &m.currentDataRecord
	if err = m.templates[idx].assignDataRecord(rec, data...); err != nil {
		return err
	}
	return m.sendRecord(rec, now)
}
// Flush must be called before the underlying writer is closed. It completes
// the message currently under construction, if any, and hands it to the
// writer. Buffering done by the underlying writer itself is not flushed.
//
// now supplies the export time and is stored in the header as whole seconds.
// Supported types are time.Time, DateTimeSeconds, DateTimeMilliseconds,
// DateTimeMicroseconds and DateTimeNanoseconds; for any other type the time
// field of the header is left untouched.
//
// If no message is in progress, Flush does nothing and returns nil. The stream
// is only marked clean when finalizing the buffer succeeded.
func (m *MessageStream) Flush(now interface{}) (err error) {
	if !m.dirty {
		return nil
	}

	m.currentSet.finalize()
	binary.BigEndian.PutUint16(m.length, uint16(m.buffer.length()))

	var seconds uint32
	supported := true
	switch v := now.(type) {
	case time.Time:
		seconds = uint32(v.Unix())
	case DateTimeSeconds:
		seconds = uint32(v)
	case DateTimeMilliseconds:
		seconds = uint32(v / 1e3)
	case DateTimeMicroseconds:
		seconds = uint32(v / 1e6)
	case DateTimeNanoseconds:
		seconds = uint32(v / 1e9)
	default:
		supported = false
	}
	if supported {
		binary.BigEndian.PutUint32(m.time, seconds)
	}

	err = m.buffer.finalize(m.w)
	if err != nil {
		return err
	}
	m.dirty = false
	return nil
}