Skip to content

Commit

Permalink
feat(treansaction): Use single hop in squashing when possible
Browse files Browse the repository at this point in the history
Signed-off-by: Vladislav Oleshko <[email protected]>
  • Loading branch information
dranikpg committed Jan 31, 2024
1 parent 90a9f05 commit 3228c04
Show file tree
Hide file tree
Showing 10 changed files with 263 additions and 218 deletions.
4 changes: 4 additions & 0 deletions src/server/command_registry.h
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,10 @@ class CommandId : public facade::CommandId {

bool IsTransactional() const;

bool IsMultiTransactional() const {
return CO::IsTransKind(name()) || CO::IsEvalKind(name());
}

bool IsReadOnly() const {
return opt_mask_ & CO::READONLY;
}
Expand Down
4 changes: 2 additions & 2 deletions src/server/conn_context.cc
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ ConnectionContext::ConnectionContext(const ConnectionContext* owner, Transaction
if (tx) { // If we have a carrier transaction, this context is used for squashing
DCHECK(owner);
conn_state.db_index = owner->conn_state.db_index;
conn_state.squashing_info = {owner};
conn_state.squashing_info = {owner, owner->transaction};
}
auto* prev_reply_builder = Inject(crb);
CHECK_EQ(prev_reply_builder, nullptr);
Expand Down Expand Up @@ -234,7 +234,7 @@ size_t ConnectionState::ExecInfo::UsedMemory() const {
}

size_t ConnectionState::ScriptInfo::UsedMemory() const {
return dfly::HeapSize(keys) + async_cmds_heap_mem;
return async_cmds_heap_mem;
}

size_t ConnectionState::SubscribeInfo::UsedMemory() const {
Expand Down
9 changes: 4 additions & 5 deletions src/server/conn_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -101,8 +101,6 @@ struct ConnectionState {
struct ScriptInfo {
size_t UsedMemory() const;

absl::flat_hash_set<std::string_view> keys; // declared keys

size_t async_cmds_heap_mem = 0; // bytes used by async_cmds
size_t async_cmds_heap_limit = 0; // max bytes allowed for async_cmds
std::vector<StoredCmd> async_cmds; // aggregated by acall
Expand Down Expand Up @@ -135,10 +133,11 @@ struct ConnectionState {
};

struct SquashingInfo {
// Pointer to the original underlying context of the base command.
// Only const access it possible for reading from multiple threads,
// each squashing thread has its own proxy context that contains this info.
// Underlying context of the base command, should be used for state checks.
// Note: some squashing mechanisms re-use the context (single shard eval).
const ConnectionContext* owner = nullptr;
// Underlying base transaction of squashing mechanism.
const Transaction* transaction = nullptr;
};

enum MCGetMask {
Expand Down
2 changes: 1 addition & 1 deletion src/server/journal/tx_executor.cc
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ std::optional<TransactionData> TransactionReader::NextTxData(JournalReader* read

// Otherwise, continue building multi command.
DCHECK(res->opcode == journal::Op::MULTI_COMMAND || res->opcode == journal::Op::EXEC);
DCHECK(res->txid > 0);
// DCHECK(res->txid > 0);

auto txid = res->txid;
auto& txdata = current_[txid];
Expand Down

0 comments on commit 3228c04

Please sign in to comment.