Skip to content

Commit

Permalink
feat(dask): integrate Dask into reana (#701)
Browse files Browse the repository at this point in the history
  • Loading branch information
Alputer committed Sep 13, 2024
1 parent 50495fc commit bdab669
Show file tree
Hide file tree
Showing 5 changed files with 321 additions and 5 deletions.
117 changes: 117 additions & 0 deletions docs/openapi.json
Original file line number Diff line number Diff line change
Expand Up @@ -429,6 +429,34 @@
"slurmcern"
]
},
"dask_cluster_default_cores_limit": {
"title": "Default cores limit for dask clusters",
"value": "4"
},
"dask_cluster_default_memory_limit": {
"title": "Default memory limit for dask clusters",
"value": "2Gi"
},
"dask_cluster_default_single_worker_cores": {
"title": "Number of cores for one dask worker by default",
"value": "0.5"
},
"dask_cluster_default_single_worker_memory": {
"title": "Amount of memory for one dask worker by default",
"value": "256Mi"
},
"dask_cluster_max_cores_limit": {
"title": "Maximum cores limit for dask clusters",
"value": "8"
},
"dask_cluster_max_memory_limit": {
"title": "Maximum memory limit for dask clusters",
"value": "4Gi"
},
"dask_enabled": {
"title": "Dask workflows allowed in the cluster",
"value": false
},
"default_kubernetes_jobs_timeout": {
"title": "Default timeout for Kubernetes jobs",
"value": "604800"
Expand Down Expand Up @@ -479,6 +507,83 @@
},
"type": "object"
},
"dask_cluster_default_cores_limit": {
"properties": {
"title": {
"type": "string"
},
"value": {
"type": "string"
}
},
"type": "object"
},
"dask_cluster_default_memory_limit": {
"properties": {
"title": {
"type": "string"
},
"value": {
"type": "string"
}
},
"type": "object"
},
"dask_cluster_default_single_worker_cores": {
"properties": {
"title": {
"type": "string"
},
"value": {
"type": "string"
}
},
"type": "object"
},
"dask_cluster_default_single_worker_memory": {
"properties": {
"title": {
"type": "string"
},
"value": {
"type": "string"
}
},
"type": "object"
},
"dask_cluster_max_cores_limit": {
"properties": {
"title": {
"type": "string"
},
"value": {
"type": "string"
}
},
"type": "object"
},
"dask_cluster_max_memory_limit": {
"properties": {
"title": {
"type": "string"
},
"value": {
"type": "string"
}
},
"type": "object"
},
"dask_enabled": {
"properties": {
"title": {
"type": "string"
},
"value": {
"type": "boolean"
}
},
"type": "object"
},
"default_kubernetes_jobs_timeout": {
"properties": {
"title": {
Expand Down Expand Up @@ -574,6 +679,18 @@
"type": "object"
}
},
"required": [
"compute_backends",
"default_kubernetes_jobs_timeout",
"default_kubernetes_memory_limit",
"default_workspace",
"kubernetes_max_memory_limit",
"maximum_interactive_session_inactivity_period",
"maximum_kubernetes_jobs_timeout",
"maximum_workspace_retention_period",
"workspaces_available",
"dask_enabled"
],
"type": "object"
}
},
Expand Down
31 changes: 30 additions & 1 deletion reana_server/config.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# -*- coding: utf-8 -*-
#
# This file is part of REANA.
# Copyright (C) 2017, 2018, 2019, 2020, 2021, 2022, 2023 CERN.
# Copyright (C) 2017, 2018, 2019, 2020, 2021, 2022, 2023, 2024 CERN.
#
# REANA is free software; you can redistribute it and/or modify it
# under the terms of the MIT License; see LICENSE file for more details.
Expand Down Expand Up @@ -58,6 +58,35 @@
os.getenv("LOGIN_PROVIDERS_SECRETS", "{}")
)

DASK_ENABLED = strtobool(os.getenv("DASK_ENABLED", "true"))
"""Whether dask is enabled in the cluster or not"""

REANA_DASK_CLUSTER_MAX_CORES_LIMIT = os.getenv("REANA_DASK_CLUSTER_MAX_CORES_LIMIT")
"""Maximum cores limit for dask clusters."""

REANA_DASK_CLUSTER_MAX_MEMORY_LIMIT = os.getenv("REANA_DASK_CLUSTER_MAX_MEMORY_LIMIT")
"""Maximum memory limit for dask clusters."""

REANA_DASK_CLUSTER_DEFAULT_CORES_LIMIT = os.getenv(
"REANA_DASK_CLUSTER_DEFAULT_CORES_LIMIT"
)
"""Default cores limit for dask clusters."""

REANA_DASK_CLUSTER_DEFAULT_MEMORY_LIMIT = os.getenv(
"REANA_DASK_CLUSTER_DEFAULT_MEMORY_LIMIT"
)
"""Default memory limit for dask clusters."""

REANA_DASK_CLUSTER_DEFAULT_SINGLE_WORKER_CORES = os.getenv(
"REANA_DASK_CLUSTER_DEFAULT_SINGLE_WORKER_CORES "
)
"""Number of cores for one dask worker by default."""

REANA_DASK_CLUSTER_DEFAULT_SINGLE_WORKER_MEMORY = os.getenv(
"REANA_DASK_CLUSTER_DEFAULT_SINGLE_WORKER_MEMORY"
)
"""Memory for one dask worker by default."""

REANA_KUBERNETES_JOBS_MEMORY_LIMIT = os.getenv("REANA_KUBERNETES_JOBS_MEMORY_LIMIT")
"""Maximum memory limit for user job containers for workflow complexity estimation."""

Expand Down
136 changes: 135 additions & 1 deletion reana_server/rest/info.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# -*- coding: utf-8 -*-
#
# This file is part of REANA.
# Copyright (C) 2021, 2022 CERN.
# Copyright (C) 2021, 2022, 2024 CERN.
#
# REANA is free software; you can redistribute it and/or modify it
# under the terms of the MIT License; see LICENSE file for more details.
Expand All @@ -24,6 +24,13 @@
REANA_KUBERNETES_JOBS_TIMEOUT_LIMIT,
REANA_KUBERNETES_JOBS_MAX_USER_TIMEOUT_LIMIT,
REANA_INTERACTIVE_SESSION_MAX_INACTIVITY_PERIOD,
DASK_ENABLED,
REANA_DASK_CLUSTER_MAX_CORES_LIMIT,
REANA_DASK_CLUSTER_MAX_MEMORY_LIMIT,
REANA_DASK_CLUSTER_DEFAULT_CORES_LIMIT,
REANA_DASK_CLUSTER_DEFAULT_MEMORY_LIMIT,
REANA_DASK_CLUSTER_DEFAULT_SINGLE_WORKER_CORES,
REANA_DASK_CLUSTER_DEFAULT_SINGLE_WORKER_MEMORY,
)
from reana_server.decorators import signin_required

Expand Down Expand Up @@ -125,7 +132,67 @@ def info(user, **kwargs): # noqa
type: string
type: array
type: object
dask_enabled:
properties:
title:
type: string
value:
type: boolean
type: object
dask_cluster_default_cores_limit:
properties:
title:
type: string
value:
type: string
type: object
dask_cluster_max_cores_limit:
properties:
title:
type: string
value:
type: string
type: object
dask_cluster_default_memory_limit:
properties:
title:
type: string
value:
type: string
type: object
dask_cluster_max_memory_limit:
properties:
title:
type: string
value:
type: string
type: object
dask_cluster_default_single_worker_cores:
properties:
title:
type: string
value:
type: string
type: object
dask_cluster_default_single_worker_memory:
properties:
title:
type: string
value:
type: string
type: object
type: object
required:
- compute_backends
- default_kubernetes_jobs_timeout
- default_kubernetes_memory_limit
- default_workspace
- kubernetes_max_memory_limit
- maximum_interactive_session_inactivity_period
- maximum_kubernetes_jobs_timeout
- maximum_workspace_retention_period
- workspaces_available
- dask_enabled
examples:
application/json:
{
Expand Down Expand Up @@ -165,6 +232,34 @@ def info(user, **kwargs): # noqa
"title": "Maximum timeout for Kubernetes jobs",
"value": "1209600"
},
"dask_enabled": {
"title": "Dask workflows allowed in the cluster",
"value": False
},
"dask_cluster_default_cores_limit": {
"title": "Default cores limit for dask clusters",
"value": "4"
},
"dask_cluster_max_cores_limit": {
"title": "Maximum cores limit for dask clusters",
"value": "8"
},
"dask_cluster_default_memory_limit": {
"title": "Default memory limit for dask clusters",
"value": "2Gi"
},
"dask_cluster_max_memory_limit": {
"title": "Maximum memory limit for dask clusters",
"value": "4Gi"
},
"dask_cluster_default_single_worker_cores": {
"title": "Number of cores for one dask worker by default",
"value": "0.5"
},
"dask_cluster_default_single_worker_memory": {
"title": "Amount of memory for one dask worker by default",
"value": "256Mi"
},
}
500:
description: >-
Expand Down Expand Up @@ -217,7 +312,37 @@ def info(user, **kwargs): # noqa
title="Maximum inactivity period in days before automatic closure of interactive sessions",
value=REANA_INTERACTIVE_SESSION_MAX_INACTIVITY_PERIOD,
),
dask_enabled=dict(
title="Dask workflows allowed in the cluster",
value=bool(DASK_ENABLED),
),
)
if DASK_ENABLED:
cluster_information["dask_cluster_default_cores_limit"] = dict(

Check warning on line 321 in reana_server/rest/info.py

View check run for this annotation

Codecov / codecov/patch

reana_server/rest/info.py#L320-L321

Added lines #L320 - L321 were not covered by tests
title="Default cores limit for dask clusters",
value=REANA_DASK_CLUSTER_DEFAULT_CORES_LIMIT,
)
cluster_information["dask_cluster_max_cores_limit"] = dict(

Check warning on line 325 in reana_server/rest/info.py

View check run for this annotation

Codecov / codecov/patch

reana_server/rest/info.py#L325

Added line #L325 was not covered by tests
title="Maximum cores limit for dask clusters",
value=REANA_DASK_CLUSTER_MAX_CORES_LIMIT,
)
cluster_information["dask_cluster_default_memory_limit"] = dict(

Check warning on line 329 in reana_server/rest/info.py

View check run for this annotation

Codecov / codecov/patch

reana_server/rest/info.py#L329

Added line #L329 was not covered by tests
title="Default memory limit for dask clusters",
value=REANA_DASK_CLUSTER_DEFAULT_MEMORY_LIMIT,
)
cluster_information["dask_cluster_max_memory_limit"] = dict(

Check warning on line 333 in reana_server/rest/info.py

View check run for this annotation

Codecov / codecov/patch

reana_server/rest/info.py#L333

Added line #L333 was not covered by tests
title="Maximum memory limit for dask clusters",
value=REANA_DASK_CLUSTER_MAX_MEMORY_LIMIT,
)
cluster_information["dask_cluster_default_single_worker_cores"] = dict(

Check warning on line 337 in reana_server/rest/info.py

View check run for this annotation

Codecov / codecov/patch

reana_server/rest/info.py#L337

Added line #L337 was not covered by tests
title="Number of cores for one dask worker by default",
value=REANA_DASK_CLUSTER_DEFAULT_SINGLE_WORKER_CORES,
)
cluster_information["dask_cluster_default_single_worker_memory"] = dict(

Check warning on line 341 in reana_server/rest/info.py

View check run for this annotation

Codecov / codecov/patch

reana_server/rest/info.py#L341

Added line #L341 was not covered by tests
title="Memory for one dask worker by default",
value=REANA_DASK_CLUSTER_DEFAULT_SINGLE_WORKER_MEMORY,
)

return InfoSchema().dump(cluster_information)

except Exception as e:
Expand Down Expand Up @@ -260,3 +385,12 @@ class InfoSchema(Schema):
maximum_interactive_session_inactivity_period = fields.Nested(
StringNullableInfoValue
)
kubernetes_max_memory_limit = fields.Nested(StringInfoValue)
dask_enabled = fields.Nested(StringInfoValue)
if DASK_ENABLED:
dask_cluster_default_cores_limit = fields.Nested(StringInfoValue)
dask_cluster_max_cores_limit = fields.Nested(StringInfoValue)
dask_cluster_default_memory_limit = fields.Nested(StringInfoValue)
dask_cluster_max_memory_limit = fields.Nested(StringInfoValue)
dask_cluster_default_single_worker_cores = fields.Nested(StringInfoValue)
dask_cluster_default_single_worker_memory = fields.Nested(StringInfoValue)
3 changes: 3 additions & 0 deletions reana_server/rest/workflows.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
validate_inputs,
validate_workflow,
validate_workspace_path,
validate_dask_memory_and_cores_limits,
)
from webargs import fields, validate
from webargs.flaskparser import use_kwargs
Expand Down Expand Up @@ -566,6 +567,8 @@ def create_workflow(user): # noqa

validate_inputs(reana_spec_file)

validate_dask_memory_and_cores_limits(reana_spec_file)

retention_days = reana_spec_file.get("workspace", {}).get("retention_days")
retention_rules = get_workspace_retention_rules(retention_days)

Expand Down
Loading

0 comments on commit bdab669

Please sign in to comment.