RPC delay metrics #2293

Merged: 5 commits, Jul 3, 2024
12 changes: 12 additions & 0 deletions doc/rpc.md
@@ -86,6 +86,14 @@ Actual negotiation looks like this:
The server does not directly assign meaning to values of `isolation_cookie`;
instead, the interpretation is left to user code.

#### Handler duration
feature number: 5
data: none

Asks server to send "extended" response that includes the handler duration time. See
the response frame description for more details


##### Compressed frame format
uint32_t len
uint8_t compressed_data[len]
@@ -111,11 +119,15 @@ data is transparent for the protocol and serialized/deserialized by a user
## Response frame format
int64_t msg_id
uint32_t len
uint32_t handler_duration - present if handler duration is negotiated
uint8_t data[len]

if msg_id < 0 enclosed response contains an exception that came as a response to msg id abs(msg_id)
data is transparent for the protocol and serialized/deserialized by a user

the handler_duration is in microseconds, the value of 0xffffffff means that it wasn't measured
and should be disregarded by client
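
The frame layout above can be illustrated with a small decoder. This is a minimal sketch, not the Seastar implementation: `response_header`, `read_le` and `parse_response_header` are hypothetical names, and little-endian field encoding is an assumption that the diff does not state.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <optional>

// Hypothetical decoded form of the response frame header.
struct response_header {
    int64_t msg_id;
    uint32_t len;
    // Empty when the feature was not negotiated or the server did not measure.
    std::optional<uint32_t> handler_duration_us;
};

// Sentinel from the frame description: "wasn't measured".
constexpr uint32_t duration_not_measured = 0xffffffff;

// Reads an unsigned integer of sizeof(T) bytes, assuming little-endian order.
template <typename T>
T read_le(const unsigned char* p) {
    T v = 0;
    for (size_t i = 0; i < sizeof(T); ++i) {
        v |= static_cast<T>(p[i]) << (8 * i);
    }
    return v;
}

// Parses msg_id (8 bytes), len (4 bytes) and, only if negotiated,
// handler_duration (4 bytes). The payload follows the header.
response_header parse_response_header(const unsigned char* buf, bool duration_negotiated) {
    assert(buf != nullptr);
    response_header h;
    h.msg_id = static_cast<int64_t>(read_le<uint64_t>(buf));
    h.len = read_le<uint32_t>(buf + 8);
    if (duration_negotiated) {
        uint32_t d = read_le<uint32_t>(buf + 12);
        if (d != duration_not_measured) {
            h.handler_duration_us = d;
        }
    }
    return h;
}
```

Mapping the sentinel to an empty `std::optional` keeps the "not negotiated" and "not measured" cases on one code path for the caller.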

## Stream frame format
uint32_t len
uint8_t data[len]
12 changes: 10 additions & 2 deletions include/seastar/rpc/rpc.hh
@@ -118,6 +118,7 @@ struct client_options {
/// \see resource_limits::isolate_connection
sstring isolation_cookie;
sstring metrics_domain = "default";
bool send_handler_duration = true;
};

/// @}
@@ -179,6 +180,7 @@ enum class protocol_features : uint32_t {
CONNECTION_ID = 2,
STREAM_PARENT = 3,
ISOLATION = 4,
HANDLER_DURATION = 5,
};

// internal representation of feature data
@@ -285,6 +287,7 @@ protected:
std::unique_ptr<compressor> _compressor;
bool _propagate_timeout = false;
bool _timeout_negotiated = false;
bool _handler_duration_negotiated = false;
// stream related fields
bool _is_stream = false;
connection_id _id = invalid_connection_id;
@@ -427,6 +430,7 @@ class client : public rpc::connection, public weakly_referencable<client> {
struct reply_handler_base {
timer<rpc_clock_type> t;
Member

> Now the client has two durations from which it can estimate the
> round-trip time of a packet -- the duration between sending the request and
> receiving the response, and the time it took the handler to execute on the
> server side. The difference is the RTT value.

What about time waiting in the send queue (usually none) or the receive queue? Or waiting for the rpc semaphore?

Contributor Author

These new metrics include all of these.
Or does your question really mean "let's add those metrics too"?

Member (@avikivity, Jun 24, 2024)

I mean that the difference is not RTT, it's RTT + waits in queues we don't (and can't) measure.

Contributor Author

That's the definition of RTT I'm used to:

> Round-trip time (RTT) is the duration it takes for a network request to go from a starting point to a destination and back again to the starting point

and the "starting point" here is whoever calls send_helper::operator(). The definition says nothing about whether there are queues, delays, throttlers, or anything else on the request's path.

I'm fine with renaming it to be clearer, but how do you define RTT then?

Member

RTT is the network time without any queues (which ping measures if it doesn't have a bad day).

Do you subtract service time from the measured round trip?

Contributor Author

> RTT is the network time without any queues (which ping measures if it doesn't have a bad day).

"Without any queues" is a pretty optimistic requirement, I would say. I admit that outgoing traffic is likely to take queue-less paths even in the kernel, but the incoming path consists entirely of queues.

> Do you subtract service time from the measured round trip?

Depends on what "service time" is. If it's the duration of the verb handler, then of course, that's the point (9bbe38c)

@@ -953,8 +964,13 @@ namespace rpc {
                           _error = true;
                       } else if (it != _outstanding.end()) {
                           auto handler = std::move(it->second);
+                          auto ht = std::get<1>(msg_id_and_data); // ht stands for "handler time"
                           _outstanding.erase(it);
                           (*handler)(*this, msg_id, std::move(data.value()));
+                          if (ht) {
+                              _stats.rtt_samples++;
+                              _stats.rtt_total += (rpc_clock_type::now() - handler->start) - std::chrono::microseconds(*ht);
+                          }
                       } else if (msg_id < 0) {
                           try {
                               std::rethrow_exception(unmarshal_exception(data.value()));
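
The arithmetic discussed in this thread can be sketched outside of Seastar. The field names `delay_samples` and `delay_total` follow the `rpc_types.hh` hunk of this PR (the snippet above uses the earlier `rtt_*` names). `delay_stats`, `record_delay` and `average_delay_seconds` are hypothetical helpers, and `std::chrono::steady_clock` stands in for `rpc_clock_type` as an assumption.

```cpp
#include <cassert>
#include <chrono>
#include <cstdint>
#include <optional>

// Hypothetical stand-in for the two fields added to rpc::stats.
struct delay_stats {
    uint64_t delay_samples = 0;
    std::chrono::duration<double> delay_total{0};
};

// elapsed: client-side time from sending the request to receiving the response.
// handler_us: server-reported handler duration in microseconds, if any.
// The recorded delay is everything that is not handler execution: network
// time plus any queueing on either side.
void record_delay(delay_stats& s,
                  std::chrono::steady_clock::duration elapsed,
                  std::optional<uint32_t> handler_us) {
    assert(elapsed >= elapsed.zero());
    if (!handler_us) {
        return; // feature not negotiated or duration not measured
    }
    s.delay_samples++;
    s.delay_total += elapsed - std::chrono::microseconds(*handler_us);
}

// Average delay per sampled call, in seconds; 0 when there are no samples.
double average_delay_seconds(const delay_stats& s) {
    return s.delay_samples ? s.delay_total.count() / s.delay_samples : 0.0;
}
```

For example, a call that took 5 ms end to end with a reported handler time of 2000 µs contributes a 3 ms delay sample.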

cancellable* pcancel = nullptr;
rpc_clock_type::time_point start;
virtual void operator()(client&, id_type, rcv_buf data) = 0;
virtual void timeout() {}
virtual void cancel() {}
@@ -487,7 +491,11 @@ private:
private:
future<> negotiate_protocol(feature_map map);
void negotiate(feature_map server_features);
-future<std::tuple<int64_t, std::optional<rcv_buf>>>
+// Returned future is
+// - message id
+// - optional server-side handler duration
+// - message payload
+future<std::tuple<int64_t, std::optional<uint32_t>, std::optional<rcv_buf>>>
Member

Better to document what the optional means.

Note: microseconds can overflow 32 bits for an rpc call.

Contributor Author

It's ~1 hour. Are there handlers that last longer?
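
The "~1 hour" figure can be checked directly: a `uint32_t` microsecond counter tops out at about 4295 seconds, or roughly 71.6 minutes. The sketch below also shows one possible way to convert a measured duration to the wire value. `to_wire_us` is a hypothetical helper, and reporting an out-of-range duration as "not measured" is an assumption made here for illustration, not behavior confirmed by this PR.

```cpp
#include <chrono>
#include <cstdint>
#include <limits>

// Sentinel from the frame description: 0xffffffff means "not measured".
constexpr uint32_t not_measured = std::numeric_limits<uint32_t>::max();

// Largest duration that still fits below the sentinel, in seconds.
constexpr double max_seconds = (not_measured - 1) / 1e6;
static_assert(max_seconds > 4294.0 && max_seconds < 4295.0, "about 71.6 minutes");

// Hypothetical conversion to the 32-bit microsecond wire value.
// Assumption: negative or too-long durations are reported as "not measured"
// instead of being silently truncated.
constexpr uint32_t to_wire_us(std::chrono::nanoseconds d) {
    auto us = std::chrono::duration_cast<std::chrono::microseconds>(d).count();
    if (us < 0 || us >= not_measured) {
        return not_measured;
    }
    return static_cast<uint32_t>(us);
}
```

With this choice a handler that runs for two hours yields the sentinel, so the client skips the sample rather than recording a wrapped-around value.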

read_response_frame_compressed(input_stream<char>& in);
public:
/**
@@ -592,7 +600,7 @@ public:
public:
connection(server& s, connected_socket&& fd, socket_address&& addr, const logger& l, void* seralizer, connection_id id);
future<> process();
-future<> respond(int64_t msg_id, snd_buf&& data, std::optional<rpc_clock_type::time_point> timeout);
+future<> respond(int64_t msg_id, snd_buf&& data, std::optional<rpc_clock_type::time_point> timeout, std::optional<rpc_clock_type::duration> handler_duration);
client_info& info() { return _info; }
const client_info& info() const { return _info; }
stats get_stats() const {
24 changes: 14 additions & 10 deletions include/seastar/rpc/rpc_impl.hh
@@ -414,7 +414,7 @@ template<typename Serializer>
struct rcv_reply<Serializer, future<>> : rcv_reply<Serializer, void> {};

template <typename Serializer, typename Ret, typename... InArgs>
-inline auto wait_for_reply(wait_type, std::optional<rpc_clock_type::time_point> timeout, cancellable* cancel, rpc::client& dst, id_type msg_id,
+inline auto wait_for_reply(wait_type, std::optional<rpc_clock_type::time_point> timeout, rpc_clock_type::time_point start, cancellable* cancel, rpc::client& dst, id_type msg_id,
signature<Ret (InArgs...)>) {
using reply_type = rcv_reply<Serializer, Ret>;
auto lambda = [] (reply_type& r, rpc::client& dst, id_type msg_id, rcv_buf data) mutable {
@@ -429,13 +429,14 @@ inline auto wait_for_reply(wait_type, std::optional<rpc_clock_type::time_point>
};
using handler_type = typename rpc::client::template reply_handler<reply_type, decltype(lambda)>;
auto r = std::make_unique<handler_type>(std::move(lambda));
r->start = start;
auto fut = r->reply.p.get_future();
dst.wait_for_reply(msg_id, std::move(r), timeout, cancel);
return fut;
}

template<typename Serializer, typename... InArgs>
-inline auto wait_for_reply(no_wait_type, std::optional<rpc_clock_type::time_point>, cancellable*, rpc::client&, id_type,
+inline auto wait_for_reply(no_wait_type, std::optional<rpc_clock_type::time_point>, rpc_clock_type::time_point start, cancellable*, rpc::client&, id_type,
signature<no_wait_type (InArgs...)>) { // no_wait overload
return make_ready_future<>();
}
@@ -473,13 +474,14 @@ auto send_helper(MsgType xt, signature<Ret (InArgs...)> xsig) {
return futurize<cleaned_ret_type>::make_exception_future(closed_error());
}

auto start = rpc_clock_type::now();
// send message
auto msg_id = dst.next_message_id();
snd_buf data = marshall(dst.template serializer<Serializer>(), request_frame_headroom, args...);

// prepare reply handler, if return type is now_wait_type this does nothing, since no reply will be sent
using wait = wait_signature_t<Ret>;
-return when_all(dst.request(uint64_t(t), msg_id, std::move(data), timeout, cancel), wait_for_reply<Serializer>(wait(), timeout, cancel, dst, msg_id, sig)).then([] (auto r) {
+return when_all(dst.request(uint64_t(t), msg_id, std::move(data), timeout, cancel), wait_for_reply<Serializer>(wait(), timeout, start, cancel, dst, msg_id, sig)).then([] (auto r) {
std::get<0>(r).ignore_ready_future();
return std::move(std::get<1>(r)); // return future of wait_for_reply
});
@@ -502,11 +504,11 @@ auto send_helper(MsgType xt, signature<Ret (InArgs...)> xsig) {
}

// Refer to struct response_frame for more details
-static constexpr size_t response_frame_headroom = 12;
+static constexpr size_t response_frame_headroom = 16;
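
The change from 12 to 16 matches the response frame description in `doc/rpc.md`: the header grows by one `uint32_t`. A short sketch of that arithmetic follows. The constant names are hypothetical, and whether the full 16 bytes are used when the feature is not negotiated is not visible in this diff.

```cpp
#include <cstddef>
#include <cstdint>

// Header fields that precede the payload, per the response frame description.
constexpr size_t old_response_headroom = sizeof(int64_t)    // msg_id
                                       + sizeof(uint32_t);  // len
constexpr size_t new_response_headroom = old_response_headroom
                                       + sizeof(uint32_t);  // handler_duration

static_assert(old_response_headroom == 12, "msg_id + len");
static_assert(new_response_headroom == 16, "msg_id + len + handler_duration");
```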

template<typename Serializer, typename RetTypes>
inline future<> reply(wait_type, future<RetTypes>&& ret, int64_t msg_id, shared_ptr<server::connection> client,
-std::optional<rpc_clock_type::time_point> timeout) {
+std::optional<rpc_clock_type::time_point> timeout, std::optional<rpc_clock_type::duration> handler_duration) {
if (!client->error()) {
snd_buf data;
try {
@@ -529,7 +531,7 @@ inline future<> reply(wait_type, future<RetTypes>&& ret, int64_t msg_id, shared_
msg_id = -msg_id;
}

-return client->respond(msg_id, std::move(data), timeout);
+return client->respond(msg_id, std::move(data), timeout, handler_duration);
} else {
ret.ignore_ready_future();
return make_ready_future<>();
@@ -538,7 +540,8 @@ inline future<> reply(wait_type, future<RetTypes>&& ret, int64_t msg_id, shared_

// specialization for no_wait_type which does not send a reply
template<typename Serializer>
-inline future<> reply(no_wait_type, future<no_wait_type>&& r, int64_t msgid, shared_ptr<server::connection> client, std::optional<rpc_clock_type::time_point>) {
+inline future<> reply(no_wait_type, future<no_wait_type>&& r, int64_t msgid, shared_ptr<server::connection> client,
+std::optional<rpc_clock_type::time_point>, std::optional<rpc_clock_type::duration>) {
try {
r.get();
} catch (std::exception& ex) {
@@ -581,7 +584,7 @@ auto recv_helper(signature<Ret (InArgs...)> sig, Func&& func, WantClientInfo, Wa
client->get_logger()(client->peer_address(), err);
// FIXME: future is discarded
(void)try_with_gate(client->get_server().reply_gate(), [client, timeout, msg_id, err = std::move(err)] {
-return reply<Serializer>(wait_style(), futurize<Ret>::make_exception_future(std::runtime_error(err.c_str())), msg_id, client, timeout).handle_exception([client, msg_id] (std::exception_ptr eptr) {
+return reply<Serializer>(wait_style(), futurize<Ret>::make_exception_future(std::runtime_error(err.c_str())), msg_id, client, timeout, std::nullopt).handle_exception([client, msg_id] (std::exception_ptr eptr) {
client->get_logger()(client->info(), msg_id, format("got exception while processing an oversized message: {}", eptr));
});
}).handle_exception_type([] (gate_closed_exception&) {/* ignore */});
@@ -593,8 +596,9 @@ auto recv_helper(signature<Ret (InArgs...)> sig, Func&& func, WantClientInfo, Wa
(void)try_with_gate(client->get_server().reply_gate(), [client, timeout, msg_id, data = std::move(data), permit = std::move(permit), &func] () mutable {
try {
auto args = unmarshall<Serializer, InArgs...>(*client, std::move(data));
-return apply(func, client->info(), timeout, WantClientInfo(), WantTimePoint(), signature(), std::move(args)).then_wrapped([client, timeout, msg_id, permit = std::move(permit)] (futurize_t<Ret> ret) mutable {
-return reply<Serializer>(wait_style(), std::move(ret), msg_id, client, timeout).handle_exception([permit = std::move(permit), client, msg_id] (std::exception_ptr eptr) {
+auto start = rpc_clock_type::now();
+return apply(func, client->info(), timeout, WantClientInfo(), WantTimePoint(), signature(), std::move(args)).then_wrapped([client, timeout, msg_id, permit = std::move(permit), start] (futurize_t<Ret> ret) mutable {
+return reply<Serializer>(wait_style(), std::move(ret), msg_id, client, timeout, rpc_clock_type::now() - start).handle_exception([permit = std::move(permit), client, msg_id] (std::exception_ptr eptr) {
client->get_logger()(client->info(), msg_id, format("got exception while processing a message: {}", eptr));
});
});
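
The server-side measurement in this hunk brackets only the handler call: the clock is read after the arguments are unmarshalled and again when the handler's future resolves. A synchronous analogue, as a rough sketch: `timed_call` is a hypothetical helper, `std::chrono::steady_clock` stands in for `rpc_clock_type`, and the real code is future-based, not a plain function call.

```cpp
#include <cassert>
#include <chrono>
#include <utility>

// Runs the handler and returns its result together with how long it took.
// Time spent before the call (queueing, unmarshalling) is not included.
template <typename Handler>
auto timed_call(Handler&& handler) {
    auto start = std::chrono::steady_clock::now();
    auto result = std::forward<Handler>(handler)();
    auto elapsed = std::chrono::steady_clock::now() - start;
    assert(elapsed >= elapsed.zero()); // steady_clock is monotonic
    return std::make_pair(std::move(result), elapsed);
}
```

Because waits outside this bracket are not part of the reported duration, they end up in the client-side delay figure, which is the point raised in the review thread above.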
2 changes: 2 additions & 0 deletions include/seastar/rpc/rpc_types.hh
@@ -60,6 +60,8 @@ struct stats {
counter_type sent_messages = 0;
counter_type wait_reply = 0;
counter_type timeout = 0;
counter_type delay_samples = 0;
std::chrono::duration<double> delay_total = std::chrono::duration<double>(0);
};

class connection_id {