Skip to content

Commit

Permalink
new monitoring metric - engine_LastMaxActiveRequests
Browse files Browse the repository at this point in the history
58de492a9e1d14a374ca14ef471701c1fdbc8cac
  • Loading branch information
oke11o committed Jul 11, 2024
1 parent 5b8e11e commit 2084f86
Show file tree
Hide file tree
Showing 16 changed files with 108 additions and 75 deletions.
3 changes: 3 additions & 0 deletions .changes/v0.5.30.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
## v0.5.30 - 2024-07-10
### Added
* new monitoring metric - engine_LastMaxActiveRequests
2 changes: 2 additions & 0 deletions .mapping.json
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
".changes/v0.5.27.md":"load/projects/pandora/.changes/v0.5.27.md",
".changes/v0.5.28.md":"load/projects/pandora/.changes/v0.5.28.md",
".changes/v0.5.29.md":"load/projects/pandora/.changes/v0.5.29.md",
".changes/v0.5.30.md":"load/projects/pandora/.changes/v0.5.30.md",
".changie.yaml":"load/projects/pandora/.changie.yaml",
".github/actions/setup-yc/action.yml":"load/projects/pandora/.github/actions/setup-yc/action.yml",
".github/workflows/pages.yml":"load/projects/pandora/.github/workflows/pages.yml",
Expand Down Expand Up @@ -466,6 +467,7 @@
"lib/math/gcd_lcm.go":"load/projects/pandora/lib/math/gcd_lcm.go",
"lib/math/gcd_lcm_test.go":"load/projects/pandora/lib/math/gcd_lcm_test.go",
"lib/monitoring/counter.go":"load/projects/pandora/lib/monitoring/counter.go",
"lib/monitoring/instance.go":"load/projects/pandora/lib/monitoring/instance.go",
"lib/mp/iterator.go":"load/projects/pandora/lib/mp/iterator.go",
"lib/mp/map.go":"load/projects/pandora/lib/mp/map.go",
"lib/mp/map_test.go":"load/projects/pandora/lib/mp/map_test.go",
Expand Down
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,10 @@ adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html),
and is generated by [Changie](https://github.com/miniscruff/changie).


## v0.5.30 - 2024-07-10
### Added
* new monitoring metric - engine_LastMaxActiveRequests

## v0.5.29 - 2024-06-25
### Added
* HTTP scenario var/header postprocessor use multiple pipes
Expand Down
4 changes: 2 additions & 2 deletions cli/cli.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import (
"go.uber.org/zap/zapcore"
)

const Version = "0.5.29"
const Version = "0.5.30"
const defaultConfigFile = "load"
const stdinConfigSelector = "-"

Expand Down Expand Up @@ -122,7 +122,7 @@ func ReadConfigAndRunEngine() {

closeMonitoring := startMonitoring(conf.Monitoring)
defer closeMonitoring()
m := newEngineMetrics()
m := engine.NewMetrics("engine")
startReport(m)

pandora := engine.New(log, m, conf.Engine)
Expand Down
14 changes: 4 additions & 10 deletions cli/expvar.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,20 +8,12 @@ import (
"go.uber.org/zap"
)

// newEngineMetrics creates the four engine counters under their fixed
// "engine_*" names and bundles them into an engine.Metrics value.
func newEngineMetrics() engine.Metrics {
	// Counters are created in the same order as the struct fields they fill.
	reqs := monitoring.NewCounter("engine_Requests")
	resps := monitoring.NewCounter("engine_Responses")
	started := monitoring.NewCounter("engine_UsersStarted")
	finished := monitoring.NewCounter("engine_UsersFinished")

	return engine.Metrics{
		Request:        reqs,
		Response:       resps,
		InstanceStart:  started,
		InstanceFinish: finished,
	}
}

func startReport(m engine.Metrics) {
evReqPS := monitoring.NewCounter("engine_ReqPS")
evResPS := monitoring.NewCounter("engine_ResPS")
evActiveUsers := monitoring.NewCounter("engine_ActiveUsers")
evActiveRequests := monitoring.NewCounter("engine_ActiveRequests")
evLastMaxActiveRequests := monitoring.NewCounter("engine_LastMaxActiveRequests")
requests := m.Request.Get()
responses := m.Response.Get()
go func() {
Expand All @@ -36,15 +28,17 @@ func startReport(m engine.Metrics) {
reqps := requestsNew - requests
activeUsers := m.InstanceStart.Get() - m.InstanceFinish.Get()
activeRequests := requestsNew - responsesNew
lastMaxActiveRequests := int64(m.BusyInstances.Flush())
zap.S().Infof(
"[ENGINE] %d resp/s; %d req/s; %d users; %d active\n",
rps, reqps, activeUsers, activeRequests)
rps, reqps, activeUsers, lastMaxActiveRequests)

requests = requestsNew
responses = responsesNew

evActiveUsers.Set(activeUsers)
evActiveRequests.Set(activeRequests)
evLastMaxActiveRequests.Set(lastMaxActiveRequests)
evReqPS.Set(reqps)
evResPS.Set(rps)
}
Expand Down
12 changes: 1 addition & 11 deletions components/guns/http/base_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import (
"github.com/yandex/pandora/core/aggregator/netsample"
"github.com/yandex/pandora/core/coretest"
"github.com/yandex/pandora/core/engine"
"github.com/yandex/pandora/lib/monitoring"
"github.com/yandex/pandora/lib/testutil"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
Expand All @@ -35,15 +34,6 @@ func newLogger() *zap.Logger {
return log
}

// newEngineMetrics builds an engine.Metrics whose counters are named
// prefix+"_Requests", prefix+"_Responses", prefix+"_UsersStarted" and
// prefix+"_UsersFinished".
func newEngineMetrics(prefix string) engine.Metrics {
	// counter creates one monitoring counter named prefix+suffix.
	counter := func(suffix string) *monitoring.Counter {
		return monitoring.NewCounter(prefix + suffix)
	}

	var m engine.Metrics
	m.Request = counter("_Requests")
	m.Response = counter("_Responses")
	m.InstanceStart = counter("_UsersStarted")
	m.InstanceFinish = counter("_UsersFinished")
	return m
}

func TestGunSuite(t *testing.T) {
suite.Run(t, new(BaseGunSuite))
}
Expand All @@ -59,7 +49,7 @@ type BaseGunSuite struct {

func (s *BaseGunSuite) SetupSuite() {
s.log = testutil.NewLogger()
s.metrics = newEngineMetrics("http_suite")
s.metrics = engine.NewMetrics("http_suite")
}

func (s *BaseGunSuite) SetupTest() {
Expand Down
11 changes: 11 additions & 0 deletions core/engine/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,16 @@ type InstancePoolConfig struct {
DiscardOverflow bool `config:"discard_overflow"`
}

// NewMetrics builds the engine Metrics set. Every metric name is derived
// from prefix: the four counters get the suffixes "_Requests",
// "_Responses", "_UsersStarted" and "_UsersFinished", and the instance
// tracker gets "_BusyInstances".
//
// NOTE(review): callers in the tests pass a distinct prefix per call,
// presumably because metric names must be unique per process — confirm
// against the monitoring package.
func NewMetrics(prefix string) Metrics {
	var m Metrics
	m.Request = monitoring.NewCounter(prefix + "_Requests")
	m.Response = monitoring.NewCounter(prefix + "_Responses")
	m.InstanceStart = monitoring.NewCounter(prefix + "_UsersStarted")
	m.InstanceFinish = monitoring.NewCounter(prefix + "_UsersFinished")
	m.BusyInstances = monitoring.NewInstanceTracker(prefix + "_BusyInstances")
	return m
}

// TODO(skipor): use something based on github.com/rcrowley/go-metrics.
// Its high-level primitives like Meter may not be fast enough, but EWMAs
// and Counters should be good enough for that.
Expand All @@ -37,6 +47,7 @@ type Metrics struct {
Response *monitoring.Counter
InstanceStart *monitoring.Counter
InstanceFinish *monitoring.Counter
BusyInstances *monitoring.InstanceTracker
}

func New(log *zap.Logger, m Metrics, conf Config) *Engine {
Expand Down
46 changes: 18 additions & 28 deletions core/engine/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ import (
coremock "github.com/yandex/pandora/core/mocks"
"github.com/yandex/pandora/core/provider"
"github.com/yandex/pandora/core/schedule"
"github.com/yandex/pandora/lib/monitoring"
"go.uber.org/atomic"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
Expand Down Expand Up @@ -83,15 +82,15 @@ func Test_InstancePool(t *testing.T) {
waitDoneCalled.Store(false)
ctx, cancel = context.WithCancel(context.Background())
}
var justBeforeEach = func() {
metrics := newTestMetrics()
var justBeforeEach = func(metricPrefix string) {
metrics := NewMetrics(metricPrefix)
p = newPool(newNopLogger(), metrics, onWaitDone, conf)
}
_ = cancel

t.Run("shoot ok", func(t *testing.T) {
beforeEach()
justBeforeEach()
justBeforeEach("shoot-ok")

err := p.Run(ctx)
require.NoError(t, err)
Expand Down Expand Up @@ -121,7 +120,7 @@ func Test_InstancePool(t *testing.T) {

beforeEach()
beforeEachContext()
justBeforeEach()
justBeforeEach("context-canceled")

err := p.Run(ctx)
require.Equal(t, context.Canceled, err)
Expand Down Expand Up @@ -170,7 +169,7 @@ func Test_InstancePool(t *testing.T) {
})
conf.Aggregator = aggr

justBeforeEach()
justBeforeEach("provider-failed")

err := p.Run(ctx)
require.Error(t, err)
Expand Down Expand Up @@ -201,7 +200,7 @@ func Test_InstancePool(t *testing.T) {
aggr := &coremock.Aggregator{}
aggr.On("Run", mock.Anything, mock.Anything).Return(failErr)
conf.Aggregator = aggr
justBeforeEach()
justBeforeEach("aggregator-failed")

err := p.Run(ctx)
require.Error(t, err)
Expand All @@ -227,7 +226,7 @@ func Test_InstancePool(t *testing.T) {
conf.NewGun = func() (core.Gun, error) {
return nil, failErr
}
justBeforeEach()
justBeforeEach("start-instances-failed")

err := p.Run(ctx)
require.Error(t, err)
Expand Down Expand Up @@ -259,7 +258,7 @@ func Test_MultipleInstance(t *testing.T) {
schedule.NewOnce(2),
schedule.NewConst(1, 5*time.Second),
)
pool := newPool(newNopLogger(), newTestMetrics(), nil, conf)
pool := newPool(newNopLogger(), NewMetrics("test_engine_1"), nil, conf)
ctx := context.Background()

err := pool.Run(ctx)
Expand All @@ -274,7 +273,7 @@ func Test_MultipleInstance(t *testing.T) {
return schedule.NewOnce(1), nil
}
conf.StartupSchedule = schedule.NewOnce(3)
pool := newPool(newNopLogger(), newTestMetrics(), nil, conf)
pool := newPool(newNopLogger(), NewMetrics("test_engine_2"), nil, conf)
ctx := context.Background()

err := pool.Run(ctx)
Expand All @@ -291,7 +290,7 @@ func Test_MultipleInstance(t *testing.T) {
schedule.NewOnce(2),
schedule.NewConst(1, 2*time.Second),
)
pool := newPool(newNopLogger(), newTestMetrics(), nil, conf)
pool := newPool(newNopLogger(), NewMetrics("test_engine_3"), nil, conf)
ctx := context.Background()

err := pool.Run(ctx)
Expand Down Expand Up @@ -319,14 +318,14 @@ func Test_Engine(t *testing.T) {
ctx, cancel = context.WithCancel(context.Background())
}

var justBeforeEach = func() {
metrics := newTestMetrics()
var justBeforeEach = func(metricPrefix string) {
metrics := NewMetrics(metricPrefix)
engine = New(newNopLogger(), metrics, Config{confs})
}

t.Run("shoot ok", func(t *testing.T) {
beforeEach()
justBeforeEach()
justBeforeEach("shoot-ok-2")

err := engine.Run(ctx)
require.NoError(t, err)
Expand Down Expand Up @@ -361,7 +360,7 @@ func Test_Engine(t *testing.T) {
}
beforeEach()
beforeEachCtx()
justBeforeEach()
justBeforeEach("context-canceled-2")

err := engine.Run(ctx)
require.Equal(t, err, context.Canceled)
Expand Down Expand Up @@ -398,7 +397,7 @@ func Test_Engine(t *testing.T) {
aggr.On("Run", mock.Anything, mock.Anything).Return(failErr)
confs[0].Aggregator = aggr

justBeforeEach()
justBeforeEach("one-pool-failed")

err := engine.Run(ctx)
require.Error(t, err)
Expand All @@ -411,7 +410,7 @@ func Test_BuildInstanceSchedule(t *testing.T) {
t.Run("per instance schedule", func(t *testing.T) {
conf, _ := newTestPoolConf()
conf.RPSPerInstance = true
pool := newPool(newNopLogger(), newTestMetrics(), nil, conf)
pool := newPool(newNopLogger(), NewMetrics("per-instance-schedule"), nil, conf)
newInstanceSchedule, err := pool.buildNewInstanceSchedule(context.Background(), func() {
panic("should not be called")
})
Expand All @@ -428,7 +427,7 @@ func Test_BuildInstanceSchedule(t *testing.T) {
conf.NewRPSSchedule = func() (core.Schedule, error) {
return nil, scheduleCreateErr
}
pool := newPool(newNopLogger(), newTestMetrics(), nil, conf)
pool := newPool(newNopLogger(), NewMetrics("shared-schedule-create-failed"), nil, conf)
newInstanceSchedule, err := pool.buildNewInstanceSchedule(context.Background(), func() {
panic("should not be called")
})
Expand All @@ -446,7 +445,7 @@ func Test_BuildInstanceSchedule(t *testing.T) {
newScheduleCalled = true
return schedule.NewOnce(1), nil
}
pool := newPool(newNopLogger(), newTestMetrics(), nil, conf)
pool := newPool(newNopLogger(), NewMetrics("shared-schedule-work"), nil, conf)
ctx, cancel := context.WithCancel(context.Background())
newInstanceSchedule, err := pool.buildNewInstanceSchedule(context.Background(), cancel)
require.NoError(t, err)
Expand Down Expand Up @@ -532,12 +531,3 @@ func newNopLogger() *zap.Logger {
log := zap.New(core)
return log
}

// newTestMetrics returns a Metrics for tests whose four counters are
// freshly allocated zero-value monitoring.Counter values; they are not
// created through monitoring.NewCounter and carry no name.
func newTestMetrics() Metrics {
	// Positional literal: one counter per Metrics field, in declaration order.
	return Metrics{
		new(monitoring.Counter),
		new(monitoring.Counter),
		new(monitoring.Counter),
		new(monitoring.Counter),
	}
}
2 changes: 2 additions & 0 deletions core/engine/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,8 @@ func (i *instance) Run(ctx context.Context) (recoverErr error) {
}
if !i.discardOverflow || !waiter.IsSlowDown(ctx) {
i.metrics.Request.Add(1)
i.metrics.BusyInstances.OnStart(i.id)
defer i.metrics.BusyInstances.OnFinish(i.id)
if tag.Debug {
i.log.Debug("Shooting", zap.Any("ammo", ammo))
}
Expand Down
16 changes: 8 additions & 8 deletions core/engine/instance_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,15 +32,15 @@ func Test_Instance(t *testing.T) {
newGun func() (core.Gun, error)
)

var beforeEach = func() {
var beforeEach = func(metricPrefix string) {
provider = &coremock.Provider{}
aggregator = &coremock.Aggregator{}
gun = &coremock.Gun{}
newGunErr = nil
sched = &coremock.Schedule{}
newScheduleErr = nil
ctx = context.Background()
metrics = newTestMetrics()
metrics = NewMetrics(metricPrefix)
newSchedule = func() (core.Schedule, error) { return sched, newScheduleErr }
newGun = func() (core.Gun, error) { return gun, newGunErr }
}
Expand Down Expand Up @@ -85,7 +85,7 @@ func Test_Instance(t *testing.T) {
require.NoError(t, insCreateErr)
}
t.Run("start ok", func(t *testing.T) {
beforeEach()
beforeEach("start-ok")
beforeEachCtx()
justBeforeEachCtx()
justBeforeEach()
Expand All @@ -99,7 +99,7 @@ func Test_Instance(t *testing.T) {
})

t.Run("gun implements io.Closer / close called on instance close", func(t *testing.T) {
beforeEach()
beforeEach("gun-implements-io")
beforeEachCtx()
closeGun := mockGunCloser{gun}
closeGun.On("Close").Return(nil)
Expand All @@ -122,7 +122,7 @@ func Test_Instance(t *testing.T) {
})

t.Run("context canceled after run / start fail", func(t *testing.T) {
beforeEach()
beforeEach("context-canceled-after-run")

var cancel context.CancelFunc
ctx, cancel = context.WithTimeout(context.Background(), 10*time.Millisecond)
Expand All @@ -146,7 +146,7 @@ func Test_Instance(t *testing.T) {
})

t.Run("context canceled before run / nothing acquired and schedule not started", func(t *testing.T) {
beforeEach()
beforeEach("context-canceled-before-run")
var cancel context.CancelFunc
ctx, cancel = context.WithCancel(ctx)
cancel()
Expand All @@ -162,7 +162,7 @@ func Test_Instance(t *testing.T) {
})

t.Run("schedule create failed / instance create failed", func(t *testing.T) {
beforeEach()
beforeEach("schedule-create-failed")
sched = nil
newScheduleErr = errors.New("test err")
justBeforeEach()
Expand All @@ -173,7 +173,7 @@ func Test_Instance(t *testing.T) {
})

t.Run("gun create failed / instance create failed", func(t *testing.T) {
beforeEach()
beforeEach("gun-create-failed")
gun = nil
newGunErr = errors.New("test err")
justBeforeEach()
Expand Down
Loading

0 comments on commit 2084f86

Please sign in to comment.