Skip to content

Commit

Permalink
Test email commit 2
Browse files Browse the repository at this point in the history
Added Hashmap for success and failure email address for taskId mapping
  • Loading branch information
Balamurugan Kannadasan committed Apr 9, 2024
1 parent 4e07bf7 commit 554dd0f
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -85,9 +85,9 @@ public class DataPullRequestProcessor implements DataPullClientService {

private final ThreadPoolTaskScheduler scheduler;

private String userEmail;
private String failureEmail;
HashMap<String,String> successEmails = new HashMap<>();

HashMap<String,String> failureEmails = new HashMap<>();
public DataPullRequestProcessor(){
scheduler = new ThreadPoolTaskScheduler();
scheduler.setPoolSize(POOL_SIZE);
Expand Down Expand Up @@ -127,6 +127,10 @@ public void runSimpleDataPull(String awsenv, String pipelinename) {
}

private void runDataPull(String json, boolean isStart, boolean validateJson) throws ProcessingException {

String userEmail;
String failureEmail;

String originalInputJson = json;
json = extractUserJsonFromS3IfProvided(json, isStart);

Expand All @@ -148,6 +152,9 @@ private void runDataPull(String json, boolean isStart, boolean validateJson) thr
JsonNode jsonNode = objectMapper.readTree(json);
userEmail = null != jsonNode.get("useremailaddress") ? jsonNode.get("useremailaddress").asText(): "";

String taskId = jsonNode.get("cluster").get("awsenv").asText().concat("-emr-").concat(jsonNode.get("cluster").get("pipelinename").asText()).concat("-pipeline");
successEmails.put(taskId,userEmail);

JsonNode failureEmailNode = jsonNode.get("failureemailaddress");
if(StringUtils.isNotEmpty(userEmail)){
failureEmail = (failureEmailNode != null) ? userEmail.concat(",").concat(failureEmailNode.asText()): userEmail;
Expand All @@ -156,7 +163,7 @@ private void runDataPull(String json, boolean isStart, boolean validateJson) thr
failureEmail = (failureEmailNode != null) ? (failureEmailNode.asText()): "";

}

failureEmails.put(taskId,failureEmail);


List<Map.Entry<String, JsonNode>> result = new LinkedList<Map.Entry<String, JsonNode>>();
Expand Down Expand Up @@ -218,13 +225,13 @@ private void runDataPull(String json, boolean isStart, boolean validateJson) thr
log.debug("runDataPull <- return");
}

public String successMailAddress() throws ProcessingException {
return userEmail;
public HashMap<String, String> successMailAddress() throws ProcessingException {
return successEmails;
}


public String failureMailAddress() throws ProcessingException {
return failureEmail;
public HashMap<String, String> failureMailAddress() throws ProcessingException {
return failureEmails;
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,6 @@ public class DataPullTask implements Runnable {

String jobFlowId = null;

String emailAddress = null;
String ClusterId = null;

enum emailStatusCode {
Expand Down Expand Up @@ -453,14 +452,15 @@ private RunJobFlowResult runTaskInNewCluster(final AmazonElasticMapReduce emr, f

private void sendEmailNotification() throws ProcessingException {
String emailStatusCodeVal = null;
String emailAddress = null;
try {

if (jobFlowId != null) {
emailStatusCodeVal = emailStatusCode.CLUSTER_CREATION_SUCCESS.name();
emailAddress = dataPullRequestProcessor.successMailAddress();
emailAddress = dataPullRequestProcessor.successMailAddress().get(taskId);
} else if (ClusterId != null) {
emailStatusCodeVal = emailStatusCode.SPARK_EXEC_ON_EXISTING_CLUSTER.name();
emailAddress = dataPullRequestProcessor.successMailAddress();
emailAddress = dataPullRequestProcessor.successMailAddress().get(taskId);
} else {
if (jobFlowId == null) {
emailStatusCodeVal = emailStatusCode.CLUSTER_CREATION_FAILED.name();
Expand All @@ -471,7 +471,7 @@ private void sendEmailNotification() throws ProcessingException {
} else if (runTaskOnExistingClusterFail == true) {
emailStatusCodeVal = emailStatusCode.RUN_ON_EXISTING_CLUSTER_FAILED.name();
}
emailAddress = dataPullRequestProcessor.failureMailAddress();
emailAddress = dataPullRequestProcessor.failureMailAddress().get(taskId);
}

String[] emailAddressArray = null;
Expand Down

0 comments on commit 554dd0f

Please sign in to comment.