-
Notifications
You must be signed in to change notification settings - Fork 244
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
improve the spawn feature #181
base: master
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -121,7 +121,10 @@ func (r *runner) outputOnStop() { | |
} | ||
|
||
func (r *runner) spawnWorkers(spawnCount int, quit chan bool, spawnCompleteFunc func()) { | ||
log.Println("Spawning", spawnCount, "clients immediately") | ||
for i := 0; i > spawnCount; i-- { | ||
r.stopChan <- true | ||
atomic.AddInt32(&r.numClients, -1) | ||
} | ||
|
||
for i := 1; i <= spawnCount; i++ { | ||
select { | ||
|
@@ -158,6 +161,7 @@ func (r *runner) spawnWorkers(spawnCount int, quit chan bool, spawnCompleteFunc | |
|
||
if spawnCompleteFunc != nil { | ||
spawnCompleteFunc() | ||
log.Printf("Spawned %d clients completely, the spawn count is: %d\n", r.numClients, spawnCount) | ||
} | ||
} | ||
|
||
|
@@ -201,23 +205,37 @@ func (r *runner) getTask() *Task { | |
return nil | ||
} | ||
|
||
func (r *runner) startSpawning(spawnCount int, spawnRate float64, spawnCompleteFunc func()) { | ||
Events.Publish(EVENT_SPAWN, spawnCount, spawnRate) | ||
|
||
func (r *runner) initSpawning() { | ||
r.stopChan = make(chan bool) | ||
r.numClients = 0 | ||
// prevent receiving spawn message from master when boomer is handling stopping, | ||
// that might happen when the numClients is too large and might take a while for stopping all request goroutines | ||
for { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We don't need this loop if we use a buffered channel? |
||
if atomic.LoadInt32(&r.numClients) == 0 { | ||
break | ||
} else { | ||
time.Sleep(1 * time.Millisecond) | ||
} | ||
} | ||
r.userClassesCountFromMaster = make(map[string]int64) | ||
} | ||
|
||
func (r *runner) startSpawning(spawnCount int, spawnRate float64, spawnCompleteFunc func()) { | ||
Events.Publish(EVENT_SPAWN, spawnCount, spawnRate) | ||
go r.spawnWorkers(spawnCount, r.stopChan, spawnCompleteFunc) | ||
} | ||
|
||
func (r *runner) stop() { | ||
// stop previous goroutines without blocking | ||
// those goroutines will exit when r.safeRun returns | ||
numClients := int(atomic.LoadInt32(&r.numClients)) | ||
for i := 0; i < numClients; i++ { | ||
r.stopChan <- true | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Should we use a buffered channel? So message processing won't block here and wait for task execution. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
so u might think the channel should be buffered if we send true signal in for loop avoiding blocking the message processing? so for this way, the channel capacity need to be initialized for each spawn, is that right~ There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
i found it is not safe to resize the channel capacity for each spawn step, especially when the spawn count is decreasing comparing with previous spawn step. For instance, if spawn count is 30 of the previous step and then spawn count of current step is 20, if stopChan is resized for the current spawn period, then 10 of the goroutines could not exit normally. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think a bounded channel is enough and you don't need to resize it. See aslo |
||
atomic.AddInt32(&r.numClients, -1) | ||
} | ||
|
||
// publish the boomer stop event | ||
// user's code can subscribe to this event and do thins like cleaning up | ||
Events.Publish(EVENT_STOP) | ||
|
||
// stop previous goroutines without blocking | ||
// those goroutines will exit when r.safeRun returns | ||
close(r.stopChan) | ||
} | ||
|
||
type localRunner struct { | ||
|
@@ -351,19 +369,26 @@ func (r *slaveRunner) sumUsersAmount(msg *genericMessage) int { | |
userClassesCountMap := userClassesCount.(map[interface{}]interface{}) | ||
|
||
// Save the original field and send it back to master in spawnComplete message. | ||
r.userClassesCountFromMaster = make(map[string]int64) | ||
amount := 0 | ||
tmpClassesCountFromMaster := make(map[string]int64) | ||
oldAmount := 0 | ||
newAmount := 0 | ||
|
||
for _, num := range r.userClassesCountFromMaster { | ||
oldAmount = oldAmount + int(num) | ||
} | ||
|
||
for class, num := range userClassesCountMap { | ||
c, ok := class.(string) | ||
n, ok2 := castToInt64(num) | ||
if !ok || !ok2 { | ||
log.Printf("user_classes_count in spawn message can't be casted to map[string]int64, current type is map[%T]%T, ignored!\n", class, num) | ||
continue | ||
} | ||
r.userClassesCountFromMaster[c] = n | ||
amount = amount + int(n) | ||
tmpClassesCountFromMaster[c] = n | ||
newAmount = newAmount + int(n) | ||
} | ||
return amount | ||
r.userClassesCountFromMaster = tmpClassesCountFromMaster | ||
return newAmount - oldAmount | ||
} | ||
|
||
// TODO: Since locust 2.0, spawn rate and user count are both handled by master. | ||
|
@@ -440,6 +465,7 @@ func (r *slaveRunner) onMessage(msgInterface message) { | |
case "spawn": | ||
r.state = stateSpawning | ||
r.stats.clearStatsChan <- true | ||
r.initSpawning() | ||
r.onSpawnMessage(genericMsg) | ||
case "quit": | ||
Events.Publish(EVENT_QUIT) | ||
|
@@ -452,7 +478,6 @@ func (r *slaveRunner) onMessage(msgInterface message) { | |
switch msgType { | ||
case "spawn": | ||
r.state = stateSpawning | ||
r.stop() | ||
r.onSpawnMessage(genericMsg) | ||
case "stop": | ||
r.stop() | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What about a new stopWorkers function?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
OK, so u think it might be a better way to define another function such as stopWorkers which could handle spawnCount < 0