Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: get rid of kv_args and replace it with slices to full_args #2942

Merged
merged 1 commit into from
May 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
5 changes: 0 additions & 5 deletions src/server/command_registry.cc
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,6 @@ CommandId::CommandId(const char* name, uint32_t mask, int8_t arity, int8_t first
: facade::CommandId(name, mask, arity, first_key, last_key, acl_categories) {
if (mask & CO::ADMIN)
opt_mask_ |= CO::NOSCRIPT;

if (mask & CO::BLOCKING)
opt_mask_ |= CO::REVERSE_MAPPING;
}

bool CommandId::IsTransactional() const {
Expand Down Expand Up @@ -173,8 +170,6 @@ const char* OptName(CO::CommandOpt fl) {
return "readonly";
case DENYOOM:
return "denyoom";
case REVERSE_MAPPING:
return "reverse-mapping";
case FAST:
return "fast";
case LOADING:
Expand Down
7 changes: 2 additions & 5 deletions src/server/command_registry.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,16 +27,13 @@ enum CommandOpt : uint32_t {
LOADING = 1U << 3, // Command allowed during LOADING state.
DENYOOM = 1U << 4, // use-memory in redis.

// marked commands that demand preserve the order of keys to work correctly.
// For example, MGET needs to know the order of keys to return the values in the same order.
// BLPOP needs to know the order of keys to return the first non-empty list from the left.
REVERSE_MAPPING = 1U << 5,
// UNUSED = 1U << 5,

VARIADIC_KEYS = 1U << 6, // arg 2 determines number of keys. Relevant for ZUNIONSTORE, EVAL etc.

ADMIN = 1U << 7, // implies NOSCRIPT,
NOSCRIPT = 1U << 8,
BLOCKING = 1U << 9, // implies REVERSE_MAPPING
BLOCKING = 1U << 9,
HIDDEN = 1U << 10, // does not show in COMMAND command output
INTERLEAVED_KEYS = 1U << 11, // keys are interleaved with arguments
GLOBAL_TRANS = 1U << 12,
Expand Down
12 changes: 5 additions & 7 deletions src/server/container_utils.cc
Original file line number Diff line number Diff line change
Expand Up @@ -40,14 +40,12 @@ OpResult<std::pair<DbSlice::ConstIterator, unsigned>> FindFirstReadOnly(const Db
int req_obj_type) {
DCHECK(!args.Empty());

unsigned i = 0;
for (string_view key : args) {
OpResult<DbSlice::ConstIterator> res = db_slice.FindReadOnly(cntx, key, req_obj_type);
for (auto it = args.begin(); it != args.end(); ++it) {
OpResult<DbSlice::ConstIterator> res = db_slice.FindReadOnly(cntx, *it, req_obj_type);
if (res)
return make_pair(res.value(), i);
return make_pair(res.value(), unsigned(it.index()));
if (res.status() != OpStatus::KEY_NOTFOUND)
return res.status();
++i;
}

VLOG(2) << "FindFirst not found";
Expand Down Expand Up @@ -119,8 +117,8 @@ OpResult<ShardFFResult> FindFirstNonEmpty(Transaction* trans, int req_obj_type)
auto comp = [trans](const OpResult<FFResult>& lhs, const OpResult<FFResult>& rhs) {
if (!lhs || !rhs)
return lhs.ok();
size_t i1 = trans->ReverseArgIndex(std::get<ShardId>(*lhs), std::get<unsigned>(*lhs));
size_t i2 = trans->ReverseArgIndex(std::get<ShardId>(*rhs), std::get<unsigned>(*rhs));
size_t i1 = std::get<1>(*lhs);
size_t i2 = std::get<1>(*rhs);
return i1 < i2;
};

Expand Down
6 changes: 4 additions & 2 deletions src/server/journal/types.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,15 +42,17 @@ struct Entry : public EntryBase {
struct Payload {
std::string_view cmd;
std::variant<CmdArgList, // Parts of a full command.
ShardArgs // Command and its shard parts.
>
ShardArgs, // Shard parts.
ArgSlice>
args;

Payload() = default;
Payload(std::string_view c, CmdArgList a) : cmd(c), args(a) {
}
Payload(std::string_view c, const ShardArgs& a) : cmd(c), args(a) {
}
Payload(std::string_view c, ArgSlice a) : cmd(c), args(a) {
}
};

Entry(TxId txid, Op opcode, DbIndex dbid, uint32_t shard_cnt,
Expand Down
13 changes: 7 additions & 6 deletions src/server/json_family.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1543,12 +1543,14 @@ void JsonFamily::MGet(CmdArgList args, ConnectionContext* cntx) {
continue;

vector<OptString>& res = mget_resp[sid];
for (size_t j = 0; j < res.size(); ++j) {
if (!res[j])
ShardArgs shard_args = transaction->GetShardArgs(sid);
unsigned src_index = 0;
for (auto it = shard_args.begin(); it != shard_args.end(); ++it, ++src_index) {
if (!res[src_index])
continue;

uint32_t indx = transaction->ReverseArgIndex(sid, j);
results[indx] = std::move(res[j]);
uint32_t dst_indx = it.index();
results[dst_indx] = std::move(res[src_index]);
}
}

Expand Down Expand Up @@ -2091,8 +2093,7 @@ void JsonFamily::Register(CommandRegistry* registry) {
constexpr size_t kMsetFlags = CO::WRITE | CO::DENYOOM | CO::FAST | CO::INTERLEAVED_KEYS;
registry->StartFamily();
*registry << CI{"JSON.GET", CO::READONLY | CO::FAST, -2, 1, 1, acl::JSON}.HFUNC(Get);
*registry << CI{"JSON.MGET", CO::READONLY | CO::FAST | CO::REVERSE_MAPPING, -3, 1, -2, acl::JSON}
.HFUNC(MGet);
*registry << CI{"JSON.MGET", CO::READONLY | CO::FAST, -3, 1, -2, acl::JSON}.HFUNC(MGet);
*registry << CI{"JSON.TYPE", CO::READONLY | CO::FAST, 3, 1, 1, acl::JSON}.HFUNC(Type);
*registry << CI{"JSON.STRLEN", CO::READONLY | CO::FAST, 3, 1, 1, acl::JSON}.HFUNC(StrLen);
*registry << CI{"JSON.OBJLEN", CO::READONLY | CO::FAST, 3, 1, 1, acl::JSON}.HFUNC(ObjLen);
Expand Down
27 changes: 24 additions & 3 deletions src/server/list_family.cc
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,22 @@ struct CircularMessages {
// Used to recover logs for BLPOP failures. See OpBPop.
thread_local CircularMessages debugMessages{50};

// A bit awkward translation from a single key to ShardArgs.
// We create a mutable slice (which will never be mutated) from the key, then we create
// a CmdArgList of size 1 that references mslice and finally
// we reference the first element in the CmdArgList via islice.
struct SingleArg {
MutableSlice mslice;
IndexSlice islice{0, 1};

SingleArg(string_view arg) : mslice(const_cast<char*>(arg.data()), arg.size()) {
}

ShardArgs Get() {
return ShardArgs{CmdArgList{&mslice, 1}, absl::MakeSpan(&islice, 1)};
}
};

Comment on lines +161 to +176
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it's used only for smalle edge cases, so a string would do the job instead of a const cast

once we have this pr, we could even try to remove mutability from ArgSlice alltogether 🤷🏻‍♂️

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i will keep const cast as it's harmless anyways

class BPopPusher {
public:
BPopPusher(string_view pop_key, string_view push_key, ListDir popdir, ListDir pushdir);
Expand Down Expand Up @@ -448,7 +464,9 @@ OpResult<string> MoveTwoShards(Transaction* trans, string_view src, string_view
// hack, again. since we hacked which queue we are waiting on (see RunPair)
// we must clean-up src key here manually. See RunPair why we do this.
// in short- we suspended on "src" on both shards.
shard->blocking_controller()->FinalizeWatched(ArgSlice{&src, 1}, t);

SingleArg single_arg{src};
shard->blocking_controller()->FinalizeWatched(single_arg.Get(), t);
}
} else {
DVLOG(1) << "Popping value from list: " << key;
Expand Down Expand Up @@ -873,7 +891,8 @@ OpResult<string> BPopPusher::RunSingle(ConnectionContext* cntx, time_point tp) {
return op_res;
}

auto wcb = [&](Transaction* t, EngineShard* shard) { return ShardArgs{&this->pop_key_, 1}; };
SingleArg single_arg{pop_key_};
auto wcb = [&](Transaction* t, EngineShard* shard) { return single_arg.Get(); };

const auto key_checker = [](EngineShard* owner, const DbContext& context, Transaction*,
std::string_view key) -> bool {
Expand All @@ -900,11 +919,13 @@ OpResult<string> BPopPusher::RunPair(ConnectionContext* cntx, time_point tp) {
return op_res;
}

SingleArg single_arg(this->pop_key_);

// a hack: we watch in both shards for pop_key but only in the source shard it's relevant.
// Therefore we follow the regular flow of watching the key but for the destination shard it
// will never be triggerred.
// This allows us to run Transaction::Execute on watched transactions in both shards.
auto wcb = [&](Transaction* t, EngineShard* shard) { return ArgSlice{&this->pop_key_, 1}; };
auto wcb = [&](Transaction* t, EngineShard* shard) { return single_arg.Get(); };

const auto key_checker = [](EngineShard* owner, const DbContext& context, Transaction*,
std::string_view key) -> bool {
Expand Down
12 changes: 7 additions & 5 deletions src/server/stream_family.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2989,17 +2989,19 @@ void XReadImpl(CmdArgList args, std::optional<ReadOpts> opts, ConnectionContext*
continue;

vector<RecordVec>& results = xread_resp[sid];
unsigned src_index = 0;
ShardArgs shard_args = cntx->transaction->GetShardArgs(sid);

for (size_t i = 0; i < results.size(); ++i) {
if (results[i].size() == 0) {
for (auto it = shard_args.begin(); it != shard_args.end(); ++it, ++src_index) {
if (results[src_index].size() == 0) {
continue;
}

resolved_streams++;

// Add the stream records ordered by the original stream arguments.
size_t indx = cntx->transaction->ReverseArgIndex(sid, i);
res[indx - opts->streams_arg] = std::move(results[i]);
size_t dst_indx = it.index();
res[dst_indx - opts->streams_arg] = std::move(results[src_index]);
}
}

Expand Down Expand Up @@ -3323,7 +3325,7 @@ constexpr uint32_t kXAutoClaim = WRITE | STREAM | FAST;
void StreamFamily::Register(CommandRegistry* registry) {
using CI = CommandId;
registry->StartFamily();
constexpr auto kReadFlags = CO::READONLY | CO::BLOCKING | CO::REVERSE_MAPPING | CO::VARIADIC_KEYS;
constexpr auto kReadFlags = CO::READONLY | CO::BLOCKING | CO::VARIADIC_KEYS;
*registry << CI{"XADD", CO::WRITE | CO::DENYOOM | CO::FAST, -5, 1, 1, acl::kXAdd}.HFUNC(XAdd)
<< CI{"XCLAIM", CO::WRITE | CO::FAST, -6, 1, 1, acl::kXClaim}.HFUNC(XClaim)
<< CI{"XDEL", CO::WRITE | CO::FAST, -3, 1, 1, acl::kXDel}.HFUNC(XDel)
Expand Down
50 changes: 31 additions & 19 deletions src/server/string_family.cc
Original file line number Diff line number Diff line change
Expand Up @@ -275,13 +275,15 @@ void OpMSet(const OpArgs& op_args, const ShardArgs& args, atomic_bool* success)
SetCmd sg(op_args, false);

size_t index = 0;
bool partial = false;
for (auto it = args.begin(); it != args.end(); ++it) {
string_view key = *it;
++it;
string_view value = *it;
DVLOG(1) << "MSet " << key << ":" << value;
if (sg.Set(params, key, value) != OpStatus::OK) { // OOM for example.
success->store(false);
partial = true;
break;
}
index += 2;
Expand All @@ -290,18 +292,29 @@ void OpMSet(const OpArgs& op_args, const ShardArgs& args, atomic_bool* success)
if (auto journal = op_args.shard->journal(); journal) {
// We write a custom journal because an OOM in the above loop could lead to partial success, so
// we replicate only what was changed.
string_view cmd;
ArgSlice cmd_args;
if (index == 0) {
// All shards must record the tx was executed for the replica to execute it, so we send a PING
// in case nothing was changed
cmd = "PING";
if (partial) {
string_view cmd;
ArgSlice cmd_args;
vector<string_view> store_args(index);
if (index == 0) {
// All shards must record the tx was executed for the replica to execute it, so we send a
// PING in case nothing was changed
cmd = "PING";
} else {
// journal [0, i)
cmd = "MSET";
unsigned i = 0;
for (string_view arg : args) {
store_args[i++] = arg;
if (i >= store_args.size())
break;
}
cmd_args = absl::MakeSpan(store_args);
}
RecordJournal(op_args, cmd, cmd_args, op_args.tx->GetUniqueShardCnt());
} else {
// journal [0, i)
cmd = "MSET";
cmd_args = ArgSlice(args.begin(), index);
RecordJournal(op_args, "MSET", args, op_args.tx->GetUniqueShardCnt());
}
RecordJournal(op_args, cmd, cmd_args, op_args.tx->GetUniqueShardCnt());
}
}

Expand Down Expand Up @@ -1163,16 +1176,17 @@ void StringFamily::MGet(CmdArgList args, ConnectionContext* cntx) {
src.storage_list->next = res.storage_list;
res.storage_list = src.storage_list;
src.storage_list = nullptr;

for (size_t j = 0; j < src.resp_arr.size(); ++j) {
if (!src.resp_arr[j])
ShardArgs shard_args = transaction->GetShardArgs(sid);
unsigned src_indx = 0;
for (auto it = shard_args.begin(); it != shard_args.end(); ++it, ++src_indx) {
if (!src.resp_arr[src_indx])
continue;

uint32_t indx = transaction->ReverseArgIndex(sid, j);
uint32_t indx = it.index();

res.resp_arr[indx] = std::move(src.resp_arr[j]);
res.resp_arr[indx] = std::move(src.resp_arr[src_indx]);
if (cntx->protocol() == Protocol::MEMCACHE) {
res.resp_arr[indx]->key = ArgS(args, indx);
res.resp_arr[indx]->key = *it;
}
}
}
Expand Down Expand Up @@ -1488,9 +1502,7 @@ void StringFamily::Register(CommandRegistry* registry) {
<< CI{"GETEX", CO::WRITE | CO::DENYOOM | CO::FAST | CO::NO_AUTOJOURNAL, -1, 1, 1, acl::kGetEx}
.HFUNC(GetEx)
<< CI{"GETSET", CO::WRITE | CO::DENYOOM | CO::FAST, 3, 1, 1, acl::kGetSet}.HFUNC(GetSet)
<< CI{"MGET", CO::READONLY | CO::FAST | CO::REVERSE_MAPPING | CO::IDEMPOTENT, -2, 1, -1,
acl::kMGet}
.HFUNC(MGet)
<< CI{"MGET", CO::READONLY | CO::FAST | CO::IDEMPOTENT, -2, 1, -1, acl::kMGet}.HFUNC(MGet)
<< CI{"MSET", kMSetMask, -3, 1, -1, acl::kMSet}.HFUNC(MSet)
<< CI{"MSETNX", kMSetMask, -3, 1, -1, acl::kMSetNx}.HFUNC(MSetNx)
<< CI{"STRLEN", CO::READONLY | CO::FAST, 2, 1, 1, acl::kStrLen}.HFUNC(StrLen)
Expand Down