improve the spawn feature #181

Open · wants to merge 1 commit into master
runner.go (55 changes: 40 additions & 15 deletions)
@@ -121,7 +121,10 @@ func (r *runner) outputOnStop() {
}

func (r *runner) spawnWorkers(spawnCount int, quit chan bool, spawnCompleteFunc func()) {
log.Println("Spawning", spawnCount, "clients immediately")
for i := 0; i > spawnCount; i-- {
Owner:
What about a new stopWorkers function?

Author:
> What about a new stopWorkers function?

OK, so you think it might be better to define a separate function such as stopWorkers that handles spawnCount < 0?

r.stopChan <- true
atomic.AddInt32(&r.numClients, -1)
}

for i := 1; i <= spawnCount; i++ {
select {
@@ -158,6 +161,7 @@ func (r *runner) spawnWorkers(spawnCount int, quit chan bool, spawnCompleteFunc

if spawnCompleteFunc != nil {
spawnCompleteFunc()
log.Printf("Spawned %d clients completely, the spawn count is: %d\n", r.numClients, spawnCount)
}
}
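
Regarding the stopWorkers suggestion in the review thread above: a minimal sketch of what such a helper might look like, assuming the runner fields already used in this diff (stopChan, numClients). This is only an illustration of the reviewer's idea, not code from the PR.

```go
// stopWorkers asks count workers to exit by sending on r.stopChan, mirroring
// the negative-spawnCount branch above. Sketch only; it assumes the runner
// type and fields from runner.go and the sync/atomic import already present.
func (r *runner) stopWorkers(count int) {
	for i := 0; i < count; i++ {
		r.stopChan <- true
		atomic.AddInt32(&r.numClients, -1)
	}
}
```

spawnWorkers could then delegate to r.stopWorkers(-spawnCount) when spawnCount < 0, keeping the spawn path and the stop path separate.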

@@ -201,23 +205,37 @@ func (r *runner) getTask() *Task {
return nil
}

func (r *runner) startSpawning(spawnCount int, spawnRate float64, spawnCompleteFunc func()) {
Events.Publish(EVENT_SPAWN, spawnCount, spawnRate)

func (r *runner) initSpawning() {
r.stopChan = make(chan bool)
r.numClients = 0
// prevent receiving a spawn message from the master while boomer is still stopping,
// which can happen when numClients is large and stopping all request goroutines takes a while
for {
Owner:
We don't need this loop if we use a buffered channel?

if atomic.LoadInt32(&r.numClients) == 0 {
break
} else {
time.Sleep(1 * time.Millisecond)
}
}
r.userClassesCountFromMaster = make(map[string]int64)
}
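
For context on the buffered-channel question raised above, here is a small standalone Go sketch (not boomer code) of the property under discussion: sends on a buffered channel return immediately while the buffer has free slots, so the sender does not have to wait for a receiver. The capacity of 3 is an arbitrary value for illustration.

```go
package main

import "fmt"

func main() {
	// A buffered channel lets the sender queue stop signals without
	// waiting for a worker to be ready to receive them.
	stop := make(chan bool, 3)

	// These sends return immediately because the buffer has room.
	for i := 0; i < 3; i++ {
		stop <- true
	}

	// Workers can pick the signals up later, when their current task ends.
	for i := 0; i < 3; i++ {
		fmt.Println("received stop signal:", <-stop)
	}
}
```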

func (r *runner) startSpawning(spawnCount int, spawnRate float64, spawnCompleteFunc func()) {
Events.Publish(EVENT_SPAWN, spawnCount, spawnRate)
go r.spawnWorkers(spawnCount, r.stopChan, spawnCompleteFunc)
}

func (r *runner) stop() {
// stop previous goroutines without blocking
// those goroutines will exit when r.safeRun returns
numClients := int(atomic.LoadInt32(&r.numClients))
for i := 0; i < numClients; i++ {
r.stopChan <- true
Owner:
Should we use a buffered channel? So message processing won't block here and wait for task execution.

Author:
> Should we use a buffered channel? So message processing won't block here and wait for task execution.

So you think the channel should be buffered if we send the true signal in a for loop, to avoid blocking message processing? In that case, the channel capacity would need to be initialized for each spawn, is that right?

Author (@boboalex, Oct 29, 2022):
I found it is not safe to resize the channel capacity for each spawn step, especially when the spawn count decreases compared with the previous step. For instance, if the previous spawn count was 30 and the current spawn count is 20, and stopChan is resized for the current spawn period, then 10 of the goroutines cannot exit normally.
Therefore, I found another plan: make stopChan an "unbounded channel" (https://github.com/smallnest/chanx/blob/main/unbounded_chan.go). Then we only need to handle sending signals to the channel; the channel is buffered and also safe, as I tested. In addition, I think version 1.0.0 of chanx is enough for boomer to use.
However, it would change the type of stopChan and also affect some unit test cases.

Owner:
I think a bounded channel is enough and you don't need to resize it.

See also
https://github.com/myzhan/boomer/blob/master/stats.go#L42

atomic.AddInt32(&r.numClients, -1)
}

// publish the boomer stop event
// user's code can subscribe to this event and do things like cleaning up
Events.Publish(EVENT_STOP)

// stop previous goroutines without blocking
// those goroutines will exit when r.safeRun returns
close(r.stopChan)
}
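
The new stop() relies on the fact that closing a channel unblocks every pending receive at once, so no per-worker send is needed. A minimal standalone sketch of that behavior (not boomer code):

```go
package main

import (
	"fmt"
	"sync"
)

func main() {
	stop := make(chan bool)
	var wg sync.WaitGroup

	// Start a few workers that block until the stop channel is closed.
	for i := 0; i < 3; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			<-stop // receive on a closed channel returns immediately
			fmt.Println("worker", id, "exiting")
		}(i)
	}

	// Closing the channel signals all workers at once.
	close(stop)
	wg.Wait()
}
```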

type localRunner struct {
@@ -351,19 +369,26 @@ func (r *slaveRunner) sumUsersAmount(msg *genericMessage) int {
userClassesCountMap := userClassesCount.(map[interface{}]interface{})

// Save the original field and send it back to master in spawnComplete message.
r.userClassesCountFromMaster = make(map[string]int64)
amount := 0
tmpClassesCountFromMaster := make(map[string]int64)
oldAmount := 0
newAmount := 0

for _, num := range r.userClassesCountFromMaster {
oldAmount = oldAmount + int(num)
}

for class, num := range userClassesCountMap {
c, ok := class.(string)
n, ok2 := castToInt64(num)
if !ok || !ok2 {
log.Printf("user_classes_count in spawn message can't be casted to map[string]int64, current type is map[%T]%T, ignored!\n", class, num)
continue
}
r.userClassesCountFromMaster[c] = n
amount = amount + int(n)
tmpClassesCountFromMaster[c] = n
newAmount = newAmount + int(n)
}
return amount
r.userClassesCountFromMaster = tmpClassesCountFromMaster
return newAmount - oldAmount
}
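
To make the new delta logic in sumUsersAmount concrete, here is a standalone sketch with made-up numbers (class names and counts are illustrative, not from the PR): the function now compares the previously stored per-class counts with the counts in the incoming spawn message, so the caller receives a positive count to spawn or a negative count to stop.

```go
package main

import "fmt"

// sumDelta mirrors the idea of the patched sumUsersAmount: sum the old and the
// new per-class user counts and return the difference.
func sumDelta(previous, current map[string]int64) int {
	oldAmount, newAmount := 0, 0
	for _, n := range previous {
		oldAmount += int(n)
	}
	for _, n := range current {
		newAmount += int(n)
	}
	return newAmount - oldAmount
}

func main() {
	previous := map[string]int64{"Dummy": 10, "Dummy2": 10} // 20 users running
	current := map[string]int64{"Dummy": 3, "Dummy2": 2}    // master now wants 5

	// A negative result means 15 workers should be stopped.
	fmt.Println(sumDelta(previous, current)) // -15
}
```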

// TODO: Since locust 2.0, spawn rate and user count are both handled by master.
@@ -440,6 +465,7 @@ func (r *slaveRunner) onMessage(msgInterface message) {
case "spawn":
r.state = stateSpawning
r.stats.clearStatsChan <- true
r.initSpawning()
r.onSpawnMessage(genericMsg)
case "quit":
Events.Publish(EVENT_QUIT)
@@ -452,7 +478,6 @@ func (r *slaveRunner) onMessage(msgInterface message) {
switch msgType {
case "spawn":
r.state = stateSpawning
r.stop()
r.onSpawnMessage(genericMsg)
case "stop":
r.stop()
runner_test.go (29 changes: 26 additions & 3 deletions)
Expand Up @@ -218,7 +218,7 @@ func TestSpawnAndStop(t *testing.T) {
runner.state = stateSpawning
runner.client = newClient("localhost", 5557, runner.nodeID)
defer runner.shutdown()

runner.initSpawning()
runner.startSpawning(10, float64(10), runner.spawnComplete)
// wait for spawning goroutines
time.Sleep(2 * time.Second)
@@ -234,6 +234,7 @@ func TestSpawnAndStop(t *testing.T) {
msg = <-runner.client.sendChannel()
m = msg.(*genericMessage)
assert.Equal(t, "quit", m.Type)
assert.Equal(t, runner.numClients, int32(0))
}

func TestStop(t *testing.T) {
@@ -244,8 +245,7 @@ func TestStop(t *testing.T) {
}

runner := newSlaveRunner("localhost", 5557, []*Task{taskA}, nil)
runner.stopChan = make(chan bool)

runner.initSpawning()
stopped := false
handler := func() {
stopped = true
@@ -414,6 +414,7 @@ func TestOnMessage(t *testing.T) {
// stop all the workers
runner.onMessage(newGenericMessage("stop", nil, runner.nodeID))
assert.Equal(t, stateInit, runner.state)
assert.Equal(t, runner.numClients, int32(0))

msg = <-runner.client.sendChannel()
m = msg.(*genericMessage)
@@ -444,9 +445,31 @@ func TestOnMessage(t *testing.T) {
m = msg.(*genericMessage)
assert.Equal(t, "spawning_complete", m.Type)

// decrease goroutines while running
runner.onMessage(newGenericMessage("spawn", map[string]interface{}{
"user_classes_count": map[interface{}]interface{}{
"Dummy": int64(3),
"Dummy2": int64(2),
},
}, runner.nodeID))

msg = <-runner.client.sendChannel()
m = msg.(*genericMessage)
assert.Equal(t, "spawning", m.Type)

// spawn complete and running
time.Sleep(2 * time.Second)
assert.Equal(t, stateRunning, runner.state)
assert.Equal(t, int32(5), runner.numClients)

msg = <-runner.client.sendChannel()
m = msg.(*genericMessage)
assert.Equal(t, "spawning_complete", m.Type)

// stop all the workers
runner.onMessage(newGenericMessage("stop", nil, runner.nodeID))
assert.Equal(t, stateInit, runner.state)
assert.Equal(t, runner.numClients, int32(0))

msg = <-runner.client.sendChannel()
m = msg.(*genericMessage)