Skip to content

Commit

Permalink
chore: get rid of kv_args and replace it with slices to full_args
Browse files Browse the repository at this point in the history
Signed-off-by: Roman Gershman <[email protected]>
  • Loading branch information
romange committed May 2, 2024
1 parent 10cf2b6 commit 27b9d50
Show file tree
Hide file tree
Showing 11 changed files with 228 additions and 155 deletions.
12 changes: 5 additions & 7 deletions src/server/container_utils.cc
Original file line number Diff line number Diff line change
Expand Up @@ -40,14 +40,12 @@ OpResult<std::pair<DbSlice::ConstIterator, unsigned>> FindFirstReadOnly(const Db
int req_obj_type) {
DCHECK(!args.Empty());

unsigned i = 0;
for (string_view key : args) {
OpResult<DbSlice::ConstIterator> res = db_slice.FindReadOnly(cntx, key, req_obj_type);
for (auto it = args.begin(); it != args.end(); ++it) {
OpResult<DbSlice::ConstIterator> res = db_slice.FindReadOnly(cntx, *it, req_obj_type);
if (res)
return make_pair(res.value(), i);
return make_pair(res.value(), unsigned(it.index()));
if (res.status() != OpStatus::KEY_NOTFOUND)
return res.status();
++i;
}

VLOG(2) << "FindFirst not found";
Expand Down Expand Up @@ -119,8 +117,8 @@ OpResult<ShardFFResult> FindFirstNonEmpty(Transaction* trans, int req_obj_type)
auto comp = [trans](const OpResult<FFResult>& lhs, const OpResult<FFResult>& rhs) {
if (!lhs || !rhs)
return lhs.ok();
size_t i1 = trans->ReverseArgIndex(std::get<ShardId>(*lhs), std::get<unsigned>(*lhs));
size_t i2 = trans->ReverseArgIndex(std::get<ShardId>(*rhs), std::get<unsigned>(*rhs));
size_t i1 = std::get<1>(*lhs);
size_t i2 = std::get<1>(*rhs);
return i1 < i2;
};

Expand Down
8 changes: 5 additions & 3 deletions src/server/journal/types.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,15 +42,17 @@ struct Entry : public EntryBase {
struct Payload {
std::string_view cmd;
std::variant<CmdArgList, // Parts of a full command.
ShardArgs // Command and its shard parts.
>
args;
ShardArgs, // Command and its shard parts.
ArgSlice>
args; // Command and its shard parts.

Payload() = default;
Payload(std::string_view c, CmdArgList a) : cmd(c), args(a) {
}
Payload(std::string_view c, const ShardArgs& a) : cmd(c), args(a) {
}
Payload(std::string_view c, ArgSlice a) : cmd(c), args(a) {
}
};

Entry(TxId txid, Op opcode, DbIndex dbid, uint32_t shard_cnt,
Expand Down
10 changes: 6 additions & 4 deletions src/server/json_family.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1543,12 +1543,14 @@ void JsonFamily::MGet(CmdArgList args, ConnectionContext* cntx) {
continue;

vector<OptString>& res = mget_resp[sid];
for (size_t j = 0; j < res.size(); ++j) {
if (!res[j])
ShardArgs shard_args = transaction->GetShardArgs(sid);
unsigned src_index = 0;
for (auto it = shard_args.begin(); it != shard_args.end(); ++it, ++src_index) {
if (!res[src_index])
continue;

uint32_t indx = transaction->ReverseArgIndex(sid, j);
results[indx] = std::move(res[j]);
uint32_t dst_indx = it.index();
results[dst_indx] = std::move(res[src_index]);
}
}

Expand Down
28 changes: 25 additions & 3 deletions src/server/list_family.cc
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,22 @@ struct CircularMessages {
// Used to recover logs for BLPOP failures. See OpBPop.
thread_local CircularMessages debugMessages{50};

// A bit awakward translation from a single key to ShardArgs.
// We create a mutable slice (which will never be mutated) from the key, then we create
// a CmdArgList of size 1 that references mslice and finally
// we reference the first element in the CmdArgList via islice.
struct SingleArg {
MutableSlice mslice;
IndexSlice islice{0, 1};

SingleArg(string_view arg) : mslice(const_cast<char*>(arg.data()), arg.size()) {
}

ShardArgs Get() {
return ShardArgs{CmdArgList{&mslice, 1}, absl::MakeSpan(&islice, 1)};
}
};

class BPopPusher {
public:
BPopPusher(string_view pop_key, string_view push_key, ListDir popdir, ListDir pushdir);
Expand Down Expand Up @@ -448,7 +464,10 @@ OpResult<string> MoveTwoShards(Transaction* trans, string_view src, string_view
// hack, again. since we hacked which queue we are waiting on (see RunPair)
// we must clean-up src key here manually. See RunPair why we do this.
// in short- we suspended on "src" on both shards.
shard->blocking_controller()->FinalizeWatched(ArgSlice{&src, 1}, t);
//

SingleArg single_arg{src};
shard->blocking_controller()->FinalizeWatched(single_arg.Get(), t);
}
} else {
DVLOG(1) << "Popping value from list: " << key;
Expand Down Expand Up @@ -873,7 +892,8 @@ OpResult<string> BPopPusher::RunSingle(ConnectionContext* cntx, time_point tp) {
return op_res;
}

auto wcb = [&](Transaction* t, EngineShard* shard) { return ShardArgs{&this->pop_key_, 1}; };
SingleArg single_arg{pop_key_};
auto wcb = [&](Transaction* t, EngineShard* shard) { return single_arg.Get(); };

const auto key_checker = [](EngineShard* owner, const DbContext& context, Transaction*,
std::string_view key) -> bool {
Expand All @@ -900,11 +920,13 @@ OpResult<string> BPopPusher::RunPair(ConnectionContext* cntx, time_point tp) {
return op_res;
}

SingleArg single_arg(this->pop_key_);

// a hack: we watch in both shards for pop_key but only in the source shard it's relevant.
// Therefore we follow the regular flow of watching the key but for the destination shard it
// will never be triggerred.
// This allows us to run Transaction::Execute on watched transactions in both shards.
auto wcb = [&](Transaction* t, EngineShard* shard) { return ArgSlice{&this->pop_key_, 1}; };
auto wcb = [&](Transaction* t, EngineShard* shard) -> ShardArgs { return single_arg.Get(); };

const auto key_checker = [](EngineShard* owner, const DbContext& context, Transaction*,
std::string_view key) -> bool {
Expand Down
10 changes: 6 additions & 4 deletions src/server/stream_family.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2989,17 +2989,19 @@ void XReadImpl(CmdArgList args, std::optional<ReadOpts> opts, ConnectionContext*
continue;

vector<RecordVec>& results = xread_resp[sid];
unsigned src_index = 0;
ShardArgs shard_args = cntx->transaction->GetShardArgs(sid);

for (size_t i = 0; i < results.size(); ++i) {
if (results[i].size() == 0) {
for (auto it = shard_args.begin(); it != shard_args.end(); ++it, ++src_index) {
if (results[src_index].size() == 0) {
continue;
}

resolved_streams++;

// Add the stream records ordered by the original stream arguments.
size_t indx = cntx->transaction->ReverseArgIndex(sid, i);
res[indx - opts->streams_arg] = std::move(results[i]);
size_t dst_indx = it.index();
res[dst_indx - opts->streams_arg] = std::move(results[src_index]);
}
}

Expand Down
46 changes: 30 additions & 16 deletions src/server/string_family.cc
Original file line number Diff line number Diff line change
Expand Up @@ -275,13 +275,15 @@ void OpMSet(const OpArgs& op_args, const ShardArgs& args, atomic_bool* success)
SetCmd sg(op_args, false);

size_t index = 0;
bool partial = false;
for (auto it = args.begin(); it != args.end(); ++it) {
string_view key = *it;
++it;
string_view value = *it;
DVLOG(1) << "MSet " << key << ":" << value;
if (sg.Set(params, key, value) != OpStatus::OK) { // OOM for example.
success->store(false);
partial = true;
break;
}
index += 2;
Expand All @@ -290,18 +292,29 @@ void OpMSet(const OpArgs& op_args, const ShardArgs& args, atomic_bool* success)
if (auto journal = op_args.shard->journal(); journal) {
// We write a custom journal because an OOM in the above loop could lead to partial success, so
// we replicate only what was changed.
string_view cmd;
ArgSlice cmd_args;
if (index == 0) {
// All shards must record the tx was executed for the replica to execute it, so we send a PING
// in case nothing was changed
cmd = "PING";
if (partial) {
string_view cmd;
ArgSlice cmd_args;
vector<string_view> store_args(index);
if (index == 0) {
// All shards must record the tx was executed for the replica to execute it, so we send a
// PING in case nothing was changed
cmd = "PING";
} else {
// journal [0, i)
cmd = "MSET";
unsigned i = 0;
for (string_view arg : args) {
store_args[i++] = arg;
if (i >= store_args.size())
break;
}
cmd_args = absl::MakeSpan(store_args);
}
RecordJournal(op_args, cmd, cmd_args, op_args.tx->GetUniqueShardCnt());
} else {
// journal [0, i)
cmd = "MSET";
cmd_args = ArgSlice(args.begin(), index);
RecordJournal(op_args, "MSET", args, op_args.tx->GetUniqueShardCnt());
}
RecordJournal(op_args, cmd, cmd_args, op_args.tx->GetUniqueShardCnt());
}
}

Expand Down Expand Up @@ -1163,16 +1176,17 @@ void StringFamily::MGet(CmdArgList args, ConnectionContext* cntx) {
src.storage_list->next = res.storage_list;
res.storage_list = src.storage_list;
src.storage_list = nullptr;

for (size_t j = 0; j < src.resp_arr.size(); ++j) {
if (!src.resp_arr[j])
ShardArgs shard_args = transaction->GetShardArgs(sid);
unsigned src_indx = 0;
for (auto it = shard_args.begin(); it != shard_args.end(); ++it, ++src_indx) {
if (!src.resp_arr[src_indx])
continue;

uint32_t indx = transaction->ReverseArgIndex(sid, j);
uint32_t indx = it.index();

res.resp_arr[indx] = std::move(src.resp_arr[j]);
res.resp_arr[indx] = std::move(src.resp_arr[src_indx]);
if (cntx->protocol() == Protocol::MEMCACHE) {
res.resp_arr[indx]->key = ArgS(args, indx);
res.resp_arr[indx]->key = *it;
}
}
}
Expand Down

0 comments on commit 27b9d50

Please sign in to comment.