From 2ca189141f13af1bf383b02dba38f0873fb895bd Mon Sep 17 00:00:00 2001 From: Marco Donadoni Date: Tue, 25 Jun 2024 17:02:19 +0200 Subject: [PATCH] fix(start): validate endpoint parameters (#689) Closes reanahub/reana-client#718 --- docs/openapi.json | 4 +++ reana_server/rest/workflows.py | 48 +++++++++++++++++++++++----------- tests/test_views.py | 2 +- 3 files changed, 38 insertions(+), 16 deletions(-) diff --git a/docs/openapi.json b/docs/openapi.json index 95f18a10..d5747319 100644 --- a/docs/openapi.json +++ b/docs/openapi.json @@ -3273,15 +3273,19 @@ "schema": { "properties": { "input_parameters": { + "description": "Optional. Additional input parameters that override the ones in the workflow specification.", "type": "object" }, "operational_options": { + "description": "Optional. Operational options for workflow execution.", "type": "object" }, "reana_specification": { + "description": "Optional. Replace the original workflow specification with the given one. Only considered when restarting a workflow.", "type": "object" }, "restart": { + "description": "Optional. If true, restart the given workflow.", "type": "boolean" } }, diff --git a/reana_server/rest/workflows.py b/reana_server/rest/workflows.py index de6f7861..231118cc 100644 --- a/reana_server/rest/workflows.py +++ b/reana_server/rest/workflows.py @@ -1132,8 +1132,16 @@ def get_workflow_status(workflow_id_or_name, user): # noqa @blueprint.route("/workflows//start", methods=["POST"]) @signin_required() +@use_kwargs( + { + "operational_options": fields.Dict(location="json"), + "input_parameters": fields.Dict(location="json"), + "restart": fields.Boolean(location="json"), + "reana_specification": fields.Raw(location="json"), + } +) @check_quota -def start_workflow(workflow_id_or_name, user): # noqa +def start_workflow(workflow_id_or_name, user, **parameters): # noqa r"""Start workflow. --- post: @@ -1166,12 +1174,20 @@ def start_workflow(workflow_id_or_name, user): # noqa type: object properties: operational_options: - type: object - reana_specification: + description: Optional. Operational options for workflow execution. type: object input_parameters: + description: >- + Optional. Additional input parameters that override the ones + in the workflow specification. + type: object + reana_specification: + description: >- + Optional. Replace the original workflow specification with the given one. + Only considered when restarting a workflow. type: object restart: + description: Optional. If true, restart the given workflow. type: boolean responses: 200: @@ -1285,17 +1301,23 @@ def start_workflow(workflow_id_or_name, user): # noqa "message": "Status resume is not supported yet." } """ + + operational_options = parameters.get("operational_options", {}) + input_parameters = parameters.get("input_parameters", {}) + restart = parameters.get("restart", False) + reana_specification = parameters.get("reana_specification") + try: if not workflow_id_or_name: raise ValueError("workflow_id_or_name is not supplied") - parameters = request.json if request.is_json else {} + workflow = _get_workflow_with_uuid_or_name(workflow_id_or_name, str(user.id_)) - operational_options = parameters.get("operational_options", {}) operational_options = validate_operational_options( workflow.type_, operational_options ) + restart_type = None - if "restart" in parameters: + if restart: if workflow.status not in [RunStatus.finished, RunStatus.failed]: raise ValueError("Only finished or failed workflows can be restarted.") if workflow.workspace_has_pending_retention_rules(): @@ -1303,14 +1325,9 @@ def start_workflow(workflow_id_or_name, user): # noqa "The workflow cannot be restarted because some retention rules are " "currently being applied to the workspace. Please retry later." ) - restart_type = ( - parameters.get("reana_specification", {}) - .get("workflow", {}) - .get("type", None) - ) - workflow = clone_workflow( - workflow, parameters.get("reana_specification", None), restart_type - ) + if reana_specification: + restart_type = reana_specification.get("workflow", {}).get("type", None) + workflow = clone_workflow(workflow, reana_specification, restart_type) elif workflow.status != RunStatus.created: raise ValueError( "Workflow {} is already {} and cannot be started " @@ -1319,11 +1336,12 @@ def start_workflow(workflow_id_or_name, user): # noqa if "yadage" in (workflow.type_, restart_type): _load_and_save_yadage_spec(workflow, operational_options) - input_parameters = parameters.get("input_parameters", {}) validate_workflow( workflow.reana_specification, input_parameters=input_parameters ) + # when starting the workflow, the scheduler will call RWC's `set_workflow_status` + # with the given `parameters` publish_workflow_submission(workflow, user.id_, parameters) response = { "message": "Workflow submitted.", diff --git a/tests/test_views.py b/tests/test_views.py index e9afcaf5..d9f0c98a 100644 --- a/tests/test_views.py +++ b/tests/test_views.py @@ -206,7 +206,7 @@ def test_restart_workflow_validates_specification( workflow_specification["workflow"]["type"] = "unknown" body = { "reana_specification": workflow_specification, - "restart": "can be anything here doesnt matter", + "restart": True, } res = client.post( url_for("workflows.start_workflow", workflow_id_or_name="test"),