From 5b3270e06ab063aaf974edb1b0b77c5440502835 Mon Sep 17 00:00:00 2001 From: Pavel Emelyanov Date: Wed, 26 Jun 2024 13:47:00 +0300 Subject: [PATCH 1/4] http/client: Add timeout argument to make_request() And propagate it down the call stack to the interesting places. Next patch will make the code obey the provided timeout. Signed-off-by: Pavel Emelyanov --- include/seastar/http/client.hh | 13 ++++++++----- src/http/client.cc | 28 ++++++++++++++-------------- 2 files changed, 22 insertions(+), 19 deletions(-) diff --git a/include/seastar/http/client.hh b/include/seastar/http/client.hh index 6601df4287e..a56b0fd6154 100644 --- a/include/seastar/http/client.hh +++ b/include/seastar/http/client.hh @@ -28,6 +28,7 @@ #include #include #include +#include #include namespace bi = boost::intrusive; @@ -169,6 +170,7 @@ class client { public: using reply_handler = noncopyable_function(const reply&, input_stream&& body)>; using retry_requests = bool_class; + using clock_type = lowres_clock; private: friend class http::internal::client_ref; @@ -185,17 +187,17 @@ private: using connection_ptr = seastar::shared_ptr; - future get_connection(); - future make_connection(); + future get_connection(clock_type::time_point timeout); + future make_connection(clock_type::time_point timeout); future<> put_connection(connection_ptr con); future<> shrink_connections(); template Fn> - auto with_connection(Fn&& fn); + auto with_connection(Fn&& fn, clock_type::time_point timeout); template requires std::invocable - auto with_new_connection(Fn&& fn); + auto with_new_connection(Fn&& fn, clock_type::time_point timeout); future<> do_make_request(connection& con, request& req, reply_handler& handle, std::optional expected); @@ -275,12 +277,13 @@ public: * \param req -- request to be sent * \param handle -- the response handler * \param expected -- the optional expected reply status code, default is std::nullopt + * \param send_timeout -- time point at which make_request will stop waiting for transport * * Note that the handle callback should be prepared to be called more than once, because * client may restart the whole request processing in case server closes the connection * in the middle of operation */ - future<> make_request(request req, reply_handler handle, std::optional expected = std::nullopt); + future<> make_request(request req, reply_handler handle, std::optional expected = std::nullopt, clock_type::time_point send_timeout = clock_type::time_point::max()); /** * \brief Updates the maximum number of connections a client may have diff --git a/src/http/client.cc b/src/http/client.cc index 53664a68017..2376abdf74a 100644 --- a/src/http/client.cc +++ b/src/http/client.cc @@ -243,7 +243,7 @@ client::client(std::unique_ptr f, unsigned max_connections, { } -future client::get_connection() { +future client::get_connection(clock_type::time_point timeout) { if (!_pool.empty()) { connection_ptr con = _pool.front().shared_from_this(); _pool.pop_front(); @@ -252,15 +252,15 @@ future client::get_connection() { } if (_nr_connections >= _max_connections) { - return _wait_con.wait().then([this] { - return get_connection(); + return _wait_con.wait().then([this, timeout] { + return get_connection(timeout); }); } - return make_connection(); + return make_connection(timeout); } -future client::make_connection() { +future client::make_connection(clock_type::time_point timeout) { _total_new_connections++; return _new_connections->make().then([cr = internal::client_ref(this)] (connected_socket cs) mutable { http_log.trace("created new http connection {}", cs.local_address()); @@ -311,8 +311,8 @@ future<> client::set_maximum_connections(unsigned nr) { } template Fn> -auto client::with_connection(Fn&& fn) { - return get_connection().then([this, fn = std::move(fn)] (connection_ptr con) mutable { +auto client::with_connection(Fn&& fn, clock_type::time_point timeout) { + return get_connection(timeout).then([this, fn = std::move(fn)] (connection_ptr con) mutable { return fn(*con).finally([this, con = std::move(con)] () mutable { return put_connection(std::move(con)); }); @@ -321,22 +321,22 @@ auto client::with_connection(Fn&& fn) { template requires std::invocable -auto client::with_new_connection(Fn&& fn) { - return make_connection().then([this, fn = std::move(fn)] (connection_ptr con) mutable { +auto client::with_new_connection(Fn&& fn, clock_type::time_point timeout) { + return make_connection(timeout).then([this, fn = std::move(fn)] (connection_ptr con) mutable { return fn(*con).finally([this, con = std::move(con)] () mutable { return put_connection(std::move(con)); }); }); } -future<> client::make_request(request req, reply_handler handle, std::optional expected) { - return do_with(std::move(req), std::move(handle), [this, expected] (request& req, reply_handler& handle) mutable { +future<> client::make_request(request req, reply_handler handle, std::optional expected, clock_type::time_point send_timeout) { + return do_with(std::move(req), std::move(handle), [this, expected, send_timeout] (request& req, reply_handler& handle) mutable { auto f = with_connection([this, &req, &handle, expected] (connection& con) { return do_make_request(con, req, handle, expected); - }); + }, send_timeout); if (_retry) { - f = f.handle_exception_type([this, &req, &handle, expected] (const std::system_error& ex) { + f = f.handle_exception_type([this, &req, &handle, expected, send_timeout] (const std::system_error& ex) { auto code = ex.code().value(); if ((code != EPIPE) && (code != ECONNABORTED)) { return make_exception_future<>(ex); @@ -347,7 +347,7 @@ future<> client::make_request(request req, reply_handler handle, std::optional Date: Wed, 26 Jun 2024 15:43:47 +0300 Subject: [PATCH 2/4] http/client: Respect user-provided timeout There are two places that can wait for undefined amount of time -- the one that waits the connection pool to release a connection and the one that establishes a new connection. The former is simple, as it uses conditional variable, so just wait() on it with the timeout and return back timeout error if it fired. Tha latter is trickier. New connections come from factory and it's not guaranteed that a factory obeys it (e.g. see #2302). So instead of relying on the factory, check if connected_socket appeared soon enough and return back timeout error if it didn't. Signed-off-by: Pavel Emelyanov --- include/seastar/http/exception.hh | 11 +++++++++++ src/http/client.cc | 20 ++++++++++++++++++-- 2 files changed, 29 insertions(+), 2 deletions(-) diff --git a/include/seastar/http/exception.hh b/include/seastar/http/exception.hh index 048a9de42f7..f49e69aea8d 100644 --- a/include/seastar/http/exception.hh +++ b/include/seastar/http/exception.hh @@ -158,6 +158,17 @@ public: {} }; +/** + * Client-side exception to report that making request timed out + * prior to communicating to server + */ +class timeout_error : public base_exception { +public: + timeout_error() + : base_exception("client timed out", http::reply::status_type::request_timeout) + {} +}; + SEASTAR_MODULE_EXPORT_END } diff --git a/src/http/client.cc b/src/http/client.cc index 2376abdf74a..a456a5b6cfc 100644 --- a/src/http/client.cc +++ b/src/http/client.cc @@ -28,6 +28,7 @@ module; #include #include #include +#include #ifdef SEASTAR_MODULE module seastar; @@ -252,7 +253,14 @@ future client::get_connection(clock_type::time_point tim } if (_nr_connections >= _max_connections) { - return _wait_con.wait().then([this, timeout] { + return _wait_con.wait(timeout).then_wrapped([this, timeout] (auto f) { + try { + f.get(); + } catch (const condition_variable_timed_out&) { + return make_exception_future(httpd::timeout_error()); + } catch (...) { + return current_exception_as_future(); + } return get_connection(timeout); }); } @@ -262,9 +270,17 @@ future client::get_connection(clock_type::time_point tim future client::make_connection(clock_type::time_point timeout) { _total_new_connections++; - return _new_connections->make().then([cr = internal::client_ref(this)] (connected_socket cs) mutable { + return _new_connections->make().then([this, timeout, cr = internal::client_ref(this)] (connected_socket cs) mutable { http_log.trace("created new http connection {}", cs.local_address()); auto con = seastar::make_shared(std::move(cs), std::move(cr)); + if (clock_type::now() > timeout) { + // Factory made new connection, though it's too late already. Don't throw + // this effort out, but don't use it to serve current request either. + return put_connection(std::move(con)).then_wrapped([] (auto f) { + f.ignore_ready_future(); + return make_exception_future(httpd::timeout_error()); + }); + } return make_ready_future(std::move(con)); }); } From 5281ee280a5ee500d74b171144a268ee9dc5ddc7 Mon Sep 17 00:00:00 2001 From: Pavel Emelyanov Date: Wed, 26 Jun 2024 15:00:00 +0300 Subject: [PATCH 3/4] test/http: Generalize simple request sending There are several tests that call client::make_request() with some simple pre-defined request that's supposed to fail. Next patches will want to do the same, so prepare the helper function in advance. Signed-off-by: Pavel Emelyanov --- tests/unit/httpd_test.cc | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/tests/unit/httpd_test.cc b/tests/unit/httpd_test.cc index 96f58278685..d6e95c08934 100644 --- a/tests/unit/httpd_test.cc +++ b/tests/unit/httpd_test.cc @@ -891,6 +891,12 @@ static void read_simple_http_request(input_stream& in) { } } +static future<> make_failing_http_request(http::experimental::client& cln) { + return cln.make_request(http::request::make("GET", "test", "/test"), [] (const http::reply& rep, input_stream&& in) { + return make_exception_future<>(std::runtime_error("Shouldn't happen")); + }, http::reply::status_type::ok); +} + SEASTAR_TEST_CASE(test_client_response_eof) { return seastar::async([] { loopback_connection_factory lcf(1); @@ -908,10 +914,7 @@ SEASTAR_TEST_CASE(test_client_response_eof) { future<> client = seastar::async([&lcf] { auto cln = http::experimental::client(std::make_unique(lcf)); - auto req = http::request::make("GET", "test", "/test"); - BOOST_REQUIRE_EXCEPTION(cln.make_request(std::move(req), [] (const http::reply& rep, input_stream&& in) { - return make_exception_future<>(std::runtime_error("Shouldn't happen")); - }, http::reply::status_type::ok).get(), std::system_error, [] (auto& ex) { + BOOST_REQUIRE_EXCEPTION(make_failing_http_request(cln).get(), std::system_error, [] (auto& ex) { return ex.code().value() == ECONNABORTED; }); @@ -940,9 +943,7 @@ SEASTAR_TEST_CASE(test_client_response_parse_error) { future<> client = seastar::async([&lcf] { auto cln = http::experimental::client(std::make_unique(lcf)); auto req = http::request::make("GET", "test", "/test"); - BOOST_REQUIRE_EXCEPTION(cln.make_request(std::move(req), [] (const http::reply& rep, input_stream&& in) { - return make_exception_future<>(std::runtime_error("Shouldn't happen")); - }, http::reply::status_type::ok).get(), std::runtime_error, [] (auto& ex) { + BOOST_REQUIRE_EXCEPTION(make_failing_http_request(cln).get(), std::runtime_error, [] (auto& ex) { return sstring(ex.what()).contains("Invalid http server response"); }); From 8eedc903735a9dbf1d41a59e83021554a97ba192 Mon Sep 17 00:00:00 2001 From: Pavel Emelyanov Date: Wed, 26 Jun 2024 15:00:11 +0300 Subject: [PATCH 4/4] test/http: Test how client::make_request() handles provided timeout Two cases to check -- that timing out factory is handled and that timing out pooled connection is handled. Signed-off-by: Pavel Emelyanov --- tests/unit/httpd_test.cc | 68 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 68 insertions(+) diff --git a/tests/unit/httpd_test.cc b/tests/unit/httpd_test.cc index d6e95c08934..4e1e98cc0d0 100644 --- a/tests/unit/httpd_test.cc +++ b/tests/unit/httpd_test.cc @@ -954,6 +954,74 @@ SEASTAR_TEST_CASE(test_client_response_parse_error) { }); } +SEASTAR_TEST_CASE(test_client_request_send_timeout_cached_conn) { + return seastar::async([] { + loopback_connection_factory lcf(1); + auto ss = lcf.get_server_socket(); + promise<> server_started; + future<> server = ss.accept().then([&] (accept_result ar) { + server_started.set_value(); + return seastar::async([sk = std::move(ar.connection)] () mutable { + input_stream in = sk.input(); + read_simple_http_request(in); + sleep(std::chrono::seconds(1)).get(); + output_stream out = sk.output(); + out.close().get(); + }); + }); + + future<> client = seastar::async([&lcf, &server_started] { + auto cln = http::experimental::client(std::make_unique(lcf), 1 /* max connections */); + // this request gets handled by server and ... + auto f1 = make_failing_http_request(cln); + server_started.get_future().get(); + // ... this should hang waiting for cached connection + auto f2 = cln.make_request(http::request::make("GET", "test", "/test"), [] (const auto& rep, auto&& in) { + return make_exception_future<>(std::runtime_error("Shouldn't happen")); + }, http::reply::status_type::ok, http::experimental::client::clock_type::now() + std::chrono::milliseconds(100)); + + BOOST_REQUIRE_THROW(f2.get(), httpd::timeout_error); + cln.close().get(); + try { + f1.get(); + } catch (...) { + } + }); + + when_all(std::move(client), std::move(server)).discard_result().get(); + }); +} + +SEASTAR_TEST_CASE(test_client_request_send_timeout_new_conn) { + return seastar::async([] { + loopback_connection_factory lcf(1); + auto ss = lcf.get_server_socket(); + future<> server = ss.accept().discard_result(); + + class delayed_factory : public loopback_http_factory { + public: + explicit delayed_factory(loopback_connection_factory& f) : loopback_http_factory(f) {} + virtual future make() override { + return sleep(std::chrono::seconds(1)).then([this] { + return loopback_http_factory::make(); + }); + } + }; + + auto client = async([&] { + auto cln = http::experimental::client(std::make_unique(lcf)); + auto f = cln.make_request(http::request::make("GET", "test", "/test"), [] (const auto& rep, auto&& in) { + return make_exception_future<>(std::runtime_error("Shouldn't happen")); + }, http::reply::status_type::ok, http::experimental::client::clock_type::now() + std::chrono::milliseconds(100)); + + BOOST_REQUIRE_THROW(f.get(), httpd::timeout_error); + cln.close().get(); + }); + + when_all(std::move(client), std::move(server)).discard_result().get(); + }); +} + SEASTAR_TEST_CASE(test_client_retry_request) { return seastar::async([] { loopback_connection_factory lcf(1);