Skip to content

Commit

Permalink
pkg/fuzzer: introduce a request restarter layer
Browse files Browse the repository at this point in the history
Make Result statuses more elaborate.
Instead of retrying inputs directly in rpc.go, extract this logic to a
separate entity in pkg/fuzzer/queue.
  • Loading branch information
a-nogikh committed May 6, 2024
1 parent bec0279 commit 4f1689f
Show file tree
Hide file tree
Showing 6 changed files with 89 additions and 45 deletions.
2 changes: 1 addition & 1 deletion pkg/fuzzer/fuzzer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,7 @@ func (f *testFuzzer) registerExecutor(proc *executorProc) {
return err
}
if crash != "" {
res = &queue.Result{Stop: true}
res = &queue.Result{Status: queue.Crashed}
if !f.expectedCrashes[crash] {
return fmt.Errorf("unexpected crash: %q", crash)
}
Expand Down
14 changes: 7 additions & 7 deletions pkg/fuzzer/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ func (job *triageJob) deflake(exec func(*queue.Request, ...execOpt) *queue.Resul
NeedCover: true,
Stat: stat,
}, &dontTriage{})
if result.Stop {
if result.Stop() {
stop = true
return
}
Expand Down Expand Up @@ -204,7 +204,7 @@ func (job *triageJob) minimize(newSignal signal.Signal) (stop bool) {
SignalFilterCall: call1,
Stat: job.fuzzer.statExecMinimize,
})
if result.Stop {
if result.Stop() {
stop = true
return false
}
Expand Down Expand Up @@ -273,15 +273,15 @@ func (job *smashJob) run(fuzzer *Fuzzer) {
NeedSignal: queue.NewSignal,
Stat: fuzzer.statExecSmash,
})
if result.Stop {
if result.Stop() {
return
}
if fuzzer.Config.Collide {
result := fuzzer.execute(fuzzer.smashQueue, &queue.Request{
Prog: randomCollide(p, rnd),
Stat: fuzzer.statExecCollide,
})
if result.Stop {
if result.Stop() {
return
}
}
Expand Down Expand Up @@ -323,7 +323,7 @@ func (job *smashJob) faultInjection(fuzzer *Fuzzer) {
Prog: newProg,
Stat: fuzzer.statExecSmash,
})
if result.Stop {
if result.Stop() {
return
}
info := result.Info
Expand All @@ -347,7 +347,7 @@ func (job *hintsJob) run(fuzzer *Fuzzer) {
NeedHints: true,
Stat: fuzzer.statExecSeed,
})
if result.Stop || result.Info == nil {
if result.Stop() || result.Info == nil {
return
}
// Then mutate the initial program for every match between
Expand All @@ -360,6 +360,6 @@ func (job *hintsJob) run(fuzzer *Fuzzer) {
NeedSignal: queue.NewSignal,
Stat: fuzzer.statExecHint,
})
return !result.Stop
return !result.Stop()
})
}
31 changes: 28 additions & 3 deletions pkg/fuzzer/queue/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ func Execute(ctx context.Context, executor Executor, req *Request) *Result {
executor.Submit(req)
select {
case <-ctx.Done():
return &Result{Stop: true}
return &Result{Status: ExecFailure}
case res := <-req.resultC:
close(req.resultC)
return res
Expand All @@ -88,10 +88,23 @@ const (
)

type Result struct {
Info *ipc.ProgInfo
Stop bool
Info *ipc.ProgInfo
Status Status
}

// Stop reports whether the result carries a status after which the
// caller should stop: ExecFailure or Crashed. Any other status
// (including Restarted) yields false.
func (r *Result) Stop() bool {
	switch r.Status {
	case ExecFailure, Crashed:
		return true
	default:
		return false
	}
}

// Status describes the outcome of a request.
type Status int

// The zero value of Status is Success.
const (
	Success     Status = iota // 0
	ExecFailure               // 1: for e.g. serialization errors.
	Crashed                   // 2: the VM crashed holding the request.
	Restarted                 // 3: the VM was restarted holding the request.
)

// Executor describes the interface wanted by the producers of requests.
type Executor interface {
Submit(req *Request)
Expand Down Expand Up @@ -148,6 +161,18 @@ func (pq *PlainQueue) Submit(req *Request) {
// Next takes the queue mutex (blocking until it is available) and
// returns whatever nextLocked yields while the mutex is held.
func (pq *PlainQueue) Next() *Request {
	pq.mu.Lock()
	// Deferred so that the mutex is released even if nextLocked panics.
	defer pq.mu.Unlock()
	req := pq.nextLocked()
	return req
}

// tryNext is the non-blocking counterpart of Next: if the queue mutex
// cannot be acquired immediately it returns nil instead of waiting,
// otherwise it returns the result of nextLocked.
func (pq *PlainQueue) tryNext() *Request {
	if locked := pq.mu.TryLock(); locked {
		defer pq.mu.Unlock()
		return pq.nextLocked()
	}
	return nil
}

func (pq *PlainQueue) nextLocked() *Request {
if pq.pos < len(pq.queue) {
ret := pq.queue[pq.pos]
pq.queue[pq.pos] = nil
Expand Down
37 changes: 37 additions & 0 deletions pkg/fuzzer/queue/retry.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
// Copyright 2024 syzkaller project authors. All rights reserved.
// Use of this source code is governed by Apache 2 LICENSE that can be found in the LICENSE file.

package queue

type retryer struct {
pq *PlainQueue
base Source
}

// NewRetryer wraps base in a layer that hands out again every request
// whose result came back with Status == Restarted.
func NewRetryer(base Source) Source {
	r := new(retryer)
	r.base = base
	r.pq = Plain()
	return r
}

// Next returns a request to execute, preferring requests queued for a
// retry over fresh ones from the wrapped source. The retry queue is
// polled without blocking. Every non-nil request gets r.done attached
// as a completion callback so that its result can be inspected.
func (r *retryer) Next() *Request {
	next := r.pq.tryNext()
	if next == nil {
		// Nothing to retry right now (or the retry queue is busy).
		if next = r.base.Next(); next == nil {
			return nil
		}
	}
	next.OnDone(r.done)
	return next
}

// done is the completion callback installed by Next. For a result with
// Status == Restarted it puts the request back into the retry queue and
// returns false; for any other status it returns true and leaves the
// request alone.
func (r *retryer) done(req *Request, res *Result) bool {
	if res.Status != Restarted {
		return true
	}
	// The input was on a restarted VM: schedule it for another run.
	r.pq.Submit(req)
	return false
}
7 changes: 5 additions & 2 deletions syz-manager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/google/syzkaller/pkg/csource"
"github.com/google/syzkaller/pkg/db"
"github.com/google/syzkaller/pkg/fuzzer"
"github.com/google/syzkaller/pkg/fuzzer/queue"
"github.com/google/syzkaller/pkg/gce"
"github.com/google/syzkaller/pkg/hash"
"github.com/google/syzkaller/pkg/host"
Expand Down Expand Up @@ -72,6 +73,7 @@ type Manager struct {

mu sync.Mutex
fuzzer atomic.Pointer[fuzzer.Fuzzer]
execSource atomic.Value // queue.Source
phase int
targetEnabledSyscalls map[*prog.Syscall]bool

Expand Down Expand Up @@ -1373,6 +1375,7 @@ func (mgr *Manager) machineChecked(features *host.Features, enabledSyscalls map[
},
}, rnd, mgr.target)
mgr.fuzzer.Store(fuzzerObj)
mgr.execSource.Store(queue.NewRetryer(fuzzerObj))

mgr.loadCorpus()
mgr.firstConnect.Store(time.Now().Unix())
Expand All @@ -1395,8 +1398,8 @@ func (mgr *Manager) corpusMinimization() {
}

// We need this method since we're not supposed to access Manager fields from RPCServer.
func (mgr *Manager) getFuzzer() *fuzzer.Fuzzer {
return mgr.fuzzer.Load()
// getExecSource returns the request source stored in mgr.execSource,
// or nil if nothing has been stored yet.
func (mgr *Manager) getExecSource() queue.Source {
	// atomic.Value.Load returns nil until the first Store. A single-value
	// type assertion on a nil interface panics, so use the comma-ok form
	// and let callers perform their own nil check.
	source, _ := mgr.execSource.Load().(queue.Source)
	return source
}

func (mgr *Manager) fuzzerSignalRotation() {
Expand Down
43 changes: 11 additions & 32 deletions syz-manager/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import (

"github.com/google/syzkaller/pkg/cover"
"github.com/google/syzkaller/pkg/flatrpc"
"github.com/google/syzkaller/pkg/fuzzer"
"github.com/google/syzkaller/pkg/fuzzer/queue"
"github.com/google/syzkaller/pkg/host"
"github.com/google/syzkaller/pkg/ipc"
Expand Down Expand Up @@ -52,10 +51,6 @@ type RPCServer struct {
mu sync.Mutex
runners sync.Map // Instead of map[string]*Runner.

// We did not finish these requests because of VM restarts.
// They will be eventually given to other VMs.
rescuedInputs []*queue.Request

statExecs *stats.Val
statExecRetries *stats.Val
statExecutorRestarts *stats.Val
Expand Down Expand Up @@ -100,7 +95,7 @@ type BugFrames struct {
type RPCManagerView interface {
currentBugFrames() BugFrames
machineChecked(features *host.Features, enabledSyscalls map[*prog.Syscall]bool)
getFuzzer() *fuzzer.Fuzzer
getExecSource() queue.Source
}

func startRPCServer(mgr *Manager) (*RPCServer, error) {
Expand Down Expand Up @@ -349,41 +344,26 @@ func (serv *RPCServer) ExchangeInfo(a *rpctype.ExchangeInfoRequest, r *rpctype.E
return nil
}

fuzzerObj := serv.mgr.getFuzzer()
if fuzzerObj == nil {
source := serv.mgr.getExecSource()
if source == nil {
// ExchangeInfo calls follow MachineCheck, so the fuzzer must have been initialized.
panic("exchange info call with nil fuzzer")
}

appendRequest := func(inp *queue.Request) {
// First query new inputs and only then post results.
// It should foster a more even distribution of executions
// across all VMs.
for len(r.Requests) < a.NeedProgs {
inp := source.Next()
if req, ok := serv.newRequest(runner, inp); ok {
r.Requests = append(r.Requests, req)
} else {
// It's bad if we systematically fail to serialize programs,
// but so far we don't have a better handling than counting this.
// This error is observed a lot on the seeded syz_mount_image calls.
serv.statExecBufferTooSmall.Add(1)
inp.Done(&queue.Result{Stop: true})
}
}

// Try to collect some of the postponed requests.
if serv.mu.TryLock() {
for len(serv.rescuedInputs) != 0 && len(r.Requests) < a.NeedProgs {
last := len(serv.rescuedInputs) - 1
inp := serv.rescuedInputs[last]
serv.rescuedInputs[last] = nil
serv.rescuedInputs = serv.rescuedInputs[:last]
appendRequest(inp)
inp.Done(&queue.Result{Status: queue.ExecFailure})
}
serv.mu.Unlock()
}

// First query new inputs and only then post results.
// It should foster a more even distribution of executions
// across all VMs.
for len(r.Requests) < a.NeedProgs {
appendRequest(fuzzerObj.Next())
}

for _, result := range a.Results {
Expand Down Expand Up @@ -470,10 +450,9 @@ func (serv *RPCServer) shutdownInstance(name string, crashed bool) []byte {
}
for _, req := range oldRequests {
if crashed && req.try >= 0 {
req.req.Done(&queue.Result{Stop: true})
req.req.Done(&queue.Result{Status: queue.Crashed})
} else {
// We will resend these inputs to another VM.
serv.rescuedInputs = append(serv.rescuedInputs, req.req)
req.req.Done(&queue.Result{Status: queue.Restarted})
}
}
return runner.machineInfo
Expand Down

0 comments on commit 4f1689f

Please sign in to comment.