Skip to content

Commit

Permalink
Merge pull request #650 from peishengsheng/feat_1.2
Browse files Browse the repository at this point in the history
[feat_1.2][taier-service] fix work flow show
  • Loading branch information
vainhope authored Jul 25, 2022
2 parents 51edd80 + d02dd5f commit 4c545bd
Show file tree
Hide file tree
Showing 5 changed files with 391 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ public enum ErrorCode implements ExceptionEnums, Serializable {
SFTP_NOT_FOUND(29, "sftp can not found","sftp不存在"),

UPDATE_EXCEPTION(30, "update exception", "更新异常"),
UNSUPPORTED_OPERATION(31, "unSupported operation", "不支持的操作"),
CONFIG_ERROR(51, "config error","配置错误"),

HTTP_CALL_ERROR(64, "http call error", "远程调用失败"),
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
package com.dtstack.taier.develop.enums.develop;


import com.dtstack.taier.common.exception.RdosDefineException;

/**
* Reason:
* Date: 2017/5/4
* Company: www.dtstack.com
* @author xuchao
*/

public enum ESchedulePeriodType {

/**
* 分钟
*/
MIN(0),

/**
* 小时
*/
HOUR(1),

/**
* 天
*/
DAY(2),

/**
* 周
*/
WEEK(3),

/**
* 月
*/
MONTH(4);

private int val;

ESchedulePeriodType(int val){
this.val = val;
}

public int getVal(){
return this.val;
}


public static ESchedulePeriodType getEnumByVal(Integer val) {
for (ESchedulePeriodType periodType : ESchedulePeriodType.values()) {
if (periodType.getVal() == val) {
return periodType;
}
}
throw new RdosDefineException(String.format("val:%s 没有匹配到对应的周期调度类型", val));
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,232 @@
package com.dtstack.taier.develop.enums.develop;

import com.alibaba.fastjson.JSONObject;
import com.dtstack.taier.common.exception.ErrorCode;
import com.dtstack.taier.common.exception.RdosDefineException;
import com.dtstack.taier.dao.domain.Task;
import org.apache.commons.lang.ArrayUtils;
import org.apache.commons.lang.StringUtils;

/**
* 工作流调度属性工具类
*
* @author 昆卡
* @version 4.3.x-SNAPSHOT
* @since 2021/10/25
*/

public enum WorkFlowScheduleConfEnum {
/**
* 分钟
*/
MIN(String.valueOf(ESchedulePeriodType.MIN.getVal())) {
@Override
public void checkWorkFlowChildScheduleConf(JSONObject oldJsonObject, JSONObject newJsonObject) {
validate(oldJsonObject, newJsonObject, BEGIN_DATE_KEY_NAME, END_DATE_KEY_NAME, BEGIN_HOUR_KEY_NAME,
BEGIN_MIN_KEY_NAME, GAP_MIN_KEY_NAME, END_HOUR_KEY_NAME, END_MIN_KEY_NAME);
}

@Override
public void handleWorkFlowChildScheduleConf(Task childNodeTask, JSONObject parentJsonObject) {
applyParentScheduleConf(childNodeTask, parentJsonObject, BEGIN_DATE_KEY_NAME, END_DATE_KEY_NAME,
BEGIN_HOUR_KEY_NAME, BEGIN_MIN_KEY_NAME, GAP_MIN_KEY_NAME, END_HOUR_KEY_NAME, END_MIN_KEY_NAME,
PERIOD_TYPE);
}
},

/**
* 小时
*/
HOUR(String.valueOf(ESchedulePeriodType.HOUR.getVal())) {
@Override
public void checkWorkFlowChildScheduleConf(JSONObject oldJsonObject, JSONObject newJsonObject) {
validate(oldJsonObject, newJsonObject, BEGIN_DATE_KEY_NAME, END_DATE_KEY_NAME, BEGIN_HOUR_KEY_NAME,
BEGIN_MIN_KEY_NAME, GAP_HOUR_KEY_NAME, END_HOUR_KEY_NAME, END_MIN_KEY_NAME);
}

@Override
public void handleWorkFlowChildScheduleConf(Task childNodeTask, JSONObject parentJsonObject) {
applyParentScheduleConf(childNodeTask, parentJsonObject, BEGIN_HOUR_KEY_NAME, BEGIN_MIN_KEY_NAME,
BEGIN_DATE_KEY_NAME, END_DATE_KEY_NAME, GAP_HOUR_KEY_NAME, END_HOUR_KEY_NAME, END_MIN_KEY_NAME,
PERIOD_TYPE);
}
},

/**
* 天
*/
DAY(String.valueOf(ESchedulePeriodType.DAY.getVal())) {
@Override
public void checkWorkFlowChildScheduleConf(JSONObject oldJsonObject, JSONObject newJsonObject) {
validate(oldJsonObject, newJsonObject, BEGIN_DATE_KEY_NAME, END_DATE_KEY_NAME, HOUR_KEY_NAME, MIN_KEY_NAME);
}

@Override
public void handleWorkFlowChildScheduleConf(Task childNodeTask, JSONObject parentJsonObject) {
applyParentScheduleConf(childNodeTask, parentJsonObject, BEGIN_DATE_KEY_NAME, END_DATE_KEY_NAME, HOUR_KEY_NAME,
MIN_KEY_NAME, PERIOD_TYPE);
}
},

/**
* 周
*/
WEEK(String.valueOf(ESchedulePeriodType.WEEK.getVal())) {
@Override
public void checkWorkFlowChildScheduleConf(JSONObject oldJsonObject, JSONObject newJsonObject) {
validate(oldJsonObject, newJsonObject, BEGIN_DATE_KEY_NAME, END_DATE_KEY_NAME, WEEKDAY_KEY_NAME, HOUR_KEY_NAME,
MIN_KEY_NAME);
}

@Override
public void handleWorkFlowChildScheduleConf(Task childNodeTask, JSONObject parentJsonObject) {
applyParentScheduleConf(childNodeTask, parentJsonObject, BEGIN_DATE_KEY_NAME, END_DATE_KEY_NAME, WEEKDAY_KEY_NAME,
HOUR_KEY_NAME, MIN_KEY_NAME, PERIOD_TYPE);
}
},

/**
* 月
*/
MONTH(String.valueOf(ESchedulePeriodType.MONTH.getVal())) {
@Override
public void checkWorkFlowChildScheduleConf(JSONObject oldJsonObject, JSONObject newJsonObject) {
validate(oldJsonObject, newJsonObject, BEGIN_DATE_KEY_NAME, END_DATE_KEY_NAME, DAY_KEY_NAME, HOUR_KEY_NAME,
MIN_KEY_NAME);
}

@Override
public void handleWorkFlowChildScheduleConf(Task childNodeTask, JSONObject parentJsonObject) {
applyParentScheduleConf(childNodeTask, parentJsonObject, BEGIN_DATE_KEY_NAME, END_DATE_KEY_NAME, DAY_KEY_NAME,
HOUR_KEY_NAME, MIN_KEY_NAME, PERIOD_TYPE);
}
}
;
/**
* 开始小时键名
*/
private static final String BEGIN_HOUR_KEY_NAME = "beginHour";
/**
* 开始分钟键名
*/
private static final String BEGIN_MIN_KEY_NAME = "beginMin";
/**
* 间隔分钟键名
*/
private static final String GAP_MIN_KEY_NAME = "gapMin";
/**
* 结束小时键名
*/
private static final String END_HOUR_KEY_NAME = "endHour";
/**
* 结束分钟键名
*/
private static final String END_MIN_KEY_NAME = "endMin";
/**
* 间隔小时键名
*/
private static final String GAP_HOUR_KEY_NAME = "gapHour";
/**
* 小时键名
*/
private static final String HOUR_KEY_NAME = "hour";
/**
* 分钟键名
*/
private static final String MIN_KEY_NAME = "min";
/**
* 星期键名
*/
private static final String WEEKDAY_KEY_NAME = "weekDay";
/**
* 天键名
*/
private static final String DAY_KEY_NAME = "day";
/**
* CRON表达式键名
*/
private static final String CRON_KEY_NAME = "cron";
/**
* 开始日期键名
*/
private static final String BEGIN_DATE_KEY_NAME = "beginDate";
/**
* 结束日期键名
*/
private static final String END_DATE_KEY_NAME = "endDate";
/**
* 调度周期键名
*/
private static final String PERIOD_TYPE = "periodType";

/**
* 调度周期
*/
private final String periodType;

/**
* 处理工作流子节点调度配置
*
* @param oldJsonObject 老周期配置json对象
* @param newJsonObject 新周期配置json对象
*/
public abstract void checkWorkFlowChildScheduleConf(JSONObject oldJsonObject, JSONObject newJsonObject);

/**
* 处理工作流子节点调度配置,父的属性部分给到子
*
* @param childNodeTask 子节点任务
* @param parentJsonObject 工作流周期配置json对象
*/
public abstract void handleWorkFlowChildScheduleConf(Task childNodeTask, JSONObject parentJsonObject);

/**
* 获取工作流调度周期枚举对象
*
* @param periodType 调度周期
*/
public static String getCurrentPeriodType(String periodType) {
if (StringUtils.isEmpty(periodType)) {
throw new RdosDefineException(ErrorCode.INVALID_PARAMETERS);
}
for (WorkFlowScheduleConfEnum workFlowScheduleConfEnum : WorkFlowScheduleConfEnum.values()) {
if (!workFlowScheduleConfEnum.getPeriodType().equals(periodType)) {
continue;
}
return workFlowScheduleConfEnum.name();
}
throw new RdosDefineException("未知的调度周期");
}

private static void validate(JSONObject oldJsonObject, JSONObject newJsonObject, String... keyNameArray) {
if (ArrayUtils.isEmpty(keyNameArray)) {
return;
}
for (String keyName : keyNameArray) {
if (!String.valueOf(oldJsonObject.getOrDefault(keyName, StringUtils.EMPTY)).equals(newJsonObject.getString(keyName))) {
throw new RdosDefineException(ErrorCode.UNSUPPORTED_OPERATION);
}
}
}

private static void applyParentScheduleConf(Task childNodeTask, JSONObject parentJsonObject, String... keyNameArray) {
if (ArrayUtils.isEmpty(keyNameArray)) {
return;
}
final JSONObject childJsonObject = JSONObject.parseObject(childNodeTask.getScheduleConf());
for (String keyName : keyNameArray) {
if (parentJsonObject.containsKey(keyName)) {
childJsonObject.put(keyName, parentJsonObject.get(keyName));
}
}
childNodeTask.setScheduleConf(childJsonObject.toJSONString());
}

public String getPeriodType() {
return periodType;
}

WorkFlowScheduleConfEnum(String periodType) {
this.periodType = periodType;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@
import com.dtstack.taier.develop.enums.develop.FlinkVersion;
import com.dtstack.taier.develop.enums.develop.SourceDTOType;
import com.dtstack.taier.develop.enums.develop.TaskCreateModelType;
import com.dtstack.taier.develop.enums.develop.WorkFlowScheduleConfEnum;
import com.dtstack.taier.develop.mapstruct.vo.TaskDirtyDataManageTransfer;
import com.dtstack.taier.develop.mapstruct.vo.TaskMapstructTransfer;
import com.dtstack.taier.develop.service.console.TenantService;
Expand Down Expand Up @@ -1408,8 +1409,8 @@ public void editTask(Long taskId, String taskName, Long catalogueId, String desc
.eq(Task::getIsDeleted, Deleted.NORMAL.getStatus())
.eq(Task::getTenantId, tenantId));

Task updateInfo = new Task();
if (Objects.isNull(taskInfo)) {
Task updateInfo = new Task();
updateInfo.setId(taskId);
updateInfo.setGmtModified(Timestamp.valueOf(LocalDateTime.now()));
updateInfo.setName(taskName);
Expand All @@ -1422,6 +1423,72 @@ public void editTask(Long taskId, String taskName, Long catalogueId, String desc
if (!taskId.equals(taskInfo.getId())) {
throw new RdosDefineException(ErrorCode.NAME_ALREADY_EXIST);
}
updateInfo.setGmtModified(Timestamp.valueOf(LocalDateTime.now()));
updateInfo.setName(taskName);
updateInfo.setNodePid(catalogueId);
updateInfo.setTaskDesc(desc);
updateInfo.setComponentVersion(componentVersion);
developTaskMapper.update(updateInfo, Wrappers.lambdaUpdate(Task.class).eq(Task::getId, taskInfo.getId()));

if (Objects.equals(EScheduleJobType.WORK_FLOW.getType(), taskInfo.getTaskType())){
//更新父节点目录时,同步更新子节点
if (!taskInfo.getNodePid().equals(catalogueId)) {
updateSonTaskNodePidByFlowId(taskInfo.getId(), catalogueId);
}
}
}



/**
* 更新工作流的调度信息
* 历史任务若父节点和子节点的周期不一致,则在提交时将子节点自动改为与父节点一致
*
* @param flowWorkId 工作流id
* @param newScheduleConf 新调度信息
*/
public void updateSubTaskScheduleConf(final Long flowWorkId, final JSONObject newScheduleConf) {
Task task = developTaskMapper.selectById(flowWorkId);
if (task == null) {
throw new RdosDefineException(ErrorCode.CAN_NOT_FIND_TASK);
}
final List<Task> batchTasks = this.getFlowWorkSubTasks(flowWorkId);
if (CollectionUtils.isEmpty(batchTasks)) {
return;
}
final int periodType = newScheduleConf.getInteger("periodType");
newScheduleConf.put("selfReliance", 0);
//工作流更新调度属性时,子任务同步更新
for (final Task bTask : batchTasks) {
//工作流配置的自动取消不同步子任务
newScheduleConf.remove("isExpire");
JSONObject subTaskScheduleConf = JSON.parseObject(bTask.getScheduleConf());
Boolean isFailRetry = MapUtils.getBoolean(subTaskScheduleConf, "isFailRetry", true);
Integer maxRetryNum = MapUtils.getInteger(subTaskScheduleConf, "maxRetryNum", 3);
newScheduleConf.put("isFailRetry", isFailRetry);
newScheduleConf.put("maxRetryNum", maxRetryNum);
WorkFlowScheduleConfEnum.valueOf(WorkFlowScheduleConfEnum.getCurrentPeriodType(String.valueOf(periodType)))
.handleWorkFlowChildScheduleConf(bTask, newScheduleConf);
bTask.setPeriodType(periodType);
}
for (Task batchTask : batchTasks) {
Task task1 = new Task();
task1.setScheduleConf(batchTask.getScheduleConf());
developTaskMapper.update(task1,Wrappers.lambdaUpdate(Task.class).eq(Task::getId,batchTask.getId()));
}
}


/**
* 根据父任务id,更新所有子任务的目录id
*
* @param flowId
* @param nodePid
*/
public void updateSonTaskNodePidByFlowId(Long flowId, Long nodePid) {
Task task = new Task();
task.setNodePid(nodePid);
developTaskMapper.update(task, Wrappers.lambdaUpdate(Task.class).eq(Task::getFlowId, flowId));
}

}
Loading

0 comments on commit 4c545bd

Please sign in to comment.