From e1fa945f4bb7418d99b3afe4831046aec6efaf4a Mon Sep 17 00:00:00 2001 From: Kefu Chai Date: Fri, 3 May 2024 22:33:39 +0800 Subject: [PATCH 1/3] coroutine/async_generator: reimplement async generator this generator implementation is inspired by https://wg21.link/P2502R2. Refs #2190 Refs #1913 Refs #1677 Signed-off-by: Kefu Chai --- include/seastar/coroutine/async_generator.hh | 491 +++++++++++++++++++ tests/unit/CMakeLists.txt | 3 + tests/unit/generator_test.cc | 236 +++++++++ 3 files changed, 730 insertions(+) create mode 100644 include/seastar/coroutine/async_generator.hh create mode 100644 tests/unit/generator_test.cc diff --git a/include/seastar/coroutine/async_generator.hh b/include/seastar/coroutine/async_generator.hh new file mode 100644 index 00000000000..170e651e07b --- /dev/null +++ b/include/seastar/coroutine/async_generator.hh @@ -0,0 +1,491 @@ +/* + * This file is open source software, licensed to you under the terms + * of the Apache License, Version 2.0 (the "License"). See the NOTICE file + * distributed with this work for additional information regarding copyright + * ownership. You may not use this file except in compliance with the License. + * + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#pragma once + +#include +#include +#include +#include +#include +#include +#include +#include + +// seastar::coroutine::experimental::async_generator is inspired by the C++23 proposal +// P2502R2 (https://wg21.link/P2502R2), which introduced std::generator for +// synchronous coroutine-based range generation. +// +// Similar to P2502R2's generator, seastar::coroutine::experimental::async_generator +// * prioritizes storing references to yielded objects instead of copying them +// * generates a range with iterators that yield values +// +// However, there are key differences in seastar::coroutine::experimental::generator: +// +// * Alocator support: +// Seastar's generator does not support the Allocator template parameter. +// Unlike Seastar uses its built-in allocator eliminating the need for +// additional flexibility. +// * Asynchronous Operations: +// - generator::begin() is a coroutine, unlike P2502R2's synchronous approach +// - generator::iterator::operator++() is a coroutine +// - generator::iterator::operator++(int) is a coroutine +// Note: Due to its asynchronous nature, this generator cannot be used in +// range-based for loops. +// * Ranges Integration: +// seastar's generator is not a std::ranges::view_interface. So it lacks +// integration with the C++20 Ranges library due to its asynchronous operations. +// * Nesting: +// Nesting generators is not supported. You cannot yield another generator +// from within a generator. This prevents implementation asynchronous, +// recursive algorithms like depth-first search on trees. +namespace seastar::coroutine::experimental { + +template +class async_generator; + +namespace internal { + +template class next_awaiter; + +template +class async_generator_promise_base : public seastar::task { +protected: + // a glvalue yield expression is passed to co_yield as its operand. and + // the object denoted by this expression is guaranteed to live until the + // coroutine resumes. we take advantage of this fact by storing only a + // pointer to the denoted object in the promise as long as the result of + // dereferencing this pointer is convertible to the Ref type. + std::add_pointer_t _value = nullptr; + +protected: + std::exception_ptr _exception; + std::coroutine_handle<> _consumer; + task* _waiting_task = nullptr; + + /// awaiter returned by the generator when it produces a new element + /// + /// There are different combinations of expression types passed to + /// \c co_yield and \c Ref. In most cases, zero copies are made. Copies + /// are only necessary when \c co_yield requires type conversion. + /// + /// The following table summarizes the number of copies made for different + /// scenarios: + /// + /// | Ref | co_yield const T& | co_yield T& | co_yield T&& | co_yield U&& | + /// | --------- | ----------------- | ----------- | ------------ | ------------ | + /// | T | 0 | 0 | 0 | 1 | + /// | const T& | 0 | 0 | 0 | 1 | + /// | T& | ill-formed | 0 | ill-formed | ill-formed | + /// | T&& | ill-formed | ill-formed | 0 | 1 | + /// | const T&& | ill-formed | ill-formed | 0 | 1 | + /// + /// When no copies are required, \c yield_awaiter is used. Otherwise, + /// \c copy_awaiter is used. The latter converts \c U to \c T, and keeps the converted + /// value in it. + class yield_awaiter final { + async_generator_promise_base* _promise; + std::coroutine_handle<> _consumer; + public: + yield_awaiter(async_generator_promise_base* promise, + std::coroutine_handle<> consumer) noexcept + : _promise{promise} + , _consumer{consumer} + {} + bool await_ready() const noexcept { + return false; + } + template + std::coroutine_handle<> await_suspend(std::coroutine_handle producer) noexcept { + _promise->_waiting_task = &producer.promise(); + if (seastar::need_preempt()) { + auto consumer = std::coroutine_handle::from_address( + _consumer.address()); + seastar::schedule(&consumer.promise()); + return std::noop_coroutine(); + } + return _consumer; + } + void await_resume() noexcept {} + }; + + class copy_awaiter { + using value_type = std::remove_cvref_t; + async_generator_promise_base* _promise; + std::coroutine_handle<> _consumer; + value_type _value; + public: + copy_awaiter(async_generator_promise_base* promise, + std::coroutine_handle<> consumer, + const value_type& value) + : _promise{promise} + , _consumer{consumer} + , _value{value} + {} + constexpr bool await_ready() const noexcept { + return false; + } + template + std::coroutine_handle<> await_suspend(std::coroutine_handle producer) noexcept { + auto& current = producer.promise(); + current._value = std::addressof(_value); + _promise->_waiting_task = ¤t; + if (seastar::need_preempt()) { + auto consumer = std::coroutine_handle::from_address( + _consumer.address()); + seastar::schedule(&consumer.promise()); + return std::noop_coroutine(); + } + return _consumer; + } + constexpr void await_resume() const noexcept {} + }; + +public: + async_generator_promise_base() noexcept = default; + async_generator_promise_base(const async_generator_promise_base &) = delete; + async_generator_promise_base& operator=(const async_generator_promise_base &) = delete; + async_generator_promise_base(async_generator_promise_base &&) noexcept = default; + async_generator_promise_base& operator=(async_generator_promise_base &&) noexcept = default; + + // lazily-started coroutine, do not execute the coroutine until + // the coroutine is awaited. + std::suspend_always initial_suspend() const noexcept { + return {}; + } + + yield_awaiter final_suspend() noexcept { + _value = nullptr; + return yield_awaiter{this, this->_consumer}; + } + + void unhandled_exception() noexcept { + _exception = std::current_exception(); + } + + yield_awaiter yield_value(Yielded value) noexcept { + this->_value = std::addressof(value); + return yield_awaiter{this, this->_consumer}; + } + + copy_awaiter yield_value(const std::remove_reference_t& value) + requires (std::is_rvalue_reference_v && + std::constructible_from< + std::remove_cvref_t, + const std::remove_reference_t&>) { + return copy_awaiter{this, this->_consumer, value}; + } + + void return_void() noexcept {} + + // @return if the async_generator has reached the end of the sequence + bool finished() const noexcept { + return _value == nullptr; + } + + void rethrow_if_unhandled_exception() { + if (_exception) { + std::rethrow_exception(std::move(_exception)); + } + } + + void run_and_dispose() noexcept final { + using handle_type = std::coroutine_handle; + handle_type::from_promise(*this).resume(); + } + + seastar::task* waiting_task() noexcept final { + return _waiting_task; + } + +private: + friend class next_awaiter; +}; + +/// awaiter returned when the consumer gets the \c begin iterator or +/// when it advances the iterator. +template +class next_awaiter { +protected: + async_generator_promise_base* _promise = nullptr; + std::coroutine_handle<> _producer = nullptr; + + explicit next_awaiter(std::nullptr_t) noexcept {} + next_awaiter(async_generator_promise_base& promise, + std::coroutine_handle<> producer) noexcept + : _promise{std::addressof(promise)} + , _producer{producer} {} + +public: + bool await_ready() const noexcept { + return false; + } + + template + std::coroutine_handle<> await_suspend(std::coroutine_handle consumer) noexcept { + _promise->_consumer = consumer; + return _producer; + } +}; + +} // namespace internal + +/// async_generator represents a view modeling std::ranges::input_range, +/// and has move-only iterators. +/// +/// async_generator has 2 template parameters: +/// +/// - Ref +/// - Value +/// +/// From Ref and Value, we derive types: +/// - value_type: a cv-unqualified object type that specifies the value type of +/// the async_generator's range and iterators +/// - reference_Type: the reference type of the async_generator's range and iterators +/// - yielded_type: the type of the parameter to the primary overload of \c +/// yield_value in the async_generator's associated promise type +/// +/// Under the most circumstances, only the first parameter is specified: like +/// \c async_generator. The resulting async_generator: +/// - has a value type of \c remove_cvref_t +/// - has a reference type of \c meow, if it is a reference type, or \c meow&& +/// otherwise +/// - the operand of \c co_yield in the body of the async_generator should be +/// convertible to \c meow, if it is a reference type, otherwise the operand +/// type should be const meow& +/// +/// Consider following code snippet: +/// \code +/// async_generator send_query(std::string query) { +/// auto result_set = db.execute(query); +/// for (auto row : result_set) { +/// co_yield std::format("{}", row); +/// } +/// } +/// \endcode +/// +/// In this case, \c Ref is a reference type of \c const std::string&, +/// and \c Value is the default value of \c void. So the \c value_type is +/// \c std::string. As the async_generator always returns a \c std::string, its +/// iterator has the luxury of returning a reference to it. +/// +/// But if some rare users want to use a proxy reference type, or to generate a +/// range whose iterators yield prvalue for whatever reason, they should use +/// the two-argument \c async_generator, like async_generator. +/// The resulting async_generator: +/// - has a value type of \c woof +/// - has a reference type of \c meow +/// +/// For instance, consider following code snippet: +/// \code +/// async_generator generate_strings() { +/// co_yield "["; +/// std::string s; +/// for (auto sv : {"1"sv, "2"sv}) { +/// s = sv; +/// s.push_back(','); +/// co_yield s; +/// } +/// co_yield "]"; +/// } +/// \endcode +/// +/// In this case, \c Ref is \c std::string_view, and \Value is \c std::string. +/// So we can ensure that the caller cannot invalidate the yielded values by +/// mutating the defererenced value of iterator. As the \c std::string_view +/// instance is immutable. But in the meanwhile, the async_generator can return +/// a \c std::string by \c co_yield a \c std::string_view or a \c std::string. +/// And the caller can still access the element of the range via the same type: +/// \c std::string_view. +/// +/// Current Limitation and Future Plans: +/// +/// This generator implementation does not address the "Pingpong problem": +/// where the producer generates elements one at a time, forcing frequent +/// context switches between producer and consumer. This can lead to suboptimal +/// performance, especially when bulk generation and consumption would be more +/// efficient. +/// +/// We intend to extend the existing implementation to allow the producer +/// to yield a range of elements. This will enable batch processing, +/// potentially improving performance by reducing context switches. +/// +/// TODO: Implement range-based yielding to mitigate the Ping-pong problem. +template +class [[nodiscard]] async_generator { + using value_type = std::conditional_t, + std::remove_cvref_t, + Value>; + using reference_type = std::conditional_t, + Ref&&, + Ref>; + using yielded_type = std::conditional_t, + reference_type, + const reference_type&>; + +public: + class promise_type; + +private: + using handle_type = std::coroutine_handle; + handle_type _coro = {}; + +public: + class iterator; + + async_generator() noexcept = default; + explicit async_generator(promise_type& promise) noexcept + : _coro(std::coroutine_handle::from_promise(promise)) + {} + async_generator(async_generator&& other) noexcept + : _coro{std::exchange(other._coro, {})} + {} + async_generator(const async_generator&) = delete; + async_generator& operator=(const async_generator&) = delete; + + ~async_generator() { + if (_coro) { + _coro.destroy(); + } + } + + friend void swap(async_generator& lhs, async_generator& rhs) noexcept { + std::swap(lhs._coro, rhs._coro); + } + + async_generator& operator=(async_generator&& other) noexcept { + if (this == &other) { + return *this; + } + if (_coro) { + _coro.destroy(); + } + _coro = std::exchange(other._coro, nullptr); + return *this; + } + + [[nodiscard]] auto begin() noexcept { + using base_awaiter = internal::next_awaiter; + class begin_awaiter final : public base_awaiter { + using base_awaiter::_promise; + + public: + explicit begin_awaiter(std::nullptr_t) noexcept + : base_awaiter{nullptr} + {} + explicit begin_awaiter(handle_type producer_coro) noexcept + : base_awaiter{producer_coro.promise(), producer_coro} + {} + bool await_ready() const noexcept { + return _promise == nullptr || base_awaiter::await_ready(); + } + + iterator await_resume() { + if (_promise == nullptr) { + return iterator{nullptr}; + } + if (_promise->finished()) { + _promise->rethrow_if_unhandled_exception(); + return iterator{nullptr}; + } + return iterator{ + handle_type::from_promise(*static_cast(_promise)) + }; + } + }; + + if (_coro) { + return begin_awaiter{_coro}; + } else { + return begin_awaiter{nullptr}; + } + } + + [[nodiscard]] std::default_sentinel_t end() const noexcept { + return {}; + } +}; + +template +class async_generator::promise_type final : public internal::async_generator_promise_base { +public: + async_generator get_return_object() noexcept { + return async_generator{*this}; + } + + yielded_type value() const noexcept { + return static_cast(*this->_value); + } +}; + +template +class async_generator::iterator final { +private: + using handle_type = async_generator::handle_type; + handle_type _coro = nullptr; + +public: + using iterator_category = std::input_iterator_tag; + using difference_type = std::ptrdiff_t; + using value_type = async_generator::value_type; + using reference = async_generator::reference_type; + using pointer = std::add_pointer_t; + + explicit iterator(handle_type coroutine) noexcept + : _coro{coroutine} + {} + + explicit operator bool() const noexcept { + return _coro && !_coro.done(); + } + + [[nodiscard]] auto operator++() noexcept { + using base_awaiter = internal::next_awaiter; + class increment_awaiter final : public base_awaiter { + iterator& _iterator; + using base_awaiter::_promise; + + public: + explicit increment_awaiter(iterator& iterator) noexcept + : base_awaiter{iterator._coro.promise(), iterator._coro} + , _iterator{iterator} + {} + iterator& await_resume() { + if (_promise->finished()) { + // update iterator to end() + _iterator = iterator{nullptr}; + _promise->rethrow_if_unhandled_exception(); + } + return _iterator; + } + }; + + assert(bool(*this) && "cannot increment end iterator"); + return increment_awaiter{*this}; + } + + reference operator*() const noexcept { + return _coro.promise().value(); + } + + bool operator==(std::default_sentinel_t) const noexcept { + return !bool(*this); + } +}; + +} // namespace seastar::coroutine::experimental diff --git a/tests/unit/CMakeLists.txt b/tests/unit/CMakeLists.txt index 65a274c7acf..8deff2e8365 100644 --- a/tests/unit/CMakeLists.txt +++ b/tests/unit/CMakeLists.txt @@ -282,6 +282,9 @@ seastar_add_test (content_source seastar_add_test (coroutines SOURCES coroutines_test.cc) +seastar_add_test (generator + SOURCES generator_test.cc) + seastar_add_test (defer KIND BOOST SOURCES defer_test.cc) diff --git a/tests/unit/generator_test.cc b/tests/unit/generator_test.cc new file mode 100644 index 00000000000..a240109e917 --- /dev/null +++ b/tests/unit/generator_test.cc @@ -0,0 +1,236 @@ +/* + * This file is open source software, licensed to you under the terms + * of the Apache License, Version 2.0 (the "License"). See the NOTICE file + * distributed with this work for additional information regarding copyright + * ownership. You may not use this file except in compliance with the License. + * + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +/* + * Copyright (C) 2024 ScyllaDB Ltd. + */ + +#include +#include +#include +#include +#include +#include +#include +#include +#if __cplusplus >= 202302L && defined(__cpp_lib_generator) +#include +template +using sync_generator = std::generator; +#else +#include "tl-generator.hh" +template +using sync_generator = tl::generator; +#endif + +using namespace seastar; + +using do_suspend = bool_class; + +sync_generator +sync_fibonacci_sequence(unsigned count) { + auto a = 0, b = 1; + for (unsigned i = 0; i < count; ++i) { + if (std::numeric_limits::max() - a < b) { + throw std::out_of_range( + fmt::format("fibonacci[{}] is greater than the largest value of int", i)); + } + int next = std::exchange(a, std::exchange(b, a + b)); + // tl::generator::yield_value() only accepts arguments of reference type, + // instead of a cv-qualified value. + co_yield next; + } +} + +coroutine::experimental::async_generator +async_fibonacci_sequence(unsigned count, do_suspend suspend) { + auto a = 0, b = 1; + for (unsigned i = 0; i < count; ++i) { + if (std::numeric_limits::max() - a < b) { + throw std::out_of_range( + fmt::format("fibonacci[{}] is greater than the largest value of int", i)); + } + if (suspend) { + co_await yield(); + } + co_yield std::exchange(a, std::exchange(b, a + b)); + } +} + +seastar::future<> +verify_fib_drained(coroutine::experimental::async_generator actual_fibs, unsigned count) { + auto expected_fibs = sync_fibonacci_sequence(count); + auto expected_fib = std::begin(expected_fibs); + + auto actual_fib = co_await actual_fibs.begin(); + + for (; actual_fib != actual_fibs.end(); co_await ++actual_fib) { + BOOST_REQUIRE(expected_fib != std::end(expected_fibs)); + BOOST_REQUIRE_EQUAL(*actual_fib, *expected_fib); + ++expected_fib; + } + BOOST_REQUIRE(actual_fib == actual_fibs.end()); +} + +SEASTAR_TEST_CASE(test_async_generator_drained_with_suspend) { + constexpr int count = 4; + return verify_fib_drained(async_fibonacci_sequence(count, do_suspend::yes), + count); +} + +SEASTAR_TEST_CASE(test_async_generator_drained_without_suspend) { + constexpr int count = 4; + return verify_fib_drained(async_fibonacci_sequence(count, do_suspend::no), + count); +} + +seastar::future<> test_async_generator_not_drained(do_suspend suspend) { + auto fib = async_fibonacci_sequence(42, suspend); + auto actual_fib = co_await fib.begin(); + BOOST_REQUIRE_EQUAL(*actual_fib, 0); +} + +SEASTAR_TEST_CASE(test_async_generator_not_drained_with_suspend) { + return test_async_generator_not_drained(do_suspend::yes); +} + +SEASTAR_TEST_CASE(test_async_generator_not_drained_without_suspend) { + return test_async_generator_not_drained(do_suspend::no); +} + +coroutine::experimental::async_generator +generate_value_and_ref(std::vector strings) { + co_yield "["; + std::string s; + for (auto sv : strings) { + s = sv; + s.push_back(','); + co_yield s; + } + co_yield "]"; +} + +SEASTAR_TEST_CASE(test_async_generator_value_reference) { + using namespace std::literals; + std::vector expected_quoted = {"["sv, "foo,"sv, "bar,"sv, "]"sv}; + auto actual_quoted = generate_value_and_ref({"foo"sv, "bar"sv}); + auto actual = co_await actual_quoted.begin(); + for (auto expected : expected_quoted) { + BOOST_REQUIRE_EQUAL(*actual, expected); + co_await ++actual; + } +} + +coroutine::experimental::async_generator +generate_yield_rvalue_reference(const std::vector strings) { + for (auto& s: strings) { + co_yield s; + } +} + +SEASTAR_TEST_CASE(test_async_generator_rvalue_reference) { + std::vector expected_strings = {"hello", "world"}; + auto actual_strings = generate_yield_rvalue_reference(expected_strings); + auto actual = co_await actual_strings.begin(); + for (auto expected : expected_strings) { + BOOST_REQUIRE_EQUAL(*actual, expected); + co_await ++actual; + } +} + +SEASTAR_TEST_CASE(test_async_generator_move_ctor) { + constexpr int count = 4; + auto actual_fibs = async_fibonacci_sequence(count, do_suspend::no); + return verify_fib_drained(std::move(actual_fibs), count); +} + +SEASTAR_TEST_CASE(test_async_generator_swap) { + int count_a = 4; + int count_b = 42; + auto fibs_a = async_fibonacci_sequence(count_a, do_suspend::no); + auto fibs_b = async_fibonacci_sequence(count_b, do_suspend::no); + std::swap(fibs_a, fibs_b); + std::swap(count_a, count_b); + co_await verify_fib_drained(std::move(fibs_a), count_a); + co_await verify_fib_drained(std::move(fibs_b), count_b); +} + +struct counter_t { + int n; + int* count; + counter_t(counter_t&& other) noexcept + : n{std::exchange(other.n, -1)}, + count{std::exchange(other.count, nullptr)} + {} + counter_t(int n, int* count) noexcept + : n{n}, count{count} { + ++(*count); + } + ~counter_t() noexcept { + if (count) { + --(*count); + } + } +}; + +std::ostream& operator<<(std::ostream& os, const counter_t& c) { + return os << c.n; +} + +coroutine::experimental::async_generator +fiddle(int n, int* total) { + int i = 0; + while (true) { + if (i++ == n) { + throw std::invalid_argument("Eureka from generator!"); + } + co_yield counter_t{i, total}; + } +} + +SEASTAR_TEST_CASE(test_async_generator_throws_from_generator) { + int total = 0; + auto count_to = [total=&total](unsigned n) -> seastar::future<> { + auto count = fiddle(n, total); + auto it = co_await count.begin(); + for (unsigned i = 0; i < 2 * n; i++) { + co_await ++it; + } + }; + auto f = co_await coroutine::as_future(count_to(42)); + BOOST_REQUIRE(f.failed()); + BOOST_REQUIRE_THROW(std::rethrow_exception(f.get_exception()), std::invalid_argument); + BOOST_REQUIRE_EQUAL(total, 0); +} + +SEASTAR_TEST_CASE(test_async_generator_throws_from_consumer) { + int total = 0; + auto count_to = [total=&total](unsigned n) -> seastar::future<> { + auto count = fiddle(n, total); + auto it = co_await count.begin(); + for (unsigned i = 0; i < n; i++) { + if (i == n / 2) { + throw std::invalid_argument("Eureka from consumer!"); + } + co_await ++it; + } + }; + auto f = co_await coroutine::as_future(count_to(42)); + BOOST_REQUIRE(f.failed()); + BOOST_REQUIRE_THROW(std::rethrow_exception(f.get_exception()), std::invalid_argument); + BOOST_REQUIRE_EQUAL(total, 0); +} From b0024b324f83589b544ac89f57d0c532e06aefa3 Mon Sep 17 00:00:00 2001 From: Kefu Chai Date: Mon, 22 Jul 2024 12:37:43 +0800 Subject: [PATCH 2/3] treewide: replace generator with async_generator * replace coroutine::experimental::generator with coroutine::experimental::async_generator * remove the generator related tests in coroutines_test.cc. since we already have tests/unit/generator_test.cc, there are no needs to keep two copies of these tests. * update `experimental_list_directory()` to return `generator`, for better performance, so we can point the promise's value pointer to the yielded value, without copying it using the copy_awaiter. Fixes #2190 Fixes #1913 Fixes #1677 Signed-off-by: Kefu Chai --- include/seastar/core/file.hh | 4 +- include/seastar/coroutine/async_generator.hh | 491 ------------ include/seastar/coroutine/generator.hh | 799 +++++++++---------- src/core/file-impl.hh | 2 +- src/core/file.cc | 10 +- tests/unit/coroutines_test.cc | 142 ---- tests/unit/directory_test.cc | 12 +- tests/unit/generator_test.cc | 38 +- 8 files changed, 407 insertions(+), 1091 deletions(-) delete mode 100644 include/seastar/coroutine/async_generator.hh diff --git a/include/seastar/core/file.hh b/include/seastar/core/file.hh index 1c9b0b4651f..ffe089e8247 100644 --- a/include/seastar/core/file.hh +++ b/include/seastar/core/file.hh @@ -182,7 +182,7 @@ public: virtual subscription list_directory(std::function (directory_entry de)> next) = 0; // due to https://github.com/scylladb/seastar/issues/1913, we cannot use // buffered generator yet. - virtual coroutine::experimental::generator experimental_list_directory(); + virtual coroutine::experimental::generator experimental_list_directory(); }; future> make_file_impl(int fd, file_open_options options, int oflags, struct stat st) noexcept; @@ -700,7 +700,7 @@ public: /// Returns a directory listing, given that this file object is a directory. // due to https://github.com/scylladb/seastar/issues/1913, we cannot use // buffered generator yet. - coroutine::experimental::generator experimental_list_directory(); + coroutine::experimental::generator experimental_list_directory(); #if SEASTAR_API_LEVEL < 7 /** diff --git a/include/seastar/coroutine/async_generator.hh b/include/seastar/coroutine/async_generator.hh deleted file mode 100644 index 170e651e07b..00000000000 --- a/include/seastar/coroutine/async_generator.hh +++ /dev/null @@ -1,491 +0,0 @@ -/* - * This file is open source software, licensed to you under the terms - * of the Apache License, Version 2.0 (the "License"). See the NOTICE file - * distributed with this work for additional information regarding copyright - * ownership. You may not use this file except in compliance with the License. - * - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -#pragma once - -#include -#include -#include -#include -#include -#include -#include -#include - -// seastar::coroutine::experimental::async_generator is inspired by the C++23 proposal -// P2502R2 (https://wg21.link/P2502R2), which introduced std::generator for -// synchronous coroutine-based range generation. -// -// Similar to P2502R2's generator, seastar::coroutine::experimental::async_generator -// * prioritizes storing references to yielded objects instead of copying them -// * generates a range with iterators that yield values -// -// However, there are key differences in seastar::coroutine::experimental::generator: -// -// * Alocator support: -// Seastar's generator does not support the Allocator template parameter. -// Unlike Seastar uses its built-in allocator eliminating the need for -// additional flexibility. -// * Asynchronous Operations: -// - generator::begin() is a coroutine, unlike P2502R2's synchronous approach -// - generator::iterator::operator++() is a coroutine -// - generator::iterator::operator++(int) is a coroutine -// Note: Due to its asynchronous nature, this generator cannot be used in -// range-based for loops. -// * Ranges Integration: -// seastar's generator is not a std::ranges::view_interface. So it lacks -// integration with the C++20 Ranges library due to its asynchronous operations. -// * Nesting: -// Nesting generators is not supported. You cannot yield another generator -// from within a generator. This prevents implementation asynchronous, -// recursive algorithms like depth-first search on trees. -namespace seastar::coroutine::experimental { - -template -class async_generator; - -namespace internal { - -template class next_awaiter; - -template -class async_generator_promise_base : public seastar::task { -protected: - // a glvalue yield expression is passed to co_yield as its operand. and - // the object denoted by this expression is guaranteed to live until the - // coroutine resumes. we take advantage of this fact by storing only a - // pointer to the denoted object in the promise as long as the result of - // dereferencing this pointer is convertible to the Ref type. - std::add_pointer_t _value = nullptr; - -protected: - std::exception_ptr _exception; - std::coroutine_handle<> _consumer; - task* _waiting_task = nullptr; - - /// awaiter returned by the generator when it produces a new element - /// - /// There are different combinations of expression types passed to - /// \c co_yield and \c Ref. In most cases, zero copies are made. Copies - /// are only necessary when \c co_yield requires type conversion. - /// - /// The following table summarizes the number of copies made for different - /// scenarios: - /// - /// | Ref | co_yield const T& | co_yield T& | co_yield T&& | co_yield U&& | - /// | --------- | ----------------- | ----------- | ------------ | ------------ | - /// | T | 0 | 0 | 0 | 1 | - /// | const T& | 0 | 0 | 0 | 1 | - /// | T& | ill-formed | 0 | ill-formed | ill-formed | - /// | T&& | ill-formed | ill-formed | 0 | 1 | - /// | const T&& | ill-formed | ill-formed | 0 | 1 | - /// - /// When no copies are required, \c yield_awaiter is used. Otherwise, - /// \c copy_awaiter is used. The latter converts \c U to \c T, and keeps the converted - /// value in it. - class yield_awaiter final { - async_generator_promise_base* _promise; - std::coroutine_handle<> _consumer; - public: - yield_awaiter(async_generator_promise_base* promise, - std::coroutine_handle<> consumer) noexcept - : _promise{promise} - , _consumer{consumer} - {} - bool await_ready() const noexcept { - return false; - } - template - std::coroutine_handle<> await_suspend(std::coroutine_handle producer) noexcept { - _promise->_waiting_task = &producer.promise(); - if (seastar::need_preempt()) { - auto consumer = std::coroutine_handle::from_address( - _consumer.address()); - seastar::schedule(&consumer.promise()); - return std::noop_coroutine(); - } - return _consumer; - } - void await_resume() noexcept {} - }; - - class copy_awaiter { - using value_type = std::remove_cvref_t; - async_generator_promise_base* _promise; - std::coroutine_handle<> _consumer; - value_type _value; - public: - copy_awaiter(async_generator_promise_base* promise, - std::coroutine_handle<> consumer, - const value_type& value) - : _promise{promise} - , _consumer{consumer} - , _value{value} - {} - constexpr bool await_ready() const noexcept { - return false; - } - template - std::coroutine_handle<> await_suspend(std::coroutine_handle producer) noexcept { - auto& current = producer.promise(); - current._value = std::addressof(_value); - _promise->_waiting_task = ¤t; - if (seastar::need_preempt()) { - auto consumer = std::coroutine_handle::from_address( - _consumer.address()); - seastar::schedule(&consumer.promise()); - return std::noop_coroutine(); - } - return _consumer; - } - constexpr void await_resume() const noexcept {} - }; - -public: - async_generator_promise_base() noexcept = default; - async_generator_promise_base(const async_generator_promise_base &) = delete; - async_generator_promise_base& operator=(const async_generator_promise_base &) = delete; - async_generator_promise_base(async_generator_promise_base &&) noexcept = default; - async_generator_promise_base& operator=(async_generator_promise_base &&) noexcept = default; - - // lazily-started coroutine, do not execute the coroutine until - // the coroutine is awaited. - std::suspend_always initial_suspend() const noexcept { - return {}; - } - - yield_awaiter final_suspend() noexcept { - _value = nullptr; - return yield_awaiter{this, this->_consumer}; - } - - void unhandled_exception() noexcept { - _exception = std::current_exception(); - } - - yield_awaiter yield_value(Yielded value) noexcept { - this->_value = std::addressof(value); - return yield_awaiter{this, this->_consumer}; - } - - copy_awaiter yield_value(const std::remove_reference_t& value) - requires (std::is_rvalue_reference_v && - std::constructible_from< - std::remove_cvref_t, - const std::remove_reference_t&>) { - return copy_awaiter{this, this->_consumer, value}; - } - - void return_void() noexcept {} - - // @return if the async_generator has reached the end of the sequence - bool finished() const noexcept { - return _value == nullptr; - } - - void rethrow_if_unhandled_exception() { - if (_exception) { - std::rethrow_exception(std::move(_exception)); - } - } - - void run_and_dispose() noexcept final { - using handle_type = std::coroutine_handle; - handle_type::from_promise(*this).resume(); - } - - seastar::task* waiting_task() noexcept final { - return _waiting_task; - } - -private: - friend class next_awaiter; -}; - -/// awaiter returned when the consumer gets the \c begin iterator or -/// when it advances the iterator. -template -class next_awaiter { -protected: - async_generator_promise_base* _promise = nullptr; - std::coroutine_handle<> _producer = nullptr; - - explicit next_awaiter(std::nullptr_t) noexcept {} - next_awaiter(async_generator_promise_base& promise, - std::coroutine_handle<> producer) noexcept - : _promise{std::addressof(promise)} - , _producer{producer} {} - -public: - bool await_ready() const noexcept { - return false; - } - - template - std::coroutine_handle<> await_suspend(std::coroutine_handle consumer) noexcept { - _promise->_consumer = consumer; - return _producer; - } -}; - -} // namespace internal - -/// async_generator represents a view modeling std::ranges::input_range, -/// and has move-only iterators. -/// -/// async_generator has 2 template parameters: -/// -/// - Ref -/// - Value -/// -/// From Ref and Value, we derive types: -/// - value_type: a cv-unqualified object type that specifies the value type of -/// the async_generator's range and iterators -/// - reference_Type: the reference type of the async_generator's range and iterators -/// - yielded_type: the type of the parameter to the primary overload of \c -/// yield_value in the async_generator's associated promise type -/// -/// Under the most circumstances, only the first parameter is specified: like -/// \c async_generator. The resulting async_generator: -/// - has a value type of \c remove_cvref_t -/// - has a reference type of \c meow, if it is a reference type, or \c meow&& -/// otherwise -/// - the operand of \c co_yield in the body of the async_generator should be -/// convertible to \c meow, if it is a reference type, otherwise the operand -/// type should be const meow& -/// -/// Consider following code snippet: -/// \code -/// async_generator send_query(std::string query) { -/// auto result_set = db.execute(query); -/// for (auto row : result_set) { -/// co_yield std::format("{}", row); -/// } -/// } -/// \endcode -/// -/// In this case, \c Ref is a reference type of \c const std::string&, -/// and \c Value is the default value of \c void. So the \c value_type is -/// \c std::string. As the async_generator always returns a \c std::string, its -/// iterator has the luxury of returning a reference to it. -/// -/// But if some rare users want to use a proxy reference type, or to generate a -/// range whose iterators yield prvalue for whatever reason, they should use -/// the two-argument \c async_generator, like async_generator. -/// The resulting async_generator: -/// - has a value type of \c woof -/// - has a reference type of \c meow -/// -/// For instance, consider following code snippet: -/// \code -/// async_generator generate_strings() { -/// co_yield "["; -/// std::string s; -/// for (auto sv : {"1"sv, "2"sv}) { -/// s = sv; -/// s.push_back(','); -/// co_yield s; -/// } -/// co_yield "]"; -/// } -/// \endcode -/// -/// In this case, \c Ref is \c std::string_view, and \Value is \c std::string. -/// So we can ensure that the caller cannot invalidate the yielded values by -/// mutating the defererenced value of iterator. As the \c std::string_view -/// instance is immutable. But in the meanwhile, the async_generator can return -/// a \c std::string by \c co_yield a \c std::string_view or a \c std::string. -/// And the caller can still access the element of the range via the same type: -/// \c std::string_view. -/// -/// Current Limitation and Future Plans: -/// -/// This generator implementation does not address the "Pingpong problem": -/// where the producer generates elements one at a time, forcing frequent -/// context switches between producer and consumer. This can lead to suboptimal -/// performance, especially when bulk generation and consumption would be more -/// efficient. -/// -/// We intend to extend the existing implementation to allow the producer -/// to yield a range of elements. This will enable batch processing, -/// potentially improving performance by reducing context switches. -/// -/// TODO: Implement range-based yielding to mitigate the Ping-pong problem. -template -class [[nodiscard]] async_generator { - using value_type = std::conditional_t, - std::remove_cvref_t, - Value>; - using reference_type = std::conditional_t, - Ref&&, - Ref>; - using yielded_type = std::conditional_t, - reference_type, - const reference_type&>; - -public: - class promise_type; - -private: - using handle_type = std::coroutine_handle; - handle_type _coro = {}; - -public: - class iterator; - - async_generator() noexcept = default; - explicit async_generator(promise_type& promise) noexcept - : _coro(std::coroutine_handle::from_promise(promise)) - {} - async_generator(async_generator&& other) noexcept - : _coro{std::exchange(other._coro, {})} - {} - async_generator(const async_generator&) = delete; - async_generator& operator=(const async_generator&) = delete; - - ~async_generator() { - if (_coro) { - _coro.destroy(); - } - } - - friend void swap(async_generator& lhs, async_generator& rhs) noexcept { - std::swap(lhs._coro, rhs._coro); - } - - async_generator& operator=(async_generator&& other) noexcept { - if (this == &other) { - return *this; - } - if (_coro) { - _coro.destroy(); - } - _coro = std::exchange(other._coro, nullptr); - return *this; - } - - [[nodiscard]] auto begin() noexcept { - using base_awaiter = internal::next_awaiter; - class begin_awaiter final : public base_awaiter { - using base_awaiter::_promise; - - public: - explicit begin_awaiter(std::nullptr_t) noexcept - : base_awaiter{nullptr} - {} - explicit begin_awaiter(handle_type producer_coro) noexcept - : base_awaiter{producer_coro.promise(), producer_coro} - {} - bool await_ready() const noexcept { - return _promise == nullptr || base_awaiter::await_ready(); - } - - iterator await_resume() { - if (_promise == nullptr) { - return iterator{nullptr}; - } - if (_promise->finished()) { - _promise->rethrow_if_unhandled_exception(); - return iterator{nullptr}; - } - return iterator{ - handle_type::from_promise(*static_cast(_promise)) - }; - } - }; - - if (_coro) { - return begin_awaiter{_coro}; - } else { - return begin_awaiter{nullptr}; - } - } - - [[nodiscard]] std::default_sentinel_t end() const noexcept { - return {}; - } -}; - -template -class async_generator::promise_type final : public internal::async_generator_promise_base { -public: - async_generator get_return_object() noexcept { - return async_generator{*this}; - } - - yielded_type value() const noexcept { - return static_cast(*this->_value); - } -}; - -template -class async_generator::iterator final { -private: - using handle_type = async_generator::handle_type; - handle_type _coro = nullptr; - -public: - using iterator_category = std::input_iterator_tag; - using difference_type = std::ptrdiff_t; - using value_type = async_generator::value_type; - using reference = async_generator::reference_type; - using pointer = std::add_pointer_t; - - explicit iterator(handle_type coroutine) noexcept - : _coro{coroutine} - {} - - explicit operator bool() const noexcept { - return _coro && !_coro.done(); - } - - [[nodiscard]] auto operator++() noexcept { - using base_awaiter = internal::next_awaiter; - class increment_awaiter final : public base_awaiter { - iterator& _iterator; - using base_awaiter::_promise; - - public: - explicit increment_awaiter(iterator& iterator) noexcept - : base_awaiter{iterator._coro.promise(), iterator._coro} - , _iterator{iterator} - {} - iterator& await_resume() { - if (_promise->finished()) { - // update iterator to end() - _iterator = iterator{nullptr}; - _promise->rethrow_if_unhandled_exception(); - } - return _iterator; - } - }; - - assert(bool(*this) && "cannot increment end iterator"); - return increment_awaiter{*this}; - } - - reference operator*() const noexcept { - return _coro.promise().value(); - } - - bool operator==(std::default_sentinel_t) const noexcept { - return !bool(*this); - } -}; - -} // namespace seastar::coroutine::experimental diff --git a/include/seastar/coroutine/generator.hh b/include/seastar/coroutine/generator.hh index 24598584617..499695fd08f 100644 --- a/include/seastar/coroutine/generator.hh +++ b/include/seastar/coroutine/generator.hh @@ -23,525 +23,472 @@ #include #include -#include +#include +#include +#include +#include #include #include +// seastar::coroutine::generator is inspired by the C++23 proposal +// P2502R2 (https://wg21.link/P2502R2), which introduced std::generator for +// synchronous coroutine-based range generation. +// +// Similar to P2502R2's generator, seastar::coroutine::experimental::generator +// * prioritizes storing references to yielded objects instead of copying them +// * generates a range with iterators that yield values +// +// However, there are key differences in seastar::coroutine::experimental::generator: +// +// * Alocator support: +// Seastar's generator does not support the Allocator template parameter. +// Unlike Seastar uses its built-in allocator eliminating the need for +// additional flexibility. +// * Asynchronous Operations: +// - generator::begin() is a coroutine, unlike P2502R2's synchronous approach +// - generator::iterator::operator++() is a coroutine +// - generator::iterator::operator++(int) is a coroutine +// Note: Due to its asynchronous nature, this generator cannot be used in +// range-based for loops. +// * Ranges Integration: +// seastar's generator is not a std::ranges::view_interface. So it lacks +// integration with the C++20 Ranges library due to its asynchronous operations. +// * Nesting: +// Nesting generators is not supported. You cannot yield another generator +// from within a generator. This prevents implementation asynchronous, +// recursive algorithms like depth-first search on trees. namespace seastar::coroutine::experimental { -template class Container = std::optional> +template class generator; -/// `seastar::coroutine::experimental` is used as the type of the first -/// parameter of a buffered generator coroutine. -/// -/// the value of a `buffer_size_t` specifies the size of the buffer holding the -/// values produced by the generator coroutine. Unlike its unbuffered variant, -/// the bufferred generator does not wait for its caller to consume every single -/// produced values. Instead, it puts the produced values into an internal -/// buffer, before the buffer is full or the generator is suspended. This helps -/// to alleviate the problem of pingpong between the generator coroutine and -/// its caller. -enum class buffer_size_t : size_t; - namespace internal { -using std::coroutine_handle; -using std::suspend_never; -using std::suspend_always; -using std::suspend_never; -using std::noop_coroutine; - -template -using next_value_t = std::optional; - -template