Skip to content

Commit

Permalink
refactor: Update AWS Glue Databrew StartJobRun step to use integratio…
Browse files Browse the repository at this point in the history
…n pattern input (#176)
  • Loading branch information
ca-nguyen authored Nov 15, 2021
1 parent aafdb76 commit 382241a
Show file tree
Hide file tree
Showing 2 changed files with 51 additions and 22 deletions.
31 changes: 15 additions & 16 deletions src/stepfunctions/steps/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -550,10 +550,14 @@ class GlueDataBrewStartJobRunStep(Task):
Creates a Task state that starts a DataBrew job. See `Manage AWS Glue DataBrew Jobs with Step Functions <https://docs.aws.amazon.com/step-functions/latest/dg/connect-databrew.html>`_ for more details.
"""

def __init__(self, state_id, wait_for_completion=True, **kwargs):
def __init__(self, state_id, integration_pattern=IntegrationPattern.WaitForCompletion, **kwargs):
"""
Args:
state_id (str): State name whose length **must be** less than or equal to 128 unicode characters. State names **must be** unique within the scope of the whole state machine.
integration_pattern (stepfunctions.steps.integration_resources.IntegrationPattern, optional): Service integration pattern used to call the integrated service. Supported integration patterns (default: WaitForCompletion):
* WaitForCompletion: Wait for the DataBrew job to complete before going to the next state. (See `Run A Job <https://docs.aws.amazon.com/step-functions/latest/dg/connect-to-resource.html#connect-sync>`_ for more details.)
* CallAndContinue: Call StartJobRun and progress to the next state. (See `Request Response <https://docs.aws.amazon.com/step-functions/latest/dg/connect-to-resource.html#connect-default>`_ for more details.)
comment (str, optional): Human-readable comment or description. (default: None)
timeout_seconds (int, optional): Positive integer specifying timeout for the state in seconds. If the state runs longer than the specified timeout, then the interpreter fails the state with a `States.Timeout` Error Name. (default: 60)
timeout_seconds_path (str, optional): Path specifying the state's timeout value in seconds from the state input. When resolved, the path must select a field whose value is a positive integer.
Expand All @@ -563,23 +567,18 @@ def __init__(self, state_id, wait_for_completion=True, **kwargs):
parameters (dict, optional): The value of this field becomes the effective input for the state.
result_path (str, optional): Path specifying the raw input’s combination with or replacement by the state’s result. (default: '$')
output_path (str, optional): Path applied to the state’s output after the application of `result_path`, producing the effective output which serves as the raw input for the next state. (default: '$')
wait_for_completion (bool, optional): Boolean value set to `True` if the Task state should wait to complete before proceeding to the next step in the workflow. (default: True)
"""
if wait_for_completion:
"""
Example resource arn: arn:aws:states:::databrew:startJobRun.sync
"""

kwargs[Field.Resource.value] = get_service_integration_arn(GLUE_DATABREW_SERVICE_NAME,
GlueDataBrewApi.StartJobRun,
IntegrationPattern.WaitForCompletion)
else:
"""
Example resource arn: arn:aws:states:::databrew:startJobRun
"""
supported_integ_patterns = [IntegrationPattern.WaitForCompletion, IntegrationPattern.CallAndContinue]

kwargs[Field.Resource.value] = get_service_integration_arn(GLUE_DATABREW_SERVICE_NAME,
GlueDataBrewApi.StartJobRun)
is_integration_pattern_valid(integration_pattern, supported_integ_patterns)
kwargs[Field.Resource.value] = get_service_integration_arn(GLUE_DATABREW_SERVICE_NAME,
GlueDataBrewApi.StartJobRun,
integration_pattern)
"""
Example resource arns:
- CallAndContinue: arn:aws:states:::databrew:startJobRun
- WaitForCompletion: arn:aws:states:::databrew:startJobRun.sync
"""

super(GlueDataBrewStartJobRunStep, self).__init__(state_id, **kwargs)

Expand Down
42 changes: 36 additions & 6 deletions tests/unit/test_service_steps.py
Original file line number Diff line number Diff line change
Expand Up @@ -677,8 +677,8 @@ def test_emr_modify_instance_group_by_name_step_creation():


@patch.object(boto3.session.Session, 'region_name', 'us-east-1')
def test_databrew_start_job_run_step_creation_sync():
step = GlueDataBrewStartJobRunStep('Start Glue DataBrew Job Run - Sync', parameters={
def test_databrew_start_job_run_step_creation_default():
step = GlueDataBrewStartJobRunStep('Start Glue DataBrew Job Run - default', parameters={
"Name": "MyWorkflowJobRun"
})

Expand All @@ -693,10 +693,30 @@ def test_databrew_start_job_run_step_creation_sync():


@patch.object(boto3.session.Session, 'region_name', 'us-east-1')
def test_databrew_start_job_run_step_creation():
step = GlueDataBrewStartJobRunStep('Start Glue DataBrew Job Run', wait_for_completion=False, parameters={
"Name": "MyWorkflowJobRun"
})
def test_databrew_start_job_run_step_creation_wait_for_completion():
step = GlueDataBrewStartJobRunStep(
'Start Glue DataBrew Job Run - WaitForCompletion', integration_pattern=IntegrationPattern.WaitForCompletion,
parameters={
"Name": "MyWorkflowJobRun"
})

assert step.to_dict() == {
'Type': 'Task',
'Resource': 'arn:aws:states:::databrew:startJobRun.sync',
'Parameters': {
'Name': 'MyWorkflowJobRun'
},
'End': True
}


@patch.object(boto3.session.Session, 'region_name', 'us-east-1')
def test_databrew_start_job_run_step_creation_call_and_continue():
step = GlueDataBrewStartJobRunStep(
'Start Glue DataBrew Job Run - CallAndContinue',
integration_pattern=IntegrationPattern.CallAndContinue, parameters={
"Name": "MyWorkflowJobRun"
})

assert step.to_dict() == {
'Type': 'Task',
Expand All @@ -708,6 +728,16 @@ def test_databrew_start_job_run_step_creation():
}


@patch.object(boto3.session.Session, 'region_name', 'us-east-1')
def test_databrew_start_job_run_step_creation_wait_for_task_token_raises_error():
    """Constructing the step with WaitForTaskToken must raise a ValueError.

    The expected message names the rejected pattern and lists the two
    patterns this test expects to be reported as supported.
    """
    # The message contains regex metacharacters (parentheses, brackets), so it
    # is escaped before being handed to pytest's `match`, which does a regex search.
    expected_error_regex = re.escape(
        f"Integration Pattern ({IntegrationPattern.WaitForTaskToken.name}) is not supported for this step - "
        f"Please use one of the following: "
        f"{[IntegrationPattern.WaitForCompletion.name, IntegrationPattern.CallAndContinue.name]}"
    )

    with pytest.raises(ValueError, match=expected_error_regex):
        GlueDataBrewStartJobRunStep(
            'Start Glue DataBrew Job Run - WaitForTaskToken',
            integration_pattern=IntegrationPattern.WaitForTaskToken,
        )


@patch.object(boto3.session.Session, 'region_name', 'us-east-1')
def test_eks_create_cluster_step_creation_call_and_continue():
step = EksCreateClusterStep("Create Eks cluster - CallAndContinue", integration_pattern=IntegrationPattern.CallAndContinue,
Expand Down

0 comments on commit 382241a

Please sign in to comment.