From 9199c4f5ccc0f4badaa8461c5a54354aa8a92104 Mon Sep 17 00:00:00 2001 From: Tobias Hermann Date: Tue, 2 Apr 2024 15:07:33 +0200 Subject: [PATCH 1/4] docs: show a coroutine-based implementation of the echo server in the tutorial --- doc/tutorial.md | 34 ++++++++++++++++++++++++++++++++++ 1 file changed, 34 insertions(+) diff --git a/doc/tutorial.md b/doc/tutorial.md index 3d7487e941..d5e26ac162 100644 --- a/doc/tutorial.md +++ b/doc/tutorial.md @@ -1778,6 +1778,40 @@ It is often a mistake to silently ignore an exception, so if the future we're ig The ```handle_connection()``` function itself is straightforward --- it repeatedly calls ```read()``` read on the input stream, to receive a ```temporary_buffer``` with some data, and then moves this temporary buffer into a ```write()``` call on the output stream. The buffer will eventually be freed, automatically, when the ```write()``` is done with it. When ```read()``` eventually returns an empty buffer signifying the end of input, we stop ```repeat```'s iteration by returning a ```stop_iteration::yes```. +Re-written using C++20's coroutines, the above becomes this: + +```cpp +seastar::future<> handle_connection(seastar::connected_socket s) { + try { + auto out = s.output(); + auto in = s.input(); + while (true) { + auto buf = co_await in.read(); + if (buf) { + co_await out.write(std::move(buf)); + co_await out.flush(); + } else { + break; + } + } + co_await out.close(); + } + catch (const std::exception &ex) { + fmt::print(stderr, "Could not handle connection: {}\n", ex); + } +} + +seastar::future<> service_loop_3() { + seastar::listen_options lo; + lo.reuse_address = true; + auto listener = seastar::listen(seastar::make_ipv4_address({1234}), lo); + while (true) { + auto res = co_await listener.accept(); + (void) handle_connection(std::move(res.connection)); + } +} +``` + # Sharded services In the previous section we saw that a Seastar application usually needs to run its code on all available CPU cores. We saw that the `seastar::smp::submit_to()` function allows the main function, which initially runs only on the first core, to start the server's code on all `seastar::smp::count` cores. From 3aa1710d06e1752cb9f0edfda5a65ab010afb830 Mon Sep 17 00:00:00 2001 From: Dobiasd Date: Tue, 2 Apr 2024 18:00:50 +0200 Subject: [PATCH 2/4] docs: add comment about ignoring the future retured by handle_connection --- doc/tutorial.md | 3 +++ 1 file changed, 3 insertions(+) diff --git a/doc/tutorial.md b/doc/tutorial.md index d5e26ac162..cc917ac7f4 100644 --- a/doc/tutorial.md +++ b/doc/tutorial.md @@ -1807,6 +1807,9 @@ seastar::future<> service_loop_3() { auto listener = seastar::listen(seastar::make_ipv4_address({1234}), lo); while (true) { auto res = co_await listener.accept(); + // Note we ignore, not co_await, the future returned by + // handle_connection(), so we do not wait for one + // connection to be handled before accepting the next one. (void) handle_connection(std::move(res.connection)); } } From ea5b7a3b5e98b9ede805bd72dea50045cc35db25 Mon Sep 17 00:00:00 2001 From: Dobiasd Date: Tue, 2 Apr 2024 18:03:45 +0200 Subject: [PATCH 3/4] docs: simplify handle_connection body by co_await-ing in the loop condition --- doc/tutorial.md | 11 +++-------- 1 file changed, 3 insertions(+), 8 deletions(-) diff --git a/doc/tutorial.md b/doc/tutorial.md index cc917ac7f4..23f7b15681 100644 --- a/doc/tutorial.md +++ b/doc/tutorial.md @@ -1785,14 +1785,9 @@ seastar::future<> handle_connection(seastar::connected_socket s) { try { auto out = s.output(); auto in = s.input(); - while (true) { - auto buf = co_await in.read(); - if (buf) { - co_await out.write(std::move(buf)); - co_await out.flush(); - } else { - break; - } + while (auto buf = co_await in.read()) { + co_await out.write(std::move(buf)); + co_await out.flush(); } co_await out.close(); } From 8ba25cf17326fd293c0327b3259eac8367474e76 Mon Sep 17 00:00:00 2001 From: Dobiasd Date: Mon, 20 May 2024 07:58:18 +0200 Subject: [PATCH 4/4] Remove old version of echo server and adjust the explanation --- doc/tutorial.md | 72 +++++++------------------------------------------ 1 file changed, 10 insertions(+), 62 deletions(-) diff --git a/doc/tutorial.md b/doc/tutorial.md index 23f7b15681..3c80bd3c2d 100644 --- a/doc/tutorial.md +++ b/doc/tutorial.md @@ -1719,67 +1719,6 @@ In the above example we only saw writing to the socket. Real servers will also w Let's look at a simple example server involving both reads an writes. This is a simple echo server, as described in RFC 862: The server listens for connections from the client, and once a connection is established, any data received is simply sent back - until the client closes the connection. -```cpp -#include -#include -#include -#include - -seastar::future<> handle_connection(seastar::connected_socket s) { - auto out = s.output(); - auto in = s.input(); - return do_with(std::move(s), std::move(out), std::move(in), - [] (auto& s, auto& out, auto& in) { - return seastar::repeat([&out, &in] { - return in.read().then([&out] (auto buf) { - if (buf) { - return out.write(std::move(buf)).then([&out] { - return out.flush(); - }).then([] { - return seastar::stop_iteration::no; - }); - } else { - return seastar::make_ready_future( - seastar::stop_iteration::yes); - } - }); - }).then([&out] { - return out.close(); - }); - }); -} - -seastar::future<> service_loop_3() { - seastar::listen_options lo; - lo.reuse_address = true; - return seastar::do_with(seastar::listen(seastar::make_ipv4_address({1234}), lo), - [] (auto& listener) { - return seastar::keep_doing([&listener] () { - return listener.accept().then( - [] (seastar::accept_result res) { - // Note we ignore, not return, the future returned by - // handle_connection(), so we do not wait for one - // connection to be handled before accepting the next one. - (void)handle_connection(std::move(res.connection)).handle_exception( - [] (std::exception_ptr ep) { - fmt::print(stderr, "Could not handle connection: {}\n", ep); - }); - }); - }); - }); -} -``` - -The main function ```service_loop()``` loops accepting new connections, and for each connection calls ```handle_connection()``` to handle this connection. Our ```handle_connection()``` returns a future saying when handling this connection completed, but importantly, we do ***not*** wait for this future: Remember that ```keep_doing``` will only start the next iteration when the future returned by the previous iteration is resolved. Because we want to allow parallel ongoing connections, we don't want the next ```accept()``` to wait until the previously accepted connection was closed. So we call ```handle_connection()``` to start the handling of the connection, but return nothing from the continuation, which resolves that future immediately, so ```keep_doing``` will continue to the next ```accept()```. - -This demonstrates how easy it is to run parallel _fibers_ (chains of continuations) in Seastar - When a continuation runs an asynchronous function but ignores the future it returns, the asynchronous operation continues in parallel, but never waited for. - -It is often a mistake to silently ignore an exception, so if the future we're ignoring might resolve with an except, it is recommended to handle this case, e.g. using a ```handle_exception()``` continuation. In our case, a failed connection is fine (e.g., the client might close its connection will we're sending it output), so we did not bother to handle the exception. - -The ```handle_connection()``` function itself is straightforward --- it repeatedly calls ```read()``` read on the input stream, to receive a ```temporary_buffer``` with some data, and then moves this temporary buffer into a ```write()``` call on the output stream. The buffer will eventually be freed, automatically, when the ```write()``` is done with it. When ```read()``` eventually returns an empty buffer signifying the end of input, we stop ```repeat```'s iteration by returning a ```stop_iteration::yes```. - -Re-written using C++20's coroutines, the above becomes this: - ```cpp seastar::future<> handle_connection(seastar::connected_socket s) { try { @@ -1796,7 +1735,7 @@ seastar::future<> handle_connection(seastar::connected_socket s) { } } -seastar::future<> service_loop_3() { +seastar::future<> service_loop() { seastar::listen_options lo; lo.reuse_address = true; auto listener = seastar::listen(seastar::make_ipv4_address({1234}), lo); @@ -1810,6 +1749,15 @@ seastar::future<> service_loop_3() { } ``` +The main function ```service_loop()``` loops accepting new connections, and for each connection calls ```handle_connection()``` to handle this connection. Our ```handle_connection()``` returns a future saying when handling this connection completed, but importantly, we do ***not*** wait for this future because we want to allow parallel ongoing connections. + +This demonstrates how easy it is to run parallel _fibers_ (chains of continuations) in Seastar - When a continuation runs an asynchronous function but ignores the future it returns, the asynchronous operation continues in parallel, but never waited for. + +It is often a mistake to silently ignore an exception, so if the future we're ignoring might resolve with an except, it is recommended to handle this case, e.g. using a ```handle_exception()``` continuation. In our case, a failed connection is fine (e.g., the client might close its connection while we're sending it output), so we did not bother to handle the exception. + +The ```handle_connection()``` function itself is straightforward --- it repeatedly calls ```read()``` read on the input stream, to receive a ```temporary_buffer``` with some data, and then moves this temporary buffer into a ```write()``` call on the output stream. The buffer will eventually be freed, automatically, when the ```write()``` is done with it. When ```read()``` eventually returns an empty buffer signifying the end of input, we exit the loop. + + # Sharded services In the previous section we saw that a Seastar application usually needs to run its code on all available CPU cores. We saw that the `seastar::smp::submit_to()` function allows the main function, which initially runs only on the first core, to start the server's code on all `seastar::smp::count` cores.