Skip to content

Commit

Permalink
Surface blocking flag to the public interface to support async action…
Browse files Browse the repository at this point in the history
…s. (#69)

* Surface blocking flag to the public action interface to support async actions.
  • Loading branch information
jun-he committed Aug 7, 2024
1 parent f2672dd commit b385356
Show file tree
Hide file tree
Showing 4 changed files with 47 additions and 48 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -121,12 +121,6 @@ public RunResponse restartDirectly(

/** bypass the signal dependencies. */
public StepInstanceActionResponse bypassStepDependencies(
WorkflowInstance instance, String stepId, User user) {
return bypassStepDependencies(instance, stepId, user, true);
}

@VisibleForTesting
StepInstanceActionResponse bypassStepDependencies(
WorkflowInstance instance, String stepId, User user, boolean blocking) {
validateStepId(instance, stepId, Actions.StepInstanceAction.BYPASS_STEP_DEPENDENCIES);
StepInstance stepInstance =
Expand Down Expand Up @@ -399,12 +393,6 @@ int deleteAction(StepInstance stepInstance, Actions.StepInstanceAction action) {
* callback will do the cleanup.
*/
public StepInstanceActionResponse terminate(
WorkflowInstance instance, String stepId, User user, Actions.StepInstanceAction action) {
return terminate(instance, stepId, user, action, true);
}

@VisibleForTesting
StepInstanceActionResponse terminate(
WorkflowInstance instance,
String stepId,
User user,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,15 +49,15 @@ public class StepInstanceActionHandler {
* the runtime DAG. Only allow existing a single non-terminal step attempt at any time, which must
* be the latest one.
*/
public RunResponse restart(RunRequest runRequest) {
public RunResponse restart(RunRequest runRequest, boolean blocking) {
if (!runRequest.isFreshRun()
&& runRequest.getCurrentPolicy() != RunPolicy.RESTART_FROM_SPECIFIC) {
updateRunRequestForRestartFromInlineRoot(runRequest);
}

RunResponse runResponse = actionHandler.restartRecursively(runRequest);
if (runResponse.getStatus() == RunResponse.Status.DELEGATED) {
return restartDirectly(runResponse, runRequest);
return restartDirectly(runResponse, runRequest, blocking);
}
return runResponse;
}
Expand Down Expand Up @@ -112,16 +112,17 @@ private void updateRunRequestForRestartFromInlineRoot(RunRequest runRequest) {
}

/** Directly restart a step without going to its ancestors. */
public RunResponse restartDirectly(RunResponse restartStepInfo, RunRequest runRequest) {
return actionDao.restartDirectly(restartStepInfo, runRequest, true);
public RunResponse restartDirectly(
RunResponse restartStepInfo, RunRequest runRequest, boolean blocking) {
return actionDao.restartDirectly(restartStepInfo, runRequest, blocking);
}

/** Bypasses the step dependencies. */
public StepInstanceActionResponse bypassStepDependencies(
String workflowId, long workflowInstanceId, String stepId, User user) {
String workflowId, long workflowInstanceId, String stepId, User user, boolean blocking) {
WorkflowInstance instance =
instanceDao.getLatestWorkflowInstanceRun(workflowId, workflowInstanceId);
return actionDao.bypassStepDependencies(instance, stepId, user);
return actionDao.bypassStepDependencies(instance, stepId, user, blocking);
}

/** Terminate a step instance, i.e. the latest workflow instance run's latest attempt. */
Expand All @@ -130,19 +131,25 @@ public StepInstanceActionResponse terminate(
long workflowInstanceId,
String stepId,
User user,
Actions.StepInstanceAction action) {
Actions.StepInstanceAction action,
boolean blocking) {
WorkflowInstance instance =
instanceDao.getLatestWorkflowInstanceRun(workflowId, workflowInstanceId);
if (instance.getStatus().isTerminal()) {
throw new MaestroInvalidStatusException(
"Cannot manually %s the step [%s] as the workflow instance %s is in a terminal state [%s]",
action.name(), stepId, instance.getIdentity(), instance.getStatus());
}
return actionDao.terminate(instance, stepId, user, action);
return actionDao.terminate(instance, stepId, user, action, blocking);
}

public StepInstanceActionResponse skip(
String workflowId, long workflowInstanceId, String stepId, User user, RunRequest runRequest) {
String workflowId,
long workflowInstanceId,
String stepId,
User user,
RunRequest runRequest,
boolean blocking) {
WorkflowInstance instance =
instanceDao.getWorkflowInstance(
workflowId, workflowInstanceId, Constants.LATEST_INSTANCE_RUN, true);
Expand Down Expand Up @@ -175,10 +182,10 @@ public StepInstanceActionResponse skip(
}

if (!instance.getStatus().isTerminal() && view.getStatus().shouldWakeup()) {
return actionDao.terminate(instance, stepId, user, Actions.StepInstanceAction.SKIP);
return actionDao.terminate(instance, stepId, user, Actions.StepInstanceAction.SKIP, blocking);
}

RunResponse runResponse = restart(runRequest);
RunResponse runResponse = restart(runRequest, blocking);
return runResponse.toStepInstanceActionResponse();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -518,7 +518,7 @@ public void testInvalidBypassSignalDependencies() {
"Cannot manually RESTART the step",
MaestroBadRequestException.class,
"Cannot manually BYPASS_STEP_DEPENDENCIES the step [not-existing] because the latest workflow run",
() -> actionDao.bypassStepDependencies(instance, "not-existing", user));
() -> actionDao.bypassStepDependencies(instance, "not-existing", user, true));

stepInstance.getRuntimeState().setStatus(StepInstance.Status.RUNNING);
stepInstanceDao.insertOrUpsertStepInstance(stepInstance, true);
Expand All @@ -527,7 +527,7 @@ public void testInvalidBypassSignalDependencies() {
"Cannot manually bypass the step dependencies",
MaestroInvalidStatusException.class,
"Cannot manually bypass-step-dependencies the step as its status [RUNNING] is not waiting for signals",
() -> actionDao.bypassStepDependencies(instance, "job1", user));
() -> actionDao.bypassStepDependencies(instance, "job1", user, true));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ public void testRestartNewRun() {
.restartConfig(
RestartConfig.builder().addRestartNode("sample-minimal-wf", 1, "job1").build())
.build();
RunResponse response = stepActionHandler.restart(runRequest);
RunResponse response = stepActionHandler.restart(runRequest, true);

ArgumentCaptor<RunRequest> requestCaptor = ArgumentCaptor.forClass(RunRequest.class);
Mockito.verify(actionHandler, Mockito.times(1)).restartRecursively(requestCaptor.capture());
Expand All @@ -104,7 +104,7 @@ public void testRestartNewAttempt() {
RunResponse runResponse =
RunResponse.builder().status(RunResponse.Status.STEP_ATTEMPT_CREATED).build();
when(actionDao.restartDirectly(any(), any(), anyBoolean())).thenReturn(runResponse);
RunResponse response = stepActionHandler.restart(runRequest);
RunResponse response = stepActionHandler.restart(runRequest, true);

ArgumentCaptor<RunRequest> requestCaptor = ArgumentCaptor.forClass(RunRequest.class);
Mockito.verify(actionHandler, Mockito.times(1)).restartRecursively(requestCaptor.capture());
Expand Down Expand Up @@ -158,7 +158,7 @@ public void testRestartFromInlineRootWithinForeach() {
.build())
.build();

RunResponse response = stepActionHandler.restart(runRequest);
RunResponse response = stepActionHandler.restart(runRequest, true);

ArgumentCaptor<RunRequest> requestCaptor = ArgumentCaptor.forClass(RunRequest.class);
Mockito.verify(actionHandler, Mockito.times(1)).restartRecursively(requestCaptor.capture());
Expand Down Expand Up @@ -204,7 +204,7 @@ public void testRestartFromInlineRootWithinNonForeach() {
.build())
.build();

RunResponse response = stepActionHandler.restart(runRequest);
RunResponse response = stepActionHandler.restart(runRequest, true);

ArgumentCaptor<RunRequest> requestCaptor = ArgumentCaptor.forClass(RunRequest.class);
Mockito.verify(actionHandler, Mockito.times(1)).restartRecursively(requestCaptor.capture());
Expand Down Expand Up @@ -238,7 +238,7 @@ public void testInvalidRestartFromInlineRoot() {
"Cannot restart from inline root for non-terminal root",
IllegalArgumentException.class,
"instance [null] is in non-terminal state [IN_PROGRESS]",
() -> stepActionHandler.restart(runRequest));
() -> stepActionHandler.restart(runRequest, true));

when(instance.getStatus()).thenReturn(WorkflowInstance.Status.FAILED);
WorkflowInstanceAggregatedInfo aggregatedInfo = mock(WorkflowInstanceAggregatedInfo.class);
Expand All @@ -250,34 +250,34 @@ public void testInvalidRestartFromInlineRoot() {
"Cannot restart from inline root for non-terminal step",
IllegalArgumentException.class,
"step null[job1] is in non-terminal state [RUNNING]",
() -> stepActionHandler.restart(runRequest));
() -> stepActionHandler.restart(runRequest, true));

when(aggregatedView.getStatus()).thenReturn(StepInstance.Status.FATALLY_FAILED);
AssertHelper.assertThrows(
"Cannot restart from inline root for invalid restart path",
IllegalArgumentException.class,
"restart-path size is not 1",
() -> stepActionHandler.restart(runRequest));
() -> stepActionHandler.restart(runRequest, true));
}

@Test
public void testTerminate() {
when(instance.getStatus()).thenReturn(WorkflowInstance.Status.IN_PROGRESS);
stepActionHandler.terminate("sample-minimal-wf", 1, "job1", user, STOP);
verify(actionDao, times(1)).terminate(instance, "job1", user, STOP);
stepActionHandler.terminate("sample-minimal-wf", 1, "job1", user, STOP, true);
verify(actionDao, times(1)).terminate(instance, "job1", user, STOP, true);
when(instance.getStatus()).thenReturn(WorkflowInstance.Status.FAILED);
AssertHelper.assertThrows(
"Cannot manually terminate the step",
MaestroInvalidStatusException.class,
"Cannot manually STOP the step [job1] as the workflow instance",
() -> stepActionHandler.terminate("sample-minimal-wf", 1, "job1", user, STOP));
() -> stepActionHandler.terminate("sample-minimal-wf", 1, "job1", user, STOP, true));
}

@Test
public void testBypassDependencies() {
when(instance.getStatus()).thenReturn(WorkflowInstance.Status.IN_PROGRESS);
stepActionHandler.bypassStepDependencies("sample-minimal-wf", 1, "job1", user);
verify(actionDao, times(1)).bypassStepDependencies(instance, "job1", user);
stepActionHandler.bypassStepDependencies("sample-minimal-wf", 1, "job1", user, true);
verify(actionDao, times(1)).bypassStepDependencies(instance, "job1", user, true);
}

@Test
Expand All @@ -291,8 +291,9 @@ public void testSkipRunningStepInRunningInstance() {
when(aggregatedInfo.getStepAggregatedViews()).thenReturn(singletonMap("job1", aggregatedView));
when(instance.getStatus()).thenReturn(WorkflowInstance.Status.IN_PROGRESS);
when(aggregatedView.getStatus()).thenReturn(StepInstance.Status.RUNNING);
stepActionHandler.skip("sample-minimal-wf", 1, "job1", user, null);
verify(actionDao, times(1)).terminate(instance, "job1", user, Actions.StepInstanceAction.SKIP);
stepActionHandler.skip("sample-minimal-wf", 1, "job1", user, null, true);
verify(actionDao, times(1))
.terminate(instance, "job1", user, Actions.StepInstanceAction.SKIP, true);
}

@Test
Expand Down Expand Up @@ -321,8 +322,9 @@ public void testSkipFailedStepInRunningInstance() {
.thenReturn(RunResponse.builder().status(RunResponse.Status.DELEGATED).build());
when(actionDao.restartDirectly(any(), eq(runRequest), eq(true)))
.thenReturn(RunResponse.builder().status(RunResponse.Status.STEP_ATTEMPT_CREATED).build());
stepActionHandler.skip("sample-minimal-wf", 1, "job1", user, runRequest);
verify(actionDao, times(0)).terminate(instance, "job1", user, Actions.StepInstanceAction.SKIP);
stepActionHandler.skip("sample-minimal-wf", 1, "job1", user, runRequest, true);
verify(actionDao, times(0))
.terminate(instance, "job1", user, Actions.StepInstanceAction.SKIP, true);
verify(actionHandler, times(1)).restartRecursively(runRequest);
verify(actionDao, times(1)).restartDirectly(any(), eq(runRequest), eq(true));
}
Expand All @@ -338,8 +340,9 @@ public void testSkipShouldWakeupStepInRunningInstance() {
when(aggregatedInfo.getStepAggregatedViews()).thenReturn(singletonMap("job1", aggregatedView));
when(instance.getStatus()).thenReturn(WorkflowInstance.Status.IN_PROGRESS);
when(aggregatedView.getStatus()).thenReturn(StepInstance.Status.PLATFORM_FAILED);
stepActionHandler.skip("sample-minimal-wf", 1, "job1", user, null);
verify(actionDao, times(1)).terminate(instance, "job1", user, Actions.StepInstanceAction.SKIP);
stepActionHandler.skip("sample-minimal-wf", 1, "job1", user, null, true);
verify(actionDao, times(1))
.terminate(instance, "job1", user, Actions.StepInstanceAction.SKIP, true);
}

@Test
Expand Down Expand Up @@ -367,8 +370,9 @@ public void testSkipStoppedStepInStoppedInstance() {
.build();
when(actionHandler.restartRecursively(runRequest))
.thenReturn(RunResponse.builder().status(RunResponse.Status.WORKFLOW_RUN_CREATED).build());
stepActionHandler.skip("sample-minimal-wf", 1, "job1", user, runRequest);
verify(actionDao, times(0)).terminate(instance, "job1", user, Actions.StepInstanceAction.SKIP);
stepActionHandler.skip("sample-minimal-wf", 1, "job1", user, runRequest, true);
verify(actionDao, times(0))
.terminate(instance, "job1", user, Actions.StepInstanceAction.SKIP, true);
verify(actionHandler, times(1)).restartRecursively(runRequest);
verify(actionDao, times(0)).restartDirectly(any(), eq(runRequest), eq(true));
}
Expand All @@ -388,20 +392,20 @@ public void testInvalidSkip() {
"Cannot find status in aggregated step views",
NullPointerException.class,
"Invalid: cannot find the step view of workflow step ",
() -> stepActionHandler.skip("sample-minimal-wf", 1, "job2", user, null));
() -> stepActionHandler.skip("sample-minimal-wf", 1, "job2", user, null, true));

when(aggregatedView.getStatus()).thenReturn(StepInstance.Status.NOT_CREATED);
AssertHelper.assertThrows(
"Cannot skip not-created step",
MaestroBadRequestException.class,
"Cannot skip step [sample-minimal-wf][1][job1] before it is created. Please try it again.",
() -> stepActionHandler.skip("sample-minimal-wf", 1, "job1", user, null));
() -> stepActionHandler.skip("sample-minimal-wf", 1, "job1", user, null, true));

when(aggregatedView.getStatus()).thenReturn(StepInstance.Status.CREATED);
AssertHelper.assertThrows(
"Cannot skip not-created step",
MaestroBadRequestException.class,
"Cannot skip step [sample-minimal-wf][1][job1] because it is unsupported by the step action map",
() -> stepActionHandler.skip("sample-minimal-wf", 1, "job1", user, null));
() -> stepActionHandler.skip("sample-minimal-wf", 1, "job1", user, null, true));
}
}

0 comments on commit b385356

Please sign in to comment.