Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: support kafka protocol #267

Open
wants to merge 10 commits into
base: unstable
Choose a base branch
from
4 changes: 2 additions & 2 deletions .github/workflows/build_verification.yml
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@ name: Build Verification
on:
workflow_dispatch:
push:
branches: [ "main", "dev","feature/*" ]
branches: [ "main", "dev","feature/*","unstable" ]
pull_request:
branches: [ "main", "dev","feature/*" ]
branches: [ "main", "dev","feature/*","unstable" ]

concurrency:
group: ${{ github.workflow }}-${{ github.ref }}
Expand Down
21 changes: 18 additions & 3 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@ name: Test
on:
workflow_dispatch:
push:
branches: [ "main", "dev","feature/*" ]
branches: [ "main", "dev","feature/*","unstable" ]
pull_request:
branches: [ "main", "dev","feature/*" ]
branches: [ "main", "dev","feature/*","unstable" ]

env:
kyanos_log_option: --bpf-event-log-level 5 --conntrack-log-level 5 --agent-log-level 5
Expand Down Expand Up @@ -161,6 +161,22 @@ jobs:
sudo chmod +x /usr/local/bin/docker-compose
docker-compose --version

- name: Test Kafka
uses: cilium/little-vm-helper@97c89f004bd0ab4caeacfe92ebc956e13e362e6b # v0.0.19
with:
provision: 'false'
cmd: |
set -euxo pipefail
uname -a
cat /etc/issue
pushd /host
if [ -f "/var/lib/kyanos/btf/current.btf" ]; then
bash /host/testdata/test_kafka.sh 'sudo /host/kyanos/kyanos $kyanos_log_option --btf /var/lib/kyanos/btf/current.btf'
else
bash /host/testdata/test_kafka.sh 'sudo /host/kyanos/kyanos $kyanos_log_option'
fi
popd

- name: Test CAP_BPF privilege check
uses: cilium/little-vm-helper@97c89f004bd0ab4caeacfe92ebc956e13e362e6b # v0.0.19
if: ${{ !contains(fromJSON('["4.19-20240912.022020", "5.4-20240912.022020"]'), matrix.kernel) }}
Expand All @@ -174,7 +190,6 @@ jobs:
bash /host/testdata/run_cap_bpf_test.sh "" "CAP_BPF"
popd


- name: Test CAP_SYS_ADMIN privilege check
uses: cilium/little-vm-helper@97c89f004bd0ab4caeacfe92ebc956e13e362e6b # v0.0.19
if: contains(fromJSON('["4.19-20240912.022020", "5.4-20240912.022020"]'), matrix.kernel)
Expand Down
2 changes: 1 addition & 1 deletion agent/buffer/stream_buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ func (sb *StreamBuffer) IsEmpty() bool {
return len(sb.buffers) == 0
}
func (sb *StreamBuffer) Clear() {
sb.buffers = sb.buffers[:]
sb.buffers = sb.buffers[0:0]
sb.timestamps.Clear()
}
func (sb *StreamBuffer) RemovePrefix(length int) {
Expand Down
40 changes: 39 additions & 1 deletion agent/conn/conntrack.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package conn

import (
"encoding/binary"
"fmt"
"kyanos/agent/buffer"
"kyanos/agent/protocol"
Expand Down Expand Up @@ -431,12 +432,44 @@ func getEventTimestamp(ke *bpf.AgentKernEvt, c *Connection4, isReq bool) uint64
}
}

func extractHeaderEvent(data []byte, ke *bpf.AgentKernEvt, c *Connection4) *bpf.SyscallEventData {
if !ke.PrependLengthHeader {
return nil
}

header := make([]byte, 4)
headerEvt := *ke
headerEvt.Len = 4
headerEvt.Seq = ke.Seq - 4
headerEvt.PrependLengthHeader = false

headerSyscallEvt := bpf.SyscallEventData{
SyscallEvent: bpf.SyscallEvent{
Ke: headerEvt,
BufSize: 4,
},
Buf: header,
}
binary.LittleEndian.PutUint32(header, uint32(ke.LengthHeader))
if common.ConntrackLog.Level >= logrus.DebugLevel {
common.ConntrackLog.Debugf("extract header event: %v", headerSyscallEvt)
}
return &headerSyscallEvt
}

func (c *Connection4) addDataToBufferAndTryParse(data []byte, ke *bpf.AgentKernEvt) bool {
addedToBuffer := false
isReq, _ := isReq(c, ke)
headerEvt := extractHeaderEvent(data, ke, c)
if isReq {
if headerEvt != nil {
c.reqStreamBuffer.Add(headerEvt.SyscallEvent.Ke.Seq, headerEvt.Buf, getEventTimestamp(ke, c, isReq))
}
addedToBuffer = c.reqStreamBuffer.Add(ke.Seq, data, getEventTimestamp(ke, c, isReq))
} else {
if headerEvt != nil {
c.respStreamBuffer.Add(headerEvt.SyscallEvent.Ke.Seq, headerEvt.Buf, getEventTimestamp(ke, c, isReq))
}
addedToBuffer = c.respStreamBuffer.Add(ke.Seq, data, getEventTimestamp(ke, c, isReq))
}
if !addedToBuffer {
Expand Down Expand Up @@ -524,7 +557,12 @@ func (c *Connection4) parseStreamBuffer(streamBuffer *buffer.StreamBuffer, messa
// TODO
startPos = 0
}
streamBuffer.RemovePrefix(startPos)
if startPos > 0 {
if common.ConntrackLog.Level >= logrus.DebugLevel {
common.ConntrackLog.Debugf("[parseStreamBuffer] %s Removed streambuffer some head data(%d bytes) due to find boundary from %s queue", c.ToString(), startPos, messageType.String())
}
streamBuffer.RemovePrefix(startPos)
}
originPos := streamBuffer.Position0()
// var parseState protocol.ParseState
for !stop && !streamBuffer.IsEmpty() {
Expand Down
2 changes: 1 addition & 1 deletion agent/conn/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -298,7 +298,7 @@ func (p *Processor) run() {
}

if common.ConntrackLog.Level >= logrus.DebugLevel {
common.ConntrackLog.Debugf("[conn][ts=%d] %s | type: %s, protocol: %d, \n", event.Ts, conn.ToString(), eventType, conn.Protocol)
common.ConntrackLog.Debugf("[conn][ts=%d] %s | type: %s, protocol: %d, role: %v\n", event.Ts, conn.ToString(), eventType, conn.Protocol, event.ConnInfo.Role)
}
case event := <-p.syscallEvents:
p.handleSyscallEvent(event, recordChannel)
Expand Down
68 changes: 68 additions & 0 deletions agent/protocol/decoder.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package protocol

import (
"reflect"
"strings"
)

Expand All @@ -13,6 +14,21 @@ type BinaryDecoder struct {
func NewBinaryDecoder(buf []byte) *BinaryDecoder {
return &BinaryDecoder{buf: buf, str: string(buf)}
}
func (b *BinaryDecoder) RemainingBytes() int {
return len(b.str)
}

func (b *BinaryDecoder) Buf() []byte {
return b.buf
}
func (b *BinaryDecoder) SubBuf(length int) []byte {
return b.buf[len(b.buf)-len(b.str)+length:]
}

func (b *BinaryDecoder) SetBuf(buf []byte) {
b.buf = buf
b.str = string(buf)
}

type NotFoundError struct {
msg string
Expand Down Expand Up @@ -83,3 +99,55 @@ func (d *BinaryDecoder) ExtractByte() (byte, error) {
d.readBytes++
return x, nil
}

func ExtractBEInt[TIntType int8 | int16 | int32 | uint32 | uint8 | int64](d *BinaryDecoder) (TIntType, error) {
typeSize := int(reflect.TypeOf(TIntType(0)).Size())
if len(d.str) < typeSize {
return 0, ResourceNotAvailble
}
var x TIntType = 0
for i := 0; i < typeSize; i++ {
x = TIntType(d.str[i]) | (x << 8)
}
d.str = d.str[typeSize:]
d.readBytes += typeSize
return x, nil
}

func ExtractLEInt[TIntType int32 | uint32 | uint8](d *BinaryDecoder) (TIntType, error) {
typeSize := int(reflect.TypeOf(TIntType(0)).Size())
if len(d.str) < 4 {
return 0, ResourceNotAvailble
}
var x TIntType = 0
for i := 0; i < typeSize; i++ {
x = TIntType(d.str[typeSize-1-i]) | (x << 8)
}
d.str = d.str[typeSize:]
d.readBytes += typeSize
return x, nil
}

func BEndianBytesToInt[TIntType int32 | uint32 | uint8](d *BinaryDecoder) TIntType {
typeSize := int(reflect.TypeOf(TIntType(0)).Size())
if len(d.str) < typeSize {
return 0
}
var x TIntType = 0
for i := 0; i < typeSize; i++ {
x = TIntType(d.str[i]) | (x << 8)
}
return x
}

func LEndianBytesToInt[TIntType int32 | uint32 | uint8](d *BinaryDecoder) TIntType {
typeSize := int(reflect.TypeOf(TIntType(0)).Size())
if len(d.str) < 4 {
return 0
}
var x TIntType = 0
for i := 0; i < typeSize; i++ {
x = TIntType(d.str[typeSize-1-i]) | (x << 8)
}
return x
}
Loading
Loading