Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: omit using gRPC for pktmon IPC on Windows #865

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 25 additions & 3 deletions controller/Dockerfile.windows-native
Original file line number Diff line number Diff line change
Expand Up @@ -3,25 +3,47 @@
# buildx targets, and this one requires legacy build.
# Maybe one day: https://github.com/moby/buildkit/issues/616
ARG BUILDER_IMAGE

FROM --platform=windows/amd64 ${BUILDER_IMAGE} as pktmon-builder
WORKDIR C:\\retina

# mcr.microsoft.com/oss/go/microsoft/golang:1.23.1-windowsservercore-ltsc2022
FROM --platform=windows/amd64 mcr.microsoft.com/oss/go/microsoft/golang@sha256:e2d55093522b5f4a311494255d0598145b1f13da5ae2354a09c7f7c1355f3ad9 AS builder
WORKDIR C:\\retina
COPY go.mod .
COPY go.sum .
ENV CGO_ENABLED=0
RUN go mod download


SHELL ["powershell", "-Command", "$ErrorActionPreference = 'Stop'; $ProgressPreference = 'SilentlyContinue';"]

RUN [Net.ServicePointManager]::SecurityProtocol = [Net.SecurityProtocolType]::Tls12; \
Invoke-WebRequest -UseBasicParsing -uri "https://github.com/msys2/msys2-installer/releases/download/2024-05-07/msys2-base-x86_64-20240507.sfx.exe" -OutFile msys2.exe; \
.\msys2.exe -y -oC:\; \
Remove-Item msys2.exe ; \
function msys() { C:\msys64\usr\bin\bash.exe @('-lc') + @Args; } \
msys ' '; \
msys 'pacman --noconfirm -Syuu'; \
msys 'pacman --noconfirm -S mingw-w64-x86_64-gcc'; \
msys 'pacman --noconfirm -Scc';

# pure magic: https://github.com/MicrosoftDocs/Virtualization-Documentation/blob/3f7c7ed7ef8d582c74ec740414c54f25bf5850c0/windows-container-samples/golang/Dockerfile#L15C1-L15C179
RUN setx path "C:\msys64\mingw64\bin"

ADD . .
RUN powershell -Command "Remove-Item -Recurse -Force C:\retina\pkg\plugin\windows\pktmon\stream"
COPY --from=pktmon-builder C:\\pktmon\\stream C:\\retina\\pkg\\plugin\\windows\\pktmon\\stream

ARG VERSION
ARG APP_INSIGHTS_ID
SHELL ["cmd", "/S", "/C"]
ENV VERSION=$VERSION

ENV APP_INSIGHTS_ID=$APP_INSIGHTS_ID
ENV CGO_ENABLED=1
RUN go build -v -o controller.exe -ldflags="-X github.com/microsoft/retina/internal/buildinfo.Version=%VERSION% -X github.com/microsoft/retina/internal/buildinfo.ApplicationInsightsID=%APP_INSIGHTS_ID%" .\controller
RUN go build -v -o captureworkload.exe -ldflags="-X github.com/microsoft/retina/internal/buildinfo.Version=%VERSION% -X github.com/microsoft/retina/internal/buildinfo.ApplicationInsightsID=%APP_INSIGHTS_ID%" .\captureworkload

FROM --platform=windows/amd64 ${BUILDER_IMAGE} as pktmon-builder
WORKDIR C:\\retina

FROM --platform=windows/amd64 mcr.microsoft.com/windows/nanoserver:ltsc2022 AS final
ADD https://github.com/microsoft/etl2pcapng/releases/download/v1.10.0/etl2pcapng.exe /etl2pcapng.exe
Expand Down
217 changes: 38 additions & 179 deletions pkg/plugin/windows/pktmon/pktmon_plugin_windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,53 +2,49 @@ package pktmon

import (
"context"
"encoding/json"
"fmt"
"os"
"os/exec"

"github.com/pkg/errors"

observerv1 "github.com/cilium/cilium/api/v1/observer"
"github.com/cilium/cilium/api/v1/flow"
v1 "github.com/cilium/cilium/pkg/hubble/api/v1"
"github.com/google/gopacket"
kcfg "github.com/microsoft/retina/pkg/config"
"github.com/microsoft/retina/pkg/enricher"
"github.com/microsoft/retina/pkg/log"
"github.com/microsoft/retina/pkg/metrics"
"github.com/microsoft/retina/pkg/plugin/api"
"github.com/microsoft/retina/pkg/plugin/windows/pktmon/stream"
"github.com/microsoft/retina/pkg/utils"
"go.uber.org/zap"
"go.uber.org/zap/zapio"
"golang.org/x/sync/errgroup"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/status"
)

var (
ErrNilEnricher = errors.New("enricher is nil")
ErrUnexpectedExit = errors.New("unexpected exit")
ErrNilGrpcClient = errors.New("grpc client is nil")

socket = "/temp/retina-pktmon.sock"
ErrNilEnricher = errors.New("enricher is nil")
ErrNotSupported = errors.New("not supported")
)

const (
Name = "pktmon"
connectionRetryAttempts = 5
eventChannelSize = 1000
Name = "pktmon"
eventChannelSize = 1000

defaultBufferMultiplier = 10
defaultTruncationSize = 256
defaultBufferSize = 9000
)

type PktMonConn interface {
Initialize() error
PrintAndResetMissedWrite(sessionID string)
PrintAndResetMissedRead(sessionID string)
ParseDNS(fl *flow.Flow, metadata *utils.RetinaMetadata, packet gopacket.Packet) error
GetNextPacket(ctx context.Context) (*flow.Flow, *utils.RetinaMetadata, gopacket.Packet, error)
}

type Plugin struct {
enricher enricher.EnricherInterface
externalChannel chan *v1.Event
l *log.ZapLogger
pktmonCmd *exec.Cmd
stdWriter *zapio.Writer
errWriter *zapio.Writer

grpcClient *GRPCClient
stream observerv1.Observer_GetFlowsClient
pkt PktMonConn
}

func (p *Plugin) Init() error {
Expand All @@ -59,174 +55,44 @@ func (p *Plugin) Name() string {
return "pktmon"
}

type GRPCClient struct {
observerv1.ObserverClient
}

func newGRPCClient() (*GRPCClient, error) {
retryPolicy := map[string]any{
"methodConfig": []map[string]any{
{
"waitForReady": true,
"retryPolicy": map[string]any{
"MaxAttempts": connectionRetryAttempts,
"InitialBackoff": ".01s",
"MaxBackoff": ".01s",
"BackoffMultiplier": 1.0,
"RetryableStatusCodes": []string{"UNAVAILABLE"},
},
},
},
}

bytes, err := json.Marshal(retryPolicy)
if err != nil {
return nil, errors.Wrapf(err, "failed to marshal retry policy")
}

retryPolicyStr := string(bytes)

conn, err := grpc.Dial(fmt.Sprintf("%s:%s", "unix", socket), grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithDefaultServiceConfig(retryPolicyStr))
if err != nil {
return nil, errors.Wrapf(err, "failed to dial pktmon server:")
}

return &GRPCClient{observerv1.NewObserverClient(conn)}, nil
}

func (p *Plugin) RunPktMonServer(ctx context.Context) error {
p.stdWriter = &zapio.Writer{Log: p.l.Logger, Level: zap.InfoLevel}
defer p.stdWriter.Close()
p.errWriter = &zapio.Writer{Log: p.l.Logger, Level: zap.ErrorLevel}
defer p.errWriter.Close()

pwd, err := os.Getwd()
if err != nil {
return errors.Wrapf(err, "failed to get current working directory for pktmon")
}

cmd := pwd + "\\" + "controller-pktmon.exe"

p.pktmonCmd = exec.CommandContext(ctx, cmd)
p.pktmonCmd.Dir = pwd
p.pktmonCmd.Args = append(p.pktmonCmd.Args, "--socketpath", socket)
p.pktmonCmd.Env = os.Environ()
p.pktmonCmd.Stdout = p.stdWriter
p.pktmonCmd.Stderr = p.errWriter

p.l.Info("calling start on pktmon stream server", zap.String("cmd", p.pktmonCmd.String()))

// block this thread, and should it ever return, it's a problem
err = p.pktmonCmd.Run()
if err != nil {
return errors.Wrapf(err, "pktmon server exited when it should not have")
}

// we never want to return happy from this
return errors.Wrapf(ErrUnexpectedExit, "pktmon server exited unexpectedly")
}

func (p *Plugin) Start(ctx context.Context) error {
p.enricher = enricher.Instance()
if p.enricher == nil {
return ErrNilEnricher
}

g, ctx := errgroup.WithContext(ctx)
p.pkt = stream.NewWinPktMonStreamer(p.l, defaultTruncationSize, defaultBufferSize, defaultBufferMultiplier)

g.Go(func() error {
err := p.RunPktMonServer(ctx)
if err != nil {
return errors.Wrapf(err, "pktmon server exited")
}
return nil
})

err := p.SetupStream()
err := p.pkt.Initialize()
if err != nil {
return errors.Wrapf(err, "failed to setup initial pktmon stream")
}

// run the getflows loop
g.Go(func() error {
for {
err := p.GetFlow(ctx)
if _, ok := status.FromError(err); ok {
p.l.Error("failed to get flow, retriable:", zap.Error(err))
continue
}
return errors.Wrapf(err, "failed to get flow, unrecoverable")
}
})

return g.Wait()
}

func (p *Plugin) SetupStream() error {
var err error
fn := func() error {
p.l.Info("creating pktmon client")
p.grpcClient, err = newGRPCClient()
if err != nil {
return errors.Wrapf(err, "failed to create pktmon client before getting flows")
}

return nil
}
err = utils.Retry(fn, connectionRetryAttempts)
if err != nil {
return errors.Wrapf(err, "failed to create pktmon client")
}

return nil
}

func (p *Plugin) StartStream(ctx context.Context) error {
if p.grpcClient == nil {
return errors.Wrapf(ErrNilGrpcClient, "unable to start stream")
}

var err error
fn := func() error {
p.stream, err = p.grpcClient.GetFlows(ctx, &observerv1.GetFlowsRequest{})
if err != nil {
return errors.Wrapf(err, "failed to open pktmon stream")
}
return nil
}
err = utils.Retry(fn, connectionRetryAttempts)
if err != nil {
return errors.Wrapf(err, "failed to create pktmon client")
}

return nil
}

func (p *Plugin) GetFlow(ctx context.Context) error {
ctx, cancel := context.WithCancel(ctx)
defer cancel()

err := p.StartStream(ctx)
if err != nil {
return errors.Wrapf(err, "failed to setup pktmon stream")
return errors.Wrapf(err, "Failed to initialize pktmon")
}

for {
select {
case <-ctx.Done():
return errors.Wrapf(ctx.Err(), "pktmon plugin context done")
default:
event, err := p.stream.Recv()
fl, meta, packet, err := p.pkt.GetNextPacket(ctx)
if fl == nil {
continue
}

if err != nil {
return errors.Wrapf(err, "failed to receive pktmon event")
p.l.Error("error getting packet", zap.Error(err))
continue
}

fl := event.GetFlow()
if fl == nil {
p.l.Error("received nil flow, flow proto mismatch from client/server?")
return nil
// do this here instead of GetNextPacket to keep higher level
// packet parsing out of L4 parsing
err = p.pkt.ParseDNS(fl, meta, packet)
if err != nil {
p.l.Error("failed to parse DNS", zap.Error(err))
continue
}

utils.AddRetinaMetadata(fl, meta)

ev := &v1.Event{
Event: fl,
Timestamp: fl.GetTime(),
Expand Down Expand Up @@ -264,13 +130,6 @@ func New(_ *kcfg.Config) api.Plugin {
}

func (p *Plugin) Stop() error {
if p.pktmonCmd != nil {
err := p.pktmonCmd.Process.Kill()
if err != nil {
return errors.Wrapf(err, "failed to kill pktmon server during stop")
}
}

return nil
}

Expand Down
49 changes: 49 additions & 0 deletions pkg/plugin/windows/pktmon/stream/stream.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package stream

import (
"context"

"github.com/cilium/cilium/api/v1/flow"
"github.com/google/gopacket"
"github.com/microsoft/retina/pkg/log"
"github.com/microsoft/retina/pkg/utils"
)

type WinPktMon struct {
l *log.ZapLogger
}

func NewWinPktMonStreamer(l *log.ZapLogger, truncationSize, bufferSize, bufferMultiplier int) *WinPktMon {
return &WinPktMon{
l: l,
}
}

func (w *WinPktMon) Initialize() error {
return nil
}

func (w *WinPktMon) GetNextPacket(ctx context.Context) (*flow.Flow, *utils.RetinaMetadata, gopacket.Packet, error) {
w.l.Info("pktmon plugin not implemented")
<-ctx.Done()
return nil, nil, nil, ctx.Err()
}

func (w *WinPktMon) ParseDNS(fl *flow.Flow, metadata *utils.RetinaMetadata, packet gopacket.Packet) error {
return nil
}

func (w *WinPktMon) IncMissedWrite(missed int) {
}

func (w *WinPktMon) IncMissedRead(missed int) {
}

func (w *WinPktMon) PrintAndResetMissedWrite(sessionID string) {
}

func (w *WinPktMon) PrintAndResetMissedRead(sessionID string) {
}

func AddTcpFlagsBool(f *flow.Flow, syn, ack, fin, rst, psh, urg bool) {
}
Loading