Skip to content

Commit

Permalink
Merge branch 'master' into master
Browse files Browse the repository at this point in the history
  • Loading branch information
zqburde authored Apr 29, 2024
2 parents f6e8c52 + e068804 commit 824d841
Show file tree
Hide file tree
Showing 5 changed files with 125 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,10 @@ VALUES(@dolphinscheduler_appconnId, @dolphinscheduler_menuId,'dolphinscheduler',

delete from dss_workspace_dictionary where dic_key = "pom_work_flow_ds";
delete from dss_workspace_dictionary where dic_key = "pom_work_flow_ds_DAG";
delete from dss_workspace_dictionary where dic_key = "pdp_scheduler_center";
insert into `dss_workspace_dictionary`(`workspace_id`,`parent_key`,`dic_name`,`dic_name_en`,`dic_key`,`dic_value`,`dic_value_en`,`title`,`title_en`,`url`,`url_type`,`icon`,`order_num`,`remark`,`create_user`,`create_time`,`update_user`,`update_time`) values (0,'p_orchestrator_mode','DS工作流','Workflow_DS','pom_work_flow_ds','radio',NULL,NULL,NULL,NULL,0,'gongzuoliu-icon',1,'工程编排模式-DS工作流','SYSTEM','2022-03-21 14:25:35',NULL,'2022-03-21 14:25:35');
insert into `dss_workspace_dictionary`(`workspace_id`,`parent_key`,`dic_name`,`dic_name_en`,`dic_key`,`dic_value`,`dic_value_en`,`title`,`title_en`,`url`,`url_type`,`icon`,`order_num`,`remark`,`create_user`,`create_time`,`update_user`,`update_time`) values (0,'pom_work_flow_ds','DAG','DAG','pom_work_flow_ds_DAG',NULL,NULL,NULL,NULL,NULL,0,NULL,1,'工程编排模式-DS工作流-DAG','SYSTEM','2022-03-21 14:25:35',NULL,'2022-03-21 14:25:35');

INSERT INTO dss_workspace_dictionary
(workspace_id, parent_key, dic_name, dic_name_en, dic_key, dic_value, dic_value_en, title, title_en, url, url_type, icon, order_num, remark, create_user, create_time, update_user, update_time)
VALUES(0, 'p_develop_process', '调度中心', 'Scheduler Center', 'pdp_scheduler_center', 'scheduler', NULL, NULL, NULL, NULL, 0, 'kaifa-icon', 1, '工程开发流程-调度中心', 'SYSTEM', '2020-12-28 17:32:35.0', NULL, '2021-02-22 17:49:02.0');
VALUES(0, 'p_develop_process', '调度中心', 'Scheduler Center', 'pdp_scheduler_center', 'scheduler', NULL, NULL, NULL, NULL, 0, 'kaifa-icon', 1, '工程开发流程-调度中心', 'SYSTEM', '2020-12-28 17:32:35.0', NULL, '2021-02-22 17:49:02.0');
Original file line number Diff line number Diff line change
@@ -1,17 +1,22 @@
package com.webank.wedatasphere.dss.orchestrator.server.job;

import com.google.common.collect.Lists;
import com.webank.wedatasphere.dss.appconn.scheduler.structure.orchestration.OrchestrationCreationOperation;
import com.webank.wedatasphere.dss.appconn.scheduler.structure.orchestration.OrchestrationService;
import com.webank.wedatasphere.dss.appconn.scheduler.structure.orchestration.ref.DSSOrchestrationContentRequestRef;
import com.webank.wedatasphere.dss.appconn.scheduler.structure.orchestration.ref.OrchestrationResponseRef;
import com.webank.wedatasphere.dss.common.exception.DSSErrorException;
import com.webank.wedatasphere.dss.common.label.DSSLabel;
import com.webank.wedatasphere.dss.common.utils.MapUtils;
import com.webank.wedatasphere.dss.orchestrator.common.entity.DSSOrchestratorCopyInfo;
import com.webank.wedatasphere.dss.orchestrator.common.entity.DSSOrchestratorInfo;
import com.webank.wedatasphere.dss.orchestrator.common.entity.DSSOrchestratorVersion;
import com.webank.wedatasphere.dss.orchestrator.common.entity.*;
import com.webank.wedatasphere.dss.orchestrator.common.ref.OrchestratorRefConstant;
import com.webank.wedatasphere.dss.orchestrator.core.DSSOrchestrator;
import com.webank.wedatasphere.dss.orchestrator.core.utils.OrchestratorUtils;
import com.webank.wedatasphere.dss.orchestrator.db.dao.OrchestratorMapper;
import com.webank.wedatasphere.dss.orchestrator.publish.utils.OrchestrationDevelopmentOperationUtils;
import com.webank.wedatasphere.dss.orchestrator.server.entity.vo.OrchestratorCopyVo;
import com.webank.wedatasphere.dss.orchestrator.server.service.OrchestratorService;
import com.webank.wedatasphere.dss.orchestrator.server.service.impl.OrchestratorFrameworkServiceImpl;
import com.webank.wedatasphere.dss.standard.app.development.operation.RefCopyOperation;
import com.webank.wedatasphere.dss.standard.app.development.ref.CopyRequestRef;
import com.webank.wedatasphere.dss.standard.app.development.ref.RefJobContentResponseRef;
Expand All @@ -33,6 +38,12 @@ public class OrchestratorCopyJob implements Runnable {

protected OrchestratorCopyEnv orchestratorCopyEnv;

private OrchestratorFrameworkServiceImpl orchestratorFrameworkServiceImpl;

private OrchestratorService orchestratorService;

private OrchestratorMapper orchestratorMapper;

private DSSOrchestratorCopyInfo orchestratorCopyInfo = new DSSOrchestratorCopyInfo(UUID.randomUUID().toString());


Expand Down Expand Up @@ -66,9 +77,10 @@ private void copyOrchestrator() {
newOrchestrator.setDesc("copy from " + sourceOrchestrator.getName());
newOrchestrator.setUpdateTime(null);
newOrchestrator.setUpdateUser(null);
DSSOrchestratorVersion dssOrchestratorVersion = null;

try {
doOrchestratorCopy(orchestratorCopyVo.getUsername(), orchestratorCopyVo.getWorkspace(), newOrchestrator,
dssOrchestratorVersion = doOrchestratorCopy(orchestratorCopyVo.getUsername(), orchestratorCopyVo.getWorkspace(), newOrchestrator,
orchestratorCopyVo.getTargetProjectName(), Lists.newArrayList(orchestratorCopyVo.getDssLabel()), appId);
} catch (Exception e) {
//保存错误信息
Expand All @@ -91,9 +103,37 @@ private void copyOrchestrator() {
orchestratorCopyInfo.setSuccessNode(Lists.newArrayList("All"));
orchestratorCopyInfo.setStatus(1);
orchestratorCopyEnv.getOrchestratorCopyJobMapper().updateCopyStatus(orchestratorCopyInfo);

List<DSSLabel> dssLabels = new ArrayList<>();
dssLabels.add(orchestratorCopyVo.getDssLabel());

//2.如果调度系统要求同步创建工作流,向调度系统发送创建工作流的请求
OrchestrationResponseRef orchestrationResponseRef = orchestratorFrameworkServiceImpl.tryOrchestrationOperation(dssLabels, true, orchestratorCopyVo.getUsername(),
orchestratorCopyVo.getTargetProjectName(), orchestratorCopyVo.getWorkspace(), newOrchestrator,
OrchestrationService::getOrchestrationCreationOperation,
(structureOperation, structureRequestRef) -> ((OrchestrationCreationOperation) structureOperation)
.createOrchestration((DSSOrchestrationContentRequestRef) structureRequestRef), "create");

try {
orchestratorService.copyOrchestrator(orchestratorCopyVo.getUsername(), orchestratorCopyVo.getWorkspace(), orchestratorCopyVo.getTargetProjectName(),
orchestratorCopyVo.getTargetProjectId(), newOrchestrator.getDesc(), newOrchestrator, dssLabels);
} catch (Exception e) {
throw new RuntimeException("error happened when copying orc.", e);
}

Long orchestratorId = newOrchestrator.getId();
Long orchestratorVersionId = dssOrchestratorVersion.getId();
//4.将工程和orchestrator的关系存储到的数据库中
if (orchestrationResponseRef != null) {
Long refProjectId = (Long) orchestrationResponseRef.toMap().get("refProjectId");
orchestratorMapper.addOrchestratorRefOrchestration(new DSSOrchestratorRefOrchestration(orchestratorId, refProjectId, orchestrationResponseRef.getRefOrchestrationId()));
} else {
LOGGER.info("copy orchestration {} with orchestratorId is {}, and versionId is {}, and orchestrationResponseRef is null.", newOrchestrator.getName(), orchestratorId, orchestratorVersionId);

}
}

private void doOrchestratorCopy(String userName,
private DSSOrchestratorVersion doOrchestratorCopy(String userName,
Workspace workspace,
DSSOrchestratorInfo dssOrchestratorInfo,
String projectName,
Expand Down Expand Up @@ -159,4 +199,28 @@ public DSSOrchestratorCopyInfo getOrchestratorCopyInfo() {
public void setOrchestratorCopyInfo(DSSOrchestratorCopyInfo orchestratorCopyInfo) {
this.orchestratorCopyInfo = orchestratorCopyInfo;
}

public OrchestratorFrameworkServiceImpl getOrchestratorFrameworkServiceImpl() {
return orchestratorFrameworkServiceImpl;
}

public void setOrchestratorFrameworkServiceImpl(OrchestratorFrameworkServiceImpl orchestratorFrameworkServiceImpl) {
this.orchestratorFrameworkServiceImpl = orchestratorFrameworkServiceImpl;
}

public OrchestratorService getOrchestratorService() {
return orchestratorService;
}

public void setOrchestratorService(OrchestratorService orchestratorService) {
this.orchestratorService = orchestratorService;
}

public OrchestratorMapper getOrchestratorMapper() {
return orchestratorMapper;
}

public void setOrchestratorMapper(OrchestratorMapper orchestratorMapper) {
this.orchestratorMapper = orchestratorMapper;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,18 @@ void deleteOrchestrator(String userName,
String projectName,
Long orchestratorInfoId,
List<DSSLabel> dssLabels) throws Exception;
/**
* 复制编排
*
*/
OrchestratorVo copyOrchestrator(String userName,
Workspace workspace,
String projectName,
Long projectId,
String description,
DSSOrchestratorInfo dssOrchestratorInfo,
List<DSSLabel> dssLabels) throws Exception;


/**
* 解锁编排对应的工作流
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ public CommonOrchestratorVo createOrchestrator(String username, OrchestratorCrea
return commonOrchestratorVo;
}

private <K extends StructureRequestRef, V extends ResponseRef> V tryOrchestrationOperation(List<DSSLabel> dssLabels, Boolean askProjectSender, String userName, String projectName,
public <K extends StructureRequestRef, V extends ResponseRef> V tryOrchestrationOperation(List<DSSLabel> dssLabels, Boolean askProjectSender, String userName, String projectName,
Workspace workspace, DSSOrchestratorInfo dssOrchestrator,
Function<OrchestrationService, StructureOperation> getOrchestrationOperation,
BiFunction<StructureOperation, K, V> responseRefConsumer, String operationName) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,47 @@ public void deleteOrchestrator(String userName,
orchestratorMapper.deleteOrchestrator(orchestratorInfoId);
}

@Override
@Transactional(rollbackFor = Exception.class)
public OrchestratorVo copyOrchestrator(String userName,
Workspace workspace,
String projectName,
Long projectId,
String description,
DSSOrchestratorInfo dssOrchestratorInfo,
List<DSSLabel> dssLabels) throws Exception {
OrchestratorVo orchestratorVo = new OrchestratorVo();
//todo 增加校验
String uuid = UUID.randomUUID().toString();

//作为Orchestrator的唯一标识,包括跨环境导入导出也不发生变化。
dssOrchestratorInfo.setUUID(uuid);

String version = OrchestratorUtils.generateNewVersion();
String contextId = contextService.createContextID(workspace.getWorkspaceName(), projectName, dssOrchestratorInfo.getName(), version, userName);
LOGGER.info("Create a new ContextId: {} for new orchestrator {}.", contextId, dssOrchestratorInfo.getName());
//1. 访问DSS工作流微模块创建工作流
RefJobContentResponseRef appRef = tryRefOperation(dssOrchestratorInfo, userName, workspace, dssLabels, null,
developmentService -> ((RefCRUDService) developmentService).getRefCreationOperation(),
dssContextRequestRef -> dssContextRequestRef.setContextId(contextId),
projectRefRequestRef -> projectRefRequestRef.setProjectName(projectName).setRefProjectId(projectId),
(developmentOperation, developmentRequestRef) -> {
DSSOrchestrator dssOrchestrator = orchestratorManager.getOrCreateOrchestrator(userName,
workspace.getWorkspaceName(), dssOrchestratorInfo.getType(), dssLabels);
Map<String, Object> dssJobContent = MapUtils.newCommonMapBuilder()
.put(OrchestratorRefConstant.DSS_ORCHESTRATOR_INFO_KEY, dssOrchestratorInfo)
.put(OrchestratorRefConstant.ORCHESTRATOR_VERSION_KEY, version)
.put(OrchestratorRefConstant.ORCHESTRATION_SCHEDULER_APP_CONN, Optional.ofNullable(dssOrchestrator)
.map(DSSOrchestrator::getSchedulerAppConn).map(AppConn::getAppDesc).map(AppDesc::getAppName)
.map(Object::toString).orElse("NULL")).build();
DSSJobContentRequestRef requestRef = (DSSJobContentRequestRef) developmentRequestRef;
requestRef.setDSSJobContent(dssJobContent);
return ((RefCreationOperation) developmentOperation).createRef(requestRef);
}, "create");

return orchestratorVo;
}

@Override
public OrchestratorUnlockVo unlockOrchestrator(String userName,
Workspace workspace,
Expand Down

0 comments on commit 824d841

Please sign in to comment.