Skip to content

Commit

Permalink
feat(dask): add initial dask support (#706)
Browse files Browse the repository at this point in the history
  • Loading branch information
Alputer committed Oct 24, 2024
1 parent c4be985 commit 3b7f99b
Show file tree
Hide file tree
Showing 6 changed files with 243 additions and 5 deletions.
1 change: 1 addition & 0 deletions AUTHORS.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
The list of contributors in alphabetical order:

- [Adelina Lintuluoto](https://orcid.org/0000-0002-0726-1452)
- [Alp Tuna](https://orcid.org/0009-0001-1915-3993)
- [Anton Khodak](https://orcid.org/0000-0003-3263-4553)
- [Audrius Mecionis](https://orcid.org/0000-0002-3759-1663)
- [Bruno Rosendo](https://orcid.org/0000-0002-0923-3148)
Expand Down
75 changes: 75 additions & 0 deletions docs/openapi.json
Original file line number Diff line number Diff line change
Expand Up @@ -429,6 +429,26 @@
"slurmcern"
]
},
"dask_cluster_default_number_of_workers": {
"title": "Default number of workers for Dask clusters",
"value": "2Gi"
},
"dask_cluster_default_single_worker_memory": {
"title": "Amount of memory for one Dask worker by default",
"value": "2Gi"
},
"dask_cluster_max_memory_limit": {
"title": "Maximum memory limit for Dask clusters",
"value": "16Gi"
},
"dask_cluster_max_single_worker_memory": {
"title": "Maximum amount of memory for one Dask worker",
"value": "8Gi"
},
"dask_enabled": {
"title": "Dask workflows allowed in the cluster",
"value": "False"
},
"default_kubernetes_jobs_timeout": {
"title": "Default timeout for Kubernetes jobs",
"value": "604800"
Expand Down Expand Up @@ -479,6 +499,61 @@
},
"type": "object"
},
"dask_cluster_default_number_of_workers": {
"properties": {
"title": {
"type": "string"
},
"value": {
"type": "string"
}
},
"type": "object"
},
"dask_cluster_default_single_worker_memory": {
"properties": {
"title": {
"type": "string"
},
"value": {
"type": "string"
}
},
"type": "object"
},
"dask_cluster_max_memory_limit": {
"properties": {
"title": {
"type": "string"
},
"value": {
"type": "string"
}
},
"type": "object"
},
"dask_cluster_max_single_worker_memory": {
"properties": {
"title": {
"type": "string"
},
"value": {
"type": "string"
}
},
"type": "object"
},
"dask_enabled": {
"properties": {
"title": {
"type": "string"
},
"value": {
"type": "string"
}
},
"type": "object"
},
"default_kubernetes_jobs_timeout": {
"properties": {
"title": {
Expand Down
25 changes: 24 additions & 1 deletion reana_server/config.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# -*- coding: utf-8 -*-
#
# This file is part of REANA.
# Copyright (C) 2017, 2018, 2019, 2020, 2021, 2022, 2023 CERN.
# Copyright (C) 2017, 2018, 2019, 2020, 2021, 2022, 2023, 2024 CERN.
#
# REANA is free software; you can redistribute it and/or modify it
# under the terms of the MIT License; see LICENSE file for more details.
Expand Down Expand Up @@ -58,6 +58,29 @@
os.getenv("LOGIN_PROVIDERS_SECRETS", "{}")
)

DASK_ENABLED = strtobool(os.getenv("DASK_ENABLED", "true"))
"""Whether Dask is enabled in the cluster or not"""

REANA_DASK_CLUSTER_MAX_MEMORY_LIMIT = os.getenv(
"REANA_DASK_CLUSTER_MAX_MEMORY_LIMIT", "16Gi"
)
"""Maximum memory limit for Dask clusters."""

REANA_DASK_CLUSTER_DEFAULT_NUMBER_OF_WORKERS = int(
os.getenv("REANA_DASK_CLUSTER_DEFAULT_NUMBER_OF_WORKERS", 2)
)
"""Number of workers in Dask cluster by default """

REANA_DASK_CLUSTER_DEFAULT_SINGLE_WORKER_MEMORY = os.getenv(
"REANA_DASK_CLUSTER_DEFAULT_SINGLE_WORKER_MEMORY", "2Gi"
)
"""Memory for one Dask worker by default."""

REANA_DASK_CLUSTER_MAX_SINGLE_WORKER_MEMORY = os.getenv(
"REANA_DASK_CLUSTER_MAX_SINGLE_WORKER_MEMORY", "8Gi"
)
"""Maximum memory for one Dask worker."""

REANA_KUBERNETES_JOBS_MEMORY_LIMIT = os.getenv("REANA_KUBERNETES_JOBS_MEMORY_LIMIT")
"""Maximum memory limit for user job containers for workflow complexity estimation."""

Expand Down
91 changes: 90 additions & 1 deletion reana_server/rest/info.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# -*- coding: utf-8 -*-
#
# This file is part of REANA.
# Copyright (C) 2021, 2022 CERN.
# Copyright (C) 2021, 2022, 2024 CERN.
#
# REANA is free software; you can redistribute it and/or modify it
# under the terms of the MIT License; see LICENSE file for more details.
Expand All @@ -24,6 +24,11 @@
REANA_KUBERNETES_JOBS_TIMEOUT_LIMIT,
REANA_KUBERNETES_JOBS_MAX_USER_TIMEOUT_LIMIT,
REANA_INTERACTIVE_SESSION_MAX_INACTIVITY_PERIOD,
DASK_ENABLED,
REANA_DASK_CLUSTER_DEFAULT_NUMBER_OF_WORKERS,
REANA_DASK_CLUSTER_MAX_MEMORY_LIMIT,
REANA_DASK_CLUSTER_DEFAULT_SINGLE_WORKER_MEMORY,
REANA_DASK_CLUSTER_MAX_SINGLE_WORKER_MEMORY,
)
from reana_server.decorators import signin_required

Expand Down Expand Up @@ -125,6 +130,41 @@ def info(user, **kwargs): # noqa
type: string
type: array
type: object
dask_enabled:
properties:
title:
type: string
value:
type: string
type: object
dask_cluster_max_memory_limit:
properties:
title:
type: string
value:
type: string
type: object
dask_cluster_default_number_of_workers:
properties:
title:
type: string
value:
type: string
type: object
dask_cluster_default_single_worker_memory:
properties:
title:
type: string
value:
type: string
type: object
dask_cluster_max_single_worker_memory:
properties:
title:
type: string
value:
type: string
type: object
type: object
examples:
application/json:
Expand Down Expand Up @@ -165,6 +205,26 @@ def info(user, **kwargs): # noqa
"title": "Maximum timeout for Kubernetes jobs",
"value": "1209600"
},
"dask_enabled": {
"title": "Dask workflows allowed in the cluster",
"value": "False"
},
"dask_cluster_max_memory_limit": {
"title": "Maximum memory limit for Dask clusters",
"value": "16Gi"
},
"dask_cluster_default_number_of_workers": {
"title": "Default number of workers for Dask clusters",
"value": "2Gi"
},
"dask_cluster_default_single_worker_memory": {
"title": "Amount of memory for one Dask worker by default",
"value": "2Gi"
},
"dask_cluster_max_single_worker_memory": {
"title": "Maximum amount of memory for one Dask worker",
"value": "8Gi"
},
}
500:
description: >-
Expand Down Expand Up @@ -217,7 +277,29 @@ def info(user, **kwargs): # noqa
title="Maximum inactivity period in days before automatic closure of interactive sessions",
value=REANA_INTERACTIVE_SESSION_MAX_INACTIVITY_PERIOD,
),
dask_enabled=dict(
title="Dask workflows allowed in the cluster",
value=bool(DASK_ENABLED),
),
)
if DASK_ENABLED:
cluster_information["dask_cluster_default_number_of_workers"] = dict(
title="Number of workers in Dask clusters by default",
value=REANA_DASK_CLUSTER_DEFAULT_NUMBER_OF_WORKERS,
)
cluster_information["dask_cluster_max_memory_limit"] = dict(
title="Maximum memory limit for Dask clusters",
value=REANA_DASK_CLUSTER_MAX_MEMORY_LIMIT,
)
cluster_information["dask_cluster_default_single_worker_memory"] = dict(
title="Memory for one Dask worker by default",
value=REANA_DASK_CLUSTER_DEFAULT_SINGLE_WORKER_MEMORY,
)
cluster_information["dask_cluster_max_single_worker_memory"] = dict(
title="Maximum memory for one Dask worker",
value=REANA_DASK_CLUSTER_MAX_SINGLE_WORKER_MEMORY,
)

return InfoSchema().dump(cluster_information)

except Exception as e:
Expand Down Expand Up @@ -260,3 +342,10 @@ class InfoSchema(Schema):
maximum_interactive_session_inactivity_period = fields.Nested(
StringNullableInfoValue
)
kubernetes_max_memory_limit = fields.Nested(StringInfoValue)
dask_enabled = fields.Nested(StringInfoValue)
if DASK_ENABLED:
dask_cluster_default_number_of_workers = fields.Nested(StringInfoValue)
dask_cluster_max_memory_limit = fields.Nested(StringInfoValue)
dask_cluster_default_single_worker_memory = fields.Nested(StringInfoValue)
dask_cluster_max_single_worker_memory = fields.Nested(StringInfoValue)
3 changes: 3 additions & 0 deletions reana_server/rest/workflows.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
validate_inputs,
validate_workflow,
validate_workspace_path,
validate_dask_memory_and_cores_limits,
)
from webargs import fields, validate
from webargs.flaskparser import use_kwargs
Expand Down Expand Up @@ -566,6 +567,8 @@ def create_workflow(user): # noqa

validate_inputs(reana_spec_file)

validate_dask_memory_and_cores_limits(reana_spec_file)

retention_days = reana_spec_file.get("workspace", {}).get("retention_days")
retention_rules = get_workspace_retention_rules(retention_days)

Expand Down
53 changes: 50 additions & 3 deletions reana_server/validation.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# -*- coding: utf-8 -*-
#
# This file is part of REANA.
# Copyright (C) 2022 CERN.
# Copyright (C) 2022, 2024 CERN.
#
# REANA is free software; you can redistribute it and/or modify it
# under the terms of the MIT License; see LICENSE file for more details.
Expand All @@ -18,8 +18,17 @@
from reana_commons.validation.operational_options import validate_operational_options
from reana_commons.validation.parameters import build_parameters_validator
from reana_commons.validation.utils import validate_reana_yaml, validate_workspace

from reana_server.config import SUPPORTED_COMPUTE_BACKENDS, WORKSPACE_RETENTION_PERIOD
from reana_commons.job_utils import kubernetes_memory_to_bytes

from reana_server.config import (
SUPPORTED_COMPUTE_BACKENDS,
WORKSPACE_RETENTION_PERIOD,
DASK_ENABLED,
REANA_DASK_CLUSTER_MAX_MEMORY_LIMIT,
REANA_DASK_CLUSTER_DEFAULT_NUMBER_OF_WORKERS,
REANA_DASK_CLUSTER_DEFAULT_SINGLE_WORKER_MEMORY,
REANA_DASK_CLUSTER_MAX_SINGLE_WORKER_MEMORY,
)
from reana_server import utils


Expand Down Expand Up @@ -153,3 +162,41 @@ def validate_retention_rule(rule: str, days: int) -> None:
"Maximum workflow retention period was reached. "
f"Please use less than {WORKSPACE_RETENTION_PERIOD} days."
)


def validate_dask_memory_and_cores_limits(reana_yaml: Dict) -> None:
"""Validate Dask workflows are allowed in the cluster and memory limits are respected."""
# Validate Dask workflows are allowed in the cluster
dask_resources = reana_yaml["workflow"].get("resources", {}).get("dask", {})
if not DASK_ENABLED and dask_resources != {}:
raise REANAValidationError("Dask workflows are not allowed in this cluster.")

# Validate Dask memory limit requested by the workflow
if dask_resources:
single_worker_memory = dask_resources.get(
"single_worker_memory", REANA_DASK_CLUSTER_DEFAULT_SINGLE_WORKER_MEMORY
)
if kubernetes_memory_to_bytes(
single_worker_memory
) > kubernetes_memory_to_bytes(REANA_DASK_CLUSTER_MAX_SINGLE_WORKER_MEMORY):
raise REANAValidationError(
f'The "single_worker_memory" provided in the dask resources exceeds the limit ({REANA_DASK_CLUSTER_MAX_SINGLE_WORKER_MEMORY}).'
)

number_of_workers = int(
dask_resources.get(
"number_of_workers", REANA_DASK_CLUSTER_DEFAULT_NUMBER_OF_WORKERS
)
)
requested_dask_cluster_memory = (
kubernetes_memory_to_bytes(single_worker_memory) * number_of_workers
)

if requested_dask_cluster_memory > kubernetes_memory_to_bytes(
REANA_DASK_CLUSTER_MAX_MEMORY_LIMIT
):
raise REANAValidationError(
f'The "memory" requested in the dask resources exceeds the limit ({REANA_DASK_CLUSTER_MAX_MEMORY_LIMIT}).\nDecrease the number of workers requested or amount of memory consumed by a single worker.'
)

return None

0 comments on commit 3b7f99b

Please sign in to comment.