diff --git a/dss-appconn/appconns/dss-dolphinscheduler-appconn/src/main/resources/init.sql b/dss-appconn/appconns/dss-dolphinscheduler-appconn/src/main/resources/init.sql index 07522a339e..d6cf5e01f6 100644 --- a/dss-appconn/appconns/dss-dolphinscheduler-appconn/src/main/resources/init.sql +++ b/dss-appconn/appconns/dss-dolphinscheduler-appconn/src/main/resources/init.sql @@ -20,9 +20,10 @@ VALUES(@dolphinscheduler_appconnId, @dolphinscheduler_menuId,'dolphinscheduler', delete from dss_workspace_dictionary where dic_key = "pom_work_flow_ds"; delete from dss_workspace_dictionary where dic_key = "pom_work_flow_ds_DAG"; +delete from dss_workspace_dictionary where dic_key = "pdp_scheduler_center"; insert into `dss_workspace_dictionary`(`workspace_id`,`parent_key`,`dic_name`,`dic_name_en`,`dic_key`,`dic_value`,`dic_value_en`,`title`,`title_en`,`url`,`url_type`,`icon`,`order_num`,`remark`,`create_user`,`create_time`,`update_user`,`update_time`) values (0,'p_orchestrator_mode','DS工作流','Workflow_DS','pom_work_flow_ds','radio',NULL,NULL,NULL,NULL,0,'gongzuoliu-icon',1,'工程编排模式-DS工作流','SYSTEM','2022-03-21 14:25:35',NULL,'2022-03-21 14:25:35'); insert into `dss_workspace_dictionary`(`workspace_id`,`parent_key`,`dic_name`,`dic_name_en`,`dic_key`,`dic_value`,`dic_value_en`,`title`,`title_en`,`url`,`url_type`,`icon`,`order_num`,`remark`,`create_user`,`create_time`,`update_user`,`update_time`) values (0,'pom_work_flow_ds','DAG','DAG','pom_work_flow_ds_DAG',NULL,NULL,NULL,NULL,NULL,0,NULL,1,'工程编排模式-DS工作流-DAG','SYSTEM','2022-03-21 14:25:35',NULL,'2022-03-21 14:25:35'); INSERT INTO dss_workspace_dictionary (workspace_id, parent_key, dic_name, dic_name_en, dic_key, dic_value, dic_value_en, title, title_en, url, url_type, icon, order_num, remark, create_user, create_time, update_user, update_time) -VALUES(0, 'p_develop_process', '调度中心', 'Scheduler Center', 'pdp_scheduler_center', 'scheduler', NULL, NULL, NULL, NULL, 0, 'kaifa-icon', 1, '工程开发流程-调度中心', 'SYSTEM', '2020-12-28 17:32:35.0', NULL, '2021-02-22 17:49:02.0'); \ No newline at end of file +VALUES(0, 'p_develop_process', '调度中心', 'Scheduler Center', 'pdp_scheduler_center', 'scheduler', NULL, NULL, NULL, NULL, 0, 'kaifa-icon', 1, '工程开发流程-调度中心', 'SYSTEM', '2020-12-28 17:32:35.0', NULL, '2021-02-22 17:49:02.0'); diff --git a/dss-framework/dss-framework-orchestrator-server/src/main/java/com/webank/wedatasphere/dss/orchestrator/server/job/OrchestratorCopyJob.java b/dss-framework/dss-framework-orchestrator-server/src/main/java/com/webank/wedatasphere/dss/orchestrator/server/job/OrchestratorCopyJob.java index d087348a79..fc73840193 100644 --- a/dss-framework/dss-framework-orchestrator-server/src/main/java/com/webank/wedatasphere/dss/orchestrator/server/job/OrchestratorCopyJob.java +++ b/dss-framework/dss-framework-orchestrator-server/src/main/java/com/webank/wedatasphere/dss/orchestrator/server/job/OrchestratorCopyJob.java @@ -1,17 +1,22 @@ package com.webank.wedatasphere.dss.orchestrator.server.job; import com.google.common.collect.Lists; +import com.webank.wedatasphere.dss.appconn.scheduler.structure.orchestration.OrchestrationCreationOperation; +import com.webank.wedatasphere.dss.appconn.scheduler.structure.orchestration.OrchestrationService; +import com.webank.wedatasphere.dss.appconn.scheduler.structure.orchestration.ref.DSSOrchestrationContentRequestRef; +import com.webank.wedatasphere.dss.appconn.scheduler.structure.orchestration.ref.OrchestrationResponseRef; import com.webank.wedatasphere.dss.common.exception.DSSErrorException; import com.webank.wedatasphere.dss.common.label.DSSLabel; import com.webank.wedatasphere.dss.common.utils.MapUtils; -import com.webank.wedatasphere.dss.orchestrator.common.entity.DSSOrchestratorCopyInfo; -import com.webank.wedatasphere.dss.orchestrator.common.entity.DSSOrchestratorInfo; -import com.webank.wedatasphere.dss.orchestrator.common.entity.DSSOrchestratorVersion; +import com.webank.wedatasphere.dss.orchestrator.common.entity.*; import com.webank.wedatasphere.dss.orchestrator.common.ref.OrchestratorRefConstant; import com.webank.wedatasphere.dss.orchestrator.core.DSSOrchestrator; import com.webank.wedatasphere.dss.orchestrator.core.utils.OrchestratorUtils; +import com.webank.wedatasphere.dss.orchestrator.db.dao.OrchestratorMapper; import com.webank.wedatasphere.dss.orchestrator.publish.utils.OrchestrationDevelopmentOperationUtils; import com.webank.wedatasphere.dss.orchestrator.server.entity.vo.OrchestratorCopyVo; +import com.webank.wedatasphere.dss.orchestrator.server.service.OrchestratorService; +import com.webank.wedatasphere.dss.orchestrator.server.service.impl.OrchestratorFrameworkServiceImpl; import com.webank.wedatasphere.dss.standard.app.development.operation.RefCopyOperation; import com.webank.wedatasphere.dss.standard.app.development.ref.CopyRequestRef; import com.webank.wedatasphere.dss.standard.app.development.ref.RefJobContentResponseRef; @@ -33,6 +38,12 @@ public class OrchestratorCopyJob implements Runnable { protected OrchestratorCopyEnv orchestratorCopyEnv; + private OrchestratorFrameworkServiceImpl orchestratorFrameworkServiceImpl; + + private OrchestratorService orchestratorService; + + private OrchestratorMapper orchestratorMapper; + private DSSOrchestratorCopyInfo orchestratorCopyInfo = new DSSOrchestratorCopyInfo(UUID.randomUUID().toString()); @@ -66,9 +77,10 @@ private void copyOrchestrator() { newOrchestrator.setDesc("copy from " + sourceOrchestrator.getName()); newOrchestrator.setUpdateTime(null); newOrchestrator.setUpdateUser(null); + DSSOrchestratorVersion dssOrchestratorVersion = null; try { - doOrchestratorCopy(orchestratorCopyVo.getUsername(), orchestratorCopyVo.getWorkspace(), newOrchestrator, + dssOrchestratorVersion = doOrchestratorCopy(orchestratorCopyVo.getUsername(), orchestratorCopyVo.getWorkspace(), newOrchestrator, orchestratorCopyVo.getTargetProjectName(), Lists.newArrayList(orchestratorCopyVo.getDssLabel()), appId); } catch (Exception e) { //保存错误信息 @@ -91,9 +103,37 @@ private void copyOrchestrator() { orchestratorCopyInfo.setSuccessNode(Lists.newArrayList("All")); orchestratorCopyInfo.setStatus(1); orchestratorCopyEnv.getOrchestratorCopyJobMapper().updateCopyStatus(orchestratorCopyInfo); + + List dssLabels = new ArrayList<>(); + dssLabels.add(orchestratorCopyVo.getDssLabel()); + + //2.如果调度系统要求同步创建工作流,向调度系统发送创建工作流的请求 + OrchestrationResponseRef orchestrationResponseRef = orchestratorFrameworkServiceImpl.tryOrchestrationOperation(dssLabels, true, orchestratorCopyVo.getUsername(), + orchestratorCopyVo.getTargetProjectName(), orchestratorCopyVo.getWorkspace(), newOrchestrator, + OrchestrationService::getOrchestrationCreationOperation, + (structureOperation, structureRequestRef) -> ((OrchestrationCreationOperation) structureOperation) + .createOrchestration((DSSOrchestrationContentRequestRef) structureRequestRef), "create"); + + try { + orchestratorService.copyOrchestrator(orchestratorCopyVo.getUsername(), orchestratorCopyVo.getWorkspace(), orchestratorCopyVo.getTargetProjectName(), + orchestratorCopyVo.getTargetProjectId(), newOrchestrator.getDesc(), newOrchestrator, dssLabels); + } catch (Exception e) { + throw new RuntimeException("error happened when copying orc.", e); + } + + Long orchestratorId = newOrchestrator.getId(); + Long orchestratorVersionId = dssOrchestratorVersion.getId(); + //4.将工程和orchestrator的关系存储到的数据库中 + if (orchestrationResponseRef != null) { + Long refProjectId = (Long) orchestrationResponseRef.toMap().get("refProjectId"); + orchestratorMapper.addOrchestratorRefOrchestration(new DSSOrchestratorRefOrchestration(orchestratorId, refProjectId, orchestrationResponseRef.getRefOrchestrationId())); + } else { + LOGGER.info("copy orchestration {} with orchestratorId is {}, and versionId is {}, and orchestrationResponseRef is null.", newOrchestrator.getName(), orchestratorId, orchestratorVersionId); + + } } - private void doOrchestratorCopy(String userName, + private DSSOrchestratorVersion doOrchestratorCopy(String userName, Workspace workspace, DSSOrchestratorInfo dssOrchestratorInfo, String projectName, @@ -159,4 +199,28 @@ public DSSOrchestratorCopyInfo getOrchestratorCopyInfo() { public void setOrchestratorCopyInfo(DSSOrchestratorCopyInfo orchestratorCopyInfo) { this.orchestratorCopyInfo = orchestratorCopyInfo; } + + public OrchestratorFrameworkServiceImpl getOrchestratorFrameworkServiceImpl() { + return orchestratorFrameworkServiceImpl; + } + + public void setOrchestratorFrameworkServiceImpl(OrchestratorFrameworkServiceImpl orchestratorFrameworkServiceImpl) { + this.orchestratorFrameworkServiceImpl = orchestratorFrameworkServiceImpl; + } + + public OrchestratorService getOrchestratorService() { + return orchestratorService; + } + + public void setOrchestratorService(OrchestratorService orchestratorService) { + this.orchestratorService = orchestratorService; + } + + public OrchestratorMapper getOrchestratorMapper() { + return orchestratorMapper; + } + + public void setOrchestratorMapper(OrchestratorMapper orchestratorMapper) { + this.orchestratorMapper = orchestratorMapper; + } } diff --git a/dss-framework/dss-framework-orchestrator-server/src/main/java/com/webank/wedatasphere/dss/orchestrator/server/service/OrchestratorService.java b/dss-framework/dss-framework-orchestrator-server/src/main/java/com/webank/wedatasphere/dss/orchestrator/server/service/OrchestratorService.java index 711b79f65d..0e19e5860e 100644 --- a/dss-framework/dss-framework-orchestrator-server/src/main/java/com/webank/wedatasphere/dss/orchestrator/server/service/OrchestratorService.java +++ b/dss-framework/dss-framework-orchestrator-server/src/main/java/com/webank/wedatasphere/dss/orchestrator/server/service/OrchestratorService.java @@ -70,6 +70,18 @@ void deleteOrchestrator(String userName, String projectName, Long orchestratorInfoId, List dssLabels) throws Exception; + /** + * 复制编排 + * + */ + OrchestratorVo copyOrchestrator(String userName, + Workspace workspace, + String projectName, + Long projectId, + String description, + DSSOrchestratorInfo dssOrchestratorInfo, + List dssLabels) throws Exception; + /** * 解锁编排对应的工作流 diff --git a/dss-framework/dss-framework-orchestrator-server/src/main/java/com/webank/wedatasphere/dss/orchestrator/server/service/impl/OrchestratorFrameworkServiceImpl.java b/dss-framework/dss-framework-orchestrator-server/src/main/java/com/webank/wedatasphere/dss/orchestrator/server/service/impl/OrchestratorFrameworkServiceImpl.java index 2bec0bd571..a57950020b 100644 --- a/dss-framework/dss-framework-orchestrator-server/src/main/java/com/webank/wedatasphere/dss/orchestrator/server/service/impl/OrchestratorFrameworkServiceImpl.java +++ b/dss-framework/dss-framework-orchestrator-server/src/main/java/com/webank/wedatasphere/dss/orchestrator/server/service/impl/OrchestratorFrameworkServiceImpl.java @@ -180,7 +180,7 @@ public CommonOrchestratorVo createOrchestrator(String username, OrchestratorCrea return commonOrchestratorVo; } - private V tryOrchestrationOperation(List dssLabels, Boolean askProjectSender, String userName, String projectName, + public V tryOrchestrationOperation(List dssLabels, Boolean askProjectSender, String userName, String projectName, Workspace workspace, DSSOrchestratorInfo dssOrchestrator, Function getOrchestrationOperation, BiFunction responseRefConsumer, String operationName) { diff --git a/dss-framework/dss-framework-orchestrator-server/src/main/java/com/webank/wedatasphere/dss/orchestrator/server/service/impl/OrchestratorServiceImpl.java b/dss-framework/dss-framework-orchestrator-server/src/main/java/com/webank/wedatasphere/dss/orchestrator/server/service/impl/OrchestratorServiceImpl.java index b98bffdacf..11312bdce5 100644 --- a/dss-framework/dss-framework-orchestrator-server/src/main/java/com/webank/wedatasphere/dss/orchestrator/server/service/impl/OrchestratorServiceImpl.java +++ b/dss-framework/dss-framework-orchestrator-server/src/main/java/com/webank/wedatasphere/dss/orchestrator/server/service/impl/OrchestratorServiceImpl.java @@ -240,6 +240,47 @@ public void deleteOrchestrator(String userName, orchestratorMapper.deleteOrchestrator(orchestratorInfoId); } + @Override + @Transactional(rollbackFor = Exception.class) + public OrchestratorVo copyOrchestrator(String userName, + Workspace workspace, + String projectName, + Long projectId, + String description, + DSSOrchestratorInfo dssOrchestratorInfo, + List dssLabels) throws Exception { + OrchestratorVo orchestratorVo = new OrchestratorVo(); + //todo 增加校验 + String uuid = UUID.randomUUID().toString(); + + //作为Orchestrator的唯一标识,包括跨环境导入导出也不发生变化。 + dssOrchestratorInfo.setUUID(uuid); + + String version = OrchestratorUtils.generateNewVersion(); + String contextId = contextService.createContextID(workspace.getWorkspaceName(), projectName, dssOrchestratorInfo.getName(), version, userName); + LOGGER.info("Create a new ContextId: {} for new orchestrator {}.", contextId, dssOrchestratorInfo.getName()); + //1. 访问DSS工作流微模块创建工作流 + RefJobContentResponseRef appRef = tryRefOperation(dssOrchestratorInfo, userName, workspace, dssLabels, null, + developmentService -> ((RefCRUDService) developmentService).getRefCreationOperation(), + dssContextRequestRef -> dssContextRequestRef.setContextId(contextId), + projectRefRequestRef -> projectRefRequestRef.setProjectName(projectName).setRefProjectId(projectId), + (developmentOperation, developmentRequestRef) -> { + DSSOrchestrator dssOrchestrator = orchestratorManager.getOrCreateOrchestrator(userName, + workspace.getWorkspaceName(), dssOrchestratorInfo.getType(), dssLabels); + Map dssJobContent = MapUtils.newCommonMapBuilder() + .put(OrchestratorRefConstant.DSS_ORCHESTRATOR_INFO_KEY, dssOrchestratorInfo) + .put(OrchestratorRefConstant.ORCHESTRATOR_VERSION_KEY, version) + .put(OrchestratorRefConstant.ORCHESTRATION_SCHEDULER_APP_CONN, Optional.ofNullable(dssOrchestrator) + .map(DSSOrchestrator::getSchedulerAppConn).map(AppConn::getAppDesc).map(AppDesc::getAppName) + .map(Object::toString).orElse("NULL")).build(); + DSSJobContentRequestRef requestRef = (DSSJobContentRequestRef) developmentRequestRef; + requestRef.setDSSJobContent(dssJobContent); + return ((RefCreationOperation) developmentOperation).createRef(requestRef); + }, "create"); + + return orchestratorVo; + } + @Override public OrchestratorUnlockVo unlockOrchestrator(String userName, Workspace workspace,