Skip to content

Commit

Permalink
misc: Improve ExchangeBenchmark (facebookincubator#11543)
Browse files Browse the repository at this point in the history
Summary:
Add detailed operator level statistics to the output.

Sample output:
```
============================================================================
[...]exec/benchmarks/ExchangeBenchmark.cpp     relative  time/iter   iters/s
============================================================================
exchangeFlat10k                                              4.11s   243.35m
exchangeFlat50                                               9.55s   104.76m
exchangeDeep10k                                             16.24s    61.57m
exchangeDeep50                                              21.31s    46.93m
exchangeStruct1K                                             2.70s   370.03m
localFlat10k                                                53.60s    18.66m
----------------------------------Flat10K----------------------------------
Wall Time (ms): 4.16s
PartitionOutput: Output: 6400064 rows (611.35MB, 1152 batches), Cpu time: 14.71s, Wall time: 23.37s, Blocked wall time: 0ns, Peak memory: 296.00MB, Memory allocations: 102400, Threads: 128, CPU breakdown: B/I/O/F (1.12ms/0ns/14.70s/1.04ms)
Exchange: Output: 6400000 rows (1.56GB, 141 batches), Cpu time: 1.72s, Wall time: 2.81s, Blocked wall time: 11.94s, Peak memory: 248.45MB, Memory allocations: 13796, Threads: 64, Splits: 256, CPU breakdown: B/I/O/F (37.27ms/4.47ms/1.64s/102.50us)
----------------------------------Flat50K----------------------------------
Wall Time (ms): 9.55s
PartitionOutput: Output: 6400064 rows (600.85MB, 1389 batches), Cpu time: 20.59s, Wall time: 28.00s, Blocked wall time: 15.26s, Peak memory: 267.19MB, Memory allocations: 93261, Threads: 128, CPU breakdown: B/I/O/F (47.17ms/0ns/20.45s/48.90ms)
Exchange: Output: 6400000 rows (1.56GB, 141 batches), Cpu time: 1.72s, Wall time: 2.81s, Blocked wall time: 11.94s, Peak memory: 248.45MB, Memory allocations: 13796, Threads: 64, Splits: 256, CPU breakdown: B/I/O/F (37.27ms/4.47ms/1.64s/102.50us)
----------------------------------Deep10K----------------------------------
Wall Time (ms): 16.23s
PartitionOutput: Output: 6400064 rows (2.98GB, 4600 batches), Cpu time: 47.96s, Wall time: 1m 34s, Blocked wall time: 7m 39s, Peak memory: 549.55MB, Memory allocations: 338361, Threads: 123, CPU breakdown: B/I/O/F (2.69ms/0ns/47.95s/1.92ms)
Exchange: Output: 6400000 rows (10.68GB, 720 batches), Cpu time: 11.38s, Wall time: 20.15s, Blocked wall time: 2m 27s, Peak memory: 299.32MB, Memory allocations: 27054, Threads: 59, Splits: 256, CPU breakdown: B/I/O/F (203.09ms/17.81ms/10.96s/3.69ms)
----------------------------------Deep50K----------------------------------
Wall Time (ms): 21.32s
PartitionOutput: Output: 6400064 rows (3.09GB, 5121 batches), Cpu time: 1m 1s, Wall time: 1m 30s, Blocked wall time: 10m 23s, Peak memory: 480.96MB, Memory allocations: 338279, Threads: 128, CPU breakdown: B/I/O/F (59.24ms/0ns/1m 0s/61.37ms)
Exchange: Output: 6400000 rows (9.96GB, 684 batches), Cpu time: 9.99s, Wall time: 15.91s, Blocked wall time: 3m 59s, Peak memory: 291.77MB, Memory allocations: 25606, Threads: 64, Splits: 256, CPU breakdown: B/I/O/F (240.90ms/14.48ms/9.49s/362.83us)
----------------------------------Struct1K---------------------------------
Wall Time (ms): 2.71s
PartitionOutput: Output: 6400064 rows (361.27MB, 1216 batches), Cpu time: 10.11s, Wall time: 14.42s, Blocked wall time: 0ns, Peak memory: 182.81MB, Memory allocations: 66624, Threads: 127, CPU breakdown: B/I/O/F (3.50ms/0ns/10.10s/3.72ms)
Exchange: Output: 6400000 rows (845.37MB, 86 batches), Cpu time: 1.96s, Wall time: 3.85s, Blocked wall time: 7.85s, Peak memory: 247.79MB, Memory allocations: 10584, Threads: 63, Splits: 256, CPU breakdown: B/I/O/F (13.44ms/3.58ms/1.93s/103.42us)
--------------------------------LocalFlat10K-------------------------------
Wall Time (ms):
 Total: 53.59s
 Max: 6483
 Median: 6347
 Min: 6906
LocalPartition: Output: 204802048 rows (27.16GB, 176128 batches), Cpu time: 2m 52s, Wall time: 28m 4s, Blocked wall time: 39m 0s, Peak memory: 11.23MB, Memory allocations: 163840, CPU breakdown: B/I/O/F (220.24ms/2m 49s/1.98s/15.06ms)
Producer Wait Time (ms)
 Total 21m 16s
 Max: sum:8016268000, count:1, min:8016268000, max:8016268000
 Median: sum:16968760000, count:16, min:613686000, max:2176757000
 Min: sum:7695600000, count:16, min:271392000, max:785580000
Consumer Wait Time (ms)
 Total 17m 44s
 Max: sum:23843660000, count:16, min:448579000, max:2245904000
 Median: sum:0, count:0, min:9223372036854775807, max:-9223372036854775808
 Min: sum:0, count:0, min:9223372036854775807, max:-9223372036854775808
```

This PR depends on facebookincubator#11361, which adds `isBlockedTime` to PlanNodeStats.

Pull Request resolved: facebookincubator#11543

Reviewed By: tanjialiang

Differential Revision: D66727100

Pulled By: xiaoxmeng

fbshipit-source-id: d1d488a93e80d01120bd755782da126aa31e1931
  • Loading branch information
yingsu00 authored and athmaja-n committed Jan 10, 2025
1 parent 9da0543 commit 783acf3
Show file tree
Hide file tree
Showing 3 changed files with 397 additions and 199 deletions.
70 changes: 66 additions & 4 deletions velox/exec/PlanNodeStats.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,67 @@

namespace facebook::velox::exec {

PlanNodeStats& PlanNodeStats::operator+=(const PlanNodeStats& another) {
inputRows += another.inputRows;
inputBytes += another.inputBytes;
inputVectors += another.inputVectors;

rawInputRows += another.inputRows;
rawInputBytes += another.rawInputBytes;

dynamicFilterStats.add(another.dynamicFilterStats);

outputRows += another.outputRows;
outputBytes += another.outputBytes;
outputVectors += another.outputVectors;

isBlockedTiming.add(another.isBlockedTiming);
addInputTiming.add(another.addInputTiming);
getOutputTiming.add(another.getOutputTiming);
finishTiming.add(another.finishTiming);
cpuWallTiming.add(another.isBlockedTiming);
cpuWallTiming.add(another.addInputTiming);
cpuWallTiming.add(another.getOutputTiming);
cpuWallTiming.add(another.finishTiming);
cpuWallTiming.add(another.isBlockedTiming);

backgroundTiming.add(another.backgroundTiming);

blockedWallNanos += another.blockedWallNanos;

peakMemoryBytes += another.peakMemoryBytes;
numMemoryAllocations += another.numMemoryAllocations;

physicalWrittenBytes += another.physicalWrittenBytes;

for (const auto& [name, customStats] : another.customStats) {
if (UNLIKELY(this->customStats.count(name) == 0)) {
this->customStats.insert(std::make_pair(name, customStats));
} else {
this->customStats.at(name).merge(customStats);
}
}

// Populating number of drivers for plan nodes with multiple operators is not
// useful. Each operator could have been executed in different pipelines with
// different number of drivers.
if (!isMultiOperatorTypeNode()) {
numDrivers += another.numDrivers;
} else {
numDrivers = 0;
}

numSplits += another.numSplits;

spilledInputBytes += another.spilledInputBytes;
spilledBytes += another.spilledBytes;
spilledRows += another.spilledRows;
spilledPartitions += another.spilledPartitions;
spilledFiles += another.spilledFiles;

return *this;
}

void PlanNodeStats::add(const OperatorStats& stats) {
auto it = operatorStats.find(stats.operatorType);
if (it != operatorStats.end()) {
Expand Down Expand Up @@ -63,11 +124,11 @@ void PlanNodeStats::addTotals(const OperatorStats& stats) {

physicalWrittenBytes += stats.physicalWrittenBytes;

for (const auto& [name, runtimeStats] : stats.runtimeStats) {
if (UNLIKELY(customStats.count(name) == 0)) {
customStats.insert(std::make_pair(name, runtimeStats));
for (const auto& [name, customStats] : stats.runtimeStats) {
if (UNLIKELY(this->customStats.count(name) == 0)) {
this->customStats.insert(std::make_pair(name, customStats));
} else {
customStats.at(name).merge(runtimeStats);
this->customStats.at(name).merge(customStats);
}
}

Expand Down Expand Up @@ -105,6 +166,7 @@ std::string PlanNodeStats::toString(bool includeInputStats) const {
out << ", Physical written output: " << succinctBytes(physicalWrittenBytes);
}
out << ", Cpu time: " << succinctNanos(cpuWallTiming.cpuNanos)
<< ", Wall time: " << succinctNanos(cpuWallTiming.wallNanos)
<< ", Blocked wall time: " << succinctNanos(blockedWallNanos)
<< ", Peak memory: " << succinctBytes(peakMemoryBytes)
<< ", Memory allocations: " << numMemoryAllocations;
Expand Down
2 changes: 2 additions & 0 deletions velox/exec/PlanNodeStats.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ struct PlanNodeStats {
PlanNodeStats(PlanNodeStats&&) = default;
PlanNodeStats& operator=(PlanNodeStats&&) = default;

PlanNodeStats& operator+=(const PlanNodeStats&);

/// Sum of input rows for all corresponding operators. Useful primarily for
/// leaf plan nodes or plan nodes that correspond to a single operator type.
uint64_t inputRows{0};
Expand Down
Loading

0 comments on commit 783acf3

Please sign in to comment.