Skip to content

Commit

Permalink
feat: Make skewed partition balancer thread-safe and share among scal…
Browse files Browse the repository at this point in the history
…e writer partitioners (#11786)

Summary:
Pull Request resolved: #11786

Make skewed partition balancer thread-safe and share it with all the scale writer local partitioners.
Skewed partition balance is created by driver factory init and saved in the associated exchange plan node's state.
This will help further reduces the unnecessary written files and make memory usage more efficient as
all the local scale writer partitioners have the consistent partition assignment views

This PR also fixes the race condition between get queue memory manager with
the task terminate

Reviewed By: tanjialiang, arhimondr

Differential Revision: D66628614

fbshipit-source-id: dbae2ff26e8c9d5d424d6ebf99af9c5edfed1f92
  • Loading branch information
xiaoxmeng authored and facebook-github-bot committed Dec 10, 2024
1 parent b9cce6d commit 733bc0c
Show file tree
Hide file tree
Showing 10 changed files with 345 additions and 131 deletions.
126 changes: 82 additions & 44 deletions velox/common/base/SkewedPartitionBalancer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,58 +16,96 @@

#include "velox/common/base/SkewedPartitionBalancer.h"

#include "velox/common/testutil/TestValue.h"

using facebook::velox::common::testutil::TestValue;

namespace facebook::velox::common {
SkewedPartitionRebalancer::SkewedPartitionRebalancer(
uint32_t partitionCount,
uint32_t taskCount,
uint32_t numPartitions,
uint32_t numTasks,
uint64_t minProcessedBytesRebalanceThresholdPerPartition,
uint64_t minProcessedBytesRebalanceThreshold)
: partitionCount_(partitionCount),
taskCount_(taskCount),
: numPartitions_(numPartitions),
numTasks_(numTasks),
minProcessedBytesRebalanceThresholdPerPartition_(
minProcessedBytesRebalanceThresholdPerPartition),
minProcessedBytesRebalanceThreshold_(std::max(
minProcessedBytesRebalanceThreshold,
minProcessedBytesRebalanceThresholdPerPartition_)) {
VELOX_CHECK_GT(partitionCount_, 0);
VELOX_CHECK_GT(taskCount_, 0);
minProcessedBytesRebalanceThresholdPerPartition_)),
partitionRowCount_(numPartitions_),
partitionAssignments_(numPartitions_) {
VELOX_CHECK_GT(numPartitions_, 0);
VELOX_CHECK_GT(numTasks_, 0);

partitionRowCount_.resize(partitionCount_, 0);
partitionBytes_.resize(partitionCount_, 0);
partitionBytesAtLastRebalance_.resize(partitionCount_, 0);
partitionBytesSinceLastRebalancePerTask_.resize(partitionCount_, 0);
estimatedTaskBytesSinceLastRebalance_.resize(taskCount_, 0);

partitionAssignments_.resize(partitionCount_);
partitionBytes_.resize(numPartitions_, 0);
partitionBytesAtLastRebalance_.resize(numPartitions_, 0);
partitionBytesSinceLastRebalancePerTask_.resize(numPartitions_, 0);
estimatedTaskBytesSinceLastRebalance_.resize(numTasks_, 0);

// Assigns one task for each partition intitially.
for (auto partition = 0; partition < partitionCount_; ++partition) {
const uint32_t taskId = partition % taskCount;
partitionAssignments_[partition].emplace_back(taskId);
for (auto partition = 0; partition < numPartitions_; ++partition) {
const uint32_t taskId = partition % numTasks_;
partitionAssignments_[partition].addTaskId(taskId);
}
}

void SkewedPartitionRebalancer::PartitionAssignment::addTaskId(
uint32_t taskId) {
std::unique_lock guard{lock_};
taskIds_.push_back(taskId);
}

uint32_t SkewedPartitionRebalancer::PartitionAssignment::nextTaskId(
uint64_t index) const {
std::shared_lock guard{lock_};
const auto taskIndex = index % taskIds_.size();
return taskIds_[taskIndex];
}

uint32_t SkewedPartitionRebalancer::PartitionAssignment::size() const {
std::shared_lock guard{lock_};
return taskIds_.size();
}

const std::vector<uint32_t>&
SkewedPartitionRebalancer::PartitionAssignment::taskIds() const {
std::shared_lock guard{lock_};
return taskIds_;
}

void SkewedPartitionRebalancer::rebalance() {
if (shouldRebalance()) {
rebalancePartitions();
const int64_t processedBytes = processedBytes_.load();
if (shouldRebalance(processedBytes)) {
rebalancePartitions(processedBytes);
}
}

bool SkewedPartitionRebalancer::shouldRebalance() const {
VELOX_CHECK_GE(processedBytes_, processedBytesAtLastRebalance_);
return (processedBytes_ - processedBytesAtLastRebalance_) >=
bool SkewedPartitionRebalancer::shouldRebalance(int64_t processedBytes) const {
return (processedBytes - processedBytesAtLastRebalance_) >=
minProcessedBytesRebalanceThreshold_;
}

void SkewedPartitionRebalancer::rebalancePartitions() {
VELOX_DCHECK(shouldRebalance());
++stats_.numBalanceTriggers;
void SkewedPartitionRebalancer::rebalancePartitions(int64_t processedBytes) {
if (rebalancing_.exchange(true)) {
return;
}

SCOPE_EXIT {
VELOX_CHECK(rebalancing_);
rebalancing_ = false;
};
++numBalanceTriggers_;

TestValue::adjust(
"facebook::velox::common::SkewedPartitionRebalancer::rebalancePartitions",
this);

// Updates the processed bytes for each partition.
calculatePartitionProcessedBytes();

// Updates 'partitionBytesSinceLastRebalancePerTask_'.
for (auto partition = 0; partition < partitionCount_; ++partition) {
for (auto partition = 0; partition < numPartitions_; ++partition) {
const auto totalAssignedTasks = partitionAssignments_[partition].size();
const auto partitionBytes = partitionBytes_[partition];
partitionBytesSinceLastRebalancePerTask_[partition] =
Expand All @@ -80,10 +118,10 @@ void SkewedPartitionRebalancer::rebalancePartitions() {
// max processed bytes since last rebalance at the top of the queue for
// rebalance later.
std::vector<IndexedPriorityQueue<uint32_t, /*MaxQueue=*/true>>
taskMaxPartitions{taskCount_};
for (auto partition = 0; partition < partitionCount_; ++partition) {
taskMaxPartitions{numTasks_};
for (auto partition = 0; partition < numPartitions_; ++partition) {
auto& taskAssignments = partitionAssignments_[partition];
for (uint32_t taskId : taskAssignments) {
for (uint32_t taskId : taskAssignments.taskIds()) {
auto& taskQueue = taskMaxPartitions[taskId];
taskQueue.addOrUpdate(
partition, partitionBytesSinceLastRebalancePerTask_[partition]);
Expand All @@ -94,15 +132,15 @@ void SkewedPartitionRebalancer::rebalancePartitions() {
// last rebalance.
IndexedPriorityQueue<uint32_t, /*MaxQueue=*/true> maxTasks;
IndexedPriorityQueue<uint32_t, /*MaxQueue=*/false> minTasks;
for (auto taskId = 0; taskId < taskCount_; ++taskId) {
for (auto taskId = 0; taskId < numTasks_; ++taskId) {
estimatedTaskBytesSinceLastRebalance_[taskId] =
calculateTaskDataSizeSinceLastRebalance(taskMaxPartitions[taskId]);
maxTasks.addOrUpdate(taskId, estimatedTaskBytesSinceLastRebalance_[taskId]);
minTasks.addOrUpdate(taskId, estimatedTaskBytesSinceLastRebalance_[taskId]);
}

rebalanceBasedOnTaskSkewness(maxTasks, minTasks, taskMaxPartitions);
processedBytesAtLastRebalance_ = processedBytes_;
processedBytesAtLastRebalance_.store(processedBytes);
}

void SkewedPartitionRebalancer::rebalanceBasedOnTaskSkewness(
Expand Down Expand Up @@ -159,7 +197,7 @@ void SkewedPartitionRebalancer::rebalanceBasedOnTaskSkewness(
}
}

stats_.numScaledPartitions += scaledPartitions.size();
numScaledPartitions_ += scaledPartitions.size();
}

bool SkewedPartitionRebalancer::rebalancePartition(
Expand All @@ -168,49 +206,49 @@ bool SkewedPartitionRebalancer::rebalancePartition(
IndexedPriorityQueue<uint32_t, true>& maxTasks,
IndexedPriorityQueue<uint32_t, false>& minTasks) {
auto& taskAssignments = partitionAssignments_[rebalancePartition];
for (auto taskId : taskAssignments) {
for (auto taskId : taskAssignments.taskIds()) {
if (taskId == targetTaskId) {
return false;
}
}

taskAssignments.push_back(targetTaskId);
taskAssignments.addTaskId(targetTaskId);
VELOX_CHECK_GT(partitionAssignments_[rebalancePartition].size(), 1);

const auto newTaskCount = taskAssignments.size();
const auto oldTaskCount = newTaskCount - 1;
const auto newNumTasks = taskAssignments.size();
const auto oldNumTasks = newNumTasks - 1;
// Since a partition is rebalanced from max to min skewed tasks,
// decrease the priority of max taskBucket as well as increase the priority
// of min taskBucket.
for (uint32_t taskId : taskAssignments) {
for (uint32_t taskId : taskAssignments.taskIds()) {
if (taskId == targetTaskId) {
estimatedTaskBytesSinceLastRebalance_[taskId] +=
(partitionBytesSinceLastRebalancePerTask_[rebalancePartition] *
oldTaskCount) /
newTaskCount;
oldNumTasks) /
newNumTasks;
} else {
estimatedTaskBytesSinceLastRebalance_[taskId] -=
partitionBytesSinceLastRebalancePerTask_[rebalancePartition] /
newTaskCount;
newNumTasks;
}

maxTasks.addOrUpdate(taskId, estimatedTaskBytesSinceLastRebalance_[taskId]);
minTasks.addOrUpdate(taskId, estimatedTaskBytesSinceLastRebalance_[taskId]);
}

LOG(INFO) << "Rebalanced partition " << rebalancePartition << " to task "
<< targetTaskId << " with taskCount " << newTaskCount;
VLOG(3) << "Rebalanced partition " << rebalancePartition << " to task "
<< targetTaskId << " with num of assigned tasks " << newNumTasks;
return true;
}

void SkewedPartitionRebalancer::calculatePartitionProcessedBytes() {
uint64_t totalPartitionRowCount{0};
for (auto partition = 0; partition < partitionCount_; ++partition) {
for (auto partition = 0; partition < numPartitions_; ++partition) {
totalPartitionRowCount += partitionRowCount_[partition];
}
VELOX_CHECK_GT(totalPartitionRowCount, 0);

for (auto partition = 0; partition < partitionCount_; ++partition) {
for (auto partition = 0; partition < numPartitions_; ++partition) {
// Since we estimate 'partitionBytes_' based on 'partitionRowCount_' and
// total 'processedBytes_'. It is possible that the estimated
// 'partitionBytes_' is slightly less than it was estimated at the last
Expand Down
74 changes: 54 additions & 20 deletions velox/common/base/SkewedPartitionBalancer.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,9 @@ class SkewedPartitionRebalancerTestHelper;
/// This class is used to auto-scale partition processing by assigning more
/// tasks to busy partition measured by processed data size. This is used by
/// local partition to scale table writers for now.
///
/// NOTE: this object is not thread-safe.
class SkewedPartitionRebalancer {
public:
/// 'partitionCount' is the number of partitions to process. 'taskCount' is
/// 'numPartitions' is the number of partitions to process. 'numTasks' is
/// number of tasks for execution.
/// 'minProcessedBytesRebalanceThresholdPerPartition' is the processed bytes
/// threshold to trigger task scaling for a single partition.
Expand All @@ -41,34 +39,46 @@ class SkewedPartitionRebalancer {
/// measured in the total number of processed data size from all its serving
/// partitions.
SkewedPartitionRebalancer(
uint32_t partitionCount,
uint32_t taskCount,
uint32_t numPartitions,
uint32_t numTasks,
uint64_t minProcessedBytesRebalanceThresholdPerPartition,
uint64_t minProcessedBytesRebalanceThreshold);

~SkewedPartitionRebalancer() {
VELOX_CHECK(!rebalancing_);
}

/// Invoked to rebalance the partition assignments if applicable.
void rebalance();

/// Gets the assigned task id for a given 'partition'. 'index' is used to
/// choose one of multiple assigned tasks in a round-robin order.
uint32_t getTaskId(uint32_t partition, uint64_t index) const {
const auto& taskList = partitionAssignments_[partition];
return taskList[index % taskList.size()];
auto& taskList = partitionAssignments_[partition];
return taskList.nextTaskId(index);
}

/// Adds the processed partition row count. This is used to estimate the
/// processed bytes of a partition.
void addPartitionRowCount(uint32_t partition, uint32_t numRows) {
VELOX_CHECK_LT(partition, partitionCount_);
VELOX_CHECK_LT(partition, numPartitions_);
partitionRowCount_[partition] += numRows;
}

/// Adds the total processed bytes from all the partitions.
void addProcessedBytes(long bytes) {
void addProcessedBytes(int64_t bytes) {
VELOX_CHECK_GT(bytes, 0);
processedBytes_ += bytes;
}

uint32_t numPartitions() const {
return numPartitions_;
}

uint32_t numTasks() const {
return numTasks_;
}

/// The rebalancer internal stats.
struct Stats {
/// The number of times that triggers rebalance.
Expand All @@ -85,13 +95,15 @@ class SkewedPartitionRebalancer {
};

Stats stats() const {
return stats_;
return Stats{
.numBalanceTriggers = numBalanceTriggers_.load(),
.numScaledPartitions = numScaledPartitions_.load()};
}

private:
bool shouldRebalance() const;
bool shouldRebalance(int64_t processedBytes) const;

void rebalancePartitions();
void rebalancePartitions(int64_t processedBytes);

// Calculates the partition processed data size based on the number of
// processed rows and the averaged row size.
Expand Down Expand Up @@ -132,19 +144,22 @@ class SkewedPartitionRebalancer {

static constexpr double kTaskSkewnessThreshod_{0.7};

const uint32_t partitionCount_;
const uint32_t taskCount_;
const uint32_t numPartitions_;
const uint32_t numTasks_;
const uint64_t minProcessedBytesRebalanceThresholdPerPartition_;
const uint64_t minProcessedBytesRebalanceThreshold_;

// The accumulated number of rows processed by each partition.
std::vector<uint64_t> partitionRowCount_;
std::vector<std::atomic_uint64_t> partitionRowCount_;

// Indicates if the rebalancer is running or not.
std::atomic_bool rebalancing_{false};

// The accumulated number of bytes processed by all the partitions.
uint64_t processedBytes_{0};
std::atomic_int64_t processedBytes_{0};
// 'processedBytes_' at the last rebalance. It is used to calculate the
// processed bytes changes since the last rebalance.
uint64_t processedBytesAtLastRebalance_{0};
std::atomic_int64_t processedBytesAtLastRebalance_{0};
// The accumulated number of bytes processed by each partition.
std::vector<uint64_t> partitionBytes_;
// 'partitionBytes_' at the last rebalance. It is used to calculate the
Expand All @@ -157,10 +172,29 @@ class SkewedPartitionRebalancer {
// The estimated task processed bytes since the last rebalance.
std::vector<uint64_t> estimatedTaskBytesSinceLastRebalance_;

// The assigned task id list for each partition.
std::vector<std::vector<uint32_t>> partitionAssignments_;
// The assigned task ids for a partition
class PartitionAssignment {
public:
PartitionAssignment() = default;

void addTaskId(uint32_t taskId);

uint32_t nextTaskId(uint64_t index) const;

const std::vector<uint32_t>& taskIds() const;

uint32_t size() const;

private:
mutable folly::SharedMutex lock_;
std::vector<uint32_t> taskIds_;
};
std::vector<PartitionAssignment> partitionAssignments_;

Stats stats_;
// The number of times that triggers rebalance.
std::atomic_uint64_t numBalanceTriggers_{0};
// The number of times that a scaled partition processing.
std::atomic_uint32_t numScaledPartitions_{0};

friend class test::SkewedPartitionRebalancerTestHelper;
};
Expand Down
Loading

0 comments on commit 733bc0c

Please sign in to comment.