Skip to content

Commit

Permalink
bench: cluster patch
Browse files Browse the repository at this point in the history
  • Loading branch information
Kirill-Churkin committed Jan 16, 2023
1 parent cbf864a commit 6172ec4
Show file tree
Hide file tree
Showing 10 changed files with 634 additions and 220 deletions.
201 changes: 37 additions & 164 deletions cli/bench/bench.go
Original file line number Diff line number Diff line change
@@ -1,115 +1,38 @@
package bench

import (
bctx "context"
"fmt"
"math/rand"
"sync"
"time"

"github.com/FZambia/tarantool"
"github.com/tarantool/cartridge-cli/cli/context"
)

// printResults outputs benchmark foramatted results.
func printResults(results Results) {
fmt.Printf("\nResults:\n")
fmt.Printf("\tSuccess operations: %d\n", results.successResultCount)
fmt.Printf("\tFailed operations: %d\n", results.failedResultCount)
fmt.Printf("\tRequest count: %d\n", results.handledRequestsCount)
fmt.Printf("\tTime (seconds): %f\n", results.duration)
fmt.Printf("\tRequests per second: %d\n\n", results.requestsPerSecond)
}

// verifyOperationsPercentage checks that the amount of operations percentage is 100.
func verifyOperationsPercentage(ctx *context.BenchCtx) error {
entire_percentage := ctx.InsertCount + ctx.SelectCount + ctx.UpdateCount
if entire_percentage != 100 {
return fmt.Errorf(
"The number of operations as a percentage should be equal to 100, " +
"note that by default the percentage of inserts is 100")
}
return nil
}

// spacePreset prepares space for a benchmark.
func spacePreset(tarantoolConnection *tarantool.Connection) error {
dropBenchmarkSpace(tarantoolConnection)
return createBenchmarkSpace(tarantoolConnection)
}

// incrementRequest increases the counter of successful/failed requests depending on the presence of an error.
func (results *Results) incrementRequestsCounters(err error) {
if err == nil {
results.successResultCount++
} else {
results.failedResultCount++
}
results.handledRequestsCount++
}
// Main benchmark function.
func Run(ctx context.BenchCtx) error {
rand.Seed(time.Now().UnixNano())

// requestsLoop continuously executes the insert query until the benchmark time runs out.
func requestsLoop(requestsSequence *RequestsSequence, backgroundCtx bctx.Context) {
for {
select {
case <-backgroundCtx.Done():
return
default:
request := requestsSequence.getNext()
request.operation(&request)
}
if err := verifyOperationsPercentage(&ctx); err != nil {
return err
}
}

// connectionLoop runs "ctx.SimultaneousRequests" requests execution goroutines
// through the same connection.
func connectionLoop(
ctx *context.BenchCtx,
requestsSequence *RequestsSequence,
backgroundCtx bctx.Context,
) {
var connectionWait sync.WaitGroup
for i := 0; i < ctx.SimultaneousRequests; i++ {
connectionWait.Add(1)
go func() {
defer connectionWait.Done()
requestsLoop(requestsSequence, backgroundCtx)
}()
// Check cluster topology for further actions.
cluster, err := isCluster(ctx)
if err != nil {
return err
}

connectionWait.Wait()
}

// preFillBenchmarkSpaceIfRequired fills benchmark space
// if insert count = 0 or PreFillingCount flag is explicitly specified.
func preFillBenchmarkSpaceIfRequired(ctx context.BenchCtx, connectionPool []*tarantool.Connection) error {
if ctx.InsertCount == 0 || ctx.PreFillingCount != PreFillingCount {
fmt.Println("\nThe pre-filling of the space has started,\n" +
"because the insert operation is not specified\n" +
"or there was an explicit instruction for pre-filling.")
fmt.Println("...")
filledCount, err := fillBenchmarkSpace(ctx, connectionPool)
if err != nil {
if cluster {
// Check cluster for wrong topology.
if err := verifyClusterTopology(ctx); err != nil {
return err
}
fmt.Printf("Pre-filling is finished. Number of records: %d\n\n", filledCount)
}
return nil
}

// Main benchmark function.
func Run(ctx context.BenchCtx) error {
rand.Seed(time.Now().UnixNano())

if err := verifyOperationsPercentage(&ctx); err != nil {
return err
// Get url of one of instances in cluster for space preset and prefill.
ctx.URL = (*ctx.Leaders)[0]
}

// Connect to tarantool and preset space for benchmark.
tarantoolConnection, err := tarantool.Connect(ctx.URL, tarantool.Opts{
User: ctx.User,
Password: ctx.Password,
})
tarantoolConnection, err := createConnection(ctx)
if err != nil {
return fmt.Errorf(
"Couldn't connect to Tarantool %s.",
Expand All @@ -123,87 +46,37 @@ func Run(ctx context.BenchCtx) error {
return err
}

/// Сreate a "connectionPool" before starting the benchmark to exclude the connection establishment time from measurements.
connectionPool := make([]*tarantool.Connection, ctx.Connections)
for i := 0; i < ctx.Connections; i++ {
connectionPool[i], err = tarantool.Connect(ctx.URL, tarantool.Opts{
User: ctx.User,
Password: ctx.Password,
})
if err != nil {
return err
}
defer connectionPool[i].Close()
}

if err := preFillBenchmarkSpaceIfRequired(ctx, connectionPool); err != nil {
if err := preFillBenchmarkSpaceIfRequired(ctx); err != nil {
return err
}

fmt.Println("Benchmark start")
fmt.Println("...")

// The "context" will be used to stop all "connectionLoop" when the time is out.
backgroundCtx, cancel := bctx.WithCancel(bctx.Background())
var waitGroup sync.WaitGroup
results := Results{}

startTime := time.Now()
timer := time.NewTimer(time.Duration(ctx.Duration * int(time.Second)))

// Start detached connections.
for i := 0; i < ctx.Connections; i++ {
waitGroup.Add(1)
go func(connection *tarantool.Connection) {
defer waitGroup.Done()
requestsSequence := RequestsSequence{
[]RequestsGenerator{
{
Request{
insertOperation,
ctx,
connection,
&results,
},
ctx.InsertCount,
},
{
Request{
selectOperation,
ctx,
connection,
&results,
},
ctx.SelectCount,
},
{
Request{
updateOperation,
ctx,
connection,
&results,
},
ctx.UpdateCount,
},
},
0,
ctx.InsertCount,
sync.Mutex{},
}
connectionLoop(&ctx, &requestsSequence, backgroundCtx)
}(connectionPool[i])
// Bench one instance by default.
benchStart := benchOneInstance
if cluster {
benchStart = benchCluster
}

// Prepare data for bench.
benchData := getBenchData(ctx)

// Start benching.
if err := benchStart(ctx, &benchData); err != nil {
return err
}
// Sends "signal" to all "connectionLoop" and waits for them to complete.
<-timer.C
cancel()
waitGroup.Wait()

results.duration = time.Since(startTime).Seconds()
results.requestsPerSecond = int(float64(results.handledRequestsCount) / results.duration)
// Calculate results.
benchData.results.duration = time.Since(benchData.startTime).Seconds()
benchData.results.requestsPerSecond = int(float64(benchData.results.handledRequestsCount) / benchData.results.duration)

dropBenchmarkSpace(tarantoolConnection)
fmt.Println("Benchmark stop")
// Benchmark space must exist after bench.
if err := dropBenchmarkSpace(tarantoolConnection); err != nil {
return err
}
fmt.Println("Benchmark stop.")

printResults(results)
printResults(benchData.results)
return nil
}
Loading

0 comments on commit 6172ec4

Please sign in to comment.