Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

io_tester: implement latency correction for jobs with RPS #2260

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 48 additions & 6 deletions apps/io_tester/io_tester.cc
Original file line number Diff line number Diff line change
Expand Up @@ -255,6 +255,7 @@ struct job_config {

std::array<double, 4> quantiles = { 0.5, 0.95, 0.99, 0.999};
static bool keep_files = false;
static bool latency_correction_enabled = false;

future<> maybe_remove_file(sstring fname) {
return keep_files ? make_ready_future<>() : remove_file(fname);
Expand Down Expand Up @@ -284,6 +285,7 @@ class class_data {
std::uniform_int_distribution<uint32_t> _pos_distribution;
file _file;
bool _think = false;
bool _latency_correction_enabled = false;
::sleep_fn _sleep_fn = timer_sleep<lowres_clock>;
timer<> _thinker;

Expand All @@ -296,6 +298,7 @@ class class_data {
, _sg(cfg.shard_info.scheduling_group)
, _latencies(extended_p_square_probabilities = quantiles)
, _pos_distribution(0, _config.file_size / _config.shard_info.request_size)
, _latency_correction_enabled(latency_correction_enabled)
, _sleep_fn(_config.options.sleep_fn)
, _thinker([this] { think_tick(); })
{
Expand All @@ -304,6 +307,12 @@ class class_data {
} else if (_config.shard_info.think_time > 0us) {
_think = true;
}

// When requests are issued according to a fixed schedule, no request may be issued before its
// scheduled time. The sleep function must therefore be accurate enough, so the configured one is
// overridden with a steady_clock-based sleep for jobs that specify RPS.
if (_latency_correction_enabled && rps() != 0) {
_sleep_fn = timer_sleep<std::chrono::steady_clock>;
}
}

virtual ~class_data() = default;
Expand Down Expand Up @@ -344,11 +353,28 @@ class class_data {
auto buf = bufptr.get();
auto pause = std::chrono::duration_cast<std::chrono::microseconds>(1s) / rps;
auto pause_dist = _config.options.pause_fn(pause);
return seastar::sleep((pause / parallelism) * dummy).then([this, buf, stop, pause = pause_dist.get(), &intent, &in_flight] () mutable {
return do_until([this, stop] { return std::chrono::steady_clock::now() > stop || requests() > limit(); }, [this, buf, stop, pause, &intent, &in_flight] () mutable {
auto request_id_buf = std::make_unique<unsigned>(0u);
auto request_id_ptr = request_id_buf.get();
return seastar::sleep((pause / parallelism) * dummy).then([this, buf, stop, pause = pause_dist.get(), &intent, &in_flight, request_id_ptr] () mutable {
auto issuer_start = std::chrono::steady_clock::now();
return do_until([this, stop] { return std::chrono::steady_clock::now() > stop || requests() > limit(); },
[this, buf, stop, pause, &intent, &in_flight, request_id_ptr, issuer_start] () mutable {
auto start = std::chrono::steady_clock::now();
auto& request_id = *request_id_ptr;
auto intended_request_start = start;

if (_latency_correction_enabled) {
auto request_delta_time = pause->template get_as<std::chrono::microseconds>();
intended_request_start = issuer_start + (request_id * request_delta_time);
if (std::chrono::steady_clock::now() < intended_request_start) {
throw std::runtime_error{fmt::format("Issuing request would invalidate the schedule - it is too early!")};
}
}

in_flight++;
return issue_request(buf, &intent).then_wrapped([this, start, pause, stop, &in_flight] (auto size_f) {
request_id++;

return issue_request(buf, &intent).then_wrapped([this, start, intended_request_start, pause, stop, &in_flight] (auto size_f) {
size_t size;
try {
size = size_f.get();
Expand All @@ -359,12 +385,20 @@ class class_data {
}

auto now = std::chrono::steady_clock::now();
auto service_time = std::chrono::duration_cast<std::chrono::microseconds>(now - start);
auto request_latency = service_time;
if (_latency_correction_enabled) {
auto submission_delay = std::chrono::duration_cast<std::chrono::microseconds>(start - intended_request_start);
request_latency = (submission_delay + service_time);
}

if (now < stop) {
this->add_result(size, std::chrono::duration_cast<std::chrono::microseconds>(now - start));
this->add_result(size, request_latency);
}
in_flight--;

auto p = pause->template get_as<std::chrono::microseconds>();
auto next = start + p;
auto next = intended_request_start + p;

if (next > now) {
return this->_sleep_fn(next, now);
Expand All @@ -377,7 +411,7 @@ class class_data {
}).then([&intent, &in_flight] {
intent.cancel();
return do_until([&in_flight] { return in_flight == 0; }, [] { return seastar::sleep(100ms /* ¯\_(ツ)_/¯ */); });
}).finally([bufptr = std::move(bufptr), pause = std::move(pause_dist)] {});
}).finally([bufptr = std::move(bufptr), pause = std::move(pause_dist), request_id_buf = std::move(request_id_buf)] {});
});
});
}
Expand Down Expand Up @@ -1090,6 +1124,9 @@ int main(int ac, char** av) {
("duration", bpo::value<unsigned>()->default_value(10), "for how long (in seconds) to run the test")
("conf", bpo::value<sstring>()->default_value("./conf.yaml"), "YAML file containing benchmark specification")
("keep-files", bpo::value<bool>()->default_value(false), "keep test files, next run may re-use them")
("latency-correction", bpo::value<bool>()->default_value(false), "meaningful only for jobs that specify RPS (requests per second) rate; "
"accounts delay between expected request submission time and the actual "
"submission time into measured latency")
;

distributed<context> ctx;
Expand All @@ -1111,6 +1148,11 @@ int main(int ac, char** av) {
}
}

latency_correction_enabled = opts["latency-correction"].as<bool>();
if (latency_correction_enabled) {
fmt::print("Warning: sleep function for jobs using RPS will be forced to use steady_clock because of latency correction enabled!\n");
}

keep_files = opts["keep-files"].as<bool>();
auto& duration = opts["duration"].as<unsigned>();
auto& yaml = opts["conf"].as<sstring>();
Expand Down