Skip to content

Commit

Permalink
Add shutdown support memory arbitrator (facebookincubator#11325)
Browse files Browse the repository at this point in the history
Summary:
Pull Request resolved: facebookincubator#11325

There are bunch of flaky mock arbitrator tests which are due to the background global arbitration
which can continue to run after the test finishes.
In general we need the shutdown procedure to handle the shared arbitrator destruction properly.
This PR adds shutdown support in shared arbitrator and is invoked from memory manager. This fixes
plus some test fixes solves all existing flakiness in mock arbitrator tests in Meta.

Reviewed By: tanjialiang, kagamiori

Differential Revision: D64742451

fbshipit-source-id: 84d202f4e6926be030b009b2a1128372b1f619a5
  • Loading branch information
xiaoxmeng authored and facebook-github-bot committed Oct 23, 2024
1 parent 2661238 commit 5d8823f
Show file tree
Hide file tree
Showing 11 changed files with 171 additions and 28 deletions.
2 changes: 2 additions & 0 deletions velox/common/base/tests/StatsReporterTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,8 @@ class TestStatsReportMemoryArbitrator : public memory::MemoryArbitrator {
return "test";
}

void shutdown() override {}

void addPool(const std::shared_ptr<memory::MemoryPool>& /*unused*/) override {
}

Expand Down
2 changes: 2 additions & 0 deletions velox/common/memory/Memory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,8 @@ MemoryManager::MemoryManager(const MemoryManagerOptions& options)
}

MemoryManager::~MemoryManager() {
arbitrator_->shutdown();

if (pools_.size() != 0) {
const auto errMsg = fmt::format(
"pools_.size() != 0 ({} vs {}). There are unexpected alive memory "
Expand Down
2 changes: 2 additions & 0 deletions velox/common/memory/MemoryArbitrator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,8 @@ class NoopArbitrator : public MemoryArbitrator {
return "NOOP";
}

void shutdown() override {}

void addPool(const std::shared_ptr<MemoryPool>& pool) override {
VELOX_CHECK_EQ(pool->capacity(), 0);
growPool(pool.get(), pool->maxCapacity(), 0);
Expand Down
4 changes: 4 additions & 0 deletions velox/common/memory/MemoryArbitrator.h
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,10 @@ class MemoryArbitrator {

virtual ~MemoryArbitrator() = default;

/// Invoked by the memory manager to shutdown the memory arbitrator to stop
/// serving new memory arbitration requests.
virtual void shutdown() = 0;

/// Invoked by the memory manager to add a newly created memory pool. The
/// memory arbitrator allocates the initial capacity for 'pool' and
/// dynamically adjusts its capacity based query memory needs through memory
Expand Down
67 changes: 40 additions & 27 deletions velox/common/memory/SharedArbitrator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,31 @@ SharedArbitrator::SharedArbitrator(const Config& config)
<< participantConfig_.toString();
}

void SharedArbitrator::shutdown() {
{
std::lock_guard<std::mutex> l(stateLock_);
VELOX_CHECK(globalArbitrationWaiters_.empty());
if (hasShutdownLocked()) {
return;
}
state_ = State::kShutdown;
}

shutdownGlobalArbitration();

VELOX_MEM_LOG(INFO) << "Stopping memory reclaim executor '"
<< memoryReclaimExecutor_->getName() << "': threads: "
<< memoryReclaimExecutor_->numActiveThreads() << "/"
<< memoryReclaimExecutor_->numThreads()
<< ", task queue: "
<< memoryReclaimExecutor_->getTaskQueueSize();
memoryReclaimExecutor_.reset();
VELOX_MEM_LOG(INFO) << "Memory reclaim executor stopped";

VELOX_CHECK_EQ(
participants_.size(), 0, "Unexpected alive participants on destruction");
}

void SharedArbitrator::setupGlobalArbitration() {
if (!globalArbitrationEnabled_) {
return;
Expand Down Expand Up @@ -291,14 +316,6 @@ void SharedArbitrator::shutdownGlobalArbitration() {

VELOX_CHECK(!globalArbitrationAbortCapacityLimits_.empty());
VELOX_CHECK_NOT_NULL(globalArbitrationController_);
{
std::lock_guard<std::mutex> l(stateLock_);
// We only expect stop global arbitration once during velox runtime
// shutdown.
VELOX_CHECK(!globalArbitrationStop_);
VELOX_CHECK(globalArbitrationWaiters_.empty());
globalArbitrationStop_ = true;
}

VELOX_MEM_LOG(INFO) << "Stopping global arbitration controller";
globalArbitrationThreadCv_.notify_one();
Expand All @@ -315,19 +332,7 @@ void SharedArbitrator::wakeupGlobalArbitrationThread() {
}

SharedArbitrator::~SharedArbitrator() {
shutdownGlobalArbitration();

VELOX_MEM_LOG(INFO) << "Stopping memory reclaim executor '"
<< memoryReclaimExecutor_->getName() << "': threads: "
<< memoryReclaimExecutor_->numActiveThreads() << "/"
<< memoryReclaimExecutor_->numThreads()
<< ", task queue: "
<< memoryReclaimExecutor_->getTaskQueueSize();
memoryReclaimExecutor_.reset();
VELOX_MEM_LOG(INFO) << "Memory reclaim executor stopped";

VELOX_CHECK_EQ(
participants_.size(), 0, "Unexpected alive participants on destruction");
shutdown();

if (freeNonReservedCapacity_ + freeReservedCapacity_ != capacity_) {
const std::string errMsg = fmt::format(
Expand Down Expand Up @@ -393,6 +398,8 @@ void SharedArbitrator::finishArbitration(ArbitrationOperation* op) {
}

void SharedArbitrator::addPool(const std::shared_ptr<MemoryPool>& pool) {
checkRunning();

VELOX_CHECK_EQ(pool->capacity(), 0);

auto newParticipant = ArbitrationParticipant::create(
Expand Down Expand Up @@ -512,8 +519,8 @@ std::optional<ArbitrationCandidate> SharedArbitrator::findAbortCandidate(
candidateIdx = i;
continue;
}
// With the same capacity size bucket, we favor the old participant to let
// long running query proceed first.
// With the same capacity size bucket, we favor the old participant to
// let long running query proceed first.
if (candidates[candidateIdx].participant->id() <
candidates[i].participant->id()) {
candidateIdx = i;
Expand Down Expand Up @@ -608,6 +615,8 @@ uint64_t SharedArbitrator::allocateCapacityLocked(
uint64_t SharedArbitrator::shrinkCapacity(
MemoryPool* pool,
uint64_t /*unused*/) {
checkRunning();

VELOX_CHECK(pool->isRoot());
auto participant = getParticipant(pool->name());
VELOX_CHECK(participant.has_value());
Expand All @@ -618,6 +627,8 @@ uint64_t SharedArbitrator::shrinkCapacity(
uint64_t requestBytes,
bool allowSpill,
bool allowAbort) {
checkRunning();

const uint64_t targetBytes = requestBytes == 0 ? capacity_ : requestBytes;
ScopedMemoryArbitrationContext abitrationCtx{};
const uint64_t startTimeMs = getCurrentTimeMs();
Expand Down Expand Up @@ -663,6 +674,8 @@ ArbitrationOperation SharedArbitrator::createArbitrationOperation(
}

bool SharedArbitrator::growCapacity(MemoryPool* pool, uint64_t requestBytes) {
checkRunning();

VELOX_CHECK(pool->isRoot());
auto op = createArbitrationOperation(pool, requestBytes);
ScopedArbitration scopedArbitration(this, &op);
Expand Down Expand Up @@ -711,8 +724,8 @@ bool SharedArbitrator::growCapacity(ArbitrationOperation& op) {
if (!globalArbitrationEnabled_ &&
op.participant()->reclaimableUsedCapacity() >=
participantConfig_.minReclaimBytes) {
// NOTE: if global memory arbitration is not enabled, we will try to reclaim
// from the participant itself before failing this operation.
// NOTE: if global memory arbitration is not enabled, we will try to
// reclaim from the participant itself before failing this operation.
reclaim(
op.participant(),
op.requestBytes(),
Expand Down Expand Up @@ -804,9 +817,9 @@ void SharedArbitrator::globalArbitrationMain() {
{
std::unique_lock l(stateLock_);
globalArbitrationThreadCv_.wait(l, [&] {
return globalArbitrationStop_ || !globalArbitrationWaiters_.empty();
return hasShutdownLocked() || !globalArbitrationWaiters_.empty();
});
if (globalArbitrationStop_) {
if (hasShutdownLocked()) {
VELOX_CHECK(globalArbitrationWaiters_.empty());
break;
}
Expand Down
24 changes: 23 additions & 1 deletion velox/common/memory/SharedArbitrator.h
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,8 @@ class SharedArbitrator : public memory::MemoryArbitrator {

static void unregisterFactory();

void shutdown() override;

void addPool(const std::shared_ptr<MemoryPool>& pool) final;

void removePool(MemoryPool* pool) final;
Expand Down Expand Up @@ -253,6 +255,16 @@ class SharedArbitrator : public memory::MemoryArbitrator {
"globalArbitrationWaitWallNanos"};

private:
// Define the internal execution states of the arbitrator.
enum class State {
kRunning,
// Indicates the arbitrator is shutting down. The arbitrator doesn't accept
// any new arbitration requests under this state except remove pool as the
// last pool reference might be still held by the background global memory
// arbitration.
kShutdown,
};

// The kind string of shared arbitrator.
inline static const std::string kind_{"SHARED"};

Expand Down Expand Up @@ -283,6 +295,15 @@ class SharedArbitrator : public memory::MemoryArbitrator {
const memory::ScopedMemoryArbitrationContext arbitrationCtx_{};
};

FOLLY_ALWAYS_INLINE void checkRunning() {
std::lock_guard<std::mutex> l(stateLock_);
VELOX_CHECK(!hasShutdownLocked(), "SharedArbitrator is not running");
}

FOLLY_ALWAYS_INLINE bool hasShutdownLocked() const {
return state_ == State::kShutdown;
}

// Invoked to get the arbitration participant by 'name'. The function returns
// std::nullopt if the underlying query memory pool is destroyed.
std::optional<ScopedArbitrationParticipant> getParticipant(
Expand Down Expand Up @@ -553,10 +574,11 @@ class SharedArbitrator : public memory::MemoryArbitrator {
// Lock used to protect the arbitrator internal state.
mutable std::mutex stateLock_;

State state_{State::kRunning};

tsan_atomic<uint64_t> freeReservedCapacity_{0};
tsan_atomic<uint64_t> freeNonReservedCapacity_{0};

bool globalArbitrationStop_{false};
// Indicates if the global arbitration is currently running or not.
tsan_atomic<bool> globalArbitrationRunning_{false};

Expand Down
2 changes: 2 additions & 0 deletions velox/common/memory/tests/ArbitrationParticipantTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@ class TestArbitrator : public MemoryArbitrator {
.capacity = config.capacity,
.extraConfigs = config.extraConfigs}) {}

void shutdown() override {}

void addPool(const std::shared_ptr<MemoryPool>& /*unused*/) override {}

void removePool(MemoryPool* /*unused*/) override {}
Expand Down
2 changes: 2 additions & 0 deletions velox/common/memory/tests/MemoryArbitratorTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -333,6 +333,8 @@ class FakeTestArbitrator : public MemoryArbitrator {
return "USER";
}

void shutdown() override {}

void addPool(const std::shared_ptr<MemoryPool>& /*unused*/) override {}

void removePool(MemoryPool* /*unused*/) override {}
Expand Down
2 changes: 2 additions & 0 deletions velox/common/memory/tests/MemoryManagerTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,8 @@ class FakeTestArbitrator : public MemoryArbitrator {
.extraConfigs = config.extraConfigs}),
injectAddPoolFailure_(injectAddPoolFailure) {}

void shutdown() override {}

void addPool(const std::shared_ptr<MemoryPool>& /*unused*/) override {
VELOX_CHECK(!injectAddPoolFailure_, "Failed to add pool");
}
Expand Down
87 changes: 87 additions & 0 deletions velox/common/memory/tests/MockSharedArbitratorTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2268,6 +2268,91 @@ DEBUG_ONLY_TEST_F(MockSharedArbitrationTest, arbitrationAbort) {
ASSERT_EQ(task3->capacity(), memoryCapacity / 4);
}

TEST_F(MockSharedArbitrationTest, shutdown) {
uint64_t memoryCapacity = 256 * MB;
setupMemory(memoryCapacity);
arbitrator_->shutdown();
// double shutdown.
arbitrator_->shutdown();
// Check APIs.
// NOTE: the arbitrator running is first check for external APIs.
VELOX_ASSERT_THROW(
arbitrator_->addPool(nullptr), "SharedArbitrator is not running");
VELOX_ASSERT_THROW(
arbitrator_->growCapacity(nullptr, 0), "SharedArbitrator is not running");
VELOX_ASSERT_THROW(
arbitrator_->shrinkCapacity(nullptr, 0),
"SharedArbitrator is not running");

auto arbitratorHelper = test::SharedArbitratorTestHelper(arbitrator_);
ASSERT_TRUE(arbitratorHelper.hasShutdown());
}

DEBUG_ONLY_TEST_F(MockSharedArbitrationTest, shutdownWait) {
uint64_t memoryCapacity = 256 * MB;
setupMemory(
memoryCapacity, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1.0, nullptr, true, 2'000);
std::shared_ptr<MockTask> task1 = addTask(memoryCapacity);
auto* op1 = task1->addMemoryOp(true);
op1->allocate(memoryCapacity / 2);
ASSERT_EQ(task1->capacity(), memoryCapacity / 2);

std::shared_ptr<MockTask> task2 = addTask(memoryCapacity);
auto* op2 = task2->addMemoryOp(true);
op2->allocate(memoryCapacity / 2);
ASSERT_EQ(task2->capacity(), memoryCapacity / 2);

folly::EventCount globalArbitrationStarted;
std::atomic_bool globalArbitrationStartedFlag{false};
folly::EventCount globalArbitrationWait;
std::atomic_bool globalArbitrationWaitFlag{true};
SCOPED_TESTVALUE_SET(
"facebook::velox::memory::SharedArbitrator::runGlobalArbitration",
std::function<void(const SharedArbitrator*)>(
([&](const SharedArbitrator* arbitrator) {
test::SharedArbitratorTestHelper arbitratorHelper(
const_cast<SharedArbitrator*>(arbitrator));
ASSERT_EQ(arbitratorHelper.numGlobalArbitrationWaiters(), 1);
globalArbitrationStartedFlag = true;
globalArbitrationStarted.notifyAll();
globalArbitrationWait.await(
[&]() { return !globalArbitrationWaitFlag.load(); });
})));
VELOX_ASSERT_THROW(
op1->allocate(memoryCapacity / 4),
"Memory arbitration timed out on memory pool");
globalArbitrationStarted.await(
[&]() { return globalArbitrationStartedFlag.load(); });

op2->freeAll();
task2.reset();
op1->freeAll();
task1.reset();

test::SharedArbitratorTestHelper arbitratorHelper(
const_cast<SharedArbitrator*>(arbitrator_));
ASSERT_FALSE(arbitratorHelper.hasShutdown());

std::atomic_bool shutdownCompleted{false};
std::thread shutdownThread([&]() {
arbitrator_->shutdown();
shutdownCompleted = true;
});

std::this_thread::sleep_for(std::chrono::seconds(2)); // NOLINT
ASSERT_FALSE(shutdownCompleted);
ASSERT_TRUE(arbitratorHelper.globalArbitrationRunning());
ASSERT_TRUE(arbitratorHelper.hasShutdown());

globalArbitrationWaitFlag = false;
globalArbitrationWait.notifyAll();

arbitratorHelper.waitForGlobalArbitrationToFinish();
shutdownThread.join();
ASSERT_TRUE(shutdownCompleted);
ASSERT_TRUE(arbitratorHelper.hasShutdown());
}

TEST_F(MockSharedArbitrationTest, memoryPoolAbortCapacityLimit) {
const int64_t memoryCapacity = 256 << 20;

Expand Down Expand Up @@ -3128,6 +3213,7 @@ DEBUG_ONLY_TEST_F(MockSharedArbitrationTest, abortWithNoCandidate) {
abortWait.notifyAll();

allocationThread.join();
arbitratorHelper.waitForGlobalArbitrationToFinish();
}

// This test verifies the global arbitration can handle the case that there is
Expand Down Expand Up @@ -3193,6 +3279,7 @@ DEBUG_ONLY_TEST_F(MockSharedArbitrationTest, reclaimWithNoCandidate) {
reclaimWait.notifyAll();

allocationThread.join();
arbitratorHelper.waitForGlobalArbitrationToFinish();
}

TEST_F(MockSharedArbitrationTest, arbitrateBySelfMemoryReclaim) {
Expand Down
5 changes: 5 additions & 0 deletions velox/common/memory/tests/SharedArbitratorTestUtil.h
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,11 @@ class SharedArbitratorTestHelper {
return arbitrator_->globalArbitrationController_.get();
}

bool hasShutdown() const {
std::lock_guard<std::mutex> l(arbitrator_->stateLock_);
return arbitrator_->hasShutdownLocked();
}

private:
SharedArbitrator* const arbitrator_;
};
Expand Down

0 comments on commit 5d8823f

Please sign in to comment.