From 7f0febd55019d3b93f31263a4a6beb7e6cb193c1 Mon Sep 17 00:00:00 2001 From: Patryk Wrobel Date: Wed, 22 May 2024 12:12:20 +0200 Subject: [PATCH] io_tester: implement latency correction for jobs with RPS Currently, for jobs with RPS (request-per-second) rate specified, io_tester measures only service time. It means, that if servicing one of requests takes much longer than available delta time between consecutive requests (issuing the next request is delayed), then the measured latency does not depict that fact. In such case we issue less requests than required and high latency is reported just for the first request. For instance, if there is a latency spike for one of requests, that exceeds the available time of service according to RPS schedule, then the total number of scheduled requests does not match the expected count calculated as 'TOTAL = duration_seconds * RPS'. Furthermore, the percentiles with latency printed at the end of the simulation may show inaccurate data. Firstly, the count of samples is lower than expected. Secondly, if the amount of time needed to handle requests after the latency spike returned to the ordinary value, then our statistics show that handling of only one request was long, but it is not true - io_tester stopped sending requests at given RPS and this way other requests could not be properly measured. This indicates that io_tester suffers from coordinated omission problem. This change implements latency correction flag. When it is enabled, then io_tester measures the total request latency including the delay between the expected schedule time and the actual schedule time. Moreover, if any of requests take more time than available, then io_tester tries to schedule 'delayed' requests as soon as possible to return to the defined schedule. Signed-off-by: Patryk Wrobel --- apps/io_tester/io_tester.cc | 54 ++++++++++++++++++++++++++++++++----- 1 file changed, 48 insertions(+), 6 deletions(-) diff --git a/apps/io_tester/io_tester.cc b/apps/io_tester/io_tester.cc index 29a05ca38ab..758717b6dca 100644 --- a/apps/io_tester/io_tester.cc +++ b/apps/io_tester/io_tester.cc @@ -255,6 +255,7 @@ struct job_config { std::array quantiles = { 0.5, 0.95, 0.99, 0.999}; static bool keep_files = false; +static bool latency_correction_enabled = false; future<> maybe_remove_file(sstring fname) { return keep_files ? make_ready_future<>() : remove_file(fname); @@ -284,6 +285,7 @@ class class_data { std::uniform_int_distribution _pos_distribution; file _file; bool _think = false; + bool _latency_correction_enabled = false; ::sleep_fn _sleep_fn = timer_sleep; timer<> _thinker; @@ -296,6 +298,7 @@ class class_data { , _sg(cfg.shard_info.scheduling_group) , _latencies(extended_p_square_probabilities = quantiles) , _pos_distribution(0, _config.file_size / _config.shard_info.request_size) + , _latency_correction_enabled(latency_correction_enabled) , _sleep_fn(_config.options.sleep_fn) , _thinker([this] { think_tick(); }) { @@ -304,6 +307,12 @@ class class_data { } else if (_config.shard_info.think_time > 0us) { _think = true; } + + // When using the schedule for issuing requests we need to ensure that requests are not issued before + // their exact schedule time. Therefore, the sleep function must have the desired accuracy. + if (_latency_correction_enabled && rps() != 0) { + _sleep_fn = timer_sleep; + } } virtual ~class_data() = default; @@ -344,11 +353,28 @@ class class_data { auto buf = bufptr.get(); auto pause = std::chrono::duration_cast(1s) / rps; auto pause_dist = _config.options.pause_fn(pause); - return seastar::sleep((pause / parallelism) * dummy).then([this, buf, stop, pause = pause_dist.get(), &intent, &in_flight] () mutable { - return do_until([this, stop] { return std::chrono::steady_clock::now() > stop || requests() > limit(); }, [this, buf, stop, pause, &intent, &in_flight] () mutable { + auto request_id_buf = std::make_unique(0u); + auto request_id_ptr = request_id_buf.get(); + return seastar::sleep((pause / parallelism) * dummy).then([this, buf, stop, pause = pause_dist.get(), &intent, &in_flight, request_id_ptr] () mutable { + auto issuer_start = std::chrono::steady_clock::now(); + return do_until([this, stop] { return std::chrono::steady_clock::now() > stop || requests() > limit(); }, + [this, buf, stop, pause, &intent, &in_flight, request_id_ptr, issuer_start] () mutable { auto start = std::chrono::steady_clock::now(); + auto& request_id = *request_id_ptr; + auto intended_request_start = start; + + if (_latency_correction_enabled) { + auto request_delta_time = pause->template get_as(); + intended_request_start = issuer_start + (request_id * request_delta_time); + if (std::chrono::steady_clock::now() < intended_request_start) { + throw std::runtime_error{fmt::format("Issuing request would invalidate the schedule - it is too early!")}; + } + } + in_flight++; - return issue_request(buf, &intent).then_wrapped([this, start, pause, stop, &in_flight] (auto size_f) { + request_id++; + + return issue_request(buf, &intent).then_wrapped([this, start, intended_request_start, pause, stop, &in_flight] (auto size_f) { size_t size; try { size = size_f.get(); @@ -359,12 +385,20 @@ class class_data { } auto now = std::chrono::steady_clock::now(); + auto service_time = std::chrono::duration_cast(now - start); + auto request_latency = service_time; + if (_latency_correction_enabled) { + auto submission_delay = std::chrono::duration_cast(start - intended_request_start); + request_latency = (submission_delay + service_time); + } + if (now < stop) { - this->add_result(size, std::chrono::duration_cast(now - start)); + this->add_result(size, request_latency); } in_flight--; + auto p = pause->template get_as(); - auto next = start + p; + auto next = intended_request_start + p; if (next > now) { return this->_sleep_fn(next, now); @@ -377,7 +411,7 @@ class class_data { }).then([&intent, &in_flight] { intent.cancel(); return do_until([&in_flight] { return in_flight == 0; }, [] { return seastar::sleep(100ms /* ¯\_(ツ)_/¯ */); }); - }).finally([bufptr = std::move(bufptr), pause = std::move(pause_dist)] {}); + }).finally([bufptr = std::move(bufptr), pause = std::move(pause_dist), request_id_buf = std::move(request_id_buf)] {}); }); }); } @@ -1090,6 +1124,9 @@ int main(int ac, char** av) { ("duration", bpo::value()->default_value(10), "for how long (in seconds) to run the test") ("conf", bpo::value()->default_value("./conf.yaml"), "YAML file containing benchmark specification") ("keep-files", bpo::value()->default_value(false), "keep test files, next run may re-use them") + ("latency-correction", bpo::value()->default_value(false), "meaningful only for jobs that specify RPS (requests per second) rate; " + "accounts delay between expected request submission time and the actual " + "submission time into measured latency") ; distributed ctx; @@ -1111,6 +1148,11 @@ int main(int ac, char** av) { } } + latency_correction_enabled = opts["latency-correction"].as(); + if (latency_correction_enabled) { + fmt::print("Warning: sleep function for jobs using RPS will be forced to use steady_clock because of latency correction enabled!\n"); + } + keep_files = opts["keep-files"].as(); auto& duration = opts["duration"].as(); auto& yaml = opts["conf"].as();