coroutine/generator: reimplement generator #2218
base: master
    std::swap(lhs._coro, rhs._coro);
}

async_generator& operator=(async_generator &&other) noexcept {
nit: && sticks to the type
Also, better do the below inside if (this != &other) to protect against self-assignment.
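A minimal sketch of the guard suggested here, assuming a hypothetical `handle_owner` class whose `int*` member stands in for the generator's coroutine handle (neither name comes from the PR). With the guard in place, a self-move is a no-op and the object keeps its value:

```cpp
#include <cassert>
#include <utility>

// Hypothetical owner type; `_res` stands in for the coroutine handle.
class handle_owner {
    int* _res = nullptr;
public:
    explicit handle_owner(int v) : _res(new int(v)) {}
    handle_owner(handle_owner&& o) noexcept
        : _res(std::exchange(o._res, nullptr)) {}
    handle_owner& operator=(handle_owner&& other) noexcept {
        if (this != &other) {          // self-move falls through as a no-op
            delete _res;               // release what we currently own
            _res = std::exchange(other._res, nullptr);
        }
        return *this;
    }
    ~handle_owner() { delete _res; }

    bool empty() const { return _res == nullptr; }
    int value() const { assert(_res); return *_res; }
};
```

Without the `if`, the `delete` would run first on a self-move and the object would end up holding a dangling pointer, which is the surprise the guard is meant to prevent.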
whether to check for self-assignment in the move assignment operator was a controversial topic. but i think it has been clarified by https://cplusplus.github.io/LWG/issue2468 and formalized in the standard, see https://eel.is/c++draft/utility.arg.requirements#tab:cpp17.moveassignable :
If t and rv do not refer to the same object, t is equivalent to the value of rv before the assignment
so, if they refer to the same object, there are no requirements on the value of the lhs of the assignment operator after the assignment expression is evaluated. hence there is no need to check for self-assignment here.
@tchaikov we have such protections against self move all over the place.
The conservative approach is to make such a move be a no-op.
This way the lvalue contains the value of itself before the move, and it remains in a valid state (though unspecified).
Clearing self contents does not satisfy the first requirement of holding the state of the rvalue prior to move.
@avikivity do we have a documented convention regarding self-move assignment semantics?
I see that future<> just guarantees validity, but the state after self-move is undetermined.
Maybe for control classes this is good enough, and that may be applicable in this case.
But for containers, self-move should be a no-op to prevent unpleasant surprises IMO (even though the stl isn't consistent about it).
@tchaikov we have such protections against self move all over the place. The conservative approach is to make such a move be a no-op. This way the lvalue contains the value of itself before the move, and it remains in a valid state (though unspecified).
yeah, i understand the concern.
Clearing self contents does not satisfy the first requirement of holding the state of the rvalue prior to move.
the postcondition has a requirement of "If t and rv do not refer to the same object, t is equivalent to the value of rv before the assignment", hence i am not sure we need this check in the case of the move assignment operator.
i changed it anyway. but personally, i think since the standard leaves the resulting state unspecified in this case, we don't need to be conservative here.
Let's get @avikivity's opinion on this too.
I can see cases where the generator needs to be moved around
I can see cases where the generator needs to be moved around
i agree. but IMHO, moving a variable into itself is not quite a typical use case.
b9604c1 to 95c592b
v2:
b7a8b0a to e53fa6d
v3:
9f17f54 to 6111879
6111879 to e76b86a
v4:
e76b86a to 035f3e8
@scylladb/seastar-maint hello maintainers, could you help review this change?
What's the motivation? Shouldn't we fix the ping-pong problem? IIRC we had some internal queuing to avoid ping-pongs.
template <typename Yielded> class next_awaiter;

template <typename Yielded>
Should have a constraint (type suitable to be carried in a future).
in this implementation, the return value is passed to the consumer via a pointer -- the producer points the pointer at the value, so seastar::future<> is not involved. and there are two cases:
- producer produces an lvalue: so we can just reuse it, remember its address, and return it to the consumer
- producer produces an rvalue: so we have to create a copy of it, and return the copy to the consumer
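The two cases can be sketched without any coroutine machinery. The `yield_slot` class below is a hypothetical stand-in for the promise's value pointer, not Seastar's actual type. It only illustrates that an lvalue is referenced in place while an rvalue needs storage of its own (here the temporary is moved into that storage rather than copied):

```cpp
#include <cassert>
#include <optional>
#include <string>
#include <utility>

// Hypothetical stand-in for the promise's "value pointer".
template <typename T>
class yield_slot {
    const T* _ptr = nullptr;   // what the consumer dereferences
    std::optional<T> _copy;    // storage used only for rvalues
public:
    // lvalue: remember the address, no copy
    void set(const T& lvalue) {
        _copy.reset();
        _ptr = &lvalue;
    }
    // rvalue: the temporary would die, so keep our own object
    void set(T&& rvalue) {
        _copy.emplace(std::move(rvalue));
        _ptr = &*_copy;
    }
    const T& get() const { assert(_ptr); return *_ptr; }
    bool owns_copy() const { return _copy.has_value(); }
};
```

In the lvalue case the consumer sees the producer's own object, so the producer must keep it alive until it is resumed.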
template <typename Promise>
std::coroutine_handle<> await_suspend(std::coroutine_handle<Promise> producer) noexcept {
    _promise->_waiting_task = &producer.promise();
    return _consumer;
Here we have symmetric transfer, which is nice: the consumer is awakened without going through the scheduler. However, we must make sure at least one of the two directions has a preemption check.
I think it's okay now because await_ready always returns false, but I also think await_ready shouldn't return false.
If we are able to do symmetric transfer in both directions, then the ping-pong problem is much reduced: the two coroutines exchange control until one of them detects that preemption is needed or has to block itself.
However, we must make sure at least one of the two directions has a preemption check.
i will take a closer look to see how to season the awaiter gently.
should be fixed in the latest revision, which yields on the producer's end.
[[nodiscard]] auto operator++() noexcept {
    using base_awaiter = internal::next_awaiter<yielded_type>;
    class increment_awaiter final : public base_awaiter {
Aha, so this is how we can be sure the consumer is a coroutine.
I think we should relax this, though my brain will probably explode.
to accommodate both asynchronous and synchronous producers, we could specialize the generator based on the Promise type passed to std::coroutine_handle. However, this approach would introduce additional complexity to the implementation. i recommend utilizing the standard library's generator implementation, which offers a more comprehensive set of features.
You mean synchronous and asynchronous consumers, no? The producer is always a coroutine.
You mean synchronous and asynchronous consumers, no?
ahh, sorry. i was thinking about the async generator versus sync generator. but the seastar::coroutine::experimental::generator implementation always assumes the promise is a seastar task. To obtain elements from an instance of this generator, one must use co_await. consequently, it's challenging to envision a consumer of this generator that isn't itself a coroutine.
The producer is always a coroutine.
indeed. i just realized that it's more about async versus sync, not about regular function versus coroutine.
    for (unsigned i = 0; i < 2 * n; i++) {
        co_await ++it;
    }
};
Maybe I should pause the internals review and look at the interface. Why an iterator range interface? It can't be used in a range for loop. How do you determine the generator is exhausted? Here you know it generates 2*n, but in general, you don't.
I think we should go back to std::optional<T>.
With such an interface, you could implement both future<> operator(), and operator co_await directly on the generator object itself, using symmetric transfer.
Maybe I should pause the internals review and look at the interface. Why an iterator range interface?
to maintain consistency with the interface proposed in https://wg21.link/P2502R2, i opted for a design that closely aligns with the standard. the proposal introduces std::generator, a standard library type implementing a coroutine generator that conforms to the std::ranges::input_range model. by adhering to this proposed interface, we ensure that developers familiar with the standard will feel at home when using the asynchronous generator. hopefully, this approach makes the asynchronous generator more intuitive and easier to use.
It can't be used in a range for loop.
it can't, due to the inherent difference between async generator and synchronous generator. and unfortunately, "Remove for co_await statement" was accepted, so we cannot use a ranged-for loop with async generator. but as explained above, probably we can preserve this interface? even though the async variant cannot allow the caller to use a nice "for" loop for enumerating the elements produced by the generator.
How do you determine the generator is exhausted? Here you know it generates 2*n, but in general, you don't.
we do have tests exercising the "you don't" cases, for example, see test_generator_drained_with_suspend. i am quoting part of it as a reference
seastar::future<>
verify_fib_drained(coroutine::experimental::generator<int> actual_fibs, unsigned count) {
    auto expected_fibs = sync_fibonacci_sequence(count);
    auto expected_fib = std::begin(expected_fibs);
    auto actual_fib = co_await actual_fibs.begin();
    for (; actual_fib != actual_fibs.end(); co_await ++actual_fib) {
        BOOST_REQUIRE(expected_fib != std::end(expected_fibs));
        BOOST_REQUIRE_EQUAL(*actual_fib, *expected_fib);
        ++expected_fib;
    }
    BOOST_REQUIRE(actual_fib == actual_fibs.end());
}
I think we should go back to std::optional
it is a step back. the gist of the rewrite is to reuse the reference design proposed by https://wg21.link/P2502R2, so that:
- we don't need to track the emptiness or the free slots in the internal buffer. this is probably most important.
- we can save the overhead of calling move constructor or conversion.
- we can expose a similar interface to the one proposed by P2502R2
returning std::optional<T> defeats the purpose of the rewrite. in particular, it does not work with the first item in the list above.
But what does co_await ++it mean? Shouldn't it be co_await ++*it? or *co_await ++it?
Using iterators doesn't help, because there's no async for loop, and all the range algorithms don't work on such iterators.
How can you compare the iterator to the end iterator, before you co_await? How is the empty sequence represented?
But what does co_await ++it mean?
move to (wait for) the next element.
Shouldn't it be co_await ++*it? or *co_await ++it?
i think *co_await ++it works.
Using iterators doesn't help, because there's no async for loop, and all the range algorithms don't work on such iterators.
hmm, iterator helps with for loop. but i think it's also a way to represent a range. imagine we are walking from home to a bus stop: the start point is begin(journey), and the bus stop is represented by end(journey).
How can you compare the iterator to the end iterator, before you co_await? How is the empty sequence represented?
please see the code above, i am replicating it here:
for (; actual_fib != actual_fibs.end(); co_await ++actual_fib) {
// ..
}
if we hit the sentinel in the first iteration, then it is an empty sequence.
I see it's inspired by https://github.com/lewissbaker/cppcoro#async_generatort. Does it have users? We can learn from its experience.
It seems to work. But it's just odd. The begin/end methods don't return iterators. You don't compare begin() to end(), you compare co_await begin() to end(). It looks like a range but it isn't a range.
The interface is also odd. You await begin() and ++i, but not anything else. You can't do regular iterator stuff like *++i.
yes, it's a known limitation. i put this in the comment at the very beginning of the header file. see 5ef1a6e#diff-b7dc64e5af0d4dc90acdd0c4f909a940dbb2c74e88d65e503508ec1eb54253c5R48 . you are practically repeating it =( . i am quoting the comment here
// * Asynchronous Operations:
// - generator::begin() is a coroutine, unlike P2502R2's synchronous approach
// - generator::iterator::operator++() is a coroutine
// - generator::iterator::operator++(int) is a coroutine
// Note: Due to its asynchronous nature, this generator cannot be used in
// range-based for loops.
// * Ranges Integration:
// seastar's generator is not a std::ranges::view_interface. So it lacks
// integration with the C++20 Ranges library due to its asynchronous operations.
On the other hand, there's an advantage to being able to pass a reference, which isn't possible with std::optional. (there will be optional<T&> in C++26).
you mean returning a reference from the producer or returning a reference to the consumer?
I see it's inspired by https://github.com/lewissbaker/cppcoro#async_generatort. Does it have users? We can learn from its experience.
yes, this implementation is also inspired by cppcoro's async_generator<T> and folly's generator implementation. i searched around with the keyword <cppcoro/async_generator.hpp>, but couldn't find any users of it. i think C++20 coroutines are a relatively new feature in the C++ world. generators, which are based on coroutines, are still gaining traction among developers. as a result, the number of programmers actively using generators remains limited. asynchronous generators, being an even more advanced concept, are even less commonly utilized in applications with source code indexed by search engines.
@avikivity so are you okay with using an async iterator like: for (auto it = co_await gen.begin(); it != gen.end(); co_await ++it) { auto val = *it; } ?
Qt seems to use the same interface as proposed here for its AsyncGenerator<T> type (which is a good thing IMHO):
df154bc to b43215b
v9
this generator implementation is inspired by https://wg21.link/P2502R2.
Refs scylladb#2190
Refs scylladb#1913
Refs scylladb#1677
Signed-off-by: Kefu Chai <[email protected]>

* replace coroutine::experimental::generator with coroutine::experimental::async_generator
* remove the generator related tests in coroutines_test.cc. since we already have tests/unit/generator_test.cc, there is no need to keep two copies of these tests.
* update `experimental_list_directory()` to return `generator<const directory_entry&>`, for better performance, so we can point the promise's value pointer to the yielded value, without copying it using the copy_awaiter.
Fixes scylladb#2190
Fixes scylladb#1913
Fixes scylladb#1677
Signed-off-by: Kefu Chai <[email protected]>

before this change, we only had an unbuffered generator implementation, where the producer and consumer operated in a "ping-pong" fashion: elements were produced and consumed one at a time. in this change, we introduce a buffered variant of the generator, which allows the producer to yield a range of elements in a batch. it also enables the consumer to process multiple elements without suspension. the benefits are:
* improved efficiency for batch operations
* reduced context switching between producer and consumer
* enhanced flexibility in element production and consumption patterns
Signed-off-by: Kefu Chai <[email protected]>
b43215b to e201340
v10:
@avikivity could you take another look at your convenience?
@avikivity zealous ping.
@tchaikov Regarding the naming of your async-generator class: at various places in this PR, I see:
but when I look at the current implementation (under the "Files changed" tab), then I see that the class is named (btw thanks for your hard work on this!!)
please see #2218 (comment) , the idea is to reimplement the existing
I see, thanks for pointing me to that comment, I had missed it. (And the rationale makes sense indeed.)
@avikivity gentle ping.
@avikivity yet another gentle ping.
@avikivity yet another ping.
@avikivity yet another gentle ping.
@tchaikov Dear Kefu, I have another question. With your current
class int_input_stream {
    seastar::input_stream<char> is_;
public:
    future<int> read()
    {
        int x{};
        auto buf = co_await is_.read_exactly(sizeof(int));
        memcpy(&x, buf.get(), sizeof(int));
        co_return x;
    }
};
which passes through seastar's reactor for every time I call Now imagine I want to make a file writer which enjoys the same kind of "ping-pong" efficiency improvement: I can pass single integers into the generator, which, inside the generator fills some buffer which will then flush once in a while (essentially a more efficient version of a wrapper like the above around an
yes. it suffers in multiple ways, every time we produce and consume a number we do this dance:
i think it is possible in theory. as generator is a customized type, and we are able to resume the consumer's coroutine once one sends something to the generator. IIUC, but my question is why/how we would use it for implementing a reader in a more natural way. what i can think of is that it can be used to implement a pipeline, so that the producer can "send" values on one end of the pipeline explicitly instead of using
My question was indeed about implementing a writer, in other words something that sits at the "sink" part of a pipeline. (A file reader can already be implemented using your current implementation)
sorry, i misread your question. so instead of consuming the element generated from the generator, you want to feed the generator with well, something else (for instance another generator =)). i don't really think this is what a generator is designed for. i think it allows us to generate a series of elements not to consume elements. what do you think?
Well, for sure the term "generator" becomes misleading when we want to build a data sink. But let's regard generator as a "coroutine that can produce several values", with which we could for example build a more efficient
I see. this is indeed an interesting idea. and as you put, the point is the symmetric transferring between the producer and consumer, and producing the data in batch, so we can alleviate the pain of the ping-pong problem. i think the beauty of generator is that one is able to use if we have to mirror the paradigm of passing a value to a certain function we have only a single option: input parameter. but we cannot re-evaluate a function without executing its body multiple times. however, we can "yield" / "return" values multiple times. i have no idea how to make an
Wouldn't it be simply possible to make it legal (only in cases where applicable) to assign to the iterator?
for (auto it = co_await gen.begin(); it != gen.end(); co_await ++it) {
    *it = my_value_to_be_consumed;
}
hi Kefu @tchaikov I've tried those approaches on your PR; the https://github.com/niekbouman/seastar/tree/niek-async-generator-send
this generator implementation is inspired by https://wg21.link/P2502R2. instead of keeping an internal queue, it reuses the yielded value of the producer. in this change:
- replace seastar::coroutine::experimental::generator with this implementation
- remove the generator tests from coroutines_test.cc, and keep generator_test.cc, which is dedicated to generator.

before this change, we were using a bounded buffer for holding the yielded elements. this proved to be error-prone, which is why we had the "_wait_for_free_space assertion failure" in the directory_test. after this change, instead of using an internal buffer, we just hold a pointer referencing the yielded range or element. this saves us the pain of tracking the free space in the bounded buffer, and hence addresses all three issues.
Fixes #2190
Fixes #1913
Fixes #1677