Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Let arbitrator system use nano second #11415

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 33 additions & 33 deletions velox/common/memory/ArbitrationOperation.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,10 @@ using namespace facebook::velox::memory;
ArbitrationOperation::ArbitrationOperation(
ScopedArbitrationParticipant&& participant,
uint64_t requestBytes,
uint64_t timeoutMs)
uint64_t timeoutNs)
: requestBytes_(requestBytes),
timeoutMs_(timeoutMs),
createTimeMs_(getCurrentTimeMs()),
timeoutNs_(timeoutNs),
createTimeNs_(getCurrentTimeNano()),
participant_(std::move(participant)) {
VELOX_CHECK_GT(requestBytes_, 0);
}
Expand Down Expand Up @@ -84,45 +84,45 @@ void ArbitrationOperation::start() {
VELOX_CHECK_EQ(state_, State::kInit);
participant_->startArbitration(this);
setState(ArbitrationOperation::State::kRunning);
VELOX_CHECK_EQ(startTimeMs_, 0);
startTimeMs_ = getCurrentTimeMs();
VELOX_CHECK_EQ(startTimeNs_, 0);
startTimeNs_ = getCurrentTimeNano();
}

void ArbitrationOperation::finish() {
setState(State::kFinished);
VELOX_CHECK_EQ(finishTimeMs_, 0);
finishTimeMs_ = getCurrentTimeMs();
VELOX_CHECK_EQ(finishTimeNs_, 0);
finishTimeNs_ = getCurrentTimeNano();
participant_->finishArbitration(this);
}

bool ArbitrationOperation::aborted() const {
return participant_->aborted();
}

size_t ArbitrationOperation::executionTimeMs() const {
uint64_t ArbitrationOperation::executionTimeNs() const {
if (state_ == State::kFinished) {
VELOX_CHECK_GE(finishTimeMs_, createTimeMs_);
return finishTimeMs_ - createTimeMs_;
VELOX_CHECK_GE(finishTimeNs_, createTimeNs_);
return finishTimeNs_ - createTimeNs_;
} else {
const auto currentTimeMs = getCurrentTimeMs();
VELOX_CHECK_GE(currentTimeMs, createTimeMs_);
return currentTimeMs - createTimeMs_;
const auto currentTimeNs = getCurrentTimeNano();
VELOX_CHECK_GE(currentTimeNs, createTimeNs_);
return currentTimeNs - createTimeNs_;
}
}

bool ArbitrationOperation::hasTimeout() const {
return state_ != State::kFinished && timeoutMs() <= 0;
return state_ != State::kFinished && timeoutNs() <= 0;
}

size_t ArbitrationOperation::timeoutMs() const {
uint64_t ArbitrationOperation::timeoutNs() const {
if (state_ == State::kFinished) {
return 0;
}
const auto execTimeMs = executionTimeMs();
if (execTimeMs >= timeoutMs_) {
const auto execTimeNs = executionTimeNs();
if (execTimeNs >= timeoutNs_) {
return 0;
}
return timeoutMs_ - execTimeMs;
return timeoutNs_ - execTimeNs;
}

void ArbitrationOperation::setGrowTargets() {
Expand All @@ -139,28 +139,28 @@ void ArbitrationOperation::setGrowTargets() {

ArbitrationOperation::Stats ArbitrationOperation::stats() const {
VELOX_CHECK_EQ(state_, State::kFinished);
VELOX_CHECK_NE(startTimeMs_, 0);
VELOX_CHECK_NE(startTimeNs_, 0);

const uint64_t executionTimeMs = this->executionTimeMs();
const uint64_t executionTimeNs = this->executionTimeNs();

VELOX_CHECK_GE(startTimeMs_, createTimeMs_);
const uint64_t localArbitrationWaitTimeMs = startTimeMs_ - createTimeMs_;
if (globalArbitrationStartTimeMs_ == 0) {
VELOX_CHECK_GE(startTimeNs_, createTimeNs_);
const uint64_t localArbitrationWaitTimeNs = startTimeNs_ - createTimeNs_;
if (globalArbitrationStartTimeNs_ == 0) {
return {
localArbitrationWaitTimeMs,
finishTimeMs_ - startTimeMs_,
localArbitrationWaitTimeNs,
finishTimeNs_ - startTimeNs_,
0,
executionTimeMs};
executionTimeNs};
}

VELOX_CHECK_GE(globalArbitrationStartTimeMs_, startTimeMs_);
const uint64_t localArbitrationExecTimeMs =
globalArbitrationStartTimeMs_ - startTimeMs_;
VELOX_CHECK_GE(globalArbitrationStartTimeNs_, startTimeNs_);
const uint64_t localArbitrationExecTimeNs =
globalArbitrationStartTimeNs_ - startTimeNs_;
return {
localArbitrationWaitTimeMs,
localArbitrationExecTimeMs,
finishTimeMs_ - globalArbitrationStartTimeMs_,
executionTimeMs};
localArbitrationWaitTimeNs,
localArbitrationExecTimeNs,
finishTimeNs_ - globalArbitrationStartTimeNs_,
executionTimeNs};
}

std::ostream& operator<<(std::ostream& out, ArbitrationOperation::State state) {
Expand Down
28 changes: 14 additions & 14 deletions velox/common/memory/ArbitrationOperation.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ class ArbitrationOperation {
ArbitrationOperation(
ScopedArbitrationParticipant&& pool,
uint64_t requestBytes,
uint64_t timeoutMs);
uint64_t timeoutNs);

~ArbitrationOperation();

Expand Down Expand Up @@ -95,28 +95,28 @@ class ArbitrationOperation {

/// Returns the remaining execution time for this operation before time out.
/// If the operation has already finished, this returns zero.
size_t timeoutMs() const;
uint64_t timeoutNs() const;

/// Returns true if this operation has timed out.
bool hasTimeout() const;

/// Returns the execution time of this arbitration operation since creation.
size_t executionTimeMs() const;
uint64_t executionTimeNs() const;

/// Invoked to mark the start of global arbitration. This is used to measure
/// how much time spent in waiting for global arbitration.
void recordGlobalArbitrationStartTime() {
VELOX_CHECK_EQ(globalArbitrationStartTimeMs_, 0);
VELOX_CHECK_EQ(globalArbitrationStartTimeNs_, 0);
VELOX_CHECK_EQ(state_, State::kRunning);
globalArbitrationStartTimeMs_ = getCurrentTimeMs();
globalArbitrationStartTimeNs_ = getCurrentTimeNano();
}

/// The execution stats of this arbitration operation after completion.
struct Stats {
uint64_t localArbitrationWaitTimeMs{0};
uint64_t localArbitrationExecTimeMs{0};
uint64_t globalArbitrationWaitTimeMs{0};
uint64_t executionTimeMs{0};
uint64_t localArbitrationWaitTimeNs{0};
uint64_t localArbitrationExecTimeNs{0};
uint64_t globalArbitrationWaitTimeNs{0};
uint64_t executionTimeNs{0};
};

/// NOTE: should only called after this arbitration operation finishes.
Expand All @@ -126,22 +126,22 @@ class ArbitrationOperation {
void setState(State state);

const uint64_t requestBytes_;
const uint64_t timeoutMs_;
const uint64_t timeoutNs_;

// The start time of this arbitration operation.
const uint64_t createTimeMs_;
const uint64_t createTimeNs_;
const ScopedArbitrationParticipant participant_;

State state_{State::kInit};

uint64_t startTimeMs_{0};
uint64_t finishTimeMs_{0};
uint64_t startTimeNs_{0};
uint64_t finishTimeNs_{0};

uint64_t maxGrowBytes_{0};
uint64_t minGrowBytes_{0};

// The time that starts global arbitration wait
uint64_t globalArbitrationStartTimeMs_{};
uint64_t globalArbitrationStartTimeNs_{};

friend class ArbitrationParticipant;
};
Expand Down
12 changes: 6 additions & 6 deletions velox/common/memory/ArbitrationParticipant.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ ArbitrationParticipant::ArbitrationParticipant(
pool_(pool.get()),
config_(config),
maxCapacity_(pool_->maxCapacity()),
createTimeUs_(getCurrentTimeMicro()) {
createTimeNs_(getCurrentTimeNano()) {
VELOX_CHECK_LE(
config_->minCapacity,
maxCapacity_,
Expand Down Expand Up @@ -261,7 +261,7 @@ void ArbitrationParticipant::finishArbitration(ArbitrationOperation* op) {

uint64_t ArbitrationParticipant::reclaim(
uint64_t targetBytes,
uint64_t maxWaitTimeMs,
uint64_t maxWaitTimeNs,
MemoryReclaimer::Stats& stats) noexcept {
targetBytes = std::max(targetBytes, config_->minReclaimBytes);
if (targetBytes == 0) {
Expand All @@ -275,7 +275,7 @@ uint64_t ArbitrationParticipant::reclaim(
++numReclaims_;
VELOX_MEM_LOG(INFO) << "Reclaiming from memory pool " << pool_->name()
<< " with target " << succinctBytes(targetBytes);
pool_->reclaim(targetBytes, maxWaitTimeMs, stats);
pool_->reclaim(targetBytes, maxWaitTimeNs / 1'000'000, stats);
reclaimedBytes = shrink(/*reclaimAll=*/false);
} catch (const std::exception& e) {
VELOX_MEM_LOG(ERROR) << "Failed to reclaim from memory pool "
Expand Down Expand Up @@ -354,9 +354,9 @@ uint64_t ArbitrationParticipant::abortLocked(
}

bool ArbitrationParticipant::waitForReclaimOrAbort(
uint64_t maxWaitTimeMs) const {
uint64_t maxWaitTimeNs) const {
std::unique_lock<std::timed_mutex> l(
reclaimLock_, std::chrono::milliseconds(maxWaitTimeMs));
reclaimLock_, std::chrono::nanoseconds(maxWaitTimeNs));
return l.owns_lock();
}

Expand All @@ -380,7 +380,7 @@ std::string ArbitrationParticipant::Stats::toString() const {
succinctBytes(reclaimedBytes),
succinctBytes(growBytes),
aborted,
succinctMicros(durationUs));
succinctNanos(durationNs));
}

ScopedArbitrationParticipant::ScopedArbitrationParticipant(
Expand Down
22 changes: 11 additions & 11 deletions velox/common/memory/ArbitrationParticipant.h
Original file line number Diff line number Diff line change
Expand Up @@ -144,10 +144,10 @@ class ArbitrationParticipant
}

/// Returns the duration of this arbitration participant since its creation.
uint64_t durationUs() const {
const auto now = getCurrentTimeMicro();
VELOX_CHECK_GE(now, createTimeUs_);
return now - createTimeUs_;
uint64_t durationNs() const {
const auto now = getCurrentTimeNano();
VELOX_CHECK_GE(now, createTimeNs_);
return now - createTimeNs_;
}

/// Invoked to acquire a shared reference to this arbitration participant
Expand Down Expand Up @@ -206,11 +206,11 @@ class ArbitrationParticipant
/// restriction.
uint64_t shrink(bool reclaimAll = false);

// Invoked to reclaim used memory from this memory pool with specified
// 'targetBytes'. The function returns the actually freed capacity.
/// Invoked to reclaim used memory from this memory pool with specified
/// 'targetBytes'. The function returns the actually freed capacity.
uint64_t reclaim(
uint64_t targetBytes,
uint64_t maxWaitTimeMs,
uint64_t maxWaitTimeNs,
MemoryReclaimer::Stats& stats) noexcept;

/// Invoked to abort the query memory pool and returns the reclaimed bytes
Expand All @@ -226,7 +226,7 @@ class ArbitrationParticipant
/// Invoked to wait for the pending memory reclaim or abort operation to
/// complete within a 'maxWaitTimeMs' time window. The function returns false
/// if the wait has timed out.
bool waitForReclaimOrAbort(uint64_t maxWaitTimeMs) const;
bool waitForReclaimOrAbort(uint64_t maxWaitTimeNs) const;

/// Invoked to start arbitration operation 'op'. The operation needs to wait
/// for the prior arbitration operations to finish first before executing to
Expand All @@ -246,7 +246,7 @@ class ArbitrationParticipant
size_t numWaitingOps() const;

struct Stats {
uint64_t durationUs{0};
uint64_t durationNs{0};
uint32_t numRequests{0};
uint32_t numReclaims{0};
uint32_t numShrinks{0};
Expand All @@ -260,7 +260,7 @@ class ArbitrationParticipant

Stats stats() const {
Stats stats;
stats.durationUs = durationUs();
stats.durationNs = durationNs();
stats.aborted = aborted_;
stats.numRequests = numRequests_;
stats.numGrows = numGrows_;
Expand Down Expand Up @@ -310,7 +310,7 @@ class ArbitrationParticipant
MemoryPool* const pool_;
const Config* const config_;
const uint64_t maxCapacity_;
const size_t createTimeUs_;
const uint64_t createTimeNs_;

mutable std::mutex stateLock_;
bool aborted_{false};
Expand Down
Loading
Loading