Skip to content

Commit

Permalink
syz-fuzzer: remove ipc gate
Browse files Browse the repository at this point in the history
Ipc gate slows down overall execution a lot.
Without ipc gate I am getting ~20% more executions with debug kernel
and ~100% more executions with a fast non-debug kernel.

Replace ipc gate with explicit tracking of last executing programs
per proc in syz-manager.
Ipc gate was also used for leak checking, but leak checking seems
to be still broken. At least in my local runs I am not getting
any leaks even with the previous fix.
So remove the gate completly for now. Taking into account that
we are likely to rewrite this code in C++ soon, it makes
little sense to create a special gate for leak checking only in Go.

Update google#4728
  • Loading branch information
dvyukov committed May 23, 2024
1 parent ef4747c commit 5e8ef3c
Show file tree
Hide file tree
Showing 7 changed files with 173 additions and 62 deletions.
16 changes: 0 additions & 16 deletions syz-fuzzer/fuzzer.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import (

"github.com/google/syzkaller/pkg/flatrpc"
"github.com/google/syzkaller/pkg/host"
"github.com/google/syzkaller/pkg/ipc"
"github.com/google/syzkaller/pkg/ipc/ipcconfig"
"github.com/google/syzkaller/pkg/log"
"github.com/google/syzkaller/pkg/osutil"
Expand All @@ -33,7 +32,6 @@ import (
type FuzzerTool struct {
conn *flatrpc.Conn
executor string
gate *ipc.Gate
checkLeaks atomic.Int32
timeouts targets.Timeouts
leakFrames []string
Expand All @@ -43,13 +41,6 @@ type FuzzerTool struct {
maxSignal signal.Signal
}

// Gate size controls how deep in the log the last executed by every proc
// program may be. The intent is to make sure that, given the output log,
// we always understand what was happening.
// Judging by the logs collected on syzbot, 32 should be a reasonable figure.
// It coincides with prog.MaxPids.
const gateSize = prog.MaxPids

// TODO: split into smaller methods.
// nolint: funlen, gocyclo
func main() {
Expand Down Expand Up @@ -173,13 +164,6 @@ func main() {
requests: make(chan *flatrpc.ExecRequest, *flagProcs*4),
}
fuzzerTool.filterDataRaceFrames(connectReply.RaceFrames)
var gateCallback func()
for _, feat := range features {
if feat.Id == flatrpc.FeatureLeak && feat.Reason == "" {
gateCallback = fuzzerTool.leakGateCallback
}
}
fuzzerTool.gate = ipc.NewGate(gateSize, gateCallback)

log.Logf(0, "starting %v executor processes", *flagProcs)
for pid := 0; pid < *flagProcs; pid++ {
Expand Down
5 changes: 0 additions & 5 deletions syz-fuzzer/proc.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,15 +117,10 @@ func (proc *Proc) executeProgram(req *flatrpc.ExecRequest, wait time.Duration) (
var output []byte
var info *flatrpc.ProgInfo
var hanged bool
// On a heavily loaded VM, syz-executor may take significant time to start.
// Let's do it outside of the gate ticket.
err := proc.env.RestartIfNeeded(req.ExecOpts)
if err == nil {
// Limit concurrency.
ticket := proc.tool.gate.Enter()
proc.tool.startExecutingCall(req.Id, proc.pid, try, wait)
output, info, hanged, err = proc.env.ExecProg(req.ExecOpts, req.ProgData)
proc.tool.gate.Leave(ticket)
// Don't print output if returning error b/c it may contain SYZFAIL.
if !returnError {
log.Logf(2, "result hanged=%v err=%v: %s", hanged, err, output)
Expand Down
59 changes: 59 additions & 0 deletions syz-manager/last_executing.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
// Copyright 2024 syzkaller project authors. All rights reserved.
// Use of this source code is governed by Apache 2 LICENSE that can be found in the LICENSE file.

package main

import (
"sort"
"time"
)

type LastExecuting struct {
count int
procs []ExecRecord
positions []int
}

type ExecRecord struct {
Proc int
Prog []byte
Time time.Duration
}

func MakeLastExecuting(procs, count int) *LastExecuting {
return &LastExecuting{
count: count,
procs: make([]ExecRecord, procs*count),
positions: make([]int, procs),
}
}

func (last *LastExecuting) Note(proc int, prog []byte, now time.Duration) {
pos := &last.positions[proc]
last.procs[proc*last.count+*pos] = ExecRecord{
Proc: proc,
Prog: prog,
Time: now,
}
*pos++
if *pos == last.count {
*pos = 0
}
}

func (last *LastExecuting) Collect() []ExecRecord {
procs := last.procs
last.procs = nil // The type must not be used after this.
sort.Slice(procs, func(i, j int) bool {
return procs[i].Time < procs[j].Time
})
max := procs[len(procs)-1].Time
for i := len(procs) - 1; i >= 0; i-- {
if procs[i].Time == 0 {
procs = procs[i+1:]
break
}
procs[i].Time = max - procs[i].Time
}
return procs
}
56 changes: 56 additions & 0 deletions syz-manager/last_executing_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
// Copyright 2024 syzkaller project authors. All rights reserved.
// Use of this source code is governed by Apache 2 LICENSE that can be found in the LICENSE file.

package main

import (
"testing"

"github.com/stretchr/testify/assert"
)

func TestLastExecutingEmpty(t *testing.T) {
last := MakeLastExecuting(10, 10)
assert.Empty(t, last.Collect())
}

func TestLastExecuting(t *testing.T) {
last := MakeLastExecuting(10, 3)
last.Note(0, []byte("prog1"), 1)

last.Note(1, []byte("prog2"), 2)
last.Note(1, []byte("prog3"), 3)

last.Note(3, []byte("prog4"), 4)
last.Note(3, []byte("prog5"), 5)
last.Note(3, []byte("prog6"), 6)

last.Note(7, []byte("prog7"), 7)
last.Note(7, []byte("prog8"), 8)
last.Note(7, []byte("prog9"), 9)
last.Note(7, []byte("prog10"), 10)
last.Note(7, []byte("prog11"), 11)

last.Note(9, []byte("prog12"), 12)

last.Note(8, []byte("prog13"), 13)

assert.Equal(t, last.Collect(), []ExecRecord{
{Proc: 0, Prog: []byte("prog1"), Time: 12},

{Proc: 1, Prog: []byte("prog2"), Time: 11},
{Proc: 1, Prog: []byte("prog3"), Time: 10},

{Proc: 3, Prog: []byte("prog4"), Time: 9},
{Proc: 3, Prog: []byte("prog5"), Time: 8},
{Proc: 3, Prog: []byte("prog6"), Time: 7},

{Proc: 7, Prog: []byte("prog9"), Time: 4},
{Proc: 7, Prog: []byte("prog10"), Time: 3},
{Proc: 7, Prog: []byte("prog11"), Time: 2},

{Proc: 9, Prog: []byte("prog12"), Time: 1},

{Proc: 8, Prog: []byte("prog13"), Time: 0},
})
}
26 changes: 20 additions & 6 deletions syz-manager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -758,12 +758,13 @@ func (mgr *Manager) runInstance(index int) (*Crash, error) {
mgr.checkUsedFiles()
// Use unique instance names to prevent name collisions in case of untimely RPC messages.
instanceName := fmt.Sprintf("vm-%d", mgr.nextInstanceID.Add(1))
injectLog := make(chan []byte, 10)
mgr.serv.createInstance(instanceName, injectLog)
injectExec := make(chan bool, 10)
mgr.serv.createInstance(instanceName, injectExec)

rep, vmInfo, err := mgr.runInstanceInner(index, instanceName, injectLog)
machineInfo := mgr.serv.shutdownInstance(instanceName, rep != nil)
rep, vmInfo, err := mgr.runInstanceInner(index, instanceName, injectExec)
lastExec, machineInfo := mgr.serv.shutdownInstance(instanceName, rep != nil)
if rep != nil {
prependExecuting(rep, lastExec)
if len(vmInfo) != 0 {
machineInfo = append(append(vmInfo, '\n'), machineInfo...)
}
Expand All @@ -785,7 +786,7 @@ func (mgr *Manager) runInstance(index int) (*Crash, error) {
return crash, nil
}

func (mgr *Manager) runInstanceInner(index int, instanceName string, injectLog <-chan []byte) (
func (mgr *Manager) runInstanceInner(index int, instanceName string, injectExec <-chan bool) (
*report.Report, []byte, error) {
start := time.Now()

Expand Down Expand Up @@ -863,7 +864,7 @@ func (mgr *Manager) runInstanceInner(index int, instanceName string, injectLog <
}
cmd := instance.FuzzerCmd(args)
_, rep, err := inst.Run(mgr.cfg.Timeouts.VMRunningTime, mgr.reporter, cmd,
vm.ExitTimeout, vm.StopChan(mgr.vmStop), vm.InjectOutput(injectLog),
vm.ExitTimeout, vm.StopChan(mgr.vmStop), vm.InjectExecuting(injectExec),
vm.EarlyFinishCb(func() {
// Depending on the crash type and kernel config, fuzzing may continue
// running for several seconds even after kernel has printed a crash report.
Expand All @@ -886,6 +887,19 @@ func (mgr *Manager) runInstanceInner(index int, instanceName string, injectLog <
return rep, vmInfo, nil
}

func prependExecuting(rep *report.Report, lastExec []ExecRecord) {
buf := new(bytes.Buffer)
fmt.Fprintf(buf, "last executing test programs:\n\n")
for _, exec := range lastExec {
fmt.Fprintf(buf, "%v ago: executing program %v:\n%s\n", exec.Time, exec.Proc, exec.Prog)
}
rep.Output = append(buf.Bytes(), rep.Output...)
n := len(buf.Bytes())
rep.StartPos += n
rep.EndPos += n
rep.SkipPos += n
}

func (mgr *Manager) emailCrash(crash *Crash) {
if len(mgr.cfg.EmailAddrs) == 0 {
return
Expand Down
55 changes: 30 additions & 25 deletions syz-manager/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/google/syzkaller/pkg/ipc"
"github.com/google/syzkaller/pkg/log"
"github.com/google/syzkaller/pkg/mgrconfig"
"github.com/google/syzkaller/pkg/osutil"
"github.com/google/syzkaller/pkg/signal"
"github.com/google/syzkaller/pkg/stats"
"github.com/google/syzkaller/pkg/vminfo"
Expand Down Expand Up @@ -63,16 +64,15 @@ type RPCServer struct {

type Runner struct {
stopped bool
shutdown chan bool
injectLog chan<- []byte
injectStop chan bool
injectBuf bytes.Buffer
finished chan bool
injectExec chan<- bool
conn *flatrpc.Conn
machineInfo []byte
canonicalizer *cover.CanonicalizerInstance
nextRequestID int64
requests map[int64]*queue.Request
executing map[int64]bool
lastExec *LastExecuting
rnd *rand.Rand
}

Expand Down Expand Up @@ -148,6 +148,7 @@ func (serv *RPCServer) handleConn(conn *flatrpc.Conn) {
runner.canonicalizer = canonicalizer
checkLeaks := serv.checkLeaks
serv.mu.Unlock()
defer close(runner.finished)

if checkLeaks {
if err := runner.sendStartLeakChecks(); err != nil {
Expand All @@ -158,15 +159,6 @@ func (serv *RPCServer) handleConn(conn *flatrpc.Conn) {

err = serv.connectionLoop(runner)
log.Logf(2, "runner %v: %v", name, err)

crashed := <-runner.shutdown
for id, req := range runner.requests {
status := queue.Restarted
if crashed && runner.executing[id] {
status = queue.Crashed
}
req.Done(&queue.Result{Status: status})
}
}

func (serv *RPCServer) handshake(conn *flatrpc.Conn) (string, []byte, *cover.CanonicalizerInstance, error) {
Expand Down Expand Up @@ -359,6 +351,10 @@ func (serv *RPCServer) handleExecutingMessage(runner *Runner, msg *flatrpc.Execu
if req == nil {
return fmt.Errorf("can't find executing request %v", msg.Id)
}
proc := int(msg.ProcId)
if proc < 0 || proc >= serv.cfg.Procs {
return fmt.Errorf("got bad proc id %v", proc)
}
serv.statExecs.Add(1)
if msg.Try == 0 {
if msg.WaitDuration != 0 {
Expand All @@ -370,12 +366,11 @@ func (serv *RPCServer) handleExecutingMessage(runner *Runner, msg *flatrpc.Execu
} else {
serv.statExecRetries.Add(1)
}
fmt.Fprintf(&runner.injectBuf, "executing program %v:\n%s\n", msg.ProcId, req.Prog.Serialize())
runner.lastExec.Note(proc, req.Prog.Serialize(), osutil.MonotonicNano())
select {
case runner.injectLog <- runner.injectBuf.Bytes():
case <-runner.injectStop:
case runner.injectExec <- true:
default:
}
runner.injectBuf.Reset()
runner.executing[msg.Id] = true
return nil
}
Expand Down Expand Up @@ -511,13 +506,13 @@ func validateRequest(req *queue.Request) error {
return nil
}

func (serv *RPCServer) createInstance(name string, injectLog chan<- []byte) {
func (serv *RPCServer) createInstance(name string, injectExec chan<- bool) {
runner := &Runner{
injectLog: injectLog,
injectStop: make(chan bool),
shutdown: make(chan bool, 1),
injectExec: injectExec,
finished: make(chan bool),
requests: make(map[int64]*queue.Request),
executing: make(map[int64]bool),
lastExec: MakeLastExecuting(serv.cfg.Procs, 3),
rnd: rand.New(rand.NewSource(time.Now().UnixNano())),
}
serv.mu.Lock()
Expand All @@ -536,19 +531,29 @@ func (serv *RPCServer) stopFuzzing(name string) {
runner.stopped = true
conn := runner.conn
serv.mu.Unlock()
close(runner.injectStop)
if conn != nil {
conn.Close()
}
}

func (serv *RPCServer) shutdownInstance(name string, crashed bool) []byte {
func (serv *RPCServer) shutdownInstance(name string, crashed bool) ([]ExecRecord, []byte) {
serv.mu.Lock()
runner := serv.runners[name]
delete(serv.runners, name)
serv.mu.Unlock()
runner.shutdown <- crashed
return runner.machineInfo
if runner.conn != nil {
// Wait for the connection goroutine to finish and stop touching data.
// If conn is nil before we removed the runner, then it won't touch anything.
<-runner.finished
}
for id, req := range runner.requests {
status := queue.Restarted
if crashed && runner.executing[id] {
status = queue.Crashed
}
req.Done(&queue.Result{Status: status})
}
return runner.lastExec.Collect(), runner.machineInfo
}

func (serv *RPCServer) distributeSignalDelta(plus, minus signal.Signal) {
Expand Down
Loading

0 comments on commit 5e8ef3c

Please sign in to comment.