Profiling Scheduler Performance #4443
Hrm, here is another performance report for when security is turned off. Tornado is still a significant cost. |
OK, I think that this is just down to python's I put a timer around the socket.send calls in
We're spending 300-500us per call and making lots of calls. I can try to batch things a little bit on the worker side but that will only give us a factor increase. I'm curious how we can take communication overhead off of the main thread. For reference, I got these numbers by instrumenting Tornado in the following way:
diff --git a/tornado/iostream.py b/tornado/iostream.py
index 768b404b..f3d000bc 100644
--- a/tornado/iostream.py
+++ b/tornado/iostream.py
@@ -81,6 +81,38 @@ if sys.platform == "darwin":
 _WINDOWS = sys.platform.startswith("win")
+
+import contextlib
+from collections import defaultdict
+from time import time
+from dask.utils import format_time
+
+total_time_data = defaultdict(float)
+counts_data = defaultdict(int)
+
+
+@contextlib.contextmanager
+def duration(name: str) -> None:
+    start = time()
+
+    yield
+
+    stop = time()
+
+    total_time_data[name] += stop - start
+    counts_data[name] += 1
+
+
+import atexit
+
+@atexit.register
+def _():
+    for name in total_time_data:
+        duration = total_time_data[name]
+        count = counts_data[name]
+        print(name, format_time(duration), "/", count, "=", format_time(duration / count))
+
+
 class StreamClosedError(IOError):
     """Exception raised by `IOStream` methods when the stream is closed.
@@ -1144,7 +1176,8 @@ class IOStream(BaseIOStream):
     def write_to_fd(self, data: memoryview) -> int:
         try:
-            return self.socket.send(data)  # type: ignore
+            with duration("send"):
+                return self.socket.send(data)  # type: ignore
         finally:
             # Avoid keeping to data, which can be a memoryview.
             # See https://github.com/tornadoweb/tornado/pull/2008
@@ -1564,7 +1597,8 @@ class SSLIOStream(IOStream):
     def write_to_fd(self, data: memoryview) -> int:
         try:
-            return self.socket.send(data)  # type: ignore
+            with duration("send"):
+                return self.socket.send(data)  # type: ignore
         except ssl.SSLError as e:
             if e.args[0] == ssl.SSL_ERROR_WANT_WRITE:
                 # In Python 3.5+, SSLSocket.send raises a WANT_WRITE error if
@pitrou do you have thoughts here on if it is possible to avoid this 300us cost? |
Do you witness different numbers when TLS is turned off? |
Also, to get a better idea of what's happening, can you print all deciles rather than simply the average? The actual distribution should be insightful. |
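A minimal sketch of what reporting deciles could look like, assuming the same style of context-manager timer as in the patch above. The `samples` store and the `deciles` helper are hypothetical names, not part of Tornado or Dask. Two things are swapped in relative to the patch: `time.perf_counter` as the clock, and a `try`/`finally` so that calls which raise (a non-blocking `send` can raise `BlockingIOError`) are still recorded.

```python
import contextlib
import statistics
from collections import defaultdict
from time import perf_counter

# Hypothetical store: one list of per-call durations (in seconds) per label.
samples = defaultdict(list)


@contextlib.contextmanager
def duration(name):
    start = perf_counter()
    try:
        yield
    finally:
        # Record the sample even if the timed call raised.
        samples[name].append(perf_counter() - start)


def deciles(name):
    """Return the 9 cut points (10%, 20%, ..., 90%) for one label."""
    return statistics.quantiles(samples[name], n=10, method="inclusive")


if __name__ == "__main__":
    for _ in range(1000):
        with duration("noop"):
            pass
    for pct, value in zip(range(10, 100, 10), deciles("noop")):
        print(f"{pct}%: {value * 1e6:.2f} us")
```

Keeping every sample costs memory on long runs; for a quick investigation like this one that is probably acceptable, and it allows looking at higher quantiles (99%, 99.9%) afterwards as well.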
Oh, and account |
These numbers are actually with SSL turned off. I realize now that the title of the issue is confusing. I realized when diving into this that this is slow with normal TCP without security, so I've focused on that for now. And yes, I'll get deciles shortly. |
If SSL is turned off and this is a non-blocking socket (as it should be, since we're using Tornado), then the only reasonable explanation is GIL-induced measurement bias. The quantiles should probably help validate this hypothesis. |
|
This is surprising though. I would expect the 50% value to be around 300us. I'm double-checking the instrumentation. |
I'm not surprised. Most calls are quite fast (it's just a non-blocking system call). A small fraction of the calls have to wait for the GIL before returning, and therefore take more than 10 ms. |
In other words, you're just seeing the effects of the GIL on performance of a single thread in a multi-thread Python program. System calls like |
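The effect described above can be reproduced in miniature. The following is only a sketch under stated assumptions: it uses a local `socket.socketpair()` rather than Tornado, and the helper name `time_sends` is made up for illustration. It times one-byte non-blocking `send` calls on the main thread, once alone and once while a second thread runs a pure-Python loop that competes for the GIL.

```python
import socket
import statistics
import threading
from time import perf_counter


def time_sends(n_calls, busy):
    """Time n_calls one-byte non-blocking sends, optionally next to a busy thread."""
    a, b = socket.socketpair()
    a.setblocking(False)
    stop = threading.Event()

    def spin():
        # Pure-Python loop: keeps the GIL until the interpreter forces a switch.
        x = 0
        while not stop.is_set():
            x += 1

    worker = threading.Thread(target=spin, daemon=True)
    if busy:
        worker.start()
    durations = []
    try:
        for _ in range(n_calls):
            start = perf_counter()
            a.send(b"x")  # releases the GIL, then must re-acquire it to return
            durations.append(perf_counter() - start)
            b.recv(16)  # drain the peer so the send buffer never fills up
    finally:
        stop.set()
        if busy:
            worker.join()
        a.close()
        b.close()
    return durations


if __name__ == "__main__":
    for busy in (False, True):
        d = time_sends(200, busy)
        print(
            f"busy thread={busy}: median {statistics.median(d) * 1e6:.1f} us, "
            f"max {max(d) * 1e6:.1f} us"
        )
```

The absolute numbers depend on the machine and Python version, so none are quoted here; the point is only to compare the two distributions on the same host.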
This tooling might be useful to better detect such situations: |
Yeah, you've mentioned that calls like socket.send are likely masking some other call before. I think that this is the first time that I fully understand what is going on. Seeing the quantiles helped me. Thank you for directing me to that. I'm still left with the question of "what is taking time and making things slow?" I don't have much experience profiling code at this level when the GIL is involved. @maartenbreddels, any suggestions? |
I tried to reproduce it locally, but I get a wildly different report, dump of what I've done:
import dask
from dask.distributed import Client, performance_report, wait

def main(args=None):
    client = Client('127.0.0.1:8786')
    dask.config.set({"optimization.fuse.active": False})
    df = dask.datasets.timeseries(start="2020-01-01", end="2020-01-31", partition_freq="1h", freq="60s").persist()
    with performance_report("report.html"):
        df2 = df.set_index("x").persist()
        wait(df2)

if __name__ == "__main__":
    main()
Which shows >10x more tasks. Am I doing something wrong here? |
@pitrou Just curious why you think 10ms, I'd guess 5ms from the default of sys.getswitchinterval
I've been thinking a bit about the best way to describe why it is/looks so costly. Would you agree that it's not the GIL switch per se that is costly (e.g. the thread context switch is relatively cheap), but that a thread which often releases the GIL within 5ms, while other threads don't (like those running pure Python code), will too often end up having to wait for the GIL?
I think it is the attempt to return from the (Python) This is how I understand the situation described in https://bugs.python.org/issue7946 which I've gone over a bit in maartenbreddels/fastblog#3 (comment) (comment section of https://www.maartenbreddels.com/perf/jupyter/python/tracing/gil/2021/01/14/Tracing-the-Python-GIL.html ). I think giltracer might tell you if the picture painted here is correct; if that is the case, there are no good solutions I think. Workarounds/bandaids I can think of:
Possibly you may be able to identify a C function that does not release the GIL (unlikely), or you can make a strong case for doing a C extension to work around this. This is actually the primary reason I built this tool and wrote the article: I want to make sure before building a C version of a ThreadPoolExecutor that it's worth it (and I'm still not sure about it). |
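For reference, the 5 ms figure mentioned above is CPython's documented default for `sys.getswitchinterval()`, and it can be changed at runtime with `sys.setswitchinterval()`. The sketch below shows one way to change it temporarily; the `switch_interval` helper and the 0.5 ms value are illustrative assumptions, not a recommendation from this thread. A shorter interval means a thread waiting for the GIL asks the holder to give it up sooner, at the price of more frequent switching.

```python
import contextlib
import sys


@contextlib.contextmanager
def switch_interval(seconds):
    """Temporarily change CPython's GIL switch interval, then restore it."""
    previous = sys.getswitchinterval()
    sys.setswitchinterval(seconds)
    try:
        yield previous
    finally:
        sys.setswitchinterval(previous)


if __name__ == "__main__":
    print("current switch interval:", sys.getswitchinterval(), "s")
    with switch_interval(0.0005):
        print("inside block:", sys.getswitchinterval(), "s")
    print("restored:", sys.getswitchinterval(), "s")
```

Whether this helps a given workload would have to be measured, for example by comparing the distribution of `send` durations before and after.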
Hmm, yes, you're right. I was thinking about the typical OS timeslice.
I don't know if it's "too often". It depends what other threads are doing too. The distribution of durations can tell us how "often" that happens.
Before finding solutions, one would have to ask the question "does this need solving?". Even if you find ways to balance scheduling between the two Python threads (the one that does IO and the one that runs pure Python code), you'll still be executing the same workload in the same process execution time, just ordered differently. So the question is: would it improve your overall performance (for example by providing data earlier to other nodes) if you managed to prioritize IO calls before pure Python code in the scheduler process? |
Yes, when I produced the original performance reports I forgot to call With regard to thread switching, I'm not sure I understand what is happening. My understanding from what I read above is that if you have two threads trading off the GIL there is a multi-millisecond delay in handing off the GIL. This would surprise me. I generally expect lock-style operations to take 10us or so in Python. Why would the GIL be so much slower?
Even if this doesn't improve performance I'm very curious about what would improve performance. I care as much about visibility here as anything else. People today are very curious about how to make the scheduler run faster. They're happy to pour engineering resources into it. Currently they're targeting various aspects of the scheduler, but I'm not confident that we're working in the right place. I am searching for more visibility into what is taking up time. Another thing we could do here, if it would help, is try to keep the scheduler single-threaded. Currently we intentionally offload compression/decompression to a separate thread. To me this seems like a good idea, but that's because I assumed that engaging multiple threads didn't cause significant GIL issues. The 5-10ms number above has me confused (it's way higher than I would have expected). Would keeping things single-threaded improve performance and/or visibility? |
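To make the "offload compression to a separate thread" idea concrete, here is a rough sketch. It is not Distributed's actual offload code: `_offload_pool` and `compress_offloaded` are hypothetical names, and `zlib` stands in for whatever compressor is configured. The event loop hands the payload to a single worker thread and awaits the result.

```python
import asyncio
import zlib
from concurrent.futures import ThreadPoolExecutor

# Hypothetical single-thread pool standing in for an "offload" thread.
_offload_pool = ThreadPoolExecutor(max_workers=1, thread_name_prefix="offload")


async def compress_offloaded(payload: bytes) -> bytes:
    loop = asyncio.get_running_loop()
    # zlib drops the GIL while it compresses, so the event loop thread can
    # keep running; it still has to wait for the GIL whenever the worker
    # thread is executing Python bytecode.
    return await loop.run_in_executor(_offload_pool, zlib.compress, payload)


async def main():
    data = b"scheduler message " * 10_000
    compressed = await compress_offloaded(data)
    assert zlib.decompress(compressed) == data
    print(len(data), "->", len(compressed), "bytes")


if __name__ == "__main__":
    asyncio.run(main())
```

The trade-off discussed in this thread shows up here: the second thread removes compression time from the event loop, but it also introduces a second contender for the GIL.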
Releasing the GIL is fast. Acquiring the GIL can lead to waits if the GIL is already held by someone else, which is what is being witnessed here :-) What
My opinion is that it would need one or both of these things:
Both things obviously non-trivial...
It would certainly make things easier to understand, but also would give worse performance. Offloading compression probably benefits overall performance, but also makes it slightly less predictable. |
Yeah, I don't think this is feasible given Dask's execution model.
We're currently using Cython (you might find this work interesting, see recent PRs from @jakirkham). If we need to go to C++ we're open to that, and the state machine logic is being isolated with this option in mind. However I'm also not convinced that that logic is the slow part of the system. I am comfortable rewriting in C++ if necessary, but I want to avoid the situation where we do that, and then find that it's not significantly faster. John has been able to increase the performance of the tricky scheduling logic considerably (the stuff that I would expect to be slow in Python) but this hasn't resulted in significant performance improvements overall, which has me concerned. I'm open to moving to a lower level language, but I first want to understand the scope of code that we need to move over to have a good effect. |
I didn't mean that. There's probably no obvious hot spot (which is why an entire rewrite may be the solution, rather than some select optimizations in Cython). |
There are two options for the scheduler, I think:
Option 2 would be nice, if it makes sense. |
My intuition is that the "death of a thousand cuts" that's the main performance limiter applies as much to the networking and event management as to the core scheduling logic. But don't take my word for it, I haven't tried to profile the dask scheduler in years. |
Adding @jcrist , who might find this conversation interesting. I think that he was looking at efficient Python networking recently. |
Well one observation I've shared with Ben and maybe you as well is within the Scheduler there are a bunch of sends in transitions themselves. PR ( #4365 ) fixes that by moving the sends out of the transitions and grouping them together. IDK if it would help the issues identified here, but I do think grouping communication a bit more closely together may generally be helpful regardless of what we do next. Another thing to consider is that when we send a serialized message currently we do a few sends. First to tell how many buffers need to be sent. Next how big those buffers are (with UCX we also include whether they are on the CPU or GPU). Finally the buffers themselves. We may want to figure out how we can aggregate all of these into one message. Admittedly it's not entirely clear how one would do this with TCP. I believe (but could be wrong about this) there is a way to do this with UCX. |
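As a sketch of what aggregating these pieces into one message could look like: the layout below (frame count, then every frame length, each as a little-endian unsigned 64-bit integer, followed by the frames) is an assumption made for illustration, not Distributed's actual wire format, and `pack_frames` / `unpack_frames` are hypothetical helpers.

```python
import struct


def pack_frames(frames):
    """Return [header, *frames]; the header holds the count and each length."""
    lengths = [memoryview(f).nbytes for f in frames]
    header = struct.pack(f"<Q{len(frames)}Q", len(frames), *lengths)
    return [header, *frames]


def unpack_frames(data):
    """Inverse of pack_frames for one fully received message."""
    view = memoryview(data)
    (n,) = struct.unpack_from("<Q", view, 0)
    lengths = struct.unpack_from(f"<{n}Q", view, 8)
    offset = 8 * (n + 1)
    frames = []
    for length in lengths:
        frames.append(bytes(view[offset:offset + length]))
        offset += length
    return frames


if __name__ == "__main__":
    message = b"".join(pack_frames([b"header", b"", b"payload bytes"]))
    print(unpack_frames(message))
```

Because `pack_frames` returns a list of buffers rather than one joined `bytes` object, the list can be passed to a vectorized write such as `socket.sendmsg` on platforms that provide it, avoiding a copy of the large frames. A real implementation would still need to handle short writes.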
Agreed.
Does IOStream buffer writes internally? I don't remember. |
Sorry responded too quickly. I think we are doing the buffering with Forgot to add that Scheduler messages themselves usually have a simpler form and so don't necessarily need to be serialized as if they were something more complex (like a DataFrame or Array). Maybe we can exploit this when sending? Some thoughts on that in issue ( #4376 ). |
Results from running with your ref_trans2 branch . No strong difference or information here, but I did start adding some more detail in the higher quantiles.
Right, so it might make sense to start looking into protecting calling I've turned off offloading to a separate thread for this. Something else is still going on. |
Did that include Cythonizing it or merely run it as Python? |
Just Python for now |
It's also worth noting that we would no longer need to transmit the length of the frames since that is part of this message format. What I'm still trying to understand is if we might be able to get rid of transmitting the number of frames, at which point we really just have one send/recv occurring. |
Perhaps, but that's relatively rare with respect to the small metadata
messages that the scheduler itself handles. The dataframe or array
communications are much more likely to be worker-to-worker.
…On Fri, Feb 12, 2021 at 4:20 PM jakirkham ***@***.***> wrote:
Wouldn't this happen basically anytime we are sending Dataframes or Arrays?
|
Why do you say that? You definitely need to know the lengths of the frames in order to presize the buffers you give to recvmsg_into. |
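A sketch of the point about presizing, assuming a Unix platform where `socket.recvmsg_into` is available. The `recv_frames` helper is hypothetical; it takes the frame lengths as already known (for example from a header) and loops until every presized buffer is full, since a single call may return fewer bytes than requested.

```python
import socket


def recv_frames(sock, lengths):
    """Fill one presized buffer per frame using scatter reads."""
    buffers = [bytearray(n) for n in lengths]
    # Views over the parts that still need data; skip zero-length frames.
    pending = [memoryview(b) for b in buffers if len(b)]
    while pending:
        nbytes, _ancdata, _flags, _addr = sock.recvmsg_into(pending)
        if nbytes == 0:
            raise ConnectionError("peer closed before all frames arrived")
        # Drop the buffers that were filled completely by this call...
        while pending and nbytes >= len(pending[0]):
            nbytes -= len(pending[0])
            pending.pop(0)
        # ...and advance into a partially filled one.
        if nbytes:
            pending[0] = pending[0][nbytes:]
    return buffers


if __name__ == "__main__":
    a, b = socket.socketpair()
    a.sendall(b"helloworld!")
    print([bytes(buf) for buf in recv_frames(b, [5, 6])])
    a.close()
    b.close()
```

Without the lengths there is nothing to size `buffers` with, which is why the frame lengths still have to travel ahead of (or at the start of) the payload.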
What if we use Edit: Would add that the example in the docs of |
It looks like someone started to implement this for Tornado ( tornadoweb/tornado#2734 ) |
At least for changes in Distributed, PR ( #4506 ) gets this down to one Under-the-hood the writes may still translate to multiple |
Some observations
|
This is still using a sampling window where only a small part of the run is kept, right? To contrast this a bit, would take a look at this info from the Scheduler performance report ( quasiben/dask-scheduler-performance#109 (comment) ) |
Spotted Also saw a pattern in These are all in PR ( #4516 ) |
This is starting to look like the task completion thrashing that @quasiben mentioned on Friday we might run into |
I think that @quasiben mentioned that because the workers have been given very very short tasks that they might overwhelm the scheduler. Is this what you are referring to? If so, then that is intentional. We want the workers to pester the scheduler as much as they can so that we're able to simulate the traffic on a larger cluster while testing on a very small machine. Or in summary, all of this is about thrashing. Thrashing is the goal of these benchmarks :) |
What I mean is there is a bunch of work happening due to task completion. At least when we came into this work, the problem was that the Scheduler was taking a long time dispatching the initial chunk of work. It doesn't seem we are seeing that any more (at least not for this use case). |
My method of profiling is only catching the tail end of the computation, so unfortunately I'm blind to the |
Yep no worries. FWIW if you are looking to try more things, PR ( #4526 ) batches more communications through things like task completion (amongst other things), which weren't handled before. |
FWIW Under-the-hood |
This is partially resolved in that sending uses vectorized IO in Edit: Python 3.12 will include a similar change ( python/cpython#31871 ) |
Hi, CPython core dev here; @jakirkham I suggest you file issues upstream for |
Thanks for the update. When 3.12 comes out we'll want to revisit our workaround for doing zero-copy writes with asyncio here: distributed/distributed/comm/asyncio_tcp.py Line 765 in b5a2078
|
That's really exciting. Thank you @kumaraditya303 for the work and for keeping us informed. |
Note that Python 3.12 experimentally supports |
When running Dask with TLS security turned on, the scheduler can easily become bottlenecked by SSL communication. See the performance report below, in particular the "Scheduler Profile" tab.
https://gistcdn.githack.com/mrocklin/1d24fbac2c66364d8717952ee8827c64/raw/9c87343cb358b54eb26dbfe8b0714120c0f5ad81/report.html
I ran this on my laptop with the following code
When this is run on a larger cluster with many workers this problem becomes significantly worse. What are some ways that we can reduce the cost of secure communication?