Dask task scheduling hangs, then eventually flow fails with PoolTimeout #12877

Open
4 tasks done
arneyjfs opened this issue Apr 24, 2024 · 2 comments
Labels: bug (Something isn't working), great writeup (This is a wonderful example of our standards)

arneyjfs commented Apr 24, 2024

First check

  • I added a descriptive title to this issue.
  • I used the GitHub search to find a similar issue and didn't find it.
  • I searched the Prefect documentation for this issue.
  • I checked that this issue is related to Prefect and not one of its dependencies.

Bug summary

I have a flow run which triggers about 2,000 tasks. I am using the DaskTaskRunner to run this many tasks at scale. After about 200 tasks are scheduled, scheduling hangs. From this point the flow continues for about 5 minutes, with the already-scheduled tasks running fine but no more being scheduled. All tasks that managed to be scheduled eventually complete, and the flow then hangs in a running status: not finished, but not showing any new tasks starting. After a seemingly variable amount of time (0-5 mins?), the flow crashes with a PoolTimeout trying to reach <Private-Prefect-Server-IP>/api/task_runs/

Detailed sequential walkthrough of error

  • Prefect server is running on a medium size machine
  • A Flow is triggered. The flow starts triggering 2,208 tasks.
    • These tasks are handled by the DaskTaskRunner, which is configured to point at a Dask cluster (on a third, separate VM) for distributed computation of the tasks.
    • A single Dask worker (another VM) with 2 CPUs is connected to the Dask cluster.
    • The dask worker starts processing tasks, max 2 at a time.
  • The logs of the prefect-worker show tasks 0-180 being scheduled quickly, then slowing down, and stopping at about 200. From here there are no more logs until the crash.
  • The prefect UI and dask UI show the existing tasks being completed as normal. Neither UI hangs on reload or becomes unresponsive
  • The 200 tasks complete normally, then the prefect UI shows the flow as running, but no tasks currently pending or in progress.
  • After about 5 minutes, the flow is reported as crashed, with the following error shown in the UI:
[image: screenshot of the UI error]

All logs are fine, apart from eventually reporting the crash in various ways. The docker logs on the prefect-worker are the only logs with a bit more info as shown in the stack trace provided below.

There is clearly a bottleneck here but I am struggling to find where it is. CPU and memory usage stay at a normal level on all VMs. Thinking the bottleneck might be the prefect-server database (after reading similar issues), I moved to a Cloud SQL managed PostgreSQL database. That has not fixed it: the database shows low load and the issue still happens.

Reproduction

# I managed to reproduce the problem in a simpler way with the code below (run via prefect.serve). This results in a similar error, though about 900 tasks are scheduled before it grinds to a halt. That may be because the code is simpler and not run through Docker, or because my laptop is handling the scheduling and is higher spec. There is still no out-of-the-ordinary resource usage at the point it hangs.

from prefect import task, flow, get_run_logger, serve
from prefect.runtime import task_run
from prefect_dask import DaskTaskRunner


def generate_task_name():
    parameters = task_run.parameters
    task_number = parameters["task_number"]
    return str(task_number)


@task(name="example_task",
      task_run_name=generate_task_name,
      log_prints=True)
def example_task(task_number):
    logger = get_run_logger()
    logger.info(f'I am task {task_number}')


@flow(name="example_flow",
      log_prints=True,
      task_runner=DaskTaskRunner(address="tcp://10.128.15.193:8786"))
def example_flow():
    logger = get_run_logger()

    args = list(range(2000))
    logger.info(f'Triggering {len(args)} tasks')

    example_task.map(args)


if __name__ == '__main__':
    repro_deploy = example_flow.to_deployment(
        name='reproduce_error',
        work_pool_name='default-agent-pool')
    serve(repro_deploy)

Error

15:38:42.341 | INFO    | Flow run 'flow-1234' - Submitted task run 'exampleDaskTask-201' for execution.
15:38:42.343 | INFO    | Flow run 'flow-1234' - Created task run 'exampleDaskTask-202' for task 'exampleDaskTask'
15:38:42.350 | INFO    | Flow run 'flow-1234' - Submitted task run 'exampleDaskTask-202' for execution.
15:38:42.352 | INFO    | Flow run 'flow-1234' - Created task run 'exampleDaskTask-203' for task 'exampleDaskTask'
15:38:42.359 | INFO    | Flow run 'flow-1234' - Submitted task run 'exampleDaskTask-203' for execution.
15:43:46.511 | INFO    | httpx - HTTP Request: POST http://<Prefect-Server-IP>:4200/api/logs/ "HTTP/1.1 201 Created"
15:45:03.206 | ERROR   | distributed.client -
Traceback (most recent call last):
  File "/usr/local/lib/python3.10/dist-packages/distributed/utils.py", line 832, in wrapper
    return await func(*args, **kwargs)
  File "/usr/local/lib/python3.10/dist-packages/distributed/client.py", line 1715, in _close
    await self.scheduler_comm.close()
asyncio.exceptions.CancelledError
15:45:03.210 | ERROR   | Flow run 'amazing-mastodon' - Crash detected! Request to <Prefect-Server-IP>:4200/api/task_runs/ failed: PoolTimeout: .
15:45:04.944 | INFO    | httpx - HTTP Request: POST <Prefect-Server-IP>/api/logs/ "HTTP/1.1 201 Created"
15:51:59.305 | ERROR   | prefect.engine - Engine execution of flow run '0ee09f65-828c-43a3-9112-77f2e84cbcea' exited with unexpected exception
Traceback (most recent call last):
  File "/usr/local/lib/python3.10/dist-packages/httpx/_transports/default.py", line 69, in map_httpcore_exceptions
    yield
  File "/usr/local/lib/python3.10/dist-packages/httpx/_transports/default.py", line 373, in handle_async_request
    resp = await self._pool.handle_async_request(req)
  File "/usr/local/lib/python3.10/dist-packages/httpcore/_async/connection_pool.py", line 216, in handle_async_request
    raise exc from None
  File "/usr/local/lib/python3.10/dist-packages/httpcore/_async/connection_pool.py", line 192, in handle_async_request
    connection = await pool_request.wait_for_connection(timeout=timeout)
  File "/usr/local/lib/python3.10/dist-packages/httpcore/_async/connection_pool.py", line 35, in wait_for_connection
    await self._connection_acquired.wait(timeout=timeout)
  File "/usr/local/lib/python3.10/dist-packages/httpcore/_synchronization.py", line 148, in wait
    with map_exceptions(anyio_exc_map):
  File "/usr/lib/python3.10/contextlib.py", line 153, in __exit__
    self.gen.throw(typ, value, traceback)
  File "/usr/local/lib/python3.10/dist-packages/httpcore/_exceptions.py", line 14, in map_exceptions
    raise to_exc(exc) from exc
httpcore.PoolTimeout

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/usr/local/lib/python3.10/dist-packages/prefect/engine.py", line 2997, in <module>
    enter_flow_run_engine_from_subprocess(flow_run_id)
  File "/usr/local/lib/python3.10/dist-packages/prefect/engine.py", line 322, in enter_flow_run_engine_from_subprocess
    state = from_sync.wait_for_call_in_loop_thread(
  File "/usr/local/lib/python3.10/dist-packages/prefect/_internal/concurrency/api.py", line 243, in wait_for_call_in_loop_thread
    return call.result()
  File "/usr/local/lib/python3.10/dist-packages/prefect/_internal/concurrency/calls.py", line 318, in result
    return self.future.result(timeout=timeout)
  File "/usr/local/lib/python3.10/dist-packages/prefect/_internal/concurrency/calls.py", line 179, in result
    return self.__get_result()
  File "/usr/lib/python3.10/concurrent/futures/_base.py", line 403, in __get_result
    raise self._exception
  File "/usr/local/lib/python3.10/dist-packages/prefect/_internal/concurrency/calls.py", line 389, in _run_async
    result = await coro
  File "/usr/local/lib/python3.10/dist-packages/prefect/client/utilities.py", line 78, in with_injected_client
    return await fn(*args, **kwargs)
  File "/usr/local/lib/python3.10/dist-packages/prefect/engine.py", line 477, in retrieve_flow_then_begin_flow_run
    return await begin_flow_run(
  File "/usr/local/lib/python3.10/dist-packages/prefect/engine.py", line 514, in begin_flow_run
    async with AsyncExitStack() as stack:
  File "/usr/lib/python3.10/contextlib.py", line 714, in __aexit__
    raise exc_details[1]
  File "/usr/lib/python3.10/contextlib.py", line 697, in __aexit__
    cb_suppress = await cb(*exc_details)
  File "/usr/lib/python3.10/contextlib.py", line 217, in __aexit__
    await self.gen.athrow(typ, value, traceback)
  File "/usr/local/lib/python3.10/dist-packages/prefect/engine.py", line 2344, in report_flow_run_crashes
    flow_run_state = await propose_state(client, state, flow_run_id=flow_run.id)
  File "/usr/local/lib/python3.10/dist-packages/prefect/client/orchestration.py", line 2117, in set_flow_run_state
    response = await self._client.post(
  File "/usr/local/lib/python3.10/dist-packages/httpx/_client.py", line 1892, in post
    return await self.request(
  File "/usr/local/lib/python3.10/dist-packages/httpx/_client.py", line 1574, in request
    return await self.send(request, auth=auth, follow_redirects=follow_redirects)
  File "/usr/local/lib/python3.10/dist-packages/prefect/client/base.py", line 311, in send
    response = await self._send_with_retry(
  File "/usr/local/lib/python3.10/dist-packages/prefect/client/base.py", line 235, in _send_with_retry
    response = await send(request, *send_args, **send_kwargs)
  File "/usr/local/lib/python3.10/dist-packages/httpx/_client.py", line 1661, in send
    response = await self._send_handling_auth(
  File "/usr/local/lib/python3.10/dist-packages/httpx/_client.py", line 1689, in _send_handling_auth
    response = await self._send_handling_redirects(
  File "/usr/local/lib/python3.10/dist-packages/httpx/_client.py", line 1726, in _send_handling_redirects
    response = await self._send_single_request(request)
  File "/usr/local/lib/python3.10/dist-packages/httpx/_client.py", line 1763, in _send_single_request
    response = await transport.handle_async_request(request)
  File "/usr/local/lib/python3.10/dist-packages/httpx/_transports/default.py", line 372, in handle_async_request
    with map_httpcore_exceptions():
  File "/usr/lib/python3.10/contextlib.py", line 153, in __exit__
    self.gen.throw(typ, value, traceback)
  File "/usr/local/lib/python3.10/dist-packages/httpx/_transports/default.py", line 86, in map_httpcore_exceptions
    raise mapped_exc(message) from exc
httpx.PoolTimeout
15:52:01.736 | ERROR   | prefect.flow_runs.runner - Process for flow run 'amazing-mastodon' exited with status code: 1
15:52:01.802 | INFO    | prefect.flow_runs.runner - Reported flow run '0ee09f65-828c-43a3-9112-77f2e84cbcea' as crashed: Flow run process exited with non-zero status code 1.
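
For reference, the `httpcore.PoolTimeout` in the first traceback is raised from `wait_for_connection`, i.e. while a request is waiting for a free connection in the client-side pool, not while waiting for the server to respond. Below is a minimal stdlib-only sketch of that failure mode, assuming a pool with a fixed number of slots. `TinyPool`, its parameters, and the `PoolTimeout` class here are hypothetical stand-ins for illustration, not httpx's or Prefect's actual code.

```python
import asyncio


class PoolTimeout(Exception):
    """Stand-in for httpx.PoolTimeout (hypothetical, not the real class)."""


class TinyPool:
    """Hypothetical connection pool with a fixed number of slots."""

    def __init__(self, max_connections: int, pool_timeout: float) -> None:
        self._slots = asyncio.Semaphore(max_connections)
        self._pool_timeout = pool_timeout

    async def request(self, hold_for: float) -> str:
        # Wait for a free slot, but give up after pool_timeout seconds.
        try:
            await asyncio.wait_for(self._slots.acquire(), timeout=self._pool_timeout)
        except asyncio.TimeoutError:
            raise PoolTimeout("no free connection within the pool timeout") from None
        try:
            # Simulate a request that keeps its connection busy.
            await asyncio.sleep(hold_for)
            return "ok"
        finally:
            self._slots.release()


async def main() -> list:
    pool = TinyPool(max_connections=2, pool_timeout=0.05)
    # Four requests each hold a connection for 0.2 s, but only two slots exist,
    # so the last two give up waiting before a slot is released.
    return await asyncio.gather(
        *(pool.request(0.2) for _ in range(4)), return_exceptions=True
    )


results = asyncio.run(main())
print(results)
```

In this sketch the first two requests succeed and the other two fail with the stand-in `PoolTimeout`, even though nothing is wrong on the "server" side. Whether the same thing is happening inside the flow run process here is an assumption, not something the logs confirm.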

Versions

# for server and worker the output is the same
Version:             2.16.6
API version:         0.8.4
Python version:      3.10.12
Git commit:          3fecd435
Built:               Sat, Mar 23, 2024 4:06 PM
OS/Arch:             linux/x86_64
Profile:             default
Server type:         server

Additional context

This may be a terrible architecture, so if the solution is to use Prefect in a different way then I would also welcome some advice here. A post to the Prefect Discourse on how to do something like this did not lead to any advice on good architecture patterns. For context, I tried another approach of using Prefect workers to distribute tasks out to other VMs (i.e. instead of 1-prefect-server => 1-prefect-worker => 1-dask-cluster => many dask-workers, I tried 1-prefect-server => many prefect-workers), but this crashed in a similar way (failed to reach the API).
I could also simply use Prefect to trigger the flow and call Dask normally from within the flow. Since it seems to be Prefect struggling to keep track of all the tasks, this would likely help. However, I would lose the greatly increased visibility and logging that Prefect provides at the task level.
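
A rough sketch of the shape of that last option: one coarse unit of work that fans out internally, so the orchestrator only tracks a single run. To keep it runnable without a cluster, `concurrent.futures.ThreadPoolExecutor` stands in for the Dask client here. `example_work` and `run_all` are hypothetical names, and in the real setup `run_all` would be the body of the flow.

```python
from concurrent.futures import ThreadPoolExecutor


def example_work(task_number: int) -> str:
    # Plain function: no @task decorator, so no per-task API calls are made.
    return f"I am task {task_number}"


def run_all(numbers, max_workers: int = 2) -> list:
    # Stand-in for a Dask client: the executor fans the work out and
    # returns the results in input order.
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        return list(pool.map(example_work, numbers))


results = run_all(range(2000))
print(len(results), results[0], results[-1])
```

The trade-off is the one described above: the individual pieces of work no longer appear as task runs, so per-task state and logs are lost.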

@arneyjfs arneyjfs added bug Something isn't working needs:triage Needs feedback from the Prefect product team labels Apr 24, 2024
@arneyjfs
Author

For some additional context, I removed the prefect @task decorator from the task to let Dask handle scaling the tasks natively and I was easily able to scale to 1,000,000+ tasks. I think I can therefore also rule out the dask server as being a bottleneck here

@desertaxle desertaxle added great writeup This is a wonderful example of our standards and removed needs:triage Needs feedback from the Prefect product team labels Apr 25, 2024
@arneyjfs
Author

arneyjfs commented May 1, 2024

Has anyone been able to reproduce this? @desertaxle? Perhaps there is some more information I can provide
