Skip to content

Commit

Permalink
Merge 'RPC delay metrics' from Pavel Emelyanov
Browse files Browse the repository at this point in the history
The new metric is the total time it takes an RPC message to travel back and forth between the verb caller and the server. It implicitly consists of the time the request and response messages spend in seastar and kernel queues on both sides, plus the classical network RTT.

It is calculated only for verbs that require a response. First, the client negotiates with the server that the response frame includes the "handler duration" value, which is the handler callback execution time. Then the client measures the duration between sending the request and receiving the response, and subtracts the handler execution time received from the server. The result is called a "delay sample". The total number of samples and the sum of their durations are exported as counters in the metrics domain.

refs: #323
refs: #1753 (for what metrics domain is)

Closes #2293

* github.com:scylladb/seastar:
  test,rpc: Extend simple ping-pong case
  rpc: Calculate delay and export it via metrics
  rpc: Exchange handler duration with server responses
  rpc: Track handler execution time
  rpc: Fix hard-coded constants when sending unknown verb reply
  • Loading branch information
avikivity committed Jul 3, 2024
2 parents e057679 + 595ae18 commit 826bc0b
Show file tree
Hide file tree
Showing 6 changed files with 129 additions and 27 deletions.
12 changes: 12 additions & 0 deletions doc/rpc.md
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,14 @@ Actual negotiation looks like this:
The server does not directly assign meaning to values of `isolation_cookie`;
instead, the interpretation is left to user code.

#### Handler duration
feature number: 5
data: none

Asks the server to send an "extended" response that includes the handler duration time. See
the response frame description for more details.


##### Compressed frame format
uint32_t len
uint8_t compressed_data[len]
Expand All @@ -111,11 +119,15 @@ data is transparent for the protocol and serialized/deserialized by a user
## Response frame format
int64_t msg_id
uint32_t len
uint32_t handler_duration - present if handler duration is negotiated
uint8_t data[len]

if msg_id < 0 enclosed response contains an exception that came as a response to msg id abs(msg_id)
data is transparent for the protocol and serialized/deserialized by a user

handler_duration is in microseconds; the value 0xffffffff means that it wasn't measured
and should be disregarded by the client

## Stream frame format
uint32_t len
uint8_t data[len]
Expand Down
12 changes: 10 additions & 2 deletions include/seastar/rpc/rpc.hh
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@ struct client_options {
/// \see resource_limits::isolate_connection
sstring isolation_cookie;
sstring metrics_domain = "default";
bool send_handler_duration = true;
};

/// @}
Expand Down Expand Up @@ -179,6 +180,7 @@ enum class protocol_features : uint32_t {
CONNECTION_ID = 2,
STREAM_PARENT = 3,
ISOLATION = 4,
HANDLER_DURATION = 5,
};

// internal representation of feature data
Expand Down Expand Up @@ -285,6 +287,7 @@ protected:
std::unique_ptr<compressor> _compressor;
bool _propagate_timeout = false;
bool _timeout_negotiated = false;
bool _handler_duration_negotiated = false;
// stream related fields
bool _is_stream = false;
connection_id _id = invalid_connection_id;
Expand Down Expand Up @@ -427,6 +430,7 @@ class client : public rpc::connection, public weakly_referencable<client> {
struct reply_handler_base {
timer<rpc_clock_type> t;
cancellable* pcancel = nullptr;
rpc_clock_type::time_point start;
virtual void operator()(client&, id_type, rcv_buf data) = 0;
virtual void timeout() {}
virtual void cancel() {}
Expand Down Expand Up @@ -487,7 +491,11 @@ private:
private:
future<> negotiate_protocol(feature_map map);
void negotiate(feature_map server_features);
future<std::tuple<int64_t, std::optional<rcv_buf>>>
// Returned future is
// - message id
// - optional server-side handler duration
// - message payload
future<std::tuple<int64_t, std::optional<uint32_t>, std::optional<rcv_buf>>>
read_response_frame_compressed(input_stream<char>& in);
public:
/**
Expand Down Expand Up @@ -592,7 +600,7 @@ public:
public:
connection(server& s, connected_socket&& fd, socket_address&& addr, const logger& l, void* seralizer, connection_id id);
future<> process();
future<> respond(int64_t msg_id, snd_buf&& data, std::optional<rpc_clock_type::time_point> timeout);
future<> respond(int64_t msg_id, snd_buf&& data, std::optional<rpc_clock_type::time_point> timeout, std::optional<rpc_clock_type::duration> handler_duration);
client_info& info() { return _info; }
const client_info& info() const { return _info; }
stats get_stats() const {
Expand Down
24 changes: 14 additions & 10 deletions include/seastar/rpc/rpc_impl.hh
Original file line number Diff line number Diff line change
Expand Up @@ -414,7 +414,7 @@ template<typename Serializer>
struct rcv_reply<Serializer, future<>> : rcv_reply<Serializer, void> {};

template <typename Serializer, typename Ret, typename... InArgs>
inline auto wait_for_reply(wait_type, std::optional<rpc_clock_type::time_point> timeout, cancellable* cancel, rpc::client& dst, id_type msg_id,
inline auto wait_for_reply(wait_type, std::optional<rpc_clock_type::time_point> timeout, rpc_clock_type::time_point start, cancellable* cancel, rpc::client& dst, id_type msg_id,
signature<Ret (InArgs...)>) {
using reply_type = rcv_reply<Serializer, Ret>;
auto lambda = [] (reply_type& r, rpc::client& dst, id_type msg_id, rcv_buf data) mutable {
Expand All @@ -429,13 +429,14 @@ inline auto wait_for_reply(wait_type, std::optional<rpc_clock_type::time_point>
};
using handler_type = typename rpc::client::template reply_handler<reply_type, decltype(lambda)>;
auto r = std::make_unique<handler_type>(std::move(lambda));
r->start = start;
auto fut = r->reply.p.get_future();
dst.wait_for_reply(msg_id, std::move(r), timeout, cancel);
return fut;
}

template<typename Serializer, typename... InArgs>
inline auto wait_for_reply(no_wait_type, std::optional<rpc_clock_type::time_point>, cancellable*, rpc::client&, id_type,
inline auto wait_for_reply(no_wait_type, std::optional<rpc_clock_type::time_point>, rpc_clock_type::time_point start, cancellable*, rpc::client&, id_type,
signature<no_wait_type (InArgs...)>) { // no_wait overload
return make_ready_future<>();
}
Expand Down Expand Up @@ -473,13 +474,14 @@ auto send_helper(MsgType xt, signature<Ret (InArgs...)> xsig) {
return futurize<cleaned_ret_type>::make_exception_future(closed_error());
}

auto start = rpc_clock_type::now();
// send message
auto msg_id = dst.next_message_id();
snd_buf data = marshall(dst.template serializer<Serializer>(), request_frame_headroom, args...);

// prepare reply handler, if return type is no_wait_type this does nothing, since no reply will be sent
using wait = wait_signature_t<Ret>;
return when_all(dst.request(uint64_t(t), msg_id, std::move(data), timeout, cancel), wait_for_reply<Serializer>(wait(), timeout, cancel, dst, msg_id, sig)).then([] (auto r) {
return when_all(dst.request(uint64_t(t), msg_id, std::move(data), timeout, cancel), wait_for_reply<Serializer>(wait(), timeout, start, cancel, dst, msg_id, sig)).then([] (auto r) {
std::get<0>(r).ignore_ready_future();
return std::move(std::get<1>(r)); // return future of wait_for_reply
});
Expand All @@ -502,11 +504,11 @@ auto send_helper(MsgType xt, signature<Ret (InArgs...)> xsig) {
}

// Refer to struct response_frame for more details
static constexpr size_t response_frame_headroom = 12;
static constexpr size_t response_frame_headroom = 16;

template<typename Serializer, typename RetTypes>
inline future<> reply(wait_type, future<RetTypes>&& ret, int64_t msg_id, shared_ptr<server::connection> client,
std::optional<rpc_clock_type::time_point> timeout) {
std::optional<rpc_clock_type::time_point> timeout, std::optional<rpc_clock_type::duration> handler_duration) {
if (!client->error()) {
snd_buf data;
try {
Expand All @@ -529,7 +531,7 @@ inline future<> reply(wait_type, future<RetTypes>&& ret, int64_t msg_id, shared_
msg_id = -msg_id;
}

return client->respond(msg_id, std::move(data), timeout);
return client->respond(msg_id, std::move(data), timeout, handler_duration);
} else {
ret.ignore_ready_future();
return make_ready_future<>();
Expand All @@ -538,7 +540,8 @@ inline future<> reply(wait_type, future<RetTypes>&& ret, int64_t msg_id, shared_

// specialization for no_wait_type which does not send a reply
template<typename Serializer>
inline future<> reply(no_wait_type, future<no_wait_type>&& r, int64_t msgid, shared_ptr<server::connection> client, std::optional<rpc_clock_type::time_point>) {
inline future<> reply(no_wait_type, future<no_wait_type>&& r, int64_t msgid, shared_ptr<server::connection> client,
std::optional<rpc_clock_type::time_point>, std::optional<rpc_clock_type::duration>) {
try {
r.get();
} catch (std::exception& ex) {
Expand Down Expand Up @@ -581,7 +584,7 @@ auto recv_helper(signature<Ret (InArgs...)> sig, Func&& func, WantClientInfo, Wa
client->get_logger()(client->peer_address(), err);
// FIXME: future is discarded
(void)try_with_gate(client->get_server().reply_gate(), [client, timeout, msg_id, err = std::move(err)] {
return reply<Serializer>(wait_style(), futurize<Ret>::make_exception_future(std::runtime_error(err.c_str())), msg_id, client, timeout).handle_exception([client, msg_id] (std::exception_ptr eptr) {
return reply<Serializer>(wait_style(), futurize<Ret>::make_exception_future(std::runtime_error(err.c_str())), msg_id, client, timeout, std::nullopt).handle_exception([client, msg_id] (std::exception_ptr eptr) {
client->get_logger()(client->info(), msg_id, format("got exception while processing an oversized message: {}", eptr));
});
}).handle_exception_type([] (gate_closed_exception&) {/* ignore */});
Expand All @@ -593,8 +596,9 @@ auto recv_helper(signature<Ret (InArgs...)> sig, Func&& func, WantClientInfo, Wa
(void)try_with_gate(client->get_server().reply_gate(), [client, timeout, msg_id, data = std::move(data), permit = std::move(permit), &func] () mutable {
try {
auto args = unmarshall<Serializer, InArgs...>(*client, std::move(data));
return apply(func, client->info(), timeout, WantClientInfo(), WantTimePoint(), signature(), std::move(args)).then_wrapped([client, timeout, msg_id, permit = std::move(permit)] (futurize_t<Ret> ret) mutable {
return reply<Serializer>(wait_style(), std::move(ret), msg_id, client, timeout).handle_exception([permit = std::move(permit), client, msg_id] (std::exception_ptr eptr) {
auto start = rpc_clock_type::now();
return apply(func, client->info(), timeout, WantClientInfo(), WantTimePoint(), signature(), std::move(args)).then_wrapped([client, timeout, msg_id, permit = std::move(permit), start] (futurize_t<Ret> ret) mutable {
return reply<Serializer>(wait_style(), std::move(ret), msg_id, client, timeout, rpc_clock_type::now() - start).handle_exception([permit = std::move(permit), client, msg_id] (std::exception_ptr eptr) {
client->get_logger()(client->info(), msg_id, format("got exception while processing a message: {}", eptr));
});
});
Expand Down
2 changes: 2 additions & 0 deletions include/seastar/rpc/rpc_types.hh
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,8 @@ struct stats {
counter_type sent_messages = 0;
counter_type wait_reply = 0;
counter_type timeout = 0;
counter_type delay_samples = 0;
std::chrono::duration<double> delay_total = std::chrono::duration<double>(0);
};

class connection_id {
Expand Down
Loading

0 comments on commit 826bc0b

Please sign in to comment.