diff --git a/doc/rpc.md b/doc/rpc.md index b78a30dad6b..190cf876d99 100644 --- a/doc/rpc.md +++ b/doc/rpc.md @@ -86,6 +86,14 @@ Actual negotiation looks like this: The server does not directly assign meaning to values of `isolation_cookie`; instead, the interpretation is left to user code. +#### Handler duration + feature number: 5 + data: none + + Asks the server to send an "extended" response that includes the handler duration. See + the response frame description for more details. + + ##### Compressed frame format uint32_t len uint8_t compressed_data[len] @@ -111,11 +119,15 @@ data is transparent for the protocol and serialized/deserialized by a user ## Response frame format int64_t msg_id uint32_t len + uint32_t handler_duration - present if handler duration is negotiated uint8_t data[len] if msg_id < 0 enclosed response contains an exception that came as a response to msg id abs(msg_id) data is transparent for the protocol and serialized/deserialized by a user +the handler_duration is in microseconds; the value 0xffffffff means that it wasn't measured +and should be disregarded by the client + ## Stream frame format uint32_t len uint8_t data[len] diff --git a/include/seastar/rpc/rpc.hh b/include/seastar/rpc/rpc.hh index fb8172c910f..b4eecd6ade5 100644 --- a/include/seastar/rpc/rpc.hh +++ b/include/seastar/rpc/rpc.hh @@ -118,6 +118,7 @@ struct client_options { /// \see resource_limits::isolate_connection sstring isolation_cookie; sstring metrics_domain = "default"; + bool send_handler_duration = true; }; /// @} @@ -179,6 +180,7 @@ enum class protocol_features : uint32_t { CONNECTION_ID = 2, STREAM_PARENT = 3, ISOLATION = 4, + HANDLER_DURATION = 5, }; // internal representation of feature data @@ -285,6 +287,7 @@ protected: std::unique_ptr _compressor; bool _propagate_timeout = false; bool _timeout_negotiated = false; + bool _handler_duration_negotiated = false; // stream related fields bool _is_stream = false; connection_id _id = invalid_connection_id; @@ -427,6 
+430,7 @@ class client : public rpc::connection, public weakly_referencable { struct reply_handler_base { timer t; cancellable* pcancel = nullptr; + rpc_clock_type::time_point start; virtual void operator()(client&, id_type, rcv_buf data) = 0; virtual void timeout() {} virtual void cancel() {} @@ -487,7 +491,11 @@ private: private: future<> negotiate_protocol(feature_map map); void negotiate(feature_map server_features); - future>> + // Returned future is + // - message id + // - optional server-side handler duration + // - message payload + future, std::optional>> read_response_frame_compressed(input_stream& in); public: /** @@ -592,7 +600,7 @@ public: public: connection(server& s, connected_socket&& fd, socket_address&& addr, const logger& l, void* seralizer, connection_id id); future<> process(); - future<> respond(int64_t msg_id, snd_buf&& data, std::optional timeout); + future<> respond(int64_t msg_id, snd_buf&& data, std::optional timeout, std::optional handler_duration); client_info& info() { return _info; } const client_info& info() const { return _info; } stats get_stats() const { diff --git a/include/seastar/rpc/rpc_impl.hh b/include/seastar/rpc/rpc_impl.hh index 1fd08bbabdc..2f25763ab98 100644 --- a/include/seastar/rpc/rpc_impl.hh +++ b/include/seastar/rpc/rpc_impl.hh @@ -414,7 +414,7 @@ template struct rcv_reply> : rcv_reply {}; template -inline auto wait_for_reply(wait_type, std::optional timeout, cancellable* cancel, rpc::client& dst, id_type msg_id, +inline auto wait_for_reply(wait_type, std::optional timeout, rpc_clock_type::time_point start, cancellable* cancel, rpc::client& dst, id_type msg_id, signature) { using reply_type = rcv_reply; auto lambda = [] (reply_type& r, rpc::client& dst, id_type msg_id, rcv_buf data) mutable { @@ -429,13 +429,14 @@ inline auto wait_for_reply(wait_type, std::optional }; using handler_type = typename rpc::client::template reply_handler; auto r = std::make_unique(std::move(lambda)); + r->start = start; auto fut = 
r->reply.p.get_future(); dst.wait_for_reply(msg_id, std::move(r), timeout, cancel); return fut; } template -inline auto wait_for_reply(no_wait_type, std::optional, cancellable*, rpc::client&, id_type, +inline auto wait_for_reply(no_wait_type, std::optional, rpc_clock_type::time_point start, cancellable*, rpc::client&, id_type, signature) { // no_wait overload return make_ready_future<>(); } @@ -473,13 +474,14 @@ auto send_helper(MsgType xt, signature xsig) { return futurize::make_exception_future(closed_error()); } + auto start = rpc_clock_type::now(); // send message auto msg_id = dst.next_message_id(); snd_buf data = marshall(dst.template serializer(), request_frame_headroom, args...); // prepare reply handler, if return type is now_wait_type this does nothing, since no reply will be sent using wait = wait_signature_t; - return when_all(dst.request(uint64_t(t), msg_id, std::move(data), timeout, cancel), wait_for_reply(wait(), timeout, cancel, dst, msg_id, sig)).then([] (auto r) { + return when_all(dst.request(uint64_t(t), msg_id, std::move(data), timeout, cancel), wait_for_reply(wait(), timeout, start, cancel, dst, msg_id, sig)).then([] (auto r) { std::get<0>(r).ignore_ready_future(); return std::move(std::get<1>(r)); // return future of wait_for_reply }); @@ -502,11 +504,11 @@ auto send_helper(MsgType xt, signature xsig) { } // Refer to struct response_frame for more details -static constexpr size_t response_frame_headroom = 12; +static constexpr size_t response_frame_headroom = 16; template inline future<> reply(wait_type, future&& ret, int64_t msg_id, shared_ptr client, - std::optional timeout) { + std::optional timeout, std::optional handler_duration) { if (!client->error()) { snd_buf data; try { @@ -529,7 +531,7 @@ inline future<> reply(wait_type, future&& ret, int64_t msg_id, shared_ msg_id = -msg_id; } - return client->respond(msg_id, std::move(data), timeout); + return client->respond(msg_id, std::move(data), timeout, handler_duration); } else { 
ret.ignore_ready_future(); return make_ready_future<>(); @@ -538,7 +540,8 @@ inline future<> reply(wait_type, future&& ret, int64_t msg_id, shared_ // specialization for no_wait_type which does not send a reply template -inline future<> reply(no_wait_type, future&& r, int64_t msgid, shared_ptr client, std::optional) { +inline future<> reply(no_wait_type, future&& r, int64_t msgid, shared_ptr client, + std::optional, std::optional) { try { r.get(); } catch (std::exception& ex) { @@ -581,7 +584,7 @@ auto recv_helper(signature sig, Func&& func, WantClientInfo, Wa client->get_logger()(client->peer_address(), err); // FIXME: future is discarded (void)try_with_gate(client->get_server().reply_gate(), [client, timeout, msg_id, err = std::move(err)] { - return reply(wait_style(), futurize::make_exception_future(std::runtime_error(err.c_str())), msg_id, client, timeout).handle_exception([client, msg_id] (std::exception_ptr eptr) { + return reply(wait_style(), futurize::make_exception_future(std::runtime_error(err.c_str())), msg_id, client, timeout, std::nullopt).handle_exception([client, msg_id] (std::exception_ptr eptr) { client->get_logger()(client->info(), msg_id, format("got exception while processing an oversized message: {}", eptr)); }); }).handle_exception_type([] (gate_closed_exception&) {/* ignore */}); @@ -593,8 +596,9 @@ auto recv_helper(signature sig, Func&& func, WantClientInfo, Wa (void)try_with_gate(client->get_server().reply_gate(), [client, timeout, msg_id, data = std::move(data), permit = std::move(permit), &func] () mutable { try { auto args = unmarshall(*client, std::move(data)); - return apply(func, client->info(), timeout, WantClientInfo(), WantTimePoint(), signature(), std::move(args)).then_wrapped([client, timeout, msg_id, permit = std::move(permit)] (futurize_t ret) mutable { - return reply(wait_style(), std::move(ret), msg_id, client, timeout).handle_exception([permit = std::move(permit), client, msg_id] (std::exception_ptr eptr) { + auto start = 
rpc_clock_type::now(); + return apply(func, client->info(), timeout, WantClientInfo(), WantTimePoint(), signature(), std::move(args)).then_wrapped([client, timeout, msg_id, permit = std::move(permit), start] (futurize_t ret) mutable { + return reply(wait_style(), std::move(ret), msg_id, client, timeout, rpc_clock_type::now() - start).handle_exception([permit = std::move(permit), client, msg_id] (std::exception_ptr eptr) { client->get_logger()(client->info(), msg_id, format("got exception while processing a message: {}", eptr)); }); }); diff --git a/include/seastar/rpc/rpc_types.hh b/include/seastar/rpc/rpc_types.hh index 8c747577eae..8c6270119ec 100644 --- a/include/seastar/rpc/rpc_types.hh +++ b/include/seastar/rpc/rpc_types.hh @@ -60,6 +60,8 @@ struct stats { counter_type sent_messages = 0; counter_type wait_reply = 0; counter_type timeout = 0; + counter_type delay_samples = 0; + std::chrono::duration delay_total = std::chrono::duration(0); }; class connection_id { diff --git a/src/rpc/rpc.cc b/src/rpc/rpc.cc index 7c7c70358da..83ae22985bf 100644 --- a/src/rpc/rpc.cc +++ b/src/rpc/rpc.cc @@ -667,6 +667,9 @@ namespace rpc { case protocol_features::TIMEOUT: _timeout_negotiated = true; break; + case protocol_features::HANDLER_DURATION: + _handler_duration_negotiated = true; + break; case protocol_features::CONNECTION_ID: { _id = deserialize_connection_id(e.second); break; @@ -692,8 +695,8 @@ namespace rpc { // ... 
payload struct response_frame { using opt_buf_type = std::optional; - using return_type = std::tuple; - using header_type = std::tuple; + using return_type = std::tuple, opt_buf_type>; + using header_type = std::tuple>; static constexpr size_t raw_header_size = sizeof(int64_t) + sizeof(uint32_t); static size_t header_size() { static_assert(response_frame_headroom >= raw_header_size); @@ -703,28 +706,68 @@ namespace rpc { return "client"; } static auto empty_value() { - return std::make_tuple(0, std::nullopt); + return std::make_tuple(0, std::nullopt, std::nullopt); } static std::pair decode_header(const char* ptr) { auto msgid = read_le(ptr); auto size = read_le(ptr + 8); - return std::make_pair(size, std::make_tuple(msgid)); + return std::make_pair(size, std::make_tuple(msgid, std::nullopt)); } - static void encode_header(int64_t msg_id, snd_buf& data) { + static void encode_header(int64_t msg_id, snd_buf& data, size_t header_size = raw_header_size) { static_assert(snd_buf::chunk_size >= raw_header_size, "send buffer chunk size is too small"); auto p = data.front().get_write(); write_le(p, msg_id); - write_le(p + 8, data.size - raw_header_size); + write_le(p + 8, data.size - header_size); } static auto make_value(const header_type& t, rcv_buf data) { - return std::make_tuple(std::get<0>(t), std::move(data)); + return std::make_tuple(std::get<0>(t), std::get<1>(t), std::move(data)); } }; + // The response frame is + // le64 message ID + // le32 payload size + // le32 handler duration + // ... 
payload + struct response_frame_with_handler_time : public response_frame { + using super = response_frame; + static constexpr size_t raw_header_size = super::raw_header_size + sizeof(uint32_t); + static size_t header_size() { + static_assert(response_frame_headroom >= raw_header_size); + return raw_header_size; + } + static std::pair decode_header(const char* ptr) { + auto p = super::decode_header(ptr); + auto ht = read_le(ptr + 12); + if (ht != -1U) { + std::get<1>(p.second) = ht; + } + return p; + } + static uint32_t encode_handler_duration(const std::optional& ht) noexcept { + if (ht.has_value()) { + std::chrono::microseconds us = std::chrono::duration_cast(*ht); + if (us.count() < std::numeric_limits::max()) { + return us.count(); + } + } + return -1U; + } + static void encode_header(int64_t msg_id, std::optional ht, snd_buf& data) { + static_assert(snd_buf::chunk_size >= raw_header_size); + auto p = data.front().get_write(); + super::encode_header(msg_id, data, raw_header_size); + write_le(p + 12, encode_handler_duration(ht)); + } + }; future client::read_response_frame_compressed(input_stream& in) { - return read_frame_compressed(_server_addr, _compressor, in); + if (_handler_duration_negotiated) { + return read_frame_compressed(_server_addr, _compressor, in); + } else { + return read_frame_compressed(_server_addr, _compressor, in); + } } stats client::get_stats() const { @@ -843,6 +886,15 @@ namespace rpc { sm::description("Total number of exceptional responses received"), { domain_l }).set_skip_when_empty(), sm::make_counter("timeout", std::bind(&domain::count_all, this, &stats::timeout), sm::description("Total number of timeout responses"), { domain_l }).set_skip_when_empty(), + sm::make_counter("delay_samples", std::bind(&domain::count_all, this, &stats::delay_samples), + sm::description("Total number of delay samples"), { domain_l }), + sm::make_counter("delay_total", [this] () -> double { + std::chrono::duration res(0); + for (const auto& m : list) { + 
res += m._c._stats.delay_total; + } + return res.count(); + }, sm::description("Total delay in seconds"), { domain_l }), sm::make_gauge("pending", std::bind(&domain::count_all_fn, this, &client::outgoing_queue_length), sm::description("Number of queued outbound messages"), { domain_l }), sm::make_gauge("wait_reply", std::bind(&domain::count_all_fn, this, &client::incoming_queue_length), @@ -870,6 +922,8 @@ namespace rpc { _domain.dead.exception_received += _c._stats.exception_received; _domain.dead.sent_messages += _c._stats.sent_messages; _domain.dead.timeout += _c._stats.timeout; + _domain.dead.delay_samples += _c._stats.delay_samples; + _domain.dead.delay_total += _c._stats.delay_total; } client::client(const logger& l, void* s, client_options ops, socket socket, const socket_address& addr, const socket_address& local) @@ -894,6 +948,9 @@ namespace rpc { if (_options.send_timeout_data) { features[protocol_features::TIMEOUT] = ""; } + if (_options.send_handler_duration) { + features[protocol_features::HANDLER_DURATION] = ""; + } if (_options.stream_parent) { features[protocol_features::STREAM_PARENT] = serialize_connection_id(_options.stream_parent); } @@ -910,14 +967,19 @@ namespace rpc { } return read_response_frame_compressed(_read_buf).then([this] (response_frame::return_type msg_id_and_data) { auto& msg_id = std::get<0>(msg_id_and_data); - auto& data = std::get<1>(msg_id_and_data); + auto& data = std::get<2>(msg_id_and_data); auto it = _outstanding.find(std::abs(msg_id)); if (!data) { _error = true; } else if (it != _outstanding.end()) { auto handler = std::move(it->second); + auto ht = std::get<1>(msg_id_and_data); _outstanding.erase(it); (*handler)(*this, msg_id, std::move(data.value())); + if (ht) { + _stats.delay_samples++; + _stats.delay_total += (rpc_clock_type::now() - handler->start) - std::chrono::microseconds(*ht); + } } else if (msg_id < 0) { try { std::rethrow_exception(unmarshal_exception(data.value())); @@ -1008,6 +1070,10 @@ namespace rpc { 
_timeout_negotiated = true; ret[protocol_features::TIMEOUT] = ""; break; + case protocol_features::HANDLER_DURATION: + _handler_duration_negotiated = true; + ret[protocol_features::HANDLER_DURATION] = ""; + break; case protocol_features::STREAM_PARENT: { if (!get_server()._options.streaming_domain) { f = f.then([] { @@ -1095,17 +1161,24 @@ namespace rpc { } future<> - server::connection::respond(int64_t msg_id, snd_buf&& data, std::optional timeout) { - response_frame::encode_header(msg_id, data); + server::connection::respond(int64_t msg_id, snd_buf&& data, std::optional timeout, std::optional handler_duration) { + if (_handler_duration_negotiated) { + response_frame_with_handler_time::encode_header(msg_id, handler_duration, data); + } else { + data.front().trim_front(sizeof(uint32_t)); + data.size -= sizeof(uint32_t); + response_frame::encode_header(msg_id, data); + } return send(std::move(data), timeout); } future<> server::connection::send_unknown_verb_reply(std::optional timeout, int64_t msg_id, uint64_t type) { return wait_for_resources(28, timeout).then([this, timeout, msg_id, type] (auto permit) { // send unknown_verb exception back - snd_buf data(28); - static_assert(snd_buf::chunk_size >= 28, "send buffer chunk size is too small"); - auto p = data.front().get_write() + 12; + constexpr size_t unknown_verb_message_size = response_frame_headroom + 2 * sizeof(uint32_t) + sizeof(uint64_t); + snd_buf data(unknown_verb_message_size); + static_assert(snd_buf::chunk_size >= unknown_verb_message_size, "send buffer chunk size is too small"); + auto p = data.front().get_write() + response_frame_headroom; write_le(p, uint32_t(exception_type::UNKNOWN_VERB)); write_le(p + 4, uint32_t(8)); write_le(p + 8, type); @@ -1115,7 +1188,7 @@ future<> server::connection::send_unknown_verb_reply(std::optional(); rpc::server_options so; rpc::client_options co; @@ -349,6 +350,7 @@ SEASTAR_TEST_CASE(test_rpc_connect) { co.compressor_factory = factory.get(); } co.send_timeout_data = j 
& 2; + co.send_handler_duration = with_delay; rpc_test_config cfg; cfg.server_options = so; auto f = rpc_test_env<>::do_with_thread(cfg, co, [] (rpc_test_env<>& env, test_rpc_proto::client& c1) { @@ -368,6 +370,7 @@ SEASTAR_TEST_CASE(test_rpc_connect) { } }); fs.emplace_back(std::move(f)); + } } } return when_all(fs.begin(), fs.end()).discard_result();