Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(helm): add initial Dask support #701

Merged
merged 1 commit into from
Oct 31, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions AUTHORS.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
The list of contributors in alphabetical order:

- [Adelina Lintuluoto](https://orcid.org/0000-0002-0726-1452)
- [Alp Tuna](https://orcid.org/0009-0001-1915-3993)
- [Anton Khodak](https://orcid.org/0000-0003-3263-4553)
- [Audrius Mecionis](https://orcid.org/0000-0002-3759-1663)
- [Bruno Rosendo](https://orcid.org/0000-0002-0923-3148)
Expand Down
75 changes: 75 additions & 0 deletions docs/openapi.json
Original file line number Diff line number Diff line change
Expand Up @@ -429,6 +429,26 @@
"slurmcern"
]
},
"dask_cluster_default_number_of_workers": {
"title": "The number of Dask workers created by default",
"value": "2Gi"
},
"dask_cluster_default_single_worker_memory": {
"title": "The amount of memory used by default by a single Dask worker",
"value": "2Gi"
},
"dask_cluster_max_memory_limit": {
"title": "The maximum memory limit for Dask clusters created by users",
"value": "16Gi"
},
"dask_cluster_max_single_worker_memory": {
"title": "The maximum amount of memory that users can ask for the single Dask worker",
"value": "8Gi"
},
"dask_enabled": {
"title": "Dask workflows allowed in the cluster",
"value": "False"
},
"default_kubernetes_jobs_timeout": {
"title": "Default timeout for Kubernetes jobs",
"value": "604800"
Expand Down Expand Up @@ -479,6 +499,61 @@
},
"type": "object"
},
"dask_cluster_default_number_of_workers": {
"properties": {
"title": {
"type": "string"
},
"value": {
"type": "string"
}
},
"type": "object"
},
"dask_cluster_default_single_worker_memory": {
"properties": {
"title": {
"type": "string"
},
"value": {
"type": "string"
}
},
"type": "object"
},
"dask_cluster_max_memory_limit": {
"properties": {
"title": {
"type": "string"
},
"value": {
"type": "string"
}
},
"type": "object"
},
"dask_cluster_max_single_worker_memory": {
"properties": {
"title": {
"type": "string"
},
"value": {
"type": "string"
}
},
"type": "object"
},
"dask_enabled": {
"properties": {
"title": {
"type": "string"
},
"value": {
"type": "string"
}
},
"type": "object"
},
"default_kubernetes_jobs_timeout": {
"properties": {
"title": {
Expand Down
25 changes: 24 additions & 1 deletion reana_server/config.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# -*- coding: utf-8 -*-
#
# This file is part of REANA.
# Copyright (C) 2017, 2018, 2019, 2020, 2021, 2022, 2023 CERN.
# Copyright (C) 2017, 2018, 2019, 2020, 2021, 2022, 2023, 2024 CERN.
#
# REANA is free software; you can redistribute it and/or modify it
# under the terms of the MIT License; see LICENSE file for more details.
Expand Down Expand Up @@ -58,6 +58,29 @@
os.getenv("LOGIN_PROVIDERS_SECRETS", "{}")
)

DASK_ENABLED = strtobool(os.getenv("DASK_ENABLED", "true"))
"""Whether Dask is enabled in the cluster or not"""

REANA_DASK_CLUSTER_MAX_MEMORY_LIMIT = os.getenv(
"REANA_DASK_CLUSTER_MAX_MEMORY_LIMIT", "16Gi"
)
"""Maximum memory limit for Dask clusters."""

REANA_DASK_CLUSTER_DEFAULT_NUMBER_OF_WORKERS = int(
os.getenv("REANA_DASK_CLUSTER_DEFAULT_NUMBER_OF_WORKERS", 2)
)
"""Number of workers in Dask cluster by default """

REANA_DASK_CLUSTER_DEFAULT_SINGLE_WORKER_MEMORY = os.getenv(
"REANA_DASK_CLUSTER_DEFAULT_SINGLE_WORKER_MEMORY", "2Gi"
)
"""Memory for one Dask worker by default."""

REANA_DASK_CLUSTER_MAX_SINGLE_WORKER_MEMORY = os.getenv(
"REANA_DASK_CLUSTER_MAX_SINGLE_WORKER_MEMORY", "8Gi"
)
"""Maximum memory for one Dask worker."""

REANA_KUBERNETES_JOBS_MEMORY_LIMIT = os.getenv("REANA_KUBERNETES_JOBS_MEMORY_LIMIT")
"""Maximum memory limit for user job containers for workflow complexity estimation."""

Expand Down
91 changes: 90 additions & 1 deletion reana_server/rest/info.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# -*- coding: utf-8 -*-
#
# This file is part of REANA.
# Copyright (C) 2021, 2022 CERN.
# Copyright (C) 2021, 2022, 2024 CERN.
#
# REANA is free software; you can redistribute it and/or modify it
# under the terms of the MIT License; see LICENSE file for more details.
Expand All @@ -24,6 +24,11 @@
REANA_KUBERNETES_JOBS_TIMEOUT_LIMIT,
REANA_KUBERNETES_JOBS_MAX_USER_TIMEOUT_LIMIT,
REANA_INTERACTIVE_SESSION_MAX_INACTIVITY_PERIOD,
DASK_ENABLED,
REANA_DASK_CLUSTER_DEFAULT_NUMBER_OF_WORKERS,
REANA_DASK_CLUSTER_MAX_MEMORY_LIMIT,
REANA_DASK_CLUSTER_DEFAULT_SINGLE_WORKER_MEMORY,
REANA_DASK_CLUSTER_MAX_SINGLE_WORKER_MEMORY,
)
from reana_server.decorators import signin_required

Expand Down Expand Up @@ -125,6 +130,41 @@
type: string
type: array
type: object
dask_enabled:
properties:
title:
type: string
value:
type: string
type: object
dask_cluster_max_memory_limit:
properties:
title:
type: string
value:
type: string
type: object
dask_cluster_default_number_of_workers:
properties:
title:
type: string
value:
type: string
type: object
dask_cluster_default_single_worker_memory:
properties:
title:
type: string
value:
type: string
type: object
dask_cluster_max_single_worker_memory:
properties:
title:
type: string
value:
type: string
type: object
type: object
examples:
application/json:
Expand Down Expand Up @@ -165,6 +205,26 @@
"title": "Maximum timeout for Kubernetes jobs",
"value": "1209600"
},
"dask_enabled": {
"title": "Dask workflows allowed in the cluster",
"value": "False"
},
"dask_cluster_max_memory_limit": {
"title": "The maximum memory limit for Dask clusters created by users",
"value": "16Gi"
},
"dask_cluster_default_number_of_workers": {
"title": "The number of Dask workers created by default",
"value": "2Gi"
},
"dask_cluster_default_single_worker_memory": {
"title": "The amount of memory used by default by a single Dask worker",
"value": "2Gi"
},
"dask_cluster_max_single_worker_memory": {
"title": "The maximum amount of memory that users can ask for the single Dask worker",
"value": "8Gi"
},
}
500:
description: >-
Expand Down Expand Up @@ -217,7 +277,29 @@
title="Maximum inactivity period in days before automatic closure of interactive sessions",
value=REANA_INTERACTIVE_SESSION_MAX_INACTIVITY_PERIOD,
),
dask_enabled=dict(
title="Dask workflows allowed in the cluster",
value=bool(DASK_ENABLED),
),
)
if DASK_ENABLED:
cluster_information["dask_cluster_default_number_of_workers"] = dict(

Check warning on line 286 in reana_server/rest/info.py

View check run for this annotation

Codecov / codecov/patch

reana_server/rest/info.py#L285-L286

Added lines #L285 - L286 were not covered by tests
title="The number of Dask workers created by default",
value=REANA_DASK_CLUSTER_DEFAULT_NUMBER_OF_WORKERS,
)
cluster_information["dask_cluster_max_memory_limit"] = dict(

Check warning on line 290 in reana_server/rest/info.py

View check run for this annotation

Codecov / codecov/patch

reana_server/rest/info.py#L290

Added line #L290 was not covered by tests
title="The maximum memory limit for Dask clusters created by users",
value=REANA_DASK_CLUSTER_MAX_MEMORY_LIMIT,
)
cluster_information["dask_cluster_default_single_worker_memory"] = dict(

Check warning on line 294 in reana_server/rest/info.py

View check run for this annotation

Codecov / codecov/patch

reana_server/rest/info.py#L294

Added line #L294 was not covered by tests
title="The amount of memory used by default by a single Dask worker",
value=REANA_DASK_CLUSTER_DEFAULT_SINGLE_WORKER_MEMORY,
)
cluster_information["dask_cluster_max_single_worker_memory"] = dict(

Check warning on line 298 in reana_server/rest/info.py

View check run for this annotation

Codecov / codecov/patch

reana_server/rest/info.py#L298

Added line #L298 was not covered by tests
title="The maximum amount of memory that users can ask for the single Dask worker",
value=REANA_DASK_CLUSTER_MAX_SINGLE_WORKER_MEMORY,
)

return InfoSchema().dump(cluster_information)

except Exception as e:
Expand Down Expand Up @@ -260,3 +342,10 @@
maximum_interactive_session_inactivity_period = fields.Nested(
StringNullableInfoValue
)
kubernetes_max_memory_limit = fields.Nested(StringInfoValue)
dask_enabled = fields.Nested(StringInfoValue)
if DASK_ENABLED:
dask_cluster_default_number_of_workers = fields.Nested(StringInfoValue)
dask_cluster_max_memory_limit = fields.Nested(StringInfoValue)
dask_cluster_default_single_worker_memory = fields.Nested(StringInfoValue)
dask_cluster_max_single_worker_memory = fields.Nested(StringInfoValue)
3 changes: 3 additions & 0 deletions reana_server/rest/workflows.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
validate_inputs,
validate_workflow,
validate_workspace_path,
validate_dask_memory_and_cores_limits,
)
from webargs import fields, validate
from webargs.flaskparser import use_kwargs
Expand Down Expand Up @@ -566,6 +567,8 @@ def create_workflow(user): # noqa

validate_inputs(reana_spec_file)

validate_dask_memory_and_cores_limits(reana_spec_file)

retention_days = reana_spec_file.get("workspace", {}).get("retention_days")
retention_rules = get_workspace_retention_rules(retention_days)

Expand Down
53 changes: 50 additions & 3 deletions reana_server/validation.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# -*- coding: utf-8 -*-
#
# This file is part of REANA.
# Copyright (C) 2022 CERN.
# Copyright (C) 2022, 2024 CERN.
#
# REANA is free software; you can redistribute it and/or modify it
# under the terms of the MIT License; see LICENSE file for more details.
Expand All @@ -18,8 +18,17 @@
from reana_commons.validation.operational_options import validate_operational_options
from reana_commons.validation.parameters import build_parameters_validator
from reana_commons.validation.utils import validate_reana_yaml, validate_workspace

from reana_server.config import SUPPORTED_COMPUTE_BACKENDS, WORKSPACE_RETENTION_PERIOD
from reana_commons.job_utils import kubernetes_memory_to_bytes

from reana_server.config import (
SUPPORTED_COMPUTE_BACKENDS,
WORKSPACE_RETENTION_PERIOD,
DASK_ENABLED,
REANA_DASK_CLUSTER_MAX_MEMORY_LIMIT,
REANA_DASK_CLUSTER_DEFAULT_NUMBER_OF_WORKERS,
REANA_DASK_CLUSTER_DEFAULT_SINGLE_WORKER_MEMORY,
REANA_DASK_CLUSTER_MAX_SINGLE_WORKER_MEMORY,
)
from reana_server import utils


Expand Down Expand Up @@ -153,3 +162,41 @@
"Maximum workflow retention period was reached. "
f"Please use less than {WORKSPACE_RETENTION_PERIOD} days."
)


def validate_dask_memory_and_cores_limits(reana_yaml: Dict) -> None:
"""Validate Dask workflows are allowed in the cluster and memory limits are respected."""
# Validate Dask workflows are allowed in the cluster
dask_resources = reana_yaml["workflow"].get("resources", {}).get("dask", {})
if not DASK_ENABLED and dask_resources != {}:
raise REANAValidationError("Dask workflows are not allowed in this cluster.")

Check warning on line 172 in reana_server/validation.py

View check run for this annotation

Codecov / codecov/patch

reana_server/validation.py#L172

Added line #L172 was not covered by tests

# Validate Dask memory limit requested by the workflow
if dask_resources:
single_worker_memory = dask_resources.get(

Check warning on line 176 in reana_server/validation.py

View check run for this annotation

Codecov / codecov/patch

reana_server/validation.py#L176

Added line #L176 was not covered by tests
"single_worker_memory", REANA_DASK_CLUSTER_DEFAULT_SINGLE_WORKER_MEMORY
)
if kubernetes_memory_to_bytes(

Check warning on line 179 in reana_server/validation.py

View check run for this annotation

Codecov / codecov/patch

reana_server/validation.py#L179

Added line #L179 was not covered by tests
single_worker_memory
) > kubernetes_memory_to_bytes(REANA_DASK_CLUSTER_MAX_SINGLE_WORKER_MEMORY):
raise REANAValidationError(

Check warning on line 182 in reana_server/validation.py

View check run for this annotation

Codecov / codecov/patch

reana_server/validation.py#L182

Added line #L182 was not covered by tests
f'The "single_worker_memory" provided in the dask resources exceeds the limit ({REANA_DASK_CLUSTER_MAX_SINGLE_WORKER_MEMORY}).'
)

number_of_workers = int(

Check warning on line 186 in reana_server/validation.py

View check run for this annotation

Codecov / codecov/patch

reana_server/validation.py#L186

Added line #L186 was not covered by tests
dask_resources.get(
"number_of_workers", REANA_DASK_CLUSTER_DEFAULT_NUMBER_OF_WORKERS
)
)
requested_dask_cluster_memory = (

Check warning on line 191 in reana_server/validation.py

View check run for this annotation

Codecov / codecov/patch

reana_server/validation.py#L191

Added line #L191 was not covered by tests
kubernetes_memory_to_bytes(single_worker_memory) * number_of_workers
)

if requested_dask_cluster_memory > kubernetes_memory_to_bytes(

Check warning on line 195 in reana_server/validation.py

View check run for this annotation

Codecov / codecov/patch

reana_server/validation.py#L195

Added line #L195 was not covered by tests
REANA_DASK_CLUSTER_MAX_MEMORY_LIMIT
):
raise REANAValidationError(

Check warning on line 198 in reana_server/validation.py

View check run for this annotation

Codecov / codecov/patch

reana_server/validation.py#L198

Added line #L198 was not covered by tests
f'The "memory" requested in the dask resources exceeds the limit ({REANA_DASK_CLUSTER_MAX_MEMORY_LIMIT}).\nDecrease the number of workers requested or amount of memory consumed by a single worker.'
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The limit testing seems to work nicely, I have tested a few scenarios.

)

return None
Loading