diff --git a/taier-common/src/main/java/com/dtstack/taier/common/env/EnvironmentContext.java b/taier-common/src/main/java/com/dtstack/taier/common/env/EnvironmentContext.java index 0525a20c65..d8f2e89031 100644 --- a/taier-common/src/main/java/com/dtstack/taier/common/env/EnvironmentContext.java +++ b/taier-common/src/main/java/com/dtstack/taier/common/env/EnvironmentContext.java @@ -244,6 +244,9 @@ public class EnvironmentContext implements InitializingBean { @Value("${plugin.path:#{systemProperties['user.dir']}/pluginLibs}") private String pluginPath; + @Value("${stopLimit:100000}") + private Integer stopLimit; + @Value("${logs.limit.num:10000}") private Integer logsLimitNum; @@ -562,4 +565,8 @@ public Integer getLogsLimitNum() { public void setLogsLimitNum(Integer logsLimitNum) { this.logsLimitNum = logsLimitNum; } + + public int getStopLimit() { + return stopLimit; + } } diff --git a/taier-scheduler/src/main/java/com/dtstack/taier/scheduler/jobdealer/JobStopDealer.java b/taier-scheduler/src/main/java/com/dtstack/taier/scheduler/jobdealer/JobStopDealer.java index f9c2d81999..26a29cd1a6 100644 --- a/taier-scheduler/src/main/java/com/dtstack/taier/scheduler/jobdealer/JobStopDealer.java +++ b/taier-scheduler/src/main/java/com/dtstack/taier/scheduler/jobdealer/JobStopDealer.java @@ -83,7 +83,6 @@ public class JobStopDealer implements InitializingBean, DisposableBean { @Autowired private ScheduleJobOperatorRecordService scheduleJobOperatorRecordService; - private static final int JOB_STOP_LIMIT = 1000; private static final int WAIT_INTERVAL = 3000; private static final int OPERATOR_EXPIRED_INTERVAL = 60000; private final int asyncDealStopJobQueueSize = 100; @@ -93,7 +92,7 @@ public class JobStopDealer implements InitializingBean, DisposableBean { private final DelayBlockingQueue> stopJobQueue = new DelayBlockingQueue>(1000); private final ExecutorService delayStopProcessorService = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(), new CustomThreadFactory("delayStopProcessor")); - private final ExecutorService asyncDealStopJobService = new ThreadPoolExecutor(asyncDealStopJobPoolSize, asyncDealStopJobPoolSize, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(asyncDealStopJobQueueSize), new CustomThreadFactory("asyncDealStopJob"), new CustomThreadRunsPolicy("asyncDealStopJob", "stop", 180)); + private final ExecutorService asyncDealStopJobService = new ThreadPoolExecutor(2, asyncDealStopJobPoolSize, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(asyncDealStopJobQueueSize), new CustomThreadFactory("asyncDealStopJob"), new CustomThreadRunsPolicy("asyncDealStopJob", "stop", 180)); private final ScheduledExecutorService scheduledService = new ScheduledThreadPoolExecutor(1, new CustomThreadFactory(this.getClass().getSimpleName())); private final DelayStopProcessor delayStopProcessor = new DelayStopProcessor(); private final AcquireStopJob acquireStopJob = new AcquireStopJob(); @@ -122,8 +121,8 @@ public int addStopJobs(List scheduleJobList, Integer isForce) { return 0; } - if (scheduleJobList.size() > JOB_STOP_LIMIT) { - throw new RdosDefineException("please don't stop too many tasks at once, limit:" + JOB_STOP_LIMIT); + if (scheduleJobList.size() > environmentContext.getStopLimit()) { + throw new RdosDefineException("please don't stop too many tasks at once, limit:" + environmentContext.getStopLimit()); } // 分离实例是否提交到yarn上,如果提交到yarn上,需要发送请求stop,如果未提交,直接更新db diff --git a/taier-scheduler/src/main/java/com/dtstack/taier/scheduler/server/scheduler/OperatorRecordJobScheduler.java b/taier-scheduler/src/main/java/com/dtstack/taier/scheduler/server/scheduler/OperatorRecordJobScheduler.java index 08428ceab4..db923a62ac 100644 --- a/taier-scheduler/src/main/java/com/dtstack/taier/scheduler/server/scheduler/OperatorRecordJobScheduler.java +++ b/taier-scheduler/src/main/java/com/dtstack/taier/scheduler/server/scheduler/OperatorRecordJobScheduler.java @@ -14,11 +14,10 @@ import com.dtstack.taier.scheduler.service.ScheduleJobService; import com.google.common.collect.Lists; import org.apache.commons.collections.CollectionUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import java.util.ArrayList; +import java.util.Comparator; import java.util.List; import java.util.Map; import java.util.Set; @@ -44,39 +43,46 @@ public abstract class OperatorRecordJobScheduler extends AbstractJobSummitSchedu @Autowired private ScheduleJobOperatorRecordService scheduleJobOperatorRecordService; + private Long operatorRecordStartId = 0L; + @Override protected List listExecJob(Long startSort, String nodeAddress, Boolean isEq) { - List records = scheduleJobOperatorRecordService.listOperatorRecord(startSort, nodeAddress, getOperatorType().getType(), isEq); - - if (CollectionUtils.isNotEmpty(records)) { - Set jobIds = records.stream().map(ScheduleJobOperatorRecord::getJobId).collect(Collectors.toSet()); - List scheduleJobList = getScheduleJob(jobIds); - - if (CollectionUtils.isNotEmpty(scheduleJobList)) { - List jodExecIds = scheduleJobList.stream().map(ScheduleJob::getJobId).collect(Collectors.toList()); - if (jobIds.size() != scheduleJobList.size()) { - // 过滤出来已经提交运行的实例,删除操作记录 - List deleteJobIdList = jobIds.stream().filter(jobId -> !jodExecIds.contains(jobId)).collect(Collectors.toList()); - removeOperatorRecord(deleteJobIdList); - } + List records = scheduleJobOperatorRecordService.listOperatorRecord(operatorRecordStartId, nodeAddress, getOperatorType().getType(), isEq); + //empty + if (CollectionUtils.isEmpty(records)) { + operatorRecordStartId = 0L; + return new ArrayList<>(); + } - List jobKeys = scheduleJobList.stream().map(ScheduleJob::getJobKey).collect(Collectors.toList()); - List scheduleJobJobList = scheduleJobJobService.listByJobKeys(jobKeys); - Map> jobJobMap = scheduleJobJobList.stream().collect(Collectors.groupingBy(ScheduleJobJob::getJobKey)); - List scheduleJobDetailsList = new ArrayList<>(scheduleJobList.size()); + Set jobIds = records.stream().map(ScheduleJobOperatorRecord::getJobId).collect(Collectors.toSet()); + List scheduleJobList = getScheduleJob(jobIds); - for (ScheduleJob scheduleJob : scheduleJobList) { - ScheduleJobDetails scheduleJobDetails = new ScheduleJobDetails(); - scheduleJobDetails.setScheduleJob(scheduleJob); - scheduleJobDetails.setJobJobList(jobJobMap.get(scheduleJob.getJobKey())); - scheduleJobDetailsList.add(scheduleJobDetails); - } - return scheduleJobDetailsList; - } else { - removeOperatorRecord(Lists.newArrayList(jobIds)); - } + if (CollectionUtils.isEmpty(scheduleJobList)) { + operatorRecordStartId = 0L; + removeOperatorRecord(Lists.newArrayList(jobIds)); } - return Lists.newArrayList(); + + //set max + records.stream().max(Comparator.comparing(ScheduleJobOperatorRecord::getId)) + .ifPresent(scheduleJobOperatorRecord -> operatorRecordStartId = scheduleJobOperatorRecord.getId()); + + if (jobIds.size() != scheduleJobList.size()) { + List jodExecIds = scheduleJobList.stream().map(ScheduleJob::getJobId).collect(Collectors.toList()); + // 过滤出来已经提交运行的实例,删除操作记录 + List deleteJobIdList = jobIds.stream().filter(jobId -> !jodExecIds.contains(jobId)).collect(Collectors.toList()); + removeOperatorRecord(deleteJobIdList); + } + + List jobKeys = scheduleJobList.stream().map(ScheduleJob::getJobKey).collect(Collectors.toList()); + List scheduleJobJobList = scheduleJobJobService.listByJobKeys(jobKeys); + Map> jobJobMap = scheduleJobJobList.stream().collect(Collectors.groupingBy(ScheduleJobJob::getJobKey)); + + return scheduleJobList.stream().map(scheduleJob -> { + ScheduleJobDetails scheduleJobDetails = new ScheduleJobDetails(); + scheduleJobDetails.setScheduleJob(scheduleJob); + scheduleJobDetails.setJobJobList(jobJobMap.get(scheduleJob.getJobKey())); + return scheduleJobDetails; + }).collect(Collectors.toList()); } /**