From 6de08b079d995dba53ab1993c34fd189d0d3d7cc Mon Sep 17 00:00:00 2001 From: Kefu Chai Date: Sun, 19 May 2024 15:26:00 +0800 Subject: [PATCH 01/20] tests: enable prometheus_test.py to test metrics without aggregation in order to reduce the replicated code, this change adds a test to prometheus_test.py, to verify the metrics without aggregations. since we already have the coverage for this feature, let's drop it from test_metrics.py. Signed-off-by: Kefu Chai --- tests/unit/prometheus_test.py | 153 ++++++++++++++++++++++++++++------ tests/unit/test_metrics.py | 48 ----------- 2 files changed, 129 insertions(+), 72 deletions(-) diff --git a/tests/unit/prometheus_test.py b/tests/unit/prometheus_test.py index 00953f2002f..2f110334afd 100755 --- a/tests/unit/prometheus_test.py +++ b/tests/unit/prometheus_test.py @@ -21,33 +21,85 @@ # import argparse +import math import re import subprocess import sys import unittest import urllib.request import urllib.parse +import yaml from typing import Optional from collections import namedtuple class Exposition: + @classmethod + def from_hist(cls, + name: str, + hist: list[tuple[float, int]], + sum_: int, + count: int) -> 'Exposition': + # ignore these values, we might need to verify them in future + _, _ = sum_, count + buckets = (cls.value_to_bucket(le - 1) for le, _ in hist) + deltas = [] + last_n = 0 + for _, n in hist: + delta = n - last_n + last_n = n + deltas.append(delta) + return cls(name, dict(zip(buckets, deltas)), {}) + + @staticmethod + def value_to_bucket(value): + low = 2 ** math.floor(math.log(value, 2)) + high = 2 * low + dif = (high - low) / 4 + return low + dif * math.floor((value - low) / dif) + + @staticmethod + def _values_to_histogram(values): + hist = {} + for val in values: + bucket = Exposition.value_to_bucket(val) + if bucket in hist: + hist[bucket] += 1 + else: + hist[bucket] = 1 + return hist + + @classmethod + def from_conf(cls, + name: str, + type_: str, + values: list[str], + labels: dict[str, str]) -> 'Exposition': + if type_ in ('gauge', 'counter'): + assert len(values) == 1 + return cls(name, float(values[0]), labels) + if type_ == 'histogram': + hist = cls._values_to_histogram(float(v) for v in values) + return cls(name, hist, {}) + raise NotImplementedError(f'unsupported type: {type_}') + def __init__(self, name: str, - type_: str, - value: str, - labels: Optional[dict[str, str]] = None) -> None: + value: int | list[tuple[float, int]], + labels: dict[str, str]) -> None: self.name = name - if type_ == 'counter': - self.value = float(value) - elif type_ == 'gauge': - self.value = float(value) - else: - # we don't verify histogram or summary yet - self.value = None + self.value = value self.labels = labels + def __repr__(self): + return f"{self.name=}, {self.value=}, {self.labels=}" + + def __eq__(self, other): + if not isinstance(other, Exposition): + return False + return self.value == other.value + class Metrics: prefix = 'seastar' @@ -89,36 +141,67 @@ def get(self, full_name = None if name is not None: full_name = f'{self.prefix}_{self.group}_{name}' - results: list[Exposition] = [] metric_type = None + # for histogram and summary as they are represented with multiple lines + hist_name = '' + hist_buckets = [] + hist_sum = 0 + hist_count = 0 + for line in self.lines: if not line: continue if line.startswith('# HELP'): continue if line.startswith('# TYPE'): - _, _, metric_name, type_ = line.split() - if full_name is None or metric_name == full_name: - metric_type = type_ + _, _, type_metric_name, metric_type = line.split() + if hist_buckets: + yield Exposition.from_hist(hist_name, + hist_buckets, + hist_sum, + hist_count) + hist_buckets = [] + if metric_type in ('histogram', 'summary'): + hist_name = type_metric_name continue matched = self.pattern.match(line) assert matched, f'malformed metric line: {line}' - metric_name = matched.group('metric_name') - if full_name and metric_name != full_name: + value_metric_name = matched.group('metric_name') + if full_name and not value_metric_name.startswith(full_name): continue metric_labels = self._parse_labels(matched.group('labels')) if labels is not None and metric_labels != labels: continue - metric_value = matched.group('value') - results.append(Exposition(metric_name, - metric_type, - metric_value, - metric_labels)) - return results + metric_value = float(matched.group('value')) + if metric_type == 'histogram': + if value_metric_name == f'{type_metric_name}_bucket': + last_value = 0 + if hist_buckets: + last_value = hist_buckets[-1][1] + if metric_value - last_value != 0: + le = metric_labels['le'].strip('"') + hist_buckets.append((float(le), metric_value)) + elif value_metric_name == f'{type_metric_name}_sum': + hist_sum = metric_value + elif value_metric_name == f'{type_metric_name}_count': + hist_count = metric_value + else: + raise RuntimeError(f'unknown histogram value: {line}') + elif metric_type == 'summary': + raise NotImplementedError('unsupported type: summary') + else: + yield Exposition(type_metric_name, + metric_value, + metric_labels) + if hist_buckets: + yield Exposition.from_hist(hist_name, + hist_buckets, + hist_sum, + hist_count) def get_help(self, name: str) -> Optional[str]: full_name = f'{self.prefix}_{self.group}_{name}' @@ -175,7 +258,27 @@ def _get_metrics(cls, body = f.read().decode('utf-8') return Metrics(body.rstrip().split('\n')) - def test_filtering_by_label(self) -> None: + def test_filtering_by_label_sans_aggregation(self) -> None: + labels = {'private': '1'} + metrics = self._get_metrics(labels=labels) + actual_values = list(metrics.get()) + expected_values = [] + with open(self.exporter_config, encoding='utf-8') as f: + config = yaml.safe_load(f) + for metric in config['metrics']: + name = metric['name'] + metric_name = f'{Metrics.prefix}_{Metrics.group}_{name}' + metric_labels = metric['labels'] + if metric_labels != labels: + continue + e = Exposition.from_conf(metric_name, + metric['type'], + metric['values'], + metric_labels) + expected_values.append(e) + self.assertCountEqual(actual_values, expected_values) + + def test_filtering_by_label_with_aggregation(self) -> None: TestCase = namedtuple('TestCase', ['label', 'regex', 'found']) label = 'private' tests = [ @@ -188,7 +291,8 @@ def test_filtering_by_label(self) -> None: for test in tests: with self.subTest(regex=test.regex, found=test.found): metrics = self._get_metrics(labels={test.label: test.regex}) - self.assertEqual(len(metrics.get()), test.found) + values = list(metrics.get()) + self.assertEqual(len(values), test.found) def test_aggregated(self) -> None: name = 'counter_1' @@ -219,6 +323,7 @@ def test_help(self) -> None: self.assertIsNone(msg) + if __name__ == '__main__': parser = argparse.ArgumentParser() parser.add_argument('--exporter', diff --git a/tests/unit/test_metrics.py b/tests/unit/test_metrics.py index 236826ee5e0..b5a5773e42f 100755 --- a/tests/unit/test_metrics.py +++ b/tests/unit/test_metrics.py @@ -29,44 +29,6 @@ def query_prometheus(host, query, type): ) -def validate_text(url): - resp = requests.get("http://" + url) - val = None - res = {} - for l in resp.iter_lines(): - if not l: - continue - ln = l.decode("utf-8") - if "HELP" in ln: - continue - if "TYPE" in ln: - if val: - res[name] = {"name": name, "type": type, "value": val} - m = MATCH_TYPE.match(ln) - name = m.group(1) - type = m.group(2) - last_val = 0 - val = None - else: - if type == "histogram": - m = MATCH_HISTOGRAM.match(ln) - if not m: - continue - le = val_to_bucket(float(m.group(1)) - 1) - value = float(m.group(2)) - if not val: - val = {} - if value > last_val: - val[le] = value - last_val - last_val = value - else: - m = MATCH_VALUE.match(ln) - val = float(m.group(1)) - if val: - res[name] = {"name": name, "type": type, "value": val} - return res - - def val_to_bucket(val): low = 2 ** math.floor(math.log(val, 2)) high = 2 * low @@ -118,16 +80,6 @@ def conf_to_metrics(conf): metrics = yaml.safe_load(file) conf_metrics = conf_to_metrics(metrics) -from_text_metrics = validate_text(args.host) - -# Validate text format -for v in conf_metrics: - if v not in from_text_metrics: - print("Text format: metrics ", v, "is missing") - if from_text_metrics[v]["value"] != conf_metrics[v]["value"]: - print('Text format: Metrics', v, 'type', from_text_metrics[v]['type'], - 'Mismatch, expected', from_text_metrics[v]['value'], '!=', conf_metrics[v]['value']) - # Validate protobuf for v in conf_metrics: res = query_prometheus(args.prometheus, v, conf_metrics[v]["type"]) From ca94c445f6b46a1111a44f8017f7f1352085b970 Mon Sep 17 00:00:00 2001 From: Kefu Chai Date: Sun, 19 May 2024 18:46:49 +0800 Subject: [PATCH 02/20] tests: test protobuf support in prometheus_test.py in this change, we * add a test for testing protobuf support in prometheus_test.py * drop test_metrics.py, as all the tests in it have been moved into prometheus_test.py Signed-off-by: Kefu Chai --- tests/unit/prometheus_test.py | 60 ++++++++++++++++++++++++ tests/unit/test_metrics.py | 88 ----------------------------------- 2 files changed, 60 insertions(+), 88 deletions(-) delete mode 100755 tests/unit/test_metrics.py diff --git a/tests/unit/prometheus_test.py b/tests/unit/prometheus_test.py index 2f110334afd..449883ff524 100755 --- a/tests/unit/prometheus_test.py +++ b/tests/unit/prometheus_test.py @@ -22,9 +22,11 @@ import argparse import math +import json import re import subprocess import sys +import time import unittest import urllib.request import urllib.parse @@ -218,6 +220,8 @@ class TestPrometheus(unittest.TestCase): exporter_process = None exporter_config = None port = 10001 + prometheus = None + prometheus_scrape_interval = 15 @classmethod def setUpClass(cls) -> None: @@ -322,6 +326,54 @@ def test_help(self) -> None: else: self.assertIsNone(msg) + @staticmethod + def _from_native_histogram(values) -> dict[float, float]: + results = {} + for v in values: + bucket = Exposition.value_to_bucket(float(v[2]) - 1) + results[bucket] = float(v[3]) + return results + + @staticmethod + def _query_prometheus(host: str, query: str, type_: str) -> float | dict[float, float]: + url = f'http://{host}/api/v1/query?query={query}' + headers = {"Accept": "application/json"} + req = urllib.request.Request(url, headers=headers) + with urllib.request.urlopen(req) as f: + results = json.load(f)["data"]["result"][0] + if type_ == 'histogram': + buckets = results["histogram"][1]["buckets"] + return TestPrometheus._from_native_histogram(buckets) + return float(results["value"][1]) + + def test_protobuf(self) -> None: + if self.prometheus is None: + self.skipTest("prometheus is not configured") + + # Prometheus does not allow us to push metrics to it, neither + # can we force it to scrape an exporter, so we have to wait + # until prometheus scrapes the server + time.sleep(self.prometheus_scrape_interval + 1) + with open(self.exporter_config, encoding='utf-8') as f: + config = yaml.safe_load(f) + + labels = {'private': '1'} + for metric in config['metrics']: + name = metric['name'] + metric_name = f'{Metrics.prefix}_{Metrics.group}_{name}' + metric_labels = metric['labels'] + if metric_labels != labels: + continue + metric_type = metric['type'] + metric_value = metric['values'] + e = Exposition.from_conf(metric_name, + metric_type, + metric_value, + metric_labels) + res = self._query_prometheus(self.prometheus, + metric_name, + metric_type) + self.assertEqual(res, e.value) if __name__ == '__main__': @@ -332,8 +384,16 @@ def test_help(self) -> None: parser.add_argument('--config', required=True, help='Path to the metrics definition file') + parser.add_argument('--prometheus', + help='A Prometheus to connect to') + parser.add_argument('--prometheus-scrape-interval', + type=int, + help='Prometheus scrape interval (in seconds)', + default=15) opts, remaining = parser.parse_known_args() remaining.insert(0, sys.argv[0]) TestPrometheus.exporter_path = opts.exporter TestPrometheus.exporter_config = opts.config + TestPrometheus.prometheus = opts.prometheus + TestPrometheus.prometheus_scrape_interval = opts.prometheus_scrape_interval unittest.main(argv=remaining) diff --git a/tests/unit/test_metrics.py b/tests/unit/test_metrics.py deleted file mode 100755 index b5a5773e42f..00000000000 --- a/tests/unit/test_metrics.py +++ /dev/null @@ -1,88 +0,0 @@ -#!/usr/bin/env python3 - -import argparse -import requests -import yaml -import math -import re - -MATCH_TYPE = re.compile("# TYPE (.*) (.*)") -MATCH_VALUE = re.compile(r".*\{.*\} ([\d]+)") -MATCH_HISTOGRAM = re.compile(r'.*\{.*le="([\d]+\.[\d]+)".*\} ([\d]+)') - - -def from_native_histogram(values): - results = {} - for v in values: - results[val_to_bucket(float(v[2]) - 1)] = float(v[3]) - return results - - -def query_prometheus(host, query, type): - url = "http://" + host + "/api/v1/query?query=" + query - r = requests.get(url, headers={"Accept": "application/json"}) - results = r.json()["data"]["result"][0] - return ( - from_native_histogram(results["histogram"][1]["buckets"]) - if type == "histogram" - else float(results["value"][1]) - ) - - -def val_to_bucket(val): - low = 2 ** math.floor(math.log(val, 2)) - high = 2 * low - dif = (high - low) / 4 - return low + dif * math.floor((val - low) / dif) - - -def mk_histogram(values): - hist = {} - for val in values: - bucket = val_to_bucket(val) - if bucket not in hist: - hist[bucket] = 1 - else: - hist[bucket] = hist[bucket] + 1 - return hist - - -def conf_to_metrics(conf): - res = {} - for c in conf["metrics"]: - name = "seastar_test_group_" + c["name"] - res[name] = c - res[name]["value"] = ( - mk_histogram(c["values"]) if c["type"] == "histogram" else c["values"][0] - ) - return res - - -parser = argparse.ArgumentParser( - description="Validate that the text and protobuf metrics representative work as expected. You will need to run metrics_tester and a Prometheus server that reads from the metrics_tester", - conflict_handler="resolve", -) -parser.add_argument( - "-h", - "--host", - default="localhost:9180/metrics", - help="A host to connect to (the metrics_tester)", -) -parser.add_argument( - "-p", "--prometheus", default="localhost:9090", help="A Prometheus to connect to" -) -parser.add_argument( - "-c", "--config", default="conf.yaml", help="The metrics definition file" -) -args = parser.parse_args() - -with open(args.config, "r") as file: - metrics = yaml.safe_load(file) - conf_metrics = conf_to_metrics(metrics) - -# Validate protobuf -for v in conf_metrics: - res = query_prometheus(args.prometheus, v, conf_metrics[v]["type"]) - if res != conf_metrics[v]["value"]: - print("Protobuf format: Metrics", v, "type", conf_metrics[v]["type"], "Mismatch, expected", - res, "!=", conf_metrics[v]["value"]) From 75367799f6c7664031da37fad7a67f8b34da638b Mon Sep 17 00:00:00 2001 From: Pavel Emelyanov Date: Tue, 4 Jun 2024 17:01:33 +0300 Subject: [PATCH 03/20] loopback_socket: Rename buffer's shutdown() to abort() The method actualy shuts the socket down the hard way. There will appear a less disruptive call, so the existing one deserves some better name. Signed-off-by: Pavel Emelyanov --- tests/unit/loopback_socket.hh | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/tests/unit/loopback_socket.hh b/tests/unit/loopback_socket.hh index a2ac891af2f..c24ab14f652 100644 --- a/tests/unit/loopback_socket.hh +++ b/tests/unit/loopback_socket.hh @@ -69,7 +69,7 @@ public: return make_exception_future<>(std::runtime_error("test injected glitch on send")); } if (error == loopback_error_injector::error::abort) { - shutdown(); + abort(); return make_exception_future<>(std::runtime_error("test injected error on send")); } } @@ -85,13 +85,13 @@ public: return make_exception_future>(std::runtime_error("test injected glitch on receive")); } if (error == loopback_error_injector::error::abort) { - shutdown(); + abort(); return make_exception_future>(std::runtime_error("test injected error on receive")); } } return _q.pop_eventually(); } - void shutdown() noexcept { + void abort() noexcept { if (!_aborted) { // it can be called by both -- reader and writer socket impls _shutdown.set_value(); @@ -159,7 +159,7 @@ public: } future<> close() override { if (!_eof) { - _buffer->shutdown(); + _buffer->abort(); } return make_ready_future<>(); } @@ -180,11 +180,11 @@ public: return data_sink(std::make_unique(_tx, [this] { shutdown_input(); })); } void shutdown_input() override { - _rx->shutdown(); + _rx->abort(); } void shutdown_output() override { (void)smp::submit_to(_tx->get_owner_shard(), [tx = _tx] { - (*tx)->shutdown(); + (*tx)->abort(); }); } void set_nodelay(bool nodelay) override { @@ -330,9 +330,9 @@ public: _connect_abort->set_exception(std::make_exception_ptr(std::system_error(ECONNABORTED, std::system_category()))); _connect_abort = std::nullopt; } else { - _b1->shutdown(); + _b1->abort(); (void)smp::submit_to(_b2.get_owner_shard(), [b2 = std::move(_b2)] { - b2->shutdown(); + b2->abort(); }); } } From 32c61d034376acf91b77b8091627f01e0ef0a1ab Mon Sep 17 00:00:00 2001 From: Pavel Emelyanov Date: Tue, 4 Jun 2024 17:11:38 +0300 Subject: [PATCH 04/20] loopback_socket: Shutdown socket on EOF close When a socket is closed after noticing the EOF message in its shared buffer, it doesn't resolve the "shutdown" future. This is not very nice, because socket is indeed dead, and no other code would resolve it, so it's good to do it on close(). In fact, it is the peer who has to resolve this future, but it should happen only after the queue with messages is drained and it complicates the logic. Resolving it on close is good enough. In the non-EOF case the socket is aborted, which also resolve the aforementioned future. Signed-off-by: Pavel Emelyanov --- tests/unit/loopback_socket.hh | 20 ++++++++++++++------ 1 file changed, 14 insertions(+), 6 deletions(-) diff --git a/tests/unit/loopback_socket.hh b/tests/unit/loopback_socket.hh index c24ab14f652..4659f4c3f66 100644 --- a/tests/unit/loopback_socket.hh +++ b/tests/unit/loopback_socket.hh @@ -56,7 +56,7 @@ private: queue> _q{1}; loopback_error_injector* _error_injector; type _type; - promise<> _shutdown; + std::optional> _shutdown; public: loopback_buffer(loopback_error_injector* error_injection, type t) : _error_injector(error_injection), _type(t) {} future<> push(temporary_buffer&& b) { @@ -92,16 +92,22 @@ public: return _q.pop_eventually(); } void abort() noexcept { - if (!_aborted) { - // it can be called by both -- reader and writer socket impls - _shutdown.set_value(); - } + shutdown(); _aborted = true; _q.abort(std::make_exception_ptr(std::system_error(EPIPE, std::system_category()))); } + void shutdown() noexcept { + // it can be called by both -- reader and writer socket impls + if (_shutdown.has_value()) { + _shutdown->set_value(); + _shutdown.reset(); + } + } future<> wait_input_shutdown() { - return _shutdown.get_future(); + assert(!_shutdown.has_value()); + _shutdown.emplace(); + return _shutdown->get_future(); } }; @@ -160,6 +166,8 @@ public: future<> close() override { if (!_eof) { _buffer->abort(); + } else { + _buffer->shutdown(); } return make_ready_future<>(); } From 09733c2ead497cfc4e2bfb347aa1c53809917878 Mon Sep 17 00:00:00 2001 From: Pavel Emelyanov Date: Tue, 4 Jun 2024 17:33:35 +0300 Subject: [PATCH 05/20] test/http: Generalize http connection factory There's such a wrapper around loopback connection factory that fits it to http client needs. Several tests copy-and-pase it, new tests would appear soon, it's time to generalize it. Signed-off-by: Pavel Emelyanov --- tests/unit/httpd_test.cc | 39 ++++++++++++--------------------------- 1 file changed, 12 insertions(+), 27 deletions(-) diff --git a/tests/unit/httpd_test.cc b/tests/unit/httpd_test.cc index 070fe528bd4..767161ae136 100644 --- a/tests/unit/httpd_test.cc +++ b/tests/unit/httpd_test.cc @@ -41,6 +41,15 @@ class handl : public httpd::handler_base { } }; +class loopback_http_factory : public http::experimental::connection_factory { + loopback_socket_impl lsi; +public: + explicit loopback_http_factory(loopback_connection_factory& f) : lsi(f) {} + virtual future make() override { + return lsi.connect(socket_address(ipv4_addr()), socket_address(ipv4_addr())); + } +}; + SEASTAR_TEST_CASE(test_reply) { http::reply r; @@ -840,15 +849,7 @@ SEASTAR_TEST_CASE(test_client_unexpected_reply_status) { httpd::http_server_tester::listeners(server).emplace_back(lcf.get_server_socket()); future<> client = seastar::async([&lcf] { - class connection_factory : public http::experimental::connection_factory { - loopback_socket_impl lsi; - public: - explicit connection_factory(loopback_connection_factory& f) : lsi(f) {} - virtual future make() override { - return lsi.connect(socket_address(ipv4_addr()), socket_address(ipv4_addr())); - } - }; - auto cln = http::experimental::client(std::make_unique(lcf)); + auto cln = http::experimental::client(std::make_unique(lcf)); { auto req = http::request::make("GET", "test", "/test"); BOOST_REQUIRE_THROW(cln.make_request(std::move(req), [] (const http::reply& rep, input_stream&& in) { @@ -1093,15 +1094,7 @@ static future<> test_basic_content(bool streamed, bool chunked_reply) { } httpd::http_server_tester::listeners(server).emplace_back(lcf.get_server_socket()); future<> client = seastar::async([&lcf, chunked_reply] { - class connection_factory : public http::experimental::connection_factory { - loopback_socket_impl lsi; - public: - explicit connection_factory(loopback_connection_factory& f) : lsi(f) {} - virtual future make() override { - return lsi.connect(socket_address(ipv4_addr()), socket_address(ipv4_addr())); - } - }; - auto cln = http::experimental::client(std::make_unique(lcf)); + auto cln = http::experimental::client(std::make_unique(lcf)); { fmt::print("Simple request test\n"); @@ -1512,15 +1505,7 @@ SEASTAR_TEST_CASE(test_redirect_exception) { httpd::http_server_tester::listeners(server).emplace_back(lcf.get_server_socket()); future<> client = seastar::async([&lcf] { - class connection_factory : public http::experimental::connection_factory { - loopback_socket_impl lsi; - public: - explicit connection_factory(loopback_connection_factory& f) : lsi(f) {} - virtual future make() override { - return lsi.connect(socket_address(ipv4_addr()), socket_address(ipv4_addr())); - } - }; - auto cln = http::experimental::client(std::make_unique(lcf)); + auto cln = http::experimental::client(std::make_unique(lcf)); auto test = [&](sstring path, sstring expected_dest, http::reply::status_type expected_status) { auto req = http::request::make("GET", "test", path); From 682cb63036145fdfb2e66970e32009b70a6f3aea Mon Sep 17 00:00:00 2001 From: Pavel Emelyanov Date: Tue, 4 Jun 2024 18:05:38 +0300 Subject: [PATCH 06/20] http/client: Document max_connections It was added by 15942b9280 (http/client: Count and limit the number of connections) but wasn't documented, so do it now. Signed-off-by: Pavel Emelyanov --- include/seastar/http/client.hh | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/include/seastar/http/client.hh b/include/seastar/http/client.hh index e33519c1e7c..5bbe910e2f4 100644 --- a/include/seastar/http/client.hh +++ b/include/seastar/http/client.hh @@ -220,7 +220,14 @@ public: * may re-use the sockets on its own * * \param f -- the factory pointer + * \param max_connections -- maximum number of connection a client is allowed to maintain + * (both active and cached in pool) * + * The client uses connections provided by factory to send requests over and receive responses + * back. Once request-response cycle is over the connection used for that is kept by a client + * in a "pool". Making another http request may then pick up the existing connection from the + * pool thus avoiding the extra latency of establishing new connection. Pool may thus accumulate + * more than one connection if user sends several requests in parallel. */ explicit client(std::unique_ptr f, unsigned max_connections = default_max_connections); From 8160eb237476580b26cbb5dbb857a07c910c546d Mon Sep 17 00:00:00 2001 From: Pavel Emelyanov Date: Fri, 22 Sep 2023 12:14:59 +0300 Subject: [PATCH 07/20] http/client: Fix parser result checking Currently consuming http response doesn't check if parser failed reading the response check. Add it. Also, when parser sees correct, but incomplete response it sets itself to .eof(), not .failed(). This case should be told from invalid, so the system error with ECONABORTED status is thrown instead. Signed-off-by: Pavel Emelyanov --- src/http/client.cc | 7 +++- tests/unit/httpd_test.cc | 73 ++++++++++++++++++++++++++++++++++++++++ 2 files changed, 79 insertions(+), 1 deletion(-) diff --git a/src/http/client.cc b/src/http/client.cc index 68da0587154..796953576b2 100644 --- a/src/http/client.cc +++ b/src/http/client.cc @@ -137,7 +137,12 @@ future connection::recv_reply() { parser.init(); return _read_buf.consume(parser).then([this, &parser] { if (parser.eof()) { - throw std::runtime_error("Invalid response"); + http_log.trace("Parsing response EOFed"); + throw std::system_error(ECONNABORTED, std::system_category()); + } + if (parser.failed()) { + http_log.trace("Parsing response failed"); + throw std::runtime_error("Invalid http server response"); } auto resp = parser.get_parsed_response(); diff --git a/tests/unit/httpd_test.cc b/tests/unit/httpd_test.cc index 767161ae136..e9e6f96fedd 100644 --- a/tests/unit/httpd_test.cc +++ b/tests/unit/httpd_test.cc @@ -878,6 +878,79 @@ SEASTAR_TEST_CASE(test_client_unexpected_reply_status) { }); } +static void read_simple_http_request(input_stream& in) { + sstring req; + while (true) { + auto r = in.read().get(); + req += sstring(r.get(), r.size()); + if (req.ends_with("\r\n\r\n")) { + break; + } + } +} + +SEASTAR_TEST_CASE(test_client_response_eof) { + return seastar::async([] { + loopback_connection_factory lcf(1); + auto ss = lcf.get_server_socket(); + future<> server = ss.accept().then([] (accept_result ar) { + return seastar::async([sk = std::move(ar.connection)] () mutable { + input_stream in = sk.input(); + read_simple_http_request(in); + output_stream out = sk.output(); + out.write("HTT").get(); // write incomplete response + out.flush().get(); + out.close().get(); + }); + }); + + future<> client = seastar::async([&lcf] { + auto cln = http::experimental::client(std::make_unique(lcf)); + auto req = http::request::make("GET", "test", "/test"); + BOOST_REQUIRE_EXCEPTION(cln.make_request(std::move(req), [] (const http::reply& rep, input_stream&& in) { + return make_exception_future<>(std::runtime_error("Shouldn't happen")); + }, http::reply::status_type::ok).get(), std::system_error, [] (auto& ex) { + return ex.code().value() == ECONNABORTED; + }); + + cln.close().get(); + }); + + when_all(std::move(client), std::move(server)).discard_result().get(); + }); +} + +SEASTAR_TEST_CASE(test_client_response_parse_error) { + return seastar::async([] { + loopback_connection_factory lcf(1); + auto ss = lcf.get_server_socket(); + future<> server = ss.accept().then([] (accept_result ar) { + return seastar::async([sk = std::move(ar.connection)] () mutable { + input_stream in = sk.input(); + read_simple_http_request(in); + output_stream out = sk.output(); + out.write("HTTTT").get(); // write invalid line + out.flush().get(); + out.close().get(); + }); + }); + + future<> client = seastar::async([&lcf] { + auto cln = http::experimental::client(std::make_unique(lcf)); + auto req = http::request::make("GET", "test", "/test"); + BOOST_REQUIRE_EXCEPTION(cln.make_request(std::move(req), [] (const http::reply& rep, input_stream&& in) { + return make_exception_future<>(std::runtime_error("Shouldn't happen")); + }, http::reply::status_type::ok).get(), std::runtime_error, [] (auto& ex) { + return sstring(ex.what()).contains("Invalid http server response"); + }); + + cln.close().get(); + }); + + when_all(std::move(client), std::move(server)).discard_result().get(); + }); +} + SEASTAR_TEST_CASE(test_100_continue) { return seastar::async([] { loopback_connection_factory lcf(1); From 8fedb6b3a59ebe82e6b6a6ad2e439b918ef3c96f Mon Sep 17 00:00:00 2001 From: Pavel Emelyanov Date: Tue, 17 Oct 2023 13:18:09 +0300 Subject: [PATCH 08/20] http/client: Introduce make_new_connection() This needs splitting connection::get_connection() into get_ and make_connection. Both to facilitate next patching Signed-off-by: Pavel Emelyanov --- include/seastar/http/client.hh | 5 +++++ src/http/client.cc | 14 ++++++++++++++ 2 files changed, 19 insertions(+) diff --git a/include/seastar/http/client.hh b/include/seastar/http/client.hh index 5bbe910e2f4..42cdbdcd41d 100644 --- a/include/seastar/http/client.hh +++ b/include/seastar/http/client.hh @@ -180,12 +180,17 @@ class client { using connection_ptr = seastar::shared_ptr; future get_connection(); + future make_connection(); future<> put_connection(connection_ptr con); future<> shrink_connections(); template Fn> auto with_connection(Fn&& fn); + template + requires std::invocable + auto with_new_connection(Fn&& fn); + public: using reply_handler = noncopyable_function(const reply&, input_stream&& body)>; /** diff --git a/src/http/client.cc b/src/http/client.cc index 796953576b2..af45700144f 100644 --- a/src/http/client.cc +++ b/src/http/client.cc @@ -256,6 +256,10 @@ future client::get_connection() { }); } + return make_connection(); +} + +future client::make_connection() { _total_new_connections++; return _new_connections->make().then([cr = internal::client_ref(this)] (connected_socket cs) mutable { http_log.trace("created new http connection {}", cs.local_address()); @@ -314,6 +318,16 @@ auto client::with_connection(Fn&& fn) { }); } +template +requires std::invocable +auto client::with_new_connection(Fn&& fn) { + return make_connection().then([this, fn = std::move(fn)] (connection_ptr con) mutable { + return fn(*con).finally([this, con = std::move(con)] () mutable { + return put_connection(std::move(con)); + }); + }); +} + future<> client::make_request(request req, reply_handler handle, std::optional expected) { return with_connection([req = std::move(req), handle = std::move(handle), expected] (connection& con) mutable { return con.do_make_request(std::move(req)).then([&con, expected, handle = std::move(handle)] (connection::reply_ptr reply) mutable { From 6d38fd6af2e2dfd0ac0a3075ee7ea7abb8549564 Mon Sep 17 00:00:00 2001 From: Pavel Emelyanov Date: Thu, 21 Sep 2023 14:37:26 +0300 Subject: [PATCH 09/20] http/client: Pass request and handle by reference Currently request is passed by value all the way down to connection where it's moved to stable memory with do_with(std::move(req), ...) and then processing continues. Next patches are going to teach client restart requests over different connections, so the request should be moved into do_with-context at the client side. While at it -- keep the response handler in the stable context as well, next patching will appreciate this change. Signed-off-by: Pavel Emelyanov --- include/seastar/http/client.hh | 9 +++++++-- src/http/client.cc | 20 +++++++++++++------- 2 files changed, 20 insertions(+), 9 deletions(-) diff --git a/include/seastar/http/client.hh b/include/seastar/http/client.hh index 42cdbdcd41d..5dbe12b8854 100644 --- a/include/seastar/http/client.hh +++ b/include/seastar/http/client.hh @@ -126,7 +126,7 @@ public: future<> close(); private: - future do_make_request(request rq); + future do_make_request(request& rq); void setup_request(request& rq); future<> send_request_head(const request& rq); future maybe_wait_for_continue(const request& req); @@ -166,6 +166,10 @@ public: */ class client { +public: + using reply_handler = noncopyable_function(const reply&, input_stream&& body)>; + +private: friend class http::internal::client_ref; using connections_list_t = bi::list, bi::constant_time_size>; static constexpr unsigned default_max_connections = 100; @@ -191,8 +195,9 @@ class client { requires std::invocable auto with_new_connection(Fn&& fn); + future<> do_make_request(connection& con, request& req, reply_handler& handle, std::optional expected); + public: - using reply_handler = noncopyable_function(const reply&, input_stream&& body)>; /** * \brief Construct a simple client * diff --git a/src/http/client.cc b/src/http/client.cc index af45700144f..bc6abb4791c 100644 --- a/src/http/client.cc +++ b/src/http/client.cc @@ -156,8 +156,7 @@ future connection::recv_reply() { }); } -future connection::do_make_request(request req) { - return do_with(std::move(req), [this] (auto& req) { +future connection::do_make_request(request& req) { setup_request(req); return send_request_head(req).then([this, &req] { return maybe_wait_for_continue(req).then([this, &req] (reply_ptr cont) { @@ -172,13 +171,14 @@ future connection::do_make_request(request req) { }); }); }); - }); } future connection::make_request(request req) { - return do_make_request(std::move(req)).then([] (reply_ptr rep) { + return do_with(std::move(req), [this] (auto& req) { + return do_make_request(req).then([] (reply_ptr rep) { return make_ready_future(std::move(*rep)); }); + }); } input_stream connection::in(reply& rep) { @@ -329,8 +329,15 @@ auto client::with_new_connection(Fn&& fn) { } future<> client::make_request(request req, reply_handler handle, std::optional expected) { - return with_connection([req = std::move(req), handle = std::move(handle), expected] (connection& con) mutable { - return con.do_make_request(std::move(req)).then([&con, expected, handle = std::move(handle)] (connection::reply_ptr reply) mutable { + return do_with(std::move(req), std::move(handle), [this, expected] (request& req, reply_handler& handle) mutable { + return with_connection([this, &req, &handle, expected] (connection& con) { + return do_make_request(con, req, handle, expected); + }); + }); +} + +future<> client::do_make_request(connection& con, request& req, reply_handler& handle, std::optional expected) { + return con.do_make_request(req).then([&con, &handle, expected] (connection::reply_ptr reply) mutable { auto& rep = *reply; if (expected.has_value() && rep._status != expected.value()) { if (!http_log.is_enabled(log_level::debug)) { @@ -350,7 +357,6 @@ future<> client::make_request(request req, reply_handler handle, std::optional(std::move(ex)); }); - }); } future<> client::close() { From ec081a2e15c64a2501bb46fa4ccd7c03f8d7bca9 Mon Sep 17 00:00:00 2001 From: Pavel Emelyanov Date: Thu, 21 Sep 2023 14:39:11 +0300 Subject: [PATCH 10/20] http/client: Fix indentation after previous patch Signed-off-by: Pavel Emelyanov --- src/http/client.cc | 62 +++++++++++++++++++++++----------------------- 1 file changed, 31 insertions(+), 31 deletions(-) diff --git a/src/http/client.cc b/src/http/client.cc index bc6abb4791c..01a93a242c9 100644 --- a/src/http/client.cc +++ b/src/http/client.cc @@ -157,28 +157,28 @@ future connection::recv_reply() { } future connection::do_make_request(request& req) { - setup_request(req); - return send_request_head(req).then([this, &req] { - return maybe_wait_for_continue(req).then([this, &req] (reply_ptr cont) { - if (cont) { - return make_ready_future(std::move(cont)); - } + setup_request(req); + return send_request_head(req).then([this, &req] { + return maybe_wait_for_continue(req).then([this, &req] (reply_ptr cont) { + if (cont) { + return make_ready_future(std::move(cont)); + } - return write_body(req).then([this] { - return _write_buf.flush().then([this] { - return recv_reply(); - }); + return write_body(req).then([this] { + return _write_buf.flush().then([this] { + return recv_reply(); }); }); }); + }); } future connection::make_request(request req) { - return do_with(std::move(req), [this] (auto& req) { - return do_make_request(req).then([] (reply_ptr rep) { - return make_ready_future(std::move(*rep)); + return do_with(std::move(req), [this] (auto& req) { + return do_make_request(req).then([] (reply_ptr rep) { + return make_ready_future(std::move(*rep)); + }); }); - }); } input_stream connection::in(reply& rep) { @@ -337,26 +337,26 @@ future<> client::make_request(request req, reply_handler handle, std::optional client::do_make_request(connection& con, request& req, reply_handler& handle, std::optional expected) { - return con.do_make_request(req).then([&con, &handle, expected] (connection::reply_ptr reply) mutable { - auto& rep = *reply; - if (expected.has_value() && rep._status != expected.value()) { - if (!http_log.is_enabled(log_level::debug)) { - return make_exception_future<>(httpd::unexpected_status_error(rep._status)); - } + return con.do_make_request(req).then([&con, &handle, expected] (connection::reply_ptr reply) mutable { + auto& rep = *reply; + if (expected.has_value() && rep._status != expected.value()) { + if (!http_log.is_enabled(log_level::debug)) { + return make_exception_future<>(httpd::unexpected_status_error(rep._status)); + } - return do_with(con.in(rep), [reply = std::move(reply)] (auto& in) mutable { - return util::read_entire_stream_contiguous(in).then([reply = std::move(reply)] (auto message) { - http_log.debug("request finished with {}: {}", reply->_status, message); - return make_exception_future<>(httpd::unexpected_status_error(reply->_status)); - }); + return do_with(con.in(rep), [reply = std::move(reply)] (auto& in) mutable { + return util::read_entire_stream_contiguous(in).then([reply = std::move(reply)] (auto message) { + http_log.debug("request finished with {}: {}", reply->_status, message); + return make_exception_future<>(httpd::unexpected_status_error(reply->_status)); }); - } + }); + } - return handle(rep, con.in(rep)).finally([reply = std::move(reply)] {}); - }).handle_exception([&con] (auto ex) mutable { - con._persistent = false; - return make_exception_future<>(std::move(ex)); - }); + return handle(rep, con.in(rep)).finally([reply = std::move(reply)] {}); + }).handle_exception([&con] (auto ex) mutable { + con._persistent = false; + return make_exception_future<>(std::move(ex)); + }); } future<> client::close() { From 3c5c7ea8ad6d4cc8644b83e657a67f902e8ab400 Mon Sep 17 00:00:00 2001 From: Pavel Emelyanov Date: Tue, 17 Oct 2023 13:29:38 +0300 Subject: [PATCH 11/20] http/client: Retry request over fresh connection in case old one failed Currently if client::make_request() fails, the error is propagated back to the caller whatever the error is. Sometimes the request fails because of connection glitch, which can happen and in that case it's common to try the same request again after a while in a hope that it was indeed some teporary glitch. Signed-off-by: Pavel Emelyanov --- include/seastar/http/client.hh | 28 +++++++++++++++++++++- src/http/client.cc | 23 ++++++++++++++++-- tests/unit/httpd_test.cc | 43 ++++++++++++++++++++++++++++++++++ 3 files changed, 91 insertions(+), 3 deletions(-) diff --git a/include/seastar/http/client.hh b/include/seastar/http/client.hh index 5dbe12b8854..6601df4287e 100644 --- a/include/seastar/http/client.hh +++ b/include/seastar/http/client.hh @@ -168,6 +168,7 @@ public: class client { public: using reply_handler = noncopyable_function(const reply&, input_stream&& body)>; + using retry_requests = bool_class; private: friend class http::internal::client_ref; @@ -178,6 +179,7 @@ private: unsigned _nr_connections = 0; unsigned _max_connections; unsigned long _total_new_connections = 0; + const retry_requests _retry; condition_variable _wait_con; connections_list_t _pool; @@ -232,14 +234,35 @@ public: * \param f -- the factory pointer * \param max_connections -- maximum number of connection a client is allowed to maintain * (both active and cached in pool) + * \param retry -- whether or not to retry requests on connection IO errors * * The client uses connections provided by factory to send requests over and receive responses * back. Once request-response cycle is over the connection used for that is kept by a client * in a "pool". Making another http request may then pick up the existing connection from the * pool thus avoiding the extra latency of establishing new connection. Pool may thus accumulate * more than one connection if user sends several requests in parallel. + * + * HTTP servers may sometimes want to terminate the connections it keeps. This can happen in + * one of several ways. + * + * The "gentle" way is when server adds the "connection: close" header to its response. In that + * case client would handle the response and will just close the connection without putting it + * to pool. + * + * Less gentle way a server may terminate a connection is by closing it, so the underlying TCP + * stack would communicate regular TCP FIN-s. If the connection happens to be in pool when it + * happens the client would just clean the connection from pool in the background. + * + * Sometimes the least gentle closing occurs when server closes the connection on the fly and + * TCP starts communicating FIN-s in parallel with client using it. In that case, user would + * receive exception from the \ref make_request() call and will have to do something about it. + * Client provides a transparent way of handling it called "retry". + * + * When enabled, it makes client catch the transport error, close the broken connection, open + * another one and retry the very same request one more time over this new connection. If the + * second attempt fails, this error is reported back to user. */ - explicit client(std::unique_ptr f, unsigned max_connections = default_max_connections); + explicit client(std::unique_ptr f, unsigned max_connections = default_max_connections, retry_requests retry = retry_requests::no); /** * \brief Send the request and handle the response @@ -253,6 +276,9 @@ public: * \param handle -- the response handler * \param expected -- the optional expected reply status code, default is std::nullopt * + * Note that the handle callback should be prepared to be called more than once, because + * client may restart the whole request processing in case server closes the connection + * in the middle of operation */ future<> make_request(request req, reply_handler handle, std::optional expected = std::nullopt); diff --git a/src/http/client.cc b/src/http/client.cc index 01a93a242c9..53664a68017 100644 --- a/src/http/client.cc +++ b/src/http/client.cc @@ -236,9 +236,10 @@ client::client(socket_address addr, shared_ptr cre { } -client::client(std::unique_ptr f, unsigned max_connections) +client::client(std::unique_ptr f, unsigned max_connections, retry_requests retry) : _new_connections(std::move(f)) , _max_connections(max_connections) + , _retry(retry) { } @@ -330,9 +331,27 @@ auto client::with_new_connection(Fn&& fn) { future<> client::make_request(request req, reply_handler handle, std::optional expected) { return do_with(std::move(req), std::move(handle), [this, expected] (request& req, reply_handler& handle) mutable { - return with_connection([this, &req, &handle, expected] (connection& con) { + auto f = with_connection([this, &req, &handle, expected] (connection& con) { return do_make_request(con, req, handle, expected); }); + + if (_retry) { + f = f.handle_exception_type([this, &req, &handle, expected] (const std::system_error& ex) { + auto code = ex.code().value(); + if ((code != EPIPE) && (code != ECONNABORTED)) { + return make_exception_future<>(ex); + } + + // The 'con' connection may not yet be freed, so the total connection + // count still account for it and with_new_connection() may temporarily + // break the limit. That's OK, the 'con' will be closed really soon + return with_new_connection([this, &req, &handle, expected] (connection& con) { + return do_make_request(con, req, handle, expected); + }); + }); + } + + return f; }); } diff --git a/tests/unit/httpd_test.cc b/tests/unit/httpd_test.cc index e9e6f96fedd..26d96c14fdc 100644 --- a/tests/unit/httpd_test.cc +++ b/tests/unit/httpd_test.cc @@ -951,6 +951,49 @@ SEASTAR_TEST_CASE(test_client_response_parse_error) { }); } +SEASTAR_TEST_CASE(test_client_retry_request) { + return seastar::async([] { + loopback_connection_factory lcf(1); + auto ss = lcf.get_server_socket(); + future<> server = ss.accept().then([] (accept_result ar) { + return seastar::async([sk = std::move(ar.connection)] () mutable { + input_stream in = sk.input(); + read_simple_http_request(in); + output_stream out = sk.output(); + out.write("HTT").get(); // write incomplete response + out.flush().get(); + out.close().get(); + }); + }).then([&ss] { + return ss.accept().then([] (accept_result ar) { + return seastar::async([sk = std::move(ar.connection)] () mutable { + input_stream in = sk.input(); + read_simple_http_request(in); + output_stream out = sk.output(); + sstring r200("HTTP/1.1 200 OK\r\nHost: localhost\r\n\r\n"); + out.write(r200).get(); // now write complete response + out.flush().get(); + out.close().get(); + }); + }); + }); + + future<> client = seastar::async([&lcf] { + auto cln = http::experimental::client(std::make_unique(lcf), 2, http::experimental::client::retry_requests::yes); + auto req = http::request::make("GET", "test", "/test"); + bool got_response = false; + cln.make_request(std::move(req), [&] (const http::reply& rep, input_stream&& in) { + got_response = true; + return make_ready_future<>(); + }, http::reply::status_type::ok).get(); + cln.close().get(); + BOOST_REQUIRE(got_response); + }); + + when_all(std::move(client), std::move(server)).discard_result().get(); + }); +} + SEASTAR_TEST_CASE(test_100_continue) { return seastar::async([] { loopback_connection_factory lcf(1); From debb85bd606f7067fb6d5e6de662a97b489bbaec Mon Sep 17 00:00:00 2001 From: Kefu Chai Date: Wed, 5 Jun 2024 07:47:38 +0800 Subject: [PATCH 12/20] stall-analyser: use argparse.FileType when appropriate simpler this way. Signed-off-by: Kefu Chai Closes scylladb/seastar#2281 --- scripts/stall-analyser.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/scripts/stall-analyser.py b/scripts/stall-analyser.py index 39bfa599900..995026f313d 100755 --- a/scripts/stall-analyser.py +++ b/scripts/stall-analyser.py @@ -46,6 +46,8 @@ def get_command_line_parser(): parser.add_argument('-b', '--branch-threshold', type=float, default=0.03, help='Drop branches responsible for less than this threshold relative to the previous level, not global. (default 3%%)') parser.add_argument('file', nargs='?', + type=argparse.FileType('r'), + default=sys.stdin, help='File containing reactor stall backtraces. Read from stdin if missing.') return parser @@ -328,7 +330,6 @@ def print_command_line_options(args): def main(): args = get_command_line_parser().parse_args() - input = open(args.file) if args.file else sys.stdin count = 0 comment = re.compile(r'^\s*#') pattern = re.compile(r"Reactor stalled for (?P\d+) ms on shard (?P\d+).*Backtrace:") @@ -340,7 +341,7 @@ def main(): resolver = addr2line.BacktraceResolver(executable=args.executable, concise=not args.full_function_names) graph = Graph(resolver) - for s in input: + for s in args.file: if comment.search(s): continue # parse log line like: From 47bfd73e304fd02ed91d7edf6c60d720e767eac5 Mon Sep 17 00:00:00 2001 From: Kefu Chai Date: Tue, 23 Apr 2024 18:10:59 +0800 Subject: [PATCH 13/20] dns: use undeprecated c-ares APIs c-ares marked some APIs deprecated in 1.28.1. in this change, we conditionally use the undeprecated APIs when they are available. please note, we don't specify the minimal supported c-ares version in our building system. in which, ares_fds() and ares_process() are not changed yet, because we need to change the way how to poll the events for name resolution. this would need more thoughts before moving forward. Refs #2197 Signed-off-by: Kefu Chai Closes scylladb/seastar#2200 --- src/net/dns.cc | 103 ++++++++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 102 insertions(+), 1 deletion(-) diff --git a/src/net/dns.cc b/src/net/dns.cc index 09aee14486d..79d51ad9355 100644 --- a/src/net/dns.cc +++ b/src/net/dns.cc @@ -262,6 +262,32 @@ class net::dns_resolver::impl // The following pragma is needed to work around a false-positive warning // in Gcc 11 (see https://gcc.gnu.org/bugzilla/show_bug.cgi?id=96003). #pragma GCC diagnostic ignored "-Wnonnull" +#if ARES_VERSION >= 0x011000 + + ares_addrinfo_hints hints = { + .ai_flags = ARES_AI_CANONNAME, + .ai_family = af, + .ai_socktype = 0, + .ai_protocol = 0, + }; + ares_getaddrinfo(_channel, p->name.c_str(), nullptr, &hints, [](void* arg, int status, int timeouts, ares_addrinfo* addrinfo) { + // we do potentially allocating operations below, so wrap the pointer in a + // unique here. + std::unique_ptr p(reinterpret_cast(arg)); + + switch (status) { + default: + dns_log.debug("Query failed: {}", status); + p->set_exception(std::system_error(status, ares_errorc, p->name)); + break; + case ARES_SUCCESS: + p->set_value(make_hostent(addrinfo)); + break; + } + ares_freeaddrinfo(addrinfo); + + }, reinterpret_cast(p)); +#else ares_gethostbyname(_channel, p->name.c_str(), af, [](void* arg, int status, int timeouts, ::hostent* host) { // we do potentially allocating operations below, so wrap the pointer in a // unique here. @@ -278,7 +304,7 @@ class net::dns_resolver::impl } }, reinterpret_cast(p)); - +#endif poll_sockets(); @@ -343,6 +369,47 @@ class net::dns_resolver::impl dns_call call(*this); +#if ARES_VERSION >= 0x011c00 + ares_query_dnsrec(_channel, query.c_str(), ARES_CLASS_IN, ARES_REC_TYPE_SRV, + [](void* arg, ares_status_t status, size_t timeouts, + const ares_dns_record *dnsrec) { + auto p = std::unique_ptr>( + reinterpret_cast *>(arg)); + if (status != ARES_SUCCESS) { + dns_log.debug("Query failed: {}", fmt::underlying(status)); + p->set_exception(std::system_error(status, ares_errorc)); + return; + } + const size_t rr_count = ares_dns_record_rr_cnt(dnsrec, ARES_SECTION_ANSWER); + srv_records replies; + for (size_t i = 0; i < rr_count; i++) { + const ares_dns_rr_t* rr = ares_dns_record_rr_get( + const_cast(dnsrec), + ARES_SECTION_ANSWER, i); + if (!rr) { + // not likely, but still.. + status = ARES_EBADRESP; + break; + } + if (ares_dns_rr_get_class(rr) != ARES_CLASS_IN || + ares_dns_rr_get_type(rr) != ARES_REC_TYPE_SRV) { + continue; + } + replies.push_back({ + ares_dns_rr_get_u16(rr, ARES_RR_SRV_PRIORITY), + ares_dns_rr_get_u16(rr, ARES_RR_SRV_WEIGHT), + ares_dns_rr_get_u16(rr, ARES_RR_SRV_PORT), + sstring{ares_dns_rr_get_str(rr, ARES_RR_SRV_TARGET)} + }); + } + if (status != ARES_SUCCESS) { + dns_log.debug("Parse failed: {}", fmt::underlying(status)); + p->set_exception(std::system_error(status, ares_errorc)); + return; + } + p->set_value(std::move(replies)); + }, reinterpret_cast(p.release()), nullptr); +#else ares_query(_channel, query.c_str(), ns_c_in, ns_t_srv, [](void* arg, int status, int timeouts, unsigned char* buf, int len) { @@ -367,6 +434,7 @@ class net::dns_resolver::impl } ares_free_data(start); }, reinterpret_cast(p.release())); +#endif poll_sockets(); @@ -482,6 +550,39 @@ class net::dns_resolver::impl return records; } +#if ARES_VERSION >= 0x011000 + static hostent make_hostent(const ares_addrinfo* ai) { + hostent e; + if (!ai) { + return e; + } + if (ai->cnames) { + e.names.emplace_back(ai->cnames->name); + } else { + e.names.emplace_back(ai->name); + } + for (auto cname = ai->cnames; cname != nullptr; cname = cname->next) { + if (cname->alias == nullptr) { + continue; + } + e.names.emplace_back(cname->alias); + } + for (auto node = ai->nodes; node != nullptr; node = node->ai_next) { + switch (node->ai_family) { + case AF_INET: + e.addr_list.emplace_back(reinterpret_cast(node->ai_addr)->sin_addr); + break; + case AF_INET6: + e.addr_list.emplace_back(reinterpret_cast(node->ai_addr)->sin6_addr); + break; + } + } + + dns_log.debug("Query success: {}/{}", e.names.front(), e.addr_list.front()); + + return e; + } +#endif static hostent make_hostent(const ::hostent& host) { hostent e; e.names.emplace_back(host.h_name); From f908e5fd063ba8ceada8003fa51661a6ae3d9c5c Mon Sep 17 00:00:00 2001 From: pavelbezpravel Date: Wed, 5 Jun 2024 15:08:44 +0300 Subject: [PATCH 14/20] docker: lint dockerfile reduce image size reduce layers amount format combine apt-get update with apt-get install in the same RUN statement use arguments JSON notation for CMD: https://github.com/hadolint/hadolint/wiki/DL3025\#rationale --- docker/dev/Dockerfile | 44 ++++++++++++++++++++++++++++--------------- 1 file changed, 29 insertions(+), 15 deletions(-) diff --git a/docker/dev/Dockerfile b/docker/dev/Dockerfile index f9a3c939849..a1329284ff1 100644 --- a/docker/dev/Dockerfile +++ b/docker/dev/Dockerfile @@ -1,23 +1,37 @@ +# syntax=docker/dockerfile:1 + FROM ubuntu:mantic -RUN apt-get -y update -RUN apt-get -y install gnupg curl -RUN echo "deb http://apt.llvm.org/mantic/ llvm-toolchain-mantic-17 main" \ - >> /etc/apt/sources.list.d/llvm.list -RUN echo "deb http://apt.llvm.org/mantic/ llvm-toolchain-mantic-18 main" \ - >> /etc/apt/sources.list.d/llvm.list -RUN curl -sSL https://apt.llvm.org/llvm-snapshot.gpg.key -o /etc/apt/trusted.gpg.d/apt.llvm.org.asc -RUN apt -y update \ - && apt -y install build-essential \ - && apt -y install gcc-12 g++-12 gcc-13 g++-13 pandoc \ + +COPY install-dependencies.sh /tmp/ + +RUN apt-get update && apt-get install -y \ + curl \ + gnupg \ + && echo "deb http://apt.llvm.org/mantic/ llvm-toolchain-mantic-17 main" \ + >> /etc/apt/sources.list.d/llvm.list \ + && echo "deb http://apt.llvm.org/mantic/ llvm-toolchain-mantic-18 main" \ + >> /etc/apt/sources.list.d/llvm.list \ + && curl -sSL https://apt.llvm.org/llvm-snapshot.gpg.key -o /etc/apt/trusted.gpg.d/apt.llvm.org.asc \ + && apt-get update && apt-get install -y \ + build-essential \ + clang-17 \ + clang-18 \ + clang-tools-18 \ + gcc-12 \ + g++-12 \ + gcc-13 \ + g++-13 \ + pandoc \ && update-alternatives --install /usr/bin/gcc gcc /usr/bin/gcc-12 12 \ && update-alternatives --install /usr/bin/g++ g++ /usr/bin/g++-12 12 \ && update-alternatives --install /usr/bin/gcc gcc /usr/bin/gcc-13 13 \ && update-alternatives --install /usr/bin/g++ g++ /usr/bin/g++-13 13 \ - && apt -y install clang-17 clang-18 clang-tools-18 \ && update-alternatives --install /usr/bin/clang clang /usr/bin/clang-17 17 \ && update-alternatives --install /usr/bin/clang++ clang++ /usr/bin/clang++-17 17 \ && update-alternatives --install /usr/bin/clang clang /usr/bin/clang-18 18 \ - && update-alternatives --install /usr/bin/clang++ clang++ /usr/bin/clang++-18 18 -COPY install-dependencies.sh /tmp/ -RUN bash /tmp/install-dependencies.sh -CMD /bin/bash + && update-alternatives --install /usr/bin/clang++ clang++ /usr/bin/clang++-18 18 \ + && bash /tmp/install-dependencies.sh \ + && apt-get clean \ + && rm -rf /var/lib/apt/lists/* + +CMD ["/bin/bash"] From 2b4ddd26662f42c2f2d2910742ca7739bff4aaa3 Mon Sep 17 00:00:00 2001 From: pavelbezpravel Date: Wed, 5 Jun 2024 15:54:09 +0300 Subject: [PATCH 15/20] docker: bind the file instead of copying during the build stage it reduces the layer's size. also we do not need this file in running container --- docker/dev/Dockerfile | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/docker/dev/Dockerfile b/docker/dev/Dockerfile index a1329284ff1..12d2ebc3813 100644 --- a/docker/dev/Dockerfile +++ b/docker/dev/Dockerfile @@ -2,9 +2,8 @@ FROM ubuntu:mantic -COPY install-dependencies.sh /tmp/ - -RUN apt-get update && apt-get install -y \ +RUN --mount=type=bind,source=./install-dependencies.sh,target=./install-dependencies.sh \ + apt-get update && apt-get install -y \ curl \ gnupg \ && echo "deb http://apt.llvm.org/mantic/ llvm-toolchain-mantic-17 main" \ @@ -30,7 +29,7 @@ RUN apt-get update && apt-get install -y \ && update-alternatives --install /usr/bin/clang++ clang++ /usr/bin/clang++-17 17 \ && update-alternatives --install /usr/bin/clang clang /usr/bin/clang-18 18 \ && update-alternatives --install /usr/bin/clang++ clang++ /usr/bin/clang++-18 18 \ - && bash /tmp/install-dependencies.sh \ + && bash ./install-dependencies.sh \ && apt-get clean \ && rm -rf /var/lib/apt/lists/* From 14df7e6d8b3b6f62c710630a5f164343efd0a8e3 Mon Sep 17 00:00:00 2001 From: pavelbezpravel Date: Wed, 5 Jun 2024 16:14:46 +0300 Subject: [PATCH 16/20] scripts: sort packages alphanumerically it makes maintenance easier and helps to avoid duplication of packages and make the list much easier to update --- install-dependencies.sh | 171 +++++++++++++++++++++------------------- 1 file changed, 88 insertions(+), 83 deletions(-) diff --git a/install-dependencies.sh b/install-dependencies.sh index 184d8a5ec78..542bcda8445 100755 --- a/install-dependencies.sh +++ b/install-dependencies.sh @@ -28,39 +28,39 @@ else fi debian_packages=( - ninja-build - ragel - libhwloc-dev - libnuma-dev - libpciaccess-dev - libcrypto++-dev + cmake + diffutils + doxygen + g++ + gcc libboost-all-dev - libxml2-dev - xfslibs-dev + libc-ares-dev + libcrypto++-dev + libfmt-dev libgnutls28-dev + libhwloc-dev liblz4-dev + libnuma-dev + libpciaccess-dev + libprotobuf-dev libsctp-dev + libtool liburing-dev - gcc + libxml2-dev + libyaml-cpp-dev make meson + ninja-build + openssl + pkg-config + protobuf-compiler python3 python3-pyelftools - systemtap-sdt-dev - libtool - cmake - libyaml-cpp-dev - libc-ares-dev + ragel stow - g++ - libfmt-dev - diffutils + systemtap-sdt-dev valgrind - doxygen - openssl - pkg-config - libprotobuf-dev - protobuf-compiler + xfslibs-dev ) # seastar doesn't directly depend on these packages. They are @@ -68,85 +68,90 @@ debian_packages=( # has no way of saying "static seastar, but dynamic transitive # dependencies". They provide the various .so -> .so.ver symbolic # links. -transitive=(libtool-ltdl-devel trousers-devel libidn2-devel libunistring-devel) +transitive=( + libidn2-devel + libtool-ltdl-devel + libunistring-devel + trousers-devel +) redhat_packages=( + boost-devel + c-ares-devel + cmake + diffutils + doxygen + fmt-devel + gcc + gnutls-devel hwloc-devel - numactl-devel libpciaccess-devel + libtool + liburing-devel libxml2-devel - xfsprogs-devel - gnutls-devel lksctp-tools-devel lz4-devel - liburing-devel - gcc make meson + numactl-devel + openssl + protobuf-compiler + protobuf-devel python3 python3-pyelftools - systemtap-sdt-devel - libtool - cmake - yaml-cpp-devel - c-ares-devel stow - diffutils - doxygen - openssl - fmt-devel - boost-devel + systemtap-sdt-devel valgrind-devel - protobuf-devel - protobuf-compiler + xfsprogs-devel + yaml-cpp-devel "${transitive[@]}" ) fedora_packages=( "${redhat_packages[@]}" - gcc-c++ - ninja-build - ragel boost-devel fmt-devel - libubsan + gcc-c++ libasan libatomic + libubsan + ninja-build + ragel valgrind-devel ) centos7_packages=( "${redhat_packages[@]}" - ninja-build - ragel cmake3 - rh-mongodb36-boost-devel devtoolset-11-gcc-c++ - devtoolset-11-libubsan devtoolset-11-libasan devtoolset-11-libatomic + devtoolset-11-libubsan + ninja-build + ragel + rh-mongodb36-boost-devel ) centos8_packages=( "${redhat_packages[@]}" - ninja-build - ragel gcc-toolset-11-gcc gcc-toolset-11-gcc-c++ - gcc-toolset-11-libubsan-devel gcc-toolset-11-libasan-devel gcc-toolset-11-libatomic-devel + gcc-toolset-11-libubsan-devel + ninja-build + ragel ) centos9_packages=( "${redhat_packages[@]}" - ninja-build - ragel gcc-toolset-13-gcc gcc-toolset-13-gcc-c++ - gcc-toolset-13-libubsan-devel gcc-toolset-13-libasan-devel gcc-toolset-13-libatomic-devel + gcc-toolset-13-libubsan-devel + ninja-build + ragel ) # 1) glibc 2.30-3 has sys/sdt.h (systemtap include) @@ -157,43 +162,49 @@ centos9_packages=( # 3) aur installations require having sudo and being # a sudoer. makepkg does not work otherwise. arch_packages=( - gcc - ninja - ragel boost boost-libs + c-ares + cmake + crypto++ + filesystem + fmt + gcc + glibc + gnutls hwloc - numactl libpciaccess - crypto++ + libtool + liburing libxml2 - xfsprogs - gnutls lksctp-tools lz4 make meson - python-pyelftools - protobuf - libtool - cmake - yaml-cpp - stow - c-ares + ninja + numactl + openssl pkgconf - fmt + protobuf python3 - glibc - filesystem + python-pyelftools + ragel + stow valgrind - openssl - liburing + xfsprogs + yaml-cpp ) opensuse_packages=( c-ares-devel cmake hwloc-devel + libboost_atomic1_66_0 + libboost_atomic1_66_0-devel + libboost_chrono1_66_0 + libboost_chrono1_66_0-devel + libboost_date_time1_66_0 + libboost_date_time1_66_0-devel libboost_filesystem1_66_0 libboost_filesystem1_66_0-devel libboost_program_options1_66_0 @@ -204,26 +215,20 @@ opensuse_packages=( libboost_test1_66_0-devel libboost_thread1_66_0 libboost_thread1_66_0-devel - libboost_atomic1_66_0 - libboost_atomic1_66_0-devel - libboost_date_time1_66_0 - libboost_date_time1_66_0-devel - libboost_chrono1_66_0 - libboost_chrono1_66_0-devel libgnutls-devel libgnutlsxx28 liblz4-devel libnuma-devel + libtool lksctp-tools-devel meson ninja + openssl + protobuf-devel ragel + stow xfsprogs-devel yaml-cpp-devel - protobuf-devel - libtool - stow - openssl ) case "$ID" in From afca8342b111ae4326e914505e96b7d40b8e1032 Mon Sep 17 00:00:00 2001 From: Kefu Chai Date: Wed, 5 Jun 2024 07:39:39 +0800 Subject: [PATCH 17/20] stall-analyser: use itertools.dropwhile when appropriate for better readability Signed-off-by: Kefu Chai --- scripts/stall-analyser.py | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/scripts/stall-analyser.py b/scripts/stall-analyser.py index 995026f313d..119e3bd2eb5 100755 --- a/scripts/stall-analyser.py +++ b/scripts/stall-analyser.py @@ -5,6 +5,7 @@ import re import addr2line +from itertools import dropwhile from typing import Self @@ -368,12 +369,7 @@ def main(): # (inlined by) seastar::reactor::block_notifier(int) at ./build/release/seastar/./seastar/src/core/reactor.cc:1240 # ?? ??:0 if address_threshold: - for i in range(0, len(trace)): - if int(trace[i], 0) >= address_threshold: - while int(trace[i], 0) >= address_threshold: - i += 1 - trace = trace[i:] - break + trace = list(dropwhile(lambda addr: int(addr, 0) >= address_threshold, trace)) tmin = args.minimum or 0 if t >= tmin: graph.process_trace(trace, t) From 8f9c857f406f039f1310601a8b4fd2e1ac0123ea Mon Sep 17 00:00:00 2001 From: Kefu Chai Date: Wed, 5 Jun 2024 07:02:07 +0800 Subject: [PATCH 18/20] stall-analyser: remove unused variable `count` is never used after being assigned, so drop it. Signed-off-by: Kefu Chai --- scripts/stall-analyser.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/scripts/stall-analyser.py b/scripts/stall-analyser.py index 119e3bd2eb5..e801bfea008 100755 --- a/scripts/stall-analyser.py +++ b/scripts/stall-analyser.py @@ -331,7 +331,6 @@ def print_command_line_options(args): def main(): args = get_command_line_parser().parse_args() - count = 0 comment = re.compile(r'^\s*#') pattern = re.compile(r"Reactor stalled for (?P\d+) ms on shard (?P\d+).*Backtrace:") address_threshold = int(args.address_threshold, 0) @@ -350,7 +349,6 @@ def main(): m = pattern.search(s) if not m: continue - count += 1 # extract the time in ms trace = s[m.span()[1]:].split() t = int(m.group("stall")) From 5508176ebed6d1c8ba8395f935a8b7361765b85e Mon Sep 17 00:00:00 2001 From: Kefu Chai Date: Tue, 11 Jun 2024 06:39:53 +0800 Subject: [PATCH 19/20] build: add pyyaml to install-dependencies.sh in 6de08b079d, we introduced the dependency to pyyaml, but we failed to update `install-dependencies.sh` to install it. in this change, we add this dependency to `install-dependencies.sh`. Refs 6de08b079d Signed-off-by: Kefu Chai --- install-dependencies.sh | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/install-dependencies.sh b/install-dependencies.sh index 542bcda8445..3c913f81524 100755 --- a/install-dependencies.sh +++ b/install-dependencies.sh @@ -56,6 +56,7 @@ debian_packages=( protobuf-compiler python3 python3-pyelftools + python3-yaml ragel stow systemtap-sdt-dev @@ -99,6 +100,7 @@ redhat_packages=( protobuf-devel python3 python3-pyelftools + python3-pyyaml stow systemtap-sdt-devel valgrind-devel @@ -188,6 +190,7 @@ arch_packages=( protobuf python3 python-pyelftools + python-yaml ragel stow valgrind @@ -225,6 +228,7 @@ opensuse_packages=( ninja openssl protobuf-devel + python3-PyYAML ragel stow xfsprogs-devel From 6c1477986415dd91844747b133c9fc6949781b64 Mon Sep 17 00:00:00 2001 From: Kefu Chai Date: Tue, 11 Jun 2024 14:39:52 +0800 Subject: [PATCH 20/20] test: futures: verify stream yields the consumed value before this change, we do not verify that `stream` produces the consumed values. after this change, we check if the subscriber of `stream` gets the value sent by the writer side. Signed-off-by: Kefu Chai --- tests/unit/futures_test.cc | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/tests/unit/futures_test.cc b/tests/unit/futures_test.cc index 253dc468328..02cb220f8fd 100644 --- a/tests/unit/futures_test.cc +++ b/tests/unit/futures_test.cc @@ -144,15 +144,17 @@ SEASTAR_TEST_CASE(test_stream) { SEASTAR_TEST_CASE(test_stream_drop_sub) { auto s = make_lw_shared>(); + const int expected = 42; std::optional> ret; { - auto sub = s->listen([](int x) { + auto sub = s->listen([expected](int actual) { + BOOST_REQUIRE_EQUAL(expected, actual); return make_ready_future<>(); }); ret = sub.done(); // It is ok to drop the subscription when we only want the competition future. } - return s->produce(42).then([ret = std::move(*ret), s] () mutable { + return s->produce(expected).then([ret = std::move(*ret), s] () mutable { s->close(); return std::move(ret); });