Skip to content

Commit

Permalink
fix(start): validate endpoint parameters (reanahub#689)
Browse files Browse the repository at this point in the history
  • Loading branch information
mdonadoni committed Jun 26, 2024
1 parent 3f716f7 commit 2ca1891
Show file tree
Hide file tree
Showing 3 changed files with 38 additions and 16 deletions.
4 changes: 4 additions & 0 deletions docs/openapi.json
Original file line number Diff line number Diff line change
Expand Up @@ -3273,15 +3273,19 @@
"schema": {
"properties": {
"input_parameters": {
"description": "Optional. Additional input parameters that override the ones in the workflow specification.",
"type": "object"
},
"operational_options": {
"description": "Optional. Operational options for workflow execution.",
"type": "object"
},
"reana_specification": {
"description": "Optional. Replace the original workflow specification with the given one. Only considered when restarting a workflow.",
"type": "object"
},
"restart": {
"description": "Optional. If true, restart the given workflow.",
"type": "boolean"
}
},
Expand Down
48 changes: 33 additions & 15 deletions reana_server/rest/workflows.py
Original file line number Diff line number Diff line change
Expand Up @@ -1132,8 +1132,16 @@ def get_workflow_status(workflow_id_or_name, user): # noqa

@blueprint.route("/workflows/<workflow_id_or_name>/start", methods=["POST"])
@signin_required()
@use_kwargs(
{
"operational_options": fields.Dict(location="json"),
"input_parameters": fields.Dict(location="json"),
"restart": fields.Boolean(location="json"),
"reana_specification": fields.Raw(location="json"),
}
)
@check_quota
def start_workflow(workflow_id_or_name, user): # noqa
def start_workflow(workflow_id_or_name, user, **parameters): # noqa
r"""Start workflow.
---
post:
Expand Down Expand Up @@ -1166,12 +1174,20 @@ def start_workflow(workflow_id_or_name, user): # noqa
type: object
properties:
operational_options:
type: object
reana_specification:
description: Optional. Operational options for workflow execution.
type: object
input_parameters:
description: >-
Optional. Additional input parameters that override the ones
in the workflow specification.
type: object
reana_specification:
description: >-
Optional. Replace the original workflow specification with the given one.
Only considered when restarting a workflow.
type: object
restart:
description: Optional. If true, restart the given workflow.
type: boolean
responses:
200:
Expand Down Expand Up @@ -1285,32 +1301,33 @@ def start_workflow(workflow_id_or_name, user): # noqa
"message": "Status resume is not supported yet."
}
"""

operational_options = parameters.get("operational_options", {})
input_parameters = parameters.get("input_parameters", {})
restart = parameters.get("restart", False)
reana_specification = parameters.get("reana_specification")

try:
if not workflow_id_or_name:
raise ValueError("workflow_id_or_name is not supplied")
parameters = request.json if request.is_json else {}

workflow = _get_workflow_with_uuid_or_name(workflow_id_or_name, str(user.id_))
operational_options = parameters.get("operational_options", {})
operational_options = validate_operational_options(
workflow.type_, operational_options
)

restart_type = None
if "restart" in parameters:
if restart:
if workflow.status not in [RunStatus.finished, RunStatus.failed]:
raise ValueError("Only finished or failed workflows can be restarted.")
if workflow.workspace_has_pending_retention_rules():
raise ValueError(
"The workflow cannot be restarted because some retention rules are "
"currently being applied to the workspace. Please retry later."
)
restart_type = (
parameters.get("reana_specification", {})
.get("workflow", {})
.get("type", None)
)
workflow = clone_workflow(
workflow, parameters.get("reana_specification", None), restart_type
)
if reana_specification:
restart_type = reana_specification.get("workflow", {}).get("type", None)
workflow = clone_workflow(workflow, reana_specification, restart_type)
elif workflow.status != RunStatus.created:
raise ValueError(
"Workflow {} is already {} and cannot be started "
Expand All @@ -1319,11 +1336,12 @@ def start_workflow(workflow_id_or_name, user): # noqa
if "yadage" in (workflow.type_, restart_type):
_load_and_save_yadage_spec(workflow, operational_options)

input_parameters = parameters.get("input_parameters", {})
validate_workflow(
workflow.reana_specification, input_parameters=input_parameters
)

# when starting the workflow, the scheduler will call RWC's `set_workflow_status`
# with the given `parameters`
publish_workflow_submission(workflow, user.id_, parameters)
response = {
"message": "Workflow submitted.",
Expand Down
2 changes: 1 addition & 1 deletion tests/test_views.py
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,7 @@ def test_restart_workflow_validates_specification(
workflow_specification["workflow"]["type"] = "unknown"
body = {
"reana_specification": workflow_specification,
"restart": "can be anything here doesnt matter",
"restart": True,
}
res = client.post(
url_for("workflows.start_workflow", workflow_id_or_name="test"),
Expand Down

0 comments on commit 2ca1891

Please sign in to comment.