diff --git a/include/seastar/http/client.hh b/include/seastar/http/client.hh index 6601df4287e..a56b0fd6154 100644 --- a/include/seastar/http/client.hh +++ b/include/seastar/http/client.hh @@ -28,6 +28,7 @@ #include #include #include +#include #include namespace bi = boost::intrusive; @@ -169,6 +170,7 @@ class client { public: using reply_handler = noncopyable_function(const reply&, input_stream&& body)>; using retry_requests = bool_class; + using clock_type = lowres_clock; private: friend class http::internal::client_ref; @@ -185,17 +187,17 @@ private: using connection_ptr = seastar::shared_ptr; - future get_connection(); - future make_connection(); + future get_connection(clock_type::time_point timeout); + future make_connection(clock_type::time_point timeout); future<> put_connection(connection_ptr con); future<> shrink_connections(); template Fn> - auto with_connection(Fn&& fn); + auto with_connection(Fn&& fn, clock_type::time_point timeout); template requires std::invocable - auto with_new_connection(Fn&& fn); + auto with_new_connection(Fn&& fn, clock_type::time_point timeout); future<> do_make_request(connection& con, request& req, reply_handler& handle, std::optional expected); @@ -275,12 +277,13 @@ public: * \param req -- request to be sent * \param handle -- the response handler * \param expected -- the optional expected reply status code, default is std::nullopt + * \param send_timeout -- time point at which make_request will stop waiting for transport * * Note that the handle callback should be prepared to be called more than once, because * client may restart the whole request processing in case server closes the connection * in the middle of operation */ - future<> make_request(request req, reply_handler handle, std::optional expected = std::nullopt); + future<> make_request(request req, reply_handler handle, std::optional expected = std::nullopt, clock_type::time_point send_timeout = clock_type::time_point::max()); /** * \brief Updates the maximum number of connections a client may have diff --git a/include/seastar/http/exception.hh b/include/seastar/http/exception.hh index 048a9de42f7..f49e69aea8d 100644 --- a/include/seastar/http/exception.hh +++ b/include/seastar/http/exception.hh @@ -158,6 +158,17 @@ public: {} }; +/** + * Client-side exception to report that making request timed out + * prior to communicating to server + */ +class timeout_error : public base_exception { +public: + timeout_error() + : base_exception("client timed out", http::reply::status_type::request_timeout) + {} +}; + SEASTAR_MODULE_EXPORT_END } diff --git a/src/http/client.cc b/src/http/client.cc index 53664a68017..a456a5b6cfc 100644 --- a/src/http/client.cc +++ b/src/http/client.cc @@ -28,6 +28,7 @@ module; #include #include #include +#include #ifdef SEASTAR_MODULE module seastar; @@ -243,7 +244,7 @@ client::client(std::unique_ptr f, unsigned max_connections, { } -future client::get_connection() { +future client::get_connection(clock_type::time_point timeout) { if (!_pool.empty()) { connection_ptr con = _pool.front().shared_from_this(); _pool.pop_front(); @@ -252,19 +253,34 @@ future client::get_connection() { } if (_nr_connections >= _max_connections) { - return _wait_con.wait().then([this] { - return get_connection(); + return _wait_con.wait(timeout).then_wrapped([this, timeout] (auto f) { + try { + f.get(); + } catch (const condition_variable_timed_out&) { + return make_exception_future(httpd::timeout_error()); + } catch (...) { + return current_exception_as_future(); + } + return get_connection(timeout); }); } - return make_connection(); + return make_connection(timeout); } -future client::make_connection() { +future client::make_connection(clock_type::time_point timeout) { _total_new_connections++; - return _new_connections->make().then([cr = internal::client_ref(this)] (connected_socket cs) mutable { + return _new_connections->make().then([this, timeout, cr = internal::client_ref(this)] (connected_socket cs) mutable { http_log.trace("created new http connection {}", cs.local_address()); auto con = seastar::make_shared(std::move(cs), std::move(cr)); + if (clock_type::now() > timeout) { + // Factory made new connection, though it's too late already. Don't throw + // this effort out, but don't use it to serve current request either. + return put_connection(std::move(con)).then_wrapped([] (auto f) { + f.ignore_ready_future(); + return make_exception_future(httpd::timeout_error()); + }); + } return make_ready_future(std::move(con)); }); } @@ -311,8 +327,8 @@ future<> client::set_maximum_connections(unsigned nr) { } template Fn> -auto client::with_connection(Fn&& fn) { - return get_connection().then([this, fn = std::move(fn)] (connection_ptr con) mutable { +auto client::with_connection(Fn&& fn, clock_type::time_point timeout) { + return get_connection(timeout).then([this, fn = std::move(fn)] (connection_ptr con) mutable { return fn(*con).finally([this, con = std::move(con)] () mutable { return put_connection(std::move(con)); }); @@ -321,22 +337,22 @@ auto client::with_connection(Fn&& fn) { template requires std::invocable -auto client::with_new_connection(Fn&& fn) { - return make_connection().then([this, fn = std::move(fn)] (connection_ptr con) mutable { +auto client::with_new_connection(Fn&& fn, clock_type::time_point timeout) { + return make_connection(timeout).then([this, fn = std::move(fn)] (connection_ptr con) mutable { return fn(*con).finally([this, con = std::move(con)] () mutable { return put_connection(std::move(con)); }); }); } -future<> client::make_request(request req, reply_handler handle, std::optional expected) { - return do_with(std::move(req), std::move(handle), [this, expected] (request& req, reply_handler& handle) mutable { +future<> client::make_request(request req, reply_handler handle, std::optional expected, clock_type::time_point send_timeout) { + return do_with(std::move(req), std::move(handle), [this, expected, send_timeout] (request& req, reply_handler& handle) mutable { auto f = with_connection([this, &req, &handle, expected] (connection& con) { return do_make_request(con, req, handle, expected); - }); + }, send_timeout); if (_retry) { - f = f.handle_exception_type([this, &req, &handle, expected] (const std::system_error& ex) { + f = f.handle_exception_type([this, &req, &handle, expected, send_timeout] (const std::system_error& ex) { auto code = ex.code().value(); if ((code != EPIPE) && (code != ECONNABORTED)) { return make_exception_future<>(ex); @@ -347,7 +363,7 @@ future<> client::make_request(request req, reply_handler handle, std::optional& in) { } } +static future<> make_failing_http_request(http::experimental::client& cln) { + return cln.make_request(http::request::make("GET", "test", "/test"), [] (const http::reply& rep, input_stream&& in) { + return make_exception_future<>(std::runtime_error("Shouldn't happen")); + }, http::reply::status_type::ok); +} + SEASTAR_TEST_CASE(test_client_response_eof) { return seastar::async([] { loopback_connection_factory lcf(1); @@ -908,10 +914,7 @@ SEASTAR_TEST_CASE(test_client_response_eof) { future<> client = seastar::async([&lcf] { auto cln = http::experimental::client(std::make_unique(lcf)); - auto req = http::request::make("GET", "test", "/test"); - BOOST_REQUIRE_EXCEPTION(cln.make_request(std::move(req), [] (const http::reply& rep, input_stream&& in) { - return make_exception_future<>(std::runtime_error("Shouldn't happen")); - }, http::reply::status_type::ok).get(), std::system_error, [] (auto& ex) { + BOOST_REQUIRE_EXCEPTION(make_failing_http_request(cln).get(), std::system_error, [] (auto& ex) { return ex.code().value() == ECONNABORTED; }); @@ -940,9 +943,7 @@ SEASTAR_TEST_CASE(test_client_response_parse_error) { future<> client = seastar::async([&lcf] { auto cln = http::experimental::client(std::make_unique(lcf)); auto req = http::request::make("GET", "test", "/test"); - BOOST_REQUIRE_EXCEPTION(cln.make_request(std::move(req), [] (const http::reply& rep, input_stream&& in) { - return make_exception_future<>(std::runtime_error("Shouldn't happen")); - }, http::reply::status_type::ok).get(), std::runtime_error, [] (auto& ex) { + BOOST_REQUIRE_EXCEPTION(make_failing_http_request(cln).get(), std::runtime_error, [] (auto& ex) { return sstring(ex.what()).contains("Invalid http server response"); }); @@ -953,6 +954,74 @@ SEASTAR_TEST_CASE(test_client_response_parse_error) { }); } +SEASTAR_TEST_CASE(test_client_request_send_timeout_cached_conn) { + return seastar::async([] { + loopback_connection_factory lcf(1); + auto ss = lcf.get_server_socket(); + promise<> server_started; + future<> server = ss.accept().then([&] (accept_result ar) { + server_started.set_value(); + return seastar::async([sk = std::move(ar.connection)] () mutable { + input_stream in = sk.input(); + read_simple_http_request(in); + sleep(std::chrono::seconds(1)).get(); + output_stream out = sk.output(); + out.close().get(); + }); + }); + + future<> client = seastar::async([&lcf, &server_started] { + auto cln = http::experimental::client(std::make_unique(lcf), 1 /* max connections */); + // this request gets handled by server and ... + auto f1 = make_failing_http_request(cln); + server_started.get_future().get(); + // ... this should hang waiting for cached connection + auto f2 = cln.make_request(http::request::make("GET", "test", "/test"), [] (const auto& rep, auto&& in) { + return make_exception_future<>(std::runtime_error("Shouldn't happen")); + }, http::reply::status_type::ok, http::experimental::client::clock_type::now() + std::chrono::milliseconds(100)); + + BOOST_REQUIRE_THROW(f2.get(), httpd::timeout_error); + cln.close().get(); + try { + f1.get(); + } catch (...) { + } + }); + + when_all(std::move(client), std::move(server)).discard_result().get(); + }); +} + +SEASTAR_TEST_CASE(test_client_request_send_timeout_new_conn) { + return seastar::async([] { + loopback_connection_factory lcf(1); + auto ss = lcf.get_server_socket(); + future<> server = ss.accept().discard_result(); + + class delayed_factory : public loopback_http_factory { + public: + explicit delayed_factory(loopback_connection_factory& f) : loopback_http_factory(f) {} + virtual future make() override { + return sleep(std::chrono::seconds(1)).then([this] { + return loopback_http_factory::make(); + }); + } + }; + + auto client = async([&] { + auto cln = http::experimental::client(std::make_unique(lcf)); + auto f = cln.make_request(http::request::make("GET", "test", "/test"), [] (const auto& rep, auto&& in) { + return make_exception_future<>(std::runtime_error("Shouldn't happen")); + }, http::reply::status_type::ok, http::experimental::client::clock_type::now() + std::chrono::milliseconds(100)); + + BOOST_REQUIRE_THROW(f.get(), httpd::timeout_error); + cln.close().get(); + }); + + when_all(std::move(client), std::move(server)).discard_result().get(); + }); +} + SEASTAR_TEST_CASE(test_client_retry_request) { return seastar::async([] { loopback_connection_factory lcf(1);