Skip to content

Commit

Permalink
chore: get rid of kv_args and replace it with slices to full_args
Browse files Browse the repository at this point in the history
Signed-off-by: Roman Gershman <[email protected]>
  • Loading branch information
romange committed Apr 27, 2024
1 parent bbe6c85 commit 0be3cd3
Show file tree
Hide file tree
Showing 9 changed files with 292 additions and 139 deletions.
2 changes: 1 addition & 1 deletion src/server/journal/serializer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ void JournalWriter::Write(std::string_view sv) {
template <typename C> void JournalWriter::Write(std::pair<std::string_view, C> args) {
auto [cmd, tail_args] = args;

Write(1 + tail_args.size());
Write(1 + size(tail_args);

size_t cmd_size = cmd.size();
for (auto v : tail_args) {
Expand Down
69 changes: 45 additions & 24 deletions src/server/journal/types.cc
Original file line number Diff line number Diff line change
Expand Up @@ -4,39 +4,60 @@

#include "server/journal/types.h"

#include "core/overloaded.h"
#include "server/cluster/cluster_defs.h"

namespace dfly::journal {

std::string Entry::ToString() const {
std::string rv = absl::StrCat("{op=", opcode, ", dbid=", dbid);
std::visit(
[&rv](const auto& payload) {
if constexpr (std::is_same_v<std::decay_t<decltype(payload)>, std::monostate>) {
absl::StrAppend(&rv, ", empty");
} else {
const auto& [cmd, args] = payload;
absl::StrAppend(&rv, ", cmd='");
absl::StrAppend(&rv, cmd);
absl::StrAppend(&rv, "', args=[");
for (size_t i = 0; i < args.size(); i++) {
absl::StrAppend(&rv, "'");
absl::StrAppend(&rv, facade::ToSV(args[i]));
absl::StrAppend(&rv, "'");
if (i + 1 != args.size())
absl::StrAppend(&rv, ", ");
}
absl::StrAppend(&rv, "]");
}
},
payload);
using namespace std;

void AppendPrefix(string_view cmd, string* dest) {
absl::StrAppend(dest, ", cmd='");
absl::StrAppend(dest, cmd);
absl::StrAppend(dest, "', args=[");
}

void AppendArg(string_view cmd, string* dest) {
absl::StrAppend(dest, "'");
absl::StrAppend(dest, cmd);
absl::StrAppend(dest, "',");
}

void AppendSuffix(string* dest) {
if (dest->back() == ',')
dest->pop_back();
absl::StrAppend(dest, "]");
}

string Entry::ToString() const {
string rv = absl::StrCat("{op=", opcode, ", dbid=", dbid);
visit(Overloaded{
[&rv](const monostate& payload) { absl::StrAppend(&rv, ", empty"); },
[&rv](const pair<std::string_view, CmdArgList>& payload) {
const auto& [cmd, args] = payload;
AppendPrefix(cmd, &rv);
for (MutableSlice arg : args) {
AppendArg(facade::ToSV(arg), &rv);
}
AppendSuffix(&rv);
},
[&rv](const pair<std::string_view, ShardArgs>& payload) {
const auto& [cmd, args] = payload;
AppendPrefix(cmd, &rv);
for (string_view arg : args) {
AppendArg(arg, &rv);
}
AppendSuffix(&rv);
},
},
payload);

rv += "}";
return rv;
}

std::string ParsedEntry::ToString() const {
std::string rv = absl::StrCat("{op=", opcode, ", dbid=", dbid, ", cmd='");
string ParsedEntry::ToString() const {
string rv = absl::StrCat("{op=", opcode, ", dbid=", dbid, ", cmd='");
for (auto& arg : cmd.cmd_args) {
absl::StrAppend(&rv, facade::ToSV(arg));
absl::StrAppend(&rv, " ");
Expand Down
2 changes: 1 addition & 1 deletion src/server/journal/types.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ struct Entry : public EntryBase {
using Payload =
std::variant<std::monostate, // No payload.
std::pair<std::string_view, CmdArgList>, // Parts of a full command.
std::pair<std::string_view, ArgSlice> // Command and its shard parts.
std::pair<std::string_view, ShardArgs> // Command and its shard parts.
>;

Entry(TxId txid, Op opcode, DbIndex dbid, uint32_t shard_cnt,
Expand Down
24 changes: 21 additions & 3 deletions src/server/list_family.cc
Original file line number Diff line number Diff line change
Expand Up @@ -448,7 +448,15 @@ OpResult<string> MoveTwoShards(Transaction* trans, string_view src, string_view
// hack, again. since we hacked which queue we are waiting on (see RunPair)
// we must clean-up src key here manually. See RunPair why we do this.
// in short- we suspended on "src" on both shards.
shard->blocking_controller()->FinalizeWatched(ArgSlice{&src, 1}, t);
//
// A very awakward translation from a single key to ShardArgs.
// We create a mutable slice (which will never be mutated) from the key, then we create
// a CmdArgList of size 1 that references mslice and finally
// we reference the first element in the CmdArgList via islice.
MutableSlice mslice(const_cast<char*>(src.data()), src.size());
IndexSlice islice(0, 1);
ShardArgs shard_args{CmdArgList{&mslice, 1}, absl::MakeSpan(&islice, 1)};
shard->blocking_controller()->FinalizeWatched(shard_args, t);
}
} else {
DVLOG(1) << "Popping value from list: " << key;
Expand Down Expand Up @@ -873,7 +881,15 @@ OpResult<string> BPopPusher::RunSingle(ConnectionContext* cntx, time_point tp) {
return op_res;
}

auto wcb = [&](Transaction* t, EngineShard* shard) { return ShardArgs{&this->pop_key_, 1}; };
// Very awakward translation from a single key to ShardArgs.
// We create a mutable slice (which will never be mutated) from the key, then we create
// a CmdArgList of size 1 that references mslice and finally we reference the first element in
// the CmdArgList via islice.
MutableSlice mslice(const_cast<char*>(this->pop_key_.data()), this->pop_key_.size());
IndexSlice islice(0, 1);
auto wcb = [&](Transaction* t, EngineShard* shard) {
return ShardArgs(facade::CmdArgList{&mslice, 1}, absl::MakeSpan(&islice, 1));
};

const auto key_checker = [](EngineShard* owner, const DbContext& context, Transaction*,
std::string_view key) -> bool {
Expand Down Expand Up @@ -904,7 +920,9 @@ OpResult<string> BPopPusher::RunPair(ConnectionContext* cntx, time_point tp) {
// Therefore we follow the regular flow of watching the key but for the destination shard it
// will never be triggerred.
// This allows us to run Transaction::Execute on watched transactions in both shards.
auto wcb = [&](Transaction* t, EngineShard* shard) { return ArgSlice{&this->pop_key_, 1}; };
auto wcb = [&](Transaction* t, EngineShard* shard) -> ShardArgs {
return ArgSlice{&this->pop_key_, 1};
};

const auto key_checker = [](EngineShard* owner, const DbContext& context, Transaction*,
std::string_view key) -> bool {
Expand Down
38 changes: 26 additions & 12 deletions src/server/string_family.cc
Original file line number Diff line number Diff line change
Expand Up @@ -275,13 +275,15 @@ void OpMSet(const OpArgs& op_args, const ShardArgs& args, atomic_bool* success)
SetCmd sg(op_args, false);

size_t index = 0;
bool partial = false;
for (auto it = args.begin(); it != args.end(); ++it) {
string_view key = *it;
++it;
string_view value = *it;
DVLOG(1) << "MSet " << key << ":" << value;
if (sg.Set(params, key, value) != OpStatus::OK) { // OOM for example.
success->store(false);
partial = true;
break;
}
index += 2;
Expand All @@ -290,18 +292,29 @@ void OpMSet(const OpArgs& op_args, const ShardArgs& args, atomic_bool* success)
if (auto journal = op_args.shard->journal(); journal) {
// We write a custom journal because an OOM in the above loop could lead to partial success, so
// we replicate only what was changed.
string_view cmd;
ArgSlice cmd_args;
if (index == 0) {
// All shards must record the tx was executed for the replica to execute it, so we send a PING
// in case nothing was changed
cmd = "PING";
if (partial) {
string_view cmd;
ArgSlice cmd_args;
vector<string_view> store_args(index);
if (index == 0) {
// All shards must record the tx was executed for the replica to execute it, so we send a
// PING in case nothing was changed
cmd = "PING";
} else {
// journal [0, i)
cmd = "MSET";
unsigned i = 0;
for (string_view arg : args) {
store_args[i++] = arg;
if (i >= store_args.size())
break;
}
cmd_args = absl::MakeSpan(store_args);
}
RecordJournal(op_args, cmd, cmd_args, op_args.tx->GetUniqueShardCnt());
} else {
// journal [0, i)
cmd = "MSET";
cmd_args = ArgSlice(args.begin(), index);
RecordJournal(op_args, "MSET", args, op_args.tx->GetUniqueShardCnt());
}
RecordJournal(op_args, cmd, cmd_args, op_args.tx->GetUniqueShardCnt());
}
}

Expand Down Expand Up @@ -990,11 +1003,12 @@ void StringFamily::GetEx(CmdArgList args, ConnectionContext* cntx) {
// Replicate GETEX as PEXPIREAT or PERSIST
if (shard->journal()) {
if (exp_params.persist) {
RecordJournal(op_args, "PERSIST", {key});
RecordJournal(op_args, "PERSIST", ArgSlice{key});
} else {
auto [ignore, abs_time] = exp_params.Calculate(op_args.db_cntx.time_now_ms);
auto abs_time_str = absl::StrCat(abs_time);
RecordJournal(op_args, "PEXPIREAT", {key, abs_time_str});
string_view jargs[2] = {key, abs_time_str};
RecordJournal(op_args, "PEXPIREAT", ArgSlice{jargs});
}
}

Expand Down

0 comments on commit 0be3cd3

Please sign in to comment.