Skip to content

Commit

Permalink
fix(start): validate endpoint parameters (reanahub#689)
Browse files Browse the repository at this point in the history
  • Loading branch information
mdonadoni committed Jun 25, 2024
1 parent 3f716f7 commit a7cb1d7
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 15 deletions.
36 changes: 22 additions & 14 deletions reana_server/rest/workflows.py
Original file line number Diff line number Diff line change
Expand Up @@ -1132,8 +1132,16 @@ def get_workflow_status(workflow_id_or_name, user): # noqa

@blueprint.route("/workflows/<workflow_id_or_name>/start", methods=["POST"])
@signin_required()
@use_kwargs(
{
"operational_options": fields.Dict(missing={}, location="json"),
"input_parameters": fields.Dict(missing={}, location="json"),
"restart": fields.Boolean(location="json"),
"reana_specification": fields.Raw(location="json"),
}
)
@check_quota
def start_workflow(workflow_id_or_name, user): # noqa
def start_workflow(workflow_id_or_name, user, **kwargs): # noqa
r"""Start workflow.
---
post:
Expand Down Expand Up @@ -1285,32 +1293,33 @@ def start_workflow(workflow_id_or_name, user): # noqa
"message": "Status resume is not supported yet."
}
"""

operational_options = kwargs["operational_options"]
input_parameters = kwargs["input_parameters"]
restart = kwargs.get("restart", False)
reana_specification = kwargs.get("reana_specification")

try:
if not workflow_id_or_name:
raise ValueError("workflow_id_or_name is not supplied")
parameters = request.json if request.is_json else {}

workflow = _get_workflow_with_uuid_or_name(workflow_id_or_name, str(user.id_))
operational_options = parameters.get("operational_options", {})
operational_options = validate_operational_options(
workflow.type_, operational_options
)

restart_type = None
if "restart" in parameters:
if restart:
if workflow.status not in [RunStatus.finished, RunStatus.failed]:
raise ValueError("Only finished or failed workflows can be restarted.")
if workflow.workspace_has_pending_retention_rules():
raise ValueError(
"The workflow cannot be restarted because some retention rules are "
"currently being applied to the workspace. Please retry later."
)
restart_type = (
parameters.get("reana_specification", {})
.get("workflow", {})
.get("type", None)
)
workflow = clone_workflow(
workflow, parameters.get("reana_specification", None), restart_type
)
if reana_specification:
restart_type = reana_specification.get("workflow", {}).get("type", None)
workflow = clone_workflow(workflow, reana_specification, restart_type)
elif workflow.status != RunStatus.created:
raise ValueError(
"Workflow {} is already {} and cannot be started "
Expand All @@ -1319,12 +1328,11 @@ def start_workflow(workflow_id_or_name, user): # noqa
if "yadage" in (workflow.type_, restart_type):
_load_and_save_yadage_spec(workflow, operational_options)

input_parameters = parameters.get("input_parameters", {})
validate_workflow(
workflow.reana_specification, input_parameters=input_parameters
)

publish_workflow_submission(workflow, user.id_, parameters)
publish_workflow_submission(workflow, user.id_, kwargs)
response = {
"message": "Workflow submitted.",
"workflow_id": workflow.id_,
Expand Down
2 changes: 1 addition & 1 deletion tests/test_views.py
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,7 @@ def test_restart_workflow_validates_specification(
workflow_specification["workflow"]["type"] = "unknown"
body = {
"reana_specification": workflow_specification,
"restart": "can be anything here doesnt matter",
"restart": True,
}
res = client.post(
url_for("workflows.start_workflow", workflow_id_or_name="test"),
Expand Down

0 comments on commit a7cb1d7

Please sign in to comment.