From bdab669c61d768f92e07613c49e7a47f15666c06 Mon Sep 17 00:00:00 2001 From: Alputer Date: Thu, 12 Sep 2024 16:00:55 +0200 Subject: [PATCH] feat(dask): integrate Dask into reana (#701) --- docs/openapi.json | 117 ++++++++++++++++++++++++++++ reana_server/config.py | 31 +++++++- reana_server/rest/info.py | 136 ++++++++++++++++++++++++++++++++- reana_server/rest/workflows.py | 3 + reana_server/validation.py | 39 +++++++++- 5 files changed, 321 insertions(+), 5 deletions(-) diff --git a/docs/openapi.json b/docs/openapi.json index d7444d3a..b8045a9d 100644 --- a/docs/openapi.json +++ b/docs/openapi.json @@ -429,6 +429,34 @@ "slurmcern" ] }, + "dask_cluster_default_cores_limit": { + "title": "Default cores limit for dask clusters", + "value": "4" + }, + "dask_cluster_default_memory_limit": { + "title": "Default memory limit for dask clusters", + "value": "2Gi" + }, + "dask_cluster_default_single_worker_cores": { + "title": "Number of cores for one dask worker by default", + "value": "0.5" + }, + "dask_cluster_default_single_worker_memory": { + "title": "Amount of memory for one dask worker by default", + "value": "256Mi" + }, + "dask_cluster_max_cores_limit": { + "title": "Maximum cores limit for dask clusters", + "value": "8" + }, + "dask_cluster_max_memory_limit": { + "title": "Maximum memory limit for dask clusters", + "value": "4Gi" + }, + "dask_enabled": { + "title": "Dask workflows allowed in the cluster", + "value": false + }, "default_kubernetes_jobs_timeout": { "title": "Default timeout for Kubernetes jobs", "value": "604800" @@ -479,6 +507,83 @@ }, "type": "object" }, + "dask_cluster_default_cores_limit": { + "properties": { + "title": { + "type": "string" + }, + "value": { + "type": "string" + } + }, + "type": "object" + }, + "dask_cluster_default_memory_limit": { + "properties": { + "title": { + "type": "string" + }, + "value": { + "type": "string" + } + }, + "type": "object" + }, + "dask_cluster_default_single_worker_cores": { + "properties": { + "title": { + "type": "string" + }, + "value": { + "type": "string" + } + }, + "type": "object" + }, + "dask_cluster_default_single_worker_memory": { + "properties": { + "title": { + "type": "string" + }, + "value": { + "type": "string" + } + }, + "type": "object" + }, + "dask_cluster_max_cores_limit": { + "properties": { + "title": { + "type": "string" + }, + "value": { + "type": "string" + } + }, + "type": "object" + }, + "dask_cluster_max_memory_limit": { + "properties": { + "title": { + "type": "string" + }, + "value": { + "type": "string" + } + }, + "type": "object" + }, + "dask_enabled": { + "properties": { + "title": { + "type": "string" + }, + "value": { + "type": "boolean" + } + }, + "type": "object" + }, "default_kubernetes_jobs_timeout": { "properties": { "title": { @@ -574,6 +679,18 @@ "type": "object" } }, + "required": [ + "compute_backends", + "default_kubernetes_jobs_timeout", + "default_kubernetes_memory_limit", + "default_workspace", + "kubernetes_max_memory_limit", + "maximum_interactive_session_inactivity_period", + "maximum_kubernetes_jobs_timeout", + "maximum_workspace_retention_period", + "workspaces_available", + "dask_enabled" + ], "type": "object" } }, diff --git a/reana_server/config.py b/reana_server/config.py index 472361f8..d20e4085 100644 --- a/reana_server/config.py +++ b/reana_server/config.py @@ -1,7 +1,7 @@ # -*- coding: utf-8 -*- # # This file is part of REANA. -# Copyright (C) 2017, 2018, 2019, 2020, 2021, 2022, 2023 CERN. +# Copyright (C) 2017, 2018, 2019, 2020, 2021, 2022, 2023, 2024 CERN. 
 #
 # REANA is free software; you can redistribute it and/or modify it
 # under the terms of the MIT License; see LICENSE file for more details.
@@ -58,6 +58,35 @@
     os.getenv("LOGIN_PROVIDERS_SECRETS", "{}")
 )
 
+DASK_ENABLED = strtobool(os.getenv("DASK_ENABLED", "true"))
+"""Whether dask is enabled in the cluster or not."""
+
+REANA_DASK_CLUSTER_MAX_CORES_LIMIT = os.getenv("REANA_DASK_CLUSTER_MAX_CORES_LIMIT")
+"""Maximum cores limit for dask clusters."""
+
+REANA_DASK_CLUSTER_MAX_MEMORY_LIMIT = os.getenv("REANA_DASK_CLUSTER_MAX_MEMORY_LIMIT")
+"""Maximum memory limit for dask clusters."""
+
+REANA_DASK_CLUSTER_DEFAULT_CORES_LIMIT = os.getenv(
+    "REANA_DASK_CLUSTER_DEFAULT_CORES_LIMIT"
+)
+"""Default cores limit for dask clusters."""
+
+REANA_DASK_CLUSTER_DEFAULT_MEMORY_LIMIT = os.getenv(
+    "REANA_DASK_CLUSTER_DEFAULT_MEMORY_LIMIT"
+)
+"""Default memory limit for dask clusters."""
+
+REANA_DASK_CLUSTER_DEFAULT_SINGLE_WORKER_CORES = os.getenv(
+    "REANA_DASK_CLUSTER_DEFAULT_SINGLE_WORKER_CORES"
+)
+"""Number of cores for one dask worker by default."""
+
+REANA_DASK_CLUSTER_DEFAULT_SINGLE_WORKER_MEMORY = os.getenv(
+    "REANA_DASK_CLUSTER_DEFAULT_SINGLE_WORKER_MEMORY"
+)
+"""Memory for one dask worker by default."""
+
 REANA_KUBERNETES_JOBS_MEMORY_LIMIT = os.getenv("REANA_KUBERNETES_JOBS_MEMORY_LIMIT")
 """Maximum memory limit for user job containers for workflow complexity estimation."""
 
diff --git a/reana_server/rest/info.py b/reana_server/rest/info.py
index 0b12dae4..29e0ff99 100644
--- a/reana_server/rest/info.py
+++ b/reana_server/rest/info.py
@@ -1,7 +1,7 @@
 # -*- coding: utf-8 -*-
 #
 # This file is part of REANA.
-# Copyright (C) 2021, 2022 CERN.
+# Copyright (C) 2021, 2022, 2024 CERN.
 #
 # REANA is free software; you can redistribute it and/or modify it
 # under the terms of the MIT License; see LICENSE file for more details.
@@ -24,6 +24,13 @@ REANA_KUBERNETES_JOBS_TIMEOUT_LIMIT, REANA_KUBERNETES_JOBS_MAX_USER_TIMEOUT_LIMIT, REANA_INTERACTIVE_SESSION_MAX_INACTIVITY_PERIOD, + DASK_ENABLED, + REANA_DASK_CLUSTER_MAX_CORES_LIMIT, + REANA_DASK_CLUSTER_MAX_MEMORY_LIMIT, + REANA_DASK_CLUSTER_DEFAULT_CORES_LIMIT, + REANA_DASK_CLUSTER_DEFAULT_MEMORY_LIMIT, + REANA_DASK_CLUSTER_DEFAULT_SINGLE_WORKER_CORES, + REANA_DASK_CLUSTER_DEFAULT_SINGLE_WORKER_MEMORY, ) from reana_server.decorators import signin_required @@ -125,7 +132,67 @@ def info(user, **kwargs): # noqa type: string type: array type: object + dask_enabled: + properties: + title: + type: string + value: + type: boolean + type: object + dask_cluster_default_cores_limit: + properties: + title: + type: string + value: + type: string + type: object + dask_cluster_max_cores_limit: + properties: + title: + type: string + value: + type: string + type: object + dask_cluster_default_memory_limit: + properties: + title: + type: string + value: + type: string + type: object + dask_cluster_max_memory_limit: + properties: + title: + type: string + value: + type: string + type: object + dask_cluster_default_single_worker_cores: + properties: + title: + type: string + value: + type: string + type: object + dask_cluster_default_single_worker_memory: + properties: + title: + type: string + value: + type: string + type: object type: object + required: + - compute_backends + - default_kubernetes_jobs_timeout + - default_kubernetes_memory_limit + - default_workspace + - kubernetes_max_memory_limit + - maximum_interactive_session_inactivity_period + - maximum_kubernetes_jobs_timeout + - maximum_workspace_retention_period + - workspaces_available + - dask_enabled examples: application/json: { @@ -165,6 +232,34 @@ def info(user, **kwargs): # noqa "title": "Maximum timeout for Kubernetes jobs", "value": "1209600" }, + "dask_enabled": { + "title": "Dask workflows allowed in the cluster", + "value": False + }, + "dask_cluster_default_cores_limit": { + "title": "Default cores limit for dask clusters", + "value": "4" + }, + "dask_cluster_max_cores_limit": { + "title": "Maximum cores limit for dask clusters", + "value": "8" + }, + "dask_cluster_default_memory_limit": { + "title": "Default memory limit for dask clusters", + "value": "2Gi" + }, + "dask_cluster_max_memory_limit": { + "title": "Maximum memory limit for dask clusters", + "value": "4Gi" + }, + "dask_cluster_default_single_worker_cores": { + "title": "Number of cores for one dask worker by default", + "value": "0.5" + }, + "dask_cluster_default_single_worker_memory": { + "title": "Amount of memory for one dask worker by default", + "value": "256Mi" + }, } 500: description: >- @@ -217,7 +312,37 @@ def info(user, **kwargs): # noqa title="Maximum inactivity period in days before automatic closure of interactive sessions", value=REANA_INTERACTIVE_SESSION_MAX_INACTIVITY_PERIOD, ), + dask_enabled=dict( + title="Dask workflows allowed in the cluster", + value=bool(DASK_ENABLED), + ), ) + if DASK_ENABLED: + cluster_information["dask_cluster_default_cores_limit"] = dict( + title="Default cores limit for dask clusters", + value=REANA_DASK_CLUSTER_DEFAULT_CORES_LIMIT, + ) + cluster_information["dask_cluster_max_cores_limit"] = dict( + title="Maximum cores limit for dask clusters", + value=REANA_DASK_CLUSTER_MAX_CORES_LIMIT, + ) + cluster_information["dask_cluster_default_memory_limit"] = dict( + title="Default memory limit for dask clusters", + value=REANA_DASK_CLUSTER_DEFAULT_MEMORY_LIMIT, + ) + 
+        cluster_information["dask_cluster_max_memory_limit"] = dict(
+            title="Maximum memory limit for dask clusters",
+            value=REANA_DASK_CLUSTER_MAX_MEMORY_LIMIT,
+        )
+        cluster_information["dask_cluster_default_single_worker_cores"] = dict(
+            title="Number of cores for one dask worker by default",
+            value=REANA_DASK_CLUSTER_DEFAULT_SINGLE_WORKER_CORES,
+        )
+        cluster_information["dask_cluster_default_single_worker_memory"] = dict(
+            title="Memory for one dask worker by default",
+            value=REANA_DASK_CLUSTER_DEFAULT_SINGLE_WORKER_MEMORY,
+        )
+
         return InfoSchema().dump(cluster_information)
 
     except Exception as e:
@@ -260,3 +385,12 @@ class InfoSchema(Schema):
     maximum_interactive_session_inactivity_period = fields.Nested(
         StringNullableInfoValue
     )
+    kubernetes_max_memory_limit = fields.Nested(StringInfoValue)
+    dask_enabled = fields.Nested(StringInfoValue)
+    if DASK_ENABLED:
+        dask_cluster_default_cores_limit = fields.Nested(StringInfoValue)
+        dask_cluster_max_cores_limit = fields.Nested(StringInfoValue)
+        dask_cluster_default_memory_limit = fields.Nested(StringInfoValue)
+        dask_cluster_max_memory_limit = fields.Nested(StringInfoValue)
+        dask_cluster_default_single_worker_cores = fields.Nested(StringInfoValue)
+        dask_cluster_default_single_worker_memory = fields.Nested(StringInfoValue)
diff --git a/reana_server/rest/workflows.py b/reana_server/rest/workflows.py
index fad86ecf..96ddadb6 100644
--- a/reana_server/rest/workflows.py
+++ b/reana_server/rest/workflows.py
@@ -49,6 +49,7 @@
     validate_inputs,
     validate_workflow,
     validate_workspace_path,
+    validate_dask_memory_and_cores_limits,
 )
 from webargs import fields, validate
 from webargs.flaskparser import use_kwargs
@@ -566,6 +567,8 @@ def create_workflow(user):  # noqa
 
         validate_inputs(reana_spec_file)
 
+        validate_dask_memory_and_cores_limits(reana_spec_file)
+
         retention_days = reana_spec_file.get("workspace", {}).get("retention_days")
         retention_rules = get_workspace_retention_rules(retention_days)
 
diff --git a/reana_server/validation.py b/reana_server/validation.py
index 8a50388a..503467af 100644
--- a/reana_server/validation.py
+++ b/reana_server/validation.py
@@ -1,7 +1,7 @@
 # -*- coding: utf-8 -*-
 #
 # This file is part of REANA.
-# Copyright (C) 2022 CERN.
+# Copyright (C) 2022, 2024 CERN.
 #
 # REANA is free software; you can redistribute it and/or modify it
 # under the terms of the MIT License; see LICENSE file for more details.
@@ -18,8 +18,15 @@
 from reana_commons.validation.operational_options import validate_operational_options
 from reana_commons.validation.parameters import build_parameters_validator
 from reana_commons.validation.utils import validate_reana_yaml, validate_workspace
-
-from reana_server.config import SUPPORTED_COMPUTE_BACKENDS, WORKSPACE_RETENTION_PERIOD
+from reana_commons.job_utils import kubernetes_memory_to_bytes
+
+from reana_server.config import (
+    SUPPORTED_COMPUTE_BACKENDS,
+    WORKSPACE_RETENTION_PERIOD,
+    DASK_ENABLED,
+    REANA_DASK_CLUSTER_MAX_MEMORY_LIMIT,
+    REANA_DASK_CLUSTER_MAX_CORES_LIMIT,
+)
 
 from reana_server import utils
 
@@ -153,3 +160,29 @@ def validate_retention_rule(rule: str, days: int) -> None:
             "Maximum workflow retention period was reached. "
             f"Please use less than {WORKSPACE_RETENTION_PERIOD} days."
         )
+
+
+def validate_dask_memory_and_cores_limits(reana_yaml: Dict) -> None:
+    """Validate Dask workflows are allowed in the cluster and memory and cores limits are respected."""
+    # Validate Dask workflows are allowed in the cluster
+    dask_resources = reana_yaml["workflow"].get("resources", {}).get("dask", {})
+    if not DASK_ENABLED and dask_resources != {}:
+        raise REANAValidationError("Dask workflows are not allowed in this cluster.")
+
+    # Validate Dask memory limit requested by the workflow
+    if dask_resources:
+        requested_dask_cluster_memory = dask_resources.get("memory", "0Mi")
+        if kubernetes_memory_to_bytes(
+            requested_dask_cluster_memory
+        ) > kubernetes_memory_to_bytes(REANA_DASK_CLUSTER_MAX_MEMORY_LIMIT):
+            raise REANAValidationError(
+                f'The Dask cluster "memory" requested in the workflow exceeds the limit ({REANA_DASK_CLUSTER_MAX_MEMORY_LIMIT}).'
+            )
+        # Validate Dask cores limit requested by the workflow
+        requested_dask_cluster_cores = float(dask_resources.get("cores", 0))
+        if requested_dask_cluster_cores > float(REANA_DASK_CLUSTER_MAX_CORES_LIMIT):
+            raise REANAValidationError(
+                f'The Dask cluster "cores" requested in the workflow exceeds the limit ({REANA_DASK_CLUSTER_MAX_CORES_LIMIT}).'
+            )
+
+    return None
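
For illustration, a minimal sketch of how the validator added in reana_server/validation.py is expected to behave. It is not part of the patch itself; it assumes a deployment where Dask is enabled, where REANA_DASK_CLUSTER_MAX_MEMORY_LIMIT and REANA_DASK_CLUSTER_MAX_CORES_LIMIT are set to, say, "4Gi" and "8", and that REANAValidationError is the exception class from reana_commons.errors used elsewhere in reana-server. The workflow specification below is hypothetical and only fills in the keys the validator reads (workflow.resources.dask with "memory" and "cores").

import pytest

from reana_commons.errors import REANAValidationError
from reana_server.validation import validate_dask_memory_and_cores_limits

# Hypothetical REANA specification: only the parts read by the validator.
reana_yaml = {
    "workflow": {
        "type": "serial",
        "resources": {"dask": {"memory": "2Gi", "cores": 4}},
    }
}

# Within the configured limits: no exception is raised.
validate_dask_memory_and_cores_limits(reana_yaml)

# Above the configured memory limit: the validator rejects the workflow.
reana_yaml["workflow"]["resources"]["dask"]["memory"] = "16Gi"
with pytest.raises(REANAValidationError):
    validate_dask_memory_and_cores_limits(reana_yaml)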
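
Once deployed, clients can discover the new Dask limits through the info endpoint documented in docs/openapi.json above. A rough usage sketch, assuming the usual /api/info route with an access_token query parameter; the server URL and token below are placeholders.

import requests

REANA_SERVER_URL = "https://reana.example.org"  # placeholder deployment URL
ACCESS_TOKEN = "<access-token>"  # placeholder user token

response = requests.get(
    f"{REANA_SERVER_URL}/api/info", params={"access_token": ACCESS_TOKEN}
)
info = response.json()

# "dask_enabled" is always present in the response; the other dask_* fields
# are only included when Dask is enabled in the cluster (see info.py above).
if info["dask_enabled"]["value"]:
    print("Max Dask memory:", info["dask_cluster_max_memory_limit"]["value"])
    print("Max Dask cores:", info["dask_cluster_max_cores_limit"]["value"])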