Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make runjobflowRequest to be synchronous #208

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions api/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,12 @@
<version>2.7.0</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>2.12.0</version>
</dependency>

<dependency>
<groupId>org.everit.json</groupId>
<artifactId>org.everit.json.schema</artifactId>
Expand All @@ -131,6 +137,12 @@
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>DataMigrationTool</groupId>
<artifactId>DataMigrationFramework</artifactId>
<version>1.0-SNAPSHOT</version>
<scope>compile</scope>
</dependency>
</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,8 @@ public class DataPullClientConfig {

@Bean
@Scope("prototype")
public DataPullTask getTask(String taskId, String json, String jksFile, List<String> subnets, Map<String, List<DescribeStepRequest>> stepPipelineMap) {
return new DataPullTask(taskId, json, jksFile, subnets, stepPipelineMap);
public DataPullTask getTask(String taskId, String creator, String json, String jksFile, List<String> subnets, Map<String, List<DescribeStepRequest>> stepPipelineMap) {
return new DataPullTask(taskId, creator, json, jksFile, subnets, stepPipelineMap);
}

@Bean
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -351,7 +351,7 @@ private Boolean createBootstrapScript(Migration[] myObjects, String bootstrapFil

private DataPullTask createDataPullTask(String fileS3Path, String jksFilePath, ClusterProperties properties, String jobName, String creator, String customJarFilePath, Boolean haveBootstrapAction) {
String creatorTag = String.join(" ", Arrays.asList(creator.split(",|;")));
DataPullTask task = config.getTask(jobName, fileS3Path, jksFilePath,rotateSubnets(),getStepForPipeline()).withClusterProperties(properties).withCustomJar(customJarFilePath).haveBootstrapAction(haveBootstrapAction)
DataPullTask task = config.getTask(jobName, creator, fileS3Path, jksFilePath,rotateSubnets(),getStepForPipeline()).withClusterProperties(properties).withCustomJar(customJarFilePath).haveBootstrapAction(haveBootstrapAction)
.addTag("Creator", creatorTag).addTag("Env", Objects.toString(properties.getAwsEnv(), env)).addTag("Name", jobName)
.addTag("AssetProtectionLevel", "99").addTag("ComponentInfo", properties.getComponentInfo())
.addTag("Portfolio", properties.getPortfolio()).addTag("Product", properties.getProduct()).addTag("Team", properties.getTeam()).addTag("tool", "datapull")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,23 @@

import com.amazonaws.services.elasticmapreduce.AmazonElasticMapReduce;
import com.amazonaws.services.elasticmapreduce.model.*;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
import com.homeaway.datapullclient.config.DataPullClientConfig;
import com.homeaway.datapullclient.config.DataPullProperties;
import com.homeaway.datapullclient.config.EMRProperties;
import com.homeaway.datapullclient.input.ClusterProperties;
import config.AppConfig;
import core.Controller;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;

import java.io.IOException;
import java.io.InputStream;
import java.time.Instant;
import java.util.*;
import java.util.concurrent.ThreadLocalRandom;
import java.util.stream.Collectors;
Expand All @@ -42,6 +50,8 @@ public class DataPullTask implements Runnable {

private final String jsonS3Path;

private final String creator;

private final String s3FilePath;

private final String jksS3Path;
Expand All @@ -59,7 +69,8 @@ public class DataPullTask implements Runnable {
private final List<String> subnets ;
private final Map<String,List<DescribeStepRequest>> stepPipelineMap;

public DataPullTask(final String taskId, final String s3File, final String jksFilePath, final List<String> subnets, final Map<String,List<DescribeStepRequest>> stepPipelineMap) {
public DataPullTask(final String taskId, final String creator, final String s3File, final String jksFilePath, final List<String> subnets, final Map<String,List<DescribeStepRequest>> stepPipelineMap) {
this.creator = creator;
s3FilePath = s3File;
this.taskId = taskId;
jsonS3Path = this.s3FilePath + ".json";
Expand Down Expand Up @@ -370,9 +381,17 @@ private RunJobFlowResult runTaskInNewCluster(final AmazonElasticMapReduce emr, f
request.withBootstrapActions(bsConfig);
}
RunJobFlowResult result=emr.runJobFlow(request);
// Wait for the cluster to be ready
boolean isClusterReady = waitForClusterReady(emr, result.getJobFlowId());

if (!isClusterReady) {
String errorMessage = "EMR cluster failed to start. Aborting the data pull task.";
DataPullTask.log.error(errorMessage);
sendEmail(errorMessage, creator);
throw new RuntimeException(errorMessage);
}
ListStepsResult steps = emr.listSteps(new ListStepsRequest().withClusterId(result.getJobFlowId()));
StepSummary step = steps.getSteps().get(0);
;
DescribeStepRequest ds = new DescribeStepRequest();
ds.withClusterId(result.getJobFlowId());
ds.withStepId(step.getId());
Expand All @@ -382,6 +401,87 @@ private RunJobFlowResult runTaskInNewCluster(final AmazonElasticMapReduce emr, f
return result;
}


private void sendEmail(String message, String creator) {
try {
// Create ObjectMapper for YAML
ObjectMapper yamlMapper = new ObjectMapper(new YAMLFactory());

// Load application.yml from resources
InputStream inputStream = this.getClass().getClassLoader().getResourceAsStream("application.yml");

// Read YAML content into JsonNode
JsonNode applicationConf = yamlMapper.readTree(inputStream);

AppConfig config = new AppConfig(applicationConf);
String pipelineName = clusterProperties.getPipelineName();

String env = clusterProperties.getAwsEnv();
String applicationId = clusterProperties.getApplication();
String subject = "Data Pull job failed for the pipeline " + pipelineName + " in " + env + " environment";

StringBuilder reportbodyHtml = new StringBuilder();
reportbodyHtml.append("<tr><td><h3>Errors!</h3></td><td>")
.append(Instant.now().toString())
.append("</td><td colspan=\"5\">")
.append(message)
.append("</td></tr>");
Controller controllerInstance = new Controller(config, pipelineName);
String htmlContent = controllerInstance.neatifyReportHtml(reportbodyHtml.toString(), true, true);
controllerInstance.SendEmail(creator, htmlContent, applicationId,
pipelineName, env, subject, "");
}
catch (IOException e) {
DataPullTask.log.error("Error while sending email", e);
}

}

private boolean waitForClusterReady(AmazonElasticMapReduce emrClient, String clusterId) {
DescribeClusterRequest describeRequest = new DescribeClusterRequest().withClusterId(clusterId);
int maxRetries = 60;
int retryIntervalSeconds = 30;
int retries = 0;

while (retries < maxRetries) {
try {
DescribeClusterResult describeResult = emrClient.describeCluster(describeRequest);
ClusterStatus status = describeResult.getCluster().getStatus();
String state = status.getState();
DataPullTask.log.info("Cluster {} is in state {}", clusterId, state);

switch (state) {
case "WAITING":
case "RUNNING":
DataPullTask.log.info("Cluster {} is ready.", clusterId);
return true;
case "TERMINATED_WITH_ERRORS":
case "TERMINATED":
String reason = status.getStateChangeReason().getMessage();
DataPullTask.log.error("Cluster {} failed to start. Reason: {}", clusterId, reason);
return false;
default:
// Cluster is still starting up
break;
}

// Wait before polling again
Thread.sleep(retryIntervalSeconds * 1000);
retries++;
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
DataPullTask.log.error("Interrupted while waiting for cluster to start.", e);
return false;
} catch (Exception e) {
DataPullTask.log.error("Error while checking cluster status.", e);
return false;
}
}

DataPullTask.log.error("Cluster {} did not start within the expected time.", clusterId);
return false;
}

private JobFlowInstancesConfig getJobFlowInstancesConfig(EMRProperties emrProperties,
ClusterProperties clusterProperties,
DataPullProperties dataPullProperties) {
Expand Down