pkg/fuzzer: use queue layers
Instead of relying on a fuzzer-internal priority queue, use
stackable layers of request-generating steps.

Move the functionality to a separate pkg/fuzzer/queue package.

The pkg/fuzzer/queue package can be reused to add extra processing
layers on top of fuzzing and to combine the machine-checking and
fuzzing execution pipelines.
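
To make the layering concrete, here is a minimal sketch built only from the queue API that appears in the diff below (queue.Plain, queue.Multiplex, queue.Alternate, queue.Callback, queue.Source); the buildSource/genOne names and this exact composition are illustrative, not part of the commit:

package main

import (
	"math/rand"

	"github.com/google/syzkaller/pkg/fuzzer/queue"
)

// buildSource stacks request sources into a single queue.Source.
// Multiplex polls its sub-sources in order, so earlier entries act as
// higher-priority layers; Alternate lets the wrapped queue answer only
// ~1/3 of the polls that reach it; Callback is the final fallback.
func buildSource(rnd *rand.Rand, genOne func() *queue.Request) queue.Source {
	triage := queue.Plain()     // drained first, e.g. triage jobs
	candidates := queue.Plain() // seed programs to be (re)executed
	smash := queue.Plain()      // long-running smash/hint jobs
	return queue.Multiplex(
		triage,
		candidates,
		queue.Alternate(smash, rnd, 1.0/3.0),
		queue.Callback(genOne), // generate/mutate when queues are empty
	)
}

Because Multiplex polls in order, adding a processing layer amounts to prepending another source, which is what lets the package be reused for other execution pipelines.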
a-nogikh committed May 3, 2024
1 parent 37d1458 commit 7a1b19c
Showing 9 changed files with 438 additions and 395 deletions.
238 changes: 113 additions & 125 deletions pkg/fuzzer/fuzzer.go
@@ -9,12 +9,11 @@ import (
"math/rand"
"runtime"
"sync"
"sync/atomic"
"time"

"github.com/google/syzkaller/pkg/corpus"
"github.com/google/syzkaller/pkg/fuzzer/queue"
"github.com/google/syzkaller/pkg/ipc"
"github.com/google/syzkaller/pkg/signal"
"github.com/google/syzkaller/pkg/stats"
"github.com/google/syzkaller/prog"
)
@@ -34,8 +33,7 @@ type Fuzzer struct {
ctMu sync.Mutex // TODO: use RWLock.
ctRegenerate chan struct{}

nextExec *priorityQueue[*Request]
nextJobID atomic.Int64
execQueues
}

func NewFuzzer(ctx context.Context, cfg *Config, rnd *rand.Rand,
@@ -57,9 +55,8 @@ func NewFuzzer(ctx context.Context, cfg *Config, rnd *rand.Rand,
// We're okay to lose some of the messages -- if we are already
// regenerating the table, we don't want to repeat it right away.
ctRegenerate: make(chan struct{}),

nextExec: makePriorityQueue[*Request](),
}
f.execQueues = newExecQueues(f)
f.updateChoiceTable(nil)
go f.choiceTableUpdater()
if cfg.Debug {
@@ -68,67 +65,104 @@ func NewFuzzer(ctx context.Context, cfg *Config, rnd *rand.Rand,
return f
}

type Config struct {
Debug bool
Corpus *corpus.Corpus
Logf func(level int, msg string, args ...interface{})
Coverage bool
FaultInjection bool
Comparisons bool
Collide bool
EnabledCalls map[*prog.Syscall]bool
NoMutateCalls map[int]bool
FetchRawCover bool
NewInputFilter func(call string) bool
type execQueues struct {
smashQueue *queue.PlainQueue
triageQueue *queue.PlainQueue
candidateQueue *queue.PlainQueue
triageCandidateQueue *queue.PlainQueue
source queue.Source
}

type Request struct {
Prog *prog.Prog
NeedSignal SignalType
NeedCover bool
NeedHints bool
// If specified, the resulting signal for call SignalFilterCall
// will include a subset of it even if it's not new.
SignalFilter signal.Signal
SignalFilterCall int
// Fields that are only relevant within pkg/fuzzer.
flags ProgTypes
stat *stats.Val
resultC chan *Result
func newExecQueues(fuzzer *Fuzzer) execQueues {
ret := execQueues{
smashQueue: queue.Plain(),
triageQueue: queue.Plain(),
candidateQueue: queue.PlainWithStat(fuzzer.statExecCandidate),
triageCandidateQueue: queue.Plain(),
}
// Sources are listed in the order in which they will be polled.
ret.source = queue.Multiplex(
ret.triageCandidateQueue,
ret.candidateQueue,
ret.triageQueue,
// Alternate smash jobs with exec/fuzz in 33% of cases.
queue.Alternate(ret.smashQueue, fuzzer.rand(), 1.0/3.0),
queue.Callback(fuzzer.genFuzz),
)
return ret
}

type SignalType int
type execOpt any
type dontTriage struct{}
type progFlags ProgTypes

const (
NoSignal SignalType = iota // we don't need any signal
NewSignal // we need the newly seen signal
AllSignal // we need all signal
)
func (fuzzer *Fuzzer) validateRequest(req *queue.Request) {
if req.NeedHints && (req.NeedCover || req.NeedSignal != queue.NoSignal) {
panic("Request.NeedHints is mutually exclusive with other fields")
}
if req.SignalFilter != nil && req.NeedSignal != queue.NewSignal {
panic("SignalFilter must be used with NewSignal")
}
}

type Result struct {
Info *ipc.ProgInfo
Stop bool
func (fuzzer *Fuzzer) execute(executor queue.Executor, req *queue.Request, opts ...execOpt) *queue.Result {
fuzzer.validateRequest(req)
res := queue.Wait(fuzzer.ctx, executor, req)
fuzzer.processResult(req, res, opts...)
return res
}

func (fuzzer *Fuzzer) Done(req *Request, res *Result) {
func (fuzzer *Fuzzer) prepare(req *queue.Request, opts ...execOpt) {
fuzzer.validateRequest(req)
req.OnDone(func(req *queue.Request, res *queue.Result) bool {
fuzzer.processResult(req, res, opts...)
return true
})
}

func (fuzzer *Fuzzer) enqueue(executor queue.Executor, req *queue.Request, opts ...execOpt) {
fuzzer.prepare(req)
executor.Submit(req)
}

func (fuzzer *Fuzzer) processResult(req *queue.Request, res *queue.Result, opts ...execOpt) {
var flags ProgTypes
var noTriage bool
for _, opt := range opts {
switch v := opt.(type) {
case progFlags:
flags = ProgTypes(v)
case dontTriage:
noTriage = true
}
}
// Triage individual calls.
// We do it before unblocking the waiting threads because
// it may result in concurrent modification of req.Prog.
// If we are already triaging this exact prog, this is flaky coverage.
if req.NeedSignal != NoSignal && res.Info != nil && req.flags&progInTriage == 0 {
if req.NeedSignal != queue.NoSignal && res.Info != nil && !noTriage {
for call, info := range res.Info.Calls {
fuzzer.triageProgCall(req.Prog, &info, call, req.flags)
fuzzer.triageProgCall(req.Prog, &info, call, flags)
}
fuzzer.triageProgCall(req.Prog, &res.Info.Extra, -1, req.flags)
}
// Unblock threads that wait for the result.
if req.resultC != nil {
req.resultC <- res
fuzzer.triageProgCall(req.Prog, &res.Info.Extra, -1, flags)
}
if res.Info != nil {
fuzzer.statExecTime.Add(int(res.Info.Elapsed.Milliseconds()))
}
req.stat.Add(1)
}

type Config struct {
Debug bool
Corpus *corpus.Corpus
Logf func(level int, msg string, args ...interface{})
Coverage bool
FaultInjection bool
Comparisons bool
Collide bool
EnabledCalls map[*prog.Syscall]bool
NoMutateCalls map[int]bool
FetchRawCover bool
NewInputFilter func(call string) bool
}

func (fuzzer *Fuzzer) triageProgCall(p *prog.Prog, info *ipc.CallInfo, call int, flags ProgTypes) {
@@ -142,12 +176,11 @@ func (fuzzer *Fuzzer) triageProgCall(p *prog.Prog, info *ipc.CallInfo, call int,
}
fuzzer.Logf(2, "found new signal in call %d in %s", call, p)
fuzzer.startJob(fuzzer.statJobsTriage, &triageJob{
p: p.Clone(),
call: call,
info: *info,
newSignal: newMaxSignal,
flags: flags,
jobPriority: triageJobPrio(flags),
p: p.Clone(),
call: call,
info: *info,
newSignal: newMaxSignal,
flags: flags,
})
}

@@ -164,57 +197,30 @@ func signalPrio(p *prog.Prog, info *ipc.CallInfo, call int) (prio uint8) {
return
}

type Candidate struct {
Prog *prog.Prog
Smashed bool
Minimized bool
}

func (fuzzer *Fuzzer) NextInput() *Request {
req := fuzzer.nextInput()
if req.stat == fuzzer.statExecCandidate {
fuzzer.StatCandidates.Add(-1)
}
return req
}

func (fuzzer *Fuzzer) nextInput() *Request {
nextExec := fuzzer.nextExec.tryPop()

// The fuzzer may become too interested in potentially very long hint and smash jobs.
// Let's leave more space for new input space exploration.
if nextExec != nil {
if nextExec.prio.greaterThan(priority{smashPrio}) || fuzzer.nextRand()%3 != 0 {
return nextExec.value
} else {
fuzzer.nextExec.push(nextExec)
}
}

func (fuzzer *Fuzzer) genFuzz() *queue.Request {
// Either generate a new input or mutate an existing one.
mutateRate := 0.95
if !fuzzer.Config.Coverage {
// If we don't have real coverage signal, generate programs
// more frequently because fallback signal is weak.
mutateRate = 0.5
}
var req *queue.Request
rnd := fuzzer.rand()
if rnd.Float64() < mutateRate {
req := mutateProgRequest(fuzzer, rnd)
if req != nil {
return req
}
req = mutateProgRequest(fuzzer, rnd)
}
if req == nil {
req = genProgRequest(fuzzer, rnd)
}
if req != nil {
fuzzer.prepare(req)
}
return genProgRequest(fuzzer, rnd)
return req
}

func (fuzzer *Fuzzer) startJob(stat *stats.Val, newJob job) {
fuzzer.Logf(2, "started %T", newJob)
if impl, ok := newJob.(jobSaveID); ok {
// E.g. for big and slow hint jobs, we would prefer not to serialize them,
// but rather to start them all in parallel.
impl.saveID(-fuzzer.nextJobID.Add(1))
}
go func() {
stat.Add(1)
fuzzer.statJobs.Add(1)
@@ -224,52 +230,34 @@ func (fuzzer *Fuzzer) startJob(stat *stats.Val, newJob job) {
}()
}

func (fuzzer *Fuzzer) Next() *queue.Request {
return fuzzer.source.Next()
}

func (fuzzer *Fuzzer) Logf(level int, msg string, args ...interface{}) {
if fuzzer.Config.Logf == nil {
return
}
fuzzer.Config.Logf(level, msg, args...)
}

type Candidate struct {
Prog *prog.Prog
Smashed bool
Minimized bool
}

func (fuzzer *Fuzzer) AddCandidates(candidates []Candidate) {
fuzzer.StatCandidates.Add(len(candidates))
for _, candidate := range candidates {
fuzzer.pushExec(candidateRequest(fuzzer, candidate), priority{candidatePrio})
req, flags := candidateRequest(fuzzer, candidate)
fuzzer.enqueue(fuzzer.candidateQueue, req, progFlags(flags))
}
}

func (fuzzer *Fuzzer) rand() *rand.Rand {
return rand.New(rand.NewSource(fuzzer.nextRand()))
}

func (fuzzer *Fuzzer) nextRand() int64 {
fuzzer.mu.Lock()
defer fuzzer.mu.Unlock()
return fuzzer.rnd.Int63()
}

func (fuzzer *Fuzzer) pushExec(req *Request, prio priority) {
if req.NeedHints && (req.NeedCover || req.NeedSignal != NoSignal) {
panic("Request.NeedHints is mutually exclusive with other fields")
}
if req.SignalFilter != nil && req.NeedSignal != NewSignal {
panic("SignalFilter must be used with NewSignal")
}
fuzzer.nextExec.push(&priorityQueueItem[*Request]{
value: req, prio: prio,
})
}

func (fuzzer *Fuzzer) exec(job job, req *Request) *Result {
req.resultC = make(chan *Result, 1)
fuzzer.pushExec(req, job.priority())
select {
case <-fuzzer.ctx.Done():
return &Result{Stop: true}
case res := <-req.resultC:
close(req.resultC)
return res
}
return rand.New(rand.NewSource(fuzzer.rnd.Int63()))
}

func (fuzzer *Fuzzer) updateChoiceTable(programs []*prog.Prog) {
@@ -327,8 +315,8 @@ func (fuzzer *Fuzzer) logCurrentStats() {
var m runtime.MemStats
runtime.ReadMemStats(&m)

str := fmt.Sprintf("exec queue size: %d, running jobs: %d, heap (MB): %d",
fuzzer.nextExec.Len(), fuzzer.statJobs.Val(), m.Alloc/1000/1000)
str := fmt.Sprintf("running jobs: %d, heap (MB): %d",
fuzzer.statJobs.Val(), m.Alloc/1000/1000)
fuzzer.Logf(0, "%s", str)
}
}
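
The new plumbing supports two ways of obtaining a request's result: a blocking one (fuzzer.execute) and a callback one (fuzzer.prepare/enqueue). A rough usage sketch; only queue.Wait, Request.OnDone, and Executor.Submit come from this commit, and the surrounding names are hypothetical:

package main

import (
	"context"

	"github.com/google/syzkaller/pkg/fuzzer/queue"
)

// runSync submits req and blocks until its result is ready or ctx is
// cancelled (cf. fuzzer.execute above).
func runSync(ctx context.Context, executor queue.Executor, req *queue.Request) *queue.Result {
	return queue.Wait(ctx, executor, req)
}

// runAsync registers a completion hook before handing the request to
// the executor (cf. fuzzer.prepare + fuzzer.enqueue). Returning true
// reports that the result was consumed.
func runAsync(executor queue.Executor, req *queue.Request) {
	req.OnDone(func(req *queue.Request, res *queue.Result) bool {
		// Process res.Info here: triage new signal, update stats, etc.
		return true
	})
	executor.Submit(req)
}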
