Skip to content

Commit

Permalink
feat(treansaction): Use single hop in squashing when possible
Browse files Browse the repository at this point in the history
Signed-off-by: Vladislav Oleshko <[email protected]>
  • Loading branch information
dranikpg committed Jan 4, 2024
1 parent 0e72846 commit d3374b9
Show file tree
Hide file tree
Showing 5 changed files with 83 additions and 28 deletions.
9 changes: 5 additions & 4 deletions src/server/main_service.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1987,15 +1987,15 @@ CmdArgVec CollectAllKeys(ConnectionState::ExecInfo* exec_info) {

// Return true if transaction was scheduled, false if scheduling was not required.
void StartMultiExec(DbIndex dbid, Transaction* trans, ConnectionState::ExecInfo* exec_info,
Transaction::MultiMode multi_mode) {
Transaction::MultiMode multi_mode, bool squashing) {
CmdArgVec tmp_keys;
switch (multi_mode) {
case Transaction::GLOBAL:
trans->StartMultiGlobal(dbid);
break;
case Transaction::LOCK_AHEAD:
tmp_keys = CollectAllKeys(exec_info);
trans->StartMultiLockedAhead(dbid, CmdArgList{tmp_keys});
trans->StartMultiLockedAhead(dbid, CmdArgList{tmp_keys}, squashing /* skip schedule */);
break;
case Transaction::NON_ATOMIC:
trans->StartMultiNonAtomic();
Expand Down Expand Up @@ -2031,6 +2031,7 @@ void Service::Exec(CmdArgList args, ConnectionContext* cntx) {
const CommandId* const exec_cid = cntx->cid;
CmdArgVec arg_vec;
ExecEvalState state = DetermineEvalPresense(exec_info.body);
bool allow_squashing = absl::GetFlag(FLAGS_multi_exec_squash) && state == ExecEvalState::NONE;

// We adjust the atomicity level of multi transaction inside StartMultiExec. i.e if multi mode is
// lock ahead and we run global script in the transaction then multi mode will be global.
Expand All @@ -2041,7 +2042,7 @@ void Service::Exec(CmdArgList args, ConnectionContext* cntx) {

bool scheduled = false;
if (*multi_mode != Transaction::NOT_DETERMINED) {
StartMultiExec(cntx->db_index(), cntx->transaction, &exec_info, *multi_mode);
StartMultiExec(cntx->db_index(), cntx->transaction, &exec_info, *multi_mode, allow_squashing);
scheduled = true;
}

Expand Down Expand Up @@ -2069,7 +2070,7 @@ void Service::Exec(CmdArgList args, ConnectionContext* cntx) {
ss->exec_freq_count[descr]++;
}

if (absl::GetFlag(FLAGS_multi_exec_squash) && state == ExecEvalState::NONE) {
if (allow_squashing) {
MultiCommandSquasher::Execute(absl::MakeSpan(exec_info.body), cntx, this);
} else {
for (auto& scmd : exec_info.body) {
Expand Down
65 changes: 48 additions & 17 deletions src/server/multi_command_squasher.cc
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#include "server/multi_command_squasher.h"

#include <absl/container/inlined_vector.h>
#include <absl/functional/bind_front.h>

#include "facade/dragonfly_connection.h"
#include "server/command_registry.h"
Expand Down Expand Up @@ -66,6 +67,39 @@ MultiCommandSquasher::ShardExecInfo& MultiCommandSquasher::PrepareShardInfo(Shar
return sinfo;
}

// Atomic transactions (that have all keys locked) perform hops and run squashed commands via
// stubs, non-atomic ones just run the commands in parallel (hopefully running many hops inline).
void MultiCommandSquasher::PerformHop() {
auto* tx = cntx_->transaction;

// Simple fanout for non-atomic.
if (!IsAtomic()) {
shard_set->RunBlockingInParallel([this, tx](auto* es) { SquashedHopCb(tx, es); },
[this](auto sid) { return !sharded_[sid].cmds.empty(); });
return;
}

cntx_->cid = base_cid_;
auto run_cb = absl::bind_front(&MultiCommandSquasher::SquashedHopCb, this);

// If all commands fit into a single batch, run them as a real single hop without multi overhead.
if (!tx->IsScheduled() && cmds_.empty()) {
tx->MultiSwitchToNonAtomic();
DCHECK_GT(tx->GetUniqueShardCnt(), 0u); // it was initialized and determined active shards
tx->ScheduleSingleHop(run_cb);
return;
}

if (!tx->IsScheduled()) {
DCHECK_GT(cmds_.size(), 0u); // check single hop optimization not missed
tx->Schedule();
}

auto check_cb = [this](ShardId sid) { return !sharded_[sid].cmds.empty(); };
tx->PrepareSquashedMultiHop(base_cid_, check_cb);
tx->ScheduleSingleHop(run_cb);
}

MultiCommandSquasher::SquashResult MultiCommandSquasher::TrySquash(StoredCmd* cmd) {
DCHECK(cmd->Cid());

Expand Down Expand Up @@ -125,6 +159,9 @@ bool MultiCommandSquasher::ExecuteStandalone(StoredCmd* cmd) {
}

auto* tx = cntx_->transaction;
if (!tx->IsScheduled())
tx->Schedule();

tx->MultiSwitchCmd(cmd->Cid());
cntx_->cid = cmd->Cid();

Expand Down Expand Up @@ -193,23 +230,12 @@ bool MultiCommandSquasher::ExecuteSquashed() {
for (auto& sd : sharded_)
sd.replies.reserve(sd.cmds.size());

Transaction* tx = cntx_->transaction;
ServerState* ss = ServerState::tlocal();
ss->stats.multi_squash_executions++;
ProactorBase* proactor = ProactorBase::me();
uint64_t start = proactor->GetMonotonicTimeNs();

// Atomic transactions (that have all keys locked) perform hops and run squashed commands via
// stubs, non-atomic ones just run the commands in parallel.
if (IsAtomic()) {
cntx_->cid = base_cid_;
auto cb = [this](ShardId sid) { return !sharded_[sid].cmds.empty(); };
tx->PrepareSquashedMultiHop(base_cid_, cb);
tx->ScheduleSingleHop([this](auto* tx, auto* es) { return SquashedHopCb(tx, es); });
} else {
shard_set->RunBlockingInParallel([this, tx](auto* es) { SquashedHopCb(tx, es); },
[this](auto sid) { return !sharded_[sid].cmds.empty(); });
}
PerformHop();

uint64_t after_hop = proactor->GetMonotonicTimeNs();
bool aborted = false;
Expand Down Expand Up @@ -242,8 +268,11 @@ void MultiCommandSquasher::Run() {
DVLOG(1) << "Trying to squash " << cmds_.size() << " commands for transaction "
<< cntx_->transaction->DebugId();

for (auto& cmd : cmds_) {
auto res = TrySquash(&cmd);
while (!cmds_.empty()) {
StoredCmd* cmd = &cmds_.front();
cmds_.remove_prefix(1);

auto res = TrySquash(cmd);

if (res == SquashResult::ERROR)
break;
Expand All @@ -254,9 +283,11 @@ void MultiCommandSquasher::Run() {
}

if (res == SquashResult::NOT_SQUASHED) {
if (!ExecuteStandalone(&cmd))
if (!ExecuteStandalone(cmd))
break;
}

cmds_.remove_prefix(1);
}

ExecuteSquashed(); // Flush leftover
Expand All @@ -277,8 +308,8 @@ void MultiCommandSquasher::Run() {
}
}

VLOG(1) << "Squashed " << num_squashed_ << " of " << cmds_.size()
<< " commands, max fanout: " << num_shards_ << ", atomic: " << atomic_;
VLOG(1) << "Squashed " << num_squashed_ << " commands, max fanout: " << num_shards_
<< ", atomic: " << atomic_;
}

bool MultiCommandSquasher::IsAtomic() const {
Expand Down
2 changes: 2 additions & 0 deletions src/server/multi_command_squasher.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ class MultiCommandSquasher {
// Lazy initialize shard info.
ShardExecInfo& PrepareShardInfo(ShardId sid);

void PerformHop();

// Retrun squash flags
SquashResult TrySquash(StoredCmd* cmd);

Expand Down
25 changes: 19 additions & 6 deletions src/server/transaction.cc
Original file line number Diff line number Diff line change
Expand Up @@ -349,6 +349,13 @@ void Transaction::PrepareSquashedMultiHop(const CommandId* cid,
}
}

void Transaction::MultiSwitchToNonAtomic() {
DCHECK_EQ(multi_->mode, LOCK_AHEAD);
CHECK_EQ(coordinator_state_, 0); // not scheduled and certainly not executing
multi_->mode = NON_ATOMIC;
multi_->locks.clear();
}

void Transaction::StartMultiGlobal(DbIndex dbid) {
CHECK(multi_);
CHECK(shard_data_.empty()); // Make sure default InitByArgs didn't run.
Expand All @@ -361,7 +368,7 @@ void Transaction::StartMultiGlobal(DbIndex dbid) {
ScheduleInternal();
}

void Transaction::StartMultiLockedAhead(DbIndex dbid, CmdArgList keys) {
void Transaction::StartMultiLockedAhead(DbIndex dbid, CmdArgList keys, bool skip_scheduling) {
DVLOG(1) << "StartMultiLockedAhead on " << keys.size() << " keys";

DCHECK(multi_);
Expand All @@ -371,7 +378,8 @@ void Transaction::StartMultiLockedAhead(DbIndex dbid, CmdArgList keys) {
InitBase(dbid, keys);
InitByKeys(KeyIndex::Range(0, keys.size()));

ScheduleInternal();
if (!skip_scheduling)
ScheduleInternal();
}

void Transaction::StartMultiNonAtomic() {
Expand All @@ -382,18 +390,21 @@ void Transaction::StartMultiNonAtomic() {
void Transaction::MultiSwitchCmd(const CommandId* cid) {
DCHECK(multi_);
DCHECK(!cb_ptr_);
multi_->cmd_seq_num++;

if (multi_->role != SQUASHED_STUB) // stub transactions don't migrate between threads
// stub transactions don't migrate between threads, so keep it's index cached
if (multi_->role != SQUASHED_STUB)
unique_shard_id_ = 0;
multi_->cmd_seq_num++;

unique_shard_cnt_ = 0;

// Commands in multi are initialized with their own args and command id
args_.clear();
reverse_index_.clear();

cid_ = cid;
cb_ptr_ = nullptr;

// TODO: clarify
if (multi_->mode == NON_ATOMIC || multi_->role == SQUASHED_STUB) {
// Reset shard data without resizing because armed might be read from cancelled callbacks.
for (auto& sd : shard_data_) {
Expand All @@ -404,9 +415,11 @@ void Transaction::MultiSwitchCmd(const CommandId* cid) {
coordinator_state_ = 0;
}

// Non atomic re-schedules for each command
if (multi_->mode == NON_ATOMIC)
txid_ = 0;

// Each hop needs to be prepared, reset role
if (multi_->role == SQUASHER)
multi_->role = DEFAULT;
}
Expand Down Expand Up @@ -809,7 +822,7 @@ void Transaction::Schedule() {
if (multi_ && multi_->role == SQUASHED_STUB)
return;

if (!IsAtomicMulti())
if ((coordinator_state_ & COORD_SCHED) == 0)
ScheduleInternal();
}

Expand Down
10 changes: 9 additions & 1 deletion src/server/transaction.h
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,8 @@ class Transaction {
void StartMultiGlobal(DbIndex dbid);

// Start multi in LOCK_AHEAD mode with given keys.
void StartMultiLockedAhead(DbIndex dbid, CmdArgList keys);
// Scheduling can be optionally disabled to allow more fine-grained control.
void StartMultiLockedAhead(DbIndex dbid, CmdArgList keys, bool skip_scheduling = false);

// Start multi in NON_ATOMIC mode.
void StartMultiNonAtomic();
Expand Down Expand Up @@ -286,6 +287,10 @@ class Transaction {
return coordinator_state_ & COORD_OOO;
}

bool IsScheduled() const {
return coordinator_state_ & COORD_SCHED;
}

// If blocking tx was woken up on this shard, get wake key.
std::optional<std::string_view> GetWakeKey(ShardId sid) const;

Expand All @@ -312,6 +317,9 @@ class Transaction {
// to it must not block.
void PrepareMultiForScheduleSingleHop(ShardId sid, DbIndex db, CmdArgList args);

// Switch lock-ahead transaction to non-atomic. Can only be called *before* it was scheduled.
void MultiSwitchToNonAtomic();

// Write a journal entry to a shard journal with the given payload. When logging a non-automatic
// journal command, multiple journal entries may be necessary. In this case, call with set
// multi_commands to true and call the FinishLogJournalOnShard function after logging the final
Expand Down

0 comments on commit d3374b9

Please sign in to comment.