diff --git a/taier-common/src/main/java/com/dtstack/taier/common/exception/ErrorCode.java b/taier-common/src/main/java/com/dtstack/taier/common/exception/ErrorCode.java index bb9d2f3214..32442adb37 100644 --- a/taier-common/src/main/java/com/dtstack/taier/common/exception/ErrorCode.java +++ b/taier-common/src/main/java/com/dtstack/taier/common/exception/ErrorCode.java @@ -49,6 +49,7 @@ public enum ErrorCode implements ExceptionEnums, Serializable { SFTP_NOT_FOUND(29, "sftp can not found","sftp不存在"), UPDATE_EXCEPTION(30, "update exception", "更新异常"), + UNSUPPORTED_OPERATION(31, "unSupported operation", "不支持的操作"), CONFIG_ERROR(51, "config error","配置错误"), HTTP_CALL_ERROR(64, "http call error", "远程调用失败"), diff --git a/taier-data-develop/src/main/java/com/dtstack/taier/develop/enums/develop/ESchedulePeriodType.java b/taier-data-develop/src/main/java/com/dtstack/taier/develop/enums/develop/ESchedulePeriodType.java new file mode 100644 index 0000000000..e1ae778daf --- /dev/null +++ b/taier-data-develop/src/main/java/com/dtstack/taier/develop/enums/develop/ESchedulePeriodType.java @@ -0,0 +1,60 @@ +package com.dtstack.taier.develop.enums.develop; + + +import com.dtstack.taier.common.exception.RdosDefineException; + +/** + * Reason: + * Date: 2017/5/4 + * Company: www.dtstack.com + * @author xuchao + */ + +public enum ESchedulePeriodType { + + /** + * 分钟 + */ + MIN(0), + + /** + * 小时 + */ + HOUR(1), + + /** + * 天 + */ + DAY(2), + + /** + * 周 + */ + WEEK(3), + + /** + * 月 + */ + MONTH(4); + + private int val; + + ESchedulePeriodType(int val){ + this.val = val; + } + + public int getVal(){ + return this.val; + } + + + public static ESchedulePeriodType getEnumByVal(Integer val) { + for (ESchedulePeriodType periodType : ESchedulePeriodType.values()) { + if (periodType.getVal() == val) { + return periodType; + } + } + throw new RdosDefineException(String.format("val:%s 没有匹配到对应的周期调度类型", val)); + } + +} diff --git a/taier-data-develop/src/main/java/com/dtstack/taier/develop/enums/develop/WorkFlowScheduleConfEnum.java b/taier-data-develop/src/main/java/com/dtstack/taier/develop/enums/develop/WorkFlowScheduleConfEnum.java new file mode 100644 index 0000000000..b66fdf5ec3 --- /dev/null +++ b/taier-data-develop/src/main/java/com/dtstack/taier/develop/enums/develop/WorkFlowScheduleConfEnum.java @@ -0,0 +1,232 @@ +package com.dtstack.taier.develop.enums.develop; + +import com.alibaba.fastjson.JSONObject; +import com.dtstack.taier.common.exception.ErrorCode; +import com.dtstack.taier.common.exception.RdosDefineException; +import com.dtstack.taier.dao.domain.Task; +import org.apache.commons.lang.ArrayUtils; +import org.apache.commons.lang.StringUtils; + +/** + * 工作流调度属性工具类 + * + * @author 昆卡 + * @version 4.3.x-SNAPSHOT + * @since 2021/10/25 + */ + +public enum WorkFlowScheduleConfEnum { + /** + * 分钟 + */ + MIN(String.valueOf(ESchedulePeriodType.MIN.getVal())) { + @Override + public void checkWorkFlowChildScheduleConf(JSONObject oldJsonObject, JSONObject newJsonObject) { + validate(oldJsonObject, newJsonObject, BEGIN_DATE_KEY_NAME, END_DATE_KEY_NAME, BEGIN_HOUR_KEY_NAME, + BEGIN_MIN_KEY_NAME, GAP_MIN_KEY_NAME, END_HOUR_KEY_NAME, END_MIN_KEY_NAME); + } + + @Override + public void handleWorkFlowChildScheduleConf(Task childNodeTask, JSONObject parentJsonObject) { + applyParentScheduleConf(childNodeTask, parentJsonObject, BEGIN_DATE_KEY_NAME, END_DATE_KEY_NAME, + BEGIN_HOUR_KEY_NAME, BEGIN_MIN_KEY_NAME, GAP_MIN_KEY_NAME, END_HOUR_KEY_NAME, END_MIN_KEY_NAME, + PERIOD_TYPE); + } + }, + + /** + * 小时 + */ + HOUR(String.valueOf(ESchedulePeriodType.HOUR.getVal())) { + @Override + public void checkWorkFlowChildScheduleConf(JSONObject oldJsonObject, JSONObject newJsonObject) { + validate(oldJsonObject, newJsonObject, BEGIN_DATE_KEY_NAME, END_DATE_KEY_NAME, BEGIN_HOUR_KEY_NAME, + BEGIN_MIN_KEY_NAME, GAP_HOUR_KEY_NAME, END_HOUR_KEY_NAME, END_MIN_KEY_NAME); + } + + @Override + public void handleWorkFlowChildScheduleConf(Task childNodeTask, JSONObject parentJsonObject) { + applyParentScheduleConf(childNodeTask, parentJsonObject, BEGIN_HOUR_KEY_NAME, BEGIN_MIN_KEY_NAME, + BEGIN_DATE_KEY_NAME, END_DATE_KEY_NAME, GAP_HOUR_KEY_NAME, END_HOUR_KEY_NAME, END_MIN_KEY_NAME, + PERIOD_TYPE); + } + }, + + /** + * 天 + */ + DAY(String.valueOf(ESchedulePeriodType.DAY.getVal())) { + @Override + public void checkWorkFlowChildScheduleConf(JSONObject oldJsonObject, JSONObject newJsonObject) { + validate(oldJsonObject, newJsonObject, BEGIN_DATE_KEY_NAME, END_DATE_KEY_NAME, HOUR_KEY_NAME, MIN_KEY_NAME); + } + + @Override + public void handleWorkFlowChildScheduleConf(Task childNodeTask, JSONObject parentJsonObject) { + applyParentScheduleConf(childNodeTask, parentJsonObject, BEGIN_DATE_KEY_NAME, END_DATE_KEY_NAME, HOUR_KEY_NAME, + MIN_KEY_NAME, PERIOD_TYPE); + } + }, + + /** + * 周 + */ + WEEK(String.valueOf(ESchedulePeriodType.WEEK.getVal())) { + @Override + public void checkWorkFlowChildScheduleConf(JSONObject oldJsonObject, JSONObject newJsonObject) { + validate(oldJsonObject, newJsonObject, BEGIN_DATE_KEY_NAME, END_DATE_KEY_NAME, WEEKDAY_KEY_NAME, HOUR_KEY_NAME, + MIN_KEY_NAME); + } + + @Override + public void handleWorkFlowChildScheduleConf(Task childNodeTask, JSONObject parentJsonObject) { + applyParentScheduleConf(childNodeTask, parentJsonObject, BEGIN_DATE_KEY_NAME, END_DATE_KEY_NAME, WEEKDAY_KEY_NAME, + HOUR_KEY_NAME, MIN_KEY_NAME, PERIOD_TYPE); + } + }, + + /** + * 月 + */ + MONTH(String.valueOf(ESchedulePeriodType.MONTH.getVal())) { + @Override + public void checkWorkFlowChildScheduleConf(JSONObject oldJsonObject, JSONObject newJsonObject) { + validate(oldJsonObject, newJsonObject, BEGIN_DATE_KEY_NAME, END_DATE_KEY_NAME, DAY_KEY_NAME, HOUR_KEY_NAME, + MIN_KEY_NAME); + } + + @Override + public void handleWorkFlowChildScheduleConf(Task childNodeTask, JSONObject parentJsonObject) { + applyParentScheduleConf(childNodeTask, parentJsonObject, BEGIN_DATE_KEY_NAME, END_DATE_KEY_NAME, DAY_KEY_NAME, + HOUR_KEY_NAME, MIN_KEY_NAME, PERIOD_TYPE); + } + } + ; + /** + * 开始小时键名 + */ + private static final String BEGIN_HOUR_KEY_NAME = "beginHour"; + /** + * 开始分钟键名 + */ + private static final String BEGIN_MIN_KEY_NAME = "beginMin"; + /** + * 间隔分钟键名 + */ + private static final String GAP_MIN_KEY_NAME = "gapMin"; + /** + * 结束小时键名 + */ + private static final String END_HOUR_KEY_NAME = "endHour"; + /** + * 结束分钟键名 + */ + private static final String END_MIN_KEY_NAME = "endMin"; + /** + * 间隔小时键名 + */ + private static final String GAP_HOUR_KEY_NAME = "gapHour"; + /** + * 小时键名 + */ + private static final String HOUR_KEY_NAME = "hour"; + /** + * 分钟键名 + */ + private static final String MIN_KEY_NAME = "min"; + /** + * 星期键名 + */ + private static final String WEEKDAY_KEY_NAME = "weekDay"; + /** + * 天键名 + */ + private static final String DAY_KEY_NAME = "day"; + /** + * CRON表达式键名 + */ + private static final String CRON_KEY_NAME = "cron"; + /** + * 开始日期键名 + */ + private static final String BEGIN_DATE_KEY_NAME = "beginDate"; + /** + * 结束日期键名 + */ + private static final String END_DATE_KEY_NAME = "endDate"; + /** + * 调度周期键名 + */ + private static final String PERIOD_TYPE = "periodType"; + + /** + * 调度周期 + */ + private final String periodType; + + /** + * 处理工作流子节点调度配置 + * + * @param oldJsonObject 老周期配置json对象 + * @param newJsonObject 新周期配置json对象 + */ + public abstract void checkWorkFlowChildScheduleConf(JSONObject oldJsonObject, JSONObject newJsonObject); + + /** + * 处理工作流子节点调度配置,父的属性部分给到子 + * + * @param childNodeTask 子节点任务 + * @param parentJsonObject 工作流周期配置json对象 + */ + public abstract void handleWorkFlowChildScheduleConf(Task childNodeTask, JSONObject parentJsonObject); + + /** + * 获取工作流调度周期枚举对象 + * + * @param periodType 调度周期 + */ + public static String getCurrentPeriodType(String periodType) { + if (StringUtils.isEmpty(periodType)) { + throw new RdosDefineException(ErrorCode.INVALID_PARAMETERS); + } + for (WorkFlowScheduleConfEnum workFlowScheduleConfEnum : WorkFlowScheduleConfEnum.values()) { + if (!workFlowScheduleConfEnum.getPeriodType().equals(periodType)) { + continue; + } + return workFlowScheduleConfEnum.name(); + } + throw new RdosDefineException("未知的调度周期"); + } + + private static void validate(JSONObject oldJsonObject, JSONObject newJsonObject, String... keyNameArray) { + if (ArrayUtils.isEmpty(keyNameArray)) { + return; + } + for (String keyName : keyNameArray) { + if (!String.valueOf(oldJsonObject.getOrDefault(keyName, StringUtils.EMPTY)).equals(newJsonObject.getString(keyName))) { + throw new RdosDefineException(ErrorCode.UNSUPPORTED_OPERATION); + } + } + } + + private static void applyParentScheduleConf(Task childNodeTask, JSONObject parentJsonObject, String... keyNameArray) { + if (ArrayUtils.isEmpty(keyNameArray)) { + return; + } + final JSONObject childJsonObject = JSONObject.parseObject(childNodeTask.getScheduleConf()); + for (String keyName : keyNameArray) { + if (parentJsonObject.containsKey(keyName)) { + childJsonObject.put(keyName, parentJsonObject.get(keyName)); + } + } + childNodeTask.setScheduleConf(childJsonObject.toJSONString()); + } + + public String getPeriodType() { + return periodType; + } + + WorkFlowScheduleConfEnum(String periodType) { + this.periodType = periodType; + } +} diff --git a/taier-data-develop/src/main/java/com/dtstack/taier/develop/service/develop/impl/DevelopTaskService.java b/taier-data-develop/src/main/java/com/dtstack/taier/develop/service/develop/impl/DevelopTaskService.java index 44875fd67d..c50a049134 100644 --- a/taier-data-develop/src/main/java/com/dtstack/taier/develop/service/develop/impl/DevelopTaskService.java +++ b/taier-data-develop/src/main/java/com/dtstack/taier/develop/service/develop/impl/DevelopTaskService.java @@ -69,6 +69,7 @@ import com.dtstack.taier.develop.enums.develop.FlinkVersion; import com.dtstack.taier.develop.enums.develop.SourceDTOType; import com.dtstack.taier.develop.enums.develop.TaskCreateModelType; +import com.dtstack.taier.develop.enums.develop.WorkFlowScheduleConfEnum; import com.dtstack.taier.develop.mapstruct.vo.TaskDirtyDataManageTransfer; import com.dtstack.taier.develop.mapstruct.vo.TaskMapstructTransfer; import com.dtstack.taier.develop.service.console.TenantService; @@ -1408,8 +1409,8 @@ public void editTask(Long taskId, String taskName, Long catalogueId, String desc .eq(Task::getIsDeleted, Deleted.NORMAL.getStatus()) .eq(Task::getTenantId, tenantId)); + Task updateInfo = new Task(); if (Objects.isNull(taskInfo)) { - Task updateInfo = new Task(); updateInfo.setId(taskId); updateInfo.setGmtModified(Timestamp.valueOf(LocalDateTime.now())); updateInfo.setName(taskName); @@ -1422,6 +1423,72 @@ public void editTask(Long taskId, String taskName, Long catalogueId, String desc if (!taskId.equals(taskInfo.getId())) { throw new RdosDefineException(ErrorCode.NAME_ALREADY_EXIST); } + updateInfo.setGmtModified(Timestamp.valueOf(LocalDateTime.now())); + updateInfo.setName(taskName); + updateInfo.setNodePid(catalogueId); + updateInfo.setTaskDesc(desc); + updateInfo.setComponentVersion(componentVersion); + developTaskMapper.update(updateInfo, Wrappers.lambdaUpdate(Task.class).eq(Task::getId, taskInfo.getId())); + + if (Objects.equals(EScheduleJobType.WORK_FLOW.getType(), taskInfo.getTaskType())){ + //更新父节点目录时,同步更新子节点 + if (!taskInfo.getNodePid().equals(catalogueId)) { + updateSonTaskNodePidByFlowId(taskInfo.getId(), catalogueId); + } + } + } + + + + /** + * 更新工作流的调度信息 + * 历史任务若父节点和子节点的周期不一致,则在提交时将子节点自动改为与父节点一致 + * + * @param flowWorkId 工作流id + * @param newScheduleConf 新调度信息 + */ + public void updateSubTaskScheduleConf(final Long flowWorkId, final JSONObject newScheduleConf) { + Task task = developTaskMapper.selectById(flowWorkId); + if (task == null) { + throw new RdosDefineException(ErrorCode.CAN_NOT_FIND_TASK); + } + final List batchTasks = this.getFlowWorkSubTasks(flowWorkId); + if (CollectionUtils.isEmpty(batchTasks)) { + return; + } + final int periodType = newScheduleConf.getInteger("periodType"); + newScheduleConf.put("selfReliance", 0); + //工作流更新调度属性时,子任务同步更新 + for (final Task bTask : batchTasks) { + //工作流配置的自动取消不同步子任务 + newScheduleConf.remove("isExpire"); + JSONObject subTaskScheduleConf = JSON.parseObject(bTask.getScheduleConf()); + Boolean isFailRetry = MapUtils.getBoolean(subTaskScheduleConf, "isFailRetry", true); + Integer maxRetryNum = MapUtils.getInteger(subTaskScheduleConf, "maxRetryNum", 3); + newScheduleConf.put("isFailRetry", isFailRetry); + newScheduleConf.put("maxRetryNum", maxRetryNum); + WorkFlowScheduleConfEnum.valueOf(WorkFlowScheduleConfEnum.getCurrentPeriodType(String.valueOf(periodType))) + .handleWorkFlowChildScheduleConf(bTask, newScheduleConf); + bTask.setPeriodType(periodType); + } + for (Task batchTask : batchTasks) { + Task task1 = new Task(); + task1.setScheduleConf(batchTask.getScheduleConf()); + developTaskMapper.update(task1,Wrappers.lambdaUpdate(Task.class).eq(Task::getId,batchTask.getId())); + } + } + + + /** + * 根据父任务id,更新所有子任务的目录id + * + * @param flowId + * @param nodePid + */ + public void updateSonTaskNodePidByFlowId(Long flowId, Long nodePid) { + Task task = new Task(); + task.setNodePid(nodePid); + developTaskMapper.update(task, Wrappers.lambdaUpdate(Task.class).eq(Task::getFlowId, flowId)); } } diff --git a/taier-data-develop/src/main/java/com/dtstack/taier/develop/service/develop/saver/AbstractTaskSaver.java b/taier-data-develop/src/main/java/com/dtstack/taier/develop/service/develop/saver/AbstractTaskSaver.java index affe3fe71b..0a39895a24 100644 --- a/taier-data-develop/src/main/java/com/dtstack/taier/develop/service/develop/saver/AbstractTaskSaver.java +++ b/taier-data-develop/src/main/java/com/dtstack/taier/develop/service/develop/saver/AbstractTaskSaver.java @@ -174,6 +174,24 @@ private void updateTask(TaskVO taskVO) { if (specialTask == null) { throw new RdosDefineException(ErrorCode.CAN_NOT_FIND_TASK); } + Task specialTask1 = new Task(); + // 转换环境参数 + if (!Objects.equals(EScheduleJobType.WORK_FLOW.getVal(), taskVO.getTaskType())){ + String convertParams = convertParams(FlinkVersion.getVersion(specialTask.getComponentVersion()), + FlinkVersion.getVersion(taskVO.getComponentVersion()), + taskVO.getTaskParams(), taskVO.getTaskType()); + taskVO.setTaskParams(convertParams); + + TaskMapstructTransfer.INSTANCE.taskVOTOTask(taskVO, specialTask1); + developTaskService.updateById(specialTask1); + }else { + TaskMapstructTransfer.INSTANCE.taskVOTOTask(taskVO, specialTask1); + specialTask1.setSqlText(String.valueOf(JSONObject.toJSON(taskVO.getNodeMap()))); + developTaskService.updateById(specialTask1); + } + + + if (EScheduleJobType.WORK_FLOW.getVal().equals(specialTask.getTaskType())){ // 判断任务依赖是否成环 if (MapUtils.isNotEmpty(taskVO.getNodeMap())) { @@ -184,15 +202,19 @@ private void updateTask(TaskVO taskVO) { List dependencyTasks = getTaskByIds(entry.getValue()); developTaskTaskService.addOrUpdateTaskTask(entry.getKey(), dependencyTasks); } + + + List childrenTaskByFlowId = developTaskService.getFlowWorkSubTasks(specialTask.getId()); + if (CollectionUtils.isNotEmpty(childrenTaskByFlowId)){ + developTaskService.updateSubTaskScheduleConf(taskVO.getId(), JSONObject.parseObject(taskVO.getScheduleConf())); + } + + //更新父节点目录时,同步更新子节点 + if (!taskVO.getNodePid().equals(specialTask.getNodePid())) { + developTaskService.updateSonTaskNodePidByFlowId(taskVO.getId(), taskVO.getNodePid()); + } } - // 转换环境参数 - String convertParams = convertParams(FlinkVersion.getVersion(specialTask.getComponentVersion()), - FlinkVersion.getVersion(taskVO.getComponentVersion()), - taskVO.getTaskParams(), taskVO.getTaskType()); - taskVO.setTaskParams(convertParams); - Task specialTask1 = new Task(); - TaskMapstructTransfer.INSTANCE.taskVOTOTask(taskVO, specialTask1); - developTaskService.updateById(specialTask1); + }