Skip to content

Commit

Permalink
feat: support for parsing ipip packet (#257)
Browse files Browse the repository at this point in the history
* feat: support for parsing ipip packet

    This PR introduces a new feature for parsing IPIP packets and correctly associating them.

    Additionally, this PR improves the current logic in processor.go to prevent the incorrect association of syscall and kernel events. When new events arrive, they are first enqueued and then processed only if they have been in the queue longer than a specified time limit. This is necessary because when many short connections use the same tgid-fd, syscall and kernel events may arrive asynchronously in user space. As a result, events from a new connection might reach user space before the connection event itself, causing the new connection's events to be incorrectly associated with the old connection and leading to erroneous time calculations.

  And to ensure that the total time calculation is not negative, the syscall event will report the syscall start time and the syscall duration. By adding the start time and the duration, we can determine the end time. This way, when calculating the client's elapsed time, we can subtract the start time of the write syscall from the end time of the read syscall.

  Additionally, to ensure that DEV_IN and TCP_IN events are present when the server receives the first request, the concept of a first packet event is introduced. Even if the kernel does not find conn_info or other information when reporting the event, as long as its seq=1, it will be considered a first packet. This allows it to be directly reported to user space. In user space, the connection is found based on its sock key, and then it is converted into a kernevent for processing. This way, even for the server's first request, we can see the total time and read from socket time.

* fix: remove bpf_printk statements

* feat: add first-packet-event-map-page-num option

* refactor: translate comments to english
  • Loading branch information
hengyoush authored Jan 8, 2025
1 parent 8ff2696 commit 3f6a44c
Show file tree
Hide file tree
Showing 58 changed files with 1,557 additions and 534 deletions.
5 changes: 5 additions & 0 deletions agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,11 @@ func SetupAgent(options ac.AgentOptions) {
if err != nil {
return
}
firstPacketChannel := make(chan *bpf.AgentFirstPacketEvt, 10)
firstPacketProcessor := conn.NewFirstPacketProcessor(firstPacketChannel, pm.GetFirstPacketEventsChannels())
go firstPacketProcessor.Start()
err = bpf.PullFirstPacketEvents(ctx, firstPacketChannel, options.FirstPacketEventMapPageNum)

err = _bf.AttachProgs(&options)
if err != nil {
return
Expand Down
97 changes: 74 additions & 23 deletions agent/analysis/stat.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,19 +181,22 @@ func (s *StatRecorder) ReceiveRecord(r protocol.Record, connection *conn.Connect
// because we could missed some nicIngressEvents, the total duration may be negative
annotatedRecord.StartTs = math.MaxUint64
if hasNicInEvents {
annotatedRecord.StartTs = min(events.nicIngressEvents[0].GetTimestamp(), annotatedRecord.StartTs)
}
if hasTcpInEvents {
annotatedRecord.StartTs = min(events.tcpInEvents[0].GetTimestamp(), annotatedRecord.StartTs)
}
if hasUserCopyEvents {
annotatedRecord.StartTs = min(events.userCopyEvents[0].GetTimestamp(), annotatedRecord.StartTs)
}
if hasReadSyscallEvents {
annotatedRecord.StartTs = min(events.readSyscallEvents[0].GetTimestamp(), annotatedRecord.StartTs)
nicInTimestamp, _, ok := events.nicIngressEvents[0].GetMinIfItmestampAttr()
if ok {
annotatedRecord.StartTs = min(uint64(nicInTimestamp), annotatedRecord.StartTs)
}
} else if hasTcpInEvents {
annotatedRecord.StartTs = min(events.tcpInEvents[0].GetStartTs(), annotatedRecord.StartTs)
} else if hasUserCopyEvents {
annotatedRecord.StartTs = min(events.userCopyEvents[0].GetStartTs(), annotatedRecord.StartTs)
} else if hasReadSyscallEvents {
annotatedRecord.StartTs = min(events.readSyscallEvents[0].GetStartTs(), annotatedRecord.StartTs)
}
if hasDevOutEvents {
annotatedRecord.EndTs = events.devOutEvents[len(events.devOutEvents)-1].GetTimestamp()
devOutTimestamp, _, ok := events.devOutEvents[len(events.devOutEvents)-1].GetMaxIfItmestampAttr()
if ok {
annotatedRecord.EndTs = uint64(devOutTimestamp)
}
}
if connection.IsSsl() {
annotatedRecord.ReqPlainTextSize = events.ingressMessage.ByteSize()
Expand All @@ -205,28 +208,28 @@ func (s *StatRecorder) ReceiveRecord(r protocol.Record, connection *conn.Connect
annotatedRecord.TotalDuration = float64(annotatedRecord.EndTs) - float64(annotatedRecord.StartTs)
}
if hasReadSyscallEvents && hasWriteSyscallEvents {
annotatedRecord.BlackBoxDuration = float64(events.writeSyscallEvents[len(events.writeSyscallEvents)-1].GetTimestamp()) - float64(events.readSyscallEvents[0].GetTimestamp())
annotatedRecord.BlackBoxDuration = float64(events.writeSyscallEvents[len(events.writeSyscallEvents)-1].GetEndTs()) - float64(events.readSyscallEvents[0].GetStartTs())
} else {
annotatedRecord.BlackBoxDuration = float64(events.egressMessage.TimestampNs()) - float64(events.ingressMessage.TimestampNs())
}
if hasUserCopyEvents && hasTcpInEvents {
annotatedRecord.ReadFromSocketBufferDuration = float64(events.userCopyEvents[len(events.userCopyEvents)-1].GetTimestamp()) - float64(events.tcpInEvents[0].GetTimestamp())
annotatedRecord.ReadFromSocketBufferDuration = float64(events.userCopyEvents[len(events.userCopyEvents)-1].GetStartTs()) - float64(events.tcpInEvents[0].GetStartTs())
}
if hasTcpInEvents && hasNicInEvents {
annotatedRecord.CopyToSocketBufferDuration = float64(events.tcpInEvents[len(events.tcpInEvents)-1].GetTimestamp() - events.nicIngressEvents[0].GetTimestamp())
annotatedRecord.CopyToSocketBufferDuration = float64(events.tcpInEvents[len(events.tcpInEvents)-1].GetStartTs() - events.nicIngressEvents[0].GetStartTs())
}
annotatedRecord.ReqSyscallEventDetails = KernEventsToEventDetails[analysisCommon.SyscallEventDetail](events.readSyscallEvents)
annotatedRecord.RespSyscallEventDetails = KernEventsToEventDetails[analysisCommon.SyscallEventDetail](events.writeSyscallEvents)
annotatedRecord.ReqNicEventDetails = KernEventsToNicEventDetails(events.nicIngressEvents)
annotatedRecord.RespNicEventDetails = KernEventsToNicEventDetails(events.devOutEvents)
} else {
if hasWriteSyscallEvents {
annotatedRecord.StartTs = events.writeSyscallEvents[0].GetTimestamp()
annotatedRecord.StartTs = findMinTimestamp(events.writeSyscallEvents, true)
} else {
annotatedRecord.StartTs = events.egressMessage.TimestampNs()
}
if hasReadSyscallEvents {
annotatedRecord.EndTs = events.readSyscallEvents[len(events.readSyscallEvents)-1].GetTimestamp()
annotatedRecord.EndTs = findMaxTimestamp(events.readSyscallEvents, false)
} else {
annotatedRecord.EndTs = events.ingressMessage.TimestampNs()
}
Expand All @@ -242,19 +245,43 @@ func (s *StatRecorder) ReceiveRecord(r protocol.Record, connection *conn.Connect
annotatedRecord.TotalDuration = float64(events.ingressMessage.TimestampNs()) - float64(events.egressMessage.TimestampNs())
}
if hasNicInEvents && hasDevOutEvents {
annotatedRecord.BlackBoxDuration = float64(events.nicIngressEvents[len(events.nicIngressEvents)-1].GetTimestamp()) - float64(events.devOutEvents[0].GetTimestamp())
nicIngressTimestamp := int64(0)
for _, nicIngressEvent := range events.nicIngressEvents {
_nicIngressTimestamp, _, ok := nicIngressEvent.GetMinIfItmestampAttr()
if ok {
nicIngressTimestamp = max(nicIngressTimestamp, _nicIngressTimestamp)
}
}

if nicIngressTimestamp != 0 {
nicEgressTimestamp := int64(math.MaxInt64)
for _, devOutEvent := range events.devOutEvents {
_nicEgressTimestamp, _, ok := devOutEvent.GetMaxIfItmestampAttr()
if ok {
nicEgressTimestamp = min(nicEgressTimestamp, _nicEgressTimestamp)
}
}
if nicEgressTimestamp != int64(math.MaxInt64) {
annotatedRecord.BlackBoxDuration = float64(nicIngressTimestamp) - float64(nicEgressTimestamp)
} else {
annotatedRecord.BlackBoxDuration = -1
}
nicEgressTimestamp++
} else {
annotatedRecord.BlackBoxDuration = -1
}
}
if (hasUserCopyEvents || hasReadSyscallEvents) && hasTcpInEvents {
var readFromEndTime float64
if hasUserCopyEvents {
readFromEndTime = float64(events.userCopyEvents[len(events.userCopyEvents)-1].GetTimestamp())
readFromEndTime = float64(events.userCopyEvents[len(events.userCopyEvents)-1].GetStartTs())
} else {
readFromEndTime = float64(events.readSyscallEvents[len(events.readSyscallEvents)-1].GetTimestamp())
readFromEndTime = float64(events.readSyscallEvents[len(events.readSyscallEvents)-1].GetEndTs())
}
annotatedRecord.ReadFromSocketBufferDuration = readFromEndTime - float64(events.tcpInEvents[0].GetTimestamp())
annotatedRecord.ReadFromSocketBufferDuration = readFromEndTime - float64(events.tcpInEvents[0].GetStartTs())
}
if hasTcpInEvents && hasNicInEvents {
annotatedRecord.CopyToSocketBufferDuration = float64(events.tcpInEvents[len(events.tcpInEvents)-1].GetTimestamp() - events.nicIngressEvents[0].GetTimestamp())
annotatedRecord.CopyToSocketBufferDuration = float64(events.tcpInEvents[len(events.tcpInEvents)-1].GetStartTs() - events.nicIngressEvents[0].GetStartTs())
}
annotatedRecord.ReqSyscallEventDetails = KernEventsToEventDetails[analysisCommon.SyscallEventDetail](events.writeSyscallEvents)
annotatedRecord.RespSyscallEventDetails = KernEventsToEventDetails[analysisCommon.SyscallEventDetail](events.readSyscallEvents)
Expand Down Expand Up @@ -292,6 +319,30 @@ func (s *StatRecorder) ReceiveRecord(r protocol.Record, connection *conn.Connect
return nil
}

func findMaxTimestamp(events []conn.KernEvent, useStartTs bool) uint64 {
var maxTimestamp uint64 = 0
for _, each := range events {
if useStartTs {
maxTimestamp = max(maxTimestamp, each.GetStartTs())
} else {
maxTimestamp = max(maxTimestamp, each.GetEndTs())
}
}
return maxTimestamp
}

func findMinTimestamp(events []conn.KernEvent, useStartTs bool) uint64 {
var minTimestamp uint64 = math.MaxUint64
for _, each := range events {
if useStartTs {
minTimestamp = min(minTimestamp, each.GetStartTs())
} else {
minTimestamp = min(minTimestamp, each.GetEndTs())
}
}
return minTimestamp
}

func KernEventsToEventDetails[K analysisCommon.PacketEventDetail | analysisCommon.SyscallEventDetail](kernEvents []conn.KernEvent) []K {
if len(kernEvents) == 0 {
return []K{}
Expand All @@ -300,7 +351,7 @@ func KernEventsToEventDetails[K analysisCommon.PacketEventDetail | analysisCommo
for _, each := range kernEvents {
result = append(result, K{
ByteSize: each.GetLen(),
Timestamp: each.GetTimestamp(),
Timestamp: each.GetStartTs(),
})
}
return result
Expand All @@ -315,7 +366,7 @@ func KernEventsToNicEventDetails(kernEvents []conn.KernEvent) []analysisCommon.N
PacketEventDetail: analysisCommon.PacketEventDetail{

ByteSize: each.GetLen(),
Timestamp: each.GetTimestamp(),
Timestamp: each.GetStartTs(),
},
Attributes: each.GetAttributes(),
})
Expand Down
13 changes: 9 additions & 4 deletions agent/buffer/stream_buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,11 @@ func (sb *StreamBuffer) FindTimestampBySeq(targetSeq uint64) (uint64, bool) {
return value.(uint64), true
}

func (sb *StreamBuffer) Add(seq uint64, data []byte, timestamp uint64) {
func (sb *StreamBuffer) Add(seq uint64, data []byte, timestamp uint64) bool {
_, found := sb.timestamps.Get(seq)
if found {
return false
}
dataLen := uint64(len(data))
newBuffer := &Buffer{
buf: data,
Expand All @@ -137,16 +141,16 @@ func (sb *StreamBuffer) Add(seq uint64, data []byte, timestamp uint64) {
if sb.IsEmpty() {
sb.updateTimestamp(seq, timestamp)
sb.buffers = append(sb.buffers, newBuffer)
return
return true
}
if sb.Position0()-int(seq) >= maxBytesGap {
return
return true
}
if int(seq)-sb.PositionN() >= maxBytesGap {
sb.Clear()
sb.buffers = append(sb.buffers, newBuffer)
sb.updateTimestamp(seq, timestamp)
return
return true
}

leftIndex, leftBuffer := sb.findLeftBufferBySeq(seq)
Expand Down Expand Up @@ -180,6 +184,7 @@ func (sb *StreamBuffer) Add(seq uint64, data []byte, timestamp uint64) {
}
sb.updateTimestamp(seq, timestamp)
sb.shrinkBufferUntilSizeBelowCapacity()
return true
}

func (sb *StreamBuffer) updateTimestamp(seq uint64, timestamp uint64) {
Expand Down
1 change: 1 addition & 0 deletions agent/common/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ type AgentOptions struct {
SslPerfEventMapPageNum int
ConnPerfEventMapPageNum int
KernPerfEventMapPageNum int
FirstPacketEventMapPageNum int
}

func (o AgentOptions) FilterByContainer() bool {
Expand Down
Loading

0 comments on commit 3f6a44c

Please sign in to comment.